
Add lwt aware stream module for IO #218

Closed · wants to merge 4 commits
Conversation

@anuragsoni (Collaborator)

No description provided.

@anuragsoni (Collaborator, Author)

I'd be interested in @rizo's input on this. This is based on some discussions in the OCaml Discord channel about not using Lwt_stream for streaming bodies. It's just a draft for now, but if this seems like a reasonable starting point, we can start a discussion about what the Body module will look like. I'm not a huge fan of exposing the internals like we do today, and I think not having the same type for input and output streaming bodies is not that bad?

@rgrinberg (Owner)

Ah, I have some familiarity with this form of streaming :)

I'd start with a separate package inside the repository and not make it a part of rock.

@anuragsoni (Collaborator, Author)

> I'd start with a separate package inside the repository and not make it a part of rock.

Makes sense. I'll move it to a separate module.

> Ah, I have some familiarity with this form of streaming :)

The Discord discussion I mentioned was about comments you made on the Lwt channel 😄

val write : 'a option -> 'a t -> unit Lwt.t
end

val transfer : 'a Input.t -> 'a Output.t -> unit Lwt.t
@rgrinberg (Owner) · Nov 14, 2020

I'd try to match the API here: https://github.com/ocaml/ocaml-lsp/blob/master/fiber-unix/src/fiber_stream.mli#L33

It's already the one I'm familiar with (which I copied from the original Haskell source)

@anuragsoni (Collaborator, Author)

Good idea. I'll add an Lwt version of the fiber_stream module.
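For readers following along, the pull-based API being referenced can be sketched in a few lines. This is a rough synchronous approximation with illustrative names, not the actual fiber_stream or Opium code; the Lwt version under discussion would have read return 'a option Lwt.t instead:

```ocaml
(* Rough synchronous sketch of a pull-based In module; names are
   illustrative, not the final API. *)
module In = struct
  type 'a t = { next : unit -> 'a option }

  let create next = { next }

  (* None signals the end of the stream. *)
  let read t = t.next ()

  (* Consume the whole stream, applying f to each element. *)
  let iter f t =
    let rec loop () =
      match read t with
      | None -> ()
      | Some x -> f x; loop ()
    in
    loop ()
end

(* usage: a stream that yields 1, 2, 3 and then ends *)
let () =
  let n = ref 0 in
  let s = In.create (fun () -> incr n; if !n <= 3 then Some !n else None) in
  let sum = ref 0 in
  In.iter (fun x -> sum := !sum + x) s;
  assert (!sum = 6)
```

The real module would wrap every read in a promise so that consumers can suspend while waiting for data.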

iter f t
;;

let singleton item =
@rgrinberg (Owner)

An of_list primitive would be more useful; it covers this use case as well.
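To illustrate the point, here is a minimal synchronous sketch (hypothetical names, no Lwt) of how an of_list primitive subsumes singleton:

```ocaml
(* The simplest possible pull-stream model: a thunk that returns the
   next element, or None at end of stream. Illustrative only. *)
type 'a stream = unit -> 'a option

let of_list xs =
  let rest = ref xs in
  fun () ->
    match !rest with
    | [] -> None
    | x :: tl -> rest := tl; Some x

(* singleton is just of_list applied to a one-element list *)
let singleton x = of_list [ x ]

let () =
  let s = singleton 42 in
  assert (s () = Some 42);
  assert (s () = None)
```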

val create : (unit -> 'a option Lwt.t) -> 'a t
val singleton : 'a -> 'a t
val read : 'a t -> 'a option Lwt.t
val iter : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
@anuragsoni (Collaborator, Author)

When working with Httpaf bodies, we'll have places where we use iter to consume the input stream, but the handler there will be of the form:

Input.iter (fun item -> Httpaf.Body.write_string item; Lwt.return_unit) stream;;

I think it might make sense to add a second flavor of iter that works with ('a -> unit). Maybe we can match the naming scheme from Lwt by having:

val iter : ('a -> unit) -> 'a t -> unit Lwt.t
val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t

@rgrinberg (Owner)

> Input.iter (fun item -> Httpaf.Body.write_string item; Lwt.return_unit) stream;;

I think this is bound to allocate a huge amount of memory though. We need to return a promise that's determined when the buffer hits max size and needs to be flushed.

@rgrinberg (Owner)

> I think it might make sense to add a second flavor of iter that works with ('a -> unit). Maybe we can match the naming scheme from Lwt by having

For now, let's see if we can make do with an iter that returns a unit Lwt.t. I suspect that the other iter will not be very useful in the end.

@anuragsoni (Collaborator, Author) · Nov 14, 2020

> I think this is bound to allocate a huge amount of memory though. We need to return a promise that's determined when the buffer hits max size and needs to be flushed.

Hmm, that's true. On a related note: we don't read this config today, but we should respect the body buffer size limits set by the user in the server config handed to Rock. Working with promises when writing to httpaf's body should be straightforward. (Reading is a different story, though.)

@rgrinberg (Owner)

The way I see it, a specialized streaming module for IO makes sense. But I think we should set some concrete goals for it to make sure that the scope stays relevant to Opium. Some example goals:

  • Good back pressure support
  • Ability to construct pipelines that run in constant memory
  • Make it easy to reuse buffers.

To give a concrete use case, it should be possible to handle a compressed upload of a file in a performant and memory efficient way with this API.
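As a rough illustration of the constant-memory goal, a copy loop can reuse one fixed buffer for every chunk. This is a synchronous sketch with hypothetical reader/writer callbacks standing in for the stream endpoints, not the proposed API:

```ocaml
(* Constant-memory copy loop: one buffer, reused for every chunk.
   reader fills the buffer and returns the byte count (0 = end of input);
   writer consumes each chunk. Both are hypothetical stand-ins. *)
let transfer ~reader ~writer =
  let buf = Bytes.create 4096 in
  let rec loop () =
    match reader buf with
    | 0 -> ()
    | n -> writer (Bytes.sub_string buf 0 n); loop ()
  in
  loop ()

(* drive it from an in-memory source to check the behaviour *)
let () =
  let src = "hello, streaming world" in
  let pos = ref 0 in
  let reader buf =
    let n = min (Bytes.length buf) (String.length src - !pos) in
    Bytes.blit_string src !pos buf 0 n;
    pos := !pos + n;
    n
  in
  let out = Buffer.create 32 in
  transfer ~reader ~writer:(Buffer.add_string out);
  assert (Buffer.contents out = src)
```

A real zero-copy version would hand out views of the underlying buffer instead of calling Bytes.sub_string, which is where the flushing care discussed below comes in.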

@anuragsoni (Collaborator, Author)

> The way I see it, a specialized streaming for IO makes sense. But I think we should set some concrete goals for it to make sure that the scope stays relevant to opium. Some example goals:
>
> * Good back pressure support
> * Ability to construct pipelines that run in constant memory
> * Make it easy to reuse buffers.

Agreed on all three goals. I think to achieve these we'd need a buffer module like the one httpaf uses internally. That should allow us to write streaming implementations that work with, say, filesystems, while keeping a constant-sized buffer, and it'll allow us to hand a view of the bigstring to httpaf to schedule a write later so we avoid a copy (we'll need to be careful about flushing the buffer and not modifying the view we hand to httpaf before a flush is done).

> To give a concrete use case, it should be possible to handle a compressed upload of a file in a performant and memory efficient way with this API.

This kind of example will be a good test to try before considering this ready for a merge.

@tmattio (Collaborator) · Nov 14, 2020

That's great you're getting the ball rolling on this, @anuragsoni!

I'm not familiar with designing streaming APIs, but I started working on #214 and was blocked because the streams of multipart_form have the type 'a stream = unit -> 'a option and I didn't see a way to get this from an Lwt_stream.t. Maybe I just didn't read the docs thoroughly enough, but I thought I'd mention it in case it could impact the design of the new streaming API. In particular, is there a way to get a (unit -> 'a option) Lwt.t or similar from the current type?

@anuragsoni (Collaborator, Author)

> In particular, is there a way to get a (unit -> 'a option) Lwt.t or similar from the current type?

You can get (unit -> 'a option) Lwt.t via let stream () = Io_stream.In.read in_stream. You can also get it via Lwt_stream by let stream () = Lwt_stream.get lwt_stream

@tmattio (Collaborator) · Nov 14, 2020

@anuragsoni I've tried Lwt_stream.get, but it has type 'a t -> 'a option Lwt.t, so fun () -> Lwt_stream.get lwt_stream would have type unit -> 'a option Lwt.t and not (unit -> 'a option) Lwt.t, so it won't be compatible with multipart_form.

From what I can see, it would be the same issue with Io_stream.In.read, right?

@anuragsoni
Copy link
Collaborator Author

@tmattio Ah, yes indeed, I missed the lack of Lwt.t in the function signature. Hmm, I think the easiest way to work with it might be to write an Angstrom parser driver and have it work with our streaming solution.

@tmattio (Collaborator) · Nov 14, 2020

multipart_form already uses angstrom to parse the requests. Are you suggesting that we fork it to change how streaming works? The alternative would be to keep multipart-form-data, even if it relies on Lwt_stream, but I wonder if there would be a way to make the two APIs compatible if we're going to implement our own streaming solutions.

@anuragsoni (Collaborator, Author)

> multipart_form already uses angstrom to parse the requests. Are you suggesting that we fork it to change how streaming works?

Not forking it. I was thinking of using

https://github.com/dinosaure/multipart_form/blob/85b2de9f68e639b773f696d72d1a2061785c7d74/lib/multipart_form.mli#L197

instead of the of_stream functions. That'll give us access to an Angstrom parser that we can drive however we want by feeding it content from the stream.
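The "drive the parser ourselves" idea can be sketched generically. The state type below only mirrors the spirit of an incremental parsing interface like Angstrom's; it is a self-contained toy, not Angstrom's actual API:

```ocaml
(* A self-contained sketch of driving an incremental parser from a pull
   stream. Partial asks for more input (None = end of input); the type
   and names are hypothetical, not Angstrom's. *)
type 'a state =
  | Partial of (string option -> 'a state)
  | Done of 'a
  | Fail of string

(* Feed chunks from the stream into the parser until it finishes. *)
let drive (stream : unit -> string option) (init : 'a state) =
  let rec loop = function
    | Done v -> Ok v
    | Fail msg -> Error msg
    | Partial k -> loop (k (stream ()))
  in
  loop init

(* toy parser: concatenate all chunks until end of input *)
let concat_all =
  let rec step acc =
    Partial
      (function
       | Some chunk -> step (acc ^ chunk)
       | None -> Done acc)
  in
  step ""

let () =
  let chunks = ref [ "foo"; "bar" ] in
  let stream () =
    match !chunks with [] -> None | c :: tl -> chunks := tl; Some c
  in
  assert (drive stream concat_all = Ok "foobar")
```

With Angstrom itself, the same loop would feed stream chunks into the parser's continuation instead of going through an of_stream adapter.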

@tmattio (Collaborator) · Nov 14, 2020

@anuragsoni Oh, OK, that makes sense! I'll try that. In that case, using the current streaming implementation or the module in this PR shouldn't make a difference.

Thanks for the pointer :)

@anuragsoni (Collaborator, Author)

Closing this for now. I'll need to iterate on this a couple of times before proposing something for review.

@anuragsoni closed this Nov 17, 2020
@rizo (Collaborator) · Nov 18, 2020

@anuragsoni Sorry for the delay replying to this. I've been busy the last few weeks.

I have many thoughts on this kind of thing. Some of them are just opinions, so I'm not sure how useful they would be in the context of Opium. I guess understanding the long-term requirements for streams in Opium would help. I'd be happy to review the current design if you still think this would be useful. Having said that, I think there's a larger scope that needs to be considered here (as suggested by @rgrinberg).

I'm currently experimenting with various buffering models for my streaming package that would provide a simple zero-copy interface without sacrificing composability of the streams. Maybe we could have a chat about it some time and compare notes?

@anuragsoni (Collaborator, Author)

> I have many thoughts on this kind of things. Some of them are just opinions, so not sure how useful they would be in the context of Opium. I guess understanding long-term requirements for streams in Opium would help.

I'd be very interested in learning your thoughts on this, as there is a lot I can learn about designing streaming APIs 😄

> I'd be happy to review the current design if you still think this would be useful. Having said that, I think there's a larger scope that needs to be considered.

Agreed. I closed the PR for now since I think, before going too far on just the interface I had in the current commit, I should spend more time thinking about the backing buffer for IO and how to make efficient use of it for reads/writes, while thinking about pushback for when the buffer gets full. I don't think there is much point in the interface from the PR if we can't use it efficiently to write composable streaming IO.

> I'm currently experimenting with various buffering models for my streaming package that would provide a simple zero-copy interface without sacrificing composability of the streams. Maybe we could have a chat about it some time and compare notes?

This sounds good. I'd be interested in learning about your ideas and I'm happy to talk in more detail over email!

I started by trying out something simple written as a wrapper around Faraday, since it provides some useful utilities out of the box. I was thinking of a buffer module that could have functions like:

module Buffer = struct
  open Lwt.Syntax (* provides the let+ used below; available since Lwt 5.3 *)

  type t = Faraday.t

  let create size = Faraday.create size

  let write_string s t =
    if Faraday.free_bytes_in_buffer t > String.length s
    then (
      Faraday.write_string t s;
      Lwt.return_unit)
    else (
      (* the buffer is full: wait for a flush to complete before writing *)
      let flushed, wakeup = Lwt.wait () in
      Faraday.flush t (fun () -> Lwt.wakeup_later wakeup ());
      let+ () = flushed in
      Faraday.write_string t s)
  ;;

  let read t =
    match Faraday.operation t with
    | `Yield -> Lwt.return (`Ok None)
    | `Close -> Lwt.return `Eof
    | `Writev [] ->
      (* I don't think we will enter this branch if there aren't any issues
         with faraday. *)
      assert false
    | `Writev ({ Faraday.buffer; off; len } :: _) ->
      (* we should be able to forward the iovec itself if we ensure that we
         don't overwrite the buffer before a flush on the destination socket
         takes place *)
      let sub = Bigstringaf.sub buffer ~off ~len in
      Faraday.shift t len;
      Lwt.return (`Ok (Some sub))
  ;;
end

I picked Faraday to begin with mostly for convenience; we should be able to get a lot of what we need from a simpler buffer module with a reduced feature set.
