W3Cschool
恭喜您成為首批注冊(cè)用戶
獲得88經(jīng)驗(yàn)值獎(jiǎng)勵(lì)
在上節(jié)中我們通過(guò)對(duì)比了解到了使用基于Netty傳輸的好處,那么在本節(jié)中我們就來(lái)看看Transport API是如何工作的。
Transport API 的核心是 Channel 接口,用于所有的出站操作,見下圖
如上圖所示,每個(gè) Channel 都會(huì)分配一個(gè) ChannelPipeline 和ChannelConfig。ChannelConfig 負(fù)責(zé)設(shè)置并存儲(chǔ) Channel 的配置,并允許在運(yùn)行期間更新它們。傳輸一般有特定的配置設(shè)置,可能實(shí)現(xiàn)了 ChannelConfig. 的子類型。
ChannelPipeline 容納了使用的 ChannelHandler 實(shí)例,這些ChannelHandler 將處理通道傳遞的“入站”和“出站”數(shù)據(jù)以及事件。ChannelHandler 的實(shí)現(xiàn)允許你改變數(shù)據(jù)狀態(tài)和傳輸數(shù)據(jù)。
現(xiàn)在我們可以使用 ChannelHandler 做下面一些事情:
Intercepting Filter(攔截過(guò)濾器)
ChannelPipeline 實(shí)現(xiàn)了常用的 Intercepting Filter(攔截過(guò)濾器)設(shè)計(jì)模式。UNIX管道是另一例子:命令鏈接在一起,一個(gè)命令的輸出連接到 的下一行中的輸入。
你還可以在運(yùn)行時(shí)根據(jù)需要添加 ChannelHandler 實(shí)例到ChannelPipeline 或從 ChannelPipeline 中刪除,這能幫助我們構(gòu)建高度靈活的 Netty 程序。例如,你可以支持 STARTTLS 協(xié)議,只需通過(guò)加入適當(dāng)?shù)?ChannelHandler(這里是 SslHandler)到的ChannelPipeline 中,當(dāng)被請(qǐng)求這個(gè)協(xié)議時(shí)。
此外,訪問(wèn)指定的 ChannelPipeline 和 ChannelConfig,你能在Channel 自身上進(jìn)行操作。Channel 提供了很多方法,如下列表:
Table 4.1 Channel main methods
方法名稱 | 描述 |
---|---|
eventLoop() | 返回分配給Channel的EventLoop |
pipeline() | 返回分配給Channel的ChannelPipeline |
isActive() | 返回Channel是否激活,已激活說(shuō)明與遠(yuǎn)程連接對(duì)等 |
localAddress() | 返回已綁定的本地SocketAddress |
remoteAddress() | 返回已綁定的遠(yuǎn)程SocketAddress |
write() | 寫數(shù)據(jù)到遠(yuǎn)程客戶端,數(shù)據(jù)通過(guò)ChannelPipeline傳輸過(guò)去 |
flush() | 刷新先前的數(shù)據(jù) |
writeAndFlush(...) | 一個(gè)方便的方法用戶調(diào)用write(...)而后調(diào)用y flush() |
后面會(huì)越來(lái)越熟悉這些方法,現(xiàn)在只需要記住我們的操作都是在相同的接口上運(yùn)行,Netty 的高靈活性讓你可以以不同的傳輸實(shí)現(xiàn)進(jìn)行重構(gòu)。
寫數(shù)據(jù)到遠(yuǎn)程已連接客戶端可以調(diào)用Channel.write()方法,如下代碼:
Listing 4.5 Writing to a channel
Channel channel = ...; // 獲取channel的引用
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); //1
ChannelFuture cf = channel.writeAndFlush(buf); //2
cf.addListener(new ChannelFutureListener() { //3
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) { //4
System.out.println("Write successful");
} else {
System.err.println("Write error"); //5
future.cause().printStackTrace();
}
}
});
1.創(chuàng)建 ByteBuf 保存寫的數(shù)據(jù)
2.寫數(shù)據(jù),并刷新
3.添加 ChannelFutureListener 即可寫操作完成后收到通知,
4.寫操作沒有錯(cuò)誤完成
5.寫操作完成時(shí)出現(xiàn)錯(cuò)誤
Channel 是線程安全(thread-safe)的,它可以被多個(gè)不同的線程安全的操作,在多線程環(huán)境下,所有的方法都是安全的。正因?yàn)?Channel 是安全的,我們存儲(chǔ)對(duì)Channel的引用,并在學(xué)習(xí)的時(shí)候使用它寫入數(shù)據(jù)到遠(yuǎn)程已連接的客戶端,使用多線程也是如此。下面的代碼是一個(gè)簡(jiǎn)單的多線程例子:
Listing 4.6 Using the channel from many threads
final Channel channel = ...; // 獲取channel的引用
final ByteBuf buf = Unpooled.copiedBuffer("your data",
CharsetUtil.UTF_8).retain(); //1
Runnable writer = new Runnable() { //2
@Override
public void run() {
channel.writeAndFlush(buf.duplicate());
}
};
Executor executor = Executors.newCachedThreadPool();//3
//寫進(jìn)一個(gè)線程
executor.execute(writer); //4
//寫進(jìn)另外一個(gè)線程
executor.execute(writer); //5
1.創(chuàng)建一個(gè) ByteBuf 保存寫的數(shù)據(jù)
2.創(chuàng)建 Runnable 用于寫數(shù)據(jù)到 channel
3.獲取 Executor 的引用使用線程來(lái)執(zhí)行任務(wù)
4.手寫一個(gè)任務(wù),在一個(gè)線程中執(zhí)行
5.手寫另一個(gè)任務(wù),在另一個(gè)線程中執(zhí)行
Copyright©2021 w3cschool編程獅|閩ICP備15016281號(hào)-3|閩公網(wǎng)安備35020302033924號(hào)
違法和不良信息舉報(bào)電話:173-0602-2364|舉報(bào)郵箱:jubao@eeedong.com
掃描二維碼
下載編程獅App
編程獅公眾號(hào)
聯(lián)系方式:
更多建議: