浏览代码

优化msgpack编解码器

johnclot69 3 周之前
父节点
当前提交
d2bb1b4abb

+ 73 - 74
incubator-core/src/main/java/com/incubator/core/protocol/MsgPackCodec.java

@@ -20,6 +20,79 @@ import java.util.Map;
 @ChannelHandler.Sharable
 public class MsgPackCodec extends MessageToMessageCodec<ByteBuf, Message> {
 
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
+        byte[] bytes = new byte[msg.readableBytes()];
+        msg.readBytes(bytes);
+
+        try (MessageUnpacker unpacked = MessagePack.newDefaultUnpacker(new ByteArrayInputStream(bytes))) {
+            Message message = new Message();
+            // 解包 Map Header,期望有 2 个键值对("cmd" 和 "data")
+            int mapSize = unpacked.unpackMapHeader();
+            for (int i = 0; i < mapSize; i++) {
+                // 解包键(假设键是字符串)
+                String key = unpacked.unpackString();
+                switch (key) {
+                    case "cmd" -> message.cmd = unpacked.unpackInt();
+                    case "code" -> message.code = unpacked.unpackInt();
+                    case "message" -> message.message = unpacked.unpackString();
+                    case "data" -> {
+                        if (unpacked.tryUnpackNil()) {
+                            message.data.clear();
+                        } else {
+                            // 解包 "data" 的值(假设是一个 Map)
+                            int dataSize = unpacked.unpackMapHeader();
+                            for (int j = 0; j < dataSize; j++) {
+                                String dataKey = unpacked.unpackString();
+                                Object dataValue = unpackNextValue(unpacked);
+                                message.data.put(dataKey, dataValue);
+                            }
+                        }
+                    }
+                    default -> unpacked.skipValue();
+                }
+            }
+            out.add(message);
+        }
+    }
+
+    private Object unpackNextValue(MessageUnpacker unpacker) throws Exception {
+        return switch (unpacker.getNextFormat().getValueType()) {
+            case STRING -> unpacker.unpackString();
+            case INTEGER -> {
+                // 根据值范围判断是否为 int 或 long
+                long value = unpacker.unpackLong();
+                yield (value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE) ? (int) value : value;
+            }
+            case FLOAT -> unpacker.unpackDouble(); // 改为 double 以防 float 精度损失
+            case BOOLEAN -> unpacker.unpackBoolean();
+            case MAP -> {
+                // 递归解包 Map
+                int mapSize = unpacker.unpackMapHeader();
+                Map<String, Object> map = new HashMap<>();
+                for (int i = 0; i < mapSize; i++) {
+                    String key = unpacker.unpackString();
+                    Object value = unpackNextValue(unpacker);
+                    map.put(key, value);
+                }
+                yield map;
+            }
+            case ARRAY -> {
+                int arraySize = unpacker.unpackArrayHeader();
+                Object[] array = new Object[arraySize];
+                for (int i = 0; i < arraySize; i++) {
+                    array[i] = unpackNextValue(unpacker);
+                }
+                yield array;
+            }
+            default -> {
+                // 跳过未知类型
+                unpacker.skipValue();
+                yield null;
+            }
+        };
+    }
+
     @Override
     protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
         try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) {
@@ -111,78 +184,4 @@ public class MsgPackCodec extends MessageToMessageCodec<ByteBuf, Message> {
             packObject(packer, entry.getValue());
         }
     }
-
-    @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
-        byte[] bytes = new byte[msg.readableBytes()];
-        msg.readBytes(bytes);
-
-        try (MessageUnpacker unpacked = MessagePack.newDefaultUnpacker(new ByteArrayInputStream(bytes))) {
-            Message message = new Message();
-
-            // 解包 Map Header,期望有 2 个键值对("cmd" 和 "data")
-            int mapSize = unpacked.unpackMapHeader();
-            for (int i = 0; i < mapSize; i++) {
-                // 解包键(假设键是字符串)
-                String key = unpacked.unpackString();
-                switch (key) {
-                    case "cmd" -> message.cmd = unpacked.unpackInt();
-                    case "code" -> message.code = unpacked.unpackInt();
-                    case "message" -> message.message = unpacked.unpackString();
-                    case "data" -> {
-                        if (unpacked.tryUnpackNil()) {
-                            message.data.clear();
-                        } else {
-                            // 解包 "data" 的值(假设是一个 Map)
-                            int dataSize = unpacked.unpackMapHeader();
-                            for (int j = 0; j < dataSize; j++) {
-                                String dataKey = unpacked.unpackString();
-                                Object dataValue = unpackNextValue(unpacked);
-                                message.data.put(dataKey, dataValue);
-                            }
-                        }
-                    }
-                    default -> unpacked.skipValue();
-                }
-            }
-            out.add(message);
-        }
-    }
-
-    private Object unpackNextValue(MessageUnpacker unpacker) throws Exception {
-        return switch (unpacker.getNextFormat().getValueType()) {
-            case STRING -> unpacker.unpackString();
-            case INTEGER -> {
-                // 根据值范围判断是否为 int 或 long
-                long value = unpacker.unpackLong();
-                yield (value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE) ? (int) value : value;
-            }
-            case FLOAT -> unpacker.unpackDouble(); // 改为 double 以防 float 精度损失
-            case BOOLEAN -> unpacker.unpackBoolean();
-            case MAP -> {
-                // 递归解包 Map
-                int mapSize = unpacker.unpackMapHeader();
-                Map<String, Object> map = new HashMap<>();
-                for (int i = 0; i < mapSize; i++) {
-                    String key = unpacker.unpackString();
-                    Object value = unpackNextValue(unpacker);
-                    map.put(key, value);
-                }
-                yield map;
-            }
-            case ARRAY -> {
-                int arraySize = unpacker.unpackArrayHeader();
-                Object[] array = new Object[arraySize];
-                for (int i = 0; i < arraySize; i++) {
-                    array[i] = unpackNextValue(unpacker);
-                }
-                yield array;
-            }
-            default -> {
-                // 跳过未知类型
-                unpacker.skipValue();
-                yield null;
-            }
-        };
-    }
 }

+ 5 - 5
incubator-core/src/main/java/com/incubator/core/protocol/WebSocketBinaryFrameCodec.java

@@ -15,11 +15,6 @@ import java.util.List;
 @ChannelHandler.Sharable
 public class WebSocketBinaryFrameCodec extends MessageToMessageCodec<WebSocketFrame, ByteBuf> {
 
-    @Override
-    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
-        out.add(new BinaryWebSocketFrame(msg.retain())); // 包装为 BinaryWebSocketFrame,并增加引用计数
-    }
-
     @Override
     protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) {
         if (frame instanceof BinaryWebSocketFrame binaryFrame) {
@@ -30,4 +25,9 @@ public class WebSocketBinaryFrameCodec extends MessageToMessageCodec<WebSocketFr
             ctx.fireExceptionCaught(new IllegalArgumentException("Unsupported WebSocketFrame type: " + frame.getClass().getName()));
         }
     }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
+        out.add(new BinaryWebSocketFrame(msg.retain())); // 包装为 BinaryWebSocketFrame,并增加引用计数
+    }
 }