diff --git a/.github/workflows/detect-breaking-change.yml b/.github/workflows/detect-breaking-change.yml index 1913d070e8c24..e5d3fddbd36f5 100644 --- a/.github/workflows/detect-breaking-change.yml +++ b/.github/workflows/detect-breaking-change.yml @@ -1,6 +1,8 @@ name: "Detect Breaking Changes" on: - pull_request + pull_request: + branches-ignore: + - main # This branch represents a to-be-released version of OpenSearch where breaking changes are allowed jobs: detect-breaking-change: diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ffff08d2e0b0..4c1591a894244 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Enabled mockTelemetryPlugin for IT and fixed OOM issues ([#13054](https://github.com/opensearch-project/OpenSearch/pull/13054)) - Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098)) - Fix snapshot _status API to return correct status for partial snapshots ([#12812](https://github.com/opensearch-project/OpenSearch/pull/12812)) +- Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290)) ### Security diff --git a/release-notes/opensearch.release-notes-1.3.16.md b/release-notes/opensearch.release-notes-1.3.16.md new file mode 100644 index 0000000000000..a434e419d5780 --- /dev/null +++ b/release-notes/opensearch.release-notes-1.3.16.md @@ -0,0 +1,4 @@ +## 2024-04-18 Version 1.3.16 Release Notes + +### Upgrades +- Bump `netty` from 4.1.107.Final to 4.1.109.Final ([#12924](https://github.com/opensearch-project/OpenSearch/pull/12924), [#13233](https://github.com/opensearch-project/OpenSearch/pull/13233)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml index 39fb1604d9596..7410e020e1a91 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml @@ -31,8 +31,8 @@ --- "Test primary_only parameter": - skip: - version: " - 2.99.99" - reason: "primary_only is available in 3.0+" + version: " - 2.12.99" + reason: "primary_only is available in 2.13.0+" - do: indices.create: diff --git a/server/src/main/java/org/opensearch/rest/BaseRestHandler.java b/server/src/main/java/org/opensearch/rest/BaseRestHandler.java index 3552e32022b2c..fc150405747ec 100644 --- a/server/src/main/java/org/opensearch/rest/BaseRestHandler.java +++ b/server/src/main/java/org/opensearch/rest/BaseRestHandler.java @@ -122,10 +122,6 @@ public final void handleRequest(RestRequest request, RestChannel channel, NodeCl throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter")); } - if (request.hasContent() && request.isContentConsumed() == false) { - throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body"); - } - usageCount.increment(); // execute the action action.accept(channel); diff --git a/server/src/main/java/org/opensearch/transport/InboundAggregator.java b/server/src/main/java/org/opensearch/transport/InboundAggregator.java index e894331f3b64e..f52875d880b4f 100644 --- a/server/src/main/java/org/opensearch/transport/InboundAggregator.java +++ b/server/src/main/java/org/opensearch/transport/InboundAggregator.java @@ -40,6 +40,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.bytes.CompositeBytesReference; +import org.opensearch.transport.nativeprotocol.NativeInboundMessage; import java.io.IOException; import java.util.ArrayList; @@ -113,7 +114,7 @@ public void aggregate(ReleasableBytesReference content) { } } - public InboundMessage finishAggregation() throws IOException { + public NativeInboundMessage finishAggregation() throws IOException { ensureOpen(); final ReleasableBytesReference releasableContent; if (isFirstContent()) { @@ -127,7 +128,7 @@ public InboundMessage finishAggregation() throws IOException { } final BreakerControl breakerControl = new BreakerControl(circuitBreaker); - final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent, breakerControl); + final NativeInboundMessage aggregated = new NativeInboundMessage(currentHeader, releasableContent, breakerControl); boolean success = false; try { if (aggregated.getHeader().needsToReadVariableHeader()) { @@ -142,7 +143,7 @@ public InboundMessage finishAggregation() throws IOException { if (isShortCircuited()) { aggregated.close(); success = true; - return new InboundMessage(aggregated.getHeader(), aggregationException); + return new NativeInboundMessage(aggregated.getHeader(), aggregationException); } else { success = true; return aggregated; diff --git a/server/src/main/java/org/opensearch/transport/InboundMessage.java b/server/src/main/java/org/opensearch/transport/InboundMessage.java deleted file mode 100644 index 5c68257557061..0000000000000 --- a/server/src/main/java/org/opensearch/transport/InboundMessage.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.transport; - -import org.opensearch.common.annotation.DeprecatedApi; -import org.opensearch.common.bytes.ReleasableBytesReference; -import org.opensearch.common.lease.Releasable; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.transport.nativeprotocol.NativeInboundMessage; - -import java.io.IOException; - -/** - * Inbound data as a message - * This api is deprecated, please use {@link org.opensearch.transport.nativeprotocol.NativeInboundMessage} instead. - * @opensearch.api - */ -@DeprecatedApi(since = "2.14.0") -public class InboundMessage implements Releasable, ProtocolInboundMessage { - - private final NativeInboundMessage nativeInboundMessage; - - public InboundMessage(Header header, ReleasableBytesReference content, Releasable breakerRelease) { - this.nativeInboundMessage = new NativeInboundMessage(header, content, breakerRelease); - } - - public InboundMessage(Header header, Exception exception) { - this.nativeInboundMessage = new NativeInboundMessage(header, exception); - } - - public InboundMessage(Header header, boolean isPing) { - this.nativeInboundMessage = new NativeInboundMessage(header, isPing); - } - - public Header getHeader() { - return this.nativeInboundMessage.getHeader(); - } - - public int getContentLength() { - return this.nativeInboundMessage.getContentLength(); - } - - public Exception getException() { - return this.nativeInboundMessage.getException(); - } - - public boolean isPing() { - return this.nativeInboundMessage.isPing(); - } - - public boolean isShortCircuit() { - return this.nativeInboundMessage.getException() != null; - } - - public Releasable takeBreakerReleaseControl() { - return this.nativeInboundMessage.takeBreakerReleaseControl(); - } - - public StreamInput openOrGetStreamInput() throws IOException { - return this.nativeInboundMessage.openOrGetStreamInput(); - } - - @Override - public void close() { - this.nativeInboundMessage.close(); - } - - @Override - public String toString() { - return this.nativeInboundMessage.toString(); - } - - @Override - public String getProtocol() { - return this.nativeInboundMessage.getProtocol(); - } - -} diff --git a/server/src/main/java/org/opensearch/transport/NativeMessageHandler.java b/server/src/main/java/org/opensearch/transport/NativeMessageHandler.java index 861b95a8098f2..c5b65f9eb7a11 100644 --- a/server/src/main/java/org/opensearch/transport/NativeMessageHandler.java +++ b/server/src/main/java/org/opensearch/transport/NativeMessageHandler.java @@ -51,6 +51,7 @@ import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.nativeprotocol.NativeInboundMessage; import java.io.EOFException; import java.io.IOException; @@ -111,7 +112,7 @@ public void messageReceived( long slowLogThresholdMs, TransportMessageListener messageListener ) throws IOException { - InboundMessage inboundMessage = (InboundMessage) message; + NativeInboundMessage inboundMessage = (NativeInboundMessage) message; TransportLogger.logInboundMessage(channel, inboundMessage); if (inboundMessage.isPing()) { keepAlive.receiveKeepAlive(channel); @@ -122,7 +123,7 @@ public void messageReceived( private void handleMessage( TcpChannel channel, - InboundMessage message, + NativeInboundMessage message, long startTime, long slowLogThresholdMs, TransportMessageListener messageListener @@ -194,7 +195,7 @@ private Map> extractHeaders(Map heade private void handleRequest( TcpChannel channel, Header header, - InboundMessage message, + NativeInboundMessage message, TransportMessageListener messageListener ) throws IOException { final String action = header.getActionName(); diff --git a/server/src/main/java/org/opensearch/transport/TcpTransport.java b/server/src/main/java/org/opensearch/transport/TcpTransport.java index e32bba5e836d3..8ba0178577232 100644 --- a/server/src/main/java/org/opensearch/transport/TcpTransport.java +++ b/server/src/main/java/org/opensearch/transport/TcpTransport.java @@ -761,18 +761,6 @@ protected void serverAcceptedChannel(TcpChannel channel) { */ protected abstract void stopInternal(); - /** - * @deprecated use {@link #inboundMessage(TcpChannel, ProtocolInboundMessage)} - * Handles inbound message that has been decoded. - * - * @param channel the channel the message is from - * @param message the message - */ - @Deprecated(since = "2.14.0", forRemoval = true) - public void inboundMessage(TcpChannel channel, InboundMessage message) { - inboundMessage(channel, (ProtocolInboundMessage) message); - } - /** * Handles inbound message that has been decoded. * diff --git a/server/src/main/java/org/opensearch/transport/TransportLogger.java b/server/src/main/java/org/opensearch/transport/TransportLogger.java index 997b3bb5ba18e..e780f643aafd7 100644 --- a/server/src/main/java/org/opensearch/transport/TransportLogger.java +++ b/server/src/main/java/org/opensearch/transport/TransportLogger.java @@ -40,6 +40,7 @@ import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.CompressorRegistry; +import org.opensearch.transport.nativeprotocol.NativeInboundMessage; import java.io.IOException; @@ -64,7 +65,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) { } } - static void logInboundMessage(TcpChannel channel, InboundMessage message) { + static void logInboundMessage(TcpChannel channel, NativeInboundMessage message) { if (logger.isTraceEnabled()) { try { String logMessage = format(channel, message, "READ"); @@ -136,7 +137,7 @@ private static String format(TcpChannel channel, BytesReference message, String return sb.toString(); } - private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException { + private static String format(TcpChannel channel, NativeInboundMessage message, String event) throws IOException { final StringBuilder sb = new StringBuilder(); sb.append(channel); diff --git a/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java b/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java index a8a4c0da7ec0f..97981aeb6736e 100644 --- a/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java +++ b/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java @@ -16,7 +16,6 @@ import org.opensearch.transport.InboundAggregator; import org.opensearch.transport.InboundBytesHandler; import org.opensearch.transport.InboundDecoder; -import org.opensearch.transport.InboundMessage; import org.opensearch.transport.ProtocolInboundMessage; import org.opensearch.transport.StatsTracker; import org.opensearch.transport.TcpChannel; @@ -32,7 +31,7 @@ public class NativeInboundBytesHandler implements InboundBytesHandler { private static final ThreadLocal> fragmentList = ThreadLocal.withInitial(ArrayList::new); - private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true); + private static final NativeInboundMessage PING_MESSAGE = new NativeInboundMessage(null, true); private final ArrayDeque pending; private final InboundDecoder decoder; @@ -152,7 +151,7 @@ private void forwardFragments( messageHandler.accept(channel, PING_MESSAGE); } else if (fragment == InboundDecoder.END_CONTENT) { assert aggregator.isAggregating(); - try (InboundMessage aggregated = aggregator.finishAggregation()) { + try (NativeInboundMessage aggregated = aggregator.finishAggregation()) { statsTracker.markMessageReceived(); messageHandler.accept(channel, aggregated); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/RestForceMergeActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/RestForceMergeActionTests.java index b09e592922ed9..01d72b78a679e 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/RestForceMergeActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/RestForceMergeActionTests.java @@ -32,14 +32,9 @@ package org.opensearch.action.admin.indices.forcemerge; -import org.opensearch.client.node.NodeClient; -import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.admin.indices.RestForceMergeAction; -import org.opensearch.test.rest.FakeRestChannel; import org.opensearch.test.rest.FakeRestRequest; import org.opensearch.test.rest.RestActionTestCase; import org.junit.Before; @@ -47,9 +42,6 @@ import java.util.HashMap; import java.util.Map; -import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Mockito.mock; - public class RestForceMergeActionTests extends RestActionTestCase { @Before @@ -57,20 +49,6 @@ public void setUpAction() { controller().registerHandler(new RestForceMergeAction()); } - public void testBodyRejection() throws Exception { - final RestForceMergeAction handler = new RestForceMergeAction(); - String json = JsonXContent.contentBuilder().startObject().field("max_num_segments", 1).endObject().toString(); - final FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withContent( - new BytesArray(json), - MediaTypeRegistry.JSON - ).withPath("/_forcemerge").build(); - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> handler.handleRequest(request, new FakeRestChannel(request, randomBoolean(), 1), mock(NodeClient.class)) - ); - assertThat(e.getMessage(), equalTo("request [GET /_forcemerge] does not support having a body")); - } - public void testDeprecationMessage() { final Map params = new HashMap<>(); params.put("only_expunge_deletes", Boolean.TRUE.toString()); diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index b3c58164fccbb..b3eb2443fa940 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -91,8 +91,6 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; -import static org.opensearch.test.VersionUtils.randomCompatibleVersion; -import static org.opensearch.test.VersionUtils.randomOpenSearchVersion; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -826,10 +824,9 @@ public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersion request.persistentSettings(intendedCompatibilityModeSettings); // two different but compatible open search versions for the discovery nodes - final Version version1 = randomOpenSearchVersion(random()); - final Version version2 = randomCompatibleVersion(random(), version1); + final Version version1 = Version.V_2_13_0; + final Version version2 = Version.V_2_13_1; - assert version1.equals(version2) == false : "current nodes in the cluster must be of different versions"; DiscoveryNode discoveryNode1 = new DiscoveryNode( UUIDs.base64UUID(), buildNewFakeTransportAddress(), diff --git a/server/src/test/java/org/opensearch/rest/BaseRestHandlerTests.java b/server/src/test/java/org/opensearch/rest/BaseRestHandlerTests.java index ce929e64d8960..45653e9d8e4d6 100644 --- a/server/src/test/java/org/opensearch/rest/BaseRestHandlerTests.java +++ b/server/src/test/java/org/opensearch/rest/BaseRestHandlerTests.java @@ -35,10 +35,6 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.common.Table; import org.opensearch.common.settings.Settings; -import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.rest.RestHandler.ReplacedRoute; import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest.Method; @@ -281,81 +277,6 @@ public String getName() { assertTrue(executed.get()); } - public void testConsumedBody() throws Exception { - final AtomicBoolean executed = new AtomicBoolean(); - final BaseRestHandler handler = new BaseRestHandler() { - @Override - protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - request.content(); - return channel -> executed.set(true); - } - - @Override - public String getName() { - return "test_consumed_body"; - } - }; - - try (XContentBuilder builder = JsonXContent.contentBuilder().startObject().endObject()) { - final RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent( - new BytesArray(builder.toString()), - MediaTypeRegistry.JSON - ).build(); - final RestChannel channel = new FakeRestChannel(request, randomBoolean(), 1); - handler.handleRequest(request, channel, mockClient); - assertTrue(executed.get()); - } - } - - public void testUnconsumedNoBody() throws Exception { - final AtomicBoolean executed = new AtomicBoolean(); - final BaseRestHandler handler = new BaseRestHandler() { - @Override - protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - return channel -> executed.set(true); - } - - @Override - public String getName() { - return "test_unconsumed_body"; - } - }; - - final RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).build(); - final RestChannel channel = new FakeRestChannel(request, randomBoolean(), 1); - handler.handleRequest(request, channel, mockClient); - assertTrue(executed.get()); - } - - public void testUnconsumedBody() throws IOException { - final AtomicBoolean executed = new AtomicBoolean(); - final BaseRestHandler handler = new BaseRestHandler() { - @Override - protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - return channel -> executed.set(true); - } - - @Override - public String getName() { - return "test_unconsumed_body"; - } - }; - - try (XContentBuilder builder = JsonXContent.contentBuilder().startObject().endObject()) { - final RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent( - new BytesArray(builder.toString()), - MediaTypeRegistry.JSON - ).build(); - final RestChannel channel = new FakeRestChannel(request, randomBoolean(), 1); - final IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> handler.handleRequest(request, channel, mockClient) - ); - assertThat(e, hasToString(containsString("request [GET /] does not support having a body"))); - assertFalse(executed.get()); - } - } - public void testReplaceRoutesMethod() throws Exception { List routes = Arrays.asList(new Route(Method.GET, "/path/test"), new Route(Method.PUT, "/path2/test")); List replacedRoutes = RestHandler.replaceRoutes(routes, "/prefix", "/deprecatedPrefix"); diff --git a/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java b/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java index b60541825e3ed..448ba9e5a8cd7 100644 --- a/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java @@ -82,31 +82,6 @@ public void deletePits(DeletePitRequest request, ActionListener pitCalled = new SetOnce<>(); - try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { - @Override - public void deletePits(DeletePitRequest request, ActionListener listener) { - pitCalled.set(true); - assertThat(request.getPitIds(), hasSize(1)); - assertThat(request.getPitIds().get(0), equalTo("_all")); - } - }) { - RestDeletePitAction action = new RestDeletePitAction(); - RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent( - new BytesArray("{\"pit_id\": [\"BODY\"]}"), - MediaTypeRegistry.JSON - ).withPath("/_all").build(); - FakeRestChannel channel = new FakeRestChannel(request, false, 0); - - IllegalArgumentException ex = expectThrows( - IllegalArgumentException.class, - () -> action.handleRequest(request, channel, nodeClient) - ); - assertTrue(ex.getMessage().contains("request [GET /_all] does not support having a body")); - } - } - public void testDeletePitQueryStringParamsShouldThrowException() { SetOnce pitCalled = new SetOnce<>(); try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { diff --git a/server/src/test/java/org/opensearch/transport/InboundAggregatorTests.java b/server/src/test/java/org/opensearch/transport/InboundAggregatorTests.java index 2dd98a8efe2a3..4ac78366360d7 100644 --- a/server/src/test/java/org/opensearch/transport/InboundAggregatorTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundAggregatorTests.java @@ -42,6 +42,7 @@ import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.nativeprotocol.NativeInboundMessage; import org.junit.Before; import java.io.IOException; @@ -107,7 +108,7 @@ public void testInboundAggregation() throws IOException { } // Signal EOS - InboundMessage aggregated = aggregator.finishAggregation(); + NativeInboundMessage aggregated = aggregator.finishAggregation(); assertThat(aggregated, notNullValue()); assertFalse(aggregated.isPing()); @@ -138,7 +139,7 @@ public void testInboundUnknownAction() throws IOException { assertEquals(0, content.refCount()); // Signal EOS - InboundMessage aggregated = aggregator.finishAggregation(); + NativeInboundMessage aggregated = aggregator.finishAggregation(); assertThat(aggregated, notNullValue()); assertTrue(aggregated.isShortCircuit()); @@ -161,7 +162,7 @@ public void testCircuitBreak() throws IOException { content1.close(); // Signal EOS - InboundMessage aggregated1 = aggregator.finishAggregation(); + NativeInboundMessage aggregated1 = aggregator.finishAggregation(); assertEquals(0, content1.refCount()); assertThat(aggregated1, notNullValue()); @@ -180,7 +181,7 @@ public void testCircuitBreak() throws IOException { content2.close(); // Signal EOS - InboundMessage aggregated2 = aggregator.finishAggregation(); + NativeInboundMessage aggregated2 = aggregator.finishAggregation(); assertEquals(1, content2.refCount()); assertThat(aggregated2, notNullValue()); @@ -199,7 +200,7 @@ public void testCircuitBreak() throws IOException { content3.close(); // Signal EOS - InboundMessage aggregated3 = aggregator.finishAggregation(); + NativeInboundMessage aggregated3 = aggregator.finishAggregation(); assertEquals(1, content3.refCount()); assertThat(aggregated3, notNullValue()); @@ -263,7 +264,7 @@ public void testFinishAggregationWillFinishHeader() throws IOException { content.close(); // Signal EOS - InboundMessage aggregated = aggregator.finishAggregation(); + NativeInboundMessage aggregated = aggregator.finishAggregation(); assertThat(aggregated, notNullValue()); assertFalse(header.needsToReadVariableHeader()); diff --git a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java index 0d171e17e70e1..2dde27d62e759 100644 --- a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java @@ -56,6 +56,7 @@ import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.nativeprotocol.NativeInboundMessage; import org.junit.After; import org.junit.Before; @@ -142,7 +143,7 @@ public void testPing() throws Exception { ); requestHandlers.registerHandler(registry); - handler.inboundMessage(channel, new InboundMessage(null, true)); + handler.inboundMessage(channel, new NativeInboundMessage(null, true)); if (channel.isServerChannel()) { BytesReference ping = channel.getMessageCaptor().get(); assertEquals('E', ping.get(0)); @@ -208,7 +209,11 @@ public TestResponse read(StreamInput in) throws IOException { BytesReference fullRequestBytes = request.serialize(new BytesStreamOutput()); BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize); Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); - InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {}); + NativeInboundMessage requestMessage = new NativeInboundMessage( + requestHeader, + ReleasableBytesReference.wrap(requestContent), + () -> {} + ); requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput()); handler.inboundMessage(channel, requestMessage); @@ -229,7 +234,11 @@ public TestResponse read(StreamInput in) throws IOException { BytesReference fullResponseBytes = channel.getMessageCaptor().get(); BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize); Header responseHeader = new Header(fullResponseBytes.length() - 6, requestId, responseStatus, version); - InboundMessage responseMessage = new InboundMessage(responseHeader, ReleasableBytesReference.wrap(responseContent), () -> {}); + NativeInboundMessage responseMessage = new NativeInboundMessage( + responseHeader, + ReleasableBytesReference.wrap(responseContent), + () -> {} + ); responseHeader.finishParsingHeader(responseMessage.openOrGetStreamInput()); handler.inboundMessage(channel, responseMessage); @@ -256,7 +265,7 @@ public void testSendsErrorResponseToHandshakeFromCompatibleVersion() throws Exce TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion ); - final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader); + final NativeInboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader); requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME; requestHeader.headers = Tuple.tuple(Map.of(), Map.of()); requestHeader.features = Set.of(); @@ -296,7 +305,7 @@ public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion ); - final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader); + final NativeInboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader); requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME; requestHeader.headers = Tuple.tuple(Map.of(), Map.of()); requestHeader.features = Set.of(); @@ -327,13 +336,17 @@ public void testLogsSlowInboundProcessing() throws Exception { TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion ); - final InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> { - try { - TimeUnit.SECONDS.sleep(1L); - } catch (InterruptedException e) { - throw new AssertionError(e); + final NativeInboundMessage requestMessage = new NativeInboundMessage( + requestHeader, + ReleasableBytesReference.wrap(BytesArray.EMPTY), + () -> { + try { + TimeUnit.SECONDS.sleep(1L); + } catch (InterruptedException e) { + throw new AssertionError(e); + } } - }); + ); requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME; requestHeader.headers = Tuple.tuple(Collections.emptyMap(), Collections.emptyMap()); requestHeader.features = Set.of(); @@ -407,7 +420,11 @@ public void onResponseSent(long requestId, String action, Exception error) { BytesReference fullRequestBytes = BytesReference.fromByteBuffer((ByteBuffer) buffer.flip()); BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize); Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); - InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {}); + NativeInboundMessage requestMessage = new NativeInboundMessage( + requestHeader, + ReleasableBytesReference.wrap(requestContent), + () -> {} + ); requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput()); handler.inboundMessage(channel, requestMessage); @@ -474,7 +491,11 @@ public void onResponseSent(long requestId, String action, Exception error) { // Create the request payload by intentionally stripping 1 byte away BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize - 1); Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); - InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {}); + NativeInboundMessage requestMessage = new NativeInboundMessage( + requestHeader, + ReleasableBytesReference.wrap(requestContent), + () -> {} + ); requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput()); handler.inboundMessage(channel, requestMessage); @@ -540,7 +561,11 @@ public TestResponse read(StreamInput in) throws IOException { BytesReference fullRequestBytes = request.serialize(new BytesStreamOutput()); BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize); Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); - InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {}); + NativeInboundMessage requestMessage = new NativeInboundMessage( + requestHeader, + ReleasableBytesReference.wrap(requestContent), + () -> {} + ); requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput()); handler.inboundMessage(channel, requestMessage); @@ -562,7 +587,11 @@ public TestResponse read(StreamInput in) throws IOException { BytesReference fullResponseBytes = BytesReference.fromByteBuffer((ByteBuffer) buffer.flip()); BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize); Header responseHeader = new Header(fullResponseBytes.length() - 6, requestId, responseStatus, version); - InboundMessage responseMessage = new InboundMessage(responseHeader, ReleasableBytesReference.wrap(responseContent), () -> {}); + NativeInboundMessage responseMessage = new NativeInboundMessage( + responseHeader, + ReleasableBytesReference.wrap(responseContent), + () -> {} + ); responseHeader.finishParsingHeader(responseMessage.openOrGetStreamInput()); handler.inboundMessage(channel, responseMessage); @@ -628,7 +657,11 @@ public TestResponse read(StreamInput in) throws IOException { BytesReference fullRequestBytes = request.serialize(new BytesStreamOutput()); BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize); Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); - InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {}); + NativeInboundMessage requestMessage = new NativeInboundMessage( + requestHeader, + ReleasableBytesReference.wrap(requestContent), + () -> {} + ); requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput()); handler.inboundMessage(channel, requestMessage); @@ -645,7 +678,11 @@ public TestResponse read(StreamInput in) throws IOException { // Create the response payload by intentionally stripping 1 byte away BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize - 1); Header responseHeader = new Header(fullResponseBytes.length() - 6, requestId, responseStatus, version); - InboundMessage responseMessage = new InboundMessage(responseHeader, ReleasableBytesReference.wrap(responseContent), () -> {}); + NativeInboundMessage responseMessage = new NativeInboundMessage( + responseHeader, + ReleasableBytesReference.wrap(responseContent), + () -> {} + ); responseHeader.finishParsingHeader(responseMessage.openOrGetStreamInput()); handler.inboundMessage(channel, responseMessage); @@ -654,8 +691,8 @@ public TestResponse read(StreamInput in) throws IOException { assertThat(exceptionCaptor.get().getMessage(), containsString("Failed to deserialize response from handler")); } - private static InboundMessage unreadableInboundHandshake(Version remoteVersion, Header requestHeader) { - return new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> {}) { + private static NativeInboundMessage unreadableInboundHandshake(Version remoteVersion, Header requestHeader) { + return new NativeInboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> {}) { @Override public StreamInput openOrGetStreamInput() { final StreamInput streamInput = new InputStreamStreamInput(new InputStream() { diff --git a/server/src/test/java/org/opensearch/transport/InboundPipelineTests.java b/server/src/test/java/org/opensearch/transport/InboundPipelineTests.java index 2dfe8a0dd8590..d54f7e6fd2c2b 100644 --- a/server/src/test/java/org/opensearch/transport/InboundPipelineTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundPipelineTests.java @@ -49,6 +49,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.nativeprotocol.NativeInboundMessage; import java.io.IOException; import java.util.ArrayList; @@ -74,7 +75,7 @@ public void testPipelineHandling() throws IOException { final List toRelease = new ArrayList<>(); final BiConsumer messageHandler = (c, m) -> { try { - InboundMessage message = (InboundMessage) m; + NativeInboundMessage message = (NativeInboundMessage) m; final Header header = message.getHeader(); final MessageData actualData; final Version version = header.getVersion(); diff --git a/server/src/test/java/org/opensearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/opensearch/transport/OutboundHandlerTests.java index 36ba409a2de03..ad7d4401af13c 100644 --- a/server/src/test/java/org/opensearch/transport/OutboundHandlerTests.java +++ b/server/src/test/java/org/opensearch/transport/OutboundHandlerTests.java @@ -53,6 +53,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.nativeprotocol.NativeInboundMessage; import org.junit.After; import org.junit.Before; @@ -97,7 +98,7 @@ public void setUp() throws Exception { final InboundAggregator aggregator = new InboundAggregator(breaker, (Predicate) action -> true); pipeline = new InboundPipeline(statsTracker, millisSupplier, decoder, aggregator, (c, m) -> { try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { - InboundMessage m1 = (InboundMessage) m; + NativeInboundMessage m1 = (NativeInboundMessage) m; Streams.copy(m1.openOrGetStreamInput(), streamOutput); message.set(new Tuple<>(m1.getHeader(), streamOutput.bytes())); } catch (IOException e) {