網(wǎng)站開發(fā)非常之旅開發(fā)網(wǎng)站用什么軟件
一、背景
????????早期學(xué)習(xí)和使用Dubbo的時候(那時候Dubbo還沒成為Apache頂級項目),寫過一些源碼解讀,但隨著Dubbo發(fā)生了翻天覆地的變化,那些文章早已過時,所以現(xiàn)在計劃針對最新的Apache Dubbo源碼來進行“閱讀理解”,希望和大家一起再探Dubbo的實現(xiàn)。由于能力有限,如果文章有錯誤的地方,歡迎大家留言指正。
? ? ? ? 本期的主題是Dubbo如何使用Netty4構(gòu)建RPC來通訊。
二、Server端視角
? ? ? ? 我們看看作為服務(wù)提供方,Apache Dubbo是如何使用Netty4的。
2.1 Netty的線程組
????????線程組作為Netty的Reactor設(shè)計核心組件,在這里自然少不了,我們看到org.apache.dubbo.remoting.transport.netty4.NettyServer中有如下兩個方法:
int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);String IO_THREADS_KEY = "iothreads";String EVENT_LOOP_BOSS_POOL_NAME = "NettyServerBoss";String EVENT_LOOP_WORKER_POOL_NAME = "NettyServerWorker";protected EventLoopGroup createBossGroup() {return NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);}protected EventLoopGroup createWorkerGroup() {return NettyEventLoopFactory.eventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),EVENT_LOOP_WORKER_POOL_NAME);}
分別用來構(gòu)建Boss線程組和Worker線程組,Boss線程組負責創(chuàng)建連接,連接創(chuàng)建成功后由Worker線程組來負責處理和發(fā)送請求。注意Boss線程組只設(shè)置了1個線程,而且沒有設(shè)置修改接口,通常1個也是夠用的,如果設(shè)置成可配的會更好。而Worker線程組默認值是當前可用核數(shù)和32中取最小值,注意,是最小值,不是最大值,意味著如果你單個Provider實例上可用的核超過32,那么一定要設(shè)置IO_THREADS_KEY,否則可能無法達到最大吞吐量(特別是IO密集型應(yīng)用)。上面代碼也給出了這兩個線程組的線程池的名稱,如果不清楚現(xiàn)在跑的是多少,可以jstack命令看下。注意,為了闡述方便,上面的四個屬性定義來自org.apache.dubbo.remoting.Constants。
2.1 Netty的ChannelHandler
? ? ? ? Netty的可擴展性主要來自其核心組件之一ChannelHandler,ChannelHandler能幫助我們解決拆包&粘包、協(xié)議編解碼、權(quán)限校驗等RPC常遇到的問題。那我們來看看現(xiàn)在Dubbo的Provider端用到了哪些ChannelHandler,同樣,我們看NettyServer#initServerBootstrap方法:
protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);bootstrap.group(bossGroup, workerGroup)// 根據(jù)是否支持EPoll來返回EpollServerSocketChannel或者NioServerSocketChannel.channel(NettyEventLoopFactory.serverSocketChannelClass())// SO_REUSEADDR設(shè)置為true表示允許重用本地地址和端口。如果服務(wù)器意外關(guān)閉后再次啟動,可以立即綁定到之前使用的地址和端口。.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// 表示禁用Nagle算法。Nagle算法會將小的網(wǎng)絡(luò)包合并成較大的包來提高網(wǎng)絡(luò)效率,但會增加數(shù)據(jù)傳輸?shù)难舆t。一般對延遲敏感的場景都會禁用Nagle算法。.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)// 開啟TCP的心跳機制,用于檢測連接是否還活著.childOption(ChannelOption.SO_KEEPALIVE, keepalive)// PooledByteBufAllocator是Netty提供的一種內(nèi)存分配器實現(xiàn),它可以重用ByteBuf對象的內(nèi)存,提高內(nèi)存的利用率和性能。.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {int closeTimeout = UrlUtils.getCloseTimeout(getUrl());NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);// 處理HTTPSch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));ch.pipeline()// 解碼,讀數(shù)據(jù)的時候用.addLast("decoder", adapter.getDecoder())// 編碼,寫數(shù)據(jù)的時候用.addLast("encoder", adapter.getEncoder())// 注意,這是Netty提供的ChannelHandler.addLast("server-idle-handler", new IdleStateHandler(0, 0, closeTimeout, MILLISECONDS))// NettyServerHandler可是核心.addLast("handler", nettyServerHandler);}});}
我們重點關(guān)注下childHandler中添加的ChannelHandler。在這之前,我們先對Channel中的ChannelPipeline和ChanelHandler進行簡單介紹,我們知道每個連接對應(yīng)一個Channel,而每個Channel對應(yīng)一個ChannelPipeline,用來組織和管理連接上面的處理器ChannelHandler(責任鏈的方式)。ChannelHandler分為ChannelInboundHandler和ChannelOutboundHandler兩大類,分別管理入站(讀)和出站(寫)的流程定制,當然,為了使用方便,Netty提供了很多開箱即用的ChannelHandler,便于我們輕松實現(xiàn)HTTP、WebSocket等協(xié)議。
SslServerTlsHandler
????????這是上面通過childHandler添加的第一個ChannelHandler(它屬于Netty中ByteToMessageDecoder的子類,ByteToMessageDecoder的主要用途是將接收到的字節(jié)數(shù)據(jù)解碼為消息對象,并將解碼后的消息對象傳遞給下一個處理器進行后續(xù)的業(yè)務(wù)處理。繼承它可以方便的處理多種數(shù)據(jù)格式,例如二進制數(shù)據(jù)、文本數(shù)據(jù)等),看這Handler的名稱就知道是做什么的,用來進行SSL|TLS加解密。在其實現(xiàn)的decode方法中,如果發(fā)現(xiàn)ByteBuf是支持SSL|TLS,那么會立馬在當前Channel的ChannelPipeline中加入SslHandler(Netty自帶的支持SSL|TLS握手的ChannelHandler),注意,是加載當前SslServerTlsHandler之后。為了能在SSL|TLS解碼后能繼續(xù)執(zhí)行SslServerTlsHandler的邏輯,在SslHandler后面又新加了一個SslServerTlsHandler(其中sslDetected=true,說明數(shù)據(jù)包到這里已經(jīng)解包完成),用于校驗協(xié)議。
? ? ? ? 當然,如果SslServerTlsHandler發(fā)現(xiàn)你沒有使用SSL|TLS(Netty中的SslHandler#isEncrypted能判斷ByteBuf是否進行過加密),那么會直接把自己從ChannelPipeline中刪除。這里給出SslServerTlsHandler#decode代碼:
@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)throws Exception {// Will use the first five bytes to detect a protocol.if (byteBuf.readableBytes() < 5) {return;}// SSL|TLS 校驗完成后會創(chuàng)建一個新的SslServerTlsHandler,其中sslDetected會設(shè)置成trueif (sslDetected) {return;}CertManager certManager =url.getOrDefaultFrameworkModel().getBeanFactory().getBean(CertManager.class);ProviderCert providerConnectionConfig = certManager.getProviderConnectionConfig(url, channelHandlerContext.channel().remoteAddress());// 如果沒啟用SSL|TLS,那么獲取到的providerConnectionConfig就會是null,從而刪除并退出該Handlerif (providerConnectionConfig == null) {ChannelPipeline p = channelHandlerContext.pipeline();p.remove(this);return;}if (isSsl(byteBuf)) {SslContext sslContext = SslContexts.buildServerSslContext(providerConnectionConfig);enableSsl(channelHandlerContext, sslContext);return;}if (providerConnectionConfig.getAuthPolicy() == AuthPolicy.NONE) {ChannelPipeline p = channelHandlerContext.pipeline();p.remove(this);return;}logger.error(INTERNAL_ERROR, "", "", "TLS negotiation failed when trying to accept new connection.");channelHandlerContext.close();}
NettyCodecAdapter.InternalDecoder
? ? ? ? 之前已經(jīng)給了NettyServer#initServerBootstrap方法,其中可以發(fā)現(xiàn)這個內(nèi)部解碼器InternalDecoder是繼SslServerTlsHandler的下一個ChannelHandler,由于它和SslServerTlsHandler一樣,是ByteToMessageDecoder的子類,所以我們直接看其decode方法:
private class InternalDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {// ChannelBuffer是Dubbo對通道緩沖區(qū)的抽象,這里的NettyBackedChannelBuffer就是其Netty緩沖區(qū)第一個實現(xiàn),NettyBackedChannelBuffer內(nèi)部直接使用的是Netty的ByteBufChannelBuffer message = new NettyBackedChannelBuffer(input);// Dubbo中的NettyChannel是銜接Netty Channel和Dubbo內(nèi)部通道抽象的橋梁,其中NettyChannel就有Dubbo的解碼器NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);do {int saveReaderIndex = message.readerIndex();// 這里的codec默認是DubboCountCodec實例,支持MultiMessage解碼,這里返回的msg已經(jīng)是org.apache.dubbo.remoting.exchange.Request對象Object msg = codec.decode(channel, message);if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {message.readerIndex(saveReaderIndex);break;} else {// is it possible to go here ?if (saveReaderIndex == message.readerIndex()) {throw new IOException("Decode without read data.");}if (msg != null) {out.add(msg);}}} while (message.readable());}}
1. DubboCountCodec(implements Codec2):用來處理MultiMessage類型的消息,Dubbo Consumer可以通過MultiMessage來一次性發(fā)送多個請求,但一般沒人這么用,大家更愿意傳遞一個List來讓下游Dubbo Provider來批量處理,上述InternalDecoder#decode方法內(nèi)部調(diào)用了其decode方法,decode方法會繼續(xù)觸發(fā)其他org.apache.dubbo.remoting.Codec2實現(xiàn)類的調(diào)用。
2. DubboCodec(extends ExchangeCodec):DubboCountCodec#decode內(nèi)部會調(diào)用其父類ExchangeCodec#decode方法,主要用于將Dubbo協(xié)議中的請求和響應(yīng)消息進行序列化和反序列化??吹竭@里,難道Dubbo沒用LengthFieldBasedFrameDecoder?這可是Netty中解決TCP拆包、粘包的神器!沒有,Dubbo自己在ExchangeCodec#decode方法中自己解決了粘包、拆包問題,通過返回DecodeResult.NEED_MORE_INPUT來表示需要更多數(shù)據(jù)。ExchangeCodec#decode方法最后又將調(diào)用DubboCodec#decodeBody來解碼body部分,這里給出Dubbo的協(xié)議圖:
我們看看DubboCodec解碼body部分的代碼(注意DubboCodec可以解碼也可以編碼,這里為了簡單起見,只展示解碼部分):
@Overrideprotected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);// get request id.long id = Bytes.bytes2long(header, 4);// decode request.Request req;Object data;req = new Request(id);req.setVersion(Version.getProtocolVersion());req.setTwoWay((flag & FLAG_TWOWAY) != 0);// get data length.int len = Bytes.bytes2int(header, 12);req.setPayload(len);DecodeableRpcInvocation inv;// 如果你實現(xiàn)了自定義解析器,那么就使用自定義的if (customByteAccessor != null) {inv = customByteAccessor.getRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);} else {// 否則用Dubbo內(nèi)置的inv = new DecodeableRpcInvocation(frameworkModel,channel,req,new UnsafeByteArrayInputStream(readMessageData(is)),proto);}// DecodeableRpcInvocation來通過協(xié)議對應(yīng)的序列化器來反序列化inv.decode();data = inv;req.setData(data);return req;}
可以看到,DubboCodec將使用DecodeableRpcInvocation#decode來完成最終解碼過程。需要注意的是,我們可以通過設(shè)置DECODE_IN_IO_THREAD_KEY("decode.in.io.thread")參數(shù)來讓Dubbo決定整個decode操作是否在IO線程中執(zhí)行,默認是在IO線程執(zhí)行。
2. DecodeableRpcInvocation(extends RpcInvocation):
前面我們說真正的反序列化是在該類中實現(xiàn)的,我們看下代碼:
@Overridepublic Object decode(Channel channel, InputStream input) throws IOException {int contentLength = input.available();this.put(Constants.CONTENT_LENGTH_KEY, contentLength);// 通過請求中的SerializationId來選擇對應(yīng)的序列化對象進行數(shù)據(jù)的反序列化ObjectInput in = CodecSupport.getSerialization(serializationType).deserialize(channel.getUrl(), input);this.put(SERIALIZATION_ID_KEY, serializationType);// 設(shè)置Dubbo版本號String dubboVersion = in.readUTF();request.setVersion(dubboVersion);setAttachment(DUBBO_VERSION_KEY, dubboVersion);// 設(shè)置需要調(diào)用的接口標識String path = in.readUTF();setAttachment(PATH_KEY, path);String version = in.readUTF();setAttachment(VERSION_KEY, version);// Do provider-level payload checks.String keyWithoutGroup = keyWithoutGroup(path, version);// 為了防止數(shù)據(jù)過載,在Dubbo Consumer端可以設(shè)置payLoad,這里檢查數(shù)據(jù)是否超過payLoad限制checkPayload(keyWithoutGroup);// 設(shè)置需要調(diào)用的方法標識setMethodName(in.readUTF());// 參數(shù)類型的全限定名,如果有多個參數(shù),通過;分隔String desc = in.readUTF();setParameterTypesDesc(desc);ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();try {Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;if (desc.length() > 0) {// 根據(jù)參數(shù)描述來拿到對應(yīng)的Class對象pts = drawPts(path, version, desc, pts);// 根據(jù)參數(shù)的Class和值,解析到真正的參數(shù)對象args = drawArgs(in, pts);}setParameterTypes(pts);Map<String, Object> map = in.readAttachments();if (CollectionUtils.isNotEmptyMap(map)) {addObjectAttachments(map);}decodeArgument(channel, pts, args);} catch (ClassNotFoundException e) {throw new IOException(StringUtils.toString("Read invocation data failed.", e));} finally {Thread.currentThread().setContextClassLoader(originClassLoader);if (in instanceof Cleanable) {((Cleanable) in).cleanup();}}return this;}
我們可以看到,先是通過CodecSupport.getSerialization來獲取對應(yīng)的序列化對象,然后通過它來反序列化數(shù)據(jù)(我們可以通過看org.apache.dubbo.common.serialize.Serialization有哪些實現(xiàn)類來了解Dubbo默認支持的協(xié)議類型)。
NettyServerHandler
????????當輪到NettyServerHandler執(zhí)行的時候,由于前面的ChannelHandler處理,請求數(shù)據(jù)已經(jīng)反序列化成了org.apache.dubbo.remoting.exchange.Request對象,到了NettyServerHandler#received內(nèi)部,就輪到一些列?org.apache.dubbo.remoting.ChannelHandler 的子類來處理了:
MultiMessageHandler
????????如果是MultiMessage類型的消息,那么for循環(huán)去調(diào)用內(nèi)部handler去處理,否則只調(diào)用handler處理一次。
HeartbeatHandler
? ? ? ? 在Channel上設(shè)置讀的時間戳,便于后續(xù)判斷該Channel是否存活,同時判斷請求是不是HeartBeatRequest類型,如果是,則直接應(yīng)答。否則繼續(xù)執(zhí)行下一個handler。
AllChannelHandler
? ? ? ? 根據(jù)請求的url來獲取處理的線程池,然后將請求和DecodeHandler實例構(gòu)造成ChannelEventRunnable對象來扔進線程池中執(zhí)行。
@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);try {// 這里的handler是DecodeHandler對象executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}
DecodeHandler
? ? ? ? 前面說了AllChannelHandler之后的邏輯是在另一個線程中執(zhí)行的,毫無疑問,在這個新的線程中,主要是用來調(diào)用DecodeHandler的來進行解碼,在Dubbo Provider接收客戶端請求的場景下,調(diào)用的是received方法:
@Overridepublic void received(Channel channel, Object message) throws RemotingException {// 如果之前設(shè)置的不是在IO線程中解碼,那么就在這個線程中解碼if (message instanceof Decodeable) {decode(message);}// 如果是Request對象,那么得對消息負載(真正的請求對象)進行解碼if (message instanceof Request) {decode(((Request) message).getData());}if (message instanceof Response) {decode(((Response) message).getResult());}// handler是HeaderExchangeHandler對象handler.received(channel, message);}
HeaderExchangeHandler
? ? ? ? 該Handler根據(jù)請求的交互方式來決定如何處理請求(例如有的交互方式是不需要應(yīng)答的),處理真正調(diào)用,比如會拿Request.data(RpcInvocation),最終會去調(diào)用DubboProtocol#ExchangeHandler的reply方法來拿到一個異步結(jié)果對象,并設(shè)置完成時候res的狀態(tài)碼和結(jié)果的賦值:
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());// find handler by message class.Object msg = req.getData();try {// handler是DubboProtocol#ExchangeHandler對象CompletionStage<Object> future = handler.reply(channel, msg);future.whenComplete((appResult, t) -> {try {if (t == null) {res.setStatus(Response.OK);res.setResult(appResult);} else {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}channel.send(res);} catch (RemotingException e) {logger.warn(TRANSPORT_FAILED_RESPONSE,"","","Send result to consumer failed, channel is " + channel + ", msg is " + e);}});} catch (Throwable e) {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));channel.send(res);}}