25
25
import java .net .http .HttpRequest ;
26
26
import java .net .http .HttpResponse ;
27
27
import java .net .http .HttpTimeoutException ;
28
+ import java .net .http .HttpResponse .BodyHandler ;
29
+ import java .net .http .HttpResponse .BodySubscriber ;
30
+ import java .net .http .HttpResponse .BodySubscribers ;
31
+ import java .net .http .HttpResponse .ResponseInfo ;
28
32
import java .nio .ByteBuffer ;
29
33
import java .time .Duration ;
30
34
import java .util .Collections ;
37
41
import java .util .concurrent .Executor ;
38
42
import java .util .concurrent .Flow ;
39
43
import java .util .concurrent .TimeUnit ;
44
+ import java .util .zip .GZIPInputStream ;
40
45
41
46
import org .jspecify .annotations .Nullable ;
42
47
@@ -70,15 +75,18 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
70
75
71
76
private final @ Nullable Duration timeout ;
72
77
78
+ private final boolean compressionEnabled ;
79
+
73
80
74
81
public JdkClientHttpRequest (HttpClient httpClient , URI uri , HttpMethod method , Executor executor ,
75
- @ Nullable Duration readTimeout ) {
82
+ @ Nullable Duration readTimeout , boolean compressionEnabled ) {
76
83
77
84
this .httpClient = httpClient ;
78
85
this .uri = uri ;
79
86
this .method = method ;
80
87
this .executor = executor ;
81
88
this .timeout = readTimeout ;
89
+ this .compressionEnabled = compressionEnabled ;
82
90
}
83
91
84
92
@@ -98,7 +106,11 @@ protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable Body
98
106
CompletableFuture <HttpResponse <InputStream >> responseFuture = null ;
99
107
try {
100
108
HttpRequest request = buildRequest (headers , body );
101
- responseFuture = this .httpClient .sendAsync (request , HttpResponse .BodyHandlers .ofInputStream ());
109
+ if (compressionEnabled ) {
110
+ responseFuture = this .httpClient .sendAsync (request , new DecompressingBodyHandler ());
111
+ } else {
112
+ responseFuture = this .httpClient .sendAsync (request , HttpResponse .BodyHandlers .ofInputStream ());
113
+ }
102
114
103
115
if (this .timeout != null ) {
104
116
TimeoutHandler timeoutHandler = new TimeoutHandler (responseFuture , this .timeout );
@@ -140,6 +152,10 @@ else if (cause instanceof IOException ioEx) {
140
152
141
153
private HttpRequest buildRequest (HttpHeaders headers , @ Nullable Body body ) {
142
154
HttpRequest .Builder builder = HttpRequest .newBuilder ().uri (this .uri );
155
+
156
+ if (compressionEnabled ) {
157
+ headers .add (HttpHeaders .ACCEPT_ENCODING , "gzip" );
158
+ }
143
159
144
160
headers .forEach ((headerName , headerValues ) -> {
145
161
if (!DISALLOWED_HEADERS .contains (headerName .toLowerCase (Locale .ROOT ))) {
@@ -269,4 +285,30 @@ public void close() throws IOException {
269
285
}
270
286
}
271
287
288
+ /**
289
+ * Custom BodyHandler that checks the Content-Encoding header and applies the appropriate decompression algorithm.
290
+ */
291
+ public static final class DecompressingBodyHandler implements BodyHandler <InputStream > {
292
+
293
+ @ Override
294
+ public BodySubscriber <InputStream > apply (ResponseInfo responseInfo ) {
295
+ String contentEncoding = responseInfo .headers ().firstValue (HttpHeaders .CONTENT_ENCODING ).orElse ("" );
296
+ if (contentEncoding .equalsIgnoreCase ("gzip" )) {
297
+ // If the content is gzipped, wrap the InputStream with a GZIPInputStream
298
+ return BodySubscribers .mapping (
299
+ BodySubscribers .ofInputStream (),
300
+ (InputStream is ) -> {
301
+ try {
302
+ return new GZIPInputStream (is );
303
+ } catch (IOException e ) {
304
+ throw new UncheckedIOException (e ); // Propagate IOExceptions
305
+ }
306
+ });
307
+ } else {
308
+ // Otherwise, return a standard InputStream BodySubscriber
309
+ return BodySubscribers .ofInputStream ();
310
+ }
311
+ }
312
+ }
313
+
272
314
}
0 commit comments