From b9339a50a8ce4b0a9f6a0eadf0a45113bbbb9265 Mon Sep 17 00:00:00 2001 From: Romain Slootmaekers Date: Thu, 11 Jun 2015 16:42:46 +0200 Subject: [PATCH] work against simulator version 0.8.0.4 --- META | 2 +- README.md | 46 ++++++++++++------ examples/test_it.ml | 115 +++++++++++++++++++++++++++++++------------- src/kinetic.ml | 102 +++++++++++++++++++++++---------------- src/kinetic.mli | 21 ++++---- 5 files changed, 184 insertions(+), 102 deletions(-) diff --git a/META b/META index 0f367e0..594d36c 100644 --- a/META +++ b/META @@ -1,7 +1,7 @@ # -*- conf -*- description = "Kinetic client" -version = "0.0.4" +version = "0.0.5" exists_if = "kinetic.cmx,kinetic.cmi,kinetic.mli" requires = "threads lwt lwt.unix cryptokit" archive(byte) = "kinetic.client.cma" diff --git a/README.md b/README.md index 293ad63..8358b5d 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Kinetic OCaml Client ==================== This is an OCaml client for [Seagate's Kinetic drives](https://developers.seagate.com/display/KV/Kinetic+Open+Storage+Documentation+Wiki). Currently, it uses protocol version 3.0.6. -This is corresponds with version 0.8.0.3 of the Java Simulator. +This is corresponds with version 0.8.0.4 of the Java Simulator. Installation @@ -33,8 +33,6 @@ Once you have the library installed, you just add `true:package(kinetic-client) Usage ===== - - The API is defined in [kinetic.mli](src/kinetic.mli) typically you'd do something like: @@ -44,22 +42,38 @@ typically you'd do something like: let secret = "...." in let cluster_version = ... in let session = Kinetic.make_session ....in - Lwt_io.with_connection - sa - (fun conn -> - Kinetic.handshake secret cluster_version conn >>= fun session -> - ... - Kinetic.put session - conn "the_key" - (Some "the value") - ~db_version:None ~new_version:None - ~forced:true - >>= fun () -> - ... - ) + Lwt_io.with_connection sa + (fun conn -> + Kinetic.handshake secret cluster_version conn >>= fun session -> + ... + Kinetic.put session conn + "the_key" (Some "the value") + ~db_version:None ~new_version:None + ~forced:true + ~synchronization:(Some Kinetic.WRITEBACK) + >>= fun () -> + ... + ) ``` +Remarks +======= + +Protocol? +--------- + +There is a rather stable `protocol` +defined in Seagate's [kinetic-protocol](https://github.com/Seagate/kinetic-protocol) which defines the serialization for valid messages. The protocol itself is rather implicitly defined by the Kinetic Simulator, and the interpretation of what comprises an acceptable client/server conversation varies between Simulator releases. +All this to say that even if both client and server state they support protocol version X, they still might not be able to talk to each other. YMMV + +Todo +---- + - We only catered for our own needs, so feature support is rather limited. + (we welcome pull requests ;) ) + - the API feels heavy, because it's a direct translation of the messages + into RPC. Once the protocol stabilizes, we should move into something + more elegant. Have fun, diff --git a/examples/test_it.ml b/examples/test_it.ml index 2c6cb55..3affcc3 100644 --- a/examples/test_it.ml +++ b/examples/test_it.ml @@ -10,7 +10,33 @@ let vco2s = function open Lwt open Kinetic -let batch_ops1 session conn = +let lwt_test name (f:unit -> unit Lwt.t) : bool Lwt.t= + Lwt_log.debug_f "starting:%s" name >>= fun () -> + let timeout = 60. (* Simulator isn't that fast (FLUSH | WRITETHROUGH) *) in + Lwt.catch + (fun () -> + Lwt_unix.with_timeout timeout f + >>= fun () -> + Lwt.return true + ) + (fun exn -> + Lwt_log.info_f ~exn "failing:%s" name >>= fun () -> + Lwt.return false + ) + >>= fun r -> + Lwt_log.debug_f "end of :%s" name >>= fun () -> + Lwt.return r + +let test_get_non_existing session conn = + Kinetic.get session conn "I do not exist?" + >>= fun vo -> + assert (vo = None); + Lwt.return () + +let test_noop session conn = + Kinetic.noop session conn + +let batch_ops1 session conn : unit Lwt.t= Kinetic.start_batch_operation session conn >>= fun batch -> let pe = Kinetic.make_entry ~key:"xxx" @@ -18,7 +44,7 @@ let batch_ops1 session conn = ~new_version:None (Some "XXX") in - Kinetic.batch_put batch pe ~forced:(Some true) >>= fun () -> + Kinetic.batch_put batch pe ~forced:(Some true) >>= fun () -> let de = Kinetic.make_entry ~key:"xxx" ~db_version:None @@ -26,7 +52,8 @@ let batch_ops1 session conn = None in Kinetic.batch_delete batch de ~forced:(Some true) >>= fun () -> - Kinetic.end_batch_operation batch + Kinetic.end_batch_operation batch >>= fun conn -> + Lwt.return () let batch_ops2 session conn = Kinetic.start_batch_operation session conn >>= fun batch -> @@ -37,7 +64,8 @@ let batch_ops2 session conn = (Some "ZZZ") in Kinetic.batch_put batch pe ~forced:(Some true) >>= fun () -> - Kinetic.end_batch_operation batch + Kinetic.end_batch_operation batch >>= fun conn -> + Lwt.return () let batch_ops3 session conn = @@ -50,20 +78,22 @@ let batch_ops3 session conn = None in Kinetic.batch_delete batch de ~forced:None >>= fun () -> - Kinetic.end_batch_operation batch - -let put_get_delete_test session conn = + Kinetic.end_batch_operation batch >>= fun conn -> + Lwt.return () +let test_put_get_delete session conn = let rec loop i = if i = 1000 then Lwt.return () else let key = Printf.sprintf "x_%05i" i in let value = Printf.sprintf "value_%05i" i in + let synchronization = Some Kinetic.WRITEBACK in Kinetic.put session conn key value ~db_version:None ~new_version:None - ~forced:(Some true) + ~forced:None + ~synchronization >>= fun () -> Kinetic.get session conn key >>= fun vco -> Lwt_io.printlf "drive[%S]=%s" key (vco2s vco) >>= fun () -> @@ -79,19 +109,22 @@ let put_get_delete_test session conn = Lwt_io.printlf "deleted %S" key >>= fun () -> Kinetic.get session conn key >>= fun vco -> Lwt_io.printlf "drive[%S]=%s" key (vco2s vco) >>= fun () -> + assert (vco = None); loop (i+1) in loop 0 -let put_version_test session conn = +let test_put_version session conn = let key = "with_version" in Kinetic.delete_forced session conn key >>= fun () -> let value = "the_value" in let version = Some "0" in + let synchronization = Some Kinetic.FLUSH in Kinetic.put session conn key value ~new_version:version ~db_version:None ~forced:(Some true) + ~synchronization >>= fun () -> Kinetic.get session conn key >>= fun vco -> Lwt_io.printlf "vco=%s" (vco2s vco) >>= fun () -> @@ -102,6 +135,7 @@ let put_version_test session conn = Kinetic.put session conn key "next_value" ~db_version:new_version ~new_version ~forced:None + ~synchronization >>= fun () -> Lwt.return false ) @@ -115,6 +149,7 @@ let put_version_test session conn = Lwt.return () let fill session conn n = + let synchronization = Some Kinetic.WRITEBACK in let rec loop i = if i = n then Lwt.return () @@ -126,6 +161,7 @@ let fill session conn n = ~db_version:None ~new_version:None ~forced:(Some true) + ~synchronization >>= fun () -> loop (i+1) in @@ -134,11 +170,15 @@ let fill session conn n = let range_test session conn = - Kinetic.get_key_range session conn - "x" true "y" true true 20 + fill session conn 1000 >>= fun () -> + Kinetic.get_key_range + session conn + "x" true "y" true true 20 >>= fun keys -> - Lwt_io.printlf "[%s]\n" (String.concat "; " keys) - + Lwt_io.printlf "[%s]\n" (String.concat "; " keys) >>= fun () -> + assert (List.length keys = 20); + assert (List.hd keys= "x_00999"); + Lwt.return () (* let peer2peer_test session conn = @@ -161,35 +201,44 @@ let () = let cluster_version = 0L in Lwt_io.with_connection sa (fun conn -> - Kinetic.handshake secret cluster_version conn - >>= fun session -> + Kinetic.handshake secret cluster_version conn >>= fun session -> let config = Kinetic.get_config session in let open Config in Lwt_io.printlf "Config:" >>= fun () -> + Lwt_io.printlf "version: %s" config.version >>= fun ()-> Lwt_io.printlf "wwn:%s" config.world_wide_name >>= fun ()-> Lwt_io.printlf "serial_number:%s" config.serial_number >>= fun ()-> Lwt_io.printlf "max_key_size:%i" config.max_key_size >>= fun ()-> Lwt_io.printlf "max_value_size:%i" config.max_value_size >>= fun ()-> Lwt_io.printlf "max_version_size:%i" config.max_version_size >>= fun ()-> - Kinetic.get session conn "I do not exist?" >>= fun vo -> - - (*put_get_delete_test session conn >>= fun () -> *) - (*put_version_test session conn >>= fun () -> *) - (*fill session conn 1000 >>= fun () -> - Lwt_io.printlf "range:" >>= fun () -> - range_test session conn >>= fun () -> - *) - (*batch_ops1 session conn >>= fun conn -> *) -(* - batch_ops2 session conn >>= fun conn -> - batch_ops3 session conn >>= fun conn -> - *) -(* - Kinetic.noop session conn >>= fun () -> - peer2peer_test session conn - *) - Lwt.return () + let run_tests tests = + Lwt_list.map_s + (fun (test_name, test) -> + lwt_test test_name + (fun () -> test session conn) + >>= fun r -> + Lwt.return (test_name,r) + ) + tests + in + run_tests + [ + "get_non_existing",test_get_non_existing; + "noop", test_noop; + "put_get_delete", test_put_get_delete; + + "put_version", test_put_version; + "range_test", range_test; + "batch_ops1", batch_ops1; + "batch_ops2", batch_ops2; + "batch_ops3", batch_ops3; + (*"peer2peer", peer2peer_test;*) + ] + >>= fun results -> + Lwt_list.iter_s + (fun (n,r) -> Lwt_io.printlf "%-32s => %b" n r) + results ) in Lwt_log.add_rule "*" Lwt_log.Debug; diff --git a/src/kinetic.ml b/src/kinetic.ml index cba16ea..8cff1aa 100644 --- a/src/kinetic.ml +++ b/src/kinetic.ml @@ -31,10 +31,15 @@ let to_hex = function String.iter (fun c -> Buffer.add_string buf (hex c)) s; Buffer.sub buf 0 (n_chars - 1) + let unwrap_option msg = function | None -> failwith ("None " ^ msg) | Some x -> x +let map_option f = function + | None -> None + | Some x -> Some (f x) + let option2s x2s = function | None -> "None" | Some x -> Printf.sprintf "Some %s" (x2s x) @@ -425,6 +430,10 @@ module Kinetic = struct type batch = Batch.t let get_batch_id (batch:batch) = let open Batch in batch.batch_id + type synchronization = + |WRITETHROUGH + |WRITEBACK + |FLUSH type rc = Batch.rc let convert_rc = function @@ -564,6 +573,7 @@ module Kinetic = struct ~db_version ~new_version ~forced + ~synchronization (body:Command_body.t) = let open Command_key_value in @@ -573,6 +583,13 @@ module Kinetic = struct kv.db_version <- db_version; kv.new_version <- new_version; body.key_value <- Some kv; + let translate = function + | WRITETHROUGH -> `writethrough + | WRITEBACK -> `writeback + | FLUSH -> `flush + in + let sync = map_option translate synchronization in + kv.synchronization <- sync; () let make_delete_forced session key = @@ -580,14 +597,21 @@ module Kinetic = struct ~db_version:None ~new_version:None ~forced:(Some true) + ~synchronization:(Some WRITEBACK) in make_serialized_msg session `delete mb - let make_put session key value ~db_version ~new_version ~forced = - let mb = set_attributes ~ko:(Some key) - ~db_version - ~new_version - ~forced + let make_put session key value + ~db_version ~new_version + ~forced ~synchronization + = + let mb = + set_attributes + ~ko:(Some key) + ~db_version + ~new_version + ~forced + ~synchronization in make_serialized_msg session `put mb @@ -635,8 +659,14 @@ module Kinetic = struct let put session (ic,oc) k value ~db_version ~new_version ~forced + ~synchronization = - let msg = make_put session k value ~db_version ~new_version ~forced in + let msg = + make_put + session k value + ~db_version ~new_version + ~forced ~synchronization + in network_send oc msg (Some value) >>= fun () -> network_receive ic >>= fun (r,vo) -> assert (vo = None); @@ -663,6 +693,7 @@ module Kinetic = struct ~db_version:None ~new_version:None ~forced:None + ~synchronization:None in make_serialized_msg session `get mb @@ -767,10 +798,10 @@ module Kinetic = struct let default_handler rc = let open Batch in match rc with - | Ok -> Lwt_log.debug ~section "default_handler ok" - | Nok(i,sm) -> Lwt_log.info_f ~section "NOK!: rc:%i; sm=%S" i sm - >>= fun () -> - Lwt.fail (Kinetic_exc [i,sm]) + | Ok -> Lwt_log.debug ~section "default_handler ok" + | Nok(i,sm) -> + Lwt_log.info_f ~section "NOK!: rc:%i; sm=%S" i sm >>= fun () -> + Lwt.fail (Kinetic_exc [i,sm]) let start_batch_operation ?(handler = default_handler) @@ -827,7 +858,9 @@ module Kinetic = struct set_attributes ~ko:(Some entry.key) ~db_version:entry.db_version ~new_version:entry.new_version - ~forced body; + ~forced + ~synchronization:None + body; command.body <- Some body; let m = default_message () in header.message_type <- Some mt; @@ -844,15 +877,10 @@ module Kinetic = struct let batch_put - ?(handler = default_handler) (batch:Batch.t) entry ~forced - = - Lwt_log.debug_f ~section "batch_put %s ~forced:%s" - (entry_to_string entry) - (bo2s forced) - >>= fun () -> + Lwt_log.debug_f ~section "batch_put %s" (entry_to_string entry) >>= fun () -> let open Batch in let msg = make_batch_msg batch.session @@ -861,10 +889,6 @@ module Kinetic = struct entry ~forced in - Batch.add_handler - batch `put_response handler - - >>= fun () -> let (ic,oc) = batch.conn in network_send oc msg (entry.vo) >>= fun () -> Session.incr_sequence batch.session; @@ -872,7 +896,6 @@ module Kinetic = struct let batch_delete - ?(handler = default_handler) (batch:Batch.t) entry ~forced @@ -884,25 +907,22 @@ module Kinetic = struct entry ~forced in - Batch.add_handler batch `delete_response handler - >>= fun () -> - let (ic,oc) = batch.conn in network_send oc msg None >>= fun () -> Session.incr_sequence batch.session; Lwt.return () -(* - - let make_noop session = - make_serialized_msg session `noop (set_ko None) - let make_set session key = - make_serialized_msg session `put (set_ko (Some key)) - let make_delete session key = - make_serialized_msg session `delete (set_ko (Some key)) - + let make_noop session = + let mb = set_attributes ~ko:None + ~db_version:None + ~new_version:None + ~forced:None + ~synchronization:None + in + make_serialized_msg session `noop mb + (* let make_p2p_push session (host,port,tls) operations = let open Message_p2_poperation in @@ -931,22 +951,20 @@ module Kinetic = struct in make_serialized_msg session `peer2_peerpush manip - - + *) let noop session (ic,oc) = let msg = make_noop session in let vo = None in network_send oc msg vo >>= fun () -> - Lwt_log.debug "done sending" >>= fun () -> + Lwt_log.debug "sent noop" >>= fun () -> network_receive ic >>= fun (r,vo) -> assert (vo = None); - _assert_type r `noop_response; - _assert_success r; - - let () = incr_session session in + let command = _parse_command r in + let () = Session.incr_sequence session in + _assert_both command `noop_response `success; Lwt.return () - +(* diff --git a/src/kinetic.mli b/src/kinetic.mli index 52c7c97..11773e8 100644 --- a/src/kinetic.mli +++ b/src/kinetic.mli @@ -41,6 +41,12 @@ module Kinetic : sig vo : value option; } val entry_to_string: entry -> string + + type synchronization = + | WRITETHROUGH + | WRITEBACK + | FLUSH + type rc type handler = rc -> unit Lwt.t exception Kinetic_exc of (int * bytes) list @@ -71,14 +77,15 @@ module Kinetic : sig -> db_version:version -> new_version:version -> forced:bool option + -> synchronization : synchronization option -> unit Lwt.t val delete_forced: session -> connection -> - key -> unit Lwt.t + key -> unit Lwt.t val get : session -> connection -> key -> (value * version) option Lwt.t - (*val noop: session -> connection -> unit Lwt.t *) + val noop: session -> connection -> unit Lwt.t val get_key_range: session -> connection -> key -> bool -> @@ -96,15 +103,9 @@ module Kinetic : sig ?handler:handler -> session -> connection -> batch Lwt.t - val batch_put : - ?handler:handler -> - batch -> entry -> forced:bool option - -> unit Lwt.t + val batch_put : batch -> entry -> forced:bool option -> unit Lwt.t - val batch_delete: - ?handler:handler -> - batch -> entry -> forced: bool option - -> unit Lwt.t + val batch_delete: batch -> entry -> forced:bool option -> unit Lwt.t val end_batch_operation : ?handler:handler ->