Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
dx3mod committed Nov 4, 2024
2 parents c9c0871 + 4e59f4b commit ef1416f
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 65 deletions.
20 changes: 16 additions & 4 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,40 @@ FEEL FREE TO CREATE ISSUES AND PRs

## Development

### Requirements
### How to start?

#### Requirements

- OCaml >= 4.14
- Dune
- OPAM

#### Use development branch

Use the `dev` branch.

#### Setup development environment

```sh
# Create a switch for project
$ opam switch create . --deps-only
# Or just install deps
$ opam install . --deps-only
```

### How to start?
#### Next...

See [examples](./examples/).

### Tasks
### How to design library API?

See [nats.rs](https://docs.rs/nats/latest/nats/).

## Tasks

You can find open tasks at [project board](https://github.com/users/romanchechyotkin/projects/1)
and [issues page](https://github.com/romanchechyotkin/nats.ocaml/issues).

### Questions
## Questions

If you have any questions, please contact @romanchechyotkin or @dx3mod.
37 changes: 24 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,47 @@ $ opam pin nats-client-lwt.dev https://github.com/romanchechyotkin/nats.ocaml.gi
### Simple echo example

This example shows how to publish to a subject and handle its messages.
Take it from [`examples/simple.ml`](./examples/simple.ml).

```ocaml
let () =
Lwt_main.run @@
open Lwt.Infix
let main =
(* Create a switch for automatic dispose resources. *)
Lwt_switch.with_switch @@ fun switch ->
(* Connect to a NATS server by address 127.0.0.1:4222 with ECHO flag. *)
let%lwt client =
Nats_client_lwt.connect ~switch ~settings:[ `Echo ]
(Uri.of_string "tcp://127.0.0.1:4222")
in
(* Subscribe to HELLO subject. *)
let%lwt hello_subject = Nats_client_lwt.sub client ~subject:"HELLO" () in
(* Publish 'hello' message to greet.joe subject. *)
Nats_client_lwt.pub client ~subject:"greet.joe" "hello";%lwt
(* Handle incoming HELLO subject messages. *)
Nats_client_lwt.Subscription.handle hello_subject (fun msg ->
Lwt_io.printf "HELLO: %s\n" msg.payload.contents);
(* Subscribe to greet.* subject. *)
let%lwt subscription =
Nats_client_lwt.sub ~switch client ~subject:"greet.*" ()
in
(* Send "Hello World" message to HELLO subject. *)
Nats_client_lwt.pub client ~subject:"HELLO" "Hello World";%lwt
(* Publishes 'hello' message to three subjects. *)
Lwt_list.iter_p
(fun subject -> Nats_client_lwt.pub client ~subject "hello")
[ "greet.sue"; "greet.bob"; "greet.pam" ];%lwt
Lwt_unix.sleep 0.1
(* Handle first three incoming messages to the greet.* subject. *)
Lwt_stream.nget 3 subscription.messages
>>= Lwt_list.iter_s (fun (message : Nats_client.Incoming_message.msg) ->
Lwt_io.printlf "'%s' received on %s" message.payload.contents
message.subject)
let () = Lwt_main.run main
```

Take it from [`examples/natsbyexample/publish_subscribe.ml`](./examples/natsbyexample/publish_subscribe.ml).

```console
$ docker start -a nats-server
$ dune exec ./examples/simple.exe
$ dune exec ./examples/natsbyexample/publish_subscribe.exe
```

## References
Expand Down
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ocaml
dune
yojson
ppx_yojson_conv
(alcotest :with-test))
(tags (nats)))

Expand Down
7 changes: 5 additions & 2 deletions examples/demo.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ let main () =
(Uri.of_string "tcp://127.0.0.1:4222")
in

Format.printf "info %a\n" Yojson.Safe.pp client.info;
Format.printf "info %s\n"
(Nats_client.Incoming_message.INFO.yojson_of_t client.info
|> Yojson.Safe.to_string);

Nats_client_lwt.Subscription.handle client.incoming_messages (fun msg ->
Nats_client_lwt.Subscription.handle_stream client.incoming_messages
(fun msg ->
Lwt_fmt.printf "LOG: %a\n" Nats_client.Incoming_message.pp msg;%lwt
Lwt_fmt.flush Lwt_fmt.stdout);

Expand Down
4 changes: 4 additions & 0 deletions examples/natsbyexample/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Adaptation examples from [NATS by example](https://natsbyexample.com/).

Examples:
- [Core Publish-Subscribe](./publish_subscribe.ml)
5 changes: 5 additions & 0 deletions examples/natsbyexample/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
(executables
(names publish_subscribe)
(libraries nats-client-lwt lwt)
(preprocess
(pps lwt_ppx)))
31 changes: 31 additions & 0 deletions examples/natsbyexample/publish_subscribe.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
open Lwt.Infix

let main =
(* Create a switch for automatic dispose resources. *)
Lwt_switch.with_switch @@ fun switch ->
(* Connect to a NATS server by address 127.0.0.1:4222 with ECHO flag. *)
let%lwt client =
Nats_client_lwt.connect ~switch ~settings:[ `Echo ]
(Uri.of_string "tcp://127.0.0.1:4222")
in

(* Publish 'hello' message to greet.joe subject. *)
Nats_client_lwt.pub client ~subject:"greet.joe" "hello";%lwt

(* Subscribe to greet.* subject. *)
let%lwt subscription =
Nats_client_lwt.sub ~switch client ~subject:"greet.*" ()
in

(* Publishes 'hello' message to three subjects. *)
Lwt_list.iter_p
(fun subject -> Nats_client_lwt.pub client ~subject "hello")
[ "greet.sue"; "greet.bob"; "greet.pam" ];%lwt

(* Handle first three incoming messages to the greet.* subject. *)
Lwt_stream.nget 3 subscription.messages
>>= Lwt_list.iter_s (fun (message : Nats_client.Incoming_message.msg) ->
Lwt_io.printlf "'%s' received on %s" message.payload.contents
message.subject)

let () = Lwt_main.run main
6 changes: 4 additions & 2 deletions examples/simple.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ let () =
in

(* Subscribe to HELLO subject. *)
let%lwt hello_subject = Nats_client_lwt.sub client ~subject:"HELLO" () in
let%lwt hello_subscr =
Nats_client_lwt.sub ~switch client ~subject:"HELLO" ()
in

(* Handle incoming HELLO subject messages. *)
Nats_client_lwt.Subscription.handle hello_subject (fun msg ->
Nats_client_lwt.Subscription.handle hello_subscr (fun msg ->
Lwt_io.printf "HELLO: %s\n" msg.payload.contents);

(* Send "Hello World" message to HELLO subject. *)
Expand Down
2 changes: 1 addition & 1 deletion lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
(public_name nats-client)
(libraries lwt.unix yojson)
(preprocess
(pps lwt_ppx)))
(pps lwt_ppx ppx_yojson_conv)))
70 changes: 48 additions & 22 deletions lib/message.ml
Original file line number Diff line number Diff line change
@@ -1,8 +1,39 @@
(** NATS message. *)

module Incoming = struct
(* TODO: make type for INFO *)
type t = Info of Yojson.Safe.t | Msg of msg | Ping | Pong | OK | ERR of err
open Ppx_yojson_conv_lib.Yojson_conv

module INFO = struct
type t = {
server_id : string;
server_name : string;
version : string;
go : string;
host : string;
port : int;
headers : bool;
max_payload : int;
proto : int;
client_id : int option; [@default None] (* uint64 brr *)
auth_required : bool; [@default false]
tls_required : bool; [@default false]
tls_verify : bool; [@default false]
tls_available : bool; [@default false]
connect_urls : string list; [@default []]
ws_connect_urls : string list; [@default []]
ldm : bool; [@default false]
git_commit : string option; [@default None]
jetstream : bool; [@default false]
ip : string option; [@default None]
client_ip : string option; [@default None]
nonce : string option; [@default None]
cluster : string option; [@default None]
domain : string option; [@default None]
}
[@@deriving yojson] [@@yojson.allow_extra_fields]
end

type t = INFO of INFO.t | MSG of msg | PING | PONG | OK | ERR of err

and msg = {
subject : string;
Expand All @@ -20,10 +51,12 @@ module Incoming = struct
payload.size payload.contents

let pp fmt = function
| Info json -> Format.fprintf fmt "INFO %a" Yojson.Safe.pp json
| Msg msg -> pp_msg fmt msg
| Ping -> Format.fprintf fmt "PING"
| Pong -> Format.fprintf fmt "PONG"
| INFO info ->
Format.fprintf fmt "INFO %s"
(INFO.yojson_of_t info |> Yojson.Safe.to_string)
| MSG msg -> pp_msg fmt msg
| PING -> Format.fprintf fmt "PING"
| PONG -> Format.fprintf fmt "PONG"
| OK -> Format.fprintf fmt "+OK"
| ERR e -> Format.fprintf fmt "+ERR '%s'" e

Expand All @@ -36,7 +69,8 @@ module Incoming = struct
(** @raises Yojson.Json_error *)
let info_of_line line =
(* INFO {"option_name":option_value,...}␍␊ *)
Scanf.sscanf line "INFO %s" (fun s -> Yojson.Safe.from_string s)
Scanf.sscanf line "INFO %s" (fun s ->
Yojson.Safe.from_string s |> INFO.t_of_yojson)

let msg_of_line line =
(* MSG <subject> <sid> [reply-to] <#bytes>␍␊[payload]␍␊ *)
Expand Down Expand Up @@ -68,35 +102,27 @@ module Incoming = struct

(** @raises Invalid_argument *)
let of_line = function
| "PING" -> Ping
| "PONG" -> Pong
| "PING" -> PING
| "PONG" -> PONG
| "+OK" -> OK
| line when String.starts_with ~prefix:"-ERR" line ->
ERR (err_of_line line)
| line when String.starts_with ~prefix:"INFO" line ->
Info (info_of_line line)
INFO (info_of_line line)
| line when String.starts_with ~prefix:"MSG" line ->
Msg (msg_of_line line)
MSG (msg_of_line line)
| _ -> raise (Invalid_argument "incoming message line")
end
end

(** Initial message. *)
module Initial = struct
open Ppx_yojson_conv_lib.Yojson_conv

type t = { verbose : bool; pedantic : bool; tls_required : bool; echo : bool }
[@@deriving yojson] [@@yojson.allow_extra_fields]
(** Protocol. https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 *)

(* TODO: for encoding/decoding JSON should use [ppx_deriving_yojson] preprocessor. *)
let to_yojson t : Yojson.Safe.t =
`Assoc
[
("verbose", `Bool t.verbose);
("pedantic", `Bool t.pedantic);
("tls_required", `Bool t.tls_required);
("echo", `Bool t.echo);
("lang", `String "ocaml");
]

let default =
{ verbose = false; pedantic = false; tls_required = false; echo = false }

Expand Down
8 changes: 6 additions & 2 deletions lwt/connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ let receive conn =
let m = Incoming_message.Parser.of_line line in

match m with
| Incoming_message.Msg msg ->
| Incoming_message.MSG msg ->
(* read payload *)
let%lwt contents = Lwt_io.read ~count:msg.payload.size conn.ic in
let%lwt _ = Lwt_io.read ~count:2 conn.ic in

Lwt.return
@@ Incoming_message.Msg
@@ Incoming_message.MSG
{ msg with payload = { msg.payload with contents } }
| m -> Lwt.return m

Expand All @@ -86,6 +86,10 @@ module Send = struct
(Option.fold ~none:"" ~some:(Printf.sprintf " %s") queue_group)
sid

let unsub conn ?max_msgs (sid : Sid.t) =
writelnf conn.oc "UNSUB %s%s" sid
(Option.fold max_msgs ~none:"" ~some:(Printf.sprintf " %d"))

let connect ~json conn =
(* NOTE: Yojson.Safe.pp gives a bad result.
TODO: improve performance of JSON encoding (now is bad) *)
Expand Down
Loading

0 comments on commit ef1416f

Please sign in to comment.