|
@@ -6,12 +6,13 @@ import io.netty.bootstrap.ServerBootstrap;
|
|
|
import io.netty.buffer.PooledByteBufAllocator;
|
|
|
import io.netty.channel.*;
|
|
|
import io.netty.channel.epoll.Epoll;
|
|
|
-import io.netty.channel.epoll.EpollEventLoopGroup;
|
|
|
+import io.netty.channel.epoll.EpollIoHandler;
|
|
|
import io.netty.channel.epoll.EpollServerSocketChannel;
|
|
|
import io.netty.channel.kqueue.KQueue;
|
|
|
-import io.netty.channel.kqueue.KQueueEventLoopGroup;
|
|
|
+import io.netty.channel.kqueue.KQueueIoHandler;
|
|
|
import io.netty.channel.kqueue.KQueueServerSocketChannel;
|
|
|
-import io.netty.channel.nio.NioEventLoopGroup;
|
|
|
+import io.netty.channel.nio.NioIoHandler;
|
|
|
+import io.netty.channel.socket.ServerSocketChannel;
|
|
|
import io.netty.channel.socket.SocketChannel;
|
|
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
|
|
import io.netty.handler.codec.http.HttpObjectAggregator;
|
|
@@ -35,8 +36,8 @@ public class NettySystem extends BaseSystem {
|
|
|
private static final int WORKER_THREADS = Math.max(1, Runtime.getRuntime().availableProcessors() * 2);
|
|
|
|
|
|
private final ServerBootstrap bootstrap = new ServerBootstrap();
|
|
|
- private EventLoopGroup bossGroup;
|
|
|
- private EventLoopGroup workerGroup;
|
|
|
+ private IoEventLoopGroup bossGroup;
|
|
|
+ private IoEventLoopGroup workerGroup;
|
|
|
private Channel serverChannel;
|
|
|
|
|
|
public NettySystem(int port) {
|
|
@@ -47,8 +48,8 @@ public class NettySystem extends BaseSystem {
|
|
|
protected void initialize() {
|
|
|
InetSocketAddress address = new InetSocketAddress("0.0.0.0", this.port);
|
|
|
// 使用单线程 bossGroup,高并发时 workerGroup 使用默认线程数
|
|
|
- this.bossGroup = this.createEventLoopGroup(BOSS_THREADS, "Netty-Boss");
|
|
|
- this.workerGroup = this.createEventLoopGroup(WORKER_THREADS, "Netty-Worker");
|
|
|
+ this.bossGroup = this.createIoEventLoopGroup(BOSS_THREADS, "Netty-Boss");
|
|
|
+ this.workerGroup = this.createIoEventLoopGroup(WORKER_THREADS, "Netty-Worker");
|
|
|
initBootstrap(this.world);
|
|
|
|
|
|
try {
|
|
@@ -64,7 +65,7 @@ public class NettySystem extends BaseSystem {
|
|
|
|
|
|
private void initBootstrap(World world) {
|
|
|
this.bootstrap.group(this.bossGroup, this.workerGroup)
|
|
|
- .channel(this.getDefaultChannelClass())
|
|
|
+ .channel(this.getServerChannelClass())
|
|
|
// 全局选项
|
|
|
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
|
|
.option(ChannelOption.SO_BACKLOG, 1024) // 提升连接等待队列大小
|
|
@@ -94,15 +95,31 @@ public class NettySystem extends BaseSystem {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- // 工具方法
|
|
|
- private EventLoopGroup createEventLoopGroup(int threads, String prefix) {
|
|
|
+ /**
|
|
|
+ * 创建IoEventLoopGroup
|
|
|
+ *
|
|
|
+ * @param threads
|
|
|
+ * @param prefix
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private IoEventLoopGroup createIoEventLoopGroup(int threads, String prefix) {
|
|
|
ThreadFactory factory = Thread.ofVirtual().name(prefix + "-", 1).factory();
|
|
|
- if (Epoll.isAvailable()) return new EpollEventLoopGroup(threads, factory);
|
|
|
- if (KQueue.isAvailable()) return new KQueueEventLoopGroup(threads, factory);
|
|
|
- return new NioEventLoopGroup(threads, factory);
|
|
|
+ // 根据平台选择IoHandler
|
|
|
+ if (Epoll.isAvailable()) {
|
|
|
+ return new MultiThreadIoEventLoopGroup(threads, factory, EpollIoHandler.newFactory());
|
|
|
+ } else if (KQueue.isAvailable()) {
|
|
|
+ return new MultiThreadIoEventLoopGroup(threads, factory, KQueueIoHandler.newFactory());
|
|
|
+ } else {
|
|
|
+ return new MultiThreadIoEventLoopGroup(threads, factory, NioIoHandler.newFactory());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- private Class<? extends ServerChannel> getDefaultChannelClass() {
|
|
|
+ /**
|
|
|
+ * 获取通道类型
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private Class<? extends ServerSocketChannel> getServerChannelClass() {
|
|
|
if (Epoll.isAvailable()) return EpollServerSocketChannel.class;
|
|
|
if (KQueue.isAvailable()) return KQueueServerSocketChannel.class;
|
|
|
return NioServerSocketChannel.class;
|