From 0fd2d47ce56b57a4ed265d2009802e23a23be288 Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Wed, 5 Jun 2024 19:55:23 +0200 Subject: [PATCH] Refactor ResteasyReactiveOutputStream so that it can be used by Quarkus CXF fix #40994 --- bom/application/pom.xml | 5 + .../resteasy-reactive/pom.xml | 6 + .../resteasy-reactive/server/pom.xml | 1 + .../server/vertx-java-io/pom.xml | 45 ++ .../quarkus/vertx/java/io}/AppendBuffer.java | 2 +- .../vertx/java/io/VertxBufferImpl.java | 490 ++++++++++++++++++ .../vertx/java/io/VertxJavaIoContext.java | 42 ++ .../vertx/java/io/VertxOutputStream.java} | 67 +-- .../resteasy-reactive/server/vertx/pom.xml | 4 + .../VertxResteasyReactiveRequestContext.java | 42 +- 10 files changed, 657 insertions(+), 47 deletions(-) create mode 100644 independent-projects/resteasy-reactive/server/vertx-java-io/pom.xml rename independent-projects/resteasy-reactive/server/{vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx => vertx-java-io/src/main/java/io/quarkus/vertx/java/io}/AppendBuffer.java (99%) create mode 100644 independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxBufferImpl.java create mode 100644 independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxJavaIoContext.java rename independent-projects/resteasy-reactive/server/{vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java => vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxOutputStream.java} (74%) diff --git a/bom/application/pom.xml b/bom/application/pom.xml index ebc5797f2beb2d..7a88e35813fa77 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -4898,6 +4898,11 @@ resteasy-reactive-client-processor ${project.version} + + io.quarkus.resteasy.reactive + vertx-java-io + ${project.version} + org.wildfly.common wildfly-common diff --git a/independent-projects/resteasy-reactive/pom.xml b/independent-projects/resteasy-reactive/pom.xml index 76ccb42fe2ff2b..1efbf9a64e4794 100644 --- a/independent-projects/resteasy-reactive/pom.xml +++ b/independent-projects/resteasy-reactive/pom.xml @@ -135,6 +135,12 @@ ${project.version} + + io.quarkus.resteasy.reactive + vertx-java-io + ${project.version} + + io.quarkus.resteasy.reactive resteasy-reactive-client-processor diff --git a/independent-projects/resteasy-reactive/server/pom.xml b/independent-projects/resteasy-reactive/server/pom.xml index 131685df66fbc0..88a554fc8c333b 100644 --- a/independent-projects/resteasy-reactive/server/pom.xml +++ b/independent-projects/resteasy-reactive/server/pom.xml @@ -18,6 +18,7 @@ runtime processor vertx + vertx-java-io jsonb jackson diff --git a/independent-projects/resteasy-reactive/server/vertx-java-io/pom.xml b/independent-projects/resteasy-reactive/server/vertx-java-io/pom.xml new file mode 100644 index 00000000000000..53815e649feabb --- /dev/null +++ b/independent-projects/resteasy-reactive/server/vertx-java-io/pom.xml @@ -0,0 +1,45 @@ + + + 4.0.0 + + + io.quarkus.resteasy.reactive + resteasy-reactive-server-parent + 999-SNAPSHOT + + + vertx-java-io + Ancillary classes for using Vert.x with frameworks designed to read/write from/to java.io streams + + + + + io.vertx + vertx-web + + + + org.jboss.logging + jboss-logging + + + + org.jboss.logmanager + jboss-logmanager + test + + + + + + + + net.bytebuddy + byte-buddy-maven-plugin + + + + + diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/AppendBuffer.java b/independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/AppendBuffer.java similarity index 99% rename from independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/AppendBuffer.java rename to independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/AppendBuffer.java index 07212a4d2b6d06..89839610d845d4 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/AppendBuffer.java +++ b/independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/AppendBuffer.java @@ -1,4 +1,4 @@ -package org.jboss.resteasy.reactive.server.vertx; +package io.quarkus.vertx.java.io; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; diff --git a/independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxBufferImpl.java b/independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxBufferImpl.java new file mode 100644 index 00000000000000..2495631d771857 --- /dev/null +++ b/independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxBufferImpl.java @@ -0,0 +1,490 @@ +package io.quarkus.vertx.java.io; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.impl.Arguments; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; + +public class VertxBufferImpl implements Buffer { + + private ByteBuf buffer; + + public VertxBufferImpl(ByteBuf buffer) { + this.buffer = buffer; + } + + public String toString() { + return buffer.toString(StandardCharsets.UTF_8); + } + + public String toString(String enc) { + return buffer.toString(Charset.forName(enc)); + } + + public String toString(Charset enc) { + return buffer.toString(enc); + } + + @Override + public JsonObject toJsonObject() { + return new JsonObject(this); + } + + @Override + public JsonArray toJsonArray() { + return new JsonArray(this); + } + + public byte getByte(int pos) { + return buffer.getByte(pos); + } + + public short getUnsignedByte(int pos) { + return buffer.getUnsignedByte(pos); + } + + public int getInt(int pos) { + return buffer.getInt(pos); + } + + public int getIntLE(int pos) { + return buffer.getIntLE(pos); + } + + public long getUnsignedInt(int pos) { + return buffer.getUnsignedInt(pos); + } + + public long getUnsignedIntLE(int pos) { + return buffer.getUnsignedIntLE(pos); + } + + public long getLong(int pos) { + return buffer.getLong(pos); + } + + public long getLongLE(int pos) { + return buffer.getLongLE(pos); + } + + public double getDouble(int pos) { + return buffer.getDouble(pos); + } + + public float getFloat(int pos) { + return buffer.getFloat(pos); + } + + public short getShort(int pos) { + return buffer.getShort(pos); + } + + public short getShortLE(int pos) { + return buffer.getShortLE(pos); + } + + public int getUnsignedShort(int pos) { + return buffer.getUnsignedShort(pos); + } + + public int getUnsignedShortLE(int pos) { + return buffer.getUnsignedShortLE(pos); + } + + public int getMedium(int pos) { + return buffer.getMedium(pos); + } + + public int getMediumLE(int pos) { + return buffer.getMediumLE(pos); + } + + public int getUnsignedMedium(int pos) { + return buffer.getUnsignedMedium(pos); + } + + public int getUnsignedMediumLE(int pos) { + return buffer.getUnsignedMediumLE(pos); + } + + public byte[] getBytes() { + byte[] arr = new byte[buffer.writerIndex()]; + buffer.getBytes(0, arr); + return arr; + } + + public byte[] getBytes(int start, int end) { + Arguments.require(end >= start, "end must be greater or equal than start"); + byte[] arr = new byte[end - start]; + buffer.getBytes(start, arr, 0, end - start); + return arr; + } + + @Override + public Buffer getBytes(byte[] dst) { + return getBytes(dst, 0); + } + + @Override + public Buffer getBytes(byte[] dst, int dstIndex) { + return getBytes(0, buffer.writerIndex(), dst, dstIndex); + } + + @Override + public Buffer getBytes(int start, int end, byte[] dst) { + return getBytes(start, end, dst, 0); + } + + @Override + public Buffer getBytes(int start, int end, byte[] dst, int dstIndex) { + Arguments.require(end >= start, "end must be greater or equal than start"); + buffer.getBytes(start, dst, dstIndex, end - start); + return this; + } + + public Buffer getBuffer(int start, int end) { + return new VertxBufferImpl(Unpooled.wrappedBuffer(getBytes(start, end))); + } + + public String getString(int start, int end, String enc) { + byte[] bytes = getBytes(start, end); + Charset cs = Charset.forName(enc); + return new String(bytes, cs); + } + + public String getString(int start, int end) { + byte[] bytes = getBytes(start, end); + return new String(bytes, StandardCharsets.UTF_8); + } + + public Buffer appendBuffer(Buffer buff) { + buffer.writeBytes(buff.getByteBuf()); + return this; + } + + public Buffer appendBuffer(Buffer buff, int offset, int len) { + ByteBuf byteBuf = buff.getByteBuf(); + int from = byteBuf.readerIndex() + offset; + buffer.writeBytes(byteBuf, from, len); + return this; + } + + public Buffer appendBytes(byte[] bytes) { + buffer.writeBytes(bytes); + return this; + } + + public Buffer appendBytes(byte[] bytes, int offset, int len) { + buffer.writeBytes(bytes, offset, len); + return this; + } + + public Buffer appendByte(byte b) { + buffer.writeByte(b); + return this; + } + + public Buffer appendUnsignedByte(short b) { + buffer.writeByte(b); + return this; + } + + public Buffer appendInt(int i) { + buffer.writeInt(i); + return this; + } + + public Buffer appendIntLE(int i) { + buffer.writeIntLE(i); + return this; + } + + public Buffer appendUnsignedInt(long i) { + buffer.writeInt((int) i); + return this; + } + + public Buffer appendUnsignedIntLE(long i) { + buffer.writeIntLE((int) i); + return this; + } + + public Buffer appendMedium(int i) { + buffer.writeMedium(i); + return this; + } + + public Buffer appendMediumLE(int i) { + buffer.writeMediumLE(i); + return this; + } + + public Buffer appendLong(long l) { + buffer.writeLong(l); + return this; + } + + public Buffer appendLongLE(long l) { + buffer.writeLongLE(l); + return this; + } + + public Buffer appendShort(short s) { + buffer.writeShort(s); + return this; + } + + public Buffer appendShortLE(short s) { + buffer.writeShortLE(s); + return this; + } + + public Buffer appendUnsignedShort(int s) { + buffer.writeShort(s); + return this; + } + + public Buffer appendUnsignedShortLE(int s) { + buffer.writeShortLE(s); + return this; + } + + public Buffer appendFloat(float f) { + buffer.writeFloat(f); + return this; + } + + public Buffer appendDouble(double d) { + buffer.writeDouble(d); + return this; + } + + public Buffer appendString(String str, String enc) { + return append(str, Charset.forName(Objects.requireNonNull(enc))); + } + + public Buffer appendString(String str) { + return append(str, CharsetUtil.UTF_8); + } + + public Buffer setByte(int pos, byte b) { + ensureWritable(pos, 1); + buffer.setByte(pos, b); + return this; + } + + public Buffer setUnsignedByte(int pos, short b) { + ensureWritable(pos, 1); + buffer.setByte(pos, b); + return this; + } + + public Buffer setInt(int pos, int i) { + ensureWritable(pos, 4); + buffer.setInt(pos, i); + return this; + } + + public Buffer setIntLE(int pos, int i) { + ensureWritable(pos, 4); + buffer.setIntLE(pos, i); + return this; + } + + public Buffer setUnsignedInt(int pos, long i) { + ensureWritable(pos, 4); + buffer.setInt(pos, (int) i); + return this; + } + + public Buffer setUnsignedIntLE(int pos, long i) { + ensureWritable(pos, 4); + buffer.setIntLE(pos, (int) i); + return this; + } + + public Buffer setMedium(int pos, int i) { + ensureWritable(pos, 3); + buffer.setMedium(pos, i); + return this; + } + + public Buffer setMediumLE(int pos, int i) { + ensureWritable(pos, 3); + buffer.setMediumLE(pos, i); + return this; + } + + public Buffer setLong(int pos, long l) { + ensureWritable(pos, 8); + buffer.setLong(pos, l); + return this; + } + + public Buffer setLongLE(int pos, long l) { + ensureWritable(pos, 8); + buffer.setLongLE(pos, l); + return this; + } + + public Buffer setDouble(int pos, double d) { + ensureWritable(pos, 8); + buffer.setDouble(pos, d); + return this; + } + + public Buffer setFloat(int pos, float f) { + ensureWritable(pos, 4); + buffer.setFloat(pos, f); + return this; + } + + public Buffer setShort(int pos, short s) { + ensureWritable(pos, 2); + buffer.setShort(pos, s); + return this; + } + + public Buffer setShortLE(int pos, short s) { + ensureWritable(pos, 2); + buffer.setShortLE(pos, s); + return this; + } + + public Buffer setUnsignedShort(int pos, int s) { + ensureWritable(pos, 2); + buffer.setShort(pos, s); + return this; + } + + public Buffer setUnsignedShortLE(int pos, int s) { + ensureWritable(pos, 2); + buffer.setShortLE(pos, s); + return this; + } + + public Buffer setBuffer(int pos, Buffer b) { + ensureWritable(pos, b.length()); + buffer.setBytes(pos, b.getByteBuf()); + return this; + } + + public Buffer setBuffer(int pos, Buffer b, int offset, int len) { + ensureWritable(pos, len); + ByteBuf byteBuf = b.getByteBuf(); + buffer.setBytes(pos, byteBuf, byteBuf.readerIndex() + offset, len); + return this; + } + + public VertxBufferImpl setBytes(int pos, ByteBuffer b) { + ensureWritable(pos, b.limit()); + buffer.setBytes(pos, b); + return this; + } + + public Buffer setBytes(int pos, byte[] b) { + ensureWritable(pos, b.length); + buffer.setBytes(pos, b); + return this; + } + + public Buffer setBytes(int pos, byte[] b, int offset, int len) { + ensureWritable(pos, len); + buffer.setBytes(pos, b, offset, len); + return this; + } + + public Buffer setString(int pos, String str) { + return setBytes(pos, str, CharsetUtil.UTF_8); + } + + public Buffer setString(int pos, String str, String enc) { + return setBytes(pos, str, Charset.forName(enc)); + } + + public int length() { + return buffer.writerIndex(); + } + + public Buffer copy() { + return new VertxBufferImpl(buffer.copy()); + } + + public Buffer slice() { + return new VertxBufferImpl(buffer.slice()); + } + + public Buffer slice(int start, int end) { + return new VertxBufferImpl(buffer.slice(start, end - start)); + } + + public ByteBuf getByteBuf() { + // Return a duplicate so the Buffer can be written multiple times. + // See #648 + return buffer; + } + + private Buffer append(String str, Charset charset) { + byte[] bytes = str.getBytes(charset); + buffer.writeBytes(bytes); + return this; + } + + private Buffer setBytes(int pos, String str, Charset charset) { + byte[] bytes = str.getBytes(charset); + ensureWritable(pos, bytes.length); + buffer.setBytes(pos, bytes); + return this; + } + + private void ensureWritable(int pos, int len) { + int ni = pos + len; + int cap = buffer.capacity(); + int over = ni - cap; + if (over > 0) { + buffer.writerIndex(cap); + buffer.ensureWritable(over); + } + //We have to make sure that the writerindex is always positioned on the last bit of data set in the buffer + if (ni > buffer.writerIndex()) { + buffer.writerIndex(ni); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + VertxBufferImpl buffer1 = (VertxBufferImpl) o; + return buffer != null ? buffer.equals(buffer1.buffer) : buffer1.buffer == null; + } + + @Override + public int hashCode() { + return buffer != null ? buffer.hashCode() : 0; + } + + @Override + public void writeToBuffer(Buffer buff) { + buff.appendInt(this.length()); + buff.appendBuffer(this); + } + + @Override + public int readFromBuffer(int pos, Buffer buffer) { + int len = buffer.getInt(pos); + Buffer b = buffer.getBuffer(pos + 4, pos + 4 + len); + this.buffer = b.getByteBuf(); + return pos + 4 + len; + } +} diff --git a/independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxJavaIoContext.java b/independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxJavaIoContext.java new file mode 100644 index 00000000000000..e78898914b5786 --- /dev/null +++ b/independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxJavaIoContext.java @@ -0,0 +1,42 @@ +package io.quarkus.vertx.java.io; + +import java.util.Optional; + +import io.vertx.ext.web.RoutingContext; + +public class VertxJavaIoContext { + private final RoutingContext context; + private final int minChunkSize; + private final int outputBufferSize; + + public VertxJavaIoContext(RoutingContext context, int minChunkSize, int outputBufferSize) { + this.context = context; + this.minChunkSize = minChunkSize; + this.outputBufferSize = outputBufferSize; + } + + public RoutingContext getRoutingContext() { + return context; + } + + public int getMinChunkSize() { + return minChunkSize; + } + + public int getOutputBufferSize() { + return outputBufferSize; + } + + /** + * The framework writing into {@link VertxOutputStream} may need a way to pass a user defined content length. + * You may want to override this method if you integrate such a framework. + *

+ * This implementation always returns an empty {@link Optional}. + * + * @return {@link Optional#empty()} + */ + public Optional getContentLength() { + return Optional.empty(); + } + +} diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java b/independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxOutputStream.java similarity index 74% rename from independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java rename to independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxOutputStream.java index 66277a84605aef..efded229879540 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java +++ b/independent-projects/resteasy-reactive/server/vertx-java-io/src/main/java/io/quarkus/vertx/java/io/VertxOutputStream.java @@ -1,16 +1,12 @@ -package org.jboss.resteasy.reactive.server.vertx; +package io.quarkus.vertx.java.io; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; - -import jakarta.ws.rs.core.HttpHeaders; -import jakarta.ws.rs.core.MultivaluedMap; +import java.util.Optional; import org.jboss.logging.Logger; -import org.jboss.resteasy.reactive.server.core.LazyResponse; -import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpHeaderNames; @@ -19,12 +15,13 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; import io.vertx.core.http.impl.HttpServerRequestInternal; -public class ResteasyReactiveOutputStream extends OutputStream { +public class VertxOutputStream extends OutputStream { private static final Logger log = Logger.getLogger("org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveOutputStream"); - private final ResteasyReactiveRequestContext context; + private final VertxJavaIoContext context; protected final HttpServerRequest request; private final AppendBuffer appendBuffer; private boolean committed; @@ -35,12 +32,12 @@ public class ResteasyReactiveOutputStream extends OutputStream { protected Throwable throwable; private ByteArrayOutputStream overflow; - public ResteasyReactiveOutputStream(VertxResteasyReactiveRequestContext context) { + public VertxOutputStream(VertxJavaIoContext context) { this.context = context; - this.request = context.getContext().request(); + this.request = context.getRoutingContext().request(); this.appendBuffer = AppendBuffer.withMinChunks( - context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(), - context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize()); + context.getMinChunkSize(), + context.getOutputBufferSize()); request.response().exceptionHandler(new Handler() { @Override public void handle(Throwable event) { @@ -60,7 +57,7 @@ public void handle(Throwable event) { request.response().drainHandler(handler); request.response().closeHandler(handler); - context.getContext().addEndHandler(new Handler>() { + context.getRoutingContext().addEndHandler(new Handler>() { @Override public void handle(AsyncResult event) { synchronized (request.connection()) { @@ -198,47 +195,27 @@ public void writeBlocking(ByteBuf buffer, boolean finished) throws IOException { private void prepareWrite(ByteBuf buffer, boolean finished) throws IOException { if (!committed) { committed = true; + final HttpServerResponse response = request.response(); if (finished) { - if (!context.serverResponse().headWritten()) { + if (!response.headWritten()) { if (buffer == null) { - context.serverResponse().setResponseHeader(HttpHeaderNames.CONTENT_LENGTH, "0"); + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "0"); } else { - context.serverResponse().setResponseHeader(HttpHeaderNames.CONTENT_LENGTH, "" + buffer.readableBytes()); + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(buffer.readableBytes())); } } } else { - var contentLengthSet = contentLengthSet(request, context.getResponse()); - if (contentLengthSet == ContentLengthSetResult.NOT_SET) { - request.response().setChunked(true); - } else if (contentLengthSet == ContentLengthSetResult.IN_JAX_RS_HEADER) { - // we need to make sure the content-length header is copied to Vert.x headers - // otherwise we could run into a race condition: see https://github.com/quarkusio/quarkus/issues/26599 - Object contentLength = context.getResponse().get().getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH); - context.serverResponse().setResponseHeader(HttpHeaderNames.CONTENT_LENGTH, contentLength.toString()); + Optional contentLength = context.getContentLength(); + if (contentLength.isEmpty()) { + response.setChunked(true); + } else { + /* The framework writing into this stream may need a way to pass a user defined content length */ + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, contentLength.get()); } } } } - public static ContentLengthSetResult contentLengthSet(HttpServerRequest request, LazyResponse lazyResponse) { - if (request.response().headers().contains(HttpHeaderNames.CONTENT_LENGTH)) { - return ContentLengthSetResult.IN_VERTX_HEADER; - } - if (!lazyResponse.isCreated()) { - return ContentLengthSetResult.NOT_SET; - } - MultivaluedMap responseHeaders = lazyResponse.get().getHeaders(); - return (responseHeaders != null) && responseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH) - ? ContentLengthSetResult.IN_JAX_RS_HEADER - : ContentLengthSetResult.NOT_SET; - } - - public enum ContentLengthSetResult { - NOT_SET, - IN_VERTX_HEADER, - IN_JAX_RS_HEADER - } - /** * {@inheritDoc} */ @@ -272,9 +249,9 @@ public void close() throws IOException { } private static class DrainHandler implements Handler { - private final ResteasyReactiveOutputStream out; + private final VertxOutputStream out; - public DrainHandler(ResteasyReactiveOutputStream out) { + public DrainHandler(VertxOutputStream out) { this.out = out; } diff --git a/independent-projects/resteasy-reactive/server/vertx/pom.xml b/independent-projects/resteasy-reactive/server/vertx/pom.xml index 6149a3d62dc5bf..53379aedbfca07 100644 --- a/independent-projects/resteasy-reactive/server/vertx/pom.xml +++ b/independent-projects/resteasy-reactive/server/vertx/pom.xml @@ -27,6 +27,10 @@ io.quarkus.resteasy.reactive resteasy-reactive + + io.quarkus.resteasy.reactive + vertx-java-io + jakarta.enterprise jakarta.enterprise.cdi-api diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java index d6a097ed92312b..8e5dd692e287a8 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java @@ -8,6 +8,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; @@ -15,9 +16,12 @@ import java.util.function.Consumer; import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MultivaluedMap; +import org.jboss.resteasy.reactive.common.ResteasyReactiveConfig; import org.jboss.resteasy.reactive.common.util.CaseInsensitiveMap; import org.jboss.resteasy.reactive.server.core.Deployment; +import org.jboss.resteasy.reactive.server.core.LazyResponse; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; import org.jboss.resteasy.reactive.server.core.multipart.FormData; import org.jboss.resteasy.reactive.server.spi.ServerHttpRequest; @@ -30,6 +34,8 @@ import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.util.concurrent.ScheduledFuture; +import io.quarkus.vertx.java.io.VertxJavaIoContext; +import io.quarkus.vertx.java.io.VertxOutputStream; import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Handler; @@ -454,7 +460,12 @@ public ServerHttpResponse sendFile(String path, long offset, long length) { @Override public OutputStream createResponseOutputStream() { - return new ResteasyReactiveOutputStream(this); + final ResteasyReactiveConfig config = getDeployment().getResteasyReactiveConfig(); + return new VertxOutputStream( + new ResteasyVertxJavaIoContext( + context, + config.getMinChunkSize(), + config.getOutputBufferSize())); } @Override @@ -498,4 +509,33 @@ enum ContinueState { REQUIRED, SENT; } + + final class ResteasyVertxJavaIoContext extends VertxJavaIoContext { + + public ResteasyVertxJavaIoContext(RoutingContext context, int minChunkSize, int outputBufferSize) { + super(context, minChunkSize, outputBufferSize); + } + + @Override + public Optional getContentLength() { + if (getRoutingContext().request().response().headers().contains(HttpHeaderNames.CONTENT_LENGTH)) { + return Optional.empty(); + } + final LazyResponse lazyResponse = VertxResteasyReactiveRequestContext.this.getResponse(); + if (!lazyResponse.isCreated()) { + return Optional.empty(); + } + MultivaluedMap responseHeaders = lazyResponse.get().getHeaders(); + if (responseHeaders != null) { + // we need to make sure the content-length header is copied to Vert.x headers + // otherwise we could run into a race condition: see https://github.com/quarkusio/quarkus/issues/26599 + Object contentLength = responseHeaders.getFirst(HttpHeaders.CONTENT_LENGTH); + if (contentLength != null) { + return Optional.of(contentLength.toString()); + } + } + return Optional.empty(); + } + + } }