From 3e16ca606fd257cc393300cbad14ce2de47b2d95 Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Sun, 27 Oct 2024 08:19:30 +0100 Subject: [PATCH] VertxHttpClientHTTPConduit asynchronous mode, fix #1447 --- .github/workflows/build.yml | 2 +- .../cxf/deployment/CxfClientProcessor.java | 15 + .../cxf/deployment/test/CertReloadTest.java | 26 +- .../deployment/test/Client3xx4xx5xxTest.java | 216 ++++++++++ .../java/io/quarkiverse/cxf/CXFRecorder.java | 6 + .../client/VertxHttpClientHTTPConduit.java | 387 +++++++++++++----- .../cxf/wsdl/QuarkusWSDLManager.java | 288 +++++++++++++ integration-tests/async-vertx-client/pom.xml | 128 ++++++ .../cxf/deployment/test/Hello.java | 64 +++ .../cxf/deployment/test/HelloResponse.java | 66 +++ .../cxf/deployment/test/HelloService.java | 47 +++ .../deployment/test/HelloService_Service.java | 90 ++++ .../cxf/deployment/test/ObjectFactory.java | 85 ++++ .../cxf/deployment/test/package-info.java | 2 + .../cxf/it/vertx/async/RestAsyncWithWsdl.java | 32 ++ .../async/RestAsyncWithWsdlWithBlocking.java | 33 ++ .../async/RestAsyncWithWsdlWithEagerInit.java | 66 +++ .../it/vertx/async/RestAsyncWithoutWsdl.java | 29 ++ .../RestAsyncWithoutWsdlWithBlocking.java | 31 ++ .../vertx/async/VerboseExceptionMapper.java | 30 ++ .../service/HelloWithWsdlWithEagerInit.java | 31 ++ .../vertx/async/service/HelloWithoutWsdl.java | 31 ++ .../service/HelloWithoutWsdlWithBlocking.java | 31 ++ .../src/main/resources/application.properties | 20 + .../wsdl/CalculatorService-async-binding.xml | 10 + .../resources/wsdl/CalculatorService.wsdl | 350 ++++++++++++++++ .../it/vertx/async/AsyncVertxClientIT.java | 7 + .../it/vertx/async/AsyncVertxClientTest.java | 136 ++++++ .../async/AsyncVertxClientTestResource.java | 52 +++ integration-tests/pom.xml | 1 + 30 files changed, 2190 insertions(+), 122 deletions(-) create mode 100644 extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/Client3xx4xx5xxTest.java create mode 100644 extensions/core/runtime/src/main/java/io/quarkiverse/cxf/wsdl/QuarkusWSDLManager.java create mode 100644 integration-tests/async-vertx-client/pom.xml create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/Hello.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloResponse.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloService.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloService_Service.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/ObjectFactory.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/package-info.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdl.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdlWithBlocking.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdlWithEagerInit.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithoutWsdl.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithoutWsdlWithBlocking.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/VerboseExceptionMapper.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithWsdlWithEagerInit.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithoutWsdl.java create mode 100644 integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithoutWsdlWithBlocking.java create mode 100644 integration-tests/async-vertx-client/src/main/resources/application.properties create mode 100644 integration-tests/async-vertx-client/src/main/resources/wsdl/CalculatorService-async-binding.xml create mode 100644 integration-tests/async-vertx-client/src/main/resources/wsdl/CalculatorService.wsdl create mode 100644 integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientIT.java create mode 100644 integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTest.java create mode 100644 integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTestResource.java diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 9e1c3fd27..674f4c84b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -38,7 +38,7 @@ jobs: strategy: fail-fast: false matrix: - testModule: ['client', 'client-server', 'fastinfoset', 'hc5', 'metrics', 'mtls', 'mtls -Djks', 'mtom', 'mtom-awt', 'opentelemetry', 'saaj', 'santuario-xmlsec', 'server', 'ws-rm-client', 'ws-security', 'ws-security -Djks', 'ws-security-policy', 'ws-security-policy -Djks', 'ws-trust', 'wsdl2java', 'wsdl2java-no-config'] + testModule: ['async-vertx-client', 'client', 'client-server', 'fastinfoset', 'hc5', 'metrics', 'mtls', 'mtls -Djks', 'mtom', 'mtom-awt', 'opentelemetry', 'saaj', 'santuario-xmlsec', 'server', 'ws-rm-client', 'ws-security', 'ws-security -Djks', 'ws-security-policy', 'ws-security-policy -Djks', 'ws-trust', 'wsdl2java', 'wsdl2java-no-config'] name: ${{matrix.testModule}} native tests needs: build-and-run-jvm-tests runs-on: ubuntu-latest diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java index 05597b34b..5909367e8 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java @@ -23,6 +23,8 @@ import org.apache.cxf.endpoint.Client; import org.apache.cxf.transport.http.HTTPTransportFactory; +import org.apache.cxf.wsdl11.CatalogWSDLLocator; +import org.apache.cxf.wsdl11.WSDLManagerImpl; import org.jboss.jandex.AnnotationInstance; import org.jboss.jandex.AnnotationTarget; import org.jboss.jandex.AnnotationValue; @@ -507,6 +509,19 @@ void customizers( } } + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + void workaroundAsyncWsdlInit( + CXFRecorder recorder, + BuildProducer customizers) { + customizers.produce(new RuntimeBusCustomizerBuildItem(recorder.setQuarkusWSDLManager())); + } + + @BuildStep + ReflectiveClassBuildItem workaroundAsyncWsdlInit() { + return ReflectiveClassBuildItem.builder(WSDLManagerImpl.class, CatalogWSDLLocator.class).fields().build(); + } + @BuildStep @Record(ExecutionTime.RUNTIME_INIT) void workaroundBadForceURLConnectionInit(CXFRecorder recorder) { diff --git a/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/CertReloadTest.java b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/CertReloadTest.java index bfcc64ad5..da8936f08 100644 --- a/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/CertReloadTest.java +++ b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/CertReloadTest.java @@ -134,10 +134,7 @@ void verticeDeploy() throws IOException, InterruptedException, ExecutionExceptio vertx.undeploy(deplId).toCompletionStage().toCompletableFuture().get(); } /* Make sure the server is down */ - Assertions.assertThatThrownBy(() -> helloVertice.hello("Doe")) - .rootCause() - .hasMessageContaining("Connection refused") - .isInstanceOf(java.net.ConnectException.class); + assertServerDown(); /* Put the valid stores aside */ Files.move(localHostKs, localHostKsCp, StandardCopyOption.REPLACE_EXISTING); @@ -168,10 +165,7 @@ void verticeDeploy() throws IOException, InterruptedException, ExecutionExceptio } /* Make sure the server is down */ - Assertions.assertThatThrownBy(() -> helloVertice.hello("Doe")) - .rootCause() - .hasMessageContaining("Connection refused") - .isInstanceOf(java.net.ConnectException.class); + assertServerDown(); /* Revert everything back */ Files.move(localHostKsCp, localHostKs, StandardCopyOption.REPLACE_EXISTING); @@ -194,13 +188,21 @@ void verticeDeploy() throws IOException, InterruptedException, ExecutionExceptio } /* Make sure the server is down */ - Assertions.assertThatThrownBy(() -> helloVertice.hello("Doe")) - .rootCause() - .hasMessageContaining("Connection refused") - .isInstanceOf(java.net.ConnectException.class); + assertServerDown(); } + private void assertServerDown() { + Awaitility.await().atMost(3000, TimeUnit.SECONDS).until(() -> { + try { + helloVertice.hello("Doe"); + return false; + } catch (Exception e) { + return rootCause(e).getMessage().equals("Connection refused"); + } + }); + } + @Test void simple() throws IOException, InterruptedException, ExecutionException { diff --git a/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/Client3xx4xx5xxTest.java b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/Client3xx4xx5xxTest.java new file mode 100644 index 000000000..fb3d70275 --- /dev/null +++ b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/Client3xx4xx5xxTest.java @@ -0,0 +1,216 @@ +package io.quarkiverse.cxf.deployment.test; + +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.inject.Instance; +import jakarta.jws.WebMethod; +import jakarta.jws.WebService; + +import org.apache.cxf.endpoint.Client; +import org.apache.cxf.frontend.ClientProxy; +import org.apache.cxf.transport.http.URLConnectionHTTPConduit; +import org.assertj.core.api.Assertions; +import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkiverse.cxf.annotation.CXFClient; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.BodyHandler; + +public class Client3xx4xx5xxTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(HelloService.class, HelloServiceImpl.class)) + + /* Service */ + .overrideConfigKey("quarkus.cxf.endpoint.\"/hello\".implementor", + HelloServiceImpl.class.getName()) + .overrideConfigKey("quarkus.cxf.endpoint.\"/hello\".logging.enabled", "true") + + /* Clients */ + .overrideConfigKey("quarkus.cxf.client.wsdlUri200.client-endpoint-url", "http://localhost:8081/services/hello") + .overrideConfigKey("quarkus.cxf.client.wsdlUri200.wsdl", "http://localhost:8081/services/hello?wsdl") + // Not needed when the WSDL is set and HelloService has both serviceName and targetNamespace set + //.overrideConfigKey("quarkus.cxf.client.wsdlUri404.service-interface", HelloService.class.getName()) + .overrideConfigKey("quarkus.cxf.client.wsdlUri200.logging.enabled", "true") + + /* Bad WSDL URI */ + .overrideConfigKey("quarkus.cxf.client.wsdlUri404.client-endpoint-url", "http://localhost:8081/services/hello") + .overrideConfigKey("quarkus.cxf.client.wsdlUri404.wsdl", "http://localhost:8081/services/no-such-service?wsdl") + .overrideConfigKey("quarkus.cxf.client.wsdlUri404.logging.enabled", "true") + + /* Bad service endpoint URI */ + .overrideConfigKey("quarkus.cxf.client.endpointUri404.client-endpoint-url", + "http://localhost:8081/services/no-such-service") + .overrideConfigKey("quarkus.cxf.client.endpointUri404.service-interface", HelloService.class.getName()) + .overrideConfigKey("quarkus.cxf.client.endpointUri404.logging.enabled", "true"); + + @CXFClient("wsdlUri200") + // Use Instance to avoid greedy initialization + Instance wsdlUri200; + + @CXFClient("wsdlUri404") + Instance wsdlUri404; + + @CXFClient("endpointUri404") + Instance endpointUri404; + + Instance getClient(String clientName) { + switch (clientName) { + case "wsdlUri200": { + return wsdlUri200; + } + case "wsdlUri404": { + return wsdlUri404; + } + case "endpointUri404": { + return endpointUri404; + } + default: + throw new IllegalArgumentException("Unexpected client name: " + clientName); + } + } + + @Test + void wsdlUri200() { + Assertions.assertThat(wsdlUri200.get().hello("foo")).isEqualTo("Hello foo"); + } + + @Test + void wsdlUri404() { + Assertions.assertThatThrownBy(() -> wsdlUri404.get().hello("foo")) + .hasRootCauseInstanceOf(org.apache.cxf.transport.http.HTTPException.class) + .hasRootCauseMessage( + "HTTP response '404: Not Found' when communicating with http://localhost:8081/services/no-such-service?wsdl"); + } + + @Test + void endpointUri404() { + Assertions.assertThatThrownBy(() -> endpointUri404.get().hello("foo")).hasRootCauseMessage( + "HTTP response '404: Not Found' when communicating with http://localhost:8081/services/no-such-service"); + } + + public void init(@Observes Router router) { + router.route().handler(BodyHandler.create()); + router.post("/vertx-blocking/:client").blockingHandler(ctx -> { + final String person = ctx.body().asString(); + final String resp = getClient(ctx.pathParam("client")).get().hello(person); + ctx.response().end(resp); + }); + router.post("/vertx/:client").handler(ctx -> { + final String person = ctx.body().asString(); + try { + final String resp = getClient(ctx.pathParam("client")).get().hello(person); + ctx.response().end(resp); + } catch (Exception e) { + Throwable r = rootCause(e); + ctx.response().setStatusCode(500).end(r.getClass().getName() + " " + r.getMessage()); + } + }); + } + + @Test + void wsdlUri200OnWorkerThread() { + RestAssured.given() + .body("Joe") + .post("http://localhost:8081/vertx-blocking/wsdlUri200") + .then() + .statusCode(200) + .body(Matchers.is("Hello Joe")); + } + + @Test + void wsdlUri200OnEventLoop() throws InterruptedException { + final Client client = ClientProxy.getClient(wsdlUri200.get()); + if (client.getConduit() instanceof URLConnectionHTTPConduit) { + /* URLConnectionHTTPConduit is not as picky as VertxHttpClientHTTPConduit */ + RestAssured.given() + .body("Joe") + .post("http://localhost:8081/vertx/wsdlUri200") + .then() + .statusCode(200) + .body(Matchers.is("Hello Joe")); + } else { + /* VertxHttpClientHTTPConduit */ + RestAssured.given() + .body("Joe") + .post("http://localhost:8081/vertx/wsdlUri200") + .then() + .statusCode(500) + .body(CoreMatchers.containsString( + "java.lang.IllegalStateException You have attempted to perform a blocking operation on an IO thread.")); + } + + } + + @Test + void endpointUri404OnWorkerThread() { + RestAssured.given() + .body("Joe") + .post("http://localhost:8081/vertx-blocking/endpointUri404") + .then() + .statusCode(500) + .body(CoreMatchers.containsString( + "org.apache.cxf.transport.http.HTTPException: HTTP response '404: Not Found' when communicating with http://localhost:8081/services/no-such-service")); + } + + @Test + void endpointUri404OnEventLoop() throws InterruptedException { + final Client client = ClientProxy.getClient(endpointUri404.get()); + if (client.getConduit() instanceof URLConnectionHTTPConduit) { + /* URLConnectionHTTPConduit is not as picky as VertxHttpClientHTTPConduit */ + RestAssured.given() + .body("Joe") + .post("http://localhost:8081/vertx/endpointUri404") + .then() + .statusCode(500) + .body(CoreMatchers.containsString( + "org.apache.cxf.transport.http.HTTPException HTTP response '404: Not Found' when communicating with http://localhost:8081/services/no-such-service")); + } else { + /* VertxHttpClientHTTPConduit */ + RestAssured.given() + .body("Joe") + .post("http://localhost:8081/vertx/endpointUri404") + .then() + .statusCode(500) + .body(CoreMatchers.containsString( + "java.lang.IllegalStateException You have attempted to perform a blocking operation on an IO thread.")); + + } + + } + + private static Throwable rootCause(Exception e) { + e.printStackTrace(); + Throwable result = e; + while (result.getCause() != null) { + result = result.getCause(); + } + return result; + } + + @WebService(serviceName = "HelloService", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/") + public interface HelloService { + + @WebMethod + String hello(String person); + + } + + @WebService(serviceName = "HelloService") + public static class HelloServiceImpl implements HelloService { + + @Override + public String hello(String person) { + return "Hello " + person; + } + } + +} diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java index a481ee763..ef2e7ed28 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java @@ -10,12 +10,14 @@ import org.apache.cxf.Bus; import org.apache.cxf.transport.http.HTTPTransportFactory; +import org.apache.cxf.wsdl.WSDLManager; import org.jboss.logging.Logger; import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue.Supplier; import io.quarkiverse.cxf.annotation.CXFEndpoint; import io.quarkiverse.cxf.transport.CxfHandler; import io.quarkiverse.cxf.transport.VertxDestinationFactory; +import io.quarkiverse.cxf.wsdl.QuarkusWSDLManager; import io.quarkus.arc.Arc; import io.quarkus.arc.runtime.BeanContainer; import io.quarkus.runtime.RuntimeValue; @@ -285,6 +287,10 @@ public RuntimeValue> setBusHTTPConduitFactory(HTTPConduitImpl fact return new RuntimeValue<>(bus -> bus.setExtension(factory, HTTPConduitSpec.class)); } + public RuntimeValue> setQuarkusWSDLManager() { + return new RuntimeValue<>(bus -> bus.setExtension(QuarkusWSDLManager.newInstance(bus), WSDLManager.class)); + } + public void workaroundBadForceURLConnectionInit() { // A workaround for the bad initialization of HTTPTransportFactory.forceURLConnectionConduit // in the downstream CXF 4.0.5.fuse-redhat-00012: diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java index e5ff07c1d..f9088303e 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java @@ -41,10 +41,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Logger; import org.apache.cxf.Bus; -import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.configuration.jsse.TLSClientParameters; import org.apache.cxf.endpoint.ClientCallback; import org.apache.cxf.endpoint.Endpoint; @@ -53,6 +51,7 @@ import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageImpl; import org.apache.cxf.message.MessageUtils; +import org.apache.cxf.phase.PhaseInterceptorChain; import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.transport.Conduit; import org.apache.cxf.transport.MessageObserver; @@ -65,10 +64,15 @@ import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; import org.apache.cxf.version.Version; import org.apache.cxf.ws.addressing.EndpointReferenceType; +import org.eclipse.microprofile.context.ManagedExecutor; +import org.jboss.logging.Logger; import io.quarkiverse.cxf.QuarkusTLSClientParameters; import io.quarkiverse.cxf.vertx.http.client.HttpClientPool.ClientSpec; import io.quarkiverse.cxf.vertx.http.client.VertxHttpClientHTTPConduit.RequestBodyEvent.RequestBodyEventType; +import io.quarkus.arc.Arc; +import io.quarkus.arc.InstanceHandle; +import io.quarkus.runtime.BlockingOperationControl; import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Handler; @@ -87,7 +91,9 @@ /** */ public class VertxHttpClientHTTPConduit extends HTTPConduit { - private static final Logger LOG = LogUtils.getL7dLogger(VertxHttpClientHTTPConduit.class); + private static final Logger log = Logger.getLogger(VertxHttpClientHTTPConduit.class); + public static final String USE_ASYNC = "use.async.http.conduit"; + public static final String ENABLE_HTTP2 = "org.apache.cxf.transports.http2.enabled"; private final HttpClientPool httpClientPool; private final String userAgent; @@ -109,25 +115,25 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic final HttpMethod method = getMethod(message); - // message.put("use.async.http.conduit", Boolean.TRUE); + final UseAsyncPolicy useAsync = UseAsyncPolicy.of(message.getContextualProperty(USE_ASYNC)); + final boolean isAsync = useAsync.isAsync(message); + message.put(USE_ASYNC, isAsync); + + if (!isAsync && !BlockingOperationControl.isBlockingAllowed()) { + throw new IllegalStateException("You have attempted to perform a blocking operation on an IO thread." + + " This is not allowed, as blocking the IO thread will cause major performance issues with your application." + + " You need to offload the blocking CXF client call to a worker thread," + + " e.g. by using the @io.smallrye.common.annotation.Blocking annotation on a caller method" + + " where it is supported by the underlying Quarkus extension, such as quarkus-rest, quarkus-vertx," + + " quarkus-reactive-routes, quarkus-grpc, quarkus-messaging-* and possibly others."); + } final HttpVersion version = getVersion(message, csPolicy); final boolean isHttps = "https".equals(uri.getScheme()); final QuarkusTLSClientParameters clientParameters; if (isHttps) { clientParameters = findTLSClientParameters(message); - if (clientParameters.getSSLSocketFactory() != null) { - throw new IllegalStateException(VertxHttpClientHTTPConduit.class.getName() - + " does not support SSLSocketFactory set via TLSClientParameters"); - } - if (clientParameters.getSslContext() != null) { - throw new IllegalStateException(VertxHttpClientHTTPConduit.class.getName() - + " does not support SSLContext set via TLSClientParameters"); - } - if (clientParameters.isUseHttpsURLConnectionDefaultSslSocketFactory()) { - throw new IllegalStateException(VertxHttpClientHTTPConduit.class.getName() - + " does not support TLSClientParameters.isUseHttpsURLConnectionDefaultSslSocketFactory() returning true"); - } + validateClientParameters(clientParameters); final List trustDeciders; final MessageTrustDecider decider2; if ((decider2 = message.get(MessageTrustDecider.class)) != null || this.trustDecider != null) { @@ -178,11 +184,27 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic ? new ClientSpec(version, clientParameters.getTlsConfigurationName(), clientParameters.getTlsConfiguration()) : new ClientSpec(version, null, null), - determineReceiveTimeout(message, csPolicy)); + determineReceiveTimeout(message, csPolicy), + isAsync); message.put(RequestContext.class, requestContext); } + private static void validateClientParameters(QuarkusTLSClientParameters clientParameters) { + if (clientParameters.getSSLSocketFactory() != null) { + throw new IllegalStateException(VertxHttpClientHTTPConduit.class.getName() + + " does not support SSLSocketFactory set via TLSClientParameters"); + } + if (clientParameters.getSslContext() != null) { + throw new IllegalStateException(VertxHttpClientHTTPConduit.class.getName() + + " does not support SSLContext set via TLSClientParameters"); + } + if (clientParameters.isUseHttpsURLConnectionDefaultSslSocketFactory()) { + throw new IllegalStateException(VertxHttpClientHTTPConduit.class.getName() + + " does not support TLSClientParameters.isUseHttpsURLConnectionDefaultSslSocketFactory() returning true"); + } + } + static ProxyType toProxyType(Type type) { switch (type) { case HTTP: @@ -215,12 +237,17 @@ protected OutputStream createOutputStream( requestContext.requestOptions, requestContext.clientSpec, requestContext.receiveTimeoutMs, - responseHandler); + responseHandler, + requestContext.async); return new RequestBodyOutputStream(chunkThreshold, requestBodyHandler); } static HttpVersion getVersion(Message message, HTTPClientPolicy csPolicy) { String verc = (String) message.getContextualProperty(FORCE_HTTP_VERSION); + final Object enableHttp2 = message.getContextualProperty(ENABLE_HTTP2); + if (verc == null && enableHttp2 != null) { + csPolicy.setVersion("2"); + } if (verc == null) { verc = csPolicy.getVersion(); } @@ -292,7 +319,8 @@ static record RequestContext( URI uri, RequestOptions requestOptions, ClientSpec clientSpec, - long receiveTimeoutMs) { + long receiveTimeoutMs, + boolean async) { } static record RequestBodyEvent(Buffer buffer, RequestBodyEventType eventType) { @@ -378,9 +406,6 @@ static class RequestBodyHandler implements IOEHandler { private final HttpClientPool clientPool; private final RequestOptions requestOptions; private final ClientSpec clientSpec; - /** Time in epoch milliseconds when the response should be fully received */ - private final long receiveTimeoutDeadline; - private final IOEHandler responseHandler; /** Read an written only from the producer thread */ private boolean firstEvent = true; @@ -389,21 +414,16 @@ static class RequestBodyHandler implements IOEHandler { * {@link Condition} */ private Result request; - /** - * Read from the producer thread, written from the event loop. Protected by {@link #lock} {@link #responseReceived} - * {@link Condition} - */ - private Result response; /* Locks and conditions */ private final ReentrantLock lock = new ReentrantLock(); private final Condition requestReady = lock.newCondition(); private final Condition requestWriteable = lock.newCondition(); - private final Condition responseReceived = lock.newCondition(); /* Backpressure control when writing the request body */ private boolean drainHandlerRegistered; private boolean waitingForDrain; + private Mode mode; public RequestBodyHandler( Message outMessage, @@ -413,7 +433,8 @@ public RequestBodyHandler( RequestOptions requestOptions, ClientSpec clientSpec, long receiveTimeoutMs, - IOEHandler responseHandler) { + IOEHandler responseHandler, + boolean isAsync) { super(); this.outMessage = outMessage; this.url = url; @@ -421,8 +442,11 @@ public RequestBodyHandler( this.clientPool = clientPool; this.requestOptions = requestOptions; this.clientSpec = clientSpec; - this.receiveTimeoutDeadline = System.currentTimeMillis() + receiveTimeoutMs; - this.responseHandler = responseHandler; + + final long deadline = System.currentTimeMillis() + receiveTimeoutMs; + this.mode = isAsync + ? new Mode.Async(url, deadline, responseHandler, outMessage) + : new Mode.Sync(url, deadline, responseHandler, lock); } @Override @@ -431,18 +455,8 @@ public void handle(RequestBodyEvent event) throws IOException { firstEvent = false; final HttpClient client = clientPool.getClient(clientSpec); - switch (event.eventType()) { - case NON_FINAL_CHUNK: - case FINAL_CHUNK: { - break; - } - case COMPLETE_BODY: { - requestOptions.putHeader("Content-Length", String.valueOf(event.buffer().length())); - break; - } - default: - throw new IllegalArgumentException( - "Unexpected " + RequestBodyEventType.class.getName() + ": " + event.eventType()); + if (event.eventType() == RequestBodyEventType.COMPLETE_BODY && requestHasBody(requestOptions.getMethod())) { + requestOptions.putHeader("Content-Length", String.valueOf(event.buffer().length())); } setProtocolHeaders(outMessage, requestOptions, userAgent); @@ -454,7 +468,7 @@ public void handle(RequestBodyEvent event) throws IOException { req .setChunked(true) .write(event.buffer()) - .onFailure(RequestBodyHandler.this::failResponse); + .onFailure(t -> mode.responseFailed(t, true)); lock.lock(); try { @@ -484,8 +498,7 @@ public void handle(RequestBodyEvent event) throws IOException { requestReady.signal(); /* Fail also the response so that awaitResponse() fails rather than waiting forever */ - response = Result.failure(t); - responseReceived.signal(); + mode.responseFailed(t, false); } finally { lock.unlock(); } @@ -497,7 +510,7 @@ public void handle(RequestBodyEvent event) throws IOException { break; case FINAL_CHUNK: case COMPLETE_BODY: { - responseHandler.handle(awaitResponse()); + mode.awaitResponse(); break; } default: @@ -519,7 +532,7 @@ public void handle(RequestBodyEvent event) throws IOException { case FINAL_CHUNK: case COMPLETE_BODY: { finishRequest(req, event.buffer()); - responseHandler.handle(awaitResponse()); + mode.awaitResponse(); break; } default: @@ -547,19 +560,13 @@ void finishRequest(HttpClientRequest req, Buffer buffer) { pipedInputStream.setException(new IOException(ar.cause())); } } - lock.lock(); - try { - response = new Result<>(new ResponseEvent(ar.result(), pipedInputStream), - ar.cause()); - responseReceived.signal(); - } finally { - lock.unlock(); - } + mode.responseReady(new Result<>(new ResponseEvent(ar.result(), pipedInputStream), + ar.cause())); }); req .end(buffer) - .onFailure(RequestBodyHandler.this::failResponse); + .onFailure(t -> mode.responseFailed(t, true)); } catch (IOException e) { throw new VertxHttpException(e); @@ -567,13 +574,6 @@ void finishRequest(HttpClientRequest req, Buffer buffer) { } void failResponse(Throwable t) { - lock.lock(); - try { - response = Result.failure(t); - responseReceived.signal(); - } finally { - lock.unlock(); - } } static void setProtocolHeaders(Message outMessage, RequestOptions requestOptions, String userAgent) throws IOException { @@ -646,31 +646,6 @@ static boolean requestHasBody(HttpMethod method) { return true; } - ResponseEvent awaitResponse() throws IOException { - /* This should be called from the same worker thread as handle() */ - if (response == null) { - lock.lock(); - try { - if (response == null) { - if (!responseReceived.await(receiveTimeout(), TimeUnit.MILLISECONDS) || response == null) { - throw new SocketTimeoutException("Timeout waiting for HTTP response from " + url); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted waiting for HTTP response from " + url, e); - } finally { - lock.unlock(); - } - } - if (response.succeeded()) { - return response.result(); - } else { - final Throwable e = response.cause(); - throw new IOException("Unable to receive HTTP response from " + url, e); - } - } - HttpClientRequest awaitRequest() throws IOException { /* This should be called from the same worker thread as handle() */ if (request == null) { @@ -758,27 +733,183 @@ public void handle(Void event) { } try { waitingForDrain = true; - requestWriteable.await(receiveTimeout(), TimeUnit.MILLISECONDS); + if (!requestWriteable.await(mode.receiveTimeout(), TimeUnit.MILLISECONDS)) { + throw new SocketTimeoutException("Timeout waiting for sending HTTP headers to " + url); + } } finally { waitingForDrain = false; } } } - /** - * Computes the timeout for receive related operations based on {@link #receiveTimeoutDeadline} - * - * @return the timeout in milliseconds for response related operations - * @throws SocketTimeoutException if {@link #receiveTimeoutDeadline} was missed already - */ - long receiveTimeout() throws SocketTimeoutException { - final long timeout = receiveTimeoutDeadline - System.currentTimeMillis(); - if (timeout <= 0) { - /* Too late already */ - throw new SocketTimeoutException("Timeout waiting for HTTP response from " + url); + static abstract class Mode { + /** Time in epoch milliseconds when the response should be fully received */ + private final long receiveTimeoutDeadline; + protected final URI url; + protected final IOEHandler responseHandler; + + Mode(URI url, long receiveTimeoutDeadline, IOEHandler responseHandler) { + this.url = url; + this.receiveTimeoutDeadline = receiveTimeoutDeadline; + this.responseHandler = responseHandler; + } + + /** + * Computes the timeout for receive related operations based on {@link #receiveTimeoutDeadline} + * + * @return the timeout in milliseconds for response related operations + * @throws SocketTimeoutException if {@link #receiveTimeoutDeadline} was missed already + */ + long receiveTimeout() throws SocketTimeoutException { + final long timeout = receiveTimeoutDeadline - System.currentTimeMillis(); + if (timeout <= 0) { + /* Too late already */ + throw new SocketTimeoutException("Timeout waiting for HTTP response from " + url); + } + return timeout; + } + + protected abstract void responseFailed(Throwable t, boolean lockIfNeeded); + + protected abstract void responseReady(Result response); + + protected abstract void awaitResponse() throws IOException; + + static class Sync extends Mode { + private final ReentrantLock lock; + private final Condition responseReceived; + /** + * Read from the producer thread, written from the event loop. Protected by {@link #lock} + * {@link #responseReceived} + * {@link Condition} + */ + private Result response; + + Sync(URI url, long receiveTimeoutDeadline, IOEHandler responseHandler, ReentrantLock lock) { + super(url, receiveTimeoutDeadline, responseHandler); + this.lock = lock; + this.responseReceived = lock.newCondition(); + } + + @Override + public void responseFailed(Throwable t, boolean lockIfNeeded) { + if (lockIfNeeded) { + lock.lock(); + try { + response = Result.failure(t); + responseReceived.signal(); + } finally { + lock.unlock(); + } + } else { + assert lock.isHeldByCurrentThread(); + response = Result.failure(t); + responseReceived.signal(); + } + } + + @Override + public void responseReady(Result response) { + lock.lock(); + try { + this.response = response; + responseReceived.signal(); + } finally { + lock.unlock(); + } + } + + @Override + public void awaitResponse() throws IOException { + responseHandler.handle(awaitResponseInternal()); + } + + ResponseEvent awaitResponseInternal() throws IOException { + /* This should be called from the same worker thread as handle() */ + if (response == null) { + lock.lock(); + try { + if (response == null) { + if (!responseReceived.await(receiveTimeout(), TimeUnit.MILLISECONDS) || response == null) { + throw new SocketTimeoutException("Timeout waiting for HTTP response from " + url); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted waiting for HTTP response from " + url, e); + } finally { + lock.unlock(); + } + } + if (response.succeeded()) { + return response.result(); + } else { + final Throwable e = response.cause(); + throw new IOException("Unable to receive HTTP response from " + url, e); + } + } + + } + + static class Async extends Mode { + private final Message outMessage; + + Async(URI url, long receiveTimeoutDeadline, IOEHandler responseHandler, Message outMessage) { + super(url, receiveTimeoutDeadline, responseHandler); + this.outMessage = outMessage; + } + + @Override + protected void responseFailed(Throwable t, boolean lockIfNeeded) { + // dispatch on worker thread + responseReady(Result.failure(t)); + } + + protected void responseFailedOnWorkerThread(Throwable t) { + // on worker thread already + ((PhaseInterceptorChain) outMessage.getInterceptorChain()).abort(); + outMessage.setContent(Exception.class, t); + if (t instanceof Exception) { + outMessage.put(Exception.class, (Exception) t); + } else { + // FIXME: log this special case + } + ((PhaseInterceptorChain) outMessage.getInterceptorChain()).unwind(outMessage); + MessageObserver mo = outMessage.getInterceptorChain().getFaultObserver(); + if (mo == null) { + mo = outMessage.getExchange().get(MessageObserver.class); + } + mo.onMessage(outMessage); + } + + @Override + protected void responseReady(Result response) { + // dispatch on worker thread + final InstanceHandle managedExecutorInst = Arc.container().instance(ManagedExecutor.class); + if (!managedExecutorInst.isAvailable()) { + throw new IllegalStateException(ManagedExecutor.class.getName() + " not available in Arc"); + } + managedExecutorInst.get().execute(() -> { + if (response.succeeded()) { + try { + responseHandler.handle(response.result()); + } catch (Throwable e) { + responseFailedOnWorkerThread(e); + } + } else { + responseFailedOnWorkerThread(response.cause()); + } + }); + } + + @Override + protected void awaitResponse() throws IOException { + /* Nothing to do in async mode because we dispatch the response via responseReady */ + } + } - return timeout; } + } static record ResponseEvent(HttpClientResponse response, InputStream responseBodyInputStream) { @@ -882,9 +1013,7 @@ public void handle(ResponseEvent responseEvent) throws IOException { final String charset = HttpHeaderHelper.findCharset((String) inMessage.get(Message.CONTENT_TYPE)); final String normalizedEncoding = HttpHeaderHelper.mapCharset(charset); if (normalizedEncoding == null) { - final String m = new org.apache.cxf.common.i18n.Message("INVALID_ENCODING_MSG", - LOG, charset).toString(); - throw new VertxHttpException(m); + throw new VertxHttpException("Invalid character set " + charset + " in request"); } inMessage.put(Message.ENCODING, normalizedEncoding); if (in == null) { @@ -899,9 +1028,6 @@ public void handle(ResponseEvent responseEvent) throws IOException { static int doProcessResponseCode(URI uri, HttpClientResponse response, Exchange exchange, Message outMessage) throws IOException { final int rc = response.statusCode(); - if (rc == -1) { - LOG.warning("HTTP Response code appears to be corrupted"); - } if (exchange != null) { exchange.put(Message.RESPONSE_CODE, rc); final Collection serviceNotAvailableOnHttpStatusCodes = MessageUtils @@ -945,7 +1071,7 @@ static InputStream getPartialResponse(HttpClientResponse response, InputStream r try { contentLength = Integer.parseInt(rawContentLength); } catch (NumberFormatException e) { - LOG.fine("Could not parse Content-Length value " + rawContentLength); + log.debug("Could not parse Content-Length value " + rawContentLength); } } final String transferEncoding = headers.get(HttpHeaderHelper.TRANSFER_ENCODING); @@ -1073,4 +1199,47 @@ public interface IOEHandler { */ void handle(E event) throws IOException; } + + public enum UseAsyncPolicy { + ALWAYS(true), + NEVER(false), + ASYNC_ONLY(false) { + @Override + public boolean isAsync(Message message) { + return !message.getExchange().isSynchronous(); + } + }; + + private final boolean async; + + private UseAsyncPolicy(Boolean async) { + this.async = async; + } + + static final Map values = Map.of( + "ALWAYS", ALWAYS, + "always", ALWAYS, + "ASYNC_ONLY", ASYNC_ONLY, + "async_only", ASYNC_ONLY, + "NEVER", NEVER, + "never", NEVER, + Boolean.TRUE, ALWAYS, + Boolean.FALSE, NEVER); + + public static UseAsyncPolicy of(Object st) { + if (st == null) { + return UseAsyncPolicy.ASYNC_ONLY; + } + if (st instanceof UseAsyncPolicy) { + return (UseAsyncPolicy) st; + } + final UseAsyncPolicy result = values.get(st); + return result != null ? result : ASYNC_ONLY; + } + + public boolean isAsync(Message message) { + return async; + } + }; + } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/wsdl/QuarkusWSDLManager.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/wsdl/QuarkusWSDLManager.java new file mode 100644 index 000000000..88d41fd07 --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/wsdl/QuarkusWSDLManager.java @@ -0,0 +1,288 @@ +package io.quarkiverse.cxf.wsdl; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Field; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.wsdl.Definition; +import javax.wsdl.WSDLException; +import javax.wsdl.extensions.ExtensionRegistry; +import javax.wsdl.xml.WSDLReader; +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import org.apache.cxf.Bus; +import org.apache.cxf.BusException; +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.helpers.IOUtils; +import org.apache.cxf.helpers.LoadingByteArrayOutputStream; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.ExchangeImpl; +import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageImpl; +import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.staxutils.StaxUtils; +import org.apache.cxf.staxutils.XMLStreamReaderWrapper; +import org.apache.cxf.transport.Conduit; +import org.apache.cxf.transport.ConduitInitiator; +import org.apache.cxf.transport.ConduitInitiatorManager; +import org.apache.cxf.transport.MessageObserver; +import org.apache.cxf.transport.TransportURIResolver; +import org.apache.cxf.wsdl11.CatalogWSDLLocator; +import org.apache.cxf.wsdl11.ResourceManagerWSDLLocator; +import org.apache.cxf.wsdl11.WSDLManagerImpl; +import org.w3c.dom.Document; +import org.xml.sax.InputSource; + +public class QuarkusWSDLManager extends WSDLManagerImpl { + + private final ExtensionRegistry registry; + private XMLStreamReaderWrapper xmlStreamReaderWrapper; + + public static QuarkusWSDLManager newInstance(Bus b) { + try { + return new QuarkusWSDLManager(b); + } catch (BusException e) { + throw new RuntimeException(e); + } + } + + private QuarkusWSDLManager(Bus b) throws BusException { + super(); + this.registry = getField(WSDLManagerImpl.class, this, "registry"); + setBus(b); + } + + @SuppressWarnings("unchecked") + private static T getField(Class cl, Object inst, String fieldName) { + try { + final Field fld = cl.getDeclaredField(fieldName); + fld.setAccessible(true); + return (T) fld.get(inst); + } catch (IllegalArgumentException | IllegalAccessException | NoSuchFieldException | SecurityException e) { + throw new RuntimeException(e); + } + } + + private static void setField(Class cl, Object inst, String fieldName, Object value) { + try { + final Field fld = cl.getDeclaredField(fieldName); + fld.setAccessible(true); + fld.set(inst, value); + } catch (IllegalArgumentException | IllegalAccessException | NoSuchFieldException | SecurityException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings({ "removal", "deprecation" }) + @Override + protected Definition loadDefinition(String url) throws WSDLException { + final Bus bus = getBus(); + final WSDLReader reader = getWSDLFactory().newWSDLReader(); + reader.setFeature("javax.wsdl.verbose", false); + reader.setFeature("javax.wsdl.importDocuments", true); + reader.setExtensionRegistry(registry); + + //we'll create a new String here to make sure the passed in key is not referenced in the loading of + //the wsdl and thus would be held onto from the cached map from both the weak reference (key) and + //from the strong reference (Definition). For example, the Definition sometimes keeps the original + //string as the documentBaseLocation which would result in it being held onto strongly + //from the definition. With this, the String the definition holds onto would be unique + url = new String(url); + CatalogWSDLLocator catLocator = new CatalogWSDLLocator(url, bus); + setField(CatalogWSDLLocator.class, catLocator, "resolver", new QuarkusTransportURIResolver(bus)); + + final ResourceManagerWSDLLocator wsdlLocator = new ResourceManagerWSDLLocator(url, + catLocator, + bus); + InputSource src = wsdlLocator.getBaseInputSource(); + final Definition def; + if (src.getByteStream() != null || src.getCharacterStream() != null) { + final Document doc; + XMLStreamReader xmlReader = null; + try { + xmlReader = StaxUtils.createXMLStreamReader(src); + if (xmlStreamReaderWrapper != null) { + xmlReader = xmlStreamReaderWrapper.wrap(xmlReader); + } + doc = StaxUtils.read(xmlReader, true); + if (src.getSystemId() != null) { + try { + doc.setDocumentURI(new String(src.getSystemId())); + } catch (Exception e) { + //ignore - probably not DOM level 3 + } + } + } catch (Exception e) { + throw new WSDLException(WSDLException.PARSER_ERROR, e.getMessage(), e); + } finally { + try { + StaxUtils.close(xmlReader); + } catch (XMLStreamException ex) { + throw new WSDLException(WSDLException.PARSER_ERROR, ex.getMessage(), ex); + } + } + + // This is needed to avoid security exceptions when running with a security manager + if (System.getSecurityManager() == null) { + def = reader.readWSDL(wsdlLocator, doc.getDocumentElement()); + } else { + try { + def = AccessController.doPrivileged((PrivilegedExceptionAction) () -> reader + .readWSDL(wsdlLocator, doc.getDocumentElement())); + } catch (PrivilegedActionException paex) { + throw new WSDLException(WSDLException.PARSER_ERROR, paex.getMessage(), paex); + } + } + } else { + if (System.getSecurityManager() == null) { + def = reader.readWSDL(wsdlLocator); + } else { + try { + def = AccessController + .doPrivileged((PrivilegedExceptionAction) () -> reader.readWSDL(wsdlLocator)); + } catch (PrivilegedActionException paex) { + throw new WSDLException(WSDLException.PARSER_ERROR, paex.getMessage(), paex); + } + } + } + + return def; + } + + @Override + public void setXMLStreamReaderWrapper(XMLStreamReaderWrapper wrapper) { + super.setXMLStreamReaderWrapper(wrapper); + this.xmlStreamReaderWrapper = wrapper; + } + + static class QuarkusTransportURIResolver extends TransportURIResolver { + static final Logger LOG = LogUtils.getL7dLogger(TransportURIResolver.class); + private static final Set DEFAULT_URI_RESOLVER_HANDLES = new HashSet<>(); + static { + //bunch we really don't want to have the conduits checked for + //as we know the conduits don't handle. No point + //wasting the time checking/loading conduits and such + DEFAULT_URI_RESOLVER_HANDLES.add("file"); + DEFAULT_URI_RESOLVER_HANDLES.add("classpath"); + DEFAULT_URI_RESOLVER_HANDLES.add("wsjar"); + DEFAULT_URI_RESOLVER_HANDLES.add("jar"); + DEFAULT_URI_RESOLVER_HANDLES.add("zip"); + } + + public QuarkusTransportURIResolver(Bus b) { + super(b); + } + + @Override + public InputSource resolve(String curUri, String baseUri) { + // Spaces must be encoded or URI.resolve() will choke + curUri = curUri.replace(" ", "%20"); + + InputSource is = null; + URI base; + try { + if (baseUri == null) { + base = new URI(curUri); + } else { + base = new URI(baseUri); + base = base.resolve(curUri); + } + } catch (URISyntaxException use) { + //ignore + base = null; + LOG.log(Level.FINEST, "Could not resolve curUri " + curUri, use); + } + try { + if (base == null + || DEFAULT_URI_RESOLVER_HANDLES.contains(base.getScheme())) { + is = super.resolve(curUri, baseUri); + } + } catch (Exception ex) { + //nothing + LOG.log(Level.FINEST, "Default URI handlers could not resolve " + baseUri + " " + curUri, ex); + } + if (is == null && base != null + && base.getScheme() != null + && !DEFAULT_URI_RESOLVER_HANDLES.contains(base.getScheme())) { + try { + ConduitInitiatorManager mgr = bus.getExtension(ConduitInitiatorManager.class); + ConduitInitiator ci = null; + if ("http".equals(base.getScheme()) || "https".equals(base.getScheme())) { + //common case, don't "search" + ci = mgr.getConduitInitiator("http://cxf.apache.org/transports/http"); + } + if (ci == null) { + ci = mgr.getConduitInitiatorForUri(base.toString()); + } + if (ci != null) { + EndpointInfo info = new EndpointInfo(); + // set the endpointInfo name which could be used for configuration + info.setName(new QName("http://cxf.apache.org", "TransportURIResolver")); + info.setAddress(base.toString()); + Conduit c = ci.getConduit(info, bus); + Message message = new MessageImpl(); + Exchange exch = new ExchangeImpl(); + exch.put(Bus.class, bus); + message.setExchange(exch); + + message.put(Message.HTTP_REQUEST_METHOD, "GET"); + c.setMessageObserver(new MessageObserver() { + @Override + public void onMessage(Message message) { + LoadingByteArrayOutputStream bout = new LoadingByteArrayOutputStream(); + try { + IOUtils.copyAndCloseInput(message.getContent(InputStream.class), bout); + message.getExchange().put(InputStream.class, bout.createInputStream()); + } catch (IOException e) { + //ignore + } + } + }); + c.prepare(message); + c.close(message); + if (exch.getInMessage() != null) { + c.close(exch.getInMessage()); + } + if (exch.getInFaultMessage() != null) { + c.close(exch.getInFaultMessage()); + } + c.close(); + InputStream ins = exch.get(InputStream.class); + resourceOpened.add(ins); + InputSource src = new InputSource(ins); + String u = (String) message.get("transport.retransmit.url"); + if (u == null) { + u = base.toString(); + } + src.setPublicId(u); + src.setSystemId(u); + lastestImportUri = u; + currentResolver.unresolve(); + return src; + } + } catch (Exception e) { + throw new RuntimeException("Conduit initiator could not resolve " + baseUri + " " + curUri, e); + } + } + if (is == null + && (base == null + || base.getScheme() == null + || !DEFAULT_URI_RESOLVER_HANDLES.contains(base.getScheme()))) { + is = super.resolve(curUri, baseUri); + } + return is; + } + } + +} diff --git a/integration-tests/async-vertx-client/pom.xml b/integration-tests/async-vertx-client/pom.xml new file mode 100644 index 000000000..4c6afef0d --- /dev/null +++ b/integration-tests/async-vertx-client/pom.xml @@ -0,0 +1,128 @@ + + + 4.0.0 + + io.quarkiverse.cxf + quarkus-cxf-integration-tests + 3.17.0-SNAPSHOT + ../pom.xml + + + quarkus-cxf-integration-test-async-vertx-client + + Quarkus CXF - Integration Test - Asynchronous Vert.x Client + + + + io.quarkiverse.cxf + quarkus-cxf + + + io.quarkus + quarkus-rest + + + + io.rest-assured + rest-assured + test + + + org.testcontainers + testcontainers + test + + + junit + junit + + + + + io.quarkus + quarkus-junit4-mock + test + + + + + + + io.quarkus + quarkus-maven-plugin + + + + build + generate-code + + + + + + + + + + native + + false + + + native + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + + integration-test + verify + + + + + + + + + virtualDependencies + + + !noVirtualDependencies + + + + + + io.quarkiverse.cxf + quarkus-cxf-deployment + ${project.version} + pom + test + + + * + * + + + + + + + skip-testcontainers-tests + + + skip-testcontainers-tests + + + + true + + + + + diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/Hello.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/Hello.java new file mode 100644 index 000000000..bcf294571 --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/Hello.java @@ -0,0 +1,64 @@ + +package io.quarkiverse.cxf.deployment.test; + +import jakarta.xml.bind.annotation.XmlAccessType; +import jakarta.xml.bind.annotation.XmlAccessorType; +import jakarta.xml.bind.annotation.XmlType; + +/** + *

+ * Java class for hello complex type + *

+ * . + * + *

+ * The following schema fragment specifies the expected content contained within this class. + *

+ * + *
{@code
+ * 
+ *   
+ *     
+ *       
+ *         
+ *       
+ *     
+ *   
+ * 
+ * }
+ * + * + */ +@XmlAccessorType(XmlAccessType.FIELD) +@XmlType(name = "hello", propOrder = { + "arg0" +}) +public class Hello { + + protected String arg0; + + /** + * Gets the value of the arg0 property. + * + * @return + * possible object is + * {@link String } + * + */ + public String getArg0() { + return arg0; + } + + /** + * Sets the value of the arg0 property. + * + * @param value + * allowed object is + * {@link String } + * + */ + public void setArg0(String value) { + this.arg0 = value; + } + +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloResponse.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloResponse.java new file mode 100644 index 000000000..6646dc926 --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloResponse.java @@ -0,0 +1,66 @@ + +package io.quarkiverse.cxf.deployment.test; + +import jakarta.xml.bind.annotation.XmlAccessType; +import jakarta.xml.bind.annotation.XmlAccessorType; +import jakarta.xml.bind.annotation.XmlElement; +import jakarta.xml.bind.annotation.XmlType; + +/** + *

+ * Java class for helloResponse complex type + *

+ * . + * + *

+ * The following schema fragment specifies the expected content contained within this class. + *

+ * + *
{@code
+ * 
+ *   
+ *     
+ *       
+ *         
+ *       
+ *     
+ *   
+ * 
+ * }
+ * + * + */ +@XmlAccessorType(XmlAccessType.FIELD) +@XmlType(name = "helloResponse", propOrder = { + "_return" +}) +public class HelloResponse { + + @XmlElement(name = "return") + protected String _return; + + /** + * Gets the value of the return property. + * + * @return + * possible object is + * {@link String } + * + */ + public String getReturn() { + return _return; + } + + /** + * Sets the value of the return property. + * + * @param value + * allowed object is + * {@link String } + * + */ + public void setReturn(String value) { + this._return = value; + } + +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloService.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloService.java new file mode 100644 index 000000000..dbe5ba87f --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloService.java @@ -0,0 +1,47 @@ +package io.quarkiverse.cxf.deployment.test; + +import java.util.concurrent.Future; + +import jakarta.jws.WebMethod; +import jakarta.jws.WebParam; +import jakarta.jws.WebResult; +import jakarta.jws.WebService; +import jakarta.xml.bind.annotation.XmlSeeAlso; +import jakarta.xml.ws.AsyncHandler; +import jakarta.xml.ws.RequestWrapper; +import jakarta.xml.ws.Response; +import jakarta.xml.ws.ResponseWrapper; + +/** + * This class was generated by Apache CXF 4.0.5 + * 2024-11-15T12:48:45.542+01:00 + * Generated source version: 4.0.5 + * + */ +@WebService(targetNamespace = "http://test.deployment.cxf.quarkiverse.io/", name = "HelloService") +@XmlSeeAlso({ ObjectFactory.class }) +public interface HelloService { + + @WebMethod(operationName = "hello") + @RequestWrapper(localName = "hello", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/", className = "io.quarkiverse.cxf.deployment.test.Hello") + @ResponseWrapper(localName = "helloResponse", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/", className = "io.quarkiverse.cxf.deployment.test.HelloResponse") + public Response helloAsync( + + @WebParam(name = "arg0", targetNamespace = "") java.lang.String arg0); + + @WebMethod(operationName = "hello") + @ResponseWrapper(localName = "helloResponse", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/", className = "io.quarkiverse.cxf.deployment.test.HelloResponse") + @RequestWrapper(localName = "hello", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/", className = "io.quarkiverse.cxf.deployment.test.Hello") + public Future helloAsync( + + @WebParam(name = "arg0", targetNamespace = "") java.lang.String arg0, + @WebParam(name = "asyncHandler", targetNamespace = "") AsyncHandler asyncHandler); + + @WebMethod + @RequestWrapper(localName = "hello", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/", className = "io.quarkiverse.cxf.deployment.test.Hello") + @ResponseWrapper(localName = "helloResponse", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/", className = "io.quarkiverse.cxf.deployment.test.HelloResponse") + @WebResult(name = "return", targetNamespace = "") + public java.lang.String hello( + + @WebParam(name = "arg0", targetNamespace = "") java.lang.String arg0); +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloService_Service.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloService_Service.java new file mode 100644 index 000000000..5e226573b --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/HelloService_Service.java @@ -0,0 +1,90 @@ +package io.quarkiverse.cxf.deployment.test; + +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; + +import javax.xml.namespace.QName; + +import jakarta.xml.ws.Service; +import jakarta.xml.ws.WebEndpoint; +import jakarta.xml.ws.WebServiceClient; +import jakarta.xml.ws.WebServiceFeature; + +/** + * This class was generated by Apache CXF 4.0.5 + * 2024-11-15T12:48:45.544+01:00 + * Generated source version: 4.0.5 + * + */ +@WebServiceClient(name = "HelloService", wsdlLocation = "file:/home/ppalaga/orgs/cxf/qcxf/integration-tests/async-vertx-client/src/main/resources/wsdl/HelloService.wsdl", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/") +public class HelloService_Service extends Service { + + public static final URL WSDL_LOCATION; + + public static final QName SERVICE = new QName("http://test.deployment.cxf.quarkiverse.io/", "HelloService"); + public static final QName HelloServiceImplPort = new QName("http://test.deployment.cxf.quarkiverse.io/", + "HelloServiceImplPort"); + static { + URL url = null; + try { + url = URI.create( + "file:/home/ppalaga/orgs/cxf/qcxf/integration-tests/async-vertx-client/src/main/resources/wsdl/HelloService.wsdl") + .toURL(); + } catch (MalformedURLException e) { + java.util.logging.Logger.getLogger(HelloService_Service.class.getName()) + .log(java.util.logging.Level.INFO, + "Can not initialize the default wsdl from {0}", + "file:/home/ppalaga/orgs/cxf/qcxf/integration-tests/async-vertx-client/src/main/resources/wsdl/HelloService.wsdl"); + } + WSDL_LOCATION = url; + } + + public HelloService_Service(URL wsdlLocation) { + super(wsdlLocation, SERVICE); + } + + public HelloService_Service(URL wsdlLocation, QName serviceName) { + super(wsdlLocation, serviceName); + } + + public HelloService_Service() { + super(WSDL_LOCATION, SERVICE); + } + + public HelloService_Service(WebServiceFeature... features) { + super(WSDL_LOCATION, SERVICE, features); + } + + public HelloService_Service(URL wsdlLocation, WebServiceFeature... features) { + super(wsdlLocation, SERVICE, features); + } + + public HelloService_Service(URL wsdlLocation, QName serviceName, WebServiceFeature... features) { + super(wsdlLocation, serviceName, features); + } + + /** + * + * @return + * returns HelloService + */ + @WebEndpoint(name = "HelloServiceImplPort") + public HelloService getHelloServiceImplPort() { + return super.getPort(HelloServiceImplPort, HelloService.class); + } + + /** + * + * @param features + * A list of {@link jakarta.xml.ws.WebServiceFeature} to configure on the proxy. Supported features not in the + * features parameter will have their default values. + * @return + * returns HelloService + */ + @WebEndpoint(name = "HelloServiceImplPort") + public HelloService getHelloServiceImplPort(WebServiceFeature... features) { + return super.getPort(HelloServiceImplPort, HelloService.class, features); + } + +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/ObjectFactory.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/ObjectFactory.java new file mode 100644 index 000000000..6129ddad3 --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/ObjectFactory.java @@ -0,0 +1,85 @@ + +package io.quarkiverse.cxf.deployment.test; + +import javax.xml.namespace.QName; + +import jakarta.xml.bind.JAXBElement; +import jakarta.xml.bind.annotation.XmlElementDecl; +import jakarta.xml.bind.annotation.XmlRegistry; + +/** + * This object contains factory methods for each + * Java content interface and Java element interface + * generated in the io.quarkiverse.cxf.deployment.test package. + *

+ * An ObjectFactory allows you to programmatically + * construct new instances of the Java representation + * for XML content. The Java representation of XML + * content can consist of schema derived interfaces + * and classes representing the binding of schema + * type definitions, element declarations and model + * groups. Factory methods for each of these are + * provided in this class. + * + */ +@XmlRegistry +public class ObjectFactory { + + private static final QName _Hello_QNAME = new QName("http://test.deployment.cxf.quarkiverse.io/", "hello"); + private static final QName _HelloResponse_QNAME = new QName("http://test.deployment.cxf.quarkiverse.io/", "helloResponse"); + + /** + * Create a new ObjectFactory that can be used to create new instances of schema derived classes for package: + * io.quarkiverse.cxf.deployment.test + * + */ + public ObjectFactory() { + } + + /** + * Create an instance of {@link Hello } + * + * @return + * the new instance of {@link Hello } + */ + public Hello createHello() { + return new Hello(); + } + + /** + * Create an instance of {@link HelloResponse } + * + * @return + * the new instance of {@link HelloResponse } + */ + public HelloResponse createHelloResponse() { + return new HelloResponse(); + } + + /** + * Create an instance of {@link JAXBElement }{@code <}{@link Hello }{@code >} + * + * @param value + * Java instance representing xml element's value. + * @return + * the new instance of {@link JAXBElement }{@code <}{@link Hello }{@code >} + */ + @XmlElementDecl(namespace = "http://test.deployment.cxf.quarkiverse.io/", name = "hello") + public JAXBElement createHello(Hello value) { + return new JAXBElement<>(_Hello_QNAME, Hello.class, null, value); + } + + /** + * Create an instance of {@link JAXBElement }{@code <}{@link HelloResponse }{@code >} + * + * @param value + * Java instance representing xml element's value. + * @return + * the new instance of {@link JAXBElement }{@code <}{@link HelloResponse }{@code >} + */ + @XmlElementDecl(namespace = "http://test.deployment.cxf.quarkiverse.io/", name = "helloResponse") + public JAXBElement createHelloResponse(HelloResponse value) { + return new JAXBElement<>(_HelloResponse_QNAME, HelloResponse.class, null, value); + } + +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/package-info.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/package-info.java new file mode 100644 index 000000000..b985101a2 --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/deployment/test/package-info.java @@ -0,0 +1,2 @@ +@jakarta.xml.bind.annotation.XmlSchema(namespace = "http://test.deployment.cxf.quarkiverse.io/") +package io.quarkiverse.cxf.deployment.test; diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdl.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdl.java new file mode 100644 index 000000000..d9e467f4d --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdl.java @@ -0,0 +1,32 @@ +package io.quarkiverse.cxf.it.vertx.async; + +import jakarta.enterprise.inject.Instance; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; + +import org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService; + +import io.quarkiverse.cxf.annotation.CXFClient; +import io.smallrye.mutiny.Uni; + +@Path("/RestAsyncWithWsdl") +public class RestAsyncWithWsdl { + + @CXFClient("calculatorWithWsdl") + Instance calculatorWithWsdl; + + @Path("/calculatorWithWsdl") + @GET + @Produces(MediaType.TEXT_PLAIN) + public Uni calculatorWithWsdl(@QueryParam("a") int a, @QueryParam("b") int b) { + /* With WSDL and without @Blocking should fail due to blocking WSDL call on the I/O thread */ + return Uni.createFrom() + .future(calculatorWithWsdl.get().addAsync(a, b)) + .map(addResponse -> addResponse.getReturn()) + .map(String::valueOf); + } + +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdlWithBlocking.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdlWithBlocking.java new file mode 100644 index 000000000..15893a62d --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdlWithBlocking.java @@ -0,0 +1,33 @@ +package io.quarkiverse.cxf.it.vertx.async; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; + +import org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService; + +import io.quarkiverse.cxf.annotation.CXFClient; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Uni; + +@Path("/RestAsyncWithWsdlWithBlocking") +public class RestAsyncWithWsdlWithBlocking { + + @CXFClient("calculatorWithWsdlWithBlocking") + CalculatorService calculatorWithWsdlWithBlocking; + + @Path("/calculatorWithWsdlWithBlocking") + @GET + @Blocking + @Produces(MediaType.TEXT_PLAIN) + public Uni calculatorWithWsdlWithBlocking(@QueryParam("a") int a, @QueryParam("b") int b) { + /* With WSDL and with @Blocking should work */ + return Uni.createFrom() + .future(calculatorWithWsdlWithBlocking.addAsync(a, b)) + .map(addResponse -> addResponse.getReturn()) + .map(String::valueOf); + } + +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdlWithEagerInit.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdlWithEagerInit.java new file mode 100644 index 000000000..ea9b4cab3 --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithWsdlWithEagerInit.java @@ -0,0 +1,66 @@ +package io.quarkiverse.cxf.it.vertx.async; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.inject.Instance; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.quarkiverse.cxf.annotation.CXFClient; +import io.quarkiverse.cxf.deployment.test.HelloService; +import io.quarkus.logging.Log; +import io.quarkus.runtime.StartupEvent; +import io.smallrye.mutiny.Uni; + +@Path("/RestAsyncWithWsdlWithEagerInit") +public class RestAsyncWithWsdlWithEagerInit { + + @ConfigProperty(name = "quarkus.http.test-port") + int testPort; + + @CXFClient("helloWithWsdlWithEagerInit") + /* Use instance to prevent initializing before the local HTTP endpoint is available */ + Instance helloWithWsdlWithEagerInitInst; + HelloService helloWithWsdlWithEagerInit; + + void init(@Observes StartupEvent start) { + /* + * We need to delay the initialization, because helloWithWsdlWithEagerInit + * points at the local service that is not exposed yet when this handler is triggered + */ + new Thread(() -> { + Log.infof("Waiting for the application to open port " + testPort); + while (true) { + try (Socket socket = new Socket()) { + socket.connect(new InetSocketAddress("localhost", testPort), 200); + break; + } catch (IOException e) { + } + } + helloWithWsdlWithEagerInit = helloWithWsdlWithEagerInitInst.get(); + Log.infof("Initializing helloWithWsdlWithEagerInit eagerly: %s", helloWithWsdlWithEagerInit.hello("foo")); + }, "await-application-readiness") + .start(); + } + + @Path("/helloWithWsdlWithEagerInit") + @GET + @Produces(MediaType.TEXT_PLAIN) + public Uni helloWithWsdlWithEagerInit(@QueryParam("person") String person) { + while (helloWithWsdlWithEagerInit == null) { + /* Spin until the client is ready */ + } + /* We have triggered the initialization of helloWithWsdlWithEagerInit in init() above so it should work */ + return Uni.createFrom() + .future(helloWithWsdlWithEagerInit.helloAsync(person)) + .map(helloResponse -> helloResponse.getReturn()); + } +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithoutWsdl.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithoutWsdl.java new file mode 100644 index 000000000..65582e363 --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithoutWsdl.java @@ -0,0 +1,29 @@ +package io.quarkiverse.cxf.it.vertx.async; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; + +import io.quarkiverse.cxf.annotation.CXFClient; +import io.quarkiverse.cxf.deployment.test.HelloService; +import io.smallrye.mutiny.Uni; + +@Path("/RestAsyncWithoutWsdl") +public class RestAsyncWithoutWsdl { + + @CXFClient("helloWithoutWsdl") + HelloService helloWithoutWsdl; + + @Path("/helloWithoutWsdl") + @GET + @Produces(MediaType.TEXT_PLAIN) + public Uni helloWithoutWsdl(@QueryParam("person") String person) { + /* Without WSDL and without @Blocking should work */ + return Uni.createFrom() + .future(helloWithoutWsdl.helloAsync(person)) + .map(helloResponse -> helloResponse.getReturn()); + } + +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithoutWsdlWithBlocking.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithoutWsdlWithBlocking.java new file mode 100644 index 000000000..3d889b225 --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/RestAsyncWithoutWsdlWithBlocking.java @@ -0,0 +1,31 @@ +package io.quarkiverse.cxf.it.vertx.async; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; + +import io.quarkiverse.cxf.annotation.CXFClient; +import io.quarkiverse.cxf.deployment.test.HelloService; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Uni; + +@Path("/RestAsyncWithoutWsdlWithBlocking") +public class RestAsyncWithoutWsdlWithBlocking { + + @CXFClient("helloWithoutWsdlWithBlocking") + HelloService helloWithoutWsdlWithBlocking; + + @Path("/helloWithoutWsdlWithBlocking") + @GET + @Produces(MediaType.TEXT_PLAIN) + @Blocking + public Uni helloWithoutWsdlWithBlocking(@QueryParam("person") String person) { + /* Without WSDL and with @Blocking should work */ + return Uni.createFrom() + .future(helloWithoutWsdlWithBlocking.helloAsync(person)) + .map(helloResponse -> helloResponse.getReturn()); + } + +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/VerboseExceptionMapper.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/VerboseExceptionMapper.java new file mode 100644 index 000000000..982a54f7c --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/VerboseExceptionMapper.java @@ -0,0 +1,30 @@ +package io.quarkiverse.cxf.it.vertx.async; + +import jakarta.annotation.Priority; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.ext.ExceptionMapper; +import jakarta.ws.rs.ext.Provider; + +@Provider +@Priority(10000) +//the more generic the mapper, the lower its prio should be(the lower the number the higher the pri), so you can override it with more specific mappers +public class VerboseExceptionMapper implements ExceptionMapper { + + @Override + public Response toResponse(Exception e) { + return Response + .serverError() + .entity(rootCause(e).getMessage()) + .build(); + } + + private static Throwable rootCause(Throwable e) { + e.printStackTrace(); + Throwable result = e; + while (result.getCause() != null) { + result = result.getCause(); + } + return result; + } + +} \ No newline at end of file diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithWsdlWithEagerInit.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithWsdlWithEagerInit.java new file mode 100644 index 000000000..eef50b552 --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithWsdlWithEagerInit.java @@ -0,0 +1,31 @@ +package io.quarkiverse.cxf.it.vertx.async.service; + +import java.util.concurrent.Future; + +import jakarta.jws.WebService; +import jakarta.xml.ws.AsyncHandler; +import jakarta.xml.ws.Response; + +import io.quarkiverse.cxf.annotation.CXFEndpoint; +import io.quarkiverse.cxf.deployment.test.HelloResponse; +import io.quarkiverse.cxf.deployment.test.HelloService; + +@WebService(serviceName = "HelloService", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/") +@CXFEndpoint("/helloWithWsdlWithEagerInit") +public class HelloWithWsdlWithEagerInit implements HelloService { + + @Override + public String hello(String person) { + return "Hello " + person + " from " + this.getClass().getSimpleName(); + } + + @Override + public Response helloAsync(String arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public Future helloAsync(String arg0, AsyncHandler asyncHandler) { + throw new UnsupportedOperationException(); + } +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithoutWsdl.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithoutWsdl.java new file mode 100644 index 000000000..a19edaaef --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithoutWsdl.java @@ -0,0 +1,31 @@ +package io.quarkiverse.cxf.it.vertx.async.service; + +import java.util.concurrent.Future; + +import jakarta.jws.WebService; +import jakarta.xml.ws.AsyncHandler; +import jakarta.xml.ws.Response; + +import io.quarkiverse.cxf.annotation.CXFEndpoint; +import io.quarkiverse.cxf.deployment.test.HelloResponse; +import io.quarkiverse.cxf.deployment.test.HelloService; + +@WebService(serviceName = "HelloService", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/") +@CXFEndpoint("/helloWithoutWsdl") +public class HelloWithoutWsdl implements HelloService { + + @Override + public String hello(String person) { + return "Hello " + person + " from " + this.getClass().getSimpleName(); + } + + @Override + public Response helloAsync(String arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public Future helloAsync(String arg0, AsyncHandler asyncHandler) { + throw new UnsupportedOperationException(); + } +} diff --git a/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithoutWsdlWithBlocking.java b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithoutWsdlWithBlocking.java new file mode 100644 index 000000000..10d55fd3c --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/java/io/quarkiverse/cxf/it/vertx/async/service/HelloWithoutWsdlWithBlocking.java @@ -0,0 +1,31 @@ +package io.quarkiverse.cxf.it.vertx.async.service; + +import java.util.concurrent.Future; + +import jakarta.jws.WebService; +import jakarta.xml.ws.AsyncHandler; +import jakarta.xml.ws.Response; + +import io.quarkiverse.cxf.annotation.CXFEndpoint; +import io.quarkiverse.cxf.deployment.test.HelloResponse; +import io.quarkiverse.cxf.deployment.test.HelloService; + +@WebService(serviceName = "HelloService", targetNamespace = "http://test.deployment.cxf.quarkiverse.io/") +@CXFEndpoint("/helloWithoutWsdlWithBlocking") +public class HelloWithoutWsdlWithBlocking implements HelloService { + + @Override + public String hello(String person) { + return "Hello " + person + " from " + this.getClass().getSimpleName(); + } + + @Override + public Response helloAsync(String arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public Future helloAsync(String arg0, AsyncHandler asyncHandler) { + throw new UnsupportedOperationException(); + } +} diff --git a/integration-tests/async-vertx-client/src/main/resources/application.properties b/integration-tests/async-vertx-client/src/main/resources/application.properties new file mode 100644 index 000000000..086d35a97 --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/resources/application.properties @@ -0,0 +1,20 @@ +quarkus.cxf.client.calculatorWithWsdl.wsdl = ${cxf.it.calculator.baseUri0}/calculator-ws/CalculatorService?wsdl +quarkus.cxf.client.calculatorWithWsdl.client-endpoint-url = ${cxf.it.calculator.baseUri0}/calculator-ws/CalculatorService +quarkus.cxf.client.calculatorWithWsdl.service-interface = org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService + +quarkus.cxf.client.calculatorWithWsdlWithBlocking.wsdl = ${cxf.it.calculator.baseUri1}/calculator-ws/CalculatorService?wsdl +quarkus.cxf.client.calculatorWithWsdlWithBlocking.client-endpoint-url = ${cxf.it.calculator.baseUri1}/calculator-ws/CalculatorService +quarkus.cxf.client.calculatorWithWsdlWithBlocking.service-interface = org.jboss.eap.quickstarts.wscalculator.calculator.CalculatorService + +quarkus.cxf.client.helloWithWsdlWithEagerInit.wsdl = http://localhost:${quarkus.http.test-port}/services/helloWithWsdlWithEagerInit?wsdl +quarkus.cxf.client.helloWithWsdlWithEagerInit.client-endpoint-url = http://localhost:${quarkus.http.test-port}/services/helloWithWsdlWithEagerInit +quarkus.cxf.client.helloWithWsdlWithEagerInit.service-interface = io.quarkiverse.cxf.deployment.test.HelloService + +quarkus.cxf.client.helloWithoutWsdl.client-endpoint-url = http://localhost:${quarkus.http.test-port}/services/helloWithoutWsdl +quarkus.cxf.client.helloWithoutWsdl.service-interface = io.quarkiverse.cxf.deployment.test.HelloService + +quarkus.cxf.client.helloWithoutWsdlWithBlocking.client-endpoint-url = http://localhost:${quarkus.http.test-port}/services/helloWithoutWsdlWithBlocking +quarkus.cxf.client.helloWithoutWsdlWithBlocking.service-interface = io.quarkiverse.cxf.deployment.test.HelloService + +quarkus.cxf.codegen.wsdl2java.includes = wsdl/CalculatorService.wsdl +quarkus.cxf.codegen.wsdl2java.additional-params = -b,src/main/resources/wsdl/CalculatorService-async-binding.xml diff --git a/integration-tests/async-vertx-client/src/main/resources/wsdl/CalculatorService-async-binding.xml b/integration-tests/async-vertx-client/src/main/resources/wsdl/CalculatorService-async-binding.xml new file mode 100644 index 000000000..6ad1c9cb0 --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/resources/wsdl/CalculatorService-async-binding.xml @@ -0,0 +1,10 @@ + + + + true + + diff --git a/integration-tests/async-vertx-client/src/main/resources/wsdl/CalculatorService.wsdl b/integration-tests/async-vertx-client/src/main/resources/wsdl/CalculatorService.wsdl new file mode 100644 index 000000000..c66f611ae --- /dev/null +++ b/integration-tests/async-vertx-client/src/main/resources/wsdl/CalculatorService.wsdl @@ -0,0 +1,350 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientIT.java b/integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientIT.java new file mode 100644 index 000000000..fba2ca2cc --- /dev/null +++ b/integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientIT.java @@ -0,0 +1,7 @@ +package io.quarkiverse.cxf.it.vertx.async; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +class AsyncVertxClientIT extends AsyncVertxClientTest { +} diff --git a/integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTest.java b/integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTest.java new file mode 100644 index 000000000..3b9df1e42 --- /dev/null +++ b/integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTest.java @@ -0,0 +1,136 @@ +package io.quarkiverse.cxf.it.vertx.async; + +import static org.hamcrest.CoreMatchers.is; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; + +import org.assertj.core.api.Assertions; +import org.eclipse.microprofile.config.ConfigProvider; +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.Test; + +import io.quarkiverse.cxf.HTTPConduitImpl; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.RestAssured; + +@QuarkusTest +@QuarkusTestResource(AsyncVertxClientTestResource.class) +class AsyncVertxClientTest { + + @Test + void calculatorWithWsdl() { + HTTPConduitImpl defaultImpl = io.quarkiverse.cxf.HTTPConduitImpl.findDefaultHTTPConduitImpl(); + if (io.quarkiverse.cxf.HTTPConduitImpl.URLConnectionHTTPConduitFactory == defaultImpl) { + RestAssured.given() + .queryParam("a", 7) + .queryParam("b", 4) + .get("/RestAsyncWithWsdl/calculatorWithWsdl") + .then() + .statusCode(200) + .body(is("11")); + } else { + RestAssured.given() + .queryParam("a", 7) + .queryParam("b", 4) + .get("/RestAsyncWithWsdl/calculatorWithWsdl") + .then() + .statusCode(500) + .body(CoreMatchers.containsString( + "You have attempted to perform a blocking operation on an IO thread.")); + } + + } + + @Test + void calculatorWithWsdlWithBlocking() { + RestAssured.given() + .queryParam("a", 7) + .queryParam("b", 4) + .get("/RestAsyncWithWsdlWithBlocking/calculatorWithWsdlWithBlocking") + .then() + .statusCode(200) + .body(is("11")); + } + + @Test + void helloWithWsdlWithEagerInit() { + RestAssured.given() + .queryParam("person", "Max") + .get("/RestAsyncWithWsdlWithEagerInit/helloWithWsdlWithEagerInit") + .then() + .statusCode(200) + .body(is("Hello Max from HelloWithWsdlWithEagerInit")); + } + + @Test + void helloWithoutWsdl() { + RestAssured.given() + .queryParam("person", "Joe") + .get("/RestAsyncWithoutWsdl/helloWithoutWsdl") + .then() + .statusCode(200) + .body(is("Hello Joe from HelloWithoutWsdl")); + } + + @Test + void helloWithoutWsdlWithBlocking() { + RestAssured.given() + .queryParam("person", "Joe") + .get("/RestAsyncWithoutWsdlWithBlocking/helloWithoutWsdlWithBlocking") + .then() + .statusCode(200) + .body(is("Hello Joe from HelloWithoutWsdlWithBlocking")); + } + + /** + * Make sure that our static copy is the same as the WSDL served by the container + * + * @throws IOException + */ + @Test + void wsdlUpToDate() throws IOException { + final String wsdlUrl = ConfigProvider.getConfig() + .getValue("quarkus.cxf.client.calculatorWithWsdl.wsdl", String.class); + + Path staticCopyPath = Paths.get("src/main/resources/wsdl/CalculatorService.wsdl"); + if (!Files.isRegularFile(staticCopyPath)) { + /* + * This test can be run from the test jar on Quarkus Platform + * In that case target/classes does not exist an we have to copy + * what's needed manually + */ + staticCopyPath = Paths.get("target/classes/wsdl/CalculatorService.wsdl"); + Files.createDirectories(staticCopyPath.getParent()); + try (InputStream in = AsyncVertxClientTest.class.getClassLoader() + .getResourceAsStream("wsdl/CalculatorService.wsdl")) { + Files.copy(in, staticCopyPath, StandardCopyOption.REPLACE_EXISTING); + } + } + /* The changing Docker IP address in the WSDL should not matter */ + final String sanitizerRegex = ""; + final String staticCopyContent = Files + .readString(staticCopyPath, StandardCharsets.UTF_8) + .replaceAll(sanitizerRegex, ""); + + final String expected = RestAssured.given() + .get(wsdlUrl) + .then() + .statusCode(200) + .extract().body().asString(); + + if (!expected.replaceAll(sanitizerRegex, "").equals(staticCopyContent)) { + Files.writeString(staticCopyPath, expected, StandardCharsets.UTF_8); + Assertions.fail("The static WSDL copy in " + staticCopyPath + + " went out of sync with the WSDL served by the container. The content was updated by the test, you just need to review and commit the changes."); + } + + } + +} diff --git a/integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTestResource.java b/integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTestResource.java new file mode 100644 index 000000000..f55b165cf --- /dev/null +++ b/integration-tests/async-vertx-client/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTestResource.java @@ -0,0 +1,52 @@ +package io.quarkiverse.cxf.it.vertx.async; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class AsyncVertxClientTestResource implements QuarkusTestResourceLifecycleManager { + + private static final int WILDFLY_PORT = 8080; + private final List> calculatorContainers = new ArrayList<>(); + + @SuppressWarnings("resource") + @Override + public Map start() { + final Map result = new LinkedHashMap<>(); + try { + for (int i = 0; i < 2; i++) { + GenericContainer calculatorContainer = new GenericContainer<>("quay.io/l2x6/calculator-ws:1.3") + .withExposedPorts(WILDFLY_PORT) + .waitingFor(Wait.forHttp("/calculator-ws/CalculatorService?wsdl")); + calculatorContainer.start(); + + result.put("cxf.it.calculator.baseUri" + i, + "http://" + calculatorContainer.getHost() + ":" + calculatorContainer.getMappedPort(WILDFLY_PORT)); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + System.out.println("result " + result); + return Collections.unmodifiableMap(result); + } + + @Override + public void stop() { + for (GenericContainer calculatorContainer : calculatorContainers) { + try { + if (calculatorContainer != null) { + calculatorContainer.stop(); + } + } catch (Exception e) { + // ignored + } + } + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 5e66ff57f..3cc1fd323 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -14,6 +14,7 @@ Quarkus CXF - Integration Tests + async-vertx-client client client-server fastinfoset