Skip to content

Commit

Permalink
Merge pull request #3 from cloudfounders/protocol_update
Browse files Browse the repository at this point in the history
work against simulator version 0.8.0.4
  • Loading branch information
toolslive committed Jun 12, 2015
2 parents d0b9e2a + b9339a5 commit 405b3ca
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 102 deletions.
2 changes: 1 addition & 1 deletion META
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- conf -*-

description = "Kinetic client"
version = "0.0.4"
version = "0.0.5"
exists_if = "kinetic.cmx,kinetic.cmi,kinetic.mli"
requires = "threads lwt lwt.unix cryptokit"
archive(byte) = "kinetic.client.cma"
Expand Down
46 changes: 30 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Kinetic OCaml Client
====================
This is an OCaml client for [Seagate's Kinetic drives](https://developers.seagate.com/display/KV/Kinetic+Open+Storage+Documentation+Wiki).
Currently, it uses protocol version 3.0.6.
This is corresponds with version 0.8.0.3 of the Java Simulator.
This is corresponds with version 0.8.0.4 of the Java Simulator.


Installation
Expand Down Expand Up @@ -33,8 +33,6 @@ Once you have the library installed, you just add `true:package(kinetic-client)
Usage
=====



The API is defined in [kinetic.mli](src/kinetic.mli)

typically you'd do something like:
Expand All @@ -44,22 +42,38 @@ typically you'd do something like:
let secret = "...." in
let cluster_version = ... in
let session = Kinetic.make_session ....in
Lwt_io.with_connection
sa
(fun conn ->
Kinetic.handshake secret cluster_version conn >>= fun session ->
...
Kinetic.put session
conn "the_key"
(Some "the value")
~db_version:None ~new_version:None
~forced:true
>>= fun () ->
...
)
Lwt_io.with_connection sa
(fun conn ->
Kinetic.handshake secret cluster_version conn >>= fun session ->
...
Kinetic.put session conn
"the_key" (Some "the value")
~db_version:None ~new_version:None
~forced:true
~synchronization:(Some Kinetic.WRITEBACK)
>>= fun () ->
...
)
```

Remarks
=======

Protocol?
---------

There is a rather stable `protocol`
defined in Seagate's [kinetic-protocol](https://github.com/Seagate/kinetic-protocol) which defines the serialization for valid messages. The protocol itself is rather implicitly defined by the Kinetic Simulator, and the interpretation of what comprises an acceptable client/server conversation varies between Simulator releases.
All this to say that even if both client and server state they support protocol version X, they still might not be able to talk to each other. YMMV

Todo
----
- We only catered for our own needs, so feature support is rather limited.
(we welcome pull requests ;) )
- the API feels heavy, because it's a direct translation of the messages
into RPC. Once the protocol stabilizes, we should move into something
more elegant.

Have fun,

Expand Down
115 changes: 82 additions & 33 deletions examples/test_it.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,50 @@ let vco2s = function
open Lwt
open Kinetic

let batch_ops1 session conn =
let lwt_test name (f:unit -> unit Lwt.t) : bool Lwt.t=
Lwt_log.debug_f "starting:%s" name >>= fun () ->
let timeout = 60. (* Simulator isn't that fast (FLUSH | WRITETHROUGH) *) in
Lwt.catch
(fun () ->
Lwt_unix.with_timeout timeout f
>>= fun () ->
Lwt.return true
)
(fun exn ->
Lwt_log.info_f ~exn "failing:%s" name >>= fun () ->
Lwt.return false
)
>>= fun r ->
Lwt_log.debug_f "end of :%s" name >>= fun () ->
Lwt.return r

let test_get_non_existing session conn =
Kinetic.get session conn "I do not exist?"
>>= fun vo ->
assert (vo = None);
Lwt.return ()

let test_noop session conn =
Kinetic.noop session conn

let batch_ops1 session conn : unit Lwt.t=
Kinetic.start_batch_operation session conn >>= fun batch ->
let pe = Kinetic.make_entry
~key:"xxx"
~db_version:None
~new_version:None
(Some "XXX")
in
Kinetic.batch_put batch pe ~forced:(Some true) >>= fun () ->
Kinetic.batch_put batch pe ~forced:(Some true) >>= fun () ->
let de = Kinetic.make_entry
~key:"xxx"
~db_version:None
~new_version: None
None
in
Kinetic.batch_delete batch de ~forced:(Some true) >>= fun () ->
Kinetic.end_batch_operation batch
Kinetic.end_batch_operation batch >>= fun conn ->
Lwt.return ()

let batch_ops2 session conn =
Kinetic.start_batch_operation session conn >>= fun batch ->
Expand All @@ -37,7 +64,8 @@ let batch_ops2 session conn =
(Some "ZZZ")
in
Kinetic.batch_put batch pe ~forced:(Some true) >>= fun () ->
Kinetic.end_batch_operation batch
Kinetic.end_batch_operation batch >>= fun conn ->
Lwt.return ()


let batch_ops3 session conn =
Expand All @@ -50,20 +78,22 @@ let batch_ops3 session conn =
None
in
Kinetic.batch_delete batch de ~forced:None >>= fun () ->
Kinetic.end_batch_operation batch

let put_get_delete_test session conn =
Kinetic.end_batch_operation batch >>= fun conn ->
Lwt.return ()

let test_put_get_delete session conn =
let rec loop i =
if i = 1000
then Lwt.return ()
else
let key = Printf.sprintf "x_%05i" i in
let value = Printf.sprintf "value_%05i" i in
let synchronization = Some Kinetic.WRITEBACK in
Kinetic.put session conn key value
~db_version:None
~new_version:None
~forced:(Some true)
~forced:None
~synchronization
>>= fun () ->
Kinetic.get session conn key >>= fun vco ->
Lwt_io.printlf "drive[%S]=%s" key (vco2s vco) >>= fun () ->
Expand All @@ -79,19 +109,22 @@ let put_get_delete_test session conn =
Lwt_io.printlf "deleted %S" key >>= fun () ->
Kinetic.get session conn key >>= fun vco ->
Lwt_io.printlf "drive[%S]=%s" key (vco2s vco) >>= fun () ->
assert (vco = None);
loop (i+1)
in
loop 0

let put_version_test session conn =
let test_put_version session conn =
let key = "with_version" in
Kinetic.delete_forced session conn key >>= fun () ->
let value = "the_value" in
let version = Some "0" in
let synchronization = Some Kinetic.FLUSH in
Kinetic.put session conn key value
~new_version:version
~db_version:None
~forced:(Some true)
~synchronization
>>= fun () ->
Kinetic.get session conn key >>= fun vco ->
Lwt_io.printlf "vco=%s" (vco2s vco) >>= fun () ->
Expand All @@ -102,6 +135,7 @@ let put_version_test session conn =
Kinetic.put session conn key "next_value"
~db_version:new_version ~new_version
~forced:None
~synchronization
>>= fun () ->
Lwt.return false
)
Expand All @@ -115,6 +149,7 @@ let put_version_test session conn =
Lwt.return ()

let fill session conn n =
let synchronization = Some Kinetic.WRITEBACK in
let rec loop i =
if i = n
then Lwt.return ()
Expand All @@ -126,6 +161,7 @@ let fill session conn n =
~db_version:None
~new_version:None
~forced:(Some true)
~synchronization
>>= fun () ->
loop (i+1)
in
Expand All @@ -134,11 +170,15 @@ let fill session conn n =


let range_test session conn =
Kinetic.get_key_range session conn
"x" true "y" true true 20
fill session conn 1000 >>= fun () ->
Kinetic.get_key_range
session conn
"x" true "y" true true 20
>>= fun keys ->
Lwt_io.printlf "[%s]\n" (String.concat "; " keys)

Lwt_io.printlf "[%s]\n" (String.concat "; " keys) >>= fun () ->
assert (List.length keys = 20);
assert (List.hd keys= "x_00999");
Lwt.return ()

(*
let peer2peer_test session conn =
Expand All @@ -161,35 +201,44 @@ let () =
let cluster_version = 0L in
Lwt_io.with_connection sa
(fun conn ->
Kinetic.handshake secret cluster_version conn
>>= fun session ->
Kinetic.handshake secret cluster_version conn >>= fun session ->
let config = Kinetic.get_config session in
let open Config in
Lwt_io.printlf "Config:" >>= fun () ->
Lwt_io.printlf "version: %s" config.version >>= fun ()->
Lwt_io.printlf "wwn:%s" config.world_wide_name >>= fun ()->
Lwt_io.printlf "serial_number:%s" config.serial_number >>= fun ()->
Lwt_io.printlf "max_key_size:%i" config.max_key_size >>= fun ()->
Lwt_io.printlf "max_value_size:%i" config.max_value_size >>= fun ()->
Lwt_io.printlf "max_version_size:%i" config.max_version_size >>= fun ()->

Kinetic.get session conn "I do not exist?" >>= fun vo ->

(*put_get_delete_test session conn >>= fun () -> *)
(*put_version_test session conn >>= fun () -> *)
(*fill session conn 1000 >>= fun () ->
Lwt_io.printlf "range:" >>= fun () ->
range_test session conn >>= fun () ->
*)
(*batch_ops1 session conn >>= fun conn -> *)
(*
batch_ops2 session conn >>= fun conn ->
batch_ops3 session conn >>= fun conn ->
*)
(*
Kinetic.noop session conn >>= fun () ->
peer2peer_test session conn
*)
Lwt.return ()
let run_tests tests =
Lwt_list.map_s
(fun (test_name, test) ->
lwt_test test_name
(fun () -> test session conn)
>>= fun r ->
Lwt.return (test_name,r)
)
tests
in
run_tests
[
"get_non_existing",test_get_non_existing;
"noop", test_noop;
"put_get_delete", test_put_get_delete;

"put_version", test_put_version;
"range_test", range_test;
"batch_ops1", batch_ops1;
"batch_ops2", batch_ops2;
"batch_ops3", batch_ops3;
(*"peer2peer", peer2peer_test;*)
]
>>= fun results ->
Lwt_list.iter_s
(fun (n,r) -> Lwt_io.printlf "%-32s => %b" n r)
results
)
in
Lwt_log.add_rule "*" Lwt_log.Debug;
Expand Down
Loading

0 comments on commit 405b3ca

Please sign in to comment.