貿(mào)易公司做網(wǎng)站有用嗎搭建一個網(wǎng)站需要多少錢
目錄
15.Handler
15.1 handler的分類
15.1.1 按照方向劃分
15.1.2 handler的結(jié)構(gòu)
15.2 輸入方向ChannelInboundHandlerAdapter
15.2.1 輸出方向Handler的順序
15.2.2 多個輸入方向Handler之間的數(shù)據(jù)傳遞
15.2.2.1 handler消失了
15.2.2.2 手動編寫netty提供的new StringDecoder();這一Handler
15.2.2.3 責(zé)任鏈設(shè)計模式
15.2.2.4 ctx上下文對象
15.3 輸出方向ChannelOutboundHandlerAdapter
15.3.1 輸出方向Handler的順序
15.3.2 總結(jié) 【自己多測試測試,真正底層細(xì)節(jié)看源碼,現(xiàn)在只能測試,然后瞎猜】
15.3.3 ctx和ch的writeAndFlush()
15.4 關(guān)于head和tail節(jié)點
16.netty服務(wù)端編程總結(jié)
16.1 服務(wù)端關(guān)于handler和childHandler
16.2 為什么叫孩子處理器?childHandler
16.3 客戶端關(guān)于bootstrap.handler
17.作圖總結(jié) [橘子哥的圖]
18.EmbeddedChannel
15.Handler
Handler是程序員接觸最多的地方。最重要的編碼環(huán)節(jié)。
為何說重要,因為我們前面可以知道serverBootstrap.group(new NioEventLoopGroup());服務(wù)端在結(jié)束了該操作后,實際上就開啟了一個線程池在處理連接ACCEPT事件了,等到連接建立后,后續(xù)的IO操作會把數(shù)據(jù)發(fā)送過來這個數(shù)據(jù)實際上就是我們業(yè)務(wù)中要處理的對象,那么這個處理就是在Handler里面處理的,這個處理就是你業(yè)務(wù)邏輯的所在地,至于你怎么實現(xiàn),是發(fā)mq還是寫庫,還是做什么處理,這個是另外一件事。但是這里的Handler就是拿到網(wǎng)絡(luò)傳輸數(shù)據(jù)的地方,也就是以前所說的SocketChannel的地方,而這個Handler通常都是一組,它有很多實現(xiàn),許多個Handler組成了一個pipeline流水線,每個Handler各司其職,每一種Handler會完成功能中的一件事。通過pipeline流水線來組合各種Handler,實現(xiàn)一系列的功能
15.1 handler的分類
15.1.1 按照方向劃分
我們說的handler是有方向的,可以按照讀入數(shù)據(jù)和寫出數(shù)據(jù)的方向劃分
讀入方向
也就是站在一個角度,數(shù)據(jù)是流入,舉個例子。
我在看服務(wù)端的時候,接收客戶端過來的數(shù)據(jù),對于服務(wù)端來說這屬于數(shù)據(jù)流入,也就是讀入數(shù)據(jù)。這就是讀入方向。而此時對于客戶端來說,數(shù)據(jù)就是寫出方向的。
對于讀入數(shù)據(jù)來說,都屬于ChannelInboundHandlerAdapter,我們接收數(shù)據(jù)的一系列Handler都是這個ChannelInboundHandlerAdapter的子類實現(xiàn)。
寫出方向
如果服務(wù)端此時要給客戶端發(fā)數(shù)據(jù)。這就屬于服務(wù)端的寫出方向,這都屬于
ChannelOutboundHandlerAdapter。
這個方向是相對的,你服務(wù)端寫出數(shù)據(jù),對于客戶端就是寫入。不管怎么看,你如果屬于讀入(吃數(shù)據(jù)),就是在拿到數(shù)據(jù)之后做ChannelInboundHandlerAdapter的處理,如果你是往出寫數(shù)據(jù),也就是吐數(shù)據(jù),寫出去之前,那就是要做ChannelOutboundHandlerAdapter的處理。
15.1.2 handler的結(jié)構(gòu)
pipeline中的各個handler是用雙向鏈表組成的,這個鏈表中間是你所有的配置的handler,實際上一頭一尾還有一個head Handler和一個tail Handler,這兩個Handler是Netty自帶的Handler,負(fù)責(zé)來管理這個雙向鏈表
1.Handler作用:用于處理接收數(shù)據(jù)后 或 發(fā)送數(shù)據(jù)前這兩個時間點的數(shù)據(jù),是程序員使用netty最重要的戰(zhàn)斗場地
2.通過Pipeline把多個handler有機的整合成了一個整體
讀取接收數(shù)據(jù):ChannelInboundHandler子類
寫出數(shù)據(jù):ChannelOutboundHandler子類
3.Pipeline中,執(zhí)行相同種類的Handler有固定順序,不同種類的Handler不講究先后順序
4.Handler傳遞數(shù)據(jù):
super.channelRead(ctx,msg);
super.channelRead(ctx,msg);底層為ctx.fireChannelRead(s)
最后一個Handler不需要傳遞數(shù)據(jù),所以最后一個Handler無需調(diào)用該方法
5.pipeline中的各個Handler是使用雙向鏈表組成的,這個鏈表中間就是你所有的配置的Handler,實際上一頭一尾: Head Handler,tail Handler這兩個Handler來進行管理整個雙向鏈表
15.2 輸入方向ChannelInboundHandlerAdapter
15.2.1 輸出方向Handler的順序
我們說handler的處理是被pipeline流水線管理的。當(dāng)你把handler一個個的添加到pipeline之后。就是按照你添加的順序執(zhí)行的。因為他的添加方法就是addLast,不斷的往后面追加,所以就是先來先執(zhí)行的。
我們來看一下這個順序性。
- 服務(wù)端
package com.messi.netty_core_02.netty04;import com.sun.security.ntlm.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer2 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer2.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(defaultEventLoopGroup,"handler1",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler1_____________________________");log.info("msg:{}",msg);super.channelRead(ctx,msg) ;}});pipeline.addLast(defaultEventLoopGroup,"handler2",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler2_____________________________");super.channelRead(ctx,msg) ;}});}});serverBootstrap.bind(8000);}}
- 客戶端
package com.messi.netty_core_02.netty04;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;import java.net.InetSocketAddress;public class MyNettyClient2 {public static void main(String[] args) throws InterruptedException {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);NioEventLoopGroup group = new NioEventLoopGroup();bootstrap.group(group);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new LoggingHandler());pipeline.addLast(new StringEncoder());}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();channel.writeAndFlush("leomessi");System.out.println("MyNettyClient2.main");group.shutdownGracefully();}}
- debug客戶端進行測試
1.先給客戶端設(shè)置斷點:
2.啟動服務(wù)端
3.debug啟動客戶端
4.看控制臺打印輸出:
5.debug客戶端,讓客戶端多次發(fā)數(shù)據(jù)給服務(wù)端
見下圖:
你可以觀察到一個輸出現(xiàn)象就是:無論你客戶端發(fā)送多少次數(shù)據(jù)給服務(wù)端,處理你這個客戶端發(fā)送數(shù)據(jù)的線程都是同一個DefaultEventLoop(處理該NioSocketChannel對應(yīng)的業(yè)務(wù)邏輯)或同一個NioEventLoop(處理該NioSocketChannel對應(yīng)的IO事件邏輯)
所以你可以得出一個結(jié)論:客戶端與服務(wù)端建立連接后,NioServerSocketChannel會分配一個NioSocketChannel給該客戶端與服務(wù)端作為交互通道。之后,同一個SocketChannel的任務(wù)(IO事件任務(wù)或業(yè)務(wù)邏輯任務(wù))會讓同一個線程去做,比如說:同一個NioEventLoop處理讀寫事件,同一個DefaultEventLoop處理業(yè)務(wù)Handler
- 細(xì)節(jié)
為什么要引入DefaultEventLoopGroup來處理業(yè)務(wù)邏輯?
其實很簡單,就是為了提高吞吐。因為NioEventLoopGroup通過Reactor模型劃分為Boss和Worker分別去處理連接,read,write等IO事件。再復(fù)習(xí)下Reactor模型的設(shè)計吧。當(dāng)一個客戶端請求連接服務(wù)端時,服務(wù)端的Boss線程進行處理這一次連接,一旦這一次的連接建立后,在程序?qū)用?#xff0c;NioServerSocketChannel就會生成一個NioSocketChannel,Boss線程就會把該連接所對應(yīng)的NioSocketChannel中IO事件,業(yè)務(wù)邏輯的交互操作全部交給worker線程去做,我們知道連接只需要建立一次,所以boss線程壓力較小,所以boss線程一般只有一個或兩個。worker線程壓力過大,所以一般根據(jù)計算機CPU核數(shù)去具體設(shè)置,但是呢當(dāng)客戶端過多時,一個worker線程是要進行處理多個客戶端連接后的所有操作的,如果一個worker線程在處理完某一個客戶端的寫出數(shù)據(jù)的操作后,又得接著去處理該客戶端觸發(fā)的業(yè)務(wù)邏輯,假設(shè)說這個業(yè)務(wù)邏輯很復(fù)雜很耗時,你這個worker線程是不是就阻塞了。。。我們之前就說過worker線程數(shù)量是有限制的,所以為了提高系統(tǒng)吞吐量,worker線程只處理IO事件,對于業(yè)務(wù)處理耗時操作,會異步新開啟一個新線程去處理。worker線程會直接返回一個確認(rèn)告訴客戶端,客戶端也可以繼續(xù)向下執(zhí)行它的業(yè)務(wù)邏輯,對于客戶端而言這也很高效。當(dāng)異步線程處理完這一業(yè)務(wù)操作后,需要返回業(yè)務(wù)處理結(jié)果,此時會拿到worker線程給的客戶端信息進行回調(diào)客戶端的回調(diào)方法,然后把業(yè)務(wù)處理結(jié)果返回給客戶端??梢?#xff0c;異步線程是要開啟的,那么這個異步線程怎么做呢?其實就是DefaultEventLoopGroup這一線程池去做啦,為什么呢?還是沒說為什么,其實很好理解,使用DefaultEventLoopGroup可以簡化開發(fā),最重要的就是更好的和netty體系進行融合!
但是注意:只有顯示指定使用DefaultEventLoopGroup的Handler才可以使用defaultEventLoop線程去處理對應(yīng)的Handler業(yè)務(wù),否則還是使用NioEventLoop線程去處理,如下圖所示:
ofcourse,當(dāng)然,假設(shè)客戶端多次進行發(fā)數(shù)據(jù)給服務(wù)端,服務(wù)端同樣使用相同的defaultEventLoop線程或NioEventLoop線程去處理對應(yīng)的Handler,不會改變的。
如下這個例子:
15.2.2 多個輸入方向Handler之間的數(shù)據(jù)傳遞
15.2.2.1 handler消失了
15.2.2.2 手動編寫netty提供的new StringDecoder();這一Handler
15.2.2.3 責(zé)任鏈設(shè)計模式
多個Handler組成最終的處理鏈路,這就是責(zé)任鏈設(shè)計模式,把各個工作分到每個部分里面,放到鏈路中挨個處理,你不往下傳(不調(diào)用super.channelRead(ctx,msg)),就類似于filter過濾器中不往下寫(return true)。后面就不會再執(zhí)行了,鏈路就斷開了。
而且你如果是處于最后一個handler比如我的handler2,他處于最后一個handler了,其實他不往下傳了,也就可以不寫這個了。寫了也沒事。反正后面沒了。
pipeline中的handler是個雙向鏈表,因為有讀入讀出,這個后面看看。
總結(jié):
把一個工作做成一個鏈條,則這一個工作分成若干個步驟,并且每一個步驟都會對數(shù)據(jù)進行不斷的加工處理,會把數(shù)據(jù)不斷的傳遞給下一個步驟,直到最后一個環(huán)節(jié)步驟為止。
15.2.2.4 ctx上下文對象
ctx:上下文環(huán)境(ChannelHandlerContext類型)
ctx對象管理的是所有Handler,它是Handler運行的環(huán)境。ctx管理著數(shù)據(jù)的傳遞,也管理著ByteBuf
15.3 輸出方向ChannelOutboundHandlerAdapter
15.3.1 輸出方向Handler的順序
前面我們一直說的是輸入方向的Hnadler的處理,也就是ChannelInboundHandlerAdapter這個處理。 現(xiàn)在我們再來看一下關(guān)于寫出數(shù)據(jù)的操作,也就是ChannelOutboundHandlerAdapter這個處理器操作。我們來看一下代碼。
- 客戶端代碼
package com.messi.netty_core_02.netty04;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;import java.net.InetSocketAddress;public class MyNettyClient2 {public static void main(String[] args) throws InterruptedException {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);NioEventLoopGroup group = new NioEventLoopGroup();bootstrap.group(group);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new LoggingHandler());pipeline.addLast(new StringEncoder());}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();channel.writeAndFlush("leomessi");System.out.println("MyNettyClient2.main");group.shutdownGracefully();}}
- 服務(wù)端代碼
package com.messi.netty_core_02.netty04;import com.sun.security.ntlm.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.CharBuffer;
import java.nio.charset.Charset;public class MyNettyServer2 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer2.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// pipeline.addLast(new StringDecoder());pipeline.addLast(defaultEventLoopGroup,"handler1",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler1_____________________________");ByteBuf byteBuf = (ByteBuf) msg;CharBuffer decode = Charset.forName("UTF-8").decode(byteBuf.nioBuffer());log.info("decode:{}",decode);super.channelRead(ctx,decode);}});pipeline.addLast(defaultEventLoopGroup,"handler2",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler2_____________________________");log.info("decode:{}",msg);super.channelRead(ctx,msg) ;}});pipeline.addLast(defaultEventLoopGroup,"handler3",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler3___________________________");super.channelRead(ctx,msg);}});pipeline.addLast(defaultEventLoopGroup,"handler4",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler4_______________進行write寫出,輸出處理,其實就是向客戶端寫數(shù)據(jù)");ch.writeAndFlush("服務(wù)端向客戶端發(fā)送數(shù)據(jù)");super.write(ctx, msg, promise);}});pipeline.addLast(defaultEventLoopGroup,"handler5",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler5_______________進行write寫出,輸出處理,其實就是向客戶端寫數(shù)據(jù)");super.write(ctx, msg, promise);}});pipeline.addLast(defaultEventLoopGroup,"handler6",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler6_______________進行write寫出,輸出處理,其實就是向客戶端寫數(shù)據(jù)");super.write(ctx, msg, promise);}});}});serverBootstrap.bind(8000);}}
- 測試
我們把服務(wù)端寫出的操作放在handler4中,結(jié)果服務(wù)端寫出對應(yīng)的Handler沒有調(diào)用:
修改一下:
我們把服務(wù)端寫出數(shù)據(jù)的操作放置到最后一個讀取的Handler中:
結(jié)果顯示可以成功調(diào)用寫出Handler!
分析:
我們看到執(zhí)行了輸出處理器的操作信息,因為我們接收數(shù)據(jù)的也就是h3那里寫出了數(shù)據(jù),就能往下走了,每個寫handler里面用了super。write繼續(xù)往下傳遞寫出數(shù)據(jù),但是問題來了。我們add的handler順序是4,5,6但是執(zhí)行的順序卻是6,5,4這樣的倒序。
實際上我們來看個圖。這個圖是所有的handler的一個結(jié)構(gòu),我們說數(shù)據(jù)從外部進來的時候,數(shù)據(jù)會從head接收到,然后順序執(zhí)行h1 h2 h3這個順序。
但是輸出的操作處理器是從tail開始的,也就是h6 h5 h4這樣的順序。而且當(dāng)我們先處理接收數(shù)據(jù),在處
理寫出數(shù)據(jù),是按照這樣的h1 h2 h3執(zhí)行完了,看有沒有下一個輸入,如果沒有就直接走到tail了,然后
從tail往前執(zhí)行輸出,執(zhí)行輸出的時候,也就是從tail開始的。然后倒序執(zhí)行。
注釋:head和tail這兩個Handler是netty自定義自帶的Handler處理器類,負(fù)責(zé)進行管理整個pipeline流水線,管理所有的Handler處理器類
- 再修改一下
把服務(wù)端寫出數(shù)據(jù)的操作放到第一個Handler:
輸出:
- 再修改一下
把handler1的向后傳遞給刪除:
把handler5的向后傳遞給刪除:
輸出:
由于handler1和handler5的向前傳遞都斷了,所以:讀取Handler只輸出handler1,輸出Handler只輸出handler6和handler5
- 修改:基于最原始開頭給出的代碼,只斷開handler4的向前傳遞
結(jié)果表明:對讀取Handler無影響
- 修改:基于最原始開頭給出的代碼,任意修改handler4的位置
根據(jù)輸出結(jié)果可以得出一個結(jié)論,可以自己測試一下:
順序只在同種handler里面產(chǎn)生,不同種類的handler不受順序影響。你可以這樣想,當(dāng)你此時執(zhí)行輸入Handler時,你把輸出Handler都掩蓋住,看輸入Handler之間的相對位置就是真正的執(zhí)行順序!當(dāng)你執(zhí)行輸出Handler時,同理可得。
15.3.2 總結(jié) 【自己多測試測試,真正底層細(xì)節(jié)看源碼,現(xiàn)在只能測試,然后瞎猜】
# 其邏輯一定是從接收到的數(shù)據(jù)head開始往后走,挨個走過所有的InBound處理器。然后處理完了,從tail走,倒序往前走執(zhí)行所有的outBound處理器。
# 基于第一條規(guī)則,不管你怎么變順序,哪怕是輸出和輸入的各種交錯add。也是這么個邏輯,輸出的處理器順序不會影響輸入的處理器邏輯。而且每個輸入的處理器都要super.channelRead(ctx,msg);才能往下一個發(fā)。不管123這樣的順序,而是看你add的順序。而且哪怕你是h1 h4 h2 h3這樣,他也是先處理讀的h1 h2 h3然后才是h4,因為h4是輸出Handler,因為讀寫處理器是互不影響的。
# 基于前兩條規(guī)則,順序只在同種handler里面產(chǎn)生,不同種類的handler不影響
15.3.3 ctx和ch的writeAndFlush()
我們剛才寫出數(shù)據(jù)的時候用的是ch.writeAndFlush("服務(wù)端給客戶端寫的信息...");這個操作,其中ch是NioSocketChannel ch,這是客戶端和服務(wù)端建立的連接。其實我們的參數(shù)里面ctx也有這個寫出方法。我們來看一下:
- 服務(wù)端代碼
package com.messi.netty_core_02.netty04;import com.sun.security.ntlm.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.CharBuffer;
import java.nio.charset.Charset;public class MyNettyServer2 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer2.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// pipeline.addLast(new StringDecoder());pipeline.addLast(defaultEventLoopGroup,"handler1",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler1_____________________________");ByteBuf byteBuf = (ByteBuf) msg;CharBuffer decode = Charset.forName("UTF-8").decode(byteBuf.nioBuffer());log.info("decode:{}",decode);super.channelRead(ctx,decode);}});pipeline.addLast(defaultEventLoopGroup,"handler2",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler2_____________________________");log.info("decode:{}",msg);super.channelRead(ctx,msg) ;}});pipeline.addLast(defaultEventLoopGroup,"handler3",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler3___________________________");
// ch.writeAndFlush("服務(wù)端向客戶端發(fā)送數(shù)據(jù)");ctx.writeAndFlush("服務(wù)端向客戶端寫信息....")super.channelRead(ctx,msg);}});pipeline.addLast(defaultEventLoopGroup,"handler4",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler4_______________進行write寫出,輸出處理,其實就是向客戶端寫數(shù)據(jù)");super.write(ctx, msg, promise);}});pipeline.addLast(defaultEventLoopGroup,"handler5",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler5_______________進行write寫出,輸出處理,其實就是向客戶端寫數(shù)據(jù)");super.write(ctx, msg, promise);}});pipeline.addLast(defaultEventLoopGroup,"handler6",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler6_______________進行write寫出,輸出處理,其實就是向客戶端寫數(shù)據(jù)");super.write(ctx, msg, promise);}});}});serverBootstrap.bind(8000);}}
- 測試
我們發(fā)現(xiàn)h4 h5 h6又沒了,原因還是那個圖:
之前我們使用ch這一SocketChannel去寫出數(shù)據(jù),這個ch是本次連接的對象,所以他能感知到所有本次連接的handler,也就是全局的,他是從tail節(jié)點往前遍歷一直到head節(jié)點,但是ctx只是當(dāng)前h3的上下文對象,他無法感知到其余handler的信息。
ctx代表的是當(dāng)前這個handler處理器的上下文,也就是h3的上下文,其余的handler不知道h3他的上下文。當(dāng)h3使用ctx發(fā)起寫出操作時,他的流程是從當(dāng)前上下文的handler節(jié)點往前走,一直到head節(jié)點,所有他會去執(zhí)行h3前面所有的寫出處理器handler。但是此時h3前面沒有寫出處理器handler,所以就不執(zhí)行了。那么我們現(xiàn)在修改一下代碼,在h3之前注冊幾個寫出handler處理器,比如h5這個handler:
package com.messi.netty_core_02.netty04;import com.sun.security.ntlm.Server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.CharBuffer;
import java.nio.charset.Charset;public class MyNettyServer2 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer2.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// pipeline.addLast(new StringDecoder());pipeline.addLast(defaultEventLoopGroup,"handler1",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler1_____________________________");ByteBuf byteBuf = (ByteBuf) msg;CharBuffer decode = Charset.forName("UTF-8").decode(byteBuf.nioBuffer());log.info("decode:{}",decode);super.channelRead(ctx,decode);}});pipeline.addLast(defaultEventLoopGroup,"handler5",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler5_______________進行write寫出,輸出處理,其實就是向客戶端寫數(shù)據(jù)");super.write(ctx, msg, promise);}});pipeline.addLast(defaultEventLoopGroup,"handler2",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler2_____________________________");log.info("decode:{}",msg);super.channelRead(ctx,msg) ;}});pipeline.addLast(defaultEventLoopGroup,"handler3",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler3___________________________");
// ch.writeAndFlush("服務(wù)端向客戶端發(fā)送數(shù)據(jù)");ctx.writeAndFlush("服務(wù)端向客戶端寫信息....");super.channelRead(ctx,msg);}});pipeline.addLast(defaultEventLoopGroup,"handler4",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler4_______________進行write寫出,輸出處理,其實就是向客戶端寫數(shù)據(jù)");super.write(ctx, msg, promise);}});pipeline.addLast(defaultEventLoopGroup,"handler6",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler6_______________進行write寫出,輸出處理,其實就是向客戶端寫數(shù)據(jù)");super.write(ctx, msg, promise);}});}});serverBootstrap.bind(8000);}}
我們看到如期輸出了,來分析一下原因,我們在代碼里面依次注冊了h1,h5,h2,h3,h4,h6。結(jié)構(gòu)變?yōu)槿缦滤?#xff1a;
當(dāng)h3這一handler執(zhí)行完ctx的寫出之后,會從當(dāng)前h3開始執(zhí)行h3前面的寫出處理器,直到head,只會執(zhí)行h5。因為ctx只具有當(dāng)前handler3的上下文。
但是把handler3中的ctx.writeXXX修改成ch.writeAndFlush的話,由于ch是整個客戶端連接的NioSocketChannel,所以他是面向全局的,也就是他拿到的是tail,然后從tail往前走,去找出全部的輸出handler,執(zhí)行順序為:h6->h4->h5,前后順序是相對的。找輸出handler時只看輸出handler,不看輸入!
PS:tail和head是輔助節(jié)點,你在代碼里面看不到,得去看源碼。
當(dāng)啟動服務(wù)端的時候,此時就是NioEventLoop里面的線程在做select監(jiān)聽。進入死循環(huán),等客戶端連上來才走后面的發(fā)送,然后交給handler的流水線做處理
15.4 關(guān)于head和tail節(jié)點
我們上面說的pipline中有兩個handler是netty內(nèi)置的,叫做head和tail,我們已經(jīng)知道了,當(dāng)服務(wù)端輸入的時候,會按照添加順序執(zhí)行inboundHandler,當(dāng)服務(wù)端往出寫的時候,會按照添加順序的反方向執(zhí)行outBoundHandler。那么問題來了,對于內(nèi)置handler的head和tail在輸入輸出的時候到底執(zhí)行不執(zhí)行呢?
# 答案是輸入的時候執(zhí)行head->h1->...->tail
# 輸出的時候執(zhí)行的是h6->h4->h5->head
看一下原因,這里的順序后面源碼再看,目前為止只能根據(jù)測試結(jié)果去總結(jié)結(jié)論。
我們看一下head節(jié)點的源碼:
final class HeadContext extends AbstractChannelHandlerContext implements
ChannelOutboundHandler, ChannelInboundHandler
Head的類實現(xiàn)了in和outbound,可見其本質(zhì)就是OutboundHandler和InboundHandler,所以他其實在輸入輸出都會和其他的in out一起執(zhí)行。
我們看一下tail節(jié)點的源碼:
final class TailContext extends AbstractChannelHandlerContext implements
ChannelInboundHandler
可見tail是一個inbound,所以他只會在輸入的時候執(zhí)行。
補充:
而且這兩個節(jié)點的類都是內(nèi)部類,都在DefaultChannelPipeline類中,這就是一個高內(nèi)聚的體現(xiàn),如果一個類只在這個類中進行使用,那就在這個類里面定義即可,不對外暴露。
16.netty服務(wù)端編程總結(jié)
package com.messi.netty_core_02.netty05;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class NettyServer {private static final Logger log = LoggerFactory.getLogger(NettyServer.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.handler(new ChannelInitializer<NioServerSocketChannel>() {@Overrideprotected void initChannel(NioServerSocketChannel ch) throws Exception {}});serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new LoggingHandler());pipeline.addLast(defaultEventLoopGroup,"handler1",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler1");super.channelRead(ctx, msg);}});pipeline.addLast(defaultEventLoopGroup,"handler2",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("handler2");ch.writeAndFlush("llll");super.channelRead(ctx, msg);}});pipeline.addLast(defaultEventLoopGroup,"handler3",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler3");super.write(ctx, msg, promise);}});pipeline.addLast(defaultEventLoopGroup,"handler4",new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("handler4");super.write(ctx, msg, promise);}});}});serverBootstrap.bind(8000);}}
package com.messi.netty_core_02.netty05;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;import java.net.InetSocketAddress;public class NettyClient {public static void main(String[] args) throws InterruptedException {Bootstrap bootstrap = new Bootstrap() ;NioEventLoopGroup group = new NioEventLoopGroup();bootstrap.group(group);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new LoggingHandler());}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();channel.writeAndFlush("netty hello");System.out.println("NettyClient.main");group.shutdownGracefully();}}
16.1 服務(wù)端關(guān)于handler和childHandler
- 我們來看一段代碼:
serverBootstrap.handler(new ChannelInitializer<NioServerSocketChannel>() {@Overrideprotected void initChannel(NioServerSocketChannel ch) throws Exception {}
});
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {}
});
我們看到這段代碼就是我們在服務(wù)端啟動的時候的設(shè)置的東西,但是我們在上面編程的時候只設(shè)置了serverBootstrap.childHandler。那么關(guān)于handler和childHandler有啥區(qū)別呢,下面為了方便我直接簡稱為h和ch。
我們前面說過serverBootstrap.childHandler也就是ch中實現(xiàn)的是數(shù)據(jù)的IO處理,也就是對應(yīng)在NIO中是
socketChannel的功能。其實從他的參數(shù)中的new ChannelInitializer()是NioSocketChannel泛型就能知道,他是對應(yīng)的SC做IO處理的。
而對于我們的服務(wù)端,他其實又兩個功能,一個是處理IO,一個是accept處理連接的。那么childHandler處理了IO,serverBootstrap.handler就是處理連接的。
其實你看他的參數(shù)泛型是NioServerSocketChannel也能知道他對應(yīng)的是NioServerSocketChannel也就是SSC,他是處理連接的。你不能亂寫泛型啟動會報錯。
其次你也不能不寫serverBootstrap.childHandler這一部分,因為服務(wù)端你不能不處理IO,你是實際建立IO連接的socketChannel的,所以不寫也會報錯。
而不寫serverBootstrap.handler這個處理連接的代碼是不會報錯,不影響運行的,他是為了 ServerSocketChannel服務(wù)的,你可以寫里面也能用pipline,但是他只是為了做連接,沒有太多的操作就是accept,源碼已經(jīng)給你封裝了,所以你可以不寫,但是你要是做一些復(fù)雜開發(fā),你要監(jiān)控SSC的狀態(tài),就可以增加pipline在這里,就能監(jiān)控連接accept的狀態(tài)和信息。
所以我們就可以知道每一個childHandler就是一個sc,都是一個連接,一個連接就對應(yīng)一個childHandler,然后他里面的一組pipline的一組handler是每一個childHandler獨立擁有一份的,他們不能混著來。其實也好理解,一個連接肯定是自己一組pipline的一組handler。不然就混亂了數(shù)據(jù)。
16.2 為什么叫孩子處理器?childHandler
每一個客戶端過來與ServerSocketChannel進行建立accept連接,ServerSocketChannel會給每一個客戶端都建立一個SocketChannel與之對應(yīng)。所以站在服務(wù)端的角度,SocketChannel就是孩子。childHandler()方法是針對服務(wù)端-客戶端之間建立SocketChannel之后進行的讀寫事件操作),所以childHandler必須是要建立編碼的。你想想你與客戶端后續(xù)是不是主要進行讀寫?對吧。然后對于每一個SocketChannel,childHandler都會建立一條pipeline流水線進行處理建立SocketChannel后的讀寫操作以及其他業(yè)務(wù)操作,pipeline流水線是由多個Handler處理器類構(gòu)成(可以為netty自帶的Handler也可以為自定義Handler)。
那么與之對應(yīng)的就是handler()方法,handler方法是處理ServerSocketChannel進行accept()建立連接操作(通常沒什么特別的,所以handler()方法可以省略不寫)。注:由于accept()建立連接為公共可封裝的代碼,netty會把ServerSocketChannel.accept()這種代碼都給你封裝好。
補充:
對于handler()可以省略不寫,childHandler()不可以省略不寫這一結(jié)論,還可以這樣理解:
handler()對應(yīng)的是ServerSocketChannel進行accept建立連接的操作,連接操作是固定的,你想想:客戶端與服務(wù)端交互,那不必須連接嗎?對吧。所以這部分代碼可封裝。handler()主要處理ServerSocketChannel建立連接時這一時間段的處理,能有啥處理,不就重復(fù)建立連接操作的過程嗎,直接封裝不就完了。在特別復(fù)雜的情況下可能會使用handler()做處理,所以可以省略handler()不寫。
childHandler()對應(yīng)的是SocketChannel建立后的讀寫,讀寫是不確定的,誰知道你建立連接后是讀還是寫,還是只讀還是只寫,所以一般需要自定義,所以childHanlder不可以省略。并且當(dāng)你讀取到數(shù)據(jù)后,你會在Handler中配置一些處理器類來進行讀取數(shù)據(jù)后的業(yè)務(wù)處理,寫出前同樣要進行Handler處理。
每一個SocketChannel都對應(yīng)一份pipeline流水線(pipeline流水線是許多個handler構(gòu)成的)
16.3 客戶端關(guān)于bootstrap.handler
在客戶端的代碼是這樣的:
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
bootstrap.group(nioEventLoopGroup);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new StringEncoder());}
});
我們看到只有bootstrap.handler,客戶端只有這個東西,設(shè)計層面上,因為他本身就是一個發(fā)起連接的,讀寫數(shù)據(jù)的,沒有什么接收處理連接的操作(因為客戶端是請求服務(wù)端建立連接的,而不是服務(wù)端請求客戶端建立連接!!)。所以他一個就行了。
為什么在客戶端代碼中編寫的handler()方法中實現(xiàn)的泛型是NioSocketChannel而不是服務(wù)端中那種NioServerSocketChannel呢????
因為客戶端代碼是Bootstrap類構(gòu)建的代碼,Bootstrap是ServerBootstrap的父類,ServerBootstrap擴展重寫了Bootstrap這個類。所以泛型肯定不一樣呀。
17.作圖總結(jié) [橘子哥的圖]
我理解的就是:
serverBootstrap.handler(多個handler):其中多個handler是進行客戶端-服務(wù)端連接操作后,后續(xù)的一系列的業(yè)務(wù)邏輯處理Handler。但是你對于連接后,能有什么操作?沒啥操作,所以一般handler方法省略不寫。注:連接事件這一網(wǎng)絡(luò)操作netty都已經(jīng)幫你封裝好了。。。你表層看不見的
serverBootstrap.childhandler(多個handler):這其中也有多個handler,是客戶端-服務(wù)端進行IO事件操作后,后續(xù)一系列的業(yè)務(wù)邏輯處理Handler,因為你對于讀寫事件(讀IO后,寫IO前這個事件段)一定有其他的業(yè)務(wù)邏輯可以處理,比如:你讀取到的數(shù)據(jù)可以用來存儲MQ還是把它存儲起來等,這都屬于業(yè)務(wù)邏輯,你可以把這一系列業(yè)務(wù)邏輯寫在一個個的Handler中。
注:IO事件(read或write)這一網(wǎng)絡(luò)操作netty都已經(jīng)幫你封裝好了。。。你表層看不見的
對于連接,IO事件,netty都幫你封裝好了,你看不見的,你能進行自定義處理的只有Handler操作。所以Handler對于程序員而言,是多么的重要。
18.EmbeddedChannel
前面我們都是啟動一個客戶端,啟動一個服務(wù)端然后客戶端發(fā)消息,服務(wù)端或者Inbound接收,或者outbound輸出。這樣的操作模式,那么我們可以看到我們每次寫一個服務(wù)端代碼都要寫一遍handler。屬實麻煩,我們可以寫成這樣。
package com.messi.netty_core_02.netty05;import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class TestEmbededHandler {private static final Logger log = LoggerFactory.getLogger(TestEmbededHandler.class);public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("h1 {}",msg) ;super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("h2 {}",msg) ;super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h3 = new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("h3 {}",msg) ;super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("h4 {}",msg) ;super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h5 = new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("h5 {}",msg) ;super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h6 = new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("h6 {}",msg) ;super.write(ctx, msg, promise);}};//把handler都綁定到Channel上面EmbeddedChannel channel = new EmbeddedChannel(h1,h2,h3,h4,h5,h6);//讀入操作channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("llll".getBytes()));//寫出操作channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("llll".getBytes()));}}
- 測試
- 分析
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("h1 {}",msg) ;super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("h2 {}",msg) ;super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h3 = new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("h3 {}",msg) ;super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("h4 {}",msg) ;super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h5 = new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("h5 {}",msg) ;super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h6 = new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("h6 {}",msg) ;super.write(ctx, msg, promise);}};
我們可以把上面這段代碼分解寫成6個類 ,如下所示:
public class InboundHandlerAdapter1 extends ChannelInboundHandlerAdapter {
static Logger logger = LoggerFactory.getLogger(InboundHandlerAdapter1.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
logger.info("HandlerAdapter1 *************msg is {}",msg);
super.channelRead(ctx, msg);
}
}
public class InboundHandlerAdapter2 extends ChannelInboundHandlerAdapter {
static Logger logger = LoggerFactory.getLogger(InboundHandlerAdapter2.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
logger.info("HandlerAdapter2 **************msg is {}",msg);
super.channelRead(ctx, msg);
}
}
public class InboundHandlerAdapter3 extends ChannelInboundHandlerAdapter {
static Logger logger = LoggerFactory.getLogger(InboundHandlerAdapter3.class);
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
logger.info("HandlerAdapter3 *************msg is {}",msg);
super.channelRead(ctx, msg);
}
}
public class OutboundHandlerAdapter4 extends ChannelOutboundHandlerAdapter {
static Logger logger =
LoggerFactory.getLogger(OutboundHandlerAdapter4.class);
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
logger.info("HandlerAdapter4 *************msg is {}",msg);
super.write(ctx, msg, promise);
}
}
public class OutboundHandlerAdapter5 extends ChannelOutboundHandlerAdapter {
static Logger logger =
LoggerFactory.getLogger(OutboundHandlerAdapter5.class);
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
logger.info("HandlerAdapter5 ************msg is {}",msg);
super.write(ctx, msg, promise);
}
}
public class OutboundHandlerAdapter6 extends ChannelOutboundHandlerAdapter {
static Logger logger =
LoggerFactory.getLogger(OutboundHandlerAdapter6.class);
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
logger.info("HandlerAdapter6 *************msg is {}",msg);
super.write(ctx, msg, promise);
}
}
然后原始代碼只需要把這六個類的對象創(chuàng)建出來然后封裝給EmbeddedChannel對象即可。和我們最開始編寫的代碼是一樣的
- 分析2:如果編碼改變一下,如下:
// 把handler綁定到Channel上面
EmbeddedChannel embeddedChannel = new EmbeddedChannel(h1,h2,h3,h4,h5,h6);
// 讀入操作,和之前的inbound一樣
embeddedChannel.writeInbound("inbound netty");
// 寫出操作,和之前的outbound一樣
embeddedChannel.writeOutbound("outbound netty");
我們執(zhí)行writeInbound就是類似以前的執(zhí)行多個inboundHandler,按照EmbeddedChannel添加的順序執(zhí)行。
而執(zhí)行writeOutbound就是類似以前的執(zhí)行多個outboundHandler,按照outBoundHandler的添加反順序執(zhí)行。
還有一個注意點:
我們在embeddedChannel.writeInbound("inbound netty");這個操作類似于以前的接收客戶端的數(shù)據(jù)。以前我們客戶端是經(jīng)過編碼成為bytebuf發(fā)給服務(wù)端接收的,然后服務(wù)端在走解碼器,成為字符串,現(xiàn)在我們就直接寫了一個"inbound netty"的字符串,和以前的不太真實一樣了,所以需要我們發(fā)送的時候編碼為bytebuff才能更加真實。
也就是這樣像之前那樣:
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("llll".getBytes()));
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("llll".getBytes()));
修改后并運行程序,此時則Handler輸出的msg類型為:ByteBuf。這是Netty對NIO-ByteBuffer類型的封裝