2323import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_SIGNED_PAYLOAD_TRAILER ;
2424import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_UNSIGNED_PAYLOAD_TRAILER ;
2525import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_CONTENT_SHA256 ;
26+ import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_DECODED_CONTENT_LENGTH ;
2627import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_TRAILER ;
2728import  static  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerUtils .moveContentLength ;
2829
3132import  java .util .ArrayList ;
3233import  java .util .Collections ;
3334import  java .util .List ;
35+ import  java .util .Optional ;
36+ import  java .util .concurrent .CompletableFuture ;
3437import  org .reactivestreams .Publisher ;
3538import  software .amazon .awssdk .annotations .SdkInternalApi ;
3639import  software .amazon .awssdk .checksums .SdkChecksum ;
3740import  software .amazon .awssdk .checksums .spi .ChecksumAlgorithm ;
3841import  software .amazon .awssdk .http .ContentStreamProvider ;
3942import  software .amazon .awssdk .http .Header ;
4043import  software .amazon .awssdk .http .SdkHttpRequest ;
44+ import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .AsyncChunkEncodedPayload ;
4145import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChecksumTrailerProvider ;
4246import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedInputStream ;
47+ import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPayload ;
48+ import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPublisher ;
4349import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4ChunkExtensionProvider ;
4450import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4TrailerProvider ;
51+ import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SyncChunkEncodedPayload ;
4552import  software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .TrailerProvider ;
46- import  software .amazon .awssdk .http .auth .aws .internal .signer .io .ChecksumInputStream ;
4753import  software .amazon .awssdk .http .auth .aws .internal .signer .io .ResettableContentStreamProvider ;
54+ import  software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerUtils ;
4855import  software .amazon .awssdk .http .auth .spi .signer .PayloadChecksumStore ;
4956import  software .amazon .awssdk .utils .BinaryUtils ;
5057import  software .amazon .awssdk .utils .Logger ;
@@ -79,81 +86,140 @@ public static Builder builder() {
7986
8087    @ Override 
8188    public  ContentStreamProvider  sign (ContentStreamProvider  payload , V4RequestSigningResult  requestSigningResult ) {
82-         SdkHttpRequest .Builder  request  = requestSigningResult .getSignedRequest ();
83- 
84-         String  checksum  = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
85-             () -> new  IllegalArgumentException (X_AMZ_CONTENT_SHA256  + " must be set!" )
86-         );
87- 
8889        ChunkedEncodedInputStream .Builder  chunkedEncodedInputStreamBuilder  = ChunkedEncodedInputStream 
8990            .builder ()
9091            .inputStream (payload .newStream ())
9192            .chunkSize (chunkSize )
9293            .header (chunk  -> Integer .toHexString (chunk .remaining ()).getBytes (StandardCharsets .UTF_8 ));
9394
94-         preExistingTrailers .forEach (trailer  -> chunkedEncodedInputStreamBuilder .addTrailer (() -> trailer ));
95+         SyncChunkEncodedPayload  chunkedPayload  = new  SyncChunkEncodedPayload (chunkedEncodedInputStreamBuilder );
96+         signCommon (chunkedPayload , requestSigningResult );
97+ 
98+         return  new  ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
99+     }
100+ 
101+     @ Override 
102+     public  Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult  requestSigningResult ) {
103+         ChunkedEncodedPublisher .Builder  chunkedStreamBuilder  = ChunkedEncodedPublisher .builder ()
104+                                                                                       .publisher (payload )
105+                                                                                       .chunkSize (chunkSize )
106+                                                                                       .addEmptyTrailingChunk (true );
107+ 
108+         AsyncChunkEncodedPayload  chunkedPayload  = new  AsyncChunkEncodedPayload (chunkedStreamBuilder );
109+         signCommon (chunkedPayload , requestSigningResult );
110+ 
111+         return  chunkedStreamBuilder .build ();
112+     }
113+ 
114+     private  void  signCommon (ChunkedEncodedPayload  payload , V4RequestSigningResult  requestSigningResult ) {
115+         preExistingTrailers .forEach (t  -> payload .addTrailer (() -> t ));
116+ 
117+         SdkHttpRequest .Builder  request  = requestSigningResult .getSignedRequest ();
118+ 
119+         payload .decodedContentLength (request .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
120+                                             .map (Long ::parseLong )
121+                                             .orElseThrow (() -> {
122+                                                 String  msg  = String .format ("Expected header '%s' to be present" ,
123+                                                                            X_AMZ_DECODED_CONTENT_LENGTH );
124+                                                 return  new  RuntimeException (msg );
125+                                             }));
126+ 
127+         String  checksum  = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
128+             () -> new  IllegalArgumentException (X_AMZ_CONTENT_SHA256  + " must be set!" )
129+         );
95130
96131        switch  (checksum ) {
97132            case  STREAMING_SIGNED_PAYLOAD : {
98133                RollingSigner  rollingSigner  = new  RollingSigner (requestSigningResult .getSigningKey (),
99134                                                                requestSigningResult .getSignature ());
100-                 chunkedEncodedInputStreamBuilder .addExtension (new  SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
135+                 payload .addExtension (new  SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
101136                break ;
102137            }
103138            case  STREAMING_UNSIGNED_PAYLOAD_TRAILER :
104-                 setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
139+                 setupChecksumTrailerIfNeeded (payload );
105140                break ;
106141            case  STREAMING_SIGNED_PAYLOAD_TRAILER : {
142+                 setupChecksumTrailerIfNeeded (payload );
107143                RollingSigner  rollingSigner  = new  RollingSigner (requestSigningResult .getSigningKey (),
108144                                                                requestSigningResult .getSignature ());
109-                 chunkedEncodedInputStreamBuilder .addExtension (new  SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
110-                 setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
111-                 chunkedEncodedInputStreamBuilder .addTrailer (
112-                     new  SigV4TrailerProvider (chunkedEncodedInputStreamBuilder .trailers (), rollingSigner , credentialScope )
145+                 payload .addExtension (new  SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
146+                 payload .addTrailer (
147+                     new  SigV4TrailerProvider (payload .trailers (), rollingSigner , credentialScope )
113148                );
114149                break ;
115150            }
116151            default :
117152                throw  new  UnsupportedOperationException ();
118153        }
119- 
120-         return  new  ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
121-     }
122- 
123-     @ Override 
124-     public  Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult  requestSigningResult ) {
125-         // TODO(sra-identity-and-auth): implement this first and remove addFlexibleChecksumInTrailer logic in HttpChecksumStage 
126-         throw  new  UnsupportedOperationException ();
127154    }
128155
129156    @ Override 
130157    public  void  beforeSigning (SdkHttpRequest .Builder  request , ContentStreamProvider  payload ) {
131158        long  encodedContentLength  = 0 ;
132-         long  contentLength  = moveContentLength (request , payload );
159+         long  contentLength  = SignerUtils . computeAndMoveContentLength (request , payload );
133160        setupPreExistingTrailers (request );
134161
135162        // pre-existing trailers 
163+         encodedContentLength  = calculateEncodedContentLength (request , contentLength );
164+ 
165+         if  (checksumAlgorithm  != null ) {
166+             String  checksumHeaderName  = checksumHeaderName (checksumAlgorithm );
167+             request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
168+         }
169+         request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
170+         request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
171+     }
172+ 
173+     @ Override 
174+     public  CompletableFuture <Pair <SdkHttpRequest .Builder , Optional <Publisher <ByteBuffer >>>> beforeSigningAsync (
175+         SdkHttpRequest .Builder  request , Publisher <ByteBuffer > payload ) {
176+         return  moveContentLength (request , payload )
177+             .thenApply (p  -> {
178+                 SdkHttpRequest .Builder  requestBuilder  = p .left ();
179+                 setupPreExistingTrailers (requestBuilder );
180+ 
181+                 long  decodedContentLength  = requestBuilder .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
182+                                                           .map (Long ::parseLong )
183+                                                           // should not happen, this header is added by moveContentLength 
184+                                                           .orElseThrow (() -> new  RuntimeException (X_AMZ_DECODED_CONTENT_LENGTH 
185+                                                                                                   + " header not present" ));
186+ 
187+                 long  encodedContentLength  = calculateEncodedContentLength (request , decodedContentLength );
188+ 
189+                 if  (checksumAlgorithm  != null ) {
190+                     String  checksumHeaderName  = checksumHeaderName (checksumAlgorithm );
191+                     request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
192+                 }
193+                 request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
194+                 request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
195+                 return  Pair .of (requestBuilder , p .right ());
196+             });
197+     }
198+ 
199+     private  long  calculateEncodedContentLength (SdkHttpRequest .Builder  requestBuilder , long  decodedContentLength ) {
200+         long  encodedContentLength  = 0 ;
201+ 
136202        encodedContentLength  += calculateExistingTrailersLength ();
137203
138-         String  checksum  = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
204+         String  checksum  = requestBuilder .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
139205            () -> new  IllegalArgumentException (X_AMZ_CONTENT_SHA256  + " must be set!" )
140206        );
141207
142208        switch  (checksum ) {
143209            case  STREAMING_SIGNED_PAYLOAD : {
144210                long  extensionsLength  = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes> 
145-                 encodedContentLength  += calculateChunksLength (contentLength , extensionsLength );
211+                 encodedContentLength  += calculateChunksLength (decodedContentLength , extensionsLength );
146212                break ;
147213            }
148214            case  STREAMING_UNSIGNED_PAYLOAD_TRAILER :
149215                if  (checksumAlgorithm  != null ) {
150216                    encodedContentLength  += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
151217                }
152-                 encodedContentLength  += calculateChunksLength (contentLength , 0 );
218+                 encodedContentLength  += calculateChunksLength (decodedContentLength , 0 );
153219                break ;
154220            case  STREAMING_SIGNED_PAYLOAD_TRAILER : {
155221                long  extensionsLength  = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes> 
156-                 encodedContentLength  += calculateChunksLength (contentLength , extensionsLength );
222+                 encodedContentLength  += calculateChunksLength (decodedContentLength , extensionsLength );
157223                if  (checksumAlgorithm  != null ) {
158224                    encodedContentLength  += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
159225                }
@@ -167,12 +233,7 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
167233        // terminating \r\n 
168234        encodedContentLength  += 2 ;
169235
170-         if  (checksumAlgorithm  != null ) {
171-             String  checksumHeaderName  = checksumHeaderName (checksumAlgorithm );
172-             request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
173-         }
174-         request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
175-         request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
236+         return  encodedContentLength ;
176237    }
177238
178239    /** 
@@ -256,12 +317,7 @@ private long calculateChecksumTrailerLength(String checksumHeaderName) {
256317        return  lengthInBytes  + 2 ;
257318    }
258319
259-     /** 
260-      * Add the checksum as a trailer to the chunk-encoded stream. 
261-      * <p> 
262-      * If the checksum-algorithm is not present, then nothing is done. 
263-      */ 
264-     private  void  setupChecksumTrailerIfNeeded (ChunkedEncodedInputStream .Builder  builder ) {
320+     private  void  setupChecksumTrailerIfNeeded (ChunkedEncodedPayload  payload ) {
265321        if  (checksumAlgorithm  == null ) {
266322            return ;
267323        }
@@ -273,20 +329,17 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder buil
273329        if  (cachedChecksum  != null ) {
274330            LOG .debug (() -> String .format ("Cached payload checksum available for algorithm %s: %s. Using cached value" ,
275331                                          checksumAlgorithm .algorithmId (), checksumHeaderName ));
276-             builder .addTrailer (() -> Pair .of (checksumHeaderName , Collections .singletonList (cachedChecksum )));
332+             payload .addTrailer (() -> Pair .of (checksumHeaderName , Collections .singletonList (cachedChecksum )));
277333            return ;
278334        }
279335
280336        SdkChecksum  sdkChecksum  = fromChecksumAlgorithm (checksumAlgorithm );
281-         ChecksumInputStream  checksumInputStream  = new  ChecksumInputStream (
282-             builder .inputStream (),
283-             Collections .singleton (sdkChecksum )
284-         );
285337
286338        TrailerProvider  checksumTrailer  =
287339            new  ChecksumTrailerProvider (sdkChecksum , checksumHeaderName , checksumAlgorithm , payloadChecksumStore );
288340
289-         builder .inputStream (checksumInputStream ).addTrailer (checksumTrailer );
341+         payload .checksumPayload (sdkChecksum );
342+         payload .addTrailer (checksumTrailer );
290343    }
291344
292345    private  String  getCachedChecksum () {
0 commit comments