中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

一個網(wǎng)站做無限關(guān)鍵詞網(wǎng)站seo優(yōu)化怎么做

一個網(wǎng)站做無限關(guān)鍵詞,網(wǎng)站seo優(yōu)化怎么做,網(wǎng)站開發(fā)類投標(biāo)文件,新手學(xué)做網(wǎng)站手機(jī)在學(xué)習(xí)Netty之前先要學(xué)習(xí)一下NIO相關(guān)的知識,因為Netty是基于NIO搭建的一套網(wǎng)絡(luò)編程框架。 一. NIO 基礎(chǔ) non-blocking io 非阻塞 IO 1. 三大組件 1.1 Channel & Buffer channel 有一點類似于 stream,它就是讀寫數(shù)據(jù)的雙向通道,可以從…

在學(xué)習(xí)Netty之前先要學(xué)習(xí)一下NIO相關(guān)的知識,因為Netty是基于NIO搭建的一套網(wǎng)絡(luò)編程框架。

一. NIO 基礎(chǔ)

non-blocking io 非阻塞 IO

1. 三大組件

1.1 Channel & Buffer

channel 有一點類似于 stream,它就是讀寫數(shù)據(jù)的雙向通道,可以從 channel 將數(shù)據(jù)讀入 buffer,也可以將 buffer 的數(shù)據(jù)寫入 channel,而之前的 stream 要么是輸入,要么是輸出,channel 比 stream 更為底層

?

常見的 Channel 有

  • FileChannel

  • DatagramChannel

  • SocketChannel

  • ServerSocketChannel

buffer 則用來緩沖讀寫數(shù)據(jù),常見的 buffer 有

  • ByteBuffer

    • MappedByteBuffer

    • DirectByteBuffer

    • HeapByteBuffer

  • ShortBuffer

  • IntBuffer

  • LongBuffer

  • FloatBuffer

  • DoubleBuffer

  • CharBuffer

1.2 Selector

selector 單從字面意思不好理解,需要結(jié)合服務(wù)器的設(shè)計演化來理解它的用途

多線程版設(shè)計?

?? 多線程版缺點

  • 內(nèi)存占用高

  • 線程上下文切換成本高

  • 只適合連接數(shù)少的場景

線程池版設(shè)計

?? 線程池版缺點

  • 阻塞模式下,線程僅能處理一個 socket 連接

  • 僅適合短連接場景

selector 版設(shè)計

selector 的作用就是配合一個線程來管理多個 channel,獲取這些 channel 上發(fā)生的事件,這些 channel 工作在非阻塞模式下,不會讓線程吊死在一個 channel 上。適合連接數(shù)特別多,但流量低的場景(low traffic)

?

調(diào)用 selector 的 select() 會阻塞當(dāng)前線程直到 channel 發(fā)生了讀寫就緒事件,這些事件發(fā)生,select 方法就會返回這些事件交給 thread 來處理

2. ByteBuffer

有一普通文本文件 data.txt,內(nèi)容為

1234567890abcd

使用 FileChannel 來讀取文件內(nèi)容

@Slf4j
public class ChannelDemo1 {public static void main(String[] args) {try (RandomAccessFile file = new RandomAccessFile("helloword/data.txt", "rw")) {FileChannel channel = file.getChannel();ByteBuffer buffer = ByteBuffer.allocate(10);do {// 向 buffer 寫入int len = channel.read(buffer);log.debug("讀到字節(jié)數(shù):{}", len);if (len == -1) {break;}// 切換 buffer 讀模式buffer.flip();while(buffer.hasRemaining()) {log.debug("{}", (char)buffer.get());}// 切換 buffer 寫模式buffer.clear();} while (true);} catch (IOException e) {e.printStackTrace();}}
}

輸出

10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):10
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 1
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 2
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 3
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 5
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 6
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 7
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 8
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 9
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 0
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - a
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - b
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - c
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - d
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):-1

2.1 ByteBuffer 正確使用姿勢

  1. 向 buffer 寫入數(shù)據(jù),例如調(diào)用 channel.read(buffer)

  2. 調(diào)用 flip() 切換至讀模式

  3. 從 buffer 讀取數(shù)據(jù),例如調(diào)用 buffer.get()

  4. 調(diào)用 clear() 或 compact() 切換至寫模式

  5. 重復(fù) 1~4 步驟

2.2 ByteBuffer 結(jié)構(gòu)

ByteBuffer 有以下重要屬性

  • capacity

  • position(讀寫指針)

  • limit

一開始

寫模式下,position 是寫入位置,limit 等于容量,下圖表示寫入了 4 個字節(jié)后的狀態(tài)

flip 動作發(fā)生后,position 切換為讀取位置,limit 切換為讀取限制

讀取 4 個字節(jié)后,狀態(tài)

clear 動作發(fā)生后,狀態(tài)

compact 方法,是把未讀完的部分向前壓縮,然后切換至寫模式

💡 調(diào)試工具類

public class ByteBufferUtil {private static final char[] BYTE2CHAR = new char[256];private static final char[] HEXDUMP_TABLE = new char[256 * 4];private static final String[] HEXPADDING = new String[16];private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];private static final String[] BYTE2HEX = new String[256];private static final String[] BYTEPADDING = new String[16];
?static {final char[] DIGITS = "0123456789abcdef".toCharArray();for (int i = 0; i < 256; i++) {HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];}
?int i;
?// Generate the lookup table for hex dump paddingsfor (i = 0; i < HEXPADDING.length; i++) {int padding = HEXPADDING.length - i;StringBuilder buf = new StringBuilder(padding * 3);for (int j = 0; j < padding; j++) {buf.append(" ? ");}HEXPADDING[i] = buf.toString();}
?// Generate the lookup table for the start-offset header in each row (up to 64KiB).for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {StringBuilder buf = new StringBuilder(12);buf.append(NEWLINE);buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));buf.setCharAt(buf.length() - 9, '|');buf.append('|');HEXDUMP_ROWPREFIXES[i] = buf.toString();}
?// Generate the lookup table for byte-to-hex-dump conversionfor (i = 0; i < BYTE2HEX.length; i++) {BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);}
?// Generate the lookup table for byte dump paddingsfor (i = 0; i < BYTEPADDING.length; i++) {int padding = BYTEPADDING.length - i;StringBuilder buf = new StringBuilder(padding);for (int j = 0; j < padding; j++) {buf.append(' ');}BYTEPADDING[i] = buf.toString();}
?// Generate the lookup table for byte-to-char conversionfor (i = 0; i < BYTE2CHAR.length; i++) {if (i <= 0x1f || i >= 0x7f) {BYTE2CHAR[i] = '.';} else {BYTE2CHAR[i] = (char) i;}}}
?/*** 打印所有內(nèi)容* @param buffer*/public static void debugAll(ByteBuffer buffer) {int oldlimit = buffer.limit();buffer.limit(buffer.capacity());StringBuilder origin = new StringBuilder(256);appendPrettyHexDump(origin, buffer, 0, buffer.capacity());System.out.println("+--------+-------------------- all ------------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);System.out.println(origin);buffer.limit(oldlimit);}
?/*** 打印可讀取內(nèi)容* @param buffer*/public static void debugRead(ByteBuffer buffer) {StringBuilder builder = new StringBuilder(256);appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());System.out.println("+--------+-------------------- read -----------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());System.out.println(builder);}
?private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {if (isOutOfBounds(offset, length, buf.capacity())) {throw new IndexOutOfBoundsException("expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length+ ") <= " + "buf.capacity(" + buf.capacity() + ')');}if (length == 0) {return;}dump.append(" ? ? ? ? +-------------------------------------------------+" +NEWLINE + " ? ? ? ? |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +NEWLINE + "+--------+-------------------------------------------------+----------------+");
?final int startIndex = offset;final int fullRows = length >>> 4;final int remainder = length & 0xF;
?// Dump the rows which have 16 bytes.for (int row = 0; row < fullRows; row++) {int rowStartIndex = (row << 4) + startIndex;
?// Per-row prefix.appendHexDumpRowPrefix(dump, row, rowStartIndex);
?// Hex dumpint rowEndIndex = rowStartIndex + 16;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(" |");
?// ASCII dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append('|');}
?// Dump the last row which has less than 16 bytes.if (remainder != 0) {int rowStartIndex = (fullRows << 4) + startIndex;appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
?// Hex dumpint rowEndIndex = rowStartIndex + remainder;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(HEXPADDING[remainder]);dump.append(" |");
?// Ascii dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append(BYTEPADDING[remainder]);dump.append('|');}
?dump.append(NEWLINE +"+--------+-------------------------------------------------+----------------+");}
?private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {if (row < HEXDUMP_ROWPREFIXES.length) {dump.append(HEXDUMP_ROWPREFIXES[row]);} else {dump.append(NEWLINE);dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));dump.setCharAt(dump.length() - 9, '|');dump.append('|');}}
?public static short getUnsignedByte(ByteBuffer buffer, int index) {return (short) (buffer.get(index) & 0xFF);}
}

2.3 ByteBuffer 常見方法

分配空間

可以使用 allocate 方法為 ByteBuffer 分配空間,其它 buffer 類也有該方法

Bytebuffer buf = ByteBuffer.allocate(16);

class java.nio.HeapByteBuffer - java 堆內(nèi)存,讀寫效率較低,受到 GC 的影響
class java.nio.DirectByteBuffer - 直接內(nèi)存,讀寫效率高(少一次拷貝),不會受 GC 影響,分配的效率低

向 buffer 寫入數(shù)據(jù)

有兩種辦法

  • 調(diào)用 channel 的 read 方法

  • 調(diào)用 buffer 自己的 put 方法

int readBytes = channel.read(buf);

buf.put((byte)127);

從 buffer 讀取數(shù)據(jù)

同樣有兩種辦法

  • 調(diào)用 channel 的 write 方法

  • 調(diào)用 buffer 自己的 get 方法

int writeBytes = channel.write(buf);

byte b = buf.get();

get 方法會讓 position 讀指針向后走,如果想重復(fù)讀取數(shù)據(jù)

  • 可以調(diào)用 rewind 方法將 position 重新置為 0

  • 或者調(diào)用 get(int i) 方法獲取索引 i 的內(nèi)容,它不會移動讀指針

mark 和 reset

mark 是在讀取時,做一個標(biāo)記,即使 position 改變,只要調(diào)用 reset 就能回到 mark 的位置

注意

rewind 和 flip 都會清除 mark 位置

字符串與 ByteBuffer 互轉(zhuǎn)
public class TestByteBufferString {public static void main(String[] args) {// 1. 字符串轉(zhuǎn)為 ByteBufferByteBuffer buffer1 = ByteBuffer.allocate(16);buffer1.put("hello".getBytes());debugAll(buffer1);// 2. CharsetByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");debugAll(buffer2);// 3. wrapByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());debugAll(buffer3);// 4. 轉(zhuǎn)為字符串String str1 = StandardCharsets.UTF_8.decode(buffer2).toString();System.out.println(str1);buffer1.flip();String str2 = StandardCharsets.UTF_8.decode(buffer1).toString();System.out.println(str2);}
}

輸出

 

?? Buffer 的線程安全

Buffer 是非線程安全的

2.4 Scattering Reads

分散讀取,有一個文本文件 3parts.txt

onetwothree

使用如下方式讀取,可以將數(shù)據(jù)填充至多個 buffer

public class TestScatteringReads {public static void main(String[] args) {try (FileChannel channel = new RandomAccessFile("words.txt", "r").getChannel()) {ByteBuffer b1 = ByteBuffer.allocate(3);ByteBuffer b2 = ByteBuffer.allocate(3);ByteBuffer b3 = ByteBuffer.allocate(5);channel.read(new ByteBuffer[]{b1, b2, b3});b1.flip();b2.flip();b3.flip();debugAll(b1);debugAll(b2);debugAll(b3);} catch (IOException e) {}}
}

2.5 Gathering Writes

使用如下方式寫入,可以將多個 buffer 的數(shù)據(jù)填充至 channel

public class TestGatheringWrites {public static void main(String[] args) {ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {channel.write(new ByteBuffer[]{b1, b2, b3});} catch (IOException e) {}}
}

2.6 練習(xí)

網(wǎng)絡(luò)上有多條數(shù)據(jù)發(fā)送給服務(wù)端,數(shù)據(jù)之間使用 \n 進(jìn)行分隔但由于某種原因這些數(shù)據(jù)在接收時,被進(jìn)行了重新組合,例如原始數(shù)據(jù)有3條為

  • Hello,world\n

  • I'm zhangsan\n

  • How are you?\n

變成了下面的兩個 byteBuffer (黏包,半包)

  • Hello,world\nI'm zhangsan\nHo

  • w are you?\n

現(xiàn)在要求你編寫程序,將錯亂的數(shù)據(jù)恢復(fù)成原始的按 \n 分隔的數(shù)據(jù)

public static void main(String[] args) {ByteBuffer source = ByteBuffer.allocate(32);// ? ? ? ? ? ? ? ? ? ? 11 ? ? ? ? ?  24source.put("Hello,world\nI'm zhangsan\nHo".getBytes());split(source);
?source.put("w are you?\nhaha!\n".getBytes());split(source);
}
?
private static void split(ByteBuffer source) {source.flip();int oldLimit = source.limit();for (int i = 0; i < oldLimit; i++) {if (source.get(i) == '\n') {System.out.println(i);ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());// 0 ~ limitsource.limit(i + 1);target.put(source); // 從source 讀,向 target 寫debugAll(target);source.limit(oldLimit);}}source.compact();
}

3. 文件編程

3.1 FileChannel

?? FileChannel 工作模式

FileChannel 只能工作在阻塞模式

獲取

不能直接打開 FileChannel,必須通過 FileInputStream、FileOutputStream 或者 RandomAccessFile 來獲取 FileChannel,它們都有 getChannel 方法

  • 通過 FileInputStream 獲取的 channel 只能讀

  • 通過 FileOutputStream 獲取的 channel 只能寫

  • 通過 RandomAccessFile 是否能讀寫根據(jù)構(gòu)造 RandomAccessFile 時的讀寫模式?jīng)Q定

讀取

會從 channel 讀取數(shù)據(jù)填充 ByteBuffer,返回值表示讀到了多少字節(jié),-1 表示到達(dá)了文件的末尾

int readBytes = channel.read(buffer);
寫入

寫入的正確姿勢如下, SocketChannel

ByteBuffer buffer = ...;
buffer.put(...); // 存入數(shù)據(jù)
buffer.flip(); ? // 切換讀模式
?
while(buffer.hasRemaining()) {channel.write(buffer);
}

在 while 中調(diào)用 channel.write 是因為 write 方法并不能保證一次將 buffer 中的內(nèi)容全部寫入 channel

關(guān)閉

channel 必須關(guān)閉,不過調(diào)用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法會間接地調(diào)用 channel 的 close 方法

位置

獲取當(dāng)前位置

long pos = channel.position();

設(shè)置當(dāng)前位置

long newPos = ...;
channel.position(newPos);

設(shè)置當(dāng)前位置時,如果設(shè)置為文件的末尾

  • 這時讀取會返回 -1

  • 這時寫入,會追加內(nèi)容,但要注意如果 position 超過了文件末尾,再寫入時在新內(nèi)容和原末尾之間會有空洞(00)

大小

使用 size 方法獲取文件的大小

強(qiáng)制寫入

操作系統(tǒng)出于性能的考慮,會將數(shù)據(jù)緩存,不是立刻寫入磁盤??梢哉{(diào)用 force(true) 方法將文件內(nèi)容和元數(shù)據(jù)(文件的權(quán)限等信息)立刻寫入磁盤

3.2 兩個 Channel 傳輸數(shù)據(jù)

String FROM = "helloword/data.txt";
String TO = "helloword/to.txt";
long start = System.nanoTime();
try (FileChannel from = new FileInputStream(FROM).getChannel();FileChannel to = new FileOutputStream(TO).getChannel();) {from.transferTo(0, from.size(), to);
} catch (IOException e) {e.printStackTrace();
}
long end = System.nanoTime();
System.out.println("transferTo 用時:" + (end - start) / 1000_000.0);

輸出

transferTo 用時:8.2011

超過 2g 大小的文件傳輸(注意channel一次性最大的數(shù)據(jù)傳輸量為channel)

public class TestFileChannelTransferTo {public static void main(String[] args) {try (FileChannel from = new FileInputStream("data.txt").getChannel();FileChannel to = new FileOutputStream("to.txt").getChannel();) {// 效率高,底層會利用操作系統(tǒng)的零拷貝進(jìn)行優(yōu)化long size = from.size();// left 變量代表還剩余多少字節(jié)for (long left = size; left > 0; ) {System.out.println("position:" + (size - left) + " left:" + left);left -= from.transferTo((size - left), left, to);}} catch (IOException e) {e.printStackTrace();}}
}

實際傳輸一個超大文件

position:0 left:7769948160
position:2147483647 left:5622464513
position:4294967294 left:3474980866
position:6442450941 left:1327497219

3.3 Path

jdk7 引入了 Path 和 Paths 類

  • Path 用來表示文件路徑

  • Paths 是工具類,用來獲取 Path 實例

Path source = Paths.get("1.txt"); // 相對路徑 使用 user.dir 環(huán)境變量來定位 1.txt
?
Path source = Paths.get("d:\\1.txt"); // 絕對路徑 代表了  d:\1.txt
?
Path source = Paths.get("d:/1.txt"); // 絕對路徑 同樣代表了  d:\1.txt
?
Path projects = Paths.get("d:\\data", "projects"); // 代表了  d:\data\projects
  • . 代表了當(dāng)前路徑

  • .. 代表了上一級路徑

例如目錄結(jié)構(gòu)如下

d:|- data|- projects|- a|- b

代碼

Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正常化路徑

會輸出

d:\data\projects\a\..\b
d:\data\projects\b

3.4 Files

檢查文件是否存在

Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));

創(chuàng)建一級目錄

Path path = Paths.get("helloword/d1");
Files.createDirectory(path);
  • 如果目錄已存在,會拋異常 FileAlreadyExistsException

  • 不能一次創(chuàng)建多級目錄,否則會拋異常 NoSuchFileException

創(chuàng)建多級目錄用

Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);

拷貝文件

Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");
?
Files.copy(source, target);
  • 如果文件已存在,會拋異常 FileAlreadyExistsException

如果希望用 source 覆蓋掉 target,需要用 StandardCopyOption 來控制

Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);

移動文件

Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");
?
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
  • StandardCopyOption.ATOMIC_MOVE 保證文件移動的原子性

刪除文件

Path target = Paths.get("helloword/target.txt");
?
Files.delete(target);
  • 如果文件不存在,會拋異常 NoSuchFileException

刪除目錄

Path target = Paths.get("helloword/d1");
?
Files.delete(target);
  • 如果目錄還有內(nèi)容,會拋異常 DirectoryNotEmptyException

遍歷目錄文件

public static void main(String[] args) throws IOException {Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");AtomicInteger dirCount = new AtomicInteger();AtomicInteger fileCount = new AtomicInteger();Files.walkFileTree(path, new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println(dir);dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}
?@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {System.out.println(file);fileCount.incrementAndGet();return super.visitFile(file, attrs);}});System.out.println(dirCount); // 133System.out.println(fileCount); // 1479
}

統(tǒng)計 jar 的數(shù)目

Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {if (file.toFile().getName().endsWith(".jar")) {fileCount.incrementAndGet();}return super.visitFile(file, attrs);}
});
System.out.println(fileCount); // 724

刪除多級目錄

Path path = Paths.get("d:\\a");
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {Files.delete(file);return super.visitFile(file, attrs);}
?@Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {Files.delete(dir);return super.postVisitDirectory(dir, exc);}
});

?? 刪除很危險

刪除是危險操作,確保要遞歸刪除的文件夾沒有重要內(nèi)容

拷貝多級目錄

long start = System.currentTimeMillis();
String source = "D:\\Snipaste-1.16.2-x64";
String target = "D:\\Snipaste-1.16.2-x64aaa";
?
Files.walk(Paths.get(source)).forEach(path -> {try {String targetName = path.toString().replace(source, target);// 是目錄if (Files.isDirectory(path)) {Files.createDirectory(Paths.get(targetName));}// 是普通文件else if (Files.isRegularFile(path)) {Files.copy(path, Paths.get(targetName));}} catch (IOException e) {e.printStackTrace();}
});
long end = System.currentTimeMillis();
System.out.println(end - start);

4. 網(wǎng)絡(luò)編程

4.1 非阻塞 vs 阻塞

阻塞

  • 阻塞模式下,相關(guān)方法都會導(dǎo)致線程暫停

    • ServerSocketChannel.accept 會在沒有連接建立時讓線程暫停

    • SocketChannel.read 會在沒有數(shù)據(jù)可讀時讓線程暫停

    • 阻塞的表現(xiàn)其實就是線程暫停了,暫停期間不會占用 cpu,但線程相當(dāng)于閑置

  • 單線程下,阻塞方法之間相互影響,幾乎不能正常工作,需要多線程支持

  • 但多線程下,有新的問題,體現(xiàn)在以下方面

    • 32 位 jvm 一個線程 320k,64 位 jvm 一個線程 1024k,如果連接數(shù)過多,必然導(dǎo)致 OOM,并且線程太多,反而會因為頻繁上下文切換導(dǎo)致性能降低

    • 可以采用線程池技術(shù)來減少線程數(shù)和線程上下文切換,但治標(biāo)不治本,如果有很多連接建立,但長時間 inactive,會阻塞線程池中所有線程,因此不適合長連接,只適合短連接

服務(wù)器端

// 使用 nio 來理解阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();
?
// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));
?
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信log.debug("connecting...");SocketChannel sc = ssc.accept(); // 阻塞方法,線程停止運行l(wèi)og.debug("connected... {}", sc);channels.add(sc);for (SocketChannel channel : channels) {// 5. 接收客戶端發(fā)送的數(shù)據(jù)log.debug("before read... {}", channel);channel.read(buffer); // 阻塞方法,線程停止運行buffer.flip();debugRead(buffer);buffer.clear();log.debug("after read...{}", channel);}
}

客戶端

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");

非阻塞

  • 非阻塞模式下,相關(guān)方法都會不會讓線程暫停

    • 在 ServerSocketChannel.accept 在沒有連接建立時,會返回 null,繼續(xù)運行

    • SocketChannel.read 在沒有數(shù)據(jù)可讀時,會返回 0,但線程不必阻塞,可以去執(zhí)行其它 SocketChannel 的 read 或是去執(zhí)行 ServerSocketChannel.accept

    • 寫數(shù)據(jù)時,線程只是等待數(shù)據(jù)寫入 Channel 即可,無需等 Channel 通過網(wǎng)絡(luò)把數(shù)據(jù)發(fā)送出去

  • 但非阻塞模式下,即使沒有連接建立,和可讀數(shù)據(jù),線程仍然在不斷運行,白白浪費了 cpu

  • 數(shù)據(jù)復(fù)制過程中,線程實際還是阻塞的(AIO 改進(jìn)的地方)

服務(wù)器端,客戶端代碼不變

// 使用 nio 來理解非阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信SocketChannel sc = ssc.accept(); // 非阻塞,線程還會繼續(xù)運行,如果沒有連接建立,但sc是nullif (sc != null) {log.debug("connected... {}", sc);sc.configureBlocking(false); // 非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {// 5. 接收客戶端發(fā)送的數(shù)據(jù)int read = channel.read(buffer);// 非阻塞,線程仍然會繼續(xù)運行,如果沒有讀到數(shù)據(jù),read 返回 0if (read > 0) {buffer.flip();debugRead(buffer);buffer.clear();log.debug("after read...{}", channel);}}
}

多路復(fù)用

單線程可以配合 Selector 完成對多個 Channel 可讀寫事件的監(jiān)控,這稱之為多路復(fù)用

  • 多路復(fù)用僅針對網(wǎng)絡(luò) IO、普通文件 IO 沒法利用多路復(fù)用

  • 如果不用 Selector 的非阻塞模式,線程大部分時間都在做無用功,而 Selector 能夠保證

    • 有可連接事件時才去連接

    • 有可讀事件才去讀取

    • 有可寫事件才去寫入

      • 限于網(wǎng)絡(luò)傳輸能力,Channel 未必時時可寫,一旦 Channel 可寫,會觸發(fā) Selector 的可寫事件

4.2 Selector

?

好處

  • 一個線程配合 selector 就可以監(jiān)控多個 channel 的事件,事件發(fā)生線程才去處理。避免非阻塞模式下所做無用功

  • 讓這個線程能夠被充分利用

  • 節(jié)約了線程的數(shù)量

  • 減少了線程上下文切換

創(chuàng)建
Selector selector = Selector.open();

綁定 Channel 事件

也稱之為注冊事件,綁定的事件 selector 才會關(guān)心

channel.configureBlocking(false);//設(shè)置為非阻塞模式
SelectionKey key = channel.register(selector, 綁定事件);
  • channel 必須工作在非阻塞模式

  • FileChannel 沒有非阻塞模式,因此不能配合 selector 一起使用

  • 綁定的事件類型可以有

    • connect - 客戶端連接成功時觸發(fā)

    • accept - 服務(wù)器端成功接受連接時觸發(fā)

    • read - 數(shù)據(jù)可讀入時觸發(fā),有因為接收能力弱,數(shù)據(jù)暫不能讀入的情況

    • write - 數(shù)據(jù)可寫出時觸發(fā),有因為發(fā)送能力弱,數(shù)據(jù)暫不能寫出的情況

監(jiān)聽 Channel 事件

可以通過下面三種方法來監(jiān)聽是否有事件發(fā)生,方法的返回值代表有多少 channel 發(fā)生了事件

方法1,阻塞直到綁定事件發(fā)生

int count = selector.select();

方法2,阻塞直到綁定事件發(fā)生,或是超時(時間單位為 ms)

int count = selector.select(long timeout);

方法3,不會阻塞,也就是不管有沒有事件,立刻返回,自己根據(jù)返回值檢查是否有事件

int count = selector.selectNow();

💡 select 何時不阻塞
  • 事件發(fā)生時

    • 客戶端發(fā)起連接請求,會觸發(fā) accept 事件

    • 客戶端發(fā)送數(shù)據(jù)過來,客戶端正常、異常關(guān)閉時,都會觸發(fā) read 事件,另外如果發(fā)送的數(shù)據(jù)大于 buffer 緩沖區(qū),會觸發(fā)多次讀取事件

    • channel 可寫,會觸發(fā) write 事件

    • 在 linux 下 nio bug 發(fā)生時

  • 調(diào)用 selector.wakeup()

  • 調(diào)用 selector.close()

  • selector 所在線程 interrupt

4.3 處理 accept 事件

客戶端代碼為

public class Client {public static void main(String[] args) {try (Socket socket = new Socket("localhost", 8080)) {System.out.println(socket);socket.getOutputStream().write("world".getBytes());System.in.read();} catch (IOException e) {e.printStackTrace();}}
}

服務(wù)器端代碼為

@Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));System.out.println(channel);Selector selector = Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);
?while (true) {int count = selector.select();
// ? ? ? ? ? ? ?  int count = selector.selectNow();log.debug("select count: {}", count);
// ? ? ? ? ? ? ?  if(count <= 0) {
// ? ? ? ? ? ? ? ? ?  continue;
// ? ? ? ? ? ? ?  }
?// 獲取所有事件Set<SelectionKey> keys = selector.selectedKeys();
?// 遍歷所有事件,逐一處理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 判斷事件類型if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必須處理SocketChannel sc = c.accept();log.debug("{}", sc);}// 處理完畢,必須將事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}

💡 事件發(fā)生后能否不處理

事件發(fā)生后,要么處理,要么取消(cancel),不能什么都不做,否則下次該事件仍會觸發(fā),這是因為 nio 底層使用的是水平觸發(fā)

4.4 處理 read 事件

@Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));System.out.println(channel);Selector selector = Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);
?while (true) {int count = selector.select();
// ? ? ? ? ? ? ?  int count = selector.selectNow();log.debug("select count: {}", count);
// ? ? ? ? ? ? ?  if(count <= 0) {
// ? ? ? ? ? ? ? ? ?  continue;
// ? ? ? ? ? ? ?  }
?// 獲取所有事件Set<SelectionKey> keys = selector.selectedKeys();
?// 遍歷所有事件,逐一處理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 判斷事件類型if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必須處理SocketChannel sc = c.accept();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_READ);log.debug("連接已建立: {}", sc);} else if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(128);int read = sc.read(buffer);if(read == -1) {key.cancel();sc.close();} else {buffer.flip();debug(buffer);}}// 處理完畢,必須將事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}

開啟兩個客戶端,修改一下發(fā)送文字,輸出

sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60367]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60378]
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 6f 72 6c 64                                  |world           |
+--------+-------------------------------------------------+----------------+

💡 為何要 iter.remove()

因為 select 在事件發(fā)生后,就會將相關(guān)的 key 放入 selectedKeys 集合,但不會在處理完后從 selectedKeys 集合中移除,需要我們自己編碼刪除。例如

  • 第一次觸發(fā)了 ssckey 上的 accept 事件,沒有移除 ssckey

  • 第二次觸發(fā)了 sckey 上的 read 事件,但這時 selectedKeys 中還有上次的 ssckey ,在處理時因為沒有真正的 serverSocket 連上了,就會導(dǎo)致空指針異常

💡 cancel 的作用

cancel 會取消注冊在 selector 上的 channel,并從 keys 集合中刪除 key 后續(xù)不會再監(jiān)聽事件

?? 不處理邊界的問題

以前有同學(xué)寫過這樣的代碼,思考注釋中兩個問題,以 bio 為例,其實 nio 道理是一樣的

public class Server {public static void main(String[] args) throws IOException {ServerSocket ss=new ServerSocket(9000);while (true) {Socket s = ss.accept();InputStream in = s.getInputStream();// 這里這么寫,有沒有問題byte[] arr = new byte[4];while(true) {int read = in.read(arr);// 這里這么寫,有沒有問題if(read == -1) {break;}System.out.println(new String(arr, 0, read));}}}
}

客戶端

public class Client {public static void main(String[] args) throws IOException {Socket max = new Socket("localhost", 9000);OutputStream out = max.getOutputStream();out.write("hello".getBytes());out.write("world".getBytes());out.write("你好".getBytes());max.close();}
}

輸出

hell
owor
ld�
�好

為什么?

處理消息的邊界

  • 一種思路是固定消息長度,數(shù)據(jù)包大小一樣,服務(wù)器按預(yù)定長度讀取,缺點是浪費帶寬

  • 另一種思路是按分隔符拆分,缺點是效率低

  • TLV 格式,即 Type 類型、Length 長度、Value 數(shù)據(jù),類型和長度已知的情況下,就可以方便獲取消息大小,分配合適的 buffer,缺點是 buffer 需要提前分配,如果內(nèi)容過大,則影響 server 吞吐量

    • Http 1.1 是 TLV 格式

    • Http 2.0 是 LTV 格式

?


?

服務(wù)器端

private static void split(ByteBuffer source) {source.flip();for (int i = 0; i < source.limit(); i++) {// 找到一條完整消息if (source.get(i) == '\n') {int length = i + 1 - source.position();// 把這條完整消息存入新的 ByteBufferByteBuffer target = ByteBuffer.allocate(length);// 從 source 讀,向 target 寫for (int j = 0; j < length; j++) {target.put(source.get());}debugAll(target);}}source.compact(); // 0123456789abcdef  position 16 limit 16
}
?
public static void main(String[] args) throws IOException {// 1. 創(chuàng)建 selector, 管理多個 channelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);// 2. 建立 selector 和 channel 的聯(lián)系(注冊)// SelectionKey 就是將來事件發(fā)生后,通過它可以知道事件和哪個channel的事件SelectionKey sscKey = ssc.register(selector, 0, null);// key 只關(guān)注 accept 事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug("sscKey:{}", sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {// 3. select 方法, 沒有事件發(fā)生,線程阻塞,有事件,線程才會恢復(fù)運行// select 在事件未處理時,它不會阻塞, 事件發(fā)生后要么處理,要么取消,不能置之不理selector.select();// 4. 處理事件, selectedKeys 內(nèi)部包含了所有發(fā)生的事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, readwhile (iter.hasNext()) {SelectionKey key = iter.next();// 處理key 時,要從 selectedKeys 集合中刪除,否則下次處理就會有問題iter.remove();log.debug("key: {}", key);// 5. 區(qū)分事件類型if (key.isAcceptable()) { // 如果是 acceptServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16); // attachment// 將一個 byteBuffer 作為附件關(guān)聯(lián)到 selectionKey 上SelectionKey scKey = sc.register(selector, 0, buffer);scKey.interestOps(SelectionKey.OP_READ);log.debug("{}", sc);log.debug("scKey:{}", scKey);} else if (key.isReadable()) { // 如果是 readtry {SocketChannel channel = (SocketChannel) key.channel(); // 拿到觸發(fā)事件的channel// 獲取 selectionKey 上關(guān)聯(lián)的附件ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer); // 如果是正常斷開,read 的方法的返回值是 -1if(read == -1) {key.cancel();} else {split(buffer);// 需要擴(kuò)容if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);buffer.flip();newBuffer.put(buffer); // 0123456789abcdef3333\nkey.attach(newBuffer);}}
?} catch (IOException e) {e.printStackTrace();key.cancel(); ?// 因為客戶端斷開了,因此需要將 key 取消(從 selector 的 keys 集合中真正刪除 key)}}}}
}

客戶端

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();

ByteBuffer 大小分配

  • 每個 channel 都需要記錄可能被切分的消息,因為 ByteBuffer 不能被多個 channel 共同使用,因此需要為每個 channel 維護(hù)一個獨立的 ByteBuffer

  • ByteBuffer 不能太大,比如一個 ByteBuffer 1Mb 的話,要支持百萬連接就要 1Tb 內(nèi)存,因此需要設(shè)計大小可變的 ByteBuffer

    • 一種思路是首先分配一個較小的 buffer,例如 4k,如果發(fā)現(xiàn)數(shù)據(jù)不夠,再分配 8k 的 buffer,將 4k buffer 內(nèi)容拷貝至 8k buffer,優(yōu)點是消息連續(xù)容易處理,缺點是數(shù)據(jù)拷貝耗費性能,參考實現(xiàn) Java Resizable Array

    • 另一種思路是用多個數(shù)組組成 buffer,一個數(shù)組不夠,把多出來的內(nèi)容寫入新的數(shù)組,與前面的區(qū)別是消息存儲不連續(xù)解析復(fù)雜,優(yōu)點是避免了拷貝引起的性能損耗

4.5 處理 write 事件

一次無法寫完例子

  • 非阻塞模式下,無法保證把 buffer 中所有數(shù)據(jù)都寫入 channel,因此需要追蹤 write 方法的返回值(代表實際寫入字節(jié)數(shù))

  • 用 selector 監(jiān)聽所有 channel 的可寫事件,每個 channel 都需要一個 key 來跟蹤 buffer,但這樣又會導(dǎo)致占用內(nèi)存過多,就有兩階段策略

    • 當(dāng)消息處理器第一次寫入消息時,才將 channel 注冊到 selector 上

    • selector 檢查 channel 上的可寫事件,如果所有的數(shù)據(jù)寫完了,就取消 channel 的注冊

    • 如果不取消,會每次可寫均會觸發(fā) write 事件

public class WriteServer {
?public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);ssc.bind(new InetSocketAddress(8080));
?Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);
?while(true) {selector.select();
?Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);// 1. 向客戶端發(fā)送內(nèi)容StringBuilder sb = new StringBuilder();for (int i = 0; i < 3000000; i++) {sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());int write = sc.write(buffer);// 3. write 表示實際寫了多少字節(jié)System.out.println("實際寫入字節(jié):" + write);// 4. 如果有剩余未讀字節(jié),才需要關(guān)注寫事件if (buffer.hasRemaining()) {// read 1  write 4// 在原有關(guān)注事件的基礎(chǔ)上,多關(guān)注 寫事件sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);// 把 buffer 作為附件加入 sckeysckey.attach(buffer);}} else if (key.isWritable()) {ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel sc = (SocketChannel) key.channel();int write = sc.write(buffer);System.out.println("實際寫入字節(jié):" + write);if (!buffer.hasRemaining()) { // 寫完了key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);key.attach(null);}}}}}
}

客戶端

public class WriteClient {public static void main(String[] args) throws IOException {Selector selector = Selector.open();SocketChannel sc = SocketChannel.open();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);sc.connect(new InetSocketAddress("localhost", 8080));int count = 0;while (true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isConnectable()) {System.out.println(sc.finishConnect());} else if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);count += sc.read(buffer);buffer.clear();System.out.println(count);}}}}
}

💡 write 為何要取消

只要向 channel 發(fā)送數(shù)據(jù)時,socket 緩沖可寫,這個事件會頻繁觸發(fā),因此應(yīng)當(dāng)只在 socket 緩沖區(qū)寫不下時再關(guān)注可寫事件,數(shù)據(jù)寫完之后再取消關(guān)注

4.6 更進(jìn)一步

💡 利用多線程優(yōu)化

現(xiàn)在都是多核 cpu,設(shè)計時要充分考慮別讓 cpu 的力量被白白浪費

前面的代碼只有一個選擇器,沒有充分利用多核 cpu,如何改進(jìn)呢?

分兩組選擇器

  • 單線程配一個選擇器,專門處理 accept 事件,只關(guān)注連接事件

  • 創(chuàng)建 cpu 核心數(shù)的線程,每個線程配一個選擇器,輪流處理 read 事件和write事件

服務(wù)器端代碼:

package com.kjz.nio.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;import static com.kjz.nio.c2.ByteBufferUtil.debugAll;@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));// 1. 創(chuàng)建固定數(shù)量的 worker 并初始化Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-" + i);}AtomicInteger index = new AtomicInteger();while(true) {boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());// 2. 關(guān)聯(lián) selectorlog.debug("before register...{}", sc.getRemoteAddress());// round robin 輪詢workers[index.getAndIncrement() % workers.length].register(sc); // boss 調(diào)用 初始化 selector , 啟動 worker-0log.debug("after register...{}", sc.getRemoteAddress());}}}}//使用另外一個線程創(chuàng)建專門監(jiān)聽讀寫事件的Selectorstatic class Worker implements Runnable{private Thread thread;private Selector selector;private String name;private volatile boolean start = false; // 還未初始化//隊列用于在線程之間傳遞數(shù)據(jù)private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();public Worker(String name) {this.name = name;}// 初始化線程和selectorpublic void register(SocketChannel sc) throws IOException {if(!start) {selector = Selector.open();thread = new Thread(this, name);thread.start();start = true;}//向隊列添加了任務(wù),但是任務(wù)中的代碼并沒有立即執(zhí)行,要保證任務(wù)中的代碼在同一線程下先執(zhí)行//selector.select()再執(zhí)行 sc.register(selector,SelectionKey.OP_READ,null)//以此來保證selector.select()和  sc.register(selector,SelectionKey.OP_READ,null)不會相互阻塞
//            queue.add(()->{
//                try {
//                    sc.register(selector,SelectionKey.OP_READ,null);
//                } catch (ClosedChannelException e) {
//                    e.printStackTrace();
//                }
//            });// 喚醒 select 方法 ,防止下面的run方法中 selector.select()在初始化注冊讀事件時阻塞
//            selector.wakeup();
//            sc.register(selector, SelectionKey.OP_READ, null); // bossselector.wakeup();sc.register(selector,SelectionKey.OP_READ,null);}@Overridepublic void run() {while(true) {try {//監(jiān)聽事件selector.select(); // worker-0  阻塞,此時register方法就不能執(zhí)行了
//                    Runnable task = queue.poll();
//                    if (task != null) {
//                        //執(zhí)行任務(wù)隊列中的方法
//                        task.run();
//                    }Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}
💡 如何拿到 cpu 個數(shù)
  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因為容器不是物理隔離的,會拿到物理 cpu 個數(shù),而不是容器申請時的個數(shù)

  • 這個問題直到 jdk 10 才修復(fù),使用 jvm 參數(shù) UseContainerSupport 配置, 默認(rèn)開啟

4.7 UDP

  • UDP 是無連接的,client 發(fā)送數(shù)據(jù)不會管 server 是否開啟

  • server 這邊的 receive 方法會將接收到的數(shù)據(jù)存入 byte buffer,但如果數(shù)據(jù)報文超過 buffer 大小,多出來的數(shù)據(jù)會被默默拋棄

首先啟動服務(wù)器端

public class UdpServer {public static void main(String[] args) {try (DatagramChannel channel = DatagramChannel.open()) {channel.socket().bind(new InetSocketAddress(9999));System.out.println("waiting...");ByteBuffer buffer = ByteBuffer.allocate(32);channel.receive(buffer);buffer.flip();debug(buffer);} catch (IOException e) {e.printStackTrace();}}
}

輸出

waiting...

運行客戶端

public class UdpClient {public static void main(String[] args) {try (DatagramChannel channel = DatagramChannel.open()) {ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");InetSocketAddress address = new InetSocketAddress("localhost", 9999);channel.send(buffer, address);} catch (Exception e) {e.printStackTrace();}}
}

接下來服務(wù)器端輸出

         +-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+

5. NIO vs BIO

5.1 stream vs channel

  • stream 不會自動緩沖數(shù)據(jù),channel 會利用系統(tǒng)提供的發(fā)送緩沖區(qū)、接收緩沖區(qū)(更為底層

  • stream 僅支持阻塞 API,channel 同時支持阻塞、非阻塞 API,網(wǎng)絡(luò) channel 可配合 selector 實現(xiàn)多路復(fù)用

  • 二者均為全雙工,即讀寫可以同時進(jìn)行

5.2 IO 模型

同步阻塞、同步非阻塞、同步多路復(fù)用、異步阻塞(沒有此情況)、異步非阻塞

  • 同步:線程自己去獲取結(jié)果(一個線程)

  • 異步:線程自己不去獲取結(jié)果,而是由其它線程送結(jié)果(至少兩個線程)

當(dāng)調(diào)用一次 channel.read 或 stream.read 后,會切換至操作系統(tǒng)內(nèi)核態(tài)來完成真正數(shù)據(jù)讀取,而讀取又分為兩個階段,分別為:

  • 等待數(shù)據(jù)階段

  • 復(fù)制數(shù)據(jù)階段

8a2089a9d694487694fbd86e83a8573e.png

  • 阻塞 IO

8c3242cb357c424481cb0b373966d1e8.png

用戶線程在讀取數(shù)據(jù)時被阻塞,在阻塞過程中,用戶線程無法干其他的事

  • 非阻塞 IO

458f1211876f4d318eccef0104c7197d.png

向內(nèi)核空間讀取數(shù)據(jù)時如果沒有數(shù)據(jù)就會立即返回,用戶線程中的讀方法被while(true)包裹,用戶線程會不斷向內(nèi)核空間讀取,一旦有數(shù)據(jù)就會進(jìn)入到復(fù)制數(shù)據(jù),在等待數(shù)據(jù)的過程中是非阻塞的,但是復(fù)制數(shù)據(jù)的過程是非阻塞的。

  • 多路復(fù)用

400bb7923eda4b8ca6c6c6ee3932bd2d.png

  • 信號驅(qū)動

  • 異步 IO

43720b4f57f045c29efdd9dd8ac7c4aa.png

同步:線程自己去獲取結(jié)果(一個線程親力親為

異步:線程自己不去獲取結(jié)果,而是由其他線程送結(jié)果(至少兩個線程

異步情況下線程一定是非阻塞

  • 阻塞 IO vs 多路復(fù)用

e4fe955b96014b9a8af6c121030317dc.png

在阻塞IO下如果read阻塞在channel1的讀取數(shù)據(jù)中,那么此時另外一個channel2如果過來要建立連接accept,那么這個accept方法就要阻塞到read方法執(zhí)行完畢才能執(zhí)行。也就是說阻塞IO在同一時間只能處理一個channel的事件。

09973d3312c34de48242a7ca8835c6ed.png

在多路復(fù)用的情況下,則是select方法是阻塞的,它會返回多個channel的事件,在SelectionKey的集合中處理多個channel事件,可以同時應(yīng)對多個channel事件

🔖 參考

UNIX 網(wǎng)絡(luò)編程 - 卷 I

5.3 零拷貝(從文件傳輸?shù)慕嵌葋砜?#xff09;

傳統(tǒng) IO 問題

傳統(tǒng)的 IO 將一個文件通過 socket 寫出

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
?
byte[] buf = new byte[(int)f.length()];
file.read(buf);
?
Socket socket = ...;
socket.getOutputStream().write(buf);

內(nèi)部工作流程是這樣的:

e23789830fc84ad3aa45c321c1b07f73.png

  1. java 本身并不具備 IO 讀寫能力,因此 read 方法調(diào)用后,要從 java 程序的用戶態(tài)切換至內(nèi)核態(tài),去調(diào)用操作系統(tǒng)(Kernel)的讀能力,將數(shù)據(jù)讀入內(nèi)核緩沖區(qū)。這期間用戶線程阻塞,操作系統(tǒng)使用 DMA(Direct Memory Access)來實現(xiàn)文件讀,其間也不會使用 cpu

    DMA 也可以理解為硬件單元,用來解放 cpu 完成文件 IO

  2. 內(nèi)核態(tài)切換回用戶態(tài),將數(shù)據(jù)從內(nèi)核緩沖區(qū)讀入用戶緩沖區(qū)(即 byte[] buf),這期間 cpu 會參與拷貝,無法利用 DMA

  3. 調(diào)用 write 方法,這時將數(shù)據(jù)從用戶緩沖區(qū)(byte[] buf)寫入 socket 緩沖區(qū),cpu 會參與拷貝

  4. 接下來要向網(wǎng)卡寫數(shù)據(jù),這項能力 java 又不具備,因此又得從用戶態(tài)切換至內(nèi)核態(tài),調(diào)用操作系統(tǒng)的寫能力,使用 DMA 將 socket 緩沖區(qū)的數(shù)據(jù)寫入網(wǎng)卡,不會使用 cpu

可以看到中間環(huán)節(jié)較多,java 的 IO 實際不是物理設(shè)備級別的讀寫,而是緩存的復(fù)制,底層的真正讀寫是操作系統(tǒng)來完成的

  • 用戶態(tài)與內(nèi)核態(tài)的切換發(fā)生了 3 次,這個操作比較重量級

  • 數(shù)據(jù)拷貝了共 4 次

NIO 優(yōu)化

通過 DirectByteBuf

  • ByteBuffer.allocate(10) HeapByteBuffer 使用的還是 java 內(nèi)存

  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系統(tǒng)內(nèi)存

58d3756eaff3492fa4f607d9e162e193.png

大部分步驟與優(yōu)化前相同,不再贅述。唯有一點:java 可以使用 DirectByteBuf 將堆外內(nèi)存映射到 jvm 內(nèi)存中來直接訪問使用

  • 這塊內(nèi)存不受 jvm 垃圾回收的影響,因此內(nèi)存地址固定,有助于 IO 讀寫

  • java 中的 DirectByteBuf 對象僅維護(hù)了此內(nèi)存的虛引用,內(nèi)存回收分成兩步

    • DirectByteBuf 對象被垃圾回收,將虛引用加入引用隊列

    • 通過專門線程訪問引用隊列,根據(jù)虛引用釋放堆外內(nèi)存

  • 減少了一次數(shù)據(jù)拷貝,用戶態(tài)與內(nèi)核態(tài)的切換次數(shù)沒有減少

進(jìn)一步優(yōu)化(底層采用了 linux 2.1 后提供的 sendFile 方法),java 中對應(yīng)著兩個 channel 調(diào)用 transferTo/transferFrom 方法拷貝數(shù)據(jù)

2ad9f2e8d9fa437489bb99b83d52658e.png

  1. java 調(diào)用 transferTo 方法后,要從 java 程序的用戶態(tài)切換至內(nèi)核態(tài),使用 DMA將數(shù)據(jù)讀入內(nèi)核緩沖區(qū),不會使用 cpu

  2. 數(shù)據(jù)從內(nèi)核緩沖區(qū)傳輸?shù)?socket 緩沖區(qū),cpu 會參與拷貝

  3. 最后使用 DMA 將 socket 緩沖區(qū)的數(shù)據(jù)寫入網(wǎng)卡,不會使用 cpu

可以看到

  • 只發(fā)生了一次用戶態(tài)與內(nèi)核態(tài)的切換

  • 數(shù)據(jù)拷貝了 3 次

進(jìn)一步優(yōu)化(linux 2.4)

1183d91b62584bcd99fb0e157e39ad23.png

  1. java 調(diào)用 transferTo 方法后,要從 java 程序的用戶態(tài)切換至內(nèi)核態(tài),使用 DMA將數(shù)據(jù)讀入內(nèi)核緩沖區(qū),不會使用 cpu

  2. 只會將一些 offset(偏移量) 和 length(長度) 信息拷入 socket 緩沖區(qū),幾乎無消耗

  3. 使用 DMA 將 內(nèi)核緩沖區(qū)的數(shù)據(jù)寫入網(wǎng)卡,不會使用 cpu

整個過程僅只發(fā)生了一次用戶態(tài)與內(nèi)核態(tài)的切換,數(shù)據(jù)拷貝了 2 次。所謂的【零拷貝】,并不是真正無拷貝,而是在不會拷貝重復(fù)數(shù)據(jù)到 jvm 內(nèi)存中,零拷貝的優(yōu)點有

  • 更少的用戶態(tài)與內(nèi)核態(tài)的切換

  • 不利用 cpu 計算,減少 cpu 緩存?zhèn)喂蚕?/p>

  • 零拷貝適合小文件傳輸

5.3 AIO

AIO 用來解決數(shù)據(jù)復(fù)制階段的阻塞問題

  • 同步意味著,在進(jìn)行讀寫操作時,線程需要等待結(jié)果,還是相當(dāng)于閑置

  • 異步意味著,在進(jìn)行讀寫操作時,線程不必等待結(jié)果,而是將來由操作系統(tǒng)來通過回調(diào)方式由另外的線程來獲得結(jié)果

異步模型需要底層操作系統(tǒng)(Kernel)提供支持

  • Windows 系統(tǒng)通過 IOCP 實現(xiàn)了真正的異步 IO

  • Linux 系統(tǒng)異步 IO 在 2.6 版本引入,但其底層實現(xiàn)還是用多路復(fù)用模擬了異步 IO性能沒有優(yōu)勢

文件 AIO

先來看看 AsynchronousFileChannel

@Slf4j
public class AioDemo1 {public static void main(String[] args) throws IOException {try{AsynchronousFileChannel s = AsynchronousFileChannel.open(Paths.get("1.txt"), StandardOpenOption.READ);ByteBuffer buffer = ByteBuffer.allocate(2);log.debug("begin...");s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer attachment) {log.debug("read completed...{}", result);buffer.flip();debug(buffer);}
?@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {log.debug("read failed...");}});
?} catch (IOException e) {e.printStackTrace();}log.debug("do other things...");System.in.read();}
}

輸出

13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0d                                           |a.              |
+--------+-------------------------------------------------+----------------+

可以看到

  • 響應(yīng)文件讀取成功的是另一個線程 Thread-5

  • 主線程并沒有 IO 操作阻塞

💡 守護(hù)線程

默認(rèn)文件 AIO 使用的線程都是守護(hù)線程,所以最后要執(zhí)行 System.in.read() 以避免守護(hù)線程意外結(jié)束

網(wǎng)絡(luò) AIO

public class AioServer {public static void main(String[] args) throws IOException {AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();ssc.bind(new InetSocketAddress(8080));ssc.accept(null, new AcceptHandler(ssc));System.in.read();}
?private static void closeChannel(AsynchronousSocketChannel sc) {try {System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());sc.close();} catch (IOException e) {e.printStackTrace();}}
?private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {private final AsynchronousSocketChannel sc;
?public ReadHandler(AsynchronousSocketChannel sc) {this.sc = sc;}
?@Overridepublic void completed(Integer result, ByteBuffer attachment) {try {if (result == -1) {closeChannel(sc);return;}System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());attachment.flip();System.out.println(Charset.defaultCharset().decode(attachment));attachment.clear();// 處理完第一個 read 時,需要再次調(diào)用 read 方法來處理下一個 read 事件sc.read(attachment, attachment, this);} catch (IOException e) {e.printStackTrace();}}
?@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {closeChannel(sc);exc.printStackTrace();}}
?private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {private final AsynchronousSocketChannel sc;
?private WriteHandler(AsynchronousSocketChannel sc) {this.sc = sc;}
?@Overridepublic void completed(Integer result, ByteBuffer attachment) {// 如果作為附件的 buffer 還有內(nèi)容,需要再次 write 寫出剩余內(nèi)容if (attachment.hasRemaining()) {sc.write(attachment);}}
?@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();closeChannel(sc);}}
?private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {private final AsynchronousServerSocketChannel ssc;
?public AcceptHandler(AsynchronousServerSocketChannel ssc) {this.ssc = ssc;}
?@Overridepublic void completed(AsynchronousSocketChannel sc, Object attachment) {try {System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}ByteBuffer buffer = ByteBuffer.allocate(16);// 讀事件由 ReadHandler 處理sc.read(buffer, buffer, new ReadHandler(sc));// 寫事件由 WriteHandler 處理sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));// 處理完第一個 accpet 時,需要再次調(diào)用 accept 方法來處理下一個 accept 事件ssc.accept(null, this);}
?@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}}
}

學(xué)習(xí)完NIO的基礎(chǔ)知識,接下來正式進(jìn)入Netty的學(xué)習(xí),請看這篇博客:Netty入門-CSDN博客

http://www.risenshineclean.com/news/2943.html

相關(guān)文章:

  • 湖南建設(shè)廳官方網(wǎng)站官網(wǎng)seo排名關(guān)鍵詞
  • 湛江專業(yè)做網(wǎng)站seo優(yōu)化系統(tǒng)
  • 寧波市城鄉(xiāng)建設(shè)委員會網(wǎng)站填寫電話的廣告
  • wordpress數(shù)據(jù)庫鏈接地址seo綜合查詢平臺官網(wǎng)
  • 廊坊專業(yè)做網(wǎng)站十大免費cms建站系統(tǒng)介紹
  • 網(wǎng)站出現(xiàn)風(fēng)險如何處理方法營銷培訓(xùn)課程ppt
  • 用帝國軟件做網(wǎng)站的心得uc瀏覽器網(wǎng)頁版入口
  • 做一個公司的網(wǎng)站應(yīng)做哪些準(zhǔn)備工作內(nèi)容百度上免費創(chuàng)建網(wǎng)站
  • 龍崗?fù)瑯飞鐓^(qū)做網(wǎng)站網(wǎng)絡(luò)工程師培訓(xùn)一般多少錢
  • 如何做網(wǎng)站監(jiān)控關(guān)鍵詞優(yōu)化是什么工作
  • 自用電腦做網(wǎng)站百度seo優(yōu)化服務(wù)項目
  • 怎么做網(wǎng)站的導(dǎo)航條下載百度語音導(dǎo)航地圖安裝
  • 做網(wǎng)站開發(fā)的女生多嗎百度推廣競價是什么意思
  • 做網(wǎng)站的要多少錢怎樣推廣app
  • wordpress 標(biāo)簽 函數(shù)廣州優(yōu)化營商環(huán)境條例
  • 外貿(mào)做雙語網(wǎng)站好還是單語網(wǎng)站seo搜索引擎優(yōu)化課后答案
  • 好網(wǎng)站建設(shè)公司的網(wǎng)站響應(yīng)式模版移動優(yōu)化
  • bootstrap網(wǎng)站模版臺州關(guān)鍵詞優(yōu)化報價
  • 開封市住房和城鄉(xiāng)建設(shè)局網(wǎng)站做百度線上推廣
  • wordpress 復(fù)制網(wǎng)站競價推廣網(wǎng)絡(luò)推廣運營
  • 汕頭網(wǎng)站制作電話網(wǎng)頁設(shè)計案例
  • 優(yōu)化大師好用嗎武漢網(wǎng)站seo推廣
  • 青島網(wǎng)站排名上海百網(wǎng)優(yōu)seo優(yōu)化公司
  • 網(wǎng)站后臺傳照片 c windows temp 拒絕訪問無錫百度信息流
  • 做現(xiàn)貨黃金網(wǎng)站體育熱點新聞
  • 網(wǎng)站圖片優(yōu)化怎么推廣自己的店鋪
  • wordpress最新版下載2022網(wǎng)站seo
  • 鄭州企業(yè)網(wǎng)站價格百度廣告搜索推廣
  • 做網(wǎng)站需要幾個服務(wù)器網(wǎng)絡(luò)營銷自學(xué)網(wǎng)站
  • 優(yōu)化的網(wǎng)站做域名跳轉(zhuǎn)最新經(jīng)濟(jì)新聞