一個網(wǎng)站做無限關(guān)鍵詞網(wǎng)站seo優(yōu)化怎么做
在學(xué)習(xí)Netty之前先要學(xué)習(xí)一下NIO相關(guān)的知識,因為Netty是基于NIO搭建的一套網(wǎng)絡(luò)編程框架。
一. NIO 基礎(chǔ)
non-blocking io 非阻塞 IO
1. 三大組件
1.1 Channel & Buffer
channel 有一點類似于 stream,它就是讀寫數(shù)據(jù)的雙向通道,可以從 channel 將數(shù)據(jù)讀入 buffer,也可以將 buffer 的數(shù)據(jù)寫入 channel,而之前的 stream 要么是輸入,要么是輸出,channel 比 stream 更為底層
?
常見的 Channel 有
-
FileChannel
-
DatagramChannel
-
SocketChannel
-
ServerSocketChannel
buffer 則用來緩沖讀寫數(shù)據(jù),常見的 buffer 有
-
ByteBuffer
-
MappedByteBuffer
-
DirectByteBuffer
-
HeapByteBuffer
-
-
ShortBuffer
-
IntBuffer
-
LongBuffer
-
FloatBuffer
-
DoubleBuffer
-
CharBuffer
1.2 Selector
selector 單從字面意思不好理解,需要結(jié)合服務(wù)器的設(shè)計演化來理解它的用途
多線程版設(shè)計?
?? 多線程版缺點
-
內(nèi)存占用高
-
線程上下文切換成本高
-
只適合連接數(shù)少的場景
線程池版設(shè)計
?? 線程池版缺點
-
阻塞模式下,線程僅能處理一個 socket 連接
-
僅適合短連接場景
selector 版設(shè)計
selector 的作用就是配合一個線程來管理多個 channel,獲取這些 channel 上發(fā)生的事件,這些 channel 工作在非阻塞模式下,不會讓線程吊死在一個 channel 上。適合連接數(shù)特別多,但流量低的場景(low traffic)
?
調(diào)用 selector 的 select() 會阻塞當(dāng)前線程直到 channel 發(fā)生了讀寫就緒事件,這些事件發(fā)生,select 方法就會返回這些事件交給 thread 來處理
2. ByteBuffer
有一普通文本文件 data.txt,內(nèi)容為
1234567890abcd
使用 FileChannel 來讀取文件內(nèi)容
@Slf4j
public class ChannelDemo1 {public static void main(String[] args) {try (RandomAccessFile file = new RandomAccessFile("helloword/data.txt", "rw")) {FileChannel channel = file.getChannel();ByteBuffer buffer = ByteBuffer.allocate(10);do {// 向 buffer 寫入int len = channel.read(buffer);log.debug("讀到字節(jié)數(shù):{}", len);if (len == -1) {break;}// 切換 buffer 讀模式buffer.flip();while(buffer.hasRemaining()) {log.debug("{}", (char)buffer.get());}// 切換 buffer 寫模式buffer.clear();} while (true);} catch (IOException e) {e.printStackTrace();}}
}
輸出
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):10 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 1 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 2 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 3 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 4 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 5 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 6 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 7 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 8 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 9 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 0 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):4 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - a 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - b 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - c 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - d 10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):-1
2.1 ByteBuffer 正確使用姿勢
-
向 buffer 寫入數(shù)據(jù),例如調(diào)用 channel.read(buffer)
-
調(diào)用 flip() 切換至讀模式
-
從 buffer 讀取數(shù)據(jù),例如調(diào)用 buffer.get()
-
調(diào)用 clear() 或 compact() 切換至寫模式
-
重復(fù) 1~4 步驟
2.2 ByteBuffer 結(jié)構(gòu)
ByteBuffer 有以下重要屬性
-
capacity
-
position(讀寫指針)
-
limit
一開始
寫模式下,position 是寫入位置,limit 等于容量,下圖表示寫入了 4 個字節(jié)后的狀態(tài)
flip 動作發(fā)生后,position 切換為讀取位置,limit 切換為讀取限制
讀取 4 個字節(jié)后,狀態(tài)
clear 動作發(fā)生后,狀態(tài)
compact 方法,是把未讀完的部分向前壓縮,然后切換至寫模式
💡 調(diào)試工具類
public class ByteBufferUtil {private static final char[] BYTE2CHAR = new char[256];private static final char[] HEXDUMP_TABLE = new char[256 * 4];private static final String[] HEXPADDING = new String[16];private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];private static final String[] BYTE2HEX = new String[256];private static final String[] BYTEPADDING = new String[16];
?static {final char[] DIGITS = "0123456789abcdef".toCharArray();for (int i = 0; i < 256; i++) {HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];}
?int i;
?// Generate the lookup table for hex dump paddingsfor (i = 0; i < HEXPADDING.length; i++) {int padding = HEXPADDING.length - i;StringBuilder buf = new StringBuilder(padding * 3);for (int j = 0; j < padding; j++) {buf.append(" ? ");}HEXPADDING[i] = buf.toString();}
?// Generate the lookup table for the start-offset header in each row (up to 64KiB).for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {StringBuilder buf = new StringBuilder(12);buf.append(NEWLINE);buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));buf.setCharAt(buf.length() - 9, '|');buf.append('|');HEXDUMP_ROWPREFIXES[i] = buf.toString();}
?// Generate the lookup table for byte-to-hex-dump conversionfor (i = 0; i < BYTE2HEX.length; i++) {BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);}
?// Generate the lookup table for byte dump paddingsfor (i = 0; i < BYTEPADDING.length; i++) {int padding = BYTEPADDING.length - i;StringBuilder buf = new StringBuilder(padding);for (int j = 0; j < padding; j++) {buf.append(' ');}BYTEPADDING[i] = buf.toString();}
?// Generate the lookup table for byte-to-char conversionfor (i = 0; i < BYTE2CHAR.length; i++) {if (i <= 0x1f || i >= 0x7f) {BYTE2CHAR[i] = '.';} else {BYTE2CHAR[i] = (char) i;}}}
?/*** 打印所有內(nèi)容* @param buffer*/public static void debugAll(ByteBuffer buffer) {int oldlimit = buffer.limit();buffer.limit(buffer.capacity());StringBuilder origin = new StringBuilder(256);appendPrettyHexDump(origin, buffer, 0, buffer.capacity());System.out.println("+--------+-------------------- all ------------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);System.out.println(origin);buffer.limit(oldlimit);}
?/*** 打印可讀取內(nèi)容* @param buffer*/public static void debugRead(ByteBuffer buffer) {StringBuilder builder = new StringBuilder(256);appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());System.out.println("+--------+-------------------- read -----------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());System.out.println(builder);}
?private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {if (isOutOfBounds(offset, length, buf.capacity())) {throw new IndexOutOfBoundsException("expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length+ ") <= " + "buf.capacity(" + buf.capacity() + ')');}if (length == 0) {return;}dump.append(" ? ? ? ? +-------------------------------------------------+" +NEWLINE + " ? ? ? ? | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +NEWLINE + "+--------+-------------------------------------------------+----------------+");
?final int startIndex = offset;final int fullRows = length >>> 4;final int remainder = length & 0xF;
?// Dump the rows which have 16 bytes.for (int row = 0; row < fullRows; row++) {int rowStartIndex = (row << 4) + startIndex;
?// Per-row prefix.appendHexDumpRowPrefix(dump, row, rowStartIndex);
?// Hex dumpint rowEndIndex = rowStartIndex + 16;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(" |");
?// ASCII dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append('|');}
?// Dump the last row which has less than 16 bytes.if (remainder != 0) {int rowStartIndex = (fullRows << 4) + startIndex;appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
?// Hex dumpint rowEndIndex = rowStartIndex + remainder;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(HEXPADDING[remainder]);dump.append(" |");
?// Ascii dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append(BYTEPADDING[remainder]);dump.append('|');}
?dump.append(NEWLINE +"+--------+-------------------------------------------------+----------------+");}
?private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {if (row < HEXDUMP_ROWPREFIXES.length) {dump.append(HEXDUMP_ROWPREFIXES[row]);} else {dump.append(NEWLINE);dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));dump.setCharAt(dump.length() - 9, '|');dump.append('|');}}
?public static short getUnsignedByte(ByteBuffer buffer, int index) {return (short) (buffer.get(index) & 0xFF);}
}
2.3 ByteBuffer 常見方法
分配空間
可以使用 allocate 方法為 ByteBuffer 分配空間,其它 buffer 類也有該方法
Bytebuffer buf = ByteBuffer.allocate(16);
class java.nio.HeapByteBuffer - java 堆內(nèi)存,讀寫效率較低,受到 GC 的影響
class java.nio.DirectByteBuffer - 直接內(nèi)存,讀寫效率高(少一次拷貝),不會受 GC 影響,分配的效率低
向 buffer 寫入數(shù)據(jù)
有兩種辦法
-
調(diào)用 channel 的 read 方法
-
調(diào)用 buffer 自己的 put 方法
int readBytes = channel.read(buf);
和
buf.put((byte)127);
從 buffer 讀取數(shù)據(jù)
同樣有兩種辦法
-
調(diào)用 channel 的 write 方法
-
調(diào)用 buffer 自己的 get 方法
int writeBytes = channel.write(buf);
和
byte b = buf.get();
get 方法會讓 position 讀指針向后走,如果想重復(fù)讀取數(shù)據(jù)
-
可以調(diào)用 rewind 方法將 position 重新置為 0
-
或者調(diào)用 get(int i) 方法獲取索引 i 的內(nèi)容,它不會移動讀指針
mark 和 reset
mark 是在讀取時,做一個標(biāo)記,即使 position 改變,只要調(diào)用 reset 就能回到 mark 的位置
注意
rewind 和 flip 都會清除 mark 位置
字符串與 ByteBuffer 互轉(zhuǎn)
public class TestByteBufferString {public static void main(String[] args) {// 1. 字符串轉(zhuǎn)為 ByteBufferByteBuffer buffer1 = ByteBuffer.allocate(16);buffer1.put("hello".getBytes());debugAll(buffer1);// 2. CharsetByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");debugAll(buffer2);// 3. wrapByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());debugAll(buffer3);// 4. 轉(zhuǎn)為字符串String str1 = StandardCharsets.UTF_8.decode(buffer2).toString();System.out.println(str1);buffer1.flip();String str2 = StandardCharsets.UTF_8.decode(buffer1).toString();System.out.println(str2);}
}
輸出
?? Buffer 的線程安全
Buffer 是非線程安全的
2.4 Scattering Reads
分散讀取,有一個文本文件 3parts.txt
onetwothree
使用如下方式讀取,可以將數(shù)據(jù)填充至多個 buffer
public class TestScatteringReads {public static void main(String[] args) {try (FileChannel channel = new RandomAccessFile("words.txt", "r").getChannel()) {ByteBuffer b1 = ByteBuffer.allocate(3);ByteBuffer b2 = ByteBuffer.allocate(3);ByteBuffer b3 = ByteBuffer.allocate(5);channel.read(new ByteBuffer[]{b1, b2, b3});b1.flip();b2.flip();b3.flip();debugAll(b1);debugAll(b2);debugAll(b3);} catch (IOException e) {}}
}
2.5 Gathering Writes
使用如下方式寫入,可以將多個 buffer 的數(shù)據(jù)填充至 channel
public class TestGatheringWrites {public static void main(String[] args) {ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {channel.write(new ByteBuffer[]{b1, b2, b3});} catch (IOException e) {}}
}
2.6 練習(xí)
網(wǎng)絡(luò)上有多條數(shù)據(jù)發(fā)送給服務(wù)端,數(shù)據(jù)之間使用 \n 進(jìn)行分隔但由于某種原因這些數(shù)據(jù)在接收時,被進(jìn)行了重新組合,例如原始數(shù)據(jù)有3條為
-
Hello,world\n
-
I'm zhangsan\n
-
How are you?\n
變成了下面的兩個 byteBuffer (黏包,半包)
-
Hello,world\nI'm zhangsan\nHo
-
w are you?\n
現(xiàn)在要求你編寫程序,將錯亂的數(shù)據(jù)恢復(fù)成原始的按 \n 分隔的數(shù)據(jù)
public static void main(String[] args) {ByteBuffer source = ByteBuffer.allocate(32);// ? ? ? ? ? ? ? ? ? ? 11 ? ? ? ? ? 24source.put("Hello,world\nI'm zhangsan\nHo".getBytes());split(source);
?source.put("w are you?\nhaha!\n".getBytes());split(source);
}
?
private static void split(ByteBuffer source) {source.flip();int oldLimit = source.limit();for (int i = 0; i < oldLimit; i++) {if (source.get(i) == '\n') {System.out.println(i);ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());// 0 ~ limitsource.limit(i + 1);target.put(source); // 從source 讀,向 target 寫debugAll(target);source.limit(oldLimit);}}source.compact();
}
3. 文件編程
3.1 FileChannel
?? FileChannel 工作模式
FileChannel 只能工作在阻塞模式下
獲取
不能直接打開 FileChannel,必須通過 FileInputStream、FileOutputStream 或者 RandomAccessFile 來獲取 FileChannel,它們都有 getChannel 方法
-
通過 FileInputStream 獲取的 channel 只能讀
-
通過 FileOutputStream 獲取的 channel 只能寫
-
通過 RandomAccessFile 是否能讀寫根據(jù)構(gòu)造 RandomAccessFile 時的讀寫模式?jīng)Q定
讀取
會從 channel 讀取數(shù)據(jù)填充 ByteBuffer,返回值表示讀到了多少字節(jié),-1 表示到達(dá)了文件的末尾
int readBytes = channel.read(buffer);
寫入
寫入的正確姿勢如下, SocketChannel
ByteBuffer buffer = ...;
buffer.put(...); // 存入數(shù)據(jù)
buffer.flip(); ? // 切換讀模式
?
while(buffer.hasRemaining()) {channel.write(buffer);
}
在 while 中調(diào)用 channel.write 是因為 write 方法并不能保證一次將 buffer 中的內(nèi)容全部寫入 channel
關(guān)閉
channel 必須關(guān)閉,不過調(diào)用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法會間接地調(diào)用 channel 的 close 方法
位置
獲取當(dāng)前位置
long pos = channel.position();
設(shè)置當(dāng)前位置
long newPos = ...;
channel.position(newPos);
設(shè)置當(dāng)前位置時,如果設(shè)置為文件的末尾
-
這時讀取會返回 -1
-
這時寫入,會追加內(nèi)容,但要注意如果 position 超過了文件末尾,再寫入時在新內(nèi)容和原末尾之間會有空洞(00)
大小
使用 size 方法獲取文件的大小
強(qiáng)制寫入
操作系統(tǒng)出于性能的考慮,會將數(shù)據(jù)緩存,不是立刻寫入磁盤??梢哉{(diào)用 force(true) 方法將文件內(nèi)容和元數(shù)據(jù)(文件的權(quán)限等信息)立刻寫入磁盤
3.2 兩個 Channel 傳輸數(shù)據(jù)
String FROM = "helloword/data.txt";
String TO = "helloword/to.txt";
long start = System.nanoTime();
try (FileChannel from = new FileInputStream(FROM).getChannel();FileChannel to = new FileOutputStream(TO).getChannel();) {from.transferTo(0, from.size(), to);
} catch (IOException e) {e.printStackTrace();
}
long end = System.nanoTime();
System.out.println("transferTo 用時:" + (end - start) / 1000_000.0);
輸出
transferTo 用時:8.2011
超過 2g 大小的文件傳輸(注意channel一次性最大的數(shù)據(jù)傳輸量為channel)
public class TestFileChannelTransferTo {public static void main(String[] args) {try (FileChannel from = new FileInputStream("data.txt").getChannel();FileChannel to = new FileOutputStream("to.txt").getChannel();) {// 效率高,底層會利用操作系統(tǒng)的零拷貝進(jìn)行優(yōu)化long size = from.size();// left 變量代表還剩余多少字節(jié)for (long left = size; left > 0; ) {System.out.println("position:" + (size - left) + " left:" + left);left -= from.transferTo((size - left), left, to);}} catch (IOException e) {e.printStackTrace();}}
}
實際傳輸一個超大文件
position:0 left:7769948160
position:2147483647 left:5622464513
position:4294967294 left:3474980866
position:6442450941 left:1327497219
3.3 Path
jdk7 引入了 Path 和 Paths 類
-
Path 用來表示文件路徑
-
Paths 是工具類,用來獲取 Path 實例
Path source = Paths.get("1.txt"); // 相對路徑 使用 user.dir 環(huán)境變量來定位 1.txt
?
Path source = Paths.get("d:\\1.txt"); // 絕對路徑 代表了 d:\1.txt
?
Path source = Paths.get("d:/1.txt"); // 絕對路徑 同樣代表了 d:\1.txt
?
Path projects = Paths.get("d:\\data", "projects"); // 代表了 d:\data\projects
-
.
代表了當(dāng)前路徑 -
..
代表了上一級路徑
例如目錄結(jié)構(gòu)如下
d:|- data|- projects|- a|- b
代碼
Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正常化路徑
會輸出
d:\data\projects\a\..\b
d:\data\projects\b
3.4 Files
檢查文件是否存在
Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));
創(chuàng)建一級目錄
Path path = Paths.get("helloword/d1");
Files.createDirectory(path);
-
如果目錄已存在,會拋異常 FileAlreadyExistsException
-
不能一次創(chuàng)建多級目錄,否則會拋異常 NoSuchFileException
創(chuàng)建多級目錄用
Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);
拷貝文件
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");
?
Files.copy(source, target);
-
如果文件已存在,會拋異常 FileAlreadyExistsException
如果希望用 source 覆蓋掉 target,需要用 StandardCopyOption 來控制
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
移動文件
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");
?
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
-
StandardCopyOption.ATOMIC_MOVE 保證文件移動的原子性
刪除文件
Path target = Paths.get("helloword/target.txt");
?
Files.delete(target);
-
如果文件不存在,會拋異常 NoSuchFileException
刪除目錄
Path target = Paths.get("helloword/d1");
?
Files.delete(target);
-
如果目錄還有內(nèi)容,會拋異常 DirectoryNotEmptyException
遍歷目錄文件
public static void main(String[] args) throws IOException {Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");AtomicInteger dirCount = new AtomicInteger();AtomicInteger fileCount = new AtomicInteger();Files.walkFileTree(path, new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println(dir);dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}
?@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {System.out.println(file);fileCount.incrementAndGet();return super.visitFile(file, attrs);}});System.out.println(dirCount); // 133System.out.println(fileCount); // 1479
}
統(tǒng)計 jar 的數(shù)目
Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {if (file.toFile().getName().endsWith(".jar")) {fileCount.incrementAndGet();}return super.visitFile(file, attrs);}
});
System.out.println(fileCount); // 724
刪除多級目錄
Path path = Paths.get("d:\\a");
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {Files.delete(file);return super.visitFile(file, attrs);}
?@Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {Files.delete(dir);return super.postVisitDirectory(dir, exc);}
});
?? 刪除很危險
刪除是危險操作,確保要遞歸刪除的文件夾沒有重要內(nèi)容
拷貝多級目錄
long start = System.currentTimeMillis();
String source = "D:\\Snipaste-1.16.2-x64";
String target = "D:\\Snipaste-1.16.2-x64aaa";
?
Files.walk(Paths.get(source)).forEach(path -> {try {String targetName = path.toString().replace(source, target);// 是目錄if (Files.isDirectory(path)) {Files.createDirectory(Paths.get(targetName));}// 是普通文件else if (Files.isRegularFile(path)) {Files.copy(path, Paths.get(targetName));}} catch (IOException e) {e.printStackTrace();}
});
long end = System.currentTimeMillis();
System.out.println(end - start);
4. 網(wǎng)絡(luò)編程
4.1 非阻塞 vs 阻塞
阻塞
-
阻塞模式下,相關(guān)方法都會導(dǎo)致線程暫停
-
ServerSocketChannel.accept 會在沒有連接建立時讓線程暫停
-
SocketChannel.read 會在沒有數(shù)據(jù)可讀時讓線程暫停
-
阻塞的表現(xiàn)其實就是線程暫停了,暫停期間不會占用 cpu,但線程相當(dāng)于閑置
-
-
單線程下,阻塞方法之間相互影響,幾乎不能正常工作,需要多線程支持
-
但多線程下,有新的問題,體現(xiàn)在以下方面
-
32 位 jvm 一個線程 320k,64 位 jvm 一個線程 1024k,如果連接數(shù)過多,必然導(dǎo)致 OOM,并且線程太多,反而會因為頻繁上下文切換導(dǎo)致性能降低
-
可以采用線程池技術(shù)來減少線程數(shù)和線程上下文切換,但治標(biāo)不治本,如果有很多連接建立,但長時間 inactive,會阻塞線程池中所有線程,因此不適合長連接,只適合短連接
-
服務(wù)器端
// 使用 nio 來理解阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();
?
// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));
?
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信log.debug("connecting...");SocketChannel sc = ssc.accept(); // 阻塞方法,線程停止運行l(wèi)og.debug("connected... {}", sc);channels.add(sc);for (SocketChannel channel : channels) {// 5. 接收客戶端發(fā)送的數(shù)據(jù)log.debug("before read... {}", channel);channel.read(buffer); // 阻塞方法,線程停止運行buffer.flip();debugRead(buffer);buffer.clear();log.debug("after read...{}", channel);}
}
客戶端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");
非阻塞
-
非阻塞模式下,相關(guān)方法都會不會讓線程暫停
-
在 ServerSocketChannel.accept 在沒有連接建立時,會返回 null,繼續(xù)運行
-
SocketChannel.read 在沒有數(shù)據(jù)可讀時,會返回 0,但線程不必阻塞,可以去執(zhí)行其它 SocketChannel 的 read 或是去執(zhí)行 ServerSocketChannel.accept
-
寫數(shù)據(jù)時,線程只是等待數(shù)據(jù)寫入 Channel 即可,無需等 Channel 通過網(wǎng)絡(luò)把數(shù)據(jù)發(fā)送出去
-
-
但非阻塞模式下,即使沒有連接建立,和可讀數(shù)據(jù),線程仍然在不斷運行,白白浪費了 cpu
-
數(shù)據(jù)復(fù)制過程中,線程實際還是阻塞的(AIO 改進(jìn)的地方)
服務(wù)器端,客戶端代碼不變
// 使用 nio 來理解非阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信SocketChannel sc = ssc.accept(); // 非阻塞,線程還會繼續(xù)運行,如果沒有連接建立,但sc是nullif (sc != null) {log.debug("connected... {}", sc);sc.configureBlocking(false); // 非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {// 5. 接收客戶端發(fā)送的數(shù)據(jù)int read = channel.read(buffer);// 非阻塞,線程仍然會繼續(xù)運行,如果沒有讀到數(shù)據(jù),read 返回 0if (read > 0) {buffer.flip();debugRead(buffer);buffer.clear();log.debug("after read...{}", channel);}}
}
多路復(fù)用
單線程可以配合 Selector 完成對多個 Channel 可讀寫事件的監(jiān)控,這稱之為多路復(fù)用
-
多路復(fù)用僅針對網(wǎng)絡(luò) IO、普通文件 IO 沒法利用多路復(fù)用
-
如果不用 Selector 的非阻塞模式,線程大部分時間都在做無用功,而 Selector 能夠保證
-
有可連接事件時才去連接
-
有可讀事件才去讀取
-
有可寫事件才去寫入
-
限于網(wǎng)絡(luò)傳輸能力,Channel 未必時時可寫,一旦 Channel 可寫,會觸發(fā) Selector 的可寫事件
-
-
4.2 Selector
?
好處
-
一個線程配合 selector 就可以監(jiān)控多個 channel 的事件,事件發(fā)生線程才去處理。避免非阻塞模式下所做無用功
-
讓這個線程能夠被充分利用
-
節(jié)約了線程的數(shù)量
-
減少了線程上下文切換
創(chuàng)建
Selector selector = Selector.open();
綁定 Channel 事件
也稱之為注冊事件,綁定的事件 selector 才會關(guān)心
channel.configureBlocking(false);//設(shè)置為非阻塞模式
SelectionKey key = channel.register(selector, 綁定事件);
-
channel 必須工作在非阻塞模式
-
FileChannel 沒有非阻塞模式,因此不能配合 selector 一起使用
-
綁定的事件類型可以有
-
connect - 客戶端連接成功時觸發(fā)
-
accept - 服務(wù)器端成功接受連接時觸發(fā)
-
read - 數(shù)據(jù)可讀入時觸發(fā),有因為接收能力弱,數(shù)據(jù)暫不能讀入的情況
-
write - 數(shù)據(jù)可寫出時觸發(fā),有因為發(fā)送能力弱,數(shù)據(jù)暫不能寫出的情況
-
監(jiān)聽 Channel 事件
可以通過下面三種方法來監(jiān)聽是否有事件發(fā)生,方法的返回值代表有多少 channel 發(fā)生了事件
方法1,阻塞直到綁定事件發(fā)生
int count = selector.select();
方法2,阻塞直到綁定事件發(fā)生,或是超時(時間單位為 ms)
int count = selector.select(long timeout);
方法3,不會阻塞,也就是不管有沒有事件,立刻返回,自己根據(jù)返回值檢查是否有事件
int count = selector.selectNow();
💡 select 何時不阻塞
事件發(fā)生時
客戶端發(fā)起連接請求,會觸發(fā) accept 事件
客戶端發(fā)送數(shù)據(jù)過來,客戶端正常、異常關(guān)閉時,都會觸發(fā) read 事件,另外如果發(fā)送的數(shù)據(jù)大于 buffer 緩沖區(qū),會觸發(fā)多次讀取事件
channel 可寫,會觸發(fā) write 事件
在 linux 下 nio bug 發(fā)生時
調(diào)用 selector.wakeup()
調(diào)用 selector.close()
selector 所在線程 interrupt
4.3 處理 accept 事件
客戶端代碼為
public class Client {public static void main(String[] args) {try (Socket socket = new Socket("localhost", 8080)) {System.out.println(socket);socket.getOutputStream().write("world".getBytes());System.in.read();} catch (IOException e) {e.printStackTrace();}}
}
服務(wù)器端代碼為
@Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));System.out.println(channel);Selector selector = Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);
?while (true) {int count = selector.select();
// ? ? ? ? ? ? ? int count = selector.selectNow();log.debug("select count: {}", count);
// ? ? ? ? ? ? ? if(count <= 0) {
// ? ? ? ? ? ? ? ? ? continue;
// ? ? ? ? ? ? ? }
?// 獲取所有事件Set<SelectionKey> keys = selector.selectedKeys();
?// 遍歷所有事件,逐一處理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 判斷事件類型if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必須處理SocketChannel sc = c.accept();log.debug("{}", sc);}// 處理完畢,必須將事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}
💡 事件發(fā)生后能否不處理
事件發(fā)生后,要么處理,要么取消(cancel),不能什么都不做,否則下次該事件仍會觸發(fā),這是因為 nio 底層使用的是水平觸發(fā)
4.4 處理 read 事件
@Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));System.out.println(channel);Selector selector = Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);
?while (true) {int count = selector.select();
// ? ? ? ? ? ? ? int count = selector.selectNow();log.debug("select count: {}", count);
// ? ? ? ? ? ? ? if(count <= 0) {
// ? ? ? ? ? ? ? ? ? continue;
// ? ? ? ? ? ? ? }
?// 獲取所有事件Set<SelectionKey> keys = selector.selectedKeys();
?// 遍歷所有事件,逐一處理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 判斷事件類型if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必須處理SocketChannel sc = c.accept();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_READ);log.debug("連接已建立: {}", sc);} else if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(128);int read = sc.read(buffer);if(read == -1) {key.cancel();sc.close();} else {buffer.flip();debug(buffer);}}// 處理完畢,必須將事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}
開啟兩個客戶端,修改一下發(fā)送文字,輸出
sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080] 21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1 21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60367] 21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+ 21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1 21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60378] 21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 77 6f 72 6c 64 |world | +--------+-------------------------------------------------+----------------+
💡 為何要 iter.remove()
因為 select 在事件發(fā)生后,就會將相關(guān)的 key 放入 selectedKeys 集合,但不會在處理完后從 selectedKeys 集合中移除,需要我們自己編碼刪除。例如
第一次觸發(fā)了 ssckey 上的 accept 事件,沒有移除 ssckey
第二次觸發(fā)了 sckey 上的 read 事件,但這時 selectedKeys 中還有上次的 ssckey ,在處理時因為沒有真正的 serverSocket 連上了,就會導(dǎo)致空指針異常
💡 cancel 的作用
cancel 會取消注冊在 selector 上的 channel,并從 keys 集合中刪除 key 后續(xù)不會再監(jiān)聽事件
?? 不處理邊界的問題
以前有同學(xué)寫過這樣的代碼,思考注釋中兩個問題,以 bio 為例,其實 nio 道理是一樣的
public class Server {public static void main(String[] args) throws IOException {ServerSocket ss=new ServerSocket(9000);while (true) {Socket s = ss.accept();InputStream in = s.getInputStream();// 這里這么寫,有沒有問題byte[] arr = new byte[4];while(true) {int read = in.read(arr);// 這里這么寫,有沒有問題if(read == -1) {break;}System.out.println(new String(arr, 0, read));}}}
}
客戶端
public class Client {public static void main(String[] args) throws IOException {Socket max = new Socket("localhost", 9000);OutputStream out = max.getOutputStream();out.write("hello".getBytes());out.write("world".getBytes());out.write("你好".getBytes());max.close();}
}
輸出
hell owor ld� �好
為什么?
處理消息的邊界
-
一種思路是固定消息長度,數(shù)據(jù)包大小一樣,服務(wù)器按預(yù)定長度讀取,缺點是浪費帶寬
-
另一種思路是按分隔符拆分,缺點是效率低
-
TLV 格式,即 Type 類型、Length 長度、Value 數(shù)據(jù),類型和長度已知的情況下,就可以方便獲取消息大小,分配合適的 buffer,缺點是 buffer 需要提前分配,如果內(nèi)容過大,則影響 server 吞吐量
-
Http 1.1 是 TLV 格式
-
Http 2.0 是 LTV 格式
-
?
?
服務(wù)器端
private static void split(ByteBuffer source) {source.flip();for (int i = 0; i < source.limit(); i++) {// 找到一條完整消息if (source.get(i) == '\n') {int length = i + 1 - source.position();// 把這條完整消息存入新的 ByteBufferByteBuffer target = ByteBuffer.allocate(length);// 從 source 讀,向 target 寫for (int j = 0; j < length; j++) {target.put(source.get());}debugAll(target);}}source.compact(); // 0123456789abcdef position 16 limit 16
}
?
public static void main(String[] args) throws IOException {// 1. 創(chuàng)建 selector, 管理多個 channelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);// 2. 建立 selector 和 channel 的聯(lián)系(注冊)// SelectionKey 就是將來事件發(fā)生后,通過它可以知道事件和哪個channel的事件SelectionKey sscKey = ssc.register(selector, 0, null);// key 只關(guān)注 accept 事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug("sscKey:{}", sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {// 3. select 方法, 沒有事件發(fā)生,線程阻塞,有事件,線程才會恢復(fù)運行// select 在事件未處理時,它不會阻塞, 事件發(fā)生后要么處理,要么取消,不能置之不理selector.select();// 4. 處理事件, selectedKeys 內(nèi)部包含了所有發(fā)生的事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, readwhile (iter.hasNext()) {SelectionKey key = iter.next();// 處理key 時,要從 selectedKeys 集合中刪除,否則下次處理就會有問題iter.remove();log.debug("key: {}", key);// 5. 區(qū)分事件類型if (key.isAcceptable()) { // 如果是 acceptServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16); // attachment// 將一個 byteBuffer 作為附件關(guān)聯(lián)到 selectionKey 上SelectionKey scKey = sc.register(selector, 0, buffer);scKey.interestOps(SelectionKey.OP_READ);log.debug("{}", sc);log.debug("scKey:{}", scKey);} else if (key.isReadable()) { // 如果是 readtry {SocketChannel channel = (SocketChannel) key.channel(); // 拿到觸發(fā)事件的channel// 獲取 selectionKey 上關(guān)聯(lián)的附件ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer); // 如果是正常斷開,read 的方法的返回值是 -1if(read == -1) {key.cancel();} else {split(buffer);// 需要擴(kuò)容if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);buffer.flip();newBuffer.put(buffer); // 0123456789abcdef3333\nkey.attach(newBuffer);}}
?} catch (IOException e) {e.printStackTrace();key.cancel(); ?// 因為客戶端斷開了,因此需要將 key 取消(從 selector 的 keys 集合中真正刪除 key)}}}}
}
客戶端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();
ByteBuffer 大小分配
-
每個 channel 都需要記錄可能被切分的消息,因為 ByteBuffer 不能被多個 channel 共同使用,因此需要為每個 channel 維護(hù)一個獨立的 ByteBuffer
-
ByteBuffer 不能太大,比如一個 ByteBuffer 1Mb 的話,要支持百萬連接就要 1Tb 內(nèi)存,因此需要設(shè)計大小可變的 ByteBuffer
-
一種思路是首先分配一個較小的 buffer,例如 4k,如果發(fā)現(xiàn)數(shù)據(jù)不夠,再分配 8k 的 buffer,將 4k buffer 內(nèi)容拷貝至 8k buffer,優(yōu)點是消息連續(xù)容易處理,缺點是數(shù)據(jù)拷貝耗費性能,參考實現(xiàn) Java Resizable Array
-
另一種思路是用多個數(shù)組組成 buffer,一個數(shù)組不夠,把多出來的內(nèi)容寫入新的數(shù)組,與前面的區(qū)別是消息存儲不連續(xù)解析復(fù)雜,優(yōu)點是避免了拷貝引起的性能損耗
-
4.5 處理 write 事件
一次無法寫完例子
-
非阻塞模式下,無法保證把 buffer 中所有數(shù)據(jù)都寫入 channel,因此需要追蹤 write 方法的返回值(代表實際寫入字節(jié)數(shù))
-
用 selector 監(jiān)聽所有 channel 的可寫事件,每個 channel 都需要一個 key 來跟蹤 buffer,但這樣又會導(dǎo)致占用內(nèi)存過多,就有兩階段策略
-
當(dāng)消息處理器第一次寫入消息時,才將 channel 注冊到 selector 上
-
selector 檢查 channel 上的可寫事件,如果所有的數(shù)據(jù)寫完了,就取消 channel 的注冊
-
如果不取消,會每次可寫均會觸發(fā) write 事件
-
public class WriteServer {
?public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);ssc.bind(new InetSocketAddress(8080));
?Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);
?while(true) {selector.select();
?Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);// 1. 向客戶端發(fā)送內(nèi)容StringBuilder sb = new StringBuilder();for (int i = 0; i < 3000000; i++) {sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());int write = sc.write(buffer);// 3. write 表示實際寫了多少字節(jié)System.out.println("實際寫入字節(jié):" + write);// 4. 如果有剩余未讀字節(jié),才需要關(guān)注寫事件if (buffer.hasRemaining()) {// read 1 write 4// 在原有關(guān)注事件的基礎(chǔ)上,多關(guān)注 寫事件sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);// 把 buffer 作為附件加入 sckeysckey.attach(buffer);}} else if (key.isWritable()) {ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel sc = (SocketChannel) key.channel();int write = sc.write(buffer);System.out.println("實際寫入字節(jié):" + write);if (!buffer.hasRemaining()) { // 寫完了key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);key.attach(null);}}}}}
}
客戶端
public class WriteClient {public static void main(String[] args) throws IOException {Selector selector = Selector.open();SocketChannel sc = SocketChannel.open();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);sc.connect(new InetSocketAddress("localhost", 8080));int count = 0;while (true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isConnectable()) {System.out.println(sc.finishConnect());} else if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);count += sc.read(buffer);buffer.clear();System.out.println(count);}}}}
}
💡 write 為何要取消
只要向 channel 發(fā)送數(shù)據(jù)時,socket 緩沖可寫,這個事件會頻繁觸發(fā),因此應(yīng)當(dāng)只在 socket 緩沖區(qū)寫不下時再關(guān)注可寫事件,數(shù)據(jù)寫完之后再取消關(guān)注
4.6 更進(jìn)一步
💡 利用多線程優(yōu)化
現(xiàn)在都是多核 cpu,設(shè)計時要充分考慮別讓 cpu 的力量被白白浪費
前面的代碼只有一個選擇器,沒有充分利用多核 cpu,如何改進(jìn)呢?
分兩組選擇器
-
單線程配一個選擇器,專門處理 accept 事件,只關(guān)注連接事件
-
創(chuàng)建 cpu 核心數(shù)的線程,每個線程配一個選擇器,輪流處理 read 事件和write事件
服務(wù)器端代碼:
package com.kjz.nio.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;import static com.kjz.nio.c2.ByteBufferUtil.debugAll;@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));// 1. 創(chuàng)建固定數(shù)量的 worker 并初始化Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-" + i);}AtomicInteger index = new AtomicInteger();while(true) {boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());// 2. 關(guān)聯(lián) selectorlog.debug("before register...{}", sc.getRemoteAddress());// round robin 輪詢workers[index.getAndIncrement() % workers.length].register(sc); // boss 調(diào)用 初始化 selector , 啟動 worker-0log.debug("after register...{}", sc.getRemoteAddress());}}}}//使用另外一個線程創(chuàng)建專門監(jiān)聽讀寫事件的Selectorstatic class Worker implements Runnable{private Thread thread;private Selector selector;private String name;private volatile boolean start = false; // 還未初始化//隊列用于在線程之間傳遞數(shù)據(jù)private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();public Worker(String name) {this.name = name;}// 初始化線程和selectorpublic void register(SocketChannel sc) throws IOException {if(!start) {selector = Selector.open();thread = new Thread(this, name);thread.start();start = true;}//向隊列添加了任務(wù),但是任務(wù)中的代碼并沒有立即執(zhí)行,要保證任務(wù)中的代碼在同一線程下先執(zhí)行//selector.select()再執(zhí)行 sc.register(selector,SelectionKey.OP_READ,null)//以此來保證selector.select()和 sc.register(selector,SelectionKey.OP_READ,null)不會相互阻塞
// queue.add(()->{
// try {
// sc.register(selector,SelectionKey.OP_READ,null);
// } catch (ClosedChannelException e) {
// e.printStackTrace();
// }
// });// 喚醒 select 方法 ,防止下面的run方法中 selector.select()在初始化注冊讀事件時阻塞
// selector.wakeup();
// sc.register(selector, SelectionKey.OP_READ, null); // bossselector.wakeup();sc.register(selector,SelectionKey.OP_READ,null);}@Overridepublic void run() {while(true) {try {//監(jiān)聽事件selector.select(); // worker-0 阻塞,此時register方法就不能執(zhí)行了
// Runnable task = queue.poll();
// if (task != null) {
// //執(zhí)行任務(wù)隊列中的方法
// task.run();
// }Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}
💡 如何拿到 cpu 個數(shù)
Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因為容器不是物理隔離的,會拿到物理 cpu 個數(shù),而不是容器申請時的個數(shù)
這個問題直到 jdk 10 才修復(fù),使用 jvm 參數(shù) UseContainerSupport 配置, 默認(rèn)開啟
4.7 UDP
-
UDP 是無連接的,client 發(fā)送數(shù)據(jù)不會管 server 是否開啟
-
server 這邊的 receive 方法會將接收到的數(shù)據(jù)存入 byte buffer,但如果數(shù)據(jù)報文超過 buffer 大小,多出來的數(shù)據(jù)會被默默拋棄
首先啟動服務(wù)器端
public class UdpServer {public static void main(String[] args) {try (DatagramChannel channel = DatagramChannel.open()) {channel.socket().bind(new InetSocketAddress(9999));System.out.println("waiting...");ByteBuffer buffer = ByteBuffer.allocate(32);channel.receive(buffer);buffer.flip();debug(buffer);} catch (IOException e) {e.printStackTrace();}}
}
輸出
waiting...
運行客戶端
public class UdpClient {public static void main(String[] args) {try (DatagramChannel channel = DatagramChannel.open()) {ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");InetSocketAddress address = new InetSocketAddress("localhost", 9999);channel.send(buffer, address);} catch (Exception e) {e.printStackTrace();}}
}
接下來服務(wù)器端輸出
+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+
5. NIO vs BIO
5.1 stream vs channel
-
stream 不會自動緩沖數(shù)據(jù),channel 會利用系統(tǒng)提供的發(fā)送緩沖區(qū)、接收緩沖區(qū)(更為底層)
-
stream 僅支持阻塞 API,channel 同時支持阻塞、非阻塞 API,網(wǎng)絡(luò) channel 可配合 selector 實現(xiàn)多路復(fù)用
-
二者均為全雙工,即讀寫可以同時進(jìn)行
5.2 IO 模型
同步阻塞、同步非阻塞、同步多路復(fù)用、異步阻塞(沒有此情況)、異步非阻塞
-
同步:線程自己去獲取結(jié)果(一個線程)
-
異步:線程自己不去獲取結(jié)果,而是由其它線程送結(jié)果(至少兩個線程)
當(dāng)調(diào)用一次 channel.read 或 stream.read 后,會切換至操作系統(tǒng)內(nèi)核態(tài)來完成真正數(shù)據(jù)讀取,而讀取又分為兩個階段,分別為:
-
等待數(shù)據(jù)階段
-
復(fù)制數(shù)據(jù)階段
-
阻塞 IO
用戶線程在讀取數(shù)據(jù)時被阻塞,在阻塞過程中,用戶線程無法干其他的事
-
非阻塞 IO
向內(nèi)核空間讀取數(shù)據(jù)時如果沒有數(shù)據(jù)就會立即返回,用戶線程中的讀方法被while(true)包裹,用戶線程會不斷向內(nèi)核空間讀取,一旦有數(shù)據(jù)就會進(jìn)入到復(fù)制數(shù)據(jù),在等待數(shù)據(jù)的過程中是非阻塞的,但是復(fù)制數(shù)據(jù)的過程是非阻塞的。
-
多路復(fù)用
-
信號驅(qū)動
-
異步 IO
同步:線程自己去獲取結(jié)果(一個線程親力親為)
異步:線程自己不去獲取結(jié)果,而是由其他線程送結(jié)果(至少兩個線程)
異步情況下線程一定是非阻塞
-
阻塞 IO vs 多路復(fù)用
在阻塞IO下如果read阻塞在channel1的讀取數(shù)據(jù)中,那么此時另外一個channel2如果過來要建立連接accept,那么這個accept方法就要阻塞到read方法執(zhí)行完畢才能執(zhí)行。也就是說阻塞IO在同一時間只能處理一個channel的事件。
在多路復(fù)用的情況下,則是select方法是阻塞的,它會返回多個channel的事件,在SelectionKey的集合中處理多個channel事件,可以同時應(yīng)對多個channel事件
🔖 參考
UNIX 網(wǎng)絡(luò)編程 - 卷 I
5.3 零拷貝(從文件傳輸?shù)慕嵌葋砜?#xff09;
傳統(tǒng) IO 問題
傳統(tǒng)的 IO 將一個文件通過 socket 寫出
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
?
byte[] buf = new byte[(int)f.length()];
file.read(buf);
?
Socket socket = ...;
socket.getOutputStream().write(buf);
內(nèi)部工作流程是這樣的:
-
java 本身并不具備 IO 讀寫能力,因此 read 方法調(diào)用后,要從 java 程序的用戶態(tài)切換至內(nèi)核態(tài),去調(diào)用操作系統(tǒng)(Kernel)的讀能力,將數(shù)據(jù)讀入內(nèi)核緩沖區(qū)。這期間用戶線程阻塞,操作系統(tǒng)使用 DMA(Direct Memory Access)來實現(xiàn)文件讀,其間也不會使用 cpu
DMA 也可以理解為硬件單元,用來解放 cpu 完成文件 IO
-
從內(nèi)核態(tài)切換回用戶態(tài),將數(shù)據(jù)從內(nèi)核緩沖區(qū)讀入用戶緩沖區(qū)(即 byte[] buf),這期間 cpu 會參與拷貝,無法利用 DMA
-
調(diào)用 write 方法,這時將數(shù)據(jù)從用戶緩沖區(qū)(byte[] buf)寫入 socket 緩沖區(qū),cpu 會參與拷貝
-
接下來要向網(wǎng)卡寫數(shù)據(jù),這項能力 java 又不具備,因此又得從用戶態(tài)切換至內(nèi)核態(tài),調(diào)用操作系統(tǒng)的寫能力,使用 DMA 將 socket 緩沖區(qū)的數(shù)據(jù)寫入網(wǎng)卡,不會使用 cpu
可以看到中間環(huán)節(jié)較多,java 的 IO 實際不是物理設(shè)備級別的讀寫,而是緩存的復(fù)制,底層的真正讀寫是操作系統(tǒng)來完成的
-
用戶態(tài)與內(nèi)核態(tài)的切換發(fā)生了 3 次,這個操作比較重量級
-
數(shù)據(jù)拷貝了共 4 次
NIO 優(yōu)化
通過 DirectByteBuf
-
ByteBuffer.allocate(10) HeapByteBuffer 使用的還是 java 內(nèi)存
-
ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系統(tǒng)內(nèi)存
大部分步驟與優(yōu)化前相同,不再贅述。唯有一點:java 可以使用 DirectByteBuf 將堆外內(nèi)存映射到 jvm 內(nèi)存中來直接訪問使用
-
這塊內(nèi)存不受 jvm 垃圾回收的影響,因此內(nèi)存地址固定,有助于 IO 讀寫
-
java 中的 DirectByteBuf 對象僅維護(hù)了此內(nèi)存的虛引用,內(nèi)存回收分成兩步
-
DirectByteBuf 對象被垃圾回收,將虛引用加入引用隊列
-
通過專門線程訪問引用隊列,根據(jù)虛引用釋放堆外內(nèi)存
-
-
減少了一次數(shù)據(jù)拷貝,用戶態(tài)與內(nèi)核態(tài)的切換次數(shù)沒有減少
進(jìn)一步優(yōu)化(底層采用了 linux 2.1 后提供的 sendFile 方法),java 中對應(yīng)著兩個 channel 調(diào)用 transferTo/transferFrom 方法拷貝數(shù)據(jù)
-
java 調(diào)用 transferTo 方法后,要從 java 程序的用戶態(tài)切換至內(nèi)核態(tài),使用 DMA將數(shù)據(jù)讀入內(nèi)核緩沖區(qū),不會使用 cpu
-
數(shù)據(jù)從內(nèi)核緩沖區(qū)傳輸?shù)?socket 緩沖區(qū),cpu 會參與拷貝
-
最后使用 DMA 將 socket 緩沖區(qū)的數(shù)據(jù)寫入網(wǎng)卡,不會使用 cpu
可以看到
-
只發(fā)生了一次用戶態(tài)與內(nèi)核態(tài)的切換
-
數(shù)據(jù)拷貝了 3 次
進(jìn)一步優(yōu)化(linux 2.4)
-
java 調(diào)用 transferTo 方法后,要從 java 程序的用戶態(tài)切換至內(nèi)核態(tài),使用 DMA將數(shù)據(jù)讀入內(nèi)核緩沖區(qū),不會使用 cpu
-
只會將一些 offset(偏移量) 和 length(長度) 信息拷入 socket 緩沖區(qū),幾乎無消耗
-
使用 DMA 將 內(nèi)核緩沖區(qū)的數(shù)據(jù)寫入網(wǎng)卡,不會使用 cpu
整個過程僅只發(fā)生了一次用戶態(tài)與內(nèi)核態(tài)的切換,數(shù)據(jù)拷貝了 2 次。所謂的【零拷貝】,并不是真正無拷貝,而是在不會拷貝重復(fù)數(shù)據(jù)到 jvm 內(nèi)存中,零拷貝的優(yōu)點有
-
更少的用戶態(tài)與內(nèi)核態(tài)的切換
-
不利用 cpu 計算,減少 cpu 緩存?zhèn)喂蚕?/p>
-
零拷貝適合小文件傳輸
5.3 AIO
AIO 用來解決數(shù)據(jù)復(fù)制階段的阻塞問題
-
同步意味著,在進(jìn)行讀寫操作時,線程需要等待結(jié)果,還是相當(dāng)于閑置
-
異步意味著,在進(jìn)行讀寫操作時,線程不必等待結(jié)果,而是將來由操作系統(tǒng)來通過回調(diào)方式由另外的線程來獲得結(jié)果
異步模型需要底層操作系統(tǒng)(Kernel)提供支持
Windows 系統(tǒng)通過 IOCP 實現(xiàn)了真正的異步 IO
Linux 系統(tǒng)異步 IO 在 2.6 版本引入,但其底層實現(xiàn)還是用多路復(fù)用模擬了異步 IO,性能沒有優(yōu)勢
文件 AIO
先來看看 AsynchronousFileChannel
@Slf4j
public class AioDemo1 {public static void main(String[] args) throws IOException {try{AsynchronousFileChannel s = AsynchronousFileChannel.open(Paths.get("1.txt"), StandardOpenOption.READ);ByteBuffer buffer = ByteBuffer.allocate(2);log.debug("begin...");s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer attachment) {log.debug("read completed...{}", result);buffer.flip();debug(buffer);}
?@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {log.debug("read failed...");}});
?} catch (IOException e) {e.printStackTrace();}log.debug("do other things...");System.in.read();}
}
輸出
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin... 13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things... 13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 0d |a. | +--------+-------------------------------------------------+----------------+
可以看到
-
響應(yīng)文件讀取成功的是另一個線程 Thread-5
-
主線程并沒有 IO 操作阻塞
💡 守護(hù)線程
默認(rèn)文件 AIO 使用的線程都是守護(hù)線程,所以最后要執(zhí)行 System.in.read()
以避免守護(hù)線程意外結(jié)束
網(wǎng)絡(luò) AIO
public class AioServer {public static void main(String[] args) throws IOException {AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();ssc.bind(new InetSocketAddress(8080));ssc.accept(null, new AcceptHandler(ssc));System.in.read();}
?private static void closeChannel(AsynchronousSocketChannel sc) {try {System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());sc.close();} catch (IOException e) {e.printStackTrace();}}
?private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {private final AsynchronousSocketChannel sc;
?public ReadHandler(AsynchronousSocketChannel sc) {this.sc = sc;}
?@Overridepublic void completed(Integer result, ByteBuffer attachment) {try {if (result == -1) {closeChannel(sc);return;}System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());attachment.flip();System.out.println(Charset.defaultCharset().decode(attachment));attachment.clear();// 處理完第一個 read 時,需要再次調(diào)用 read 方法來處理下一個 read 事件sc.read(attachment, attachment, this);} catch (IOException e) {e.printStackTrace();}}
?@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {closeChannel(sc);exc.printStackTrace();}}
?private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {private final AsynchronousSocketChannel sc;
?private WriteHandler(AsynchronousSocketChannel sc) {this.sc = sc;}
?@Overridepublic void completed(Integer result, ByteBuffer attachment) {// 如果作為附件的 buffer 還有內(nèi)容,需要再次 write 寫出剩余內(nèi)容if (attachment.hasRemaining()) {sc.write(attachment);}}
?@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();closeChannel(sc);}}
?private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {private final AsynchronousServerSocketChannel ssc;
?public AcceptHandler(AsynchronousServerSocketChannel ssc) {this.ssc = ssc;}
?@Overridepublic void completed(AsynchronousSocketChannel sc, Object attachment) {try {System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}ByteBuffer buffer = ByteBuffer.allocate(16);// 讀事件由 ReadHandler 處理sc.read(buffer, buffer, new ReadHandler(sc));// 寫事件由 WriteHandler 處理sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));// 處理完第一個 accpet 時,需要再次調(diào)用 accept 方法來處理下一個 accept 事件ssc.accept(null, this);}
?@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}}
}
學(xué)習(xí)完NIO的基礎(chǔ)知識,接下來正式進(jìn)入Netty的學(xué)習(xí),請看這篇博客:Netty入門-CSDN博客