網(wǎng)站怎么做懸浮圖片bing搜索引擎入口
1.ChannelGroup可以免遍歷由netty提供,覆蓋remove方法即可觸發(fā)刪除channel\
2.群聊私聊
13.群聊私聊簡(jiǎn)單原理圖
3.netty心跳檢測(cè)機(jī)制,客戶端對(duì)服務(wù)器有沒(méi)有讀寫(讀,寫空閑)
//IdleStateHandler(3,5,7,TimeUnite.SECONDS)是netty提供的檢測(cè)狀態(tài)的處理器,也加到pipeline,讀,寫,讀寫都沒(méi)有
//并在handler類實(shí)現(xiàn)方法userEventTriggered處理事件
@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if(evt instanceof IdleStateEvent) {//將 evt 向下轉(zhuǎn)型 IdleStateEventIdleStateEvent event = (IdleStateEvent) evt;String eventType = null;switch (event.state()) {case READER_IDLE:eventType = "讀空閑";break;case WRITER_IDLE:eventType = "寫空閑";break;case ALL_IDLE:eventType = "讀寫空閑";break;}System.out.println(ctx.channel().remoteAddress() + "--超時(shí)時(shí)間--" + eventType);System.out.println("服務(wù)器做相應(yīng)處理..");//如果發(fā)生空閑,我們關(guān)閉通道// ctx.channel().close();}}//init注意的順序pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
- 增加一個(gè)自定義的handler
pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS));
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
4.websocker實(shí)現(xiàn)全雙工(兩邊可以通信,半雙工只能一邊通信)的長(zhǎng)連接,
//因?yàn)榇髷?shù)據(jù)時(shí)http會(huì)發(fā)送多次請(qǐng)求獲取數(shù)據(jù),所以要升級(jí)用ws協(xié)議
//html有WebSocket對(duì)象,發(fā)送http請(qǐng)求然后200狀態(tài)碼轉(zhuǎn)為101為ws協(xié)議
public class MyServer {public static void main(String[] args) throws Exception{//創(chuàng)建兩個(gè)線程組EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個(gè)NioEventLooptry {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//相當(dāng)于鏈?zhǔn)骄幊?/span>//因?yàn)榛趆ttp協(xié)議,使用http的編碼和解碼器pipeline.addLast(new HttpServerCodec());//是以塊方式寫,添加ChunkedWriteHandler處理器pipeline.addLast(new ChunkedWriteHandler());/*說(shuō)明1. http數(shù)據(jù)在傳輸過(guò)程中是分段, HttpObjectAggregator ,就是可以將多個(gè)段聚合2. 這就就是為什么,當(dāng)瀏覽器發(fā)送大量數(shù)據(jù)時(shí),就會(huì)發(fā)出多次http請(qǐng)求*/pipeline.addLast(new HttpObjectAggregator(8192));/*說(shuō)明1. 對(duì)應(yīng)websocket ,它的數(shù)據(jù)是以 幀(frame) 形式傳遞2. 可以看到WebSocketFrame 下面有六個(gè)子類3. 瀏覽器請(qǐng)求時(shí) ws://localhost:7000/hello 表示請(qǐng)求的uri4. WebSocketServerProtocolHandler 核心功能是將 http協(xié)議升級(jí)為 ws協(xié)議 , 保持長(zhǎng)連接5. 是通過(guò)一個(gè) 狀態(tài)碼 101*/pipeline.addLast(new WebSocketServerProtocolHandler("/hello2"));//自定義的handler ,處理業(yè)務(wù)邏輯pipeline.addLast(new MyTextWebSocketFrameHandler());}});//啟動(dòng)服務(wù)器ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {System.out.println("服務(wù)器收到消息 " + msg.text());//回復(fù)消息ctx.channel().writeAndFlush(new TextWebSocketFrame("服務(wù)器時(shí)間" + LocalDateTime.now() + " " + msg.text()));}//當(dāng)web客戶端連接后, 觸發(fā)方法@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {//id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一System.out.println("handlerAdded 被調(diào)用" + ctx.channel().id().asLongText());System.out.println("handlerAdded 被調(diào)用" + ctx.channel().id().asShortText());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("handlerRemoved 被調(diào)用" + ctx.channel().id().asLongText());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("異常發(fā)生 " + cause.getMessage());ctx.close(); //關(guān)閉連接}
}
5.編碼(cli)—二進(jìn)制–>解碼(server)
1.netty提供的 StringEncoder ObjectEncoder
2.但是使用java底層序列化和反序列化效率低
3.無(wú)法跨語(yǔ)言,google protobuf可以解決 ,很多公司由http+json–>tcp+protobuf
6.protobuf (是RPC remote procedure call遠(yuǎn)程過(guò)程調(diào)用) 跨平臺(tái) 生成的大對(duì)象包含多個(gè)小對(duì)象
1.引入依賴
2.寫 xxx.proto文件,idea安裝插件protobuf
pipeline.addLast("encoder", new ProtobufEncoder()); //netty需要加netty自帶protobuf解編碼器//普通的寫法
syntax = "proto3";option java_outer_classname="StudentPOJO"; // 外部類名, 文件名message Student {int32 id = 1;//Student類的屬性string name = 2; //
}//升級(jí)版//需要注意包名,要根據(jù)自己的項(xiàng)目包創(chuàng)建,沒(méi)有可不指定//版本//生成外部類文件名 message Student{ enum DataType{ StudentType =0; //enum編號(hào)從0開(kāi)始WorkType =1;}DataType data_type=1; //標(biāo)識(shí)傳入那個(gè)類型的id WorkType oneof dataBody{ //相當(dāng)于在對(duì)象中選一個(gè)作為屬性Student student =2; Worker worker =3;}} message Student{ //看文檔,不要寫java語(yǔ)言類型int32 id=1; //屬性的序號(hào)string name=2;} message Work{ //看文檔,不要寫java語(yǔ)言類型int32 id=1; //屬性的序號(hào)string name=2;}
3.打開(kāi)下載好的 protoc.exe生成java文件
protoc.exe --java_out=. Student.proto //有空格隔開(kāi),生成到當(dāng)前文件夾,輸入Student.proto文件,文件名在配置文件中寫
4.客戶端
StudentPOJO.Student.new Builder.setId(4).setName("ss").build();//可以發(fā)送多種對(duì)象,而不是一個(gè)對(duì)象//添加編解碼器if(0 == random) { //發(fā)送Student 對(duì)象myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType).setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 盧俊義").build()).build();} else { // 發(fā)送一個(gè)Worker 對(duì)象myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType).setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();}
//完整代碼 StudentPOJO.proto
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package="com.atguigu.netty.codec2"; //指定生成到哪個(gè)包下!!!
option java_outer_classname="MyDataInfo"; // 外部類名, 文件名//protobuf 可以使用message 管理其他的message
message MyMessage {//定義一個(gè)枚舉類型enum DataType {//可以設(shè)置枚舉類型,然后在java里面判斷StudentType = 0; //在proto3 要求enum的編號(hào)從0開(kāi)始WorkerType = 1;}//用data_type 來(lái)標(biāo)識(shí)傳的是哪一個(gè)枚舉類型DataType data_type = 1;//表示每次枚舉類型最多只能出現(xiàn)其中的一個(gè), 節(jié)省空間,另外一個(gè)對(duì)象如果在java取值變?yōu)榭罩?oneof dataBody {Student student = 2;Worker worker = 3;}}message Student {int32 id = 1;//Student類的屬性string name = 2; //
}
message Worker {string name=1;int32 age=2;
}
public class NettyServer {public static void main(String[] args) throws Exception {//創(chuàng)建BossGroup 和 WorkerGroup//說(shuō)明//1. 創(chuàng)建兩個(gè)線程組 bossGroup 和 workerGroup//2. bossGroup 只是處理連接請(qǐng)求 , 真正的和客戶端業(yè)務(wù)處理,會(huì)交給 workerGroup完成//3. 兩個(gè)都是無(wú)限循環(huán)//4. bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個(gè)數(shù)// 默認(rèn)實(shí)際 cpu核數(shù) * 2EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(); //8try {//創(chuàng)建服務(wù)器端的啟動(dòng)對(duì)象,配置參數(shù)ServerBootstrap bootstrap = new ServerBootstrap();//使用鏈?zhǔn)骄幊虂?lái)進(jìn)行設(shè)置bootstrap.group(bossGroup, workerGroup) //設(shè)置兩個(gè)線程組.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作為服務(wù)器的通道實(shí)現(xiàn).option(ChannelOption.SO_BACKLOG, 128) // 設(shè)置線程隊(duì)列得到連接個(gè)數(shù).childOption(ChannelOption.SO_KEEPALIVE, true) //設(shè)置保持活動(dòng)連接狀態(tài)
// .handler(null) // 該 handler對(duì)應(yīng) bossGroup , childHandler 對(duì)應(yīng) workerGroup.childHandler(new ChannelInitializer<SocketChannel>() {//創(chuàng)建一個(gè)通道初始化對(duì)象(匿名對(duì)象)//給pipeline 設(shè)置處理器@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//在pipeline加入ProtoBufDecoder//指定對(duì)哪種對(duì)象進(jìn)行解碼pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance())); //!!!入站 解碼.pipeline.addLast(new NettyServerHandler());}}); // 給我們的workerGroup 的 EventLoop 對(duì)應(yīng)的管道設(shè)置處理器System.out.println(".....服務(wù)器 is ready...");//綁定一個(gè)端口并且同步, 生成了一個(gè) ChannelFuture 對(duì)象//啟動(dòng)服務(wù)器(并綁定端口)ChannelFuture cf = bootstrap.bind(6668).sync();//給cf 注冊(cè)監(jiān)聽(tīng)器,監(jiān)控我們關(guān)心的事件cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (cf.isSuccess()) {System.out.println("監(jiān)聽(tīng)端口 6668 成功");} else {System.out.println("監(jiān)聽(tīng)端口 6668 失敗");}}});//對(duì)關(guān)閉通道進(jìn)行監(jiān)聽(tīng)cf.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
//處理
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {//讀取數(shù)據(jù)實(shí)際(這里我們可以讀取客戶端發(fā)送的消息)/*1. ChannelHandlerContext ctx:上下文對(duì)象, 含有 管道pipeline , 通道channel, 地址2. Object msg: 就是客戶端發(fā)送的數(shù)據(jù) 默認(rèn)Object*/@Overridepublic void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {//根據(jù)dataType 來(lái)顯示不同的信息MyDataInfo.MyMessage.DataType dataType = msg.getDataType();if(dataType == MyDataInfo.MyMessage.DataType.StudentType) {MyDataInfo.Student student = msg.getStudent();System.out.println("學(xué)生id=" + student.getId() + " 學(xué)生名字=" + student.getName());} else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType) {MyDataInfo.Worker worker = msg.getWorker();System.out.println("工人的名字=" + worker.getName() + " 年齡=" + worker.getAge());} else {System.out.println("傳輸?shù)念愋筒徽_");}}// //讀取數(shù)據(jù)實(shí)際(這里我們可以讀取客戶端發(fā)送的消息)
// /*
// 1. ChannelHandlerContext ctx:上下文對(duì)象, 含有 管道pipeline , 通道channel, 地址
// 2. Object msg: 就是客戶端發(fā)送的數(shù)據(jù) 默認(rèn)Object
// */
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//
// //讀取從客戶端發(fā)送的StudentPojo.Student
//
// StudentPOJO.Student student = (StudentPOJO.Student) msg;
//
// System.out.println("客戶端發(fā)送的數(shù)據(jù) id=" + student.getId() + " 名字=" + student.getName());
// }//數(shù)據(jù)讀取完畢@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//writeAndFlush 是 write + flush//將數(shù)據(jù)寫入到緩存,并刷新//一般講,我們對(duì)這個(gè)發(fā)送的數(shù)據(jù)進(jìn)行編碼ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵1", CharsetUtil.UTF_8));}//處理異常, 一般是需要關(guān)閉通道@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}
//客戶端
public class NettyClient {public static void main(String[] args) throws Exception {//客戶端需要一個(gè)事件循環(huán)組EventLoopGroup group = new NioEventLoopGroup();try {//創(chuàng)建客戶端啟動(dòng)對(duì)象//注意客戶端使用的不是 ServerBootstrap 而是 BootstrapBootstrap bootstrap = new Bootstrap();//設(shè)置相關(guān)參數(shù)bootstrap.group(group) //設(shè)置線程組.channel(NioSocketChannel.class) // 設(shè)置客戶端通道的實(shí)現(xiàn)類(反射).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//在pipeline中加入 ProtoBufEncoderpipeline.addLast("encoder", new ProtobufEncoder()); //!!!入站編碼二進(jìn)制位java編碼pipeline.addLast(new NettyClientHandler()); //加入自己的處理器}});System.out.println("客戶端 ok..");//啟動(dòng)客戶端去連接服務(wù)器端//關(guān)于 ChannelFuture 要分析,涉及到netty的異步模型ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();//給關(guān)閉通道進(jìn)行監(jiān)聽(tīng)channelFuture.channel().closeFuture().sync();}finally {group.shutdownGracefully();}}
}
4.入站(服務(wù)器數(shù)據(jù)向cli) 出站(cli->server)(整個(gè)過(guò)程在pipeline里面進(jìn)行) (想象為一個(gè)管道入棧需要解碼(二進(jìn)制變?yōu)閿?shù)據(jù)),出站需要需要編碼[變成二進(jìn)制])
5.netty處理沾包粘包問(wèn)題(需要相互判斷接收的字節(jié)數(shù),不然都是按一個(gè)包來(lái)處理,而我們是需要分開(kāi)處理小的包)
//但是分段的數(shù)據(jù)會(huì)被handler和decode重復(fù)處理
6.netty handler鏈 圖14.handler鏈
//如果發(fā)送的數(shù)據(jù),不是我要編碼的就不處理,所以我們要注意傳入數(shù)據(jù)和傳出數(shù)據(jù)類型要一樣
//客戶端和服務(wù)端都寫解碼和編碼
7.解碼器 RaplayingDecoder處理界面不用判斷是否有足夠的數(shù)據(jù)
//但是不是所有ByteBuf操作都支持,會(huì)拋異常,速度會(huì)變慢(信息變碎片->分片太多)
8.其他解碼器
1.LineBasedFrameDecoder 行尾使用 \n或者\(yùn)r\n分隔符解析數(shù)據(jù)
2.DelimiterBasedFrameDecoder 自定義特殊字符進(jìn)行分割
3.LengthFieldBasedFrameDecoder指定長(zhǎng)度來(lái)識(shí)別包,解決沾包粘包問(wèn)題
4.好用的對(duì)數(shù)據(jù)進(jìn)行壓縮ZlibDecoder
5.httpObjectDecoder
9.maven項(xiàng)目使用netty,整合Log4j
10.TCP粘包(關(guān)鍵在數(shù)據(jù)長(zhǎng)度)(小的包通過(guò)算法合成大包,可能出現(xiàn)接收的大包不完整,部分小包位置的問(wèn)題)
和拆包問(wèn)題
1.netty解決,自己定義協(xié)議對(duì)象+解密編碼器,接收者得到長(zhǎng)度,才得到數(shù)據(jù)(在這里不用判斷長(zhǎng)度,因?yàn)橛瞄L(zhǎng)度創(chuàng)建byte[]數(shù)組,長(zhǎng)度錯(cuò)誤就報(bào)錯(cuò)) //客戶端連續(xù)發(fā)送數(shù)據(jù),就有粘包問(wèn)題
//協(xié)議對(duì)象
public class MessageProtocol {private int len; //關(guān)鍵private byte[] content;public int getLen() {return len;}public void setLen(int len) {this.len = len;}public byte[] getContent() {return content;}public void setContent(byte[] content) {this.content = content;}
}
//解碼器和加密器,可以分別識(shí)別出先寫入大小和內(nèi)容,解決粘包拆包問(wèn)題
public class MyMessageDecoder extends ReplayingDecoder<Void> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("MyMessageDecoder decode 被調(diào)用");//需要將得到二進(jìn)制字節(jié)碼-> MessageProtocol 數(shù)據(jù)包(對(duì)象)int length = in.readInt();//!!!!!!!!!!!!!!!!!!!核心步驟,得到的數(shù)據(jù)長(zhǎng)度,然后再創(chuàng)建byte數(shù)組接收byte[] content = new byte[length];in.readBytes(content);//封裝成 MessageProtocol 對(duì)象,放入 out, 傳遞下一個(gè)handler業(yè)務(wù)處理MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(length); //messageProtocol.setContent(content);out.add(messageProtocol);}
}
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {@Overrideprotected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {System.out.println("MyMessageEncoder encode 方法被調(diào)用");out.writeInt(msg.getLen());out.writeBytes(msg.getContent());}
}
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {private int count;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//使用客戶端發(fā)送10條數(shù)據(jù) "今天天氣冷,吃火鍋" 編號(hào)for(int i = 0; i< 5; i++) {String mes = "今天天氣冷,吃火鍋";byte[] content = mes.getBytes(Charset.forName("utf-8"));int length = mes.getBytes(Charset.forName("utf-8")).length;//創(chuàng)建協(xié)議包對(duì)象MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(length);messageProtocol.setContent(content);ctx.writeAndFlush(messageProtocol);}}// @Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {int len = msg.getLen();byte[] content = msg.getContent();System.out.println("客戶端接收到消息如下");System.out.println("長(zhǎng)度=" + len);System.out.println("內(nèi)容=" + new String(content, Charset.forName("utf-8")));System.out.println("客戶端接收消息數(shù)量=" + (++this.count));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("異常消息=" + cause.getMessage());ctx.close();}
}//server
//處理業(yè)務(wù)的handler
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol>{private int count;@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//cause.printStackTrace();ctx.close();}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {//接收到數(shù)據(jù),并處理int len = msg.getLen();byte[] content = msg.getContent();System.out.println();System.out.println();System.out.println();System.out.println("服務(wù)器接收到信息如下");System.out.println("長(zhǎng)度=" + len);System.out.println("內(nèi)容=" + new String(content, Charset.forName("utf-8")));System.out.println("服務(wù)器接收到消息包數(shù)量=" + (++this.count));//回復(fù)消息String responseContent = UUID.randomUUID().toString();int responseLen = responseContent.getBytes("utf-8").length;byte[] responseContent2 = responseContent.getBytes("utf-8");//構(gòu)建一個(gè)協(xié)議包MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(responseLen);messageProtocol.setContent(responseContent2);//向所有channel發(fā)送數(shù)據(jù)ctx.writeAndFlush(messageProtocol);}
}