From cfa1396fc475e7a44767c12b6aba30d8b4664392 Mon Sep 17 00:00:00 2001 From: dx3mod Date: Mon, 4 Nov 2024 13:07:06 +0300 Subject: [PATCH 1/7] Add INFO parsing --- dune-project | 1 + examples/demo.ml | 4 ++- lib/dune | 2 +- lib/message.ml | 56 +++++++++++++++++++++++++++++++++--------- lwt/connection.ml | 4 +-- lwt/nats_client_lwt.ml | 6 ++--- nats-client.opam | 1 + 7 files changed, 56 insertions(+), 18 deletions(-) diff --git a/dune-project b/dune-project index ca699f4..39f662f 100644 --- a/dune-project +++ b/dune-project @@ -23,6 +23,7 @@ ocaml dune yojson + ppx_yojson_conv (alcotest :with-test)) (tags (nats))) diff --git a/examples/demo.ml b/examples/demo.ml index 7481de8..ad75c77 100644 --- a/examples/demo.ml +++ b/examples/demo.ml @@ -5,7 +5,9 @@ let main () = (Uri.of_string "tcp://127.0.0.1:4222") in - Format.printf "info %a\n" Yojson.Safe.pp client.info; + Format.printf "info %s\n" + (Nats_client.Incoming_message.INFO.yojson_of_t client.info + |> Yojson.Safe.to_string); Nats_client_lwt.Subscription.handle client.incoming_messages (fun msg -> Lwt_fmt.printf "LOG: %a\n" Nats_client.Incoming_message.pp msg;%lwt diff --git a/lib/dune b/lib/dune index 2f7f497..a8dedd2 100644 --- a/lib/dune +++ b/lib/dune @@ -3,4 +3,4 @@ (public_name nats-client) (libraries lwt.unix yojson) (preprocess - (pps lwt_ppx))) + (pps lwt_ppx ppx_yojson_conv))) diff --git a/lib/message.ml b/lib/message.ml index ac30191..2704c06 100644 --- a/lib/message.ml +++ b/lib/message.ml @@ -1,8 +1,39 @@ (** NATS message. *) module Incoming = struct - (* TODO: make type for INFO *) - type t = Info of Yojson.Safe.t | Msg of msg | Ping | Pong | OK | ERR of err + open Ppx_yojson_conv_lib.Yojson_conv + + module INFO = struct + type t = { + server_id : string; + server_name : string; + version : string; + go : string; + host : string; + port : int; + headers : bool; + max_payload : int; + proto : int; + client_id : int option; [@default None] (* uint64 brr *) + auth_required : bool; [@default false] + tls_required : bool; [@default false] + tls_verify : bool; [@default false] + tls_available : bool; [@default false] + connect_urls : string list; [@default []] + ws_connect_urls : string list; [@default []] + ldm : bool; [@default false] + git_commit : string option; [@default None] + jetstream : bool; [@default false] + ip : string option; [@default None] + client_ip : string option; [@default None] + nonce : string option; [@default None] + cluster : string option; [@default None] + domain : string option; [@default None] + } + [@@deriving yojson] [@@yojson.allow_extra_fields] + end + + type t = INFO of INFO.t | MSG of msg | PING | PONG | OK | ERR of err and msg = { subject : string; @@ -20,10 +51,12 @@ module Incoming = struct payload.size payload.contents let pp fmt = function - | Info json -> Format.fprintf fmt "INFO %a" Yojson.Safe.pp json - | Msg msg -> pp_msg fmt msg - | Ping -> Format.fprintf fmt "PING" - | Pong -> Format.fprintf fmt "PONG" + | INFO info -> + Format.fprintf fmt "INFO %s" + (INFO.yojson_of_t info |> Yojson.Safe.to_string) + | MSG msg -> pp_msg fmt msg + | PING -> Format.fprintf fmt "PING" + | PONG -> Format.fprintf fmt "PONG" | OK -> Format.fprintf fmt "+OK" | ERR e -> Format.fprintf fmt "+ERR '%s'" e @@ -36,7 +69,8 @@ module Incoming = struct (** @raises Yojson.Json_error *) let info_of_line line = (* INFO {"option_name":option_value,...}␍␊ *) - Scanf.sscanf line "INFO %s" (fun s -> Yojson.Safe.from_string s) + Scanf.sscanf line "INFO %s" (fun s -> + Yojson.Safe.from_string s |> INFO.t_of_yojson) let msg_of_line line = (* MSG [reply-to] <#bytes>␍␊[payload]␍␊ *) @@ -68,15 +102,15 @@ module Incoming = struct (** @raises Invalid_argument *) let of_line = function - | "PING" -> Ping - | "PONG" -> Pong + | "PING" -> PING + | "PONG" -> PONG | "+OK" -> OK | line when String.starts_with ~prefix:"-ERR" line -> ERR (err_of_line line) | line when String.starts_with ~prefix:"INFO" line -> - Info (info_of_line line) + INFO (info_of_line line) | line when String.starts_with ~prefix:"MSG" line -> - Msg (msg_of_line line) + MSG (msg_of_line line) | _ -> raise (Invalid_argument "incoming message line") end end diff --git a/lwt/connection.ml b/lwt/connection.ml index 6a76b1c..f59a13e 100644 --- a/lwt/connection.ml +++ b/lwt/connection.ml @@ -54,13 +54,13 @@ let receive conn = let m = Incoming_message.Parser.of_line line in match m with - | Incoming_message.Msg msg -> + | Incoming_message.MSG msg -> (* read payload *) let%lwt contents = Lwt_io.read ~count:msg.payload.size conn.ic in let%lwt _ = Lwt_io.read ~count:2 conn.ic in Lwt.return - @@ Incoming_message.Msg + @@ Incoming_message.MSG { msg with payload = { msg.payload with contents } } | m -> Lwt.return m diff --git a/lwt/nats_client_lwt.ml b/lwt/nats_client_lwt.ml index 08367a2..21819d6 100644 --- a/lwt/nats_client_lwt.ml +++ b/lwt/nats_client_lwt.ml @@ -4,7 +4,7 @@ module Connection = Connection type client = { connection : Connection.t; - info : Yojson.Safe.t; + info : Incoming_message.INFO.t; incoming_messages : Incoming_message.t Lwt_stream.t; } @@ -33,7 +33,7 @@ let connect ?switch ?settings uri = let%lwt connection = Connection.create ?switch ~host ~port () in let%lwt info = match%lwt Connection.receive connection with - | Incoming_message.Info info -> Lwt.return info + | Incoming_message.INFO info -> Lwt.return info | _ -> raise @@ Connection.Invalid_response "INFO message" in @@ -67,7 +67,7 @@ let sub client ~subject ?(sid : Sid.t option) () = Lwt.return @@ Lwt_stream.filter_map (function - | Incoming_message.Msg msg + | Incoming_message.MSG msg (* Is it enough to check a message's SID? *) when msg.sid = sid -> Some msg diff --git a/nats-client.opam b/nats-client.opam index 39f5dea..8422607 100644 --- a/nats-client.opam +++ b/nats-client.opam @@ -13,6 +13,7 @@ depends: [ "ocaml" "dune" {>= "3.16"} "yojson" + "ppx_yojson_conv" "alcotest" {with-test} "odoc" {with-doc} ] From 65eefb5ff5c42c0932b7f22f50e81c3a78ab5e5b Mon Sep 17 00:00:00 2001 From: dx3mod Date: Mon, 4 Nov 2024 13:10:38 +0300 Subject: [PATCH 2/7] Use ppx_yojson_conv for CONNECT settings --- lib/message.ml | 14 +++----------- lwt/nats_client_lwt.ml | 2 +- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/lib/message.ml b/lib/message.ml index 2704c06..82721b2 100644 --- a/lib/message.ml +++ b/lib/message.ml @@ -117,20 +117,12 @@ end (** Initial message. *) module Initial = struct + open Ppx_yojson_conv_lib.Yojson_conv + type t = { verbose : bool; pedantic : bool; tls_required : bool; echo : bool } + [@@deriving yojson] [@@yojson.allow_extra_fields] (** Protocol. https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 *) - (* TODO: for encoding/decoding JSON should use [ppx_deriving_yojson] preprocessor. *) - let to_yojson t : Yojson.Safe.t = - `Assoc - [ - ("verbose", `Bool t.verbose); - ("pedantic", `Bool t.pedantic); - ("tls_required", `Bool t.tls_required); - ("echo", `Bool t.echo); - ("lang", `String "ocaml"); - ] - let default = { verbose = false; pedantic = false; tls_required = false; echo = false } diff --git a/lwt/nats_client_lwt.ml b/lwt/nats_client_lwt.ml index 21819d6..2ed0ba7 100644 --- a/lwt/nats_client_lwt.ml +++ b/lwt/nats_client_lwt.ml @@ -14,7 +14,7 @@ let send_initialize_message client (message : Initial_message.t) = Connection.Send.with_verbose ~verbose:message.verbose client.connection @@ fun () -> Connection.Send.connect - ~json:(Initial_message.to_yojson message) + ~json:(Initial_message.yojson_of_t message) client.connection (** Connect to a NATS server using the [uri] address. From d0b65eecd39ee959779eaf36f81b9ff9761f02f8 Mon Sep 17 00:00:00 2001 From: dx3mod Date: Mon, 4 Nov 2024 15:49:14 +0300 Subject: [PATCH 3/7] Add unsub --- README.md | 4 +++- examples/simple.ml | 4 +++- lwt/connection.ml | 4 ++++ lwt/nats_client_lwt.ml | 13 ++++++++++--- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index e492acd..3cb0b7b 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,9 @@ let () = in (* Subscribe to HELLO subject. *) - let%lwt hello_subject = Nats_client_lwt.sub client ~subject:"HELLO" () in + let%lwt hello_subject = + Nats_client_lwt.sub ~switch client ~subject:"HELLO" () + in (* Handle incoming HELLO subject messages. *) Nats_client_lwt.Subscription.handle hello_subject (fun msg -> diff --git a/examples/simple.ml b/examples/simple.ml index f989441..afee70d 100644 --- a/examples/simple.ml +++ b/examples/simple.ml @@ -8,7 +8,9 @@ let () = in (* Subscribe to HELLO subject. *) - let%lwt hello_subject = Nats_client_lwt.sub client ~subject:"HELLO" () in + let%lwt hello_subject = + Nats_client_lwt.sub ~switch client ~subject:"HELLO" () + in (* Handle incoming HELLO subject messages. *) Nats_client_lwt.Subscription.handle hello_subject (fun msg -> diff --git a/lwt/connection.ml b/lwt/connection.ml index f59a13e..c71b0b8 100644 --- a/lwt/connection.ml +++ b/lwt/connection.ml @@ -86,6 +86,10 @@ module Send = struct (Option.fold ~none:"" ~some:(Printf.sprintf " %s") queue_group) sid + let unsub conn ?max_msgs (sid : Sid.t) = + writelnf conn.oc "UNSUB %s%s" sid + (Option.fold max_msgs ~none:"" ~some:(Printf.sprintf " %d")) + let connect ~json conn = (* NOTE: Yojson.Safe.pp gives a bad result. TODO: improve performance of JSON encoding (now is bad) *) diff --git a/lwt/nats_client_lwt.ml b/lwt/nats_client_lwt.ml index 2ed0ba7..a16565a 100644 --- a/lwt/nats_client_lwt.ml +++ b/lwt/nats_client_lwt.ml @@ -55,15 +55,22 @@ let connect ?switch ?settings uri = (** Close socket connection. *) let close client = Connection.close client.connection -(** [pub ~subject ?reply_to payload] publish a message. *) +(** [pub client ~subject ?reply_to payload] publish a message. *) let pub client ~subject ?reply_to payload = Connection.Send.pub ~subject ~reply_to ~payload client.connection -(** [sub ~subject ?sid ()] subscribe on the subject and get stream. *) -let sub client ~subject ?(sid : Sid.t option) () = +(** [unsub client ?max_msgs sid] unsubscribe from subject. *) +let unsub client ?max_msgs sid = + Connection.Send.unsub client.connection ?max_msgs sid + +(** [sub client ~subject ?sid ()] subscribe on the subject and get stream. *) +let sub ?switch client ~subject ?(sid : Sid.t option) () = let sid = Option.value ~default:(Sid.create 9) sid in Connection.Send.sub ~subject ~sid ~queue_group:None client.connection;%lwt + (* auto unsubscribe *) + Lwt_switch.add_hook switch (fun () -> unsub client sid); + Lwt.return @@ Lwt_stream.filter_map (function From 01f5ecc07f81dd8ad2bebe0deb07b40abed0c43b Mon Sep 17 00:00:00 2001 From: dx3mod Date: Mon, 4 Nov 2024 15:57:43 +0300 Subject: [PATCH 4/7] Add subscription type --- README.md | 4 ++-- examples/demo.ml | 3 ++- examples/simple.ml | 4 ++-- lwt/nats_client_lwt.ml | 21 ++++++++++++--------- lwt/subscription.ml | 13 +++++++++---- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 3cb0b7b..b15b2b7 100644 --- a/README.md +++ b/README.md @@ -38,12 +38,12 @@ let () = in (* Subscribe to HELLO subject. *) - let%lwt hello_subject = + let%lwt hello_subscr = Nats_client_lwt.sub ~switch client ~subject:"HELLO" () in (* Handle incoming HELLO subject messages. *) - Nats_client_lwt.Subscription.handle hello_subject (fun msg -> + Nats_client_lwt.Subscription.handle hello_subscr (fun msg -> Lwt_io.printf "HELLO: %s\n" msg.payload.contents); (* Send "Hello World" message to HELLO subject. *) diff --git a/examples/demo.ml b/examples/demo.ml index ad75c77..8e72852 100644 --- a/examples/demo.ml +++ b/examples/demo.ml @@ -9,7 +9,8 @@ let main () = (Nats_client.Incoming_message.INFO.yojson_of_t client.info |> Yojson.Safe.to_string); - Nats_client_lwt.Subscription.handle client.incoming_messages (fun msg -> + Nats_client_lwt.Subscription.handle_stream client.incoming_messages + (fun msg -> Lwt_fmt.printf "LOG: %a\n" Nats_client.Incoming_message.pp msg;%lwt Lwt_fmt.flush Lwt_fmt.stdout); diff --git a/examples/simple.ml b/examples/simple.ml index afee70d..68aacbf 100644 --- a/examples/simple.ml +++ b/examples/simple.ml @@ -8,12 +8,12 @@ let () = in (* Subscribe to HELLO subject. *) - let%lwt hello_subject = + let%lwt hello_subscr = Nats_client_lwt.sub ~switch client ~subject:"HELLO" () in (* Handle incoming HELLO subject messages. *) - Nats_client_lwt.Subscription.handle hello_subject (fun msg -> + Nats_client_lwt.Subscription.handle hello_subscr (fun msg -> Lwt_io.printf "HELLO: %s\n" msg.payload.contents); (* Send "Hello World" message to HELLO subject. *) diff --git a/lwt/nats_client_lwt.ml b/lwt/nats_client_lwt.ml index a16565a..f5ab685 100644 --- a/lwt/nats_client_lwt.ml +++ b/lwt/nats_client_lwt.ml @@ -71,14 +71,17 @@ let sub ?switch client ~subject ?(sid : Sid.t option) () = (* auto unsubscribe *) Lwt_switch.add_hook switch (fun () -> unsub client sid); - Lwt.return - @@ Lwt_stream.filter_map - (function - | Incoming_message.MSG msg - (* Is it enough to check a message's SID? *) - when msg.sid = sid -> - Some msg - | _ -> None) - client.incoming_messages + let messages = + Lwt_stream.filter_map + (function + | Incoming_message.MSG msg + (* Is it enough to check a message's SID? *) + when msg.sid = sid -> + Some msg + | _ -> None) + client.incoming_messages + in + + Lwt.return Subscription.{ sid; subject; messages } (* TODO: make drain method, unsub all subscribers *) diff --git a/lwt/subscription.ml b/lwt/subscription.ml index b83fcf4..a17cb7d 100644 --- a/lwt/subscription.ml +++ b/lwt/subscription.ml @@ -1,11 +1,13 @@ open Nats_client (** Utils for handle subscriptions. *) -type messages = Incoming_message.msg Lwt_stream.t -(** Alias. *) +type t = { + sid : Sid.t; + subject : string; + messages : Incoming_message.msg Lwt_stream.t; +} -(** Asynchronous handling of incoming messages. *) -let handle stream f = +let handle_stream stream f = Lwt.dont_wait (fun () -> Lwt_stream.iter_s f stream) (function @@ -13,3 +15,6 @@ let handle stream f = | Unix.Unix_error (Unix.EBADF, "check_descriptor", "") -> () (* Otherwise, throw the exception above. *) | e -> raise e) + +(** Asynchronous handling of incoming messages. *) +let handle { messages; _ } f = handle_stream messages f From d25b02d079b684d0962244f570063663b147d24d Mon Sep 17 00:00:00 2001 From: dx3mod Date: Mon, 4 Nov 2024 15:59:13 +0300 Subject: [PATCH 5/7] Add "How to design library API?" section to contrb guide --- CONTRIBUTING.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ddf7b3e..42d3b25 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -21,6 +21,10 @@ $ opam install . --deps-only See [examples](./examples/). +### How to design library API? + +See [nats.rs](https://docs.rs/nats/latest/nats/). + ### Tasks You can find open tasks at [project board](https://github.com/users/romanchechyotkin/projects/1) From c43faec849403c003266b42abecf11d1deb70997 Mon Sep 17 00:00:00 2001 From: dx3mod Date: Mon, 4 Nov 2024 16:15:11 +0300 Subject: [PATCH 6/7] Update contrb guide --- CONTRIBUTING.md | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 42d3b25..5a2c1d2 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -4,12 +4,20 @@ FEEL FREE TO CREATE ISSUES AND PRs ## Development -### Requirements +### How to start? + +#### Requirements - OCaml >= 4.14 - Dune - OPAM +#### Use development branch + +Use the `dev` branch. + +#### Setup development environment + ```sh # Create a switch for project $ opam switch create . --deps-only @@ -17,7 +25,7 @@ $ opam switch create . --deps-only $ opam install . --deps-only ``` -### How to start? +#### Next... See [examples](./examples/). @@ -25,11 +33,11 @@ See [examples](./examples/). See [nats.rs](https://docs.rs/nats/latest/nats/). -### Tasks +## Tasks You can find open tasks at [project board](https://github.com/users/romanchechyotkin/projects/1) and [issues page](https://github.com/romanchechyotkin/nats.ocaml/issues). -### Questions +## Questions If you have any questions, please contact @romanchechyotkin or @dx3mod. From 4e59f4b4e13a4082724ec6b18948c7192d03c94b Mon Sep 17 00:00:00 2001 From: dx3mod Date: Mon, 4 Nov 2024 16:42:24 +0300 Subject: [PATCH 7/7] Add natsbyexample directory with fist pub-sub example --- README.md | 37 +++++++++++++-------- examples/natsbyexample/README.md | 4 +++ examples/natsbyexample/dune | 5 +++ examples/natsbyexample/publish_subscribe.ml | 31 +++++++++++++++++ 4 files changed, 63 insertions(+), 14 deletions(-) create mode 100644 examples/natsbyexample/README.md create mode 100644 examples/natsbyexample/dune create mode 100644 examples/natsbyexample/publish_subscribe.ml diff --git a/README.md b/README.md index b15b2b7..d9a07ba 100644 --- a/README.md +++ b/README.md @@ -23,38 +23,47 @@ $ opam pin nats-client-lwt.dev https://github.com/romanchechyotkin/nats.ocaml.gi ### Simple echo example This example shows how to publish to a subject and handle its messages. -Take it from [`examples/simple.ml`](./examples/simple.ml). ```ocaml -let () = - Lwt_main.run @@ +open Lwt.Infix + +let main = (* Create a switch for automatic dispose resources. *) Lwt_switch.with_switch @@ fun switch -> - + (* Connect to a NATS server by address 127.0.0.1:4222 with ECHO flag. *) let%lwt client = Nats_client_lwt.connect ~switch ~settings:[ `Echo ] (Uri.of_string "tcp://127.0.0.1:4222") in - (* Subscribe to HELLO subject. *) - let%lwt hello_subscr = - Nats_client_lwt.sub ~switch client ~subject:"HELLO" () + (* Publish 'hello' message to greet.joe subject. *) + Nats_client_lwt.pub client ~subject:"greet.joe" "hello";%lwt + + (* Subscribe to greet.* subject. *) + let%lwt subscription = + Nats_client_lwt.sub ~switch client ~subject:"greet.*" () in - (* Handle incoming HELLO subject messages. *) - Nats_client_lwt.Subscription.handle hello_subscr (fun msg -> - Lwt_io.printf "HELLO: %s\n" msg.payload.contents); + (* Publishes 'hello' message to three subjects. *) + Lwt_list.iter_p + (fun subject -> Nats_client_lwt.pub client ~subject "hello") + [ "greet.sue"; "greet.bob"; "greet.pam" ];%lwt - (* Send "Hello World" message to HELLO subject. *) - Nats_client_lwt.pub client ~subject:"HELLO" "Hello World";%lwt + (* Handle first three incoming messages to the greet.* subject. *) + Lwt_stream.nget 3 subscription.messages + >>= Lwt_list.iter_s (fun (message : Nats_client.Incoming_message.msg) -> + Lwt_io.printlf "'%s' received on %s" message.payload.contents + message.subject) - Lwt_unix.sleep 0.1 +let () = Lwt_main.run main ``` +Take it from [`examples/natsbyexample/publish_subscribe.ml`](./examples/natsbyexample/publish_subscribe.ml). + ```console $ docker start -a nats-server -$ dune exec ./examples/simple.exe +$ dune exec ./examples/natsbyexample/publish_subscribe.exe ``` ## References diff --git a/examples/natsbyexample/README.md b/examples/natsbyexample/README.md new file mode 100644 index 0000000..5112b79 --- /dev/null +++ b/examples/natsbyexample/README.md @@ -0,0 +1,4 @@ +Adaptation examples from [NATS by example](https://natsbyexample.com/). + +Examples: +- [Core Publish-Subscribe](./publish_subscribe.ml) diff --git a/examples/natsbyexample/dune b/examples/natsbyexample/dune new file mode 100644 index 0000000..b5d8e78 --- /dev/null +++ b/examples/natsbyexample/dune @@ -0,0 +1,5 @@ +(executables + (names publish_subscribe) + (libraries nats-client-lwt lwt) + (preprocess + (pps lwt_ppx))) diff --git a/examples/natsbyexample/publish_subscribe.ml b/examples/natsbyexample/publish_subscribe.ml new file mode 100644 index 0000000..56b956a --- /dev/null +++ b/examples/natsbyexample/publish_subscribe.ml @@ -0,0 +1,31 @@ +open Lwt.Infix + +let main = + (* Create a switch for automatic dispose resources. *) + Lwt_switch.with_switch @@ fun switch -> + (* Connect to a NATS server by address 127.0.0.1:4222 with ECHO flag. *) + let%lwt client = + Nats_client_lwt.connect ~switch ~settings:[ `Echo ] + (Uri.of_string "tcp://127.0.0.1:4222") + in + + (* Publish 'hello' message to greet.joe subject. *) + Nats_client_lwt.pub client ~subject:"greet.joe" "hello";%lwt + + (* Subscribe to greet.* subject. *) + let%lwt subscription = + Nats_client_lwt.sub ~switch client ~subject:"greet.*" () + in + + (* Publishes 'hello' message to three subjects. *) + Lwt_list.iter_p + (fun subject -> Nats_client_lwt.pub client ~subject "hello") + [ "greet.sue"; "greet.bob"; "greet.pam" ];%lwt + + (* Handle first three incoming messages to the greet.* subject. *) + Lwt_stream.nget 3 subscription.messages + >>= Lwt_list.iter_s (fun (message : Nats_client.Incoming_message.msg) -> + Lwt_io.printlf "'%s' received on %s" message.payload.contents + message.subject) + +let () = Lwt_main.run main