Browse Source

升级netty 4.2.0

johnclot69 1 week ago
parent
commit
c56af9a526

+ 1 - 1
incubator-core/build.gradle

@@ -11,7 +11,7 @@ dependencies {
     api 'it.unimi.dsi:fastutil:8.5.15'
 
     // 网络通信和序列化
-    api 'io.netty:netty-all:4.1.119.Final' // Netty 全功能包
+    api 'io.netty:netty-all:4.2.0.Final' // Netty 全功能包
     api 'org.msgpack:msgpack-core:0.9.9' // MessagePack 序列化
     api 'org.msgpack:jackson-dataformat-msgpack:0.9.9' // MessagePack 序列化
 

+ 31 - 14
incubator-core/src/main/java/com/incubator/core/network/NettySystem.java

@@ -6,12 +6,13 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.*;
 import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollIoHandler;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.kqueue.KQueue;
-import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueIoHandler;
 import io.netty.channel.kqueue.KQueueServerSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
+import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.HttpObjectAggregator;
@@ -34,16 +35,16 @@ public class NettySystem extends BaseSystem {
     private static final int WORKER_THREADS = Math.max(1, Runtime.getRuntime().availableProcessors() * 2);
 
     private final ServerBootstrap bootstrap = new ServerBootstrap();
-    private EventLoopGroup bossGroup;
-    private EventLoopGroup workerGroup;
+    private IoEventLoopGroup bossGroup;
+    private IoEventLoopGroup workerGroup;
     private Channel channel;
 
     @Override
     protected void initialize() {
         InetSocketAddress address = new InetSocketAddress("0.0.0.0", 9000);
         // 使用单线程 bossGroup,高并发时 workerGroup 使用默认线程数
-        this.bossGroup = this.createEventLoopGroup(BOSS_THREADS, "Netty-Boss");
-        this.workerGroup = this.createEventLoopGroup(WORKER_THREADS, "Netty-Worker");
+        this.bossGroup = this.createIoEventLoopGroup(BOSS_THREADS, "Netty-Boss");
+        this.workerGroup = this.createIoEventLoopGroup(WORKER_THREADS, "Netty-Worker");
         initBootstrap(this.world);
 
         try {
@@ -59,7 +60,7 @@ public class NettySystem extends BaseSystem {
 
     private void initBootstrap(World world) {
         this.bootstrap.group(this.bossGroup, this.workerGroup)
-                .channel(this.getDefaultChannelClass())
+                .channel(this.getServerChannelClass())
                 // 全局选项
                 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 .option(ChannelOption.SO_BACKLOG, 1024) // 提升连接等待队列大小
@@ -89,15 +90,31 @@ public class NettySystem extends BaseSystem {
                 });
     }
 
-    // 工具方法
-    private EventLoopGroup createEventLoopGroup(int threads, String prefix) {
+    /**
+     * 创建IoEventLoopGroup
+     *
+     * @param threads
+     * @param prefix
+     * @return
+     */
+    private IoEventLoopGroup createIoEventLoopGroup(int threads, String prefix) {
         ThreadFactory factory = Thread.ofVirtual().name(prefix + "-", 1).factory();
-        if (Epoll.isAvailable()) return new EpollEventLoopGroup(threads, factory);
-        if (KQueue.isAvailable()) return new KQueueEventLoopGroup(threads, factory);
-        return new NioEventLoopGroup(threads, factory);
+        // 根据平台选择IoHandler
+        if (Epoll.isAvailable()) {
+            return new MultiThreadIoEventLoopGroup(threads, factory, EpollIoHandler.newFactory());
+        } else if (KQueue.isAvailable()) {
+            return new MultiThreadIoEventLoopGroup(threads, factory, KQueueIoHandler.newFactory());
+        } else {
+            return new MultiThreadIoEventLoopGroup(threads, factory, NioIoHandler.newFactory());
+        }
     }
 
-    private Class<? extends ServerChannel> getDefaultChannelClass() {
+    /**
+     * 获取通道类型
+     *
+     * @return
+     */
+    private Class<? extends ServerSocketChannel> getServerChannelClass() {
         if (Epoll.isAvailable()) return EpollServerSocketChannel.class;
         if (KQueue.isAvailable()) return KQueueServerSocketChannel.class;
         return NioServerSocketChannel.class;