Adhesive框架中是分布式組件客戶端首先實(shí)現(xiàn)的是基于Json序列化+二進(jìn)制協(xié)議的Memcached客戶端。在本文中會介紹其中的實(shí)現(xiàn)細(xì)節(jié)。
我們先來看一下項(xiàng)目結(jié)構(gòu):
從這個(gè)結(jié)構(gòu)大致可以看出:
1)Memcached只是其中的一個(gè)具體實(shí)現(xiàn),這個(gè)組件期望提供一個(gè)ClientSocket-ClientNode-ClientCluster的基礎(chǔ)實(shí)現(xiàn),以后可以有各種客戶端基于這種結(jié)構(gòu)來實(shí)現(xiàn)
2)對于Memcached的實(shí)現(xiàn),其中把協(xié)議部分放在的Protocol文件夾中,并且根據(jù)協(xié)議為每一個(gè)請求和響應(yīng)封裝類型,也就是使用面向?qū)ο蟮姆绞蕉皇瞧磾?shù)據(jù)包的方式來封裝協(xié)議
那么現(xiàn)在首先來介紹基礎(chǔ)結(jié)構(gòu)。從最底部的層次開始,最底部應(yīng)該是對Socket進(jìn)行一個(gè)封裝,在這里我們實(shí)現(xiàn)了一個(gè)ClientSocket,主要完成下面功能:
1)封裝Read、Write、Connect、Reset(因?yàn)槲覀儗?shí)現(xiàn)的是Socket池,所以在Socket使用之后,歸還池之前需要重置)操作
2)封裝Socket基本狀態(tài),包括創(chuàng)建時(shí)間、忙碌時(shí)間、閑置時(shí)間、發(fā)生錯(cuò)誤時(shí)的回調(diào)方法
?
在ClientSocket之上的一層是ClientNode,也就是一個(gè)節(jié)點(diǎn)的客戶端,很明顯,這里需要做的是Socket連接池,具體完成的工作有:
1)進(jìn)行連接池的維護(hù),包括移除空閑超時(shí)的Socket、強(qiáng)制結(jié)束忙碌時(shí)間過長的Socket、補(bǔ)充新的Socket到連接池的下限
2)初始化池、結(jié)束池、從池獲取Socket、把使用后的Socket返回池、創(chuàng)建非池Socket
在正常使用的時(shí)候,所有Socket都從池中獲取,如果整個(gè)Node不可用,那么我們定時(shí)創(chuàng)建非池Socket來測試Node是否恢復(fù)
?
在ClientNode之上的是ClientCluster,也就是集群,對于需要客戶端進(jìn)行一致性哈希分發(fā)節(jié)點(diǎn)的分布式組件來說,這層就很必要了,完成的功能主要有:
1)初始化集群、使用一致性哈希從集群獲得節(jié)點(diǎn)、直接獲得ClientSocket
2)在節(jié)點(diǎn)出錯(cuò)的時(shí)候進(jìn)行重新節(jié)點(diǎn)分配、嘗試恢復(fù)出錯(cuò)的節(jié)點(diǎn)
?
ClientCluster是使用ClientNodeLocator來分配節(jié)點(diǎn)的,其中的算法也就是一致性哈希算法。
之前說過節(jié)點(diǎn)有權(quán)重的概念,在這里也就是通過虛擬節(jié)點(diǎn)的數(shù)量來設(shè)置節(jié)點(diǎn)權(quán)重,權(quán)重越高分配到Key的數(shù)量也就會越多。
?
在ClientCluster之上還封裝了一層AbstractClient,也就是直接面向用戶的API入口。
public abstract class AbstractClient<T> where T : AbstractClient<T>, new ()
完成的功能有:
1)保存所有的Cluster,初始化Cluster
2)獲取具體的XXXClient的實(shí)現(xiàn),比如MemcachedClient
?
很明顯,我們的第一個(gè)實(shí)現(xiàn)MemcachedClient是繼承了AbstractClient:
public partial class MemcachedClient : AbstractClient<MemcachedClient>
在這里使用了部分類,內(nèi)部的實(shí)現(xiàn)都放在了MemcachedClient_Internal.cs中,而對外的API都放在了MemcachedClient.cs中。
?
對于Memcached的二進(jìn)制協(xié)議,我們首先是實(shí)現(xiàn)一個(gè)頭的格式包:
[StructLayout(LayoutKind.Sequential, Pack = 1)] internal struct Header { internal byte Magic; internal byte Opcode; internal ushort KeyLength; internal byte ExtraLength; internal byte DataType; internal ushort Reserved; internal uint TotalBodyLength; internal uint Opaque; internal ulong Version; }
由于我們會直接把結(jié)構(gòu)打包為字節(jié)數(shù)組,所以這里聲明了結(jié)構(gòu)的內(nèi)存布局。在Protocol.cs中,我們有一些實(shí)用的方法,比如結(jié)構(gòu)和字節(jié)數(shù)組雙向轉(zhuǎn)換的實(shí)現(xiàn):
internal static T BytesToStruct<T>( this byte [] rawData) { T result = default (T); RespectEndianness( typeof (T), rawData); GCHandle handle = GCHandle.Alloc(rawData, GCHandleType.Pinned); try { IntPtr rawDataPtr = handle.AddrOfPinnedObject(); result = (T)Marshal.PtrToStructure(rawDataPtr, typeof (T)); } finally { handle.Free(); } return result; } internal static byte [] StructToBytes<T>( this T data) { byte [] rawData = new byte [Marshal.SizeOf(data)]; GCHandle handle = GCHandle.Alloc(rawData, GCHandleType.Pinned); try { IntPtr rawDataPtr = handle.AddrOfPinnedObject(); Marshal.StructureToPtr(data, rawDataPtr, false ); } finally { handle.Free(); } RespectEndianness( typeof (T), rawData); return rawData; } private static void RespectEndianness(Type type, byte [] data) { var fields = type.GetFields(BindingFlags.NonPublic | BindingFlags.Instance).Select(field => new { Field = field, Offset = Marshal.OffsetOf(type, field.Name).ToInt32() }).ToList(); fields.ForEach(item => Array.Reverse(data, item.Offset, Marshal.SizeOf(item.Field.FieldType))); }
在定義了頭之后,我們就可以封裝一個(gè)抽象的請求包了:
只要實(shí)現(xiàn)這個(gè)包,然后調(diào)用其GetBytes方法就可以直接獲得需要發(fā)送的請求數(shù)據(jù)包,它會在內(nèi)部處理Header和Body數(shù)據(jù)的打包。
比如,我們來看一個(gè)Set操作的包實(shí)現(xiàn):
internal class SetRequestPackage : AbstractRequestPackage { private TimeSpan expireSpan; private byte [] valueBytes; private ulong version; public override Opcode Opcode { get { return Opcode.Set; } } internal SetRequestPackage( string key, byte [] valueBytes, TimeSpan expireSpan, ulong version) : base (key) { if (expireSpan > TimeSpan.FromDays(30)) throw new ArgumentOutOfRangeException( "過期時(shí)間不能超過30天!" ); this .expireSpan = expireSpan; this .valueBytes = valueBytes; this .version = version; } internal SetRequestPackage( string key, string value , TimeSpan expireSpan, ulong version) : this (key, Encoding.UTF8.GetBytes( value ), expireSpan, version) { } internal SetRequestPackage( string key, string value , ulong version) : this (key, Encoding.UTF8.GetBytes( value ), TimeSpan.FromDays(30), version) { } internal SetRequestPackage( string key, byte [] valueBytes, ulong version) : this (key, valueBytes, TimeSpan.FromDays(30), version) { } protected override ulong GetVersion() { return version; } protected override byte [] GetExtraBytes() { var extraBytes = new List< byte >(); uint flag = 0xdeadbeef; extraBytes.AddRange(flag.GetBigEndianBytes()); uint expire = Convert.ToUInt32(expireSpan.TotalSeconds); extraBytes.AddRange(expire.GetBigEndianBytes()); return extraBytes.ToArray(); } protected override byte [] GetValueBytes() { return valueBytes; } }
在這里,我們只是實(shí)現(xiàn)了抽象方法來為基類提供沒有的數(shù)據(jù),并不需要關(guān)心數(shù)據(jù)是如何打包的。那么,之后發(fā)送Set請求的操作就很簡單了:
private bool InternalSet( string key, string value , TimeSpan expire, ulong version) { using (var socket = GetCluster().AcquireSocket(key)) { if (socket != null ) { AbstractRequestPackage requestPackage = expire == TimeSpan.MaxValue ? new SetRequestPackage(key, value , version) : new SetRequestPackage(key, value , expire, version); var requestData = requestPackage.GetBytes(); if (requestData != null ) { socket.Write(requestData); var responsePackage = ResponsePackageCreator.GetPackage(socket); if (responsePackage != null ) { if (responsePackage.ResponseStatus == ResponseStatus.NoError) { return true ; } else if (responsePackage.ResponseStatus != ResponseStatus.KeyExists && responsePackage.ResponseStatus != ResponseStatus.KeyNotFound) { LocalLoggingService.Warning( "在 {0} 上執(zhí)行操作 {1} 得到了不正確的回復(fù) Key : {2} -> {3}" , socket.Endpoint.ToString(), requestPackage.Opcode, key, responsePackage.ResponseStatus); } } else { LocalLoggingService.Error( "在 {0} 上執(zhí)行操作 {1} 沒有得到回復(fù) Key : {2}" , socket.Endpoint.ToString(), requestPackage.Opcode, key); } } } } return false ; }
1)首先是獲取到Cluster,再獲取到池中的Socket
2)然后初始化一個(gè)SetRequestPackage,再通過GetBytes獲得數(shù)據(jù)
3)直接把數(shù)據(jù)寫入Socket
4)通過ResponsePackageCreator來獲得返回的數(shù)據(jù)包
?
很明顯,ResponsePackageCreator和AbstractRequestPackage的意圖差不多,用來把響應(yīng)的數(shù)據(jù)包封裝成我們需要的數(shù)據(jù),其中有一個(gè):
internal static GeneralResponsePackage GetPackage(ClientSocket socket)
獲得的是一個(gè)通用的響應(yīng)數(shù)據(jù)包:
internal class GeneralResponsePackage { internal Opcode Opcode { get; set; } internal ResponseStatus ResponseStatus { get; set; } internal string Key { get; set; } internal byte [] ValueBytes { get; set; } internal ulong Version { get; set; } internal string Value { get { if (ValueBytes != null ) { return Encoding.UTF8.GetString(ValueBytes); } else { return null ; } } } }
在這里基本的信息都有了,比如操作代碼、響應(yīng)狀態(tài)、Key、Value、版本號。正因?yàn)镸emcached的協(xié)議比較簡單,所有的響應(yīng)包都是這么一個(gè)格式,所以我們并沒有實(shí)現(xiàn)特殊的響應(yīng)包。如果要實(shí)現(xiàn)的話,只需要在類頭部標(biāo)記OpCode并且繼承GeneralResponsePackage,ResponsePackageCreator會自動返回相應(yīng)的子類:
[AttributeUsage(AttributeTargets.Class)] internal class ResponsePackageAttribute : Attribute { internal Opcode Opcode { get; private set; } internal ResponsePackageAttribute(Opcode opcode) { this .Opcode = opcode; } }
在獲得了響應(yīng)之后,通過判斷ResponseStatus來知道響應(yīng)是否正確,并且記錄相關(guān)日志即可。這么一來,數(shù)據(jù)一去一回以及協(xié)議如何實(shí)現(xiàn)的整個(gè)過程就介紹完了。下面,我們再介紹一下客戶端中幾個(gè)特色功能的實(shí)現(xiàn)。
?
1)獲取一組Key功能。由于一個(gè)集群會有多個(gè)節(jié)點(diǎn),所以要獲取一組Key,我們首先需要把Key按照節(jié)點(diǎn)分類,然后對于不同的節(jié)點(diǎn),采用并行的方式同時(shí)獲取,這樣速度會很快,代碼片段如下:
var nodeCache = new Dictionary<ClientNode, List< string >>(); foreach (var key in keys) { var node = GetCluster().AcquireNode(key); if (!nodeCache.ContainsKey(node)) nodeCache.Add(node, new List< string > { key }); else if (!nodeCache[node].Contains(key)) nodeCache[node].Add(key); } var data = new Dictionary< string , string >(); Parallel.ForEach(nodeCache, node =>
2)List功能。Memcached只提供了Key、Value的存儲,有的時(shí)候我們的Value是一個(gè)列表,那么我們可以有兩種方式完成這個(gè)功能。第一種就是直接把列表序列化作為一個(gè)Value保存,優(yōu)點(diǎn)是簡單,缺點(diǎn)是如果以后需要修改的話需要整個(gè)列表取出,修改后再把整個(gè)列表保存進(jìn)去,并且由于Memcached Value大小的限制,這么做也不能保存大列表;第二種方式是一個(gè)Value保存列表中的一個(gè)項(xiàng),再使用一個(gè)KeyValue來保存其中每一項(xiàng)的ID,這么優(yōu)點(diǎn)是修改方便,獲取的數(shù)據(jù)可以是列表中的一部分,缺點(diǎn)是實(shí)現(xiàn)麻煩,要考慮并發(fā)問題、要維護(hù)另外一個(gè)KeyValue來保存所有的ID。在這里,我們封裝了后一種方式的實(shí)現(xiàn)。
3)Locker功能。使用Memcached完成鎖的功能其實(shí)很簡單,我們只需要在獲取鎖的時(shí)候判斷Add一個(gè)空值是否成功,如果不成功則表示占有,等待一段時(shí)間嘗試獲取,一直到超時(shí),在返回鎖的時(shí)候刪除這個(gè)項(xiàng)即可。在這里,我們封裝了MemcachedLocker來完成這個(gè)功能。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
