本教程提供了使用 Samza 異步 API 和多線程的示例和指南。
如果您的工作過程涉及同步 IO 或阻塞 IO,則可以簡單地配置 Samza 內(nèi)置線程池來并行運行任務。在以下示例中,SyncRestTask 使用 Jersey 客戶端在每個進程()中進行休息調(diào)用。
public class SyncRestTask implements StreamTask, InitableTask, ClosableTask {
private Client client;
private WebTarget target;
@Override
public void init(Config config, TaskContext taskContext) throws Exception {
client = ClientBuilder.newClient();
target = client.target("http://example.com/resource/").path("hello");
}
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
Response response = target.request().get();
System.out.println("Response status code " + response.getStatus() + " received.");
}
@Override
public void close() throws Exception {
client.close();
}
}
默認情況下,Samza 將在單個線程中順序運行此任務。在下面我們配置大小為16的線程池并行運行任務:
# Thread pool to run synchronous tasks in parallel.
job.container.thread.pool.size=16
注意:線程池將用于運行任務的所有同步操作,包括 StreamTask.process(),WindowableTask.window()和內(nèi)部的 Task.commit()。這是為了最大化任務之間的并行性以及減少阻塞時間。在多線程中運行任務時,默認情況下,Samza 仍保證任務內(nèi)的消息的按順序處理。
如果您的工作過程是異步的,例如,進行非阻塞的遠程 IO 調(diào)用,AsyncStreamTask 接口將為其提供支持。在下面的例子中,AsyncRestTask 會使異步休息調(diào)用,并在完成后觸發(fā)回調(diào)。
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
private Client client;
private WebTarget target;
@Override
public void init(Config config, TaskContext taskContext) throws Exception {
client = ClientBuilder.newClient();
target = client.target("http://example.com/resource/").path("hello");
}
@Override
public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
TaskCoordinator coordinator, final TaskCallback callback) {
target.request().async().get(new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
System.out.println("Response status code " + response.getStatus() + " received.");
callback.complete();
}
@Override
public void failed(Throwable throwable) {
System.out.println("Invocation failed.");
callback.failure(throwable);
}
});
}
@Override
public void close() throws Exception {
client.close();
}
}
在上面的例子中,processAsync()返回時,進程不完整。在來自澤西客戶端的回調(diào)線程中,我們觸發(fā) TaskCallback 以指示進程完成。為了確保在一定時間間隔(例如5秒)內(nèi)觸發(fā)回調(diào),您可以配置以下屬性:
# Timeout for processAsync() callback. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container.
task.callback.timeout.ms=5000
注意:默認情況下,Samza 還保證 AsyncStreamTask 中消息的按順序進程,這意味著在觸發(fā)前一個 processAsync()回調(diào)之前,任務的下一個 processAsync()才會被調(diào)用。
在上述兩種情況下,Samza 默認支持按順序進行。通過允許任務并行處理多個未完成的消息也支持進一步的并行性。以下配置允許一個任務一次處理最多4個未完成的消息:
# Max number of outstanding messages being processed per task at a time, applicable to both StreamTask and AsyncStreamTask.
task.max.concurrency=4
注意:在 AsyncStreamTask 的情況下,processAsync()仍然按消息到達的順序調(diào)用,但完成可能會出錯。在具有多線程的 StreamTask 的情況下,process()可以無序運行,因為它們被分派到線程池。應此選項不是在需要輸出的嚴格的順序使用。
在任何情況下,Samza 保證以下語義:
更多建議: