有時(shí)您可能希望部署新版本的 Samza 作業(yè),以不同的方式計(jì)算結(jié)果;也許你修復(fù)了一個(gè) bug 或者引入了一個(gè)新功能。例如,假設(shè)您有一個(gè) Samza 工作,將郵件分類為垃圾郵件或非垃圾郵件,使用您在離線培訓(xùn)的機(jī)器學(xué)習(xí)模型。您希望定期部署 Samza 工作的更新版本,其中包括最新的分類模型。
當(dāng)您啟動(dòng)新版本的工作時(shí),會出現(xiàn)一個(gè)問題:您以前用舊版本的作業(yè)處理的郵件要做什么?答案取決于你想要的行為:
這種方法需要一個(gè)輸入系統(tǒng),如 Kafka,它允許您在時(shí)間上跳回到流中的上一個(gè)點(diǎn)。下面我們來討論這個(gè)工作原理。
在并行重繞方法中,您并行運(yùn)行兩個(gè)作業(yè):一個(gè)作業(yè)繼續(xù)處理低延遲(實(shí)時(shí)作業(yè))的實(shí)時(shí)更新,而另一個(gè)作業(yè)在流中的較舊點(diǎn)啟動(dòng)并重新處理歷史數(shù)據(jù)(后處理工作)。這兩個(gè)作業(yè)在不同的時(shí)間點(diǎn)消耗相同的輸入流,最終后處理工作可以滿足實(shí)時(shí)工作。
在部署并行倒帶之前,您需要仔細(xì)思考一些細(xì)節(jié),我們在下面討論。
簡單的倒帶和平行倒帶方法的一個(gè)常見方面是:您有一個(gè)工作可以在輸入流中跳回舊的時(shí)間點(diǎn),并從那時(shí)起就消耗所有的消息。通過與 Samza 的檢查點(diǎn)合作來實(shí)現(xiàn)這一點(diǎn)。
通常,當(dāng) Samza 作業(yè)啟動(dòng)時(shí),它會讀取最新的檢查點(diǎn),以確定需要恢復(fù)處理的輸入流中哪個(gè)偏移量。如果你需要回到更早的時(shí)間,可以采取以下兩種方式之一:
使用這些方法之一,您可以讓 Samza 在輸入系統(tǒng)中重新處理消息的整個(gè)歷史記錄。輸入系統(tǒng)如 Kafka 可以保留大量的歷史 - 參見下面的討論。為了加快歷史數(shù)據(jù)的再處理,可以(增加容器數(shù) job.container.count 如果你在紗線行走 Samza),以提高你的工作的計(jì)算資源。
如果您的工作保持任何持續(xù)狀態(tài),則需要在及時(shí)跳回時(shí)小心:重置檢查點(diǎn)不會自動(dòng)更改持久狀態(tài),因此您可以在稍后使用狀態(tài)時(shí)最終重新處理舊消息。在大多數(shù)情況下,及時(shí)跳回的工作應(yīng)以空狀態(tài)開始。您可以通過刪除更改日志主題或更改作業(yè)配置中更改日志主題的名稱來重置狀態(tài)。
當(dāng)你及時(shí)回來時(shí),你使用 Samza 有點(diǎn)像一個(gè)批處理框架(例如 MapReduce) - 區(qū)別在于處理所有歷史數(shù)據(jù)后,你的工作不會停止,而是繼續(xù)運(yùn)行處理新消息流時(shí),它的優(yōu)勢在于您不需要編寫和維護(hù)您的工作的單獨(dú)批量和流式版本:您可以使用相同的 Samza API 來處理實(shí)時(shí)和歷史數(shù)據(jù)。
Samza 本身不保留歷史 - 輸入系統(tǒng)的責(zé)任,如 Kafka。你可以跳多遠(yuǎn)的時(shí)間取決于該系統(tǒng)中保留的歷史記錄數(shù)量。
Kafka 旨在保持相當(dāng)大的歷史:Kafka 經(jīng)紀(jì)人通??梢员A粢粋€(gè)或兩個(gè)星期的消息歷史記錄,即使是大量的話題。保留期大部分取決于您有多少磁盤空間。即使你有太字節(jié)的歷史,Kafka 的表現(xiàn)仍然很高。
X- 200 200 X- 200 200 X- 200 200 X-
在數(shù)據(jù)庫更改流中,當(dāng)您重新處理數(shù)據(jù)時(shí),通常要重新處理整個(gè)數(shù)據(jù)庫。你不想錯(cuò)過一個(gè)值,因?yàn)樗洗胃率窃趲讉€(gè)星期前。換句話說,您不希望只是因?yàn)楸饶硞€(gè)閾值更早而刪除更改事件。在這種情況下,當(dāng)您及時(shí)跳回時(shí),您需要重新開始時(shí)間,即先對數(shù)據(jù)庫進(jìn)行的更改(在 Kafka 中稱為 “offset 0”)。
幸運(yùn)的是,可以使用稱為日志壓縮的 Kafka 功能有效地完成此操作。
例如,假設(shè)您的數(shù)據(jù)庫包含計(jì)數(shù)器:每當(dāng)發(fā)生一些事情時(shí),您將增加相應(yīng)的計(jì)數(shù)器并使用新的計(jì)數(shù)器值更新數(shù)據(jù)庫。每個(gè)更新都會發(fā)送到更改日志,并且由于有更多更新,更改日志流將占用大量空間。啟用日志壓縮后,Kafka 會在后臺對數(shù)據(jù)流進(jìn)行重復(fù)數(shù)據(jù)刪除,只保留每個(gè)密鑰的最新計(jì)數(shù)器值,并刪除相同計(jì)數(shù)器的任何舊值。這樣可以減少流的大小,以便您可以保留每個(gè)密鑰的最新更新,即使最近更新了很久以前。
啟用日志壓縮后,數(shù)據(jù)庫更改流將成為整個(gè)數(shù)據(jù)庫的完整副本。通過跳轉(zhuǎn)到 0,您的 Samza 作業(yè)可以掃描整個(gè)數(shù)據(jù)庫并重新處理它。這是構(gòu)建可擴(kuò)展應(yīng)用程序的非常強(qiáng)大的方法。
如果您采用上述并行重繞方法,并行運(yùn)行兩個(gè)作業(yè),則需要仔細(xì)配置它們以避免出現(xiàn)問題。特別要注意的是:
Samza 為您提供了重新處理歷史數(shù)據(jù)的很多靈活性,您不需要針對單獨(dú)的批處理 API 進(jìn)行編程,以利用它。如果您注意到這些問題,您可以建立一個(gè)非常強(qiáng)大的數(shù)據(jù)系統(tǒng),但是您仍然可以隨時(shí)更改處理邏輯。
更多建議: