Skip to content

Commit

Permalink
feat: close response body on DataSource closure (#3851)
Browse files Browse the repository at this point in the history
* feat: close response body on DataSource closure

* dependencies
  • Loading branch information
ndr-brt authored Feb 13, 2024
1 parent c39b76d commit aed00a4
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 90 deletions.
4 changes: 2 additions & 2 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ maven/mavencentral/com.lmax/disruptor/3.4.4, Apache-2.0, approved, clearlydefine
maven/mavencentral/com.networknt/json-schema-validator/1.0.76, Apache-2.0, approved, CQ22638
maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.28, Apache-2.0, approved, clearlydefined
maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.37.3, Apache-2.0, approved, #11701
maven/mavencentral/com.puppycrawl.tools/checkstyle/10.12.3, LGPL-2.1+, restricted, clearlydefined
maven/mavencentral/com.puppycrawl.tools/checkstyle/10.12.3, LGPL-2.1-only AND Apache-2.0 AND LGPL-2.1-or-later AND ANTLR-PD AND GPL-2.0-or-later AND LGPL-2.0-or-later AND (Apache-2.0 AND LGPL-2.1-or-later) AND LicenseRef-scancode-proprietary-license, restricted, #13190
maven/mavencentral/com.samskivert/jmustache/1.15, BSD-2-Clause, approved, clearlydefined
maven/mavencentral/com.squareup.okhttp3/okhttp-dnsoverhttps/4.12.0, Apache-2.0, approved, #11159
maven/mavencentral/com.squareup.okhttp3/okhttp/4.12.0, Apache-2.0, approved, #11156
Expand Down Expand Up @@ -190,7 +190,7 @@ maven/mavencentral/net.javacrumbs.json-unit/json-unit-core/2.36.0, Apache-2.0, a
maven/mavencentral/net.minidev/accessors-smart/2.4.7, Apache-2.0, approved, #7515
maven/mavencentral/net.minidev/json-smart/2.4.7, Apache-2.0, approved, #3288
maven/mavencentral/net.sf.jopt-simple/jopt-simple/5.0.4, MIT, approved, CQ13174
maven/mavencentral/net.sf.saxon/Saxon-HE/12.3, NOASSERTION, restricted, clearlydefined
maven/mavencentral/net.sf.saxon/Saxon-HE/12.3, MPL-2.0-no-copyleft-exception AND (LicenseRef-scancode-proprietary-license AND MPL-2.0-no-copyleft-exception) AND (MPL-2.0-no-copyleft-exception AND X11) AND (MIT AND MPL-2.0-no-copyleft-exception) AND (MPL-1.0 AND MPL-2.0-no-copyleft-exception) AND (Apache-2.0 AND MPL-2.0-no-copyleft-exception) AND MPL-1.0, restricted, #13191
maven/mavencentral/org.antlr/antlr4-runtime/4.11.1, BSD-3-Clause, approved, clearlydefined
maven/mavencentral/org.apache.commons/commons-compress/1.25.0, Apache-2.0, approved, #11600
maven/mavencentral/org.apache.commons/commons-digester3/3.2, Apache-2.0, approved, clearlydefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@


import okhttp3.MediaType;
import okhttp3.ResponseBody;
import org.eclipse.edc.connector.dataplane.http.params.HttpRequestFactory;
import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
Expand All @@ -25,8 +26,10 @@
import org.eclipse.edc.spi.monitor.Monitor;

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static java.lang.String.format;
Expand All @@ -45,6 +48,10 @@ public class HttpDataSource implements DataSource {
private Monitor monitor;
private EdcHttpClient httpClient;
private HttpRequestFactory requestFactory;
private final AtomicReference<ResponseBodyStream> responseBodyStream = new AtomicReference<>();

private HttpDataSource() {
}

@Override
public StreamResult<Stream<Part>> openPartStream() {
Expand All @@ -58,8 +65,10 @@ public StreamResult<Stream<Part>> openPartStream() {
if (body == null) {
throw new EdcException(format("Received empty response body transferring HTTP data for request %s: %s", requestId, response.code()));
}
var stream = body.byteStream();
responseBodyStream.set(new ResponseBodyStream(body, stream));
var mediaType = Optional.ofNullable(body.contentType()).map(MediaType::toString).orElse(OCTET_STREAM);
return success(Stream.of(new HttpPart(name, body.byteStream(), mediaType)));
return success(Stream.of(new HttpPart(name, stream, mediaType)));
} else {
try {
if (NOT_AUTHORIZED == response.code() || FORBIDDEN == response.code()) {
Expand All @@ -83,11 +92,20 @@ public StreamResult<Stream<Part>> openPartStream() {

}

private HttpDataSource() {
}

@Override
public void close() {
var bodyStream = responseBodyStream.get();
if (bodyStream != null) {
bodyStream.responseBody().close();
try {
bodyStream.stream().close();
} catch (IOException e) {
// do nothing
}
}
}

private record ResponseBodyStream(ResponseBody responseBody, InputStream stream) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,104 +14,115 @@

package org.eclipse.edc.connector.dataplane.http.pipeline;

import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.Interceptor;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.eclipse.edc.connector.dataplane.http.params.HttpRequestFactory;
import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailureArgument;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.TypeManager;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

import static okhttp3.Protocol.HTTP_1_1;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.NOT_AUTHORIZED;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.junit.testfixtures.TestUtils.testHttpClient;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class HttpDataSourceTest {

private static final ObjectMapper MAPPER = new TypeManager().getMapper();

private String requestId;
private String url;
private final HttpRequestFactory requestFactory = mock(HttpRequestFactory.class);

@BeforeEach
public void setUp() {
requestId = UUID.randomUUID().toString();
url = "http://some.test.url/";
}
private final HttpRequestFactory requestFactory = mock();

@Test
void verifyCallSuccess() throws IOException {
var json = MAPPER.writeValueAsString(Map.of("key1", "Value1"));
var responseBody = ResponseBody.create(json, MediaType.parse("application/json"));
void verifyCallSuccess() {
var responseBody = ResponseBody.create("{}", MediaType.parse("application/json"));

var interceptor = new CustomInterceptor(200, responseBody, "Test message");
var params = mock(HttpRequestParams.class);
var request = new Request.Builder().url(url).get().build();
var request = dummyRequest();
var source = defaultBuilder(interceptor).params(params).requestFactory(requestFactory).build();

when(requestFactory.toRequest(any())).thenReturn(request);

var parts = source.openPartStream().getContent().toList();

var interceptedRequest = interceptor.getInterceptedRequest();
assertThat(interceptedRequest).isEqualTo(request);
assertThat(parts).hasSize(1);
var part = parts.get(0);
assertThat(part.mediaType()).startsWith("application/json");
try (var is = part.openStream()) {
assertThat(new String(is.readAllBytes())).isEqualTo(json);
}
assertThat(parts).hasSize(1).first().satisfies(part -> {
assertThat(part.mediaType()).startsWith("application/json");
assertThat(part.openStream()).hasContent("{}");
});

verify(requestFactory).toRequest(any());
}

@ParameterizedTest
@MethodSource
void verifyCallFailed(StreamFailureArgument argument) {
var message = "Test message";
var body = "Test body";
var interceptor = new CustomInterceptor(argument.getCode(), ResponseBody.create(body, MediaType.parse("text/plain")), message);
var params = mock(HttpRequestParams.class);
var request = new Request.Builder().url(url).get().build();
var source = defaultBuilder(interceptor).params(params).requestFactory(requestFactory).build();

when(requestFactory.toRequest(any())).thenReturn(request);
@ArgumentsSource(StreamFailureArguments.class)
void verifyCallFailed(int code, StreamFailure.Reason reason) {
var responseBody = ResponseBody.create("Test body", MediaType.parse("text/plain"));
var interceptor = new CustomInterceptor(code, responseBody, "Test message");
var source = defaultBuilder(interceptor).params(mock()).requestFactory(requestFactory).build();
when(requestFactory.toRequest(any())).thenReturn(dummyRequest());

var result = source.openPartStream();
assertThat(result.failed()).isTrue();
assertThat(result.reason()).isEqualTo(argument.getReason());

assertThat(result).isFailed().extracting(StreamFailure::getReason).isEqualTo(reason);
verify(requestFactory).toRequest(any());
}

static Stream<StreamFailureArgument> verifyCallFailed() {
return Stream.of(
new StreamFailureArgument(400, GENERAL_ERROR),
new StreamFailureArgument(401, NOT_AUTHORIZED),
new StreamFailureArgument(403, NOT_AUTHORIZED),
new StreamFailureArgument(500, GENERAL_ERROR));
@Test
void close_shouldCloseResponseBodyAndStream() throws IOException {
InputStream stream = mock();
var responseBody = spy(ResponseBody.create("{}", MediaType.parse("application/json")));
when(responseBody.byteStream()).thenReturn(stream);
var interceptor = new CustomInterceptor(200, responseBody, "Test message");
var source = defaultBuilder(interceptor).params(mock()).requestFactory(requestFactory).build();
when(requestFactory.toRequest(any())).thenReturn(dummyRequest());

source.openPartStream();
source.close();

verify(responseBody).close();
verify(stream).close();
}

@NotNull
private Request dummyRequest() {
return new Request.Builder().url("http://some.test.url/").get().build();
}

private static class StreamFailureArguments implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
return Stream.of(
arguments(400, GENERAL_ERROR),
arguments(401, NOT_AUTHORIZED),
arguments(403, NOT_AUTHORIZED),
arguments(500, GENERAL_ERROR)
);
}
}

private HttpDataSource.Builder defaultBuilder(Interceptor interceptor) {
Expand All @@ -120,7 +131,7 @@ private HttpDataSource.Builder defaultBuilder(Interceptor interceptor) {
.httpClient(httpClient)
.name("test-name")
.monitor(mock(Monitor.class))
.requestId(requestId);
.requestId(UUID.randomUUID().toString());
}

static final class CustomInterceptor implements Interceptor {
Expand Down

This file was deleted.

0 comments on commit aed00a4

Please sign in to comment.