我們除了使用WF提供的SqlWorkflowPersistenceService外,還可以自定義持久化服務(wù)。因?yàn)橛械臅r(shí)候你可能不想使用Sql Server數(shù)據(jù)庫(kù),我們就可以通過(guò)自定義持久化服務(wù)來(lái)使用其他的數(shù)據(jù)庫(kù),文件等來(lái)進(jìn)行持久化存儲(chǔ)。
一:1.1 我們先看一個(gè)MSDN中的例子,當(dāng)從內(nèi)存中卸載工作流時(shí),工作流運(yùn)行時(shí)可使用該服務(wù)將工作流實(shí)例狀態(tài)保存到文件。該持久服務(wù)類代碼如下FilePersistence.cs:
???
public
class
FilePersistenceService : WorkflowPersistenceService
??? {
???????
public
readonly
static
TimeSpan MaxInterval =
new
TimeSpan(30, 0, 0, 0);
???????
private
bool
unloadOnIdle =
false
;
???????
private
Dictionary<Guid,Timer> instanceTimers;
???????
public
FilePersistenceService(
bool
unloadOnIdle)
??????? {
???????????
this
.unloadOnIdle = unloadOnIdle;
???????????
this
.instanceTimers =
new
Dictionary<Guid, Timer>();
??????? }
???????
protected
override
void
SaveWorkflowInstanceState(Activity rootActivity,
bool
unlock)
??????? {
???????????
// Save the workflow
??????????? Guid contextGuid = (Guid)rootActivity.GetValue(Activity.ActivityContextGuidProperty);
??????????? Console.WriteLine("
Saving instance: {0}\n
", contextGuid);
??????????? SerializeToFile( WorkflowPersistenceService.GetDefaultSerializedForm(rootActivity), contextGuid);
???????????
// See when the next timer (Delay activity) for this workflow will expire
??????????? TimerEventSubscriptionCollection timers = (TimerEventSubscriptionCollection)rootActivity.GetValue(TimerEventSubscriptionCollection.TimerCollectionProperty);
??????????? TimerEventSubscription subscription = timers.Peek();
???????????
if
(subscription !=
null
)
??????????? {
???????????????
// Set a system timer to automatically reload this workflow when its next timer expires
??????????????? TimerCallback callback =
new
TimerCallback(ReloadWorkflow);
??????????????? TimeSpan timeDifference = subscription.ExpiresAt - DateTime.UtcNow;
???????????????
// check to make sure timeDifference is in legal range
???????????????
if
(timeDifference > FilePersistenceService.MaxInterval)
??????????????? {
??????????????????? timeDifference = FilePersistenceService.MaxInterval;
??????????????? }
???????????????
else
if
(timeDifference < TimeSpan.Zero)
??????????????? {
??????????????????? timeDifference = TimeSpan.Zero;
??????????????? }
???????????????
this
.instanceTimers.Add(contextGuid,
new
System.Threading.Timer(
??????????????????? callback,
??????????????????? subscription.WorkflowInstanceId,
??????????????????? timeDifference,
???????????????????
new
TimeSpan(-1)));
??????????? }
??????? }
???????
private
void
ReloadWorkflow(
object
id)
??????? {
???????????
// Reload the workflow so that it will continue processing
??????????? Timer toDispose;
???????????
if
(
this
.instanceTimers.TryGetValue((Guid)id,
out
toDispose))
??????????? {
???????????????
this
.instanceTimers.Remove((Guid)id);
??????????????? toDispose.Dispose();
??????????? }
???????????
this
.Runtime.GetWorkflow((Guid)id);
??????? }
???????
// Load workflow instance state.
???????
protected
override
Activity LoadWorkflowInstanceState(Guid instanceId)
??????? {
??????????? Console.WriteLine("
Loading instance: {0}\n
", instanceId);
???????????
byte
[] workflowBytes = DeserializeFromFile(instanceId);
???????????
return
WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes,
null
);
??????? }
???????
// Unlock the workflow instance state.
???????
// Instance state locking is necessary when multiple runtimes share instance persistence store
???????
protected
override
void
UnlockWorkflowInstanceState(Activity state)
??????? {
???????????
//File locking is not supported in this sample
??????? }
???????
// Save the completed activity state.
???????
protected
override
void
SaveCompletedContextActivity(Activity activity)
??????? {
??????????? Guid contextGuid = (Guid)activity.GetValue(Activity.ActivityContextGuidProperty);
??????????? Console.WriteLine("
Saving completed activity context: {0}
", contextGuid);
??????????? SerializeToFile(
??????????????? WorkflowPersistenceService.GetDefaultSerializedForm(activity), contextGuid);
??????? }
???????
// Load the completed activity state.
???????
protected
override
Activity LoadCompletedContextActivity(Guid activityId, Activity outerActivity)
??????? {
??????????? Console.WriteLine("
Loading completed activity context: {0}
", activityId);
???????????
byte
[] workflowBytes = DeserializeFromFile(activityId);
??????????? Activity deserializedActivities = WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, outerActivity);
???????????
return
deserializedActivities;
??????? }
???????
protected
override
bool
UnloadOnIdle(Activity activity)
??????? {
???????????
return
unloadOnIdle;
??????? }
???????
// Serialize the activity instance state to file
???????
private
void
SerializeToFile(
byte
[] workflowBytes, Guid id)
??????? {
??????????? String filename = id.ToString();
??????????? FileStream fileStream =
null
;
???????????
try
??????????? {
???????????????
if
(File.Exists(filename))
??????????????????? File.Delete(filename);
??????????????? fileStream =
new
FileStream(filename, FileMode.CreateNew, FileAccess.Write, FileShare.None);
???????????????
// Get the serialized form
??????????????? fileStream.Write(workflowBytes, 0, workflowBytes.Length);
??????????? }
???????????
finally
??????????? {
???????????????
if
(fileStream !=
null
)
??????????????????? fileStream.Close();
??????????? }
??????? }
???????
// Deserialize the instance state from the file given the instance id
???????
private
byte
[] DeserializeFromFile(Guid id)
??????? {
??????????? String filename = id.ToString();
??????????? FileStream fileStream =
null
;
???????????
try
??????????? {
???????????????
// File opened for shared reads but no writes by anyone
??????????????? fileStream =
new
FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read);
??????????????? fileStream.Seek(0, SeekOrigin.Begin);
???????????????
byte
[] workflowBytes =
new
byte
[fileStream.Length];
???????????????
// Get the serialized form
??????????????? fileStream.Read(workflowBytes, 0, workflowBytes.Length);
???????????????
return
workflowBytes;
??????????? }
???????????
finally
??????????? {
??????????????? fileStream.Close();
??????????? }
??????? }
??? }
1.2 看看我們的工作流設(shè)計(jì),只需要放入一個(gè)DelayActivity即可并設(shè)置他的Timeout時(shí)間,如下圖:
1.3 宿主程序加載了我們自定義的持久化服務(wù)后,執(zhí)行結(jié)果如下:
二:上面的例子其實(shí)很簡(jiǎn)單,我們只是做了一些基本的操作,還有很多工作沒有做。我們就來(lái)說(shuō)說(shuō)如何自定義持久化服務(wù)。自定義持久化服務(wù)大概有以下幾步:
1.定義自己的持久化類FilePersistenceService ,必須繼承自WorkflowPersistenceService.
2.實(shí)現(xiàn)WorkflowPersistenceService中所有的抽象方法,下面會(huì)具體介紹。
3.把自定義的持久化服務(wù)裝載到工作流引擎中(和裝載WF提供的標(biāo)準(zhǔn)服務(wù)的方式一樣)。
下面我們來(lái)介紹下WorkflowPersistenceService類中相關(guān)的抽象方法:
2.1 SaveWorkflowInstanceState: 將工作流實(shí)例狀態(tài)保存到數(shù)據(jù)存儲(chǔ)區(qū)。
protected internal abstract void SaveWorkflowInstanceState(Activity rootActivity, bool unlock)
rootActivity:工作流實(shí)例的根 Activity。
unlock:如果工作流實(shí)例不應(yīng)鎖定,則為 true;如果工作流實(shí)例應(yīng)該鎖定,則為 false。 關(guān)于鎖方面的我們暫時(shí)不提。
?
在這個(gè)方法中我們會(huì)把rootActivity 序列化到 Stream 中,可以看我們上面的例子中的代碼實(shí)現(xiàn)的該方法中有這樣一段
?
SerializeToFile( WorkflowPersistenceService.GetDefaultSerializedForm(rootActivity), contextGuid);
private void SerializeToFile( byte [] workflowBytes, Guid id)
?
SerializeToFile方法的第一個(gè)參數(shù)是需要一個(gè)byte[]的數(shù)組,我們是使用WorkflowPersistenceService.GetDefaultSerializedForm
(rootActivity)來(lái)實(shí)現(xiàn)的,那我們Reflector一下WorkflowPersistenceService這個(gè)類中的GetDefaultSerializedForm
方法,代碼如下:
protected static byte [] GetDefaultSerializedForm(Activity activity) { DateTime now = DateTime.Now; using (MemoryStream stream = new MemoryStream(0x2800)) { stream.Position = 0L; activity.Save(stream); using (MemoryStream stream2 = new MemoryStream(( int ) stream.Length)) { using (GZipStream stream3 = new GZipStream(stream2, CompressionMode.Compress,
true )) { stream3.Write(stream.GetBuffer(), 0, ( int ) stream.Length); } ActivityExecutionContextInfo info = (ActivityExecutionContextInfo)
activity.GetValue(Activity.ActivityExecutionContextInfoProperty); TimeSpan span = (TimeSpan) (DateTime.Now - now); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, " Serialized a
{0} with id {1} to length {2}. Took {3}. ", new object [] { info, info.ContextGuid,
stream2.Length, span }); byte [] array = stream2.GetBuffer(); Array.Resize< byte >( ref array, Convert.ToInt32(stream2.Length)); return array; } } }
可以看出主要是調(diào)用了activity.Save(stream); ,將rootActivity 序列化到 Stream 中,如果我們自定義這個(gè)方法我們要調(diào)用 Activity的save方法將Activity序列化到stream中去,我們?cè)趯?shí)現(xiàn)LoadWorkflowInstanceState方法時(shí)會(huì)調(diào)用Activity的Load方法來(lái)讀取,另外我們把工作流保存到持久化存儲(chǔ)里我們一般都使用WorkflowInstanceId來(lái)做為唯一性標(biāo)識(shí)
當(dāng)工作流實(shí)例完成或終止時(shí),工作流運(yùn)行時(shí)引擎最后一次調(diào)用 SaveWorkflowInstanceState。因此,如果 GetWorkflowStatus等于 Completed或 Terminated,則可以從數(shù)據(jù)存儲(chǔ)區(qū)中安全地刪除工作流實(shí)例及其所有關(guān)聯(lián)的已完成作用域。此外,可以訂閱 WorkflowCompleted或 WorkflowTerminated事件,確定何時(shí)可以安全地刪除與工作流實(shí)例關(guān)聯(lián)的記錄。是否確實(shí)從數(shù)據(jù)存儲(chǔ)區(qū)中刪除記錄取決于您的實(shí)現(xiàn)。
如果無(wú)法將工作流實(shí)例狀態(tài)保存到數(shù)據(jù)存儲(chǔ)區(qū),則應(yīng)引發(fā)帶有適當(dāng)錯(cuò)誤消息的 PersistenceException。
? 2.2 LoadWorkflowInstanceState : 將 SaveWorkflowInstanceState 中保存的工作流實(shí)例的指定狀態(tài)加載回內(nèi)存
protected internal abstract Activity LoadWorkflowInstanceState(Guid instanceId)
必須還原活動(dòng)的相同副本。為此,必須從數(shù)據(jù)存儲(chǔ)區(qū)中工作流實(shí)例的表示形式中還原有效的 Stream;然后,必須將此 Stream 傳遞到重載的 Load 方法之一,用于反序列化工作流實(shí)例。如果持久性服務(wù)無(wú)法從其數(shù)據(jù)存儲(chǔ)區(qū)加載工作流實(shí)例狀態(tài),則它應(yīng)引發(fā)帶有適當(dāng)消息的 PersistenceException。
在我們上面例子中實(shí)現(xiàn)的該方法中,
protected override Activity LoadWorkflowInstanceState(Guid instanceId) { Console.WriteLine(" Loading instance: {0}\n ", instanceId); byte [] workflowBytes = DeserializeFromFile(instanceId); return WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, null ); }
我們調(diào)用了WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, null);方法
我們Reflector出WorkflowPersistenceService類的代碼后可以看到,如下代碼:
protected
static
Activity RestoreFromDefaultSerializedForm(
byte
[] activityBytes, Activity outerActivity)
??? {
??????? Activity activity;
??????? DateTime now = DateTime.Now;
??????? MemoryStream stream =
new
MemoryStream(activityBytes);
??????? stream.Position = 0L;
???????
using
(GZipStream stream2 =
new
GZipStream(stream, CompressionMode.Decompress,
true
))
??????? {
??????????? activity = Activity.Load(stream2, outerActivity);
??????? }
??????? TimeSpan span = (TimeSpan) (DateTime.Now - now);
??????? WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "
Deserialized a {0}
??????????????????????????????????????????????????? to length {1}. Took {2}.
",
new
object
[] { activity, stream.Length, span });
???????
return
activity;
??? }
這段代碼核心的調(diào)用了Activity.Load方法將從 Stream 加載 Activity的實(shí)例。
2.3 SaveCompletedContextActivity
protected internal abstract void SaveCompletedContextActivity(Activity activity)
activity:表示已完成范圍的 Activity。
?
將指定的已完成作用域保存到數(shù)據(jù)存儲(chǔ)區(qū)。保存完成活動(dòng)的AEC環(huán)境以便實(shí)現(xiàn)補(bǔ)償,比如WhileActivity他每次循環(huán)的
都會(huì)創(chuàng)建新的AEC環(huán)境,這個(gè)時(shí)候完成的活動(dòng)的AEC就會(huì)被保存,但是前提是這個(gè)活動(dòng)要支持補(bǔ)償才可以,所有如果你的WhileActivity里包含SequenceActivity這樣該方法是不會(huì)被調(diào)用的,如果你換成CompensatableSequenceActivity就可以了
工作流運(yùn)行時(shí)引擎保存已完成作用域活動(dòng)的狀態(tài),以便實(shí)現(xiàn)補(bǔ)償。必須調(diào)用重載的 Save 方法之一,將 activity 序列化到 Stream 中;然后可以選擇在將 Stream 寫入到數(shù)據(jù)存儲(chǔ)區(qū)之前,對(duì)其執(zhí)行其他處理。但是,在工作流運(yùn)行時(shí)引擎調(diào)用 LoadCompletedContextActivity時(shí),必須還原活動(dòng)的相同副本。
本例子的程序中不會(huì)涉及到這部分
也同樣使用了WorkflowPersistenceService的GetDefaultSerializedForm方法
2.4 LoadCompletedContextActivity?
和SaveCompletedContextActivity是對(duì)應(yīng)的,加載SaveCompletedContextActivity中保存的已完成活動(dòng)的AEC,就不多說(shuō)了
2.5 UnlockWorkflowInstanceState: 解除對(duì)工作流實(shí)例狀態(tài)的鎖定。
此方法是抽象的,因此它不包含對(duì)鎖定和解鎖的默認(rèn)實(shí)現(xiàn)。
實(shí)現(xiàn)自定義持久性服務(wù)時(shí),如果要實(shí)現(xiàn)鎖定方案,則需要重寫此方法,并在 SaveWorkflowInstanceState方法中提供根據(jù)解鎖參數(shù)的值進(jìn)行鎖定-解鎖的機(jī)制。
比如我們的工作流在取消的時(shí)候,這個(gè)方法被調(diào)用來(lái)解除工作流實(shí)例狀態(tài)的鎖定。
2.6 UnloadOnIdle
返回一個(gè)布爾值,確定在工作流空閑時(shí)是否將其卸載。
這些都只是自定義持久化中最基本的,就先說(shuō)這些吧。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
