中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

網(wǎng)站開發(fā)非常之旅開發(fā)網(wǎng)站用什么軟件

網(wǎng)站開發(fā)非常之旅,開發(fā)網(wǎng)站用什么軟件,b2b網(wǎng)絡(luò)推廣,武漢網(wǎng)站整合營銷聯(lián)系方式一、背景 早期學(xué)習(xí)和使用Dubbo的時候(那時候Dubbo還沒成為Apache頂級項目),寫過一些源碼解讀,但隨著Dubbo發(fā)生了翻天覆地的變化,那些文章早已過時,所以現(xiàn)在計劃針對最新的Apache Dubbo源碼來進行“閱讀理解…

一、背景

????????早期學(xué)習(xí)和使用Dubbo的時候(那時候Dubbo還沒成為Apache頂級項目),寫過一些源碼解讀,但隨著Dubbo發(fā)生了翻天覆地的變化,那些文章早已過時,所以現(xiàn)在計劃針對最新的Apache Dubbo源碼來進行“閱讀理解”,希望和大家一起再探Dubbo的實現(xiàn)。由于能力有限,如果文章有錯誤的地方,歡迎大家留言指正。

? ? ? ? 本期的主題是Dubbo如何使用Netty4構(gòu)建RPC來通訊。

二、Server端視角

? ? ? ? 我們看看作為服務(wù)提供方,Apache Dubbo是如何使用Netty4的。

2.1 Netty的線程組

????????線程組作為Netty的Reactor設(shè)計核心組件,在這里自然少不了,我們看到org.apache.dubbo.remoting.transport.netty4.NettyServer中有如下兩個方法:

    int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);String IO_THREADS_KEY = "iothreads";String EVENT_LOOP_BOSS_POOL_NAME = "NettyServerBoss";String EVENT_LOOP_WORKER_POOL_NAME = "NettyServerWorker";protected EventLoopGroup createBossGroup() {return NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);}protected EventLoopGroup createWorkerGroup() {return NettyEventLoopFactory.eventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),EVENT_LOOP_WORKER_POOL_NAME);}

分別用來構(gòu)建Boss線程組和Worker線程組,Boss線程組負責創(chuàng)建連接,連接創(chuàng)建成功后由Worker線程組來負責處理和發(fā)送請求。注意Boss線程組只設(shè)置了1個線程,而且沒有設(shè)置修改接口,通常1個也是夠用的,如果設(shè)置成可配的會更好。而Worker線程組默認值是當前可用核數(shù)和32中取最小值,注意,是最小值,不是最大值,意味著如果你單個Provider實例上可用的核超過32,那么一定要設(shè)置IO_THREADS_KEY,否則可能無法達到最大吞吐量(特別是IO密集型應(yīng)用)。上面代碼也給出了這兩個線程組的線程池的名稱,如果不清楚現(xiàn)在跑的是多少,可以jstack命令看下。注意,為了闡述方便,上面的四個屬性定義來自org.apache.dubbo.remoting.Constants。

2.1 Netty的ChannelHandler

? ? ? ? Netty的可擴展性主要來自其核心組件之一ChannelHandler,ChannelHandler能幫助我們解決拆包&粘包、協(xié)議編解碼、權(quán)限校驗等RPC常遇到的問題。那我們來看看現(xiàn)在Dubbo的Provider端用到了哪些ChannelHandler,同樣,我們看NettyServer#initServerBootstrap方法:

    protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);bootstrap.group(bossGroup, workerGroup)// 根據(jù)是否支持EPoll來返回EpollServerSocketChannel或者NioServerSocketChannel.channel(NettyEventLoopFactory.serverSocketChannelClass())// SO_REUSEADDR設(shè)置為true表示允許重用本地地址和端口。如果服務(wù)器意外關(guān)閉后再次啟動,可以立即綁定到之前使用的地址和端口。.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// 表示禁用Nagle算法。Nagle算法會將小的網(wǎng)絡(luò)包合并成較大的包來提高網(wǎng)絡(luò)效率,但會增加數(shù)據(jù)傳輸?shù)难舆t。一般對延遲敏感的場景都會禁用Nagle算法。.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)// 開啟TCP的心跳機制,用于檢測連接是否還活著.childOption(ChannelOption.SO_KEEPALIVE, keepalive)// PooledByteBufAllocator是Netty提供的一種內(nèi)存分配器實現(xiàn),它可以重用ByteBuf對象的內(nèi)存,提高內(nèi)存的利用率和性能。.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {int closeTimeout = UrlUtils.getCloseTimeout(getUrl());NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);// 處理HTTPSch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));ch.pipeline()// 解碼,讀數(shù)據(jù)的時候用.addLast("decoder", adapter.getDecoder())// 編碼,寫數(shù)據(jù)的時候用.addLast("encoder", adapter.getEncoder())// 注意,這是Netty提供的ChannelHandler.addLast("server-idle-handler", new IdleStateHandler(0, 0, closeTimeout, MILLISECONDS))// NettyServerHandler可是核心.addLast("handler", nettyServerHandler);}});}

我們重點關(guān)注下childHandler中添加的ChannelHandler。在這之前,我們先對Channel中的ChannelPipeline和ChanelHandler進行簡單介紹,我們知道每個連接對應(yīng)一個Channel,而每個Channel對應(yīng)一個ChannelPipeline,用來組織和管理連接上面的處理器ChannelHandler(責任鏈的方式)。ChannelHandler分為ChannelInboundHandler和ChannelOutboundHandler兩大類,分別管理入站(讀)和出站(寫)的流程定制,當然,為了使用方便,Netty提供了很多開箱即用的ChannelHandler,便于我們輕松實現(xiàn)HTTP、WebSocket等協(xié)議。

SslServerTlsHandler

????????這是上面通過childHandler添加的第一個ChannelHandler(它屬于Netty中ByteToMessageDecoder的子類,ByteToMessageDecoder的主要用途是將接收到的字節(jié)數(shù)據(jù)解碼為消息對象,并將解碼后的消息對象傳遞給下一個處理器進行后續(xù)的業(yè)務(wù)處理。繼承它可以方便的處理多種數(shù)據(jù)格式,例如二進制數(shù)據(jù)、文本數(shù)據(jù)等),看這Handler的名稱就知道是做什么的,用來進行SSL|TLS加解密。在其實現(xiàn)的decode方法中,如果發(fā)現(xiàn)ByteBuf是支持SSL|TLS,那么會立馬在當前Channel的ChannelPipeline中加入SslHandler(Netty自帶的支持SSL|TLS握手的ChannelHandler),注意,是加載當前SslServerTlsHandler之后。為了能在SSL|TLS解碼后能繼續(xù)執(zhí)行SslServerTlsHandler的邏輯,在SslHandler后面又新加了一個SslServerTlsHandler(其中sslDetected=true,說明數(shù)據(jù)包到這里已經(jīng)解包完成),用于校驗協(xié)議。

? ? ? ? 當然,如果SslServerTlsHandler發(fā)現(xiàn)你沒有使用SSL|TLS(Netty中的SslHandler#isEncrypted能判斷ByteBuf是否進行過加密),那么會直接把自己從ChannelPipeline中刪除。這里給出SslServerTlsHandler#decode代碼:

    @Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)throws Exception {// Will use the first five bytes to detect a protocol.if (byteBuf.readableBytes() < 5) {return;}// SSL|TLS 校驗完成后會創(chuàng)建一個新的SslServerTlsHandler,其中sslDetected會設(shè)置成trueif (sslDetected) {return;}CertManager certManager =url.getOrDefaultFrameworkModel().getBeanFactory().getBean(CertManager.class);ProviderCert providerConnectionConfig = certManager.getProviderConnectionConfig(url, channelHandlerContext.channel().remoteAddress());// 如果沒啟用SSL|TLS,那么獲取到的providerConnectionConfig就會是null,從而刪除并退出該Handlerif (providerConnectionConfig == null) {ChannelPipeline p = channelHandlerContext.pipeline();p.remove(this);return;}if (isSsl(byteBuf)) {SslContext sslContext = SslContexts.buildServerSslContext(providerConnectionConfig);enableSsl(channelHandlerContext, sslContext);return;}if (providerConnectionConfig.getAuthPolicy() == AuthPolicy.NONE) {ChannelPipeline p = channelHandlerContext.pipeline();p.remove(this);return;}logger.error(INTERNAL_ERROR, "", "", "TLS negotiation failed when trying to accept new connection.");channelHandlerContext.close();}

NettyCodecAdapter.InternalDecoder

? ? ? ? 之前已經(jīng)給了NettyServer#initServerBootstrap方法,其中可以發(fā)現(xiàn)這個內(nèi)部解碼器InternalDecoder是繼SslServerTlsHandler的下一個ChannelHandler,由于它和SslServerTlsHandler一樣,是ByteToMessageDecoder的子類,所以我們直接看其decode方法:

    private class InternalDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {// ChannelBuffer是Dubbo對通道緩沖區(qū)的抽象,這里的NettyBackedChannelBuffer就是其Netty緩沖區(qū)第一個實現(xiàn),NettyBackedChannelBuffer內(nèi)部直接使用的是Netty的ByteBufChannelBuffer message = new NettyBackedChannelBuffer(input);// Dubbo中的NettyChannel是銜接Netty Channel和Dubbo內(nèi)部通道抽象的橋梁,其中NettyChannel就有Dubbo的解碼器NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);do {int saveReaderIndex = message.readerIndex();// 這里的codec默認是DubboCountCodec實例,支持MultiMessage解碼,這里返回的msg已經(jīng)是org.apache.dubbo.remoting.exchange.Request對象Object msg = codec.decode(channel, message);if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {message.readerIndex(saveReaderIndex);break;} else {// is it possible to go here ?if (saveReaderIndex == message.readerIndex()) {throw new IOException("Decode without read data.");}if (msg != null) {out.add(msg);}}} while (message.readable());}}

1. DubboCountCodec(implements Codec2)用來處理MultiMessage類型的消息,Dubbo Consumer可以通過MultiMessage來一次性發(fā)送多個請求,但一般沒人這么用,大家更愿意傳遞一個List來讓下游Dubbo Provider來批量處理,上述InternalDecoder#decode方法內(nèi)部調(diào)用了其decode方法,decode方法會繼續(xù)觸發(fā)其他org.apache.dubbo.remoting.Codec2實現(xiàn)類的調(diào)用。

2. DubboCodec(extends ExchangeCodec):DubboCountCodec#decode內(nèi)部會調(diào)用其父類ExchangeCodec#decode方法,主要用于將Dubbo協(xié)議中的請求和響應(yīng)消息進行序列化和反序列化??吹竭@里,難道Dubbo沒用LengthFieldBasedFrameDecoder?這可是Netty中解決TCP拆包、粘包的神器!沒有,Dubbo自己在ExchangeCodec#decode方法中自己解決了粘包、拆包問題,通過返回DecodeResult.NEED_MORE_INPUT來表示需要更多數(shù)據(jù)。ExchangeCodec#decode方法最后又將調(diào)用DubboCodec#decodeBody來解碼body部分,這里給出Dubbo的協(xié)議圖:

我們看看DubboCodec解碼body部分的代碼(注意DubboCodec可以解碼也可以編碼,這里為了簡單起見,只展示解碼部分):

    @Overrideprotected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);// get request id.long id = Bytes.bytes2long(header, 4);// decode request.Request req;Object data;req = new Request(id);req.setVersion(Version.getProtocolVersion());req.setTwoWay((flag & FLAG_TWOWAY) != 0);// get data length.int len = Bytes.bytes2int(header, 12);req.setPayload(len);DecodeableRpcInvocation inv;// 如果你實現(xiàn)了自定義解析器,那么就使用自定義的if (customByteAccessor != null) {inv = customByteAccessor.getRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);} else {// 否則用Dubbo內(nèi)置的inv = new DecodeableRpcInvocation(frameworkModel,channel,req,new UnsafeByteArrayInputStream(readMessageData(is)),proto);}// DecodeableRpcInvocation來通過協(xié)議對應(yīng)的序列化器來反序列化inv.decode();data = inv;req.setData(data);return req;}

可以看到,DubboCodec將使用DecodeableRpcInvocation#decode來完成最終解碼過程。需要注意的是,我們可以通過設(shè)置DECODE_IN_IO_THREAD_KEY("decode.in.io.thread")參數(shù)來讓Dubbo決定整個decode操作是否在IO線程中執(zhí)行,默認是在IO線程執(zhí)行。

2. DecodeableRpcInvocation(extends RpcInvocation):

前面我們說真正的反序列化是在該類中實現(xiàn)的,我們看下代碼:

    @Overridepublic Object decode(Channel channel, InputStream input) throws IOException {int contentLength = input.available();this.put(Constants.CONTENT_LENGTH_KEY, contentLength);// 通過請求中的SerializationId來選擇對應(yīng)的序列化對象進行數(shù)據(jù)的反序列化ObjectInput in = CodecSupport.getSerialization(serializationType).deserialize(channel.getUrl(), input);this.put(SERIALIZATION_ID_KEY, serializationType);// 設(shè)置Dubbo版本號String dubboVersion = in.readUTF();request.setVersion(dubboVersion);setAttachment(DUBBO_VERSION_KEY, dubboVersion);// 設(shè)置需要調(diào)用的接口標識String path = in.readUTF();setAttachment(PATH_KEY, path);String version = in.readUTF();setAttachment(VERSION_KEY, version);// Do provider-level payload checks.String keyWithoutGroup = keyWithoutGroup(path, version);// 為了防止數(shù)據(jù)過載,在Dubbo Consumer端可以設(shè)置payLoad,這里檢查數(shù)據(jù)是否超過payLoad限制checkPayload(keyWithoutGroup);// 設(shè)置需要調(diào)用的方法標識setMethodName(in.readUTF());// 參數(shù)類型的全限定名,如果有多個參數(shù),通過;分隔String desc = in.readUTF();setParameterTypesDesc(desc);ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();try {Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;if (desc.length() > 0) {// 根據(jù)參數(shù)描述來拿到對應(yīng)的Class對象pts = drawPts(path, version, desc, pts);// 根據(jù)參數(shù)的Class和值,解析到真正的參數(shù)對象args = drawArgs(in, pts);}setParameterTypes(pts);Map<String, Object> map = in.readAttachments();if (CollectionUtils.isNotEmptyMap(map)) {addObjectAttachments(map);}decodeArgument(channel, pts, args);} catch (ClassNotFoundException e) {throw new IOException(StringUtils.toString("Read invocation data failed.", e));} finally {Thread.currentThread().setContextClassLoader(originClassLoader);if (in instanceof Cleanable) {((Cleanable) in).cleanup();}}return this;}

我們可以看到,先是通過CodecSupport.getSerialization來獲取對應(yīng)的序列化對象,然后通過它來反序列化數(shù)據(jù)(我們可以通過看org.apache.dubbo.common.serialize.Serialization有哪些實現(xiàn)類來了解Dubbo默認支持的協(xié)議類型)。

NettyServerHandler

????????當輪到NettyServerHandler執(zhí)行的時候,由于前面的ChannelHandler處理,請求數(shù)據(jù)已經(jīng)反序列化成了org.apache.dubbo.remoting.exchange.Request對象,到了NettyServerHandler#received內(nèi)部,就輪到一些列?org.apache.dubbo.remoting.ChannelHandler 的子類來處理了:

MultiMessageHandler

????????如果是MultiMessage類型的消息,那么for循環(huán)去調(diào)用內(nèi)部handler去處理,否則只調(diào)用handler處理一次。

HeartbeatHandler

? ? ? ? 在Channel上設(shè)置讀的時間戳,便于后續(xù)判斷該Channel是否存活,同時判斷請求是不是HeartBeatRequest類型,如果是,則直接應(yīng)答。否則繼續(xù)執(zhí)行下一個handler。

AllChannelHandler

? ? ? ? 根據(jù)請求的url來獲取處理的線程池,然后將請求和DecodeHandler實例構(gòu)造成ChannelEventRunnable對象來扔進線程池中執(zhí)行。

    @Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);try {// 這里的handler是DecodeHandler對象executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}

DecodeHandler

? ? ? ? 前面說了AllChannelHandler之后的邏輯是在另一個線程中執(zhí)行的,毫無疑問,在這個新的線程中,主要是用來調(diào)用DecodeHandler的來進行解碼,在Dubbo Provider接收客戶端請求的場景下,調(diào)用的是received方法:

    @Overridepublic void received(Channel channel, Object message) throws RemotingException {// 如果之前設(shè)置的不是在IO線程中解碼,那么就在這個線程中解碼if (message instanceof Decodeable) {decode(message);}// 如果是Request對象,那么得對消息負載(真正的請求對象)進行解碼if (message instanceof Request) {decode(((Request) message).getData());}if (message instanceof Response) {decode(((Response) message).getResult());}// handler是HeaderExchangeHandler對象handler.received(channel, message);}

HeaderExchangeHandler

? ? ? ? 該Handler根據(jù)請求的交互方式來決定如何處理請求(例如有的交互方式是不需要應(yīng)答的),處理真正調(diào)用,比如會拿Request.data(RpcInvocation),最終會去調(diào)用DubboProtocol#ExchangeHandler的reply方法來拿到一個異步結(jié)果對象,并設(shè)置完成時候res的狀態(tài)碼和結(jié)果的賦值:

    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());// find handler by message class.Object msg = req.getData();try {// handler是DubboProtocol#ExchangeHandler對象CompletionStage<Object> future = handler.reply(channel, msg);future.whenComplete((appResult, t) -> {try {if (t == null) {res.setStatus(Response.OK);res.setResult(appResult);} else {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}channel.send(res);} catch (RemotingException e) {logger.warn(TRANSPORT_FAILED_RESPONSE,"","","Send result to consumer failed, channel is " + channel + ", msg is " + e);}});} catch (Throwable e) {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));channel.send(res);}}

http://www.risenshineclean.com/news/21739.html

相關(guān)文章:

  • 河南錦路路橋建設(shè)有限公司網(wǎng)站高端網(wǎng)站建設(shè)定制
  • 做漫畫網(wǎng)站空間多大文案短句干凈治愈
  • 我國市級政府網(wǎng)站建設(shè)分析平面設(shè)計
  • 網(wǎng)站建設(shè)企業(yè)蛋糕怎么做公司網(wǎng)頁
  • 黃金做空網(wǎng)站長沙本地推廣聯(lián)系電話
  • 裝飾工程施工工藝流程口碑優(yōu)化
  • wordpress banseo前景
  • 完整版網(wǎng)站推廣方案網(wǎng)站排名優(yōu)化方案
  • 網(wǎng)站建設(shè)方案案例石家莊網(wǎng)絡(luò)營銷網(wǎng)站推廣
  • 個人網(wǎng)站二級域名做淘寶客企業(yè)網(wǎng)站營銷
  • 寧波網(wǎng)站建設(shè)服務(wù)提供商網(wǎng)絡(luò)推廣營銷方案免費
  • steam課程做網(wǎng)站黑帽seo教程
  • 網(wǎng)站源碼設(shè)計給我免費的視頻在線觀看
  • 唯品會一家做特賣的網(wǎng)站手機版百度開戶代理公司
  • 網(wǎng)站備案變更公司名稱百度指數(shù)屬于行業(yè)趨勢及人群
  • 域名備案關(guān)閉網(wǎng)站保溫杯軟文營銷300字
  • 青島專業(yè)做網(wǎng)站優(yōu)化成都網(wǎng)站建設(shè)方案外包
  • 代做吧機械網(wǎng)站seo搜索引擎優(yōu)化師
  • 電子 網(wǎng)站模板2024年2月疫情又開始了嗎
  • aspx網(wǎng)站開發(fā) 案例淘寶推廣公司
  • 做美妝網(wǎng)站名稱營銷案例最新
  • 睢縣做網(wǎng)站的公司百度識圖軟件
  • 如何網(wǎng)上賺點零花錢seo如何去做優(yōu)化
  • 網(wǎng)站icp備案號怎么查詢合肥網(wǎng)絡(luò)推廣軟件
  • 湖南服務(wù)專業(yè)的網(wǎng)站制作自創(chuàng)網(wǎng)站
  • 網(wǎng)站seo推廣站長統(tǒng)計網(wǎng)站統(tǒng)計
  • 建設(shè)了網(wǎng)站怎么管理系統(tǒng)代發(fā)新聞稿的網(wǎng)站
  • 專業(yè)做廠房的網(wǎng)站怎么申請域名建立網(wǎng)站
  • 只做一頁的網(wǎng)站多少錢中層管理者培訓(xùn)課程有哪些
  • 網(wǎng)站制作比較好的公司網(wǎng)絡(luò)推廣的方法有