3 次代碼提交 0da4cb1917 ... 1e92ae987e

作者 SHA1 備註 提交日期
  johnclot69 1e92ae987e 优化NettySystem 1 周之前
  johnclot69 c5578550e2 Merge remote-tracking branch 'origin/master' 1 周之前
  johnclot69 c56af9a526 升级netty 4.2.0 1 周之前
共有 2 個文件被更改,包括 32 次插入15 次删除
  1. 1 1
      incubator-core/build.gradle
  2. 31 14
      incubator-core/src/main/java/com/incubator/core/network/NettySystem.java

+ 1 - 1
incubator-core/build.gradle

@@ -11,7 +11,7 @@ dependencies {
     api 'it.unimi.dsi:fastutil:8.5.15'
 
     // 网络通信和序列化
-    api 'io.netty:netty-all:4.1.119.Final' // Netty 全功能包
+    api 'io.netty:netty-all:4.2.0.Final' // Netty 全功能包
     api 'org.msgpack:msgpack-core:0.9.9' // MessagePack 序列化
     api 'org.msgpack:jackson-dataformat-msgpack:0.9.9' // MessagePack 序列化
 

+ 31 - 14
incubator-core/src/main/java/com/incubator/core/network/NettySystem.java

@@ -6,12 +6,13 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.*;
 import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollIoHandler;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.kqueue.KQueue;
-import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueIoHandler;
 import io.netty.channel.kqueue.KQueueServerSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
+import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.HttpObjectAggregator;
@@ -35,8 +36,8 @@ public class NettySystem extends BaseSystem {
     private static final int WORKER_THREADS = Math.max(1, Runtime.getRuntime().availableProcessors() * 2);
 
     private final ServerBootstrap bootstrap = new ServerBootstrap();
-    private EventLoopGroup bossGroup;
-    private EventLoopGroup workerGroup;
+    private IoEventLoopGroup bossGroup;
+    private IoEventLoopGroup workerGroup;
     private Channel serverChannel;
 
     public NettySystem(int port) {
@@ -47,8 +48,8 @@ public class NettySystem extends BaseSystem {
     protected void initialize() {
         InetSocketAddress address = new InetSocketAddress("0.0.0.0", this.port);
         // 使用单线程 bossGroup,高并发时 workerGroup 使用默认线程数
-        this.bossGroup = this.createEventLoopGroup(BOSS_THREADS, "Netty-Boss");
-        this.workerGroup = this.createEventLoopGroup(WORKER_THREADS, "Netty-Worker");
+        this.bossGroup = this.createIoEventLoopGroup(BOSS_THREADS, "Netty-Boss");
+        this.workerGroup = this.createIoEventLoopGroup(WORKER_THREADS, "Netty-Worker");
         initBootstrap(this.world);
 
         try {
@@ -64,7 +65,7 @@ public class NettySystem extends BaseSystem {
 
     private void initBootstrap(World world) {
         this.bootstrap.group(this.bossGroup, this.workerGroup)
-                .channel(this.getDefaultChannelClass())
+                .channel(this.getServerChannelClass())
                 // 全局选项
                 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 .option(ChannelOption.SO_BACKLOG, 1024) // 提升连接等待队列大小
@@ -94,15 +95,31 @@ public class NettySystem extends BaseSystem {
                 });
     }
 
-    // 工具方法
-    private EventLoopGroup createEventLoopGroup(int threads, String prefix) {
+    /**
+     * 创建IoEventLoopGroup
+     *
+     * @param threads
+     * @param prefix
+     * @return
+     */
+    private IoEventLoopGroup createIoEventLoopGroup(int threads, String prefix) {
         ThreadFactory factory = Thread.ofVirtual().name(prefix + "-", 1).factory();
-        if (Epoll.isAvailable()) return new EpollEventLoopGroup(threads, factory);
-        if (KQueue.isAvailable()) return new KQueueEventLoopGroup(threads, factory);
-        return new NioEventLoopGroup(threads, factory);
+        // 根据平台选择IoHandler
+        if (Epoll.isAvailable()) {
+            return new MultiThreadIoEventLoopGroup(threads, factory, EpollIoHandler.newFactory());
+        } else if (KQueue.isAvailable()) {
+            return new MultiThreadIoEventLoopGroup(threads, factory, KQueueIoHandler.newFactory());
+        } else {
+            return new MultiThreadIoEventLoopGroup(threads, factory, NioIoHandler.newFactory());
+        }
     }
 
-    private Class<? extends ServerChannel> getDefaultChannelClass() {
+    /**
+     * 获取通道类型
+     *
+     * @return
+     */
+    private Class<? extends ServerSocketChannel> getServerChannelClass() {
         if (Epoll.isAvailable()) return EpollServerSocketChannel.class;
         if (KQueue.isAvailable()) return KQueueServerSocketChannel.class;
         return NioServerSocketChannel.class;