From 1a323fc91bbbce2f231262a1619d5f15e9cb7762 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Thu, 17 Oct 2024 19:54:18 +1300 Subject: [PATCH] GH-44444: [Java][CI] Add Java implementation of Flight do_exchange integration test (#44445) ### Rationale for this change See #44444, this helps ensure compatibility of the Java `do_exchange` implementation with C++ and C#. ### What changes are included in this PR? Adds a Java implementation of the `do_exchange:echo` Flight integration test, and enables it in Archery. ### Are these changes tested? Yes ### Are there any user-facing changes? No * GitHub Issue: #44444 Authored-by: Adam Reeve Signed-off-by: David Li --- dev/archery/archery/integration/runner.py | 2 +- .../tests/DoExchangeEchoScenario.java | 95 +++++++++++++++++++ .../integration/tests/DoExchangeProducer.java | 82 ++++++++++++++++ .../tests/IntegrationAssertions.java | 6 ++ .../flight/integration/tests/Scenarios.java | 1 + .../integration/tests/IntegrationTest.java | 5 + 6 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeEchoScenario.java create mode 100644 java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeProducer.java diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 781b41090d7e5..5cba350253065 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -673,7 +673,7 @@ def append_tester(implementation, tester): "do_exchange:echo", description=("Test the do_exchange method by " "echoing data back to the client."), - skip_testers={"Go", "Java", "JS", "Rust"}, + skip_testers={"Go", "JS", "Rust"}, ), Scenario( "location:reuse_connection", diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeEchoScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeEchoScenario.java new file mode 100644 index 0000000000000..3e7fa19a81927 --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeEchoScenario.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.flight.integration.tests; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Validator; + +/** Test DoExchange by echoing data back to the client. */ +final class DoExchangeEchoScenario implements Scenario { + public static final byte[] COMMAND = "echo".getBytes(StandardCharsets.UTF_8); + + @Override + public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception { + return new DoExchangeProducer(allocator); + } + + @Override + public void buildServer(FlightServer.Builder builder) {} + + @Override + public void client(BufferAllocator allocator, Location location, FlightClient client) + throws Exception { + final Schema schema = + new Schema(Collections.singletonList(Field.notNullable("x", new ArrowType.Int(32, true)))); + try (final FlightClient.ExchangeReaderWriter stream = + client.doExchange(FlightDescriptor.command(COMMAND)); + final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + final FlightStream reader = stream.getReader(); + + // Write data and check that it gets echoed back. + IntVector iv = (IntVector) root.getVector("x"); + iv.allocateNew(); + stream.getWriter().start(root); + int rowCount = 10; + for (int batchIdx = 0; batchIdx < 4; batchIdx++) { + for (int rowIdx = 0; rowIdx < rowCount; rowIdx++) { + iv.setSafe(rowIdx, batchIdx + rowIdx); + } + root.setRowCount(rowCount); + boolean writeMetadata = batchIdx % 2 == 0; + final byte[] rawMetadata = Integer.toString(batchIdx).getBytes(StandardCharsets.UTF_8); + if (writeMetadata) { + final ArrowBuf metadata = allocator.buffer(rawMetadata.length); + metadata.writeBytes(rawMetadata); + stream.getWriter().putNext(metadata); + } else { + stream.getWriter().putNext(); + } + + IntegrationAssertions.assertTrue("Unexpected end of reader", reader.next()); + if (writeMetadata) { + IntegrationAssertions.assertNotNull(reader.getLatestMetadata()); + final byte[] readMetadata = new byte[rawMetadata.length]; + reader.getLatestMetadata().readBytes(readMetadata); + IntegrationAssertions.assertEquals(rawMetadata, readMetadata); + } else { + IntegrationAssertions.assertNull(reader.getLatestMetadata()); + } + IntegrationAssertions.assertEquals(root.getSchema(), reader.getSchema()); + Validator.compareVectorSchemaRoot(reader.getRoot(), root); + } + + stream.getWriter().completed(); + IntegrationAssertions.assertFalse("Expected to reach end of reader", reader.next()); + } + } +} diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeProducer.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeProducer.java new file mode 100644 index 0000000000000..2e28ab1233e7c --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/DoExchangeProducer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.flight.integration.tests; + +import java.util.Arrays; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +/** The server used for testing the Flight do_exchange method. */ +final class DoExchangeProducer extends NoOpFlightProducer { + private final BufferAllocator allocator; + + DoExchangeProducer(BufferAllocator allocator) { + this.allocator = allocator; + } + + @Override + public void doExchange(CallContext context, FlightStream reader, ServerStreamListener writer) { + FlightDescriptor descriptor = reader.getDescriptor(); + if (descriptor.isCommand()) { + if (Arrays.equals(DoExchangeEchoScenario.COMMAND, descriptor.getCommand())) { + doEcho(reader, writer); + } + } + throw CallStatus.UNIMPLEMENTED + .withDescription("Unsupported descriptor: " + descriptor.toString()) + .toRuntimeException(); + } + + private void doEcho(FlightStream reader, ServerStreamListener writer) { + VectorSchemaRoot root = null; + VectorLoader loader = null; + while (reader.next()) { + if (reader.hasRoot()) { + if (root == null) { + root = VectorSchemaRoot.create(reader.getSchema(), allocator); + loader = new VectorLoader(root); + writer.start(root); + } + VectorUnloader unloader = new VectorUnloader(reader.getRoot()); + try (final ArrowRecordBatch arb = unloader.getRecordBatch()) { + loader.load(arb); + } + if (reader.getLatestMetadata() != null) { + reader.getLatestMetadata().getReferenceManager().retain(); + writer.putNext(reader.getLatestMetadata()); + } else { + writer.putNext(); + } + } else { + // Pure metadata + reader.getLatestMetadata().getReferenceManager().retain(); + writer.putMetadata(reader.getLatestMetadata()); + } + } + if (root != null) { + root.close(); + } + writer.completed(); + } +} diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java index 92d4c73f2be87..ada565c635428 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/IntegrationAssertions.java @@ -78,6 +78,12 @@ static void assertTrue(String message, boolean value) { } } + static void assertNull(Object actual) { + if (actual != null) { + throw new AssertionError("Expected: null\n\nbut got: " + actual); + } + } + static void assertNotNull(Object actual) { if (actual == null) { throw new AssertionError("Expected: (not null)\n\nbut got: null\n"); diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java index 451edb6bd5a34..7903ae994c7d1 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java @@ -51,6 +51,7 @@ private Scenarios() { scenarios.put("flight_sql:ingestion", FlightSqlIngestionScenario::new); scenarios.put("app_metadata_flight_info_endpoint", AppMetadataFlightInfoEndpointScenario::new); scenarios.put("session_options", SessionOptionsScenario::new); + scenarios.put("do_exchange:echo", DoExchangeEchoScenario::new); } private static Scenarios getInstance() { diff --git a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java index 8419432c66227..16265b8b37014 100644 --- a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java +++ b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java @@ -99,6 +99,11 @@ void sessionOptions() throws Exception { testScenario("session_options"); } + @Test + void doExchangeEcho() throws Exception { + testScenario("do_exchange:echo"); + } + void testScenario(String scenarioName) throws Exception { TestBufferAllocationListener listener = new TestBufferAllocationListener(); try (final BufferAllocator allocator = new RootAllocator(listener, Long.MAX_VALUE)) {