
Non-blocking mechanism to transform request/response body streams #192


Open
pimterry opened this issue May 13, 2025 · 2 comments

Comments

@pimterry
Member

Hi @avarayr, I saw you left a comment on a PR (#145, comment now removed?) relating to this, so I thought I'd file a proper issue to track the suggestion.

I can definitely see use cases for transforming streams of content, and I'm open to ideas. This is indeed not supported by the current APIs, which always wait for the complete body before running the callbacks. The PR you commented on isn't directly related to this, but it's part of a general improvement in streaming support overall.

Do you have any thoughts on how you'd like this to work? If you can share more information about the kind of transform you're interested in that would be useful.

A relatively simple option would be to add a new kind of RequestTransform/ResponseTransform option, for declarative transformations instead of a callback, but whether that's possible depends on the kinds of transform you'd be doing.

Alternatively, there are certainly ways to modify the existing callback approach to support this; it's just a question of tradeoffs. I'm currently preparing some big changes for Mockttp v4 though, so if this requires breaking changes then now is the time to make them.
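
To make the declarative option a bit more concrete, here is a minimal sketch of the kind of transform it could drive: a literal find-and-replace applied to a body as it streams through, without buffering the whole thing. Everything here (`StreamingReplacer`, `createReplaceStream`) is hypothetical and not part of Mockttp's API; it assumes a non-empty UTF-8 search string and works on raw bytes so that multi-byte characters split across chunks are handled safely.

```typescript
import { Transform, type TransformCallback } from "node:stream";

// Hypothetical helper: replaces every occurrence of `search` in data that
// arrives in arbitrary chunks, including matches that span a chunk boundary.
class StreamingReplacer {
  private pending: Buffer = Buffer.alloc(0);
  private readonly search: Buffer;
  private readonly replacement: Buffer;

  constructor(search: string, replacement: string) {
    if (search.length === 0) throw new Error("search must not be empty");
    this.search = Buffer.from(search, "utf8");
    this.replacement = Buffer.from(replacement, "utf8");
  }

  // Feed one chunk and get back the bytes that are safe to emit now.
  push(chunk: Buffer): Buffer {
    const data = Buffer.concat([this.pending, chunk]);
    const out: Buffer[] = [];
    let pos = 0;
    let idx = data.indexOf(this.search, pos);
    while (idx !== -1) {
      out.push(data.subarray(pos, idx), this.replacement);
      pos = idx + this.search.length;
      idx = data.indexOf(this.search, pos);
    }
    // The unmatched tail could be the start of a match that the next chunk
    // completes, so hold back up to search.length - 1 bytes of it.
    const safeEnd = Math.max(pos, data.length - (this.search.length - 1));
    out.push(data.subarray(pos, safeEnd));
    this.pending = data.subarray(safeEnd);
    return Buffer.concat(out);
  }

  // Call once at end of stream to release the held-back tail.
  flush(): Buffer {
    const rest = this.pending;
    this.pending = Buffer.alloc(0);
    return rest;
  }
}

// Wraps the replacer in a standard Node Transform stream.
function createReplaceStream(search: string, replacement: string): Transform {
  const replacer = new StreamingReplacer(search, replacement);
  return new Transform({
    transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback) {
      callback(null, replacer.push(chunk));
    },
    flush(callback: TransformCallback) {
      callback(null, replacer.flush());
    },
  });
}
```

A declarative option would only need the search/replacement pair as configuration, which is what makes it non-blocking: memory use is bounded by the chunk size plus the length of the search string.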

@avarayr

avarayr commented May 13, 2025

Thank you for this, @pimterry, and thank you for this amazing, well-tested library.

I ended up forking the library and hacking together a solution that fits my use case. Here's how I did it:

request-handlers.ts
// definitions
export interface PassThroughResponseStream extends Omit<PassThroughResponse, "body" | "rawHeaders"> {
  body: Readable;
}
export interface CompletedRequest extends Request {
    body: CompletedBody;
    rawTrailers: RawTrailers;
    trailers: Trailers;
}
public readonly onStreamingResponse?: (
    res: PassThroughResponseStream,
    req: CompletedRequest,
) => MaybePromise<CallbackResponseStreamResult | void> | void;


// request-handlers.ts

/**
 * Check if we need to transform the response
 */
if (this.onStreamingResponse) {
  let resTransformStream: CallbackResponseStreamResult | void;
  let serverHeaders = rawHeadersToObject(serverRawHeaders);

  let reqHeader = rawHeadersToObjectPreservingCase(rawHeaders);
  resTransformStream = await this.onStreamingResponse(
    {
      id: clientReq.id,
      statusCode: serverStatusCode,
      statusMessage: serverRes.statusMessage,
      headers: serverHeaders,
      body: decodeBodyStream(serverRes, serverHeaders), // decode the body for convenience
    },
    {
      id: clientReq.id,
      protocol: protocol?.replace(":", "") ?? "",
      method: method,
      httpVersion: serverRes.httpVersion,
      url: clientReq.url, // original, unmodified URL
      modifiedReqUrl: clientReq.url !== reqUrl ? reqUrl : undefined, // modified URL
      destination: {
        hostname: hostname || "localhost",
        port: effectivePort,
      },
      path: path ?? "",
      headers: reqHeader,
      rawHeaders: rawHeaders,
      timingEvents: clientReq.timingEvents,
      tags: clientReq.tags,
      body: buildBodyReader(
        reqBodyOverride ? Buffer.from(reqBodyOverride.buffer) : await clientReq.body.asDecodedBuffer(),
        reqHeader,
      ),
      rawTrailers: clientReq.rawTrailers ?? [],
      trailers: rawHeadersToObject(clientReq.rawTrailers ?? []),
    },
  );

  if (!resTransformStream) {
    // what are we supposed to do here?
    serverRes.pipe(clientRes);
    resolve();
    return;
  }

  if (resTransformStream === "close" || resTransformStream === "reset") {
    // If you kill the connection, we need to fire an upstream event separately here, since
    // this means the body won't be delivered in normal response events.
    if (options.emitEventCallback) {
      options.emitEventCallback("passthrough-response-body", {
        overridden: true,
        rawBody: undefined,
      });
    }

    if (resTransformStream === "close") {
      (clientReq as any).socket.end();
    } else if (resTransformStream === "reset") {
      requireSocketResetSupport();
      resetOrDestroy(clientReq);
    }

    throw new AbortError(
      `Connection ${resTransformStream === "close" ? "closed" : "reset"} intentionally by rule`,
      `E_RULE_BRES_${resTransformStream.toUpperCase()}`,
    );
  }

  validateCustomHeaders(serverHeaders, resTransformStream?.headers);

  serverStatusCode = resTransformStream?.statusCode || serverStatusCode;
  serverStatusMessage = resTransformStream?.statusMessage || serverStatusMessage;

  serverHeaders = resTransformStream?.headers || serverHeaders;

  // delete the content-encoding to avoid needless CPU usage
  // there's no need to encode again if the downstream client is perfectly happy with raw bytes
  delete serverHeaders["content-encoding"];

  /**
   * Check if the .body is a buffer
   * If it's a buffer, we have the entire response ready to go, we can calculate the content-length
   * And pass it down immediately
   */
  if (Buffer.isBuffer(resTransformStream.body)) {
    serverHeaders["content-length"] = getContentLengthAfterModification(
      resTransformStream.body,
      serverRes.headers,
      resTransformStream?.headers,
      method === "HEAD", // HEAD responses are allowed mismatched content-length
    );

    // rebuild raw headers after content length modification
    serverRawHeaders = objectHeadersToRaw(serverHeaders);

    writeHead(
      clientRes,
      serverStatusCode,
      serverStatusMessage,
      serverRawHeaders.filter(([key, value]) => {
        if (key === ":status") return false;
        if (!validateHeader(key, value)) {
          console.warn(`Not forwarding invalid header: "${key}: ${value}"`);
          // Nothing else we can do in this case regardless - setHeaders will
          // throw within Node if we try to set this value.
          return false;
        }
        return true;
      }),
    );

    // Return the override data to the client:
    clientRes.end(resTransformStream.body);
    // Dump the real response data, in case that body wasn't read yet:
    serverRes.resume();
    resolve();
    return;
  } else {
    /**
     * Body is a stream | undefined
     * Write the modified headers and pipe the stream to client
     *
     * Because we're potentially transforming the body,
     * we're unsure of the content-length, so we don't set it
     */
    delete serverHeaders["content-length"];

    // get the raw headers
    serverRawHeaders = objectHeadersToRaw(serverHeaders);

    // send the headers first
    writeHead(
      clientRes,
      serverStatusCode,
      serverStatusMessage,
      serverRawHeaders.filter(([key, value]) => {
        if (key === ":status") return false;
        if (!validateHeader(key, value)) {
          console.warn(`Not forwarding invalid header: "${key}: ${value}"`);
          // Nothing else we can do in this case regardless - setHeaders will
          // throw within Node if we try to set this value.
          return false;
        }
        return true;
      }),
    );

    resTransformStream.body.pipe(clientRes);
    resTransformStream.body.once("end", resolve);
    return;
  }
}
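
For context, a rule using the hook above might look roughly like this. It is a sketch against the fork described in this comment, not an API in released Mockttp. `StreamingResponse` and `StreamingResult` are simplified, hypothetical stand-ins for `PassThroughResponseStream` and `CallbackResponseStreamResult` (whose exact shape isn't shown in this thread), and `createUpperCaseStream` is only an illustrative transform.

```typescript
import { Readable, Transform, type TransformCallback } from "node:stream";

// Simplified, hypothetical stand-ins for the fork's callback types.
type HeaderMap = Record<string, string | string[] | undefined>;
interface StreamingResponse {
  statusCode: number;
  headers: HeaderMap;
  body: Readable; // assumed to be already decoded by decodeBodyStream
}
interface StreamingResult {
  statusCode?: number;
  headers?: HeaderMap;
  body?: Readable | Buffer;
}

// Upper-cases ASCII letters byte by byte. Bytes >= 0x80 are left alone, so
// multi-byte UTF-8 characters split across chunks are not corrupted.
function createUpperCaseStream(): Transform {
  return new Transform({
    transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback) {
      const upper = chunk.map((byte) => (byte >= 0x61 && byte <= 0x7a ? byte - 0x20 : byte));
      callback(null, Buffer.from(upper));
    },
  });
}

// The callback itself: only text responses are transformed. Returning
// undefined falls through to the handler's "no override" branch.
const onStreamingResponse = (res: StreamingResponse): StreamingResult | undefined => {
  const contentType = res.headers["content-type"];
  if (typeof contentType !== "string" || !contentType.startsWith("text/")) {
    return undefined;
  }
  // Chunks are forwarded to the client as they arrive; nothing is buffered.
  return { body: res.body.pipe(createUpperCaseStream()) };
};
```

Because the callback returns a stream rather than a buffer, the handler takes the streaming branch and never waits for the full upstream body.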

A couple of important notes to consider:

  1. This runs before beforeResponse and transformResponse. Most of the code is essentially copy-pasted from the beforeResponse handling, with several modifications to avoid buffering the response.

  2. Since we're dealing with transform streams, I don't think there's a reliable way to calculate the resulting Content-Length up front. Therefore I'm just deleting the Content-Length header (the HTTP RFC seems to allow omitting it, and clients will just read the response until the connection closes).

  3. I'm deleting the Content-Encoding, thus the downstream client will receive decoded content. This might be a questionable decision, but in my use case, I need to achieve low latency, and re-encoding seems to be unnecessary.
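
Notes 2 and 3 amount to a small header rewrite before the streamed body is sent. A minimal sketch of that step is below; `headersForStreamedBody` is a hypothetical helper name, and it returns a copy rather than deleting keys in place, so a headers object supplied by the callback is not mutated. Worth knowing for note 2: when no Content-Length is set, Node's HTTP server uses chunked transfer encoding for HTTP/1.1 clients, so in that case the client sees an explicit end of body rather than having to wait for the connection to close.

```typescript
type HeaderMap = Record<string, string | string[] | undefined>;

// Hypothetical helper: drops the headers that no longer describe the body
// once it has been decoded and is being transformed on the fly.
function headersForStreamedBody(headers: HeaderMap): HeaderMap {
  const result: HeaderMap = {};
  for (const [name, value] of Object.entries(headers)) {
    const lower = name.toLowerCase();
    // The final length is unknown, and the body is sent decoded.
    if (lower === "content-length" || lower === "content-encoding") continue;
    result[name] = value;
  }
  return result;
}

const upstreamHeaders: HeaderMap = {
  "content-type": "text/html",
  "Content-Length": "1024",
  "content-encoding": "gzip",
};
const streamedHeaders = headersForStreamedBody(upstreamHeaders);
console.log(Object.keys(streamedHeaders));
```

The tradeoff from note 3 still applies: clients receive uncompressed bytes, which costs bandwidth but avoids the latency and CPU of re-encoding.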

@avarayr

avarayr commented May 13, 2025

I also had to write a custom decodeBodyStream function, because I couldn't find support for it in @httptoolkit/http-encoding.

request-utils.ts
export function decodeBodyStream(stream: stream.Readable, headers: Headers | RawHeaders) {
  const contentEncoding = getHeaderValue(headers, "content-encoding");

  if (!contentEncoding) return stream;

  let encodingLayers: string[] = [];
  if (contentEncoding) {
    const headerValue = Array.isArray(contentEncoding) ? contentEncoding.join(",") : String(contentEncoding);
    encodingLayers = headerValue.split(",").map((e) => e.trim().toLowerCase());
  }

  if (encodingLayers.length > 0) {
    // Content-Encoding lists codings in the order they were applied,
    // so decoding has to undo them in reverse order
    for (const enc of [...encodingLayers].reverse()) {
      if (enc === "gzip" || enc === "x-gzip") {
        stream = stream.pipe(zlib.createGunzip());
      } else if (enc === "deflate") {
        stream = stream.pipe(zlib.createInflate());
      } else if (enc === "br" || enc === "brotli") {
        stream = stream.pipe(zlib.createBrotliDecompress());
      } else if (enc === "zstd") {
        if (typeof zlib.createZstdDecompress === "function") {
          stream = stream.pipe(zlib.createZstdDecompress());
        } else {
          // i had a weird edge case where zstd was not available in a particular electron version. not sure what happened there
          console.warn("Skipping Zstd streaming decompression: zlib.createZstdDecompress not available");
        }
      }
    }
  }

  return stream;
}

export function encodeBodyStream(stream: stream.Readable, headers: Headers | RawHeaders) {
  const contentEncoding = getHeaderValue(headers, "content-encoding");

  if (!contentEncoding) return stream;

  let encodingLayers: string[] = [];
  if (contentEncoding) {
    const headerValue = Array.isArray(contentEncoding) ? contentEncoding.join(",") : String(contentEncoding);
    encodingLayers = headerValue.split(",").map((e) => e.trim().toLowerCase());
  }

  if (encodingLayers.length > 0) {
    // Apply compression layers in the order listed
    for (const enc of encodingLayers) {
      if (enc === "gzip" || enc === "x-gzip") {
        stream = stream.pipe(zlib.createGzip());
      } else if (enc === "deflate") {
        stream = stream.pipe(zlib.createDeflate());
      } else if (enc === "br" || enc === "brotli") {
        stream = stream.pipe(zlib.createBrotliCompress());
      } else if (enc === "zstd") {
        if (typeof zlib.createZstdCompress === "function") {
          stream = stream.pipe(zlib.createZstdCompress());
        } else {
          console.warn("Skipping Zstd streaming compression: zlib.createZstdCompress not available");
        }
      }
    }
  }

  return stream;
}
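
One detail that is easy to get wrong with layered encodings is ordering: Content-Encoding lists codings in the order they were applied, so decoding has to walk the list backwards. The sketch below shows that round trip using zlib's synchronous functions on buffers, purely to keep the example short; the streaming versions above would chain the equivalent `zlib.create*` streams. The helper names (`parseContentEncoding`, `applyLayer`, `encodeBody`, `decodeBody`) are hypothetical, and zstd is left out.

```typescript
import * as zlib from "node:zlib";

// Splits a Content-Encoding header value into normalized coding names.
function parseContentEncoding(header: string | string[] | undefined): string[] {
  if (!header) return [];
  const value = Array.isArray(header) ? header.join(",") : header;
  return value
    .split(",")
    .map((e) => e.trim().toLowerCase())
    .filter((e) => e.length > 0 && e !== "identity");
}

// Applies or removes a single coding layer.
function applyLayer(data: Buffer, coding: string, direction: "encode" | "decode"): Buffer {
  switch (coding) {
    case "gzip":
    case "x-gzip":
      return direction === "encode" ? zlib.gzipSync(data) : zlib.gunzipSync(data);
    case "deflate":
      return direction === "encode" ? zlib.deflateSync(data) : zlib.inflateSync(data);
    case "br":
      return direction === "encode" ? zlib.brotliCompressSync(data) : zlib.brotliDecompressSync(data);
    default:
      throw new Error(`Unsupported content-encoding: ${coding}`);
  }
}

// Encoding applies the codings left to right, as listed in the header.
function encodeBody(data: Buffer, header: string): Buffer {
  return parseContentEncoding(header).reduce<Buffer>(
    (acc, coding) => applyLayer(acc, coding, "encode"),
    data,
  );
}

// Decoding removes them right to left: the last coding listed is outermost.
function decodeBody(data: Buffer, header: string): Buffer {
  return parseContentEncoding(header)
    .reverse()
    .reduce<Buffer>((acc, coding) => applyLayer(acc, coding, "decode"), data);
}

const original = Buffer.from("hello streaming world", "utf8");
const encoded = encodeBody(original, "gzip, br"); // gzip first, then brotli
const decoded = decodeBody(encoded, "gzip, br"); // brotli first, then gzip
console.log(decoded.equals(original));
```

Unlike the streaming helpers above, this sketch throws on an unknown coding instead of skipping it. Skipping a layer while also deleting the Content-Encoding header would hand the client bytes it has no way to decode.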
