Skip to content

Commit

Permalink
Add inbox prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
dx3mod committed Nov 26, 2024
1 parent d45230a commit 7fa574f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 16 deletions.
12 changes: 2 additions & 10 deletions examples/natsbyexample/request_reply.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,10 @@ let main =
Lwt_io.printlf "response: %s;" response;%lwt

let%lwt response =
Lwt.pick
[
Lwt_unix.timeout 1.;
Nats_client_lwt.request client ~subject:"greet.bob" "";
]
Lwt_unix.with_timeout 1. @@ fun () ->
Nats_client_lwt.request client ~subject:"greet.bob" ""
in
Lwt_io.printlf "response: %s;" response;%lwt

let%lwt response =
Nats_client_lwt.request_with_timeout client ~subject:"greet.tom"
~timeout:0.5 ""
in
Lwt_io.printlf "response: %s;" response;%lwt

Lwt.return_unit
Expand Down
15 changes: 9 additions & 6 deletions lwt/nats_client_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type client = {
connection : Connection.t;
info : Protocol.info;
incoming_messages : Protocol.message Lwt_stream.t;
mutable inbox_prefix : string;
}

let send_initialize_message client (message : Protocol.connect) =
Expand Down Expand Up @@ -43,7 +44,7 @@ let connect ?switch ?settings uri =
Lwt.return_some message)
in

let client = { connection; info; incoming_messages } in
let client = { connection; info; incoming_messages; inbox_prefix = "" } in

(match settings with
| Some settings ->
Expand Down Expand Up @@ -87,9 +88,14 @@ let sub ?switch client ~subject ?sid () =

(* TODO: make drain method, unsub all subscribers *)

let set_inbox_prefix client prefix = client.inbox_prefix <- prefix

let request client ~subject payload =
(* Inbox for replies. *)
let inbox = Printf.sprintf "_INBOX.%s" @@ Nats_client.Sid.create 9 in
let inbox =
(* Collisions? It's needs unique prefix and SID! *)
let sid = client.inbox_prefix ^ Nats_client.Sid.create 9 in
Printf.sprintf "_INBOX.%s" sid
in

Lwt_switch.with_switch @@ fun switch ->
let%lwt subscription = sub ~switch client ~subject:inbox () in
Expand All @@ -98,6 +104,3 @@ let request client ~subject payload =

let%lwt incoming_message = Lwt_stream.next subscription.messages in
Lwt.return incoming_message.payload

let request_with_timeout client ~subject ~timeout payload =
Lwt.pick [ Lwt_unix.timeout timeout; request client ~subject payload ]

0 comments on commit 7fa574f

Please sign in to comment.