Skip to main content

java nio

背景

了解java的nio,因为在看到lucene的MappedByteBuffer , 所以想了解一下nio的内容 nio 主要包括三个内容:

  • Buffer
  • Selector
  • Channel

epoll 例子

#include <stdio.h>     // for fprintf()
#include <unistd.h> // for close()
#include <sys/epoll.h> // for epoll_create1()

int main()
{
int epoll_fd = epoll_create1(0); // 创建文件

if (epoll_fd == -1) {
fprintf(stderr, "Failed to create epoll file descriptor\n");
return 1;
}

if (close(epoll_fd)) {
fprintf(stderr, "Failed to close epoll file descriptor\n");
return 1;
}

return 0;
}

Buffer

ByteBuffer

  • MappedByteBuffer
  • HeapByteBuffer
  • DirectByteBuffer

例子

简单的例子:

    @Test
public void fileChannel()
throws IOException {
try (FileChannel fc = FileChannel.open(Paths.get("ccc.cc"),StandardOpenOption.WRITE , StandardOpenOption.READ ,StandardOpenOption.CREATE) ) {
MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_WRITE, 0, 1);
byte b = 97;
bb.put(0 ,b );
fc.write(bb);
var charset = Charset.defaultCharset();
System.out.println( "res" + charset.decode(bb).toString());
}
}

其实调用:

    @ForceInline
public void put$Type$(Scope scope, Object base, long offset, $type$ value) {
try {
put$Type$Internal(scope, base, offset, value);
} catch (Scope.ScopedAccessError ex) {
throw new IllegalStateException("This segment is already closed");
}
}

@ForceInline @Scoped
private void put$Type$Internal(Scope scope, Object base, long offset, $type$ value) {
try {
if (scope != null) {
scope.checkValidState();
}
UNSAFE.put$Type$(base, offset, value);
} finally {
Reference.reachabilityFence(scope);
}
}

最后写在这里

  void put(T x) {
GuardUnsafeAccess guard(_thread);
*addr() = normalize_for_write(x);
}

整个流程就是: mmap返回的是一个指针 ,MappedByteBuffer 调用UNSAFE.put方法直接修改堆外内存的值,不经过堆

java force 强制刷新

FileDispatcherImplforce0

JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this,
jobject fdo, jboolean md)
{
jint fd = fdval(env, fdo);
int result = 0;

result = fcntl(fd, F_FULLFSYNC);
if (result == -1) {
struct statfs fbuf;
int errno_fcntl = errno;
if (fstatfs(fd, &fbuf) == 0) {
if ((fbuf.f_flags & MNT_LOCAL) == 0) {
/* Try fsync() in case file is not local. */
result = fsync(fd);
}
} else {
/* fstatfs() failed so restore errno from fcntl(). */
errno = errno_fcntl;
}
}

return handle(env, result, "Force failed");
}

java 代码

main[1] where
[1] sun.nio.ch.EPollSelectorProvider.openSelector (EPollSelectorProvider.java:36)
[2] java.nio.channels.Selector.open (Selector.java:295)
[3] NioClient.main (NioClient.java:49)

epoll 关心的事件

    /**
* Process changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);

synchronized (updateLock) {
SelectionKeyImpl ski;
while ((ski = updateKeys.pollFirst()) != null) {
if (ski.isValid()) {
int fd = ski.getFDVal();
// add to fdToKey if needed
SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
assert (previous == null) || (previous == ski);

int newEvents = ski.translateInterestOps();
int registeredEvents = ski.registeredEvents();
if (newEvents != registeredEvents) {
if (newEvents == 0) {
// remove from epoll
EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
} else {
if (registeredEvents == 0) {
// add to epoll
EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
} else {
// modify events
EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
}
}
ski.registeredEvents(newEvents);
}
}
}
}
}

select 处理

translateReadyOps:1439, SocketChannelImpl (sun.nio.ch)
translateAndSetReadyOps:1474, SocketChannelImpl (sun.nio.ch)
translateAndSetReadyOps:170, SelectionKeyImpl (sun.nio.ch)
processReadyEvents:308, SelectorImpl (sun.nio.ch)
processEvents:201, EPollSelectorImpl (sun.nio.ch)
doSelect:141, EPollSelectorImpl (sun.nio.ch)
lockAndDoSelect:130, SelectorImpl (sun.nio.ch)
select:142, SelectorImpl (sun.nio.ch)
main:52, NioClient (nio)

nio 是如何转化connect 的

    protected final int processReadyEvents(int rOps, SelectionKeyImpl ski, Consumer<SelectionKey> action) {
if (action != null) {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
action.accept(ski);
this.ensureOpen();
return 1;
}
} else {
assert Thread.holdsLock(this.publicSelectedKeys);

if (this.selectedKeys.contains(ski)) {
if (ski.translateAndUpdateReadyOps(rOps)) {
return 1;
}
} else {
ski.translateAndSetReadyOps(rOps); // 转化ops 到 connect
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
this.selectedKeys.add(ski);
return 1;
}
}
}

return 0;
}

最后转换到

    public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
int intOps = ski.nioInterestOps();
int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0) {
return false;
} else if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
ski.nioReadyOps(intOps);
return (intOps & ~oldOps) != 0;
} else {
boolean connected = this.isConnected();
if ((ops & Net.POLLIN) != 0 && (intOps & 1) != 0 && connected) {
newOps = initialOps | 1;
}

if ((ops & Net.POLLCONN) != 0 && (intOps & 8) != 0 && this.isConnectionPending()) { // 转换到 connect
newOps |= 8;
}

if ((ops & Net.POLLOUT) != 0 && (intOps & 4) != 0 && connected) {
newOps |= 4;
}

ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
}

状态变化:

    public boolean connect(SocketAddress remote) throws IOException {
SocketAddress sa = this.checkRemote(remote);

try {
this.readLock.lock();

int n;
try {
this.writeLock.lock();

try {
boolean blocking = this.isBlocking();
boolean connected = 0;

try {
this.beginConnect(blocking, sa); // 在这里 state 会变成1
this.configureSocketNonBlockingIfVirtualThread();
if (this.isUnixSocket()) {
n = UnixDomainSockets.connect(this.fd, sa);
} else {
n = Net.connect(this.family, this.fd, sa);
}

if (n > 0) {
connected = 1;
} else if (blocking) {
assert IOStatus.okayToRetry((long)n);

boolean polled;
for(polled = false; !polled && this.isOpen(); polled = Net.pollConnectNow(this.fd)) {
this.park(Net.POLLOUT);
}

connected = polled && this.isOpen() ? 1 : 0;
}
} finally {
this.endConnect(blocking, (boolean)connected);
}

n = connected;
} finally {
this.writeLock.unlock();
}
} finally {
this.readLock.unlock();
}

return (boolean)n;
} catch (IOException var25) {
this.close();
throw SocketExceptions.of(var25, sa);
}
}

相关阅读