From 94891b054bbce338f45616dee2aa170a8a66b5fd Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 14 Jul 2024 16:07:03 +0300 Subject: [PATCH 1/5] streaming_response_body_content_type: introduce a failing test --- .../zio/http/netty/NettyStreamBodySpec.scala | 79 ++++++++++++++++++- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala b/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala index ad4acef187..4fe922c1b5 100644 --- a/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala +++ b/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala @@ -3,13 +3,13 @@ package zio.http.netty import zio._ import zio.test.TestAspect.withLiveClock import zio.test.{Spec, TestEnvironment, assertTrue} - -import zio.stream.{ZStream, ZStreamAspect} - +import zio.stream.{ZPipeline, ZStream, ZStreamAspect} import zio.http.ZClient.Config import zio.http._ import zio.http.internal.HttpRunnableSpec +import zio.http.multipart.mixed.MultipartMixed import zio.http.netty.NettyConfig.LeakDetectionLevel +import zio.http.netty.NettyStreamBodySpec.app object NettyStreamBodySpec extends HttpRunnableSpec { @@ -101,6 +101,79 @@ object NettyStreamBodySpec extends HttpRunnableSpec { ) } }, + test("properly decodes body's boundary") { + def trackablePart(content : String): ZIO[Any, Nothing, (MultipartMixed.Part, Promise[Nothing, Boolean])] = { + zio.Promise.make[Nothing, Boolean].map{ p => + MultipartMixed.Part( + Headers(Header.ContentType(MediaType.text.`plain`)), + ZStream(content) + .via(ZPipeline.utf8Encode) + .ensuring(p.succeed(true)) + ) -> + p + } + } + def trackableMultipartMixed(b : Boundary)(partsContents : String*): ZIO[Any, Nothing, (MultipartMixed, Seq[Promise[Nothing, Boolean]])] = { + ZIO + .foreach(partsContents)(trackablePart) + .map{ tps => + val (parts, promisises) = tps.unzip + val mpm = MultipartMixed.fromParts(ZStream.fromIterable(parts), b, 10) + (mpm, promisises) + } + } + + def serve(resp : Response): ZIO[Any, Throwable, RuntimeFlags] = { + val app = Routes( Method.GET / "it" -> handler( resp ) ) + for { + portPromise <- Promise.make[Throwable, Int] + _ <- Server + .install(app) + .intoPromise(portPromise) + .zipRight(ZIO.never) + .provide( + ZLayer.succeed(NettyConfig.defaultWithFastShutdown.leakDetection(LeakDetectionLevel.PARANOID)), + ZLayer.succeed(Server.Config.default.onAnyOpenPort), + Server.customized, + ) + .fork + port <- portPromise.await + } yield port + } + + for{ + mpmAndPromises <- trackableMultipartMixed(Boundary("this_is_a_boundary"))("this is the boring part 1", "and this is the boring part two") + (mpm, promises) = mpmAndPromises + resp = Response(body = Body.fromStreamChunked(mpm.source).contentType(MediaType.multipart.`mixed`, mpm.boundary)) + .addHeader(Header.ContentType(MediaType.multipart.`mixed`, Some(mpm.boundary))) + port <- serve(resp) + client <- ZIO.service[Client] + req = Request.get(s"http://localhost:$port/it") + actualResp <- client(req) + actualMpm <- actualResp.body.asMultipartMixed + partsResults <- actualMpm + .parts + .zipWithIndex + .mapZIO{ + case (part, idx) => + val pr = promises(idx.toInt) + pr.isDone <*> + part.toBody.asString <*> + pr.isDone + } + .runCollect + } yield { + zio.test.assertTrue{ + actualResp.headers(Header.ContentType) == resp.headers(Header.ContentType) && + actualResp.body.boundary == Some(mpm.boundary) && + actualMpm.boundary == mpm.boundary && + partsResults == Chunk( + (false, "this is the boring part 1", true), + (false, "and this is the boring part two", true) + ) + } + } + } ).provide( singleConnectionClient, Scope.default, From 30773b8cb278f0b147d10c96fc2899f615dde216 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 14 Jul 2024 16:29:10 +0300 Subject: [PATCH 2/5] streaming_response_body_content_type: fix the issue --- zio-http/jvm/src/main/scala/zio/http/netty/NettyResponse.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/NettyResponse.scala b/zio-http/jvm/src/main/scala/zio/http/netty/NettyResponse.scala index 04455169a0..5793e7b030 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/NettyResponse.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/NettyResponse.scala @@ -55,6 +55,7 @@ object NettyResponse { onComplete.unsafe.done(Exit.succeed(ChannelState.forStatus(status))) Response(status, headers, Body.empty) } else { + val contentType = headers.get(Header.ContentType) val responseHandler = new ClientResponseStreamHandler(onComplete, keepAlive, status) ctx .pipeline() @@ -64,7 +65,7 @@ object NettyResponse { responseHandler, ): Unit - val data = NettyBody.fromAsync(callback => responseHandler.connect(callback), knownContentLength) + val data = NettyBody.fromAsync(callback => responseHandler.connect(callback), knownContentLength, contentType.map(_.renderedValue)) Response(status, headers, data) } } From 75ce5f9f5065b35292b46268509612715c226284 Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 14 Jul 2024 16:30:50 +0300 Subject: [PATCH 3/5] streaming_response_body_content_type: fix the test code, buffering (either server or client side) affects streaming 'laziness' --- .../test/scala/zio/http/netty/NettyStreamBodySpec.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala b/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala index 4fe922c1b5..e3d0ea19a5 100644 --- a/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala +++ b/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala @@ -118,7 +118,7 @@ object NettyStreamBodySpec extends HttpRunnableSpec { .foreach(partsContents)(trackablePart) .map{ tps => val (parts, promisises) = tps.unzip - val mpm = MultipartMixed.fromParts(ZStream.fromIterable(parts), b, 10) + val mpm = MultipartMixed.fromParts(ZStream.fromIterable(parts), b, 1) (mpm, promisises) } } @@ -168,8 +168,9 @@ object NettyStreamBodySpec extends HttpRunnableSpec { actualResp.body.boundary == Some(mpm.boundary) && actualMpm.boundary == mpm.boundary && partsResults == Chunk( - (false, "this is the boring part 1", true), - (false, "and this is the boring part two", true) + //todo: due to server side buffering can't really expect the promises to be uncompleted BEFORE pulling on the client side + (true, "this is the boring part 1", true), + (true, "and this is the boring part two", true) ) } } From 130eeee025fb34a3d552b5717009770eefc9598c Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 14 Jul 2024 16:38:30 +0300 Subject: [PATCH 4/5] streaming_response_body_content_type: fmt --- .../scala/zio/http/netty/NettyResponse.scala | 8 +- .../zio/http/netty/NettyStreamBodySpec.scala | 78 ++++++++++--------- 2 files changed, 47 insertions(+), 39 deletions(-) diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/NettyResponse.scala b/zio-http/jvm/src/main/scala/zio/http/netty/NettyResponse.scala index 5793e7b030..9e68e07a1b 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/NettyResponse.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/NettyResponse.scala @@ -55,7 +55,7 @@ object NettyResponse { onComplete.unsafe.done(Exit.succeed(ChannelState.forStatus(status))) Response(status, headers, Body.empty) } else { - val contentType = headers.get(Header.ContentType) + val contentType = headers.get(Header.ContentType) val responseHandler = new ClientResponseStreamHandler(onComplete, keepAlive, status) ctx .pipeline() @@ -65,7 +65,11 @@ object NettyResponse { responseHandler, ): Unit - val data = NettyBody.fromAsync(callback => responseHandler.connect(callback), knownContentLength, contentType.map(_.renderedValue)) + val data = NettyBody.fromAsync( + callback => responseHandler.connect(callback), + knownContentLength, + contentType.map(_.renderedValue), + ) Response(status, headers, data) } } diff --git a/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala b/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala index e3d0ea19a5..78c744c065 100644 --- a/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala +++ b/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala @@ -3,7 +3,9 @@ package zio.http.netty import zio._ import zio.test.TestAspect.withLiveClock import zio.test.{Spec, TestEnvironment, assertTrue} + import zio.stream.{ZPipeline, ZStream, ZStreamAspect} + import zio.http.ZClient.Config import zio.http._ import zio.http.internal.HttpRunnableSpec @@ -102,29 +104,31 @@ object NettyStreamBodySpec extends HttpRunnableSpec { } }, test("properly decodes body's boundary") { - def trackablePart(content : String): ZIO[Any, Nothing, (MultipartMixed.Part, Promise[Nothing, Boolean])] = { - zio.Promise.make[Nothing, Boolean].map{ p => + def trackablePart(content: String): ZIO[Any, Nothing, (MultipartMixed.Part, Promise[Nothing, Boolean])] = { + zio.Promise.make[Nothing, Boolean].map { p => MultipartMixed.Part( Headers(Header.ContentType(MediaType.text.`plain`)), ZStream(content) .via(ZPipeline.utf8Encode) - .ensuring(p.succeed(true)) + .ensuring(p.succeed(true)), ) -> - p + p } } - def trackableMultipartMixed(b : Boundary)(partsContents : String*): ZIO[Any, Nothing, (MultipartMixed, Seq[Promise[Nothing, Boolean]])] = { + def trackableMultipartMixed( + b: Boundary, + )(partsContents: String*): ZIO[Any, Nothing, (MultipartMixed, Seq[Promise[Nothing, Boolean]])] = { ZIO .foreach(partsContents)(trackablePart) - .map{ tps => + .map { tps => val (parts, promisises) = tps.unzip - val mpm = MultipartMixed.fromParts(ZStream.fromIterable(parts), b, 1) + val mpm = MultipartMixed.fromParts(ZStream.fromIterable(parts), b, 1) (mpm, promisises) } } - def serve(resp : Response): ZIO[Any, Throwable, RuntimeFlags] = { - val app = Routes( Method.GET / "it" -> handler( resp ) ) + def serve(resp: Response): ZIO[Any, Throwable, RuntimeFlags] = { + val app = Routes(Method.GET / "it" -> handler(resp)) for { portPromise <- Promise.make[Throwable, Int] _ <- Server @@ -141,40 +145,40 @@ object NettyStreamBodySpec extends HttpRunnableSpec { } yield port } - for{ - mpmAndPromises <- trackableMultipartMixed(Boundary("this_is_a_boundary"))("this is the boring part 1", "and this is the boring part two") + for { + mpmAndPromises <- trackableMultipartMixed(Boundary("this_is_a_boundary"))( + "this is the boring part 1", + "and this is the boring part two", + ) (mpm, promises) = mpmAndPromises - resp = Response(body = Body.fromStreamChunked(mpm.source).contentType(MediaType.multipart.`mixed`, mpm.boundary)) + resp = Response(body = + Body.fromStreamChunked(mpm.source).contentType(MediaType.multipart.`mixed`, mpm.boundary), + ) .addHeader(Header.ContentType(MediaType.multipart.`mixed`, Some(mpm.boundary))) - port <- serve(resp) - client <- ZIO.service[Client] + port <- serve(resp) + client <- ZIO.service[Client] req = Request.get(s"http://localhost:$port/it") - actualResp <- client(req) - actualMpm <- actualResp.body.asMultipartMixed - partsResults <- actualMpm - .parts - .zipWithIndex - .mapZIO{ - case (part, idx) => - val pr = promises(idx.toInt) - pr.isDone <*> - part.toBody.asString <*> - pr.isDone - } - .runCollect + actualResp <- client(req) + actualMpm <- actualResp.body.asMultipartMixed + partsResults <- actualMpm.parts.zipWithIndex.mapZIO { case (part, idx) => + val pr = promises(idx.toInt) + pr.isDone <*> + part.toBody.asString <*> + pr.isDone + }.runCollect } yield { - zio.test.assertTrue{ - actualResp.headers(Header.ContentType) == resp.headers(Header.ContentType) && - actualResp.body.boundary == Some(mpm.boundary) && - actualMpm.boundary == mpm.boundary && - partsResults == Chunk( - //todo: due to server side buffering can't really expect the promises to be uncompleted BEFORE pulling on the client side - (true, "this is the boring part 1", true), - (true, "and this is the boring part two", true) - ) + zio.test.assertTrue { + actualResp.headers(Header.ContentType) == resp.headers(Header.ContentType) && + actualResp.body.boundary == Some(mpm.boundary) && + actualMpm.boundary == mpm.boundary && + partsResults == Chunk( + // todo: due to server side buffering can't really expect the promises to be uncompleted BEFORE pulling on the client side + (true, "this is the boring part 1", true), + (true, "and this is the boring part two", true), + ) } } - } + }, ).provide( singleConnectionClient, Scope.default, From b21f904e80028eca2ca078a6b06427f71b7872cf Mon Sep 17 00:00:00 2001 From: Eyal Farago Date: Sun, 14 Jul 2024 16:55:12 +0300 Subject: [PATCH 5/5] streaming_response_body_content_type: make the test more deterministic --- .../test/scala/zio/http/netty/NettyStreamBodySpec.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala b/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala index 78c744c065..5b0e097866 100644 --- a/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala +++ b/zio-http/jvm/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala @@ -162,8 +162,8 @@ object NettyStreamBodySpec extends HttpRunnableSpec { actualMpm <- actualResp.body.asMultipartMixed partsResults <- actualMpm.parts.zipWithIndex.mapZIO { case (part, idx) => val pr = promises(idx.toInt) - pr.isDone <*> - part.toBody.asString <*> + // todo: due to server side buffering can't really expect the promises to be uncompleted BEFORE pulling on the client side + part.toBody.asString <*> pr.isDone }.runCollect } yield { @@ -173,8 +173,8 @@ object NettyStreamBodySpec extends HttpRunnableSpec { actualMpm.boundary == mpm.boundary && partsResults == Chunk( // todo: due to server side buffering can't really expect the promises to be uncompleted BEFORE pulling on the client side - (true, "this is the boring part 1", true), - (true, "and this is the boring part two", true), + ("this is the boring part 1", true), + ("and this is the boring part two", true), ) } }