wordpress 微博】蘇州網(wǎng)站建設(shè)優(yōu)化
前言
在netty數(shù)據(jù)傳輸過程中可以有很多選擇,比如;字符串、json、xml、java對(duì)象,但為了保證傳輸?shù)臄?shù)據(jù)具備;良好的通用性、方便的操作性和傳輸?shù)母咝阅?#xff0c;我們可以選擇protobuf作為我們的數(shù)據(jù)傳輸格式。目前protobuf可以支持;C++、C#、Dart、Go、Java、Python等,也可以在JS里使用。知識(shí)點(diǎn);ProtobufDecoder、ProtobufEncoder、ProtobufVarint32FrameDecoder、ProtobufVarint32LengthFieldPrepender。
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//protobuf 處理channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());channel.pipeline().addLast(new ProtobufDecoder(MsgBody.getDefaultInstance()));channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());channel.pipeline().addLast(new ProtobufEncoder());// 在管道中添加我們自己的接收數(shù)據(jù)實(shí)現(xiàn)方法channel.pipeline().addLast(new MyClientHandler());}
}
public class MyClientHandler extends ChannelInboundHandlerAdapter {/*** 當(dāng)客戶端主動(dòng)鏈接服務(wù)端的鏈接后,這個(gè)通道就是活躍的了。也就是客戶端與服務(wù)端建立了通信通道并且可以傳輸數(shù)據(jù)*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("鏈接報(bào)告開始");System.out.println("鏈接報(bào)告信息:本客戶端鏈接到服務(wù)端。channelId:" + channel.id());System.out.println("鏈接報(bào)告IP:" + channel.localAddress().getHostString());System.out.println("鏈接報(bào)告Port:" + channel.localAddress().getPort());System.out.println("鏈接報(bào)告完畢");//通知客戶端鏈接建立成功String str = "通知服務(wù)端鏈接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString();ctx.writeAndFlush(MsgUtil.buildMsg(channel.id().toString(), str));}/*** 當(dāng)客戶端主動(dòng)斷開服務(wù)端的鏈接后,這個(gè)通道就是不活躍的。也就是說客戶端與服務(wù)端的關(guān)閉了通信通道并且不可以傳輸數(shù)據(jù)*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("斷開鏈接" + ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收msg消息{與上一章節(jié)相比,此處已經(jīng)不需要自己進(jìn)行解碼}System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息類型:" + msg.getClass());System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息內(nèi)容:" + JsonFormat.printToString((MsgBody) msg));}/*** 抓住異常,當(dāng)發(fā)生異常的時(shí)候,可以做一些相應(yīng)的處理,比如打印日志、關(guān)閉鏈接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("異常信息:\r\n" + cause.getMessage());}}
public class NettyClient {public static void main(String[] args) {new NettyClient().connect("127.0.0.1", 7397);}private void connect(String inetHost, int inetPort) {EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.handler(new MyChannelInitializer());ChannelFuture f = b.connect(inetHost, inetPort).sync();System.out.println(" client start done. {關(guān)注明哥,獲取源碼}");f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服務(wù)端,我是明哥,關(guān)注我獲取案例源碼。"));f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服務(wù)端,我是明哥,關(guān)注我獲取案例源碼。"));f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服務(wù)端,我是明哥,關(guān)注我獲取案例源碼。"));f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服務(wù)端,我是明哥關(guān)注我獲取案例源碼。"));f.channel().writeAndFlush(MsgUtil.buildMsg(f.channel().id().toString(),"你好,使用protobuf通信格式的服務(wù)端,我是明哥,關(guān)注我獲取案例源碼。"));f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();}}}
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) {//protobuf 處理channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());channel.pipeline().addLast(new ProtobufDecoder(MsgBody.getDefaultInstance()));channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());channel.pipeline().addLast(new ProtobufEncoder());// 在管道中添加我們自己的接收數(shù)據(jù)實(shí)現(xiàn)方法channel.pipeline().addLast(new MyServerHandler());}}
public class MyServerHandler extends ChannelInboundHandlerAdapter {/*** 當(dāng)客戶端主動(dòng)鏈接服務(wù)端的鏈接后,這個(gè)通道就是活躍的了。也就是客戶端與服務(wù)端建立了通信通道并且可以傳輸數(shù)據(jù)*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("鏈接報(bào)告開始");System.out.println("鏈接報(bào)告信息:有一客戶端鏈接到本服務(wù)端。channelId:" + channel.id());System.out.println("鏈接報(bào)告IP:" + channel.localAddress().getHostString());System.out.println("鏈接報(bào)告Port:" + channel.localAddress().getPort());System.out.println("鏈接報(bào)告完畢");//通知客戶端鏈接建立成功String str = "通知客戶端鏈接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";ctx.writeAndFlush(MsgUtil.buildMsg(channel.id().toString(), str));}/*** 當(dāng)客戶端主動(dòng)斷開服務(wù)端的鏈接后,這個(gè)通道就是不活躍的。也就是說客戶端與服務(wù)端的關(guān)閉了通信通道并且不可以傳輸數(shù)據(jù)*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客戶端斷開鏈接" + ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收msg消息{與上一章節(jié)相比,此處已經(jīng)不需要自己進(jìn)行解碼}System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息類型:" + msg.getClass());System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息內(nèi)容:" + JsonFormat.printToString((MsgBody) msg));}/*** 抓住異常,當(dāng)發(fā)生異常的時(shí)候,可以做一些相應(yīng)的處理,比如打印日志、關(guān)閉鏈接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("異常信息:\r\n" + cause.getMessage());}}
public class NettyServer {public static void main(String[] args) {new NettyServer().bing(7397);}private void bing(int port) {//配置服務(wù)端NIO線程組EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));EventLoopGroup childGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class) //非阻塞模式.option(ChannelOption.SO_BACKLOG, 128).childHandler(new MyChannelInitializer());ChannelFuture f = b.bind(port).sync();System.out.println("server start done. {關(guān)注我是明哥,獲取源碼}");f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {childGroup.shutdownGracefully();parentGroup.shutdownGracefully();}}}
public class MsgUtil {/*** 構(gòu)建protobuf消息體*/public static MsgBody buildMsg(String channelId, String msgInfo) {MsgBody.Builder msg = MsgBody.newBuilder();msg.setChannelId(channelId);msg.setMsgInfo(msgInfo);return msg.build();}}
測(cè)試結(jié)果
操作:idea選中E:\itstack\GIT\itstack.org\itstack-demo-netty\itstack-demo-netty-2-02\protoc-3.5.0-win32\bin 命令:protoc -I=源地址 --java_out=目標(biāo)地址 源地址/xxx.proto
E:demo-netty\itstack-demo-netty-2-02\protoc-3.5.0-win32\bin>protoc.exe -I=demo-netty\itstack-demo-netty-2-02\src\main\java\org\itstack\demo\netty\proto --java_out=demo-netty\netty-2-02\src\main
\java MsgInfo.proto
啟動(dòng)NettyServer
啟動(dòng)NettyClient
服務(wù)端執(zhí)行結(jié)果
客戶端執(zhí)行結(jié)果
好了到這里就結(jié)束netty之Netty使用Protobuf傳輸數(shù)據(jù)的學(xué)習(xí),大家一定要跟著動(dòng)手操作起來。需要的源碼的 可si我獲取;