DirectBuffer
之前我们用ByteBuffer.allocate()看一下源码:
1 2 3 4 5
| public static ByteBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); return new HeapByteBuffer(capacity, capacity); }
|
HeapByteBuffer
是从堆上分配的内存空间创建的Buffer,实际上JDK还提供了另外一种方式ByteBuffer.allocateDirect():
1 2 3
| public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); }
|
DirectByteBuffer
创建的buffer是从直接内存中开辟的空间分配,我们叫做堆外内存,不会被gc回收,里面用到了很多没有开源的sun的api。
new DirectByteBuffer(),这个对象本身是在堆上创建的,但是源码里的
base = unsafe.allocateMemory(size);
则是在堆外内存中分配的,那么java堆上的数据是如何找到堆外的数据的呢,一定是保存了一个地址,找了一下发现如下变量:
Buffer.java
说放在Buffer这个类里是为了效率。
零拷贝
如果使用HeapByteBuffer在进行文件读写的时候,所有的数据都在Java堆上,然而操作系统不是直接处理堆上的数据,而是把堆上的数据拷贝到操作系统里(Java内存模型之外)某一块内存空间中,然后再把数据和IO设备进行交互。意思用HeapByteBuffer进行IO操作的时候中间多了一次数据拷贝的过程。
而使用DirectByteBuffer,因为数据本来就在堆外内存中,所以跟IO设备交互的时候没有拷贝的过程,提升了效率,这有一个专有名词,也就是零拷贝。
以下内容转自知乎:
DirectByteBuffer 自身是一个Java对象,在Java堆中;而这个对象中有个long类型字段address,记录着一块调用 malloc() 申请到的native memory。
HotSpot VM里的GC除了CMS之外都是要移动对象的,是所谓“compacting GC”。
如果要把一个Java里的 byte[] 对象的引用传给native代码,让native代码直接访问数组的内容的话,就必须要保证native代码在访问的时候这个 byte[] 对象不能被移动,也就是要被“pin”(钉)住。
可惜HotSpot VM出于一些取舍而决定不实现单个对象层面的object pinning,要pin的话就得暂时禁用GC——也就等于把整个Java堆都给pin住。HotSpot VM对JNI的Critical系API就是这样实现的。这用起来就不那么顺手。
所以 Oracle/Sun JDK / OpenJDK 的这个地方就用了点绕弯的做法。它假设把 HeapByteBuffer 背后的 byte[] 里的内容拷贝一次是一个时间开销可以接受的操作,同时假设真正的I/O可能是一个很慢的操作。
于是它就先把 HeapByteBuffer 背后的 byte[] 的内容拷贝到一个 DirectByteBuffer 背后的native memory去,这个拷贝会涉及 sun.misc.Unsafe.copyMemory() 的调用,背后是类似 memcpy() 的实现。这个操作本质上是会在整个拷贝过程中暂时不允许发生GC的,虽然实现方式跟JNI的Critical系API不太一样。(具体来说是 Unsafe.copyMemory() 是HotSpot VM的一个intrinsic方法,中间没有safepoint所以GC无法发生)。
然后数据被拷贝到native memory之后就好办了,就去做真正的I/O,把 DirectByteBuffer 背后的native memory地址传给真正做I/O的函数。这边就不需要再去访问Java对象去读写要做I/O的数据了。
MappedByteBuffer
DirectBuffer是继承于MappedByteBuffer的,内存映射文件是一种允许Java直接从内存访问的特殊文件,操作系统再负责将内存的改动写入的IO设备中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
public class NioTest9 { public static void main(String[] args) throws IOException { RandomAccessFile randomAccessFile = new RandomAccessFile("NioTest9.txt", "rw"); FileChannel fileChannel = randomAccessFile.getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
mappedByteBuffer.put(0, (byte)'a'); mappedByteBuffer.put(3, (byte)'b');
randomAccessFile.close(); } }
|
上面的代码执行完成会直接修改NioTest9.txt中的内容。
FileLock文件锁
这个用的不多,共享锁是只读,都只能读,排他是只能自己读写,别人不能读也不能写。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
public class NioTest10 { public static void main(String[] args) throws IOException { RandomAccessFile randomAccessFile = new RandomAccessFile("NioTest10.txt", "rw"); FileChannel fileChannel = randomAccessFile.getChannel();
FileLock fileLock = fileChannel.lock(3, 6, true);
System.out.println("valid: " + fileLock.isValid()); System.out.println("lock type: " + fileLock.isShared());
fileLock.release();
randomAccessFile.close(); } }
|
Scattering & Gathering
之前的例子中在进行读写的时候,都是用的一个Buffer对象来完成的,Buffer的Scattering(散开),可以接受传递一个Buffer的数组。比如我要把Channel中的信息读到Buffer中,那么channel里面有20个字节,传递一个Buffer数组,往里面读信息,第一个数组长度是10,第二个数组长度是5,第三个长度也是5,它会按顺序将第一个Buffer读满,再接着往第二个读,再读第三个。就是将一个Channel中的数据读取到多个Buffer中。而Gathering则是相反的,他是写操作,先将第一个Buffer写到Channel中,然后也是顺序写入后面的channel。
下面用一个网络IO的例子来说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
public class NioTest11 { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(8899); serverSocketChannel.socket().bind(address);
int messageLength = 2 + 3 + 4;
ByteBuffer[] buffers = new ByteBuffer[3];
buffers[0] = ByteBuffer.allocate(2); buffers[1] = ByteBuffer.allocate(3); buffers[2] = ByteBuffer.allocate(4);
SocketChannel socketChannel = serverSocketChannel.accept();
while (true) { int bytesRead = 0; while (bytesRead < messageLength) { long r = socketChannel.read(buffers);
bytesRead += r;
System.out.println("bytesRead: " + bytesRead);
Stream.of(buffers) .map(buffer -> "position: " + buffer.position() + ", limit: " + buffer.limit()) .forEach(System.out::println);
}
Stream.of(buffers).forEach(Buffer::flip);
long bytesWritten = 0;
while (bytesWritten < messageLength) { long r = socketChannel.write(buffers); bytesWritten += r; }
Stream.of(buffers).forEach(Buffer::clear);
System.out.println("bytesRead: " + bytesRead + ", byteWritten: " + bytesWritten + ", messageLength: " + messageLength); } } }
|
nc localhost 8899
telnet localhost 8899
这2个命令都可以进行刚才的程序测试。
1 2 3
| nc localhost 8899 hellowor hellowor
|
回车也算一个字节,所以输入hellowor后,程序马上回写了数据。控制台输出:
1 2 3 4 5
| bytesRead: 9 position: 2, limit: 2 position: 3, limit: 3 position: 4, limit: 4 bytesRead: 9, byteWritten: 9, messageLength: 9
|
现在程序依旧在等待输入,我们继续输入
这里先输入一个hello+回车,再输入a+回车,再回车,进行了3次操作,也一共是9个字节,数据进行了回写,接下来看控制台的输出。
1 2 3 4 5 6 7 8 9 10 11 12 13
| bytesRead: 6 position: 2, limit: 2 position: 3, limit: 3 position: 1, limit: 4 bytesRead: 8 position: 2, limit: 2 position: 3, limit: 3 position: 3, limit: 4 bytesRead: 9 position: 2, limit: 2 position: 3, limit: 3 position: 4, limit: 4 bytesRead: 9, byteWritten: 9, messageLength: 9
|
第一次输入hello+回车的时候,输入了6个字节,0、1索引的buffer读满了,2索引的buffer读取了一个位置,再敲入a+回车,2索引的Buffer还剩一个位置,此时再敲入回车,Buffer全部读满,Buffer开始进行写入操作。