Skip to content

Commit

Permalink
fix #23 (#24)
Browse files Browse the repository at this point in the history
Fixing #23 DataBuffer need to be released in BrokerFrameDecoder.decode()
  • Loading branch information
kevinat authored Mar 6, 2024
1 parent 8d4c432 commit 5a53c3d
Showing 1 changed file with 25 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.core.codec.AbstractDecoder;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.util.MimeType;

Expand All @@ -60,27 +61,32 @@ public Flux<BrokerFrame> decode(Publisher<DataBuffer> inputStream, ResolvableTyp

@Override
public BrokerFrame decode(DataBuffer buffer, ResolvableType targetType, MimeType mimeType, Map<String, Object> hints) throws DecodingException {
ByteBuf byteBuf = asByteBuf(buffer);
// FIXME hack for ClusterJoinListener.setupRSocket() in broker broker
if (!byteBuf.isReadable()) {
return new BrokerFrame(FrameType.RESERVED, 0) {
};
try {
ByteBuf byteBuf = asByteBuf(buffer);
// FIXME hack for ClusterJoinListener.setupRSocket() in broker broker
if (!byteBuf.isReadable()) {
return new BrokerFrame(FrameType.RESERVED, 0) {
};
}
int flags = FrameHeaderFlyweight.flags(byteBuf);
FrameType frameType = FrameHeaderFlyweight.frameType(byteBuf);
switch (frameType) {
case ADDRESS:
return Address.from(byteBuf, flags);
case BROKER_INFO:
return BrokerInfo.from(byteBuf);
case ROUTE_JOIN:
return RouteJoin.from(byteBuf);
case ROUTE_REMOVE:
return RouteRemove.from(byteBuf);
case ROUTE_SETUP:
return RouteSetup.from(byteBuf);
}
throw new IllegalArgumentException("Unknown FrameType " + frameType);
}
int flags = FrameHeaderFlyweight.flags(byteBuf);
FrameType frameType = FrameHeaderFlyweight.frameType(byteBuf);
switch (frameType) {
case ADDRESS:
return Address.from(byteBuf, flags);
case BROKER_INFO:
return BrokerInfo.from(byteBuf);
case ROUTE_JOIN:
return RouteJoin.from(byteBuf);
case ROUTE_REMOVE:
return RouteRemove.from(byteBuf);
case ROUTE_SETUP:
return RouteSetup.from(byteBuf);
finally {
DataBufferUtils.release(buffer);
}
throw new IllegalArgumentException("Unknown FrameType " + frameType);
}

private static ByteBuf asByteBuf(DataBuffer buffer) {
Expand Down

0 comments on commit 5a53c3d

Please sign in to comment.