分解,執(zhí)行復雜處理成一系列可重復使用分立元件的一個任務。這種模式可以允許執(zhí)行的處理進行部署和獨立縮放任務元素提高性能,可擴展性和可重用性。
一個應用程序可能需要執(zhí)行各種關于它處理的信息不同復雜的任務。一個簡單,但不靈活的方式來實施這個應用程序可以執(zhí)行此處理為單一模塊。然而,這種方法有可能減少用于重構代碼,對其進行優(yōu)化,或者重新使用它,如果是在應用程序中其他地方所需要的相同的處理的部件的機會。
圖1通過使用單片式的方式示出了與處理數(shù)據(jù)的問題。一個應用程序接收并處理來自兩個來源的數(shù)據(jù)進行處理。從每個源數(shù)據(jù)是由執(zhí)行一系列任務來轉換該數(shù)據(jù),并傳遞結果給應用程序的業(yè)務邏輯之前的獨立模塊進行處理。
圖1 - 使用單一模塊實現(xiàn)的解決方案
部分的單片模塊執(zhí)行的任務在功能上是非常相似的,但在模塊已被分開設計的。實現(xiàn)該任務的代碼被緊密模塊內耦合,并且此代碼已開發(fā)具有很少或沒有給定重新使用或可伸縮性的思想。
然而,由每個模塊或每個任務的部署要求執(zhí)行的處理任務,可能會改變,因為業(yè)務需求進行修改。有些任務可能是計算密集型的,并可能受益于強大的硬件上運行,而其他人可能并不需要如此昂貴的資源。此外,額外的處理可能需要在將來,或順序,其中由所述處理執(zhí)行的任務可能會改變。一個解決方案是必需的,解決了這些問題,并且增加的可能性代碼重用。
分解需要為每個數(shù)據(jù)流轉換為一組離散的元件(或過濾器)的處理,其中每一個執(zhí)行單任務。通過標準化每個組件接收和發(fā)射的數(shù)據(jù)的格式,這些過濾器可以組合在一起成為一個管道。這有助于避免重復代碼,并且可以很容易地移除,替換或集成額外的組件,如果處理要求改變。圖2顯示了這種結構的一個例子。
圖2 - 通過使用管道和過濾器實現(xiàn)的解決方案
處理一個請求所花費的時間取決于最慢的過濾器管道中的速度。這可能是一個或多個濾波器可能被證明是一個瓶頸,尤其是如果出現(xiàn)在從一個特定的數(shù)據(jù)源的數(shù)據(jù)流的大量請求。流水線結構的一個關鍵優(yōu)點是它提供了機會,運行速度慢的過濾器的并聯(lián)情況下,使系統(tǒng)能夠分散負載并提高吞吐量。
可以獨立縮放組成一個管道可以在不同的機器上運行過濾器,使他們和可以利用的彈性,許多云計算環(huán)境提供的優(yōu)勢。過濾器是計算密集型可以在高性能的硬件上運行,而其他要求不高的過濾器可以對商品(便宜)的硬件來承載。過濾器甚至不需要是在同一數(shù)據(jù)中心或地理位置,它允許在一個管道中的每個元素的環(huán)境下接近它需要的資源來運行。
圖3示出了從源 1 施加到管道中的數(shù)據(jù)的一個例子。
圖3 - 在一個管道負載平衡組件
如果一個濾波器的輸入和輸出被構造為一個流,它可能是能夠進行的處理并行的每個過濾器。在流水線的第一個過濾器可以開始工作,并開始發(fā)射其結果,它們會直接傳遞到序列中的下一個過濾器之前的第一過濾器已經完成它的工作。
另一個好處是靈活性,這種模式可以提供。如果一個過濾器發(fā)生故障或者其上運行的機器不再可用時,管道可能能夠重新安排濾波器所執(zhí)行的工作,并指示此工作到組件的另一個實例。單個過濾器的故障不會必然導致整個管道的故障。
使用管道和過濾器與補償交易模式相結合的模式可以提供一種替代的方法來實現(xiàn)分布式事務。分布式事務可以被分解成單獨的賠的任務,每個都可以通過使用一個過濾器,也實現(xiàn)了補償事務圖案來實現(xiàn)。在一個管道中的過濾器可以在運行接近它們保持數(shù)據(jù)被實現(xiàn)為單獨的托管工作。
在決定如何實現(xiàn)這個模式時,您應考慮以下幾點:
注意: 如果要實現(xiàn)管道使用消息隊列(如微軟的Azure服務總線隊列),消息隊列基礎設施可以提供自動重復消息檢測和清除。
使用這種模式時:
注意:它可能會向組過濾器應擴展一起在相同的過程。欲了解更多信息,請參閱計算資源整合模式。
這種模式可能不適合時:
可以使用消息隊列的一個序列,以提供執(zhí)行流水線所需的基礎設施。最初的消息隊列接收未處理的消息。實現(xiàn)為過濾器的任務偵聽此隊列的消息的組件,它執(zhí)行其工作,然后投遞轉化的消息序列中的下一個隊列。另一個過濾器的任務可以偵聽在這個隊列中的消息,對其進行處理,后的結果到另一個隊列,依此類推,直到完全轉化的數(shù)據(jù)出現(xiàn)在隊列中的最后一個消息。
如果你正在構建一個解決方案,在 Azure 上,你可以使用服務總線隊列提供了可靠的,可擴展的排隊機制。下面所示的 ServiceBusPipeFilter 類提供了一個例子。它演示了如何實現(xiàn)接收從隊列中輸入消息,處理這些郵件的過濾器,并張貼結果到另一個隊列。
注意: 該 ServiceBusPipeFilter 類在 PipesAndFilters 解決方案 PipesAndFilters.Shared 項目定義。此示例代碼都可以可以下載本指導意見。
public class ServiceBusPipeFilter
{
...
private readonly string inQueuePath;
private readonly string outQueuePath;
...
private QueueClient inQueue;
private QueueClient outQueue;
...
?
public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
{
...
this.inQueuePath = inQueuePath;
this.outQueuePath = outQueuePath;
}
?
public void Start()
{
...
// Create the outbound filter queue if it does not exist.
...
this.outQueue = QueueClient.CreateFromConnectionString(...);
?
...
// Create the inbound and outbound queue clients.
this.inQueue = QueueClient.CreateFromConnectionString(...);
}
?
public void OnPipeFilterMessageAsync(
Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)
{
...
?
this.inQueue.OnMessageAsync(
async (msg) =>
{
...
// Process the filter and send the output to the
// next queue in the pipeline.
var outMessage = await asyncFilterTask(msg);
?
// Send the message from the filter processor
// to the next queue in the pipeline.
if (outQueue != null)
{
await outQueue.SendAsync(outMessage);
}
?
// Note: There is a chance that the same message could be sent twice
// or that a message may be processed by an upstream or downstream
// filter at the same time.
// This would happen in a situation where processing of a message was
// completed, it was sent to the next pipe/queue, and then failed
// to complete when using the PeekLock method.
// Idempotent message processing and concurrency should be considered
// in a real-world implementation.
},
options);
}
?
public async Task Close(TimeSpan timespan)
{
// Pause the processing threads.
this.pauseProcessingEvent.Reset();
?
// There is no clean approach for waiting for the threads to complete
// the processing. This example simply stops any new processing, waits
// for the existing thread to complete, then closes the message pump
// and finally returns.
Thread.Sleep(timespan);
?
this.inQueue.Close();
...
}
?
...
}
在 ServiceBusPipeFilter 類 Start 方法連接到一對輸入和輸出隊列,以及關閉方法從輸入隊列斷開。該 OnPipeFilterMessageAsync 方法執(zhí)行消息的實際處理;該 asyncFilterTask 參數(shù)這種方法指定要執(zhí)行的處理。該 OnPipeFilterMessageAsync 方法等待輸入隊列中收到的消息,因為它到達,并張貼結果到輸出隊列通過運行在每個郵件的 asyncFilterTask 參數(shù)指定的代碼。隊列本身的構造函數(shù)中指定。
樣品溶液的過濾器實現(xiàn)了在一組工作角色。每個工人的作用可獨立進行調整,這取決于它執(zhí)行的業(yè)務處理的復雜性,或者它需要執(zhí)行此處理的資源。此外,各輔助角色的多個實例可以并行地運行,以提高吞吐量。
下面的代碼顯示了一個名為 PipeFilterARoleEntry 的 Azure 工作者角色,這是在樣品溶液中 PipeFilterA 項目定義。
public class PipeFilterARoleEntry : RoleEntryPoint
{
...
private ServiceBusPipeFilter pipeFilterA;
?
public override bool OnStart()
{
...
this.pipeFilterA = new ServiceBusPipeFilter(
...,
Constants.QueueAPath,
Constants.QueueBPath);
?
this.pipeFilterA.Start();
...
}
?
public override void Run()
{
this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
{
// Clone the message and update it.
// Properties set by the broker (Deliver count, enqueue time, ...)
// are not cloned and must be copied over if required.
var newMsg = msg.Clone();
?
await Task.Delay(500); // DOING WORK
?
Trace.TraceInformation("Filter A processed message:{0} at {1}",
msg.MessageId, DateTime.UtcNow);
?
newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");
?
return newMsg;
});
?
...
}
?
...
}
這個角色包含 ServiceBusPipeFilter 對象。在角色 OnStart 方法連接到隊列接收輸入的信息并張貼輸出消息(隊列的名稱在常量類中定義)。 Run 方法調用 OnPipeFilterMessagesAsync 方法來對接收到的(在本例中,該處理通過等待較短的時間段模擬的)的每個消息執(zhí)行某些處理。何時處理完成時,一個新的消息被構造包含結果(在這種情況下,輸入消息被簡單地增加了一個自定義屬性),并將該消息發(fā)送到輸出隊列。
示例代碼中包含一個名為 PipeFilterBRoleEntry 在 PipeFilterB 項目的另一名工人的作用。這個角色類似于 PipeFilterARoleEntry 不同之處在于它的 Run 方法進行不同的處理。在本例中的解決方案,這兩種作用結合起來,構建一個管道;為 PipeFilterARoleEntry 角色輸出隊列是用于 PipeFilterBRoleEntry 角色的輸入隊列。
樣品溶液還提供了兩個名為 InitialSenderRoleEntry(在 InitialSender 項目)和 FinalReceiverRoleEntry(在 FinalReceiver 項目),進一步的角色。該 InitialSenderRoleEntry 作用提供了在管道中的初始消息。OnStart 方法連接到單個隊列和運行方法的帖子的方法來此隊列。這個隊列是所使用的 PipeFilterARoleEntry 作用,所以發(fā)布一條消息到這個隊列的輸入隊列導致由 PipeFilterARoleEntry 作用來接收和處理消息。經處理的信息,然后通過 PipeFilterBRoleEntry 作用傳遞。
為 FinalReceiveRoleEntry 角色輸入隊列是用于 PipeFilterBRoleEntry 角色的輸出隊列。 Run 方法在 FinalReceiveRoleEntry 作用,如下圖所示,接收到該消息,并且執(zhí)行一些最后的處理。然后將其寫入了過濾器的管道跟蹤輸出添加自定義屬性的值。
public class FinalReceiverRoleEntry : RoleEntryPoint
{
...
// Final queue/pipe in the pipeline from which to process data.
private ServiceBusPipeFilter queueFinal;
?
public override bool OnStart()
{
...
// Set up the queue.
this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
this.queueFinal.Start();
...
}
?
public override void Run()
{
this.queueFinal.OnPipeFilterMessageAsync(
async (msg) =>
{
await Task.Delay(500); // DOING WORK
?
// The pipeline message was received.
Trace.TraceInformation(
"Pipeline Message Complete - FilterA:{0} FilterB:{1}",
msg.Properties[Constants.FilterAMessageKey],
msg.Properties[Constants.FilterBMessageKey]);
?
return null;
});
...
}
?
...
}
更多建議: