Mongodb數(shù)據(jù)服務可以直接接受任何類型數(shù)據(jù),并且它設計為可以承受大量數(shù)據(jù)的寫入。為了能保存任何類型的數(shù)據(jù),并且在后臺可以查看任何類型的數(shù)據(jù),因此我們必須在收到數(shù)據(jù)的時候?qū)?shù)據(jù)的元數(shù)據(jù)進行提取,隨同主體數(shù)據(jù)一并保存在數(shù)據(jù)庫中。對數(shù)據(jù)本身也需要重新組織結(jié)構(gòu),相當于進行一次序列化,然后保存到數(shù)據(jù)庫中。雖然Mongodb是支持Json格式的,但是由于我們在保存數(shù)據(jù)的時候還有很多邏輯,因此我們必須手動進行這個工作。其實對于提交數(shù)據(jù)來說,應該是一個非常快的動作,應該以異步方式進行,在一個盡量短的時間內(nèi)讓方法的調(diào)用可以返回,之后可以在后臺慢慢進行數(shù)據(jù)的轉(zhuǎn)換和數(shù)據(jù)發(fā)送到遠端。因此,開發(fā)了一個內(nèi)存隊列服務模塊來進行異步隊列處理工作,并且提交數(shù)據(jù)到遠端也使用了框架內(nèi)部的Wcf分布式服務模塊。當然,在服務端道理也一樣,我們可以通過一個內(nèi)存隊列來批量提交數(shù)據(jù),并且讓服務的調(diào)用盡快返回。Mongodb數(shù)據(jù)服務提交數(shù)據(jù)的過程如下:
項目的結(jié)構(gòu)如下:
1、Mongodb項目是客戶端部分的接口
2、Mongodb.Imp項目是客戶端部分的實現(xiàn)
3、Mongodb.Server是服務端部分的接口,或者說是服務契約
4、Mongodb.Server.Imp是服務端部分的實現(xiàn)
可以看到Mongodb數(shù)據(jù)本身依賴應用程序信息中心模塊、配置服務模塊、內(nèi)存隊列服務模塊、Wcf分布式服務模塊,對于大部分客戶端應用程序來說都應該只依賴Mongodb數(shù)據(jù)服務的客戶端而不是服務端。我們把Mongodb數(shù)據(jù)服務分成兩部分,插入數(shù)據(jù)的服務和查詢服務,后者的使用者一般而言只有Mongodb數(shù)據(jù)服務的后臺。本文主要介紹前者:
public
interface
IMongodbInsertService : IDisposable
{
void
Insert(
object
item);
}
從接口本身來看非常簡單,只有一個方法。我們來看看它的實現(xiàn)步驟:
1、調(diào)用配置服務,查看這個數(shù)據(jù)類型對應的配置,說到這里,讓我們來看一下Mongodb數(shù)據(jù)服務客戶端的配置:
[ConfigEntity(FriendlyName =
"Mongodb客戶端配置"
)]
public
class
MongodbServiceConfigurationEntity
{
[ConfigItem(FriendlyName =
"插入服務配置項列表"
)]
public
Dictionary<
string
, MongodbInsertServiceConfigurationItem> MongodbInsertServiceConfigurationItems { get; set; }
}
每一個類型的配置項如下:
[ConfigEntity(FriendlyName =
"Mongodb客戶端針對每個數(shù)據(jù)類型的配置"
)]
public
class
MongodbInsertServiceConfigurationItem
{
[ConfigItem(FriendlyName =
"類型完整名"
)]
public
string
TypeFullName { get; set; }
[ConfigItem(FriendlyName =
"是否提交到服務端"
)]
public
bool
SubmitToServer { get; set; }
[ConfigItem(FriendlyName =
"隊列最大項數(shù)"
)]
public
int
MaxItemCount { get; set; }
[ConfigItem(FriendlyName =
"消費的線程總數(shù)"
)]
public
int
ConsumeThreadCount { get; set; }
[ConfigItem(FriendlyName =
"消費數(shù)據(jù)的時間間隔毫秒"
)]
public
int
ConsumeIntervalMilliseconds { get; set; }
[ConfigItem(FriendlyName =
"遇到錯誤時消費數(shù)據(jù)的時間間隔毫秒"
)]
public
int
ConsumeIntervalWhenErrorMilliseconds { get; set; }
[ConfigItem(FriendlyName =
"消費數(shù)據(jù)的批量項數(shù)"
)]
public
int
ConsumeItemCountInOneBatch { get; set; }
[ConfigItem(FriendlyName =
"達到最大項數(shù)后的策略"
)]
public
MemoryQueueServiceReachMaxItemCountAction ReachMaxItemCountAction { get; set; }
[ConfigItem(FriendlyName =
"消費數(shù)據(jù)時不足批次數(shù)的策略"
)]
public
MemoryQueueServiceNotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }
[ConfigItem(FriendlyName =
"消費數(shù)據(jù)遇到錯誤的策略"
)]
public
MemoryQueueServiceConsumeErrorAction ConsumeErrorAction { get; set; }
public
MongodbInsertServiceConfigurationItem()
{
TypeFullName =
""
;
SubmitToServer =
true
;
ReachMaxItemCountAction = MemoryQueueServiceReachMaxItemCountAction.AbandonOldItems
.Add(MemoryQueueServiceReachMaxItemCountAction.LogExceptionEveryOneSecond);
ConsumeErrorAction = MemoryQueueServiceConsumeErrorAction.AbandonAndLogException;
ConsumeThreadCount = 1;
ConsumeIntervalMilliseconds = 10;
ConsumeIntervalWhenErrorMilliseconds = 1000;
ConsumeItemCountInOneBatch = 100;
NotReachBatchCountConsumeAction = MemoryQueueServiceNotReachBatchCountConsumeAction.ConsumeAllItems;
MaxItemCount = 10000;
}
}
這里可以看到,除了是否提交到服務端這個配置,大多數(shù)的配置其實是內(nèi)存隊列服務的配置,在之后的文章中我們單獨會介紹內(nèi)存隊列服務。之所以需要為Mongodb數(shù)據(jù)服務的客戶端設置這樣的配置,一方面是允許修改隊列服務的配置,另一方面是為了限制沒有經(jīng)過配置隨便什么數(shù)據(jù)都往服務端發(fā)送,只有在后臺顯式配置的數(shù)據(jù)類型,才會發(fā)生到服務端。
2、如果沒獲取到配置的話返回,如果獲取到配置的話,則為這個類型初始化內(nèi)存隊列服務,設置一系列隊列服務的參數(shù),并且把隊列的處理委托掛載我們提交數(shù)據(jù)到服務端的處理方法。換句話說是每一個類型都會有自己的內(nèi)存隊列服務,我們在MongodbInsertService的實現(xiàn)定義了一個靜態(tài)字典用于保存內(nèi)存隊列服務的實現(xiàn):
private
static
Dictionary<
string
, IMemoryQueueService> submitDataMemoryQueueServices =
new
Dictionary<
string
, IMemoryQueueService>();
if
(!submitDataMemoryQueueServices.ContainsKey(typeFullName))
{
lock
(submitDataMemoryQueueServices)
{
if
(!submitDataMemoryQueueServices.ContainsKey(typeFullName))
{
var memoryQueueService = LocalServiceLocator.GetService<IMemoryQueueService>();
memoryQueueService.Init(
new
MemoryQueueServiceConfiguration(
string
.Format(
"{0}_{1}"
, ServiceName, typeFullName), InternalSubmitData)
{
ConsumeErrorAction = config.ConsumeErrorAction,
ConsumeIntervalMilliseconds = config.ConsumeIntervalMilliseconds,
ConsumeIntervalWhenErrorMilliseconds = config.ConsumeIntervalWhenErrorMilliseconds,
ConsumeItemCountInOneBatch = config.ConsumeItemCountInOneBatch,
ConsumeThreadCount = config.ConsumeThreadCount,
MaxItemCount = config.MaxItemCount,
NotReachBatchCountConsumeAction = config.NotReachBatchCountConsumeAction,
ReachMaxItemCountAction = config.ReachMaxItemCountAction,
});
submitDataMemoryQueueServices.Add(typeFullName, memoryQueueService);
}
}
}
3、然后會判斷是否已經(jīng)提取過這個類型元數(shù)據(jù)了,如果沒提取過則嘗試提取元數(shù)據(jù)并加入緩存:
if
(!mongodbDatabaseDescriptionCache.ContainsKey(typeFullName))
{
lock
(mongodbDatabaseDescriptionCache)
{
if
(!mongodbDatabaseDescriptionCache.ContainsKey(typeFullName))
{
MongodbDatabaseDescription mongodbDatabaseDescription = GetMongodbDatabaseDescription(item);
CheckMongodbDatabaseDescription(mongodbDatabaseDescription);
mongodbDatabaseDescriptionCache.Add(typeFullName, mongodbDatabaseDescription);
}
}
}
4、把數(shù)據(jù)加入隊列,等待隊列服務在合適的時候調(diào)用處理方法(也就是發(fā)送到服務端):
if
(config.SubmitToServer)
{
submitDataMemoryQueueServices[typeFullName].Enqueue(item);
}
?
其實到這里為止,方法已經(jīng)返回了,之后就是隊列服務在后臺的異步調(diào)用了。現(xiàn)在我們來深入一下細節(jié),首先看一下GetMongodbDatabaseDescription是如何提取元數(shù)據(jù)的,這個方法返回的是MongodbDatabaseDescription,它的定義如下:
[DataContract(Namespace =
"Adhesive.Mongodb"
)]
public
class
MongodbDatabaseDescription
{
[DataMember]
public
bool
SentToServer { get; set; }
[DataMember]
public
string
TypeFullName { get; set; }
[DataMember]
public
string
DatabasePrefix { get; set; }
[DataMember]
public
string
CategoryName { get; set; }
[DataMember]
public
string
Name { get; set; }
[DataMember]
public
string
DisplayName { get; set; }
[DataMember]
public
int
ExpireDays { get; set; }
[DataMember]
public
List<MongodbColumnDescription> MongodbColumnDescriptionList { get; set; }
[DataMember]
public
List<MongodbEnumColumnDescription> MongodbEnumColumnDescriptionList { get; set; }
}
在這里可以看到,我們主要解析的是MongodbPersistenceEntityAttribute,對于下一級的MongodbColumnDescriptionList ,我們主要是解析每一個列的元數(shù)據(jù),而MongodbEnumColumnDescriptionList則提取所有枚舉的信息。MongodbColumnDescription的定義如下:
[DataContract(Namespace =
"Adhesive.Mongodb"
)]
public
class
MongodbColumnDescription
{
[DataMember]
public
string
Name { get; set; }
[DataMember]
public
string
TypeName { get; set; }
[DataMember]
public
bool
IsArrayColumn { get; set; }
[DataMember]
public
bool
IsEntityColumn { get; set; }
[DataMember]
public
string
ColumnName { get; set; }
[DataMember]
public
string
DisplayName { get; set; }
[DataMember]
public
string
Description { get; set; }
[DataMember]
public
bool
ShowInTableView { get; set; }
[DataMember]
public
bool
IsTableColumn { get; set; }
[DataMember]
public
bool
IsTimeColumn { get; set; }
[DataMember]
public
bool
IsContextIdentityColumn { get; set; }
[DataMember]
public
bool
IsPrimaryKey { get; set; }
[DataMember]
public
MongodbIndexOption MongodbIndexOption { get; set; }
[DataMember]
public
MongodbFilterOption MongodbFilterOption { get; set; }
[DataMember]
public
MongodbCascadeFilterOption MongodbCascadeFilterOption { get; set; }
[DataMember]
public
MongodbSortOption MongodbSortOption { get; set; }
}
這里很多數(shù)據(jù)都來自MongodbPersistenceItemAttribute和MongodbPresentationItemAttribute。再來看看MongodbEnumColumnDescription:
[DataContract(Namespace =
"Adhesive.Mongodb"
)]
public
class
MongodbEnumColumnDescription
{
[DataMember]
public
string
Name { get; set; }
[DataMember]
public
Dictionary<
string
,
string
> EnumItems { get; set; }
}
它就簡單了,只是保存枚舉的列名,和枚舉每一項的數(shù)據(jù)。其實這些元數(shù)據(jù)提取本身沒什么復雜的,可以想到是反射提取,并且其中還涉及到遞歸,需要深入每一個自定義類型,GetMongodbColumnDescription方法其中有一段這樣的代碼實現(xiàn)了遞歸:
if
(!type.Assembly.GlobalAssemblyCache && type != pi.DeclaringType)
{
columnDescription.IsEntityColumn =
true
;
var properties = GetPropertyListFromCache(type);
if
(properties !=
null
)
{
foreach
(var property
in
properties)
{
GetMongodbColumnDescription(typeFullName, fullName, columnDescriptionList, enumColumnDescriptionList, property);
}
}
}
在提取元數(shù)據(jù)的時候,另一個重要的工作是緩存一些關鍵的PropertyInfo的配置,以便后期處理數(shù)據(jù)的時候使用:
internal
class
ProperyInfoConfig
{
public
bool
IsCascadeFilterLevelOne { get; set; }
public
bool
IsCascadeFilterLevelTwo { get; set; }
public
bool
IsCascadeFilterLevelThree { get; set; }
public
bool
IsDateColumn { get; set; }
public
bool
IsTableName { get; set; }
public
bool
IsIgnore { get; set; }
public
string
ColumnName { get; set; }
}
因為我們在提交數(shù)據(jù)之前,需要針對級聯(lián)下拉的數(shù)據(jù)進行處理,把第二級的值設置為第一級的值加上第二級的值,第三級的值設置為一加二加三,這樣在篩選的時候就會很方便;此外還需要替換列名,計算表名等等,只有緩存了PropertyInfo才能無需重新讀取元數(shù)據(jù):
private
static
Dictionary<
string
, Dictionary<PropertyInfo, ProperyInfoConfig>> propertyConfigCache =
new
Dictionary<
string
, Dictionary<PropertyInfo, ProperyInfoConfig>>();
之前說了元數(shù)據(jù)提取部分時的邏輯,然后來看一下格式化數(shù)據(jù)時的邏輯,之前為內(nèi)存隊列服務的提交數(shù)據(jù)的委托掛載的方法主要實現(xiàn)如下:
var mongodbDataList = items.Select(_ => ConvertItemToMongodbData(_)).Where(_ => _ !=
null
).ToList();
var desc = mongodbDatabaseDescriptionCache[typeFullName];
WcfServiceLocator.GetSafeService<IMongodbServer>().SubmitData(mongodbDataList, desc.SentToServer ?
null
: desc);
先是獲取要提交的數(shù)據(jù),然后再獲取元數(shù)據(jù),如果有的話和主體數(shù)據(jù)一并提交到服務端。通過Wcf分布式數(shù)據(jù)服務獲取到IMongodbServer,并調(diào)用它的SubmitData方法,定義如下:
[OperationContract]
void
SubmitData(IList<MongodbData> dataList, MongodbDatabaseDescription databaseDescription);
MongodbData的定義如下:
[DataContract(Namespace =
"Adhesive.Mongodb"
)]
public
class
MongodbData
{
[DataMember]
public
string
TypeFullName { get; set; }
[DataMember]
public
string
DatabaseName { get; set; }
[DataMember]
public
string
TableName { get; set; }
[DataMember]
public
string
Data { get; set; }
}
在這里可以發(fā)現(xiàn)Data是字符串類型,那是因為我們把要提交的數(shù)據(jù)主體轉(zhuǎn)換成了Json,否則我們是無法通過Wcf提交Dictionary<string, object>構(gòu)成的一顆無限級樹的。在這里,我們略去介紹ConvertItemToMongodbData的實現(xiàn),它其實并不復雜,也是通過遞歸和反射無限級獲取類的所有屬性的值,并轉(zhuǎn)換為Dictionary<string, object>,只不過在這里面需要處理列表類型、字典類型以及枚舉。
?
至此為止,客戶端的部分介紹完了,現(xiàn)在我們來看一下服務端部分。首先,服務端也有根據(jù)每一個類型的配置:
[ConfigEntity(FriendlyName =
"Mongodb服務端針對每個數(shù)據(jù)類型的配置"
)]
public
class
MongodbServerConfigurationItem
{
[ConfigItem(FriendlyName =
"類型完整名"
)]
public
string
TypeFullName { get; set; }
[ConfigItem(FriendlyName =
"服務器名"
)]
public
string
MongodbServerUrlName { get; set; }
[ConfigItem(FriendlyName =
"是否提交到數(shù)據(jù)庫"
)]
public
bool
SubmitToDatabase { get; set; }
[ConfigItem(FriendlyName =
"隊列最大項數(shù)"
)]
public
int
MaxItemCount { get; set; }
[ConfigItem(FriendlyName =
"消費的線程總數(shù)"
)]
public
int
ConsumeThreadCount { get; set; }
[ConfigItem(FriendlyName =
"消費數(shù)據(jù)的時間間隔毫秒"
)]
public
int
ConsumeIntervalMilliseconds { get; set; }
[ConfigItem(FriendlyName =
"遇到錯誤時消費數(shù)據(jù)的時間間隔毫秒"
)]
public
int
ConsumeIntervalWhenErrorMilliseconds { get; set; }
[ConfigItem(FriendlyName =
"消費數(shù)據(jù)的批量項數(shù)"
)]
public
int
ConsumeItemCountInOneBatch { get; set; }
[ConfigItem(FriendlyName =
"達到最大項數(shù)后的策略"
)]
public
MemoryQueueServiceReachMaxItemCountAction ReachMaxItemCountAction { get; set; }
[ConfigItem(FriendlyName =
"消費數(shù)據(jù)時不足批次數(shù)的策略"
)]
public
MemoryQueueServiceNotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }
[ConfigItem(FriendlyName =
"消費數(shù)據(jù)遇到錯誤的策略"
)]
public
MemoryQueueServiceConsumeErrorAction ConsumeErrorAction { get; set; }
public
MongodbServerConfigurationItem()
{
TypeFullName =
""
;
SubmitToDatabase =
true
;
ReachMaxItemCountAction = MemoryQueueServiceReachMaxItemCountAction.AbandonOldItems
.Add(MemoryQueueServiceReachMaxItemCountAction.LogExceptionEveryOneSecond);
ConsumeErrorAction = MemoryQueueServiceConsumeErrorAction.AbandonAndLogException;
ConsumeThreadCount = Environment.ProcessorCount;
ConsumeIntervalMilliseconds = 10;
ConsumeIntervalWhenErrorMilliseconds = 1000;
ConsumeItemCountInOneBatch = 100;
NotReachBatchCountConsumeAction = MemoryQueueServiceNotReachBatchCountConsumeAction.ConsumeAllItems;
MaxItemCount = 100000;
}
}
這個配置和客戶端的配置差不多,只不過這里把是否提交到服務端改為了是否提交到數(shù)據(jù)庫。在獲取了配置之后,同樣把數(shù)據(jù)提交到內(nèi)存隊列,然后由內(nèi)存隊列提交到數(shù)據(jù)庫。核心代碼如下:
try
{
var sw = Stopwatch.StartNew();
var server = CreateMasterMongoServer(typeFullName);
if
(server !=
null
)
{
var database = server.GetDatabase(item.DatabaseName);
var collection = database.GetCollection(item.TableName);
var documentList =
new
List<BsonDocument>();
JavaScriptSerializer s =
new
JavaScriptSerializer();
mongodbDataList.ForEach(i =>
{
var dic = s.DeserializeObject(i.Data)
as
IDictionary;
var document =
new
BsonDocument().Add(dic);
documentList.Add(document);
});
collection.InsertBatch(documentList);
LocalLoggingService.Debug(
"Mongodb服務端成功服務提交 {0} 條數(shù)據(jù)到數(shù)據(jù)庫,類型是 '{1}',耗時 {2} 毫秒"
, documentList.Count, typeFullName, sw.ElapsedMilliseconds);
}
}
catch
(Exception ex)
{
AppInfoCenterService.ExceptionService.Handle(ex, categoryName: ServiceName, subcategoryName: typeFullName, description:
"寫入數(shù)據(jù)出現(xiàn)錯誤"
, extraInfo:
new
ExtraInfo
{
DisplayItems =
new
Dictionary<
string
,
string
>()
{
{
"DatabaseName"
, item.DatabaseName},
{
"TableName"
, item.TableName}
}
});
}
}
首先是Json反序列化獲取到數(shù)據(jù),然后轉(zhuǎn)換為BsonDocument,最后批量提交到數(shù)據(jù)庫中。
本文介紹了Mongodb數(shù)據(jù)服務的插入數(shù)據(jù)部分在客戶端和服務端之間的邏輯,下一篇將介紹Mongodb數(shù)據(jù)服務查詢數(shù)據(jù)的部分。
更多文章、技術交流、商務合作、聯(lián)系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

