Skip to content

Commit

Permalink
Merge pull request #1 from cloudfounders/batchoperations
Browse files Browse the repository at this point in the history
Batchoperations
  • Loading branch information
toolslive committed Jan 28, 2015
2 parents c3fbbf3 + b742a52 commit c2eaf19
Show file tree
Hide file tree
Showing 9 changed files with 2,071 additions and 977 deletions.
4 changes: 2 additions & 2 deletions META
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# -*- conf -*-

description = "Kinetic client"
version = "0.0.1"
version = "0.0.3"
exists_if = "kinetic.cmx,kinetic.cmi,kinetic.mli"
requires = "threads lwt lwt.unix"
requires = "threads lwt lwt.unix cryptokit"
archive(byte) = "kinetic.client.cma"
archive(native) = "kinetic.client.cmxa"
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ piqi:

kinetic_piqi_ml: piqi
piqic-ocaml --embed-piqi -C src src/kinetic.proto.piqi
patch -d src < src/kinetic_piqi.patch


example:
ocamlbuild -use-ocamlfind test_it.native
Expand Down
20 changes: 14 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
Kinetic OCaml Client
====================
This is an OCaml client for [Seagate's Kinetic drives](https://developers.seagate.com/display/KV/Kinetic+Open+Storage+Documentation+Wiki). Currently, it uses protocol version 2.0.4. This is what our drives speak and corresponds with version 0.7.0.2 of the Java Simulator

This is an OCaml client for [Seagate's Kinetic drives](https://developers.seagate.com/display/KV/Kinetic+Open+Storage+Documentation+Wiki).
Currently, it uses protocol version 3.0.6.
This is corresponds with version 0.8.0.3 of the Java Simulator.

Todo:
- [ ] Also support 3.X protocol
- [X] support 3.X protocol
- [ ] use 4.0.2 Bytes iso strings for buffers (depends on piqi)
- [X] opam installable
- [ ] publish on opam repo
- [ ] publish 0.0.3 on opam repo

Installation
============
In order to build the client, you need to have some OCaml libraries present.
In concreto, you need:
- Lwt
- piqi.0.7.1
- piqi
- Cryptokit


Expand Down Expand Up @@ -46,12 +47,19 @@ typically you'd do something like:
```OCaml
let sa = make_socket_address "127.0.0.1" 8123 in
let secret = "...." in
let cluster_version = ... in
let session = Kinetic.make_session ....in
Lwt_io.with_connection
sa
(fun conn ->
Kinetic.handshake secret cluster_version conn >>= fun session ->
...
Kinetic.set session conn key (Some value) >>= fun () ->
Kinetic.put session
conn "the_key"
(Some "the value")
~db_version:None ~new_version:None
~forced:true
>>= fun () ->
...
)
Expand Down
139 changes: 109 additions & 30 deletions examples/test_it.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,103 @@ let so2s = function
| None -> "None"
| Some s -> Printf.sprintf "Some(%S)" s

let vco2s = function
| None -> "None"
| Some (v, version) -> Printf.sprintf "Some(%S, %s)" v (so2s version)


open Lwt
open Kinetic

let fill session conn n =
let put_get_delete_test session conn =

let rec loop i =
if i = n
if i = 1000
then Lwt.return ()
else
let key = Printf.sprintf "x_%05i" i in
let v = Printf.sprintf "value_%05i" i in
let vo = Some v in
Kinetic.set session conn key vo >>= fun () ->
let key = Printf.sprintf "x_%05i" i in
let value = Printf.sprintf "value_%05i" i in
Kinetic.put session conn key value
~db_version:None
~new_version:None
~forced:(Some true)
>>= fun () ->
Kinetic.get session conn key >>= fun vco ->
Lwt_io.printlf "drive[%S]=%s" key (vco2s vco) >>= fun () ->
let () = match vco with
| None -> failwith "should be present"
| Some (value2, version) ->
begin
assert (value = value2);
assert (version = Some "");
end
in
Kinetic.delete_forced session conn key >>= fun () ->
Lwt_io.printlf "deleted %S" key >>= fun () ->
Kinetic.get session conn key >>= fun vco ->
Lwt_io.printlf "drive[%S]=%s" key (vco2s vco) >>= fun () ->
loop (i+1)
in
loop 0

let put_get_delete_test session conn =
let put_version_test session conn =
let key = "with_version" in
Kinetic.delete_forced session conn key >>= fun () ->
let value = "the_value" in
let version = Some "0" in
Kinetic.put session conn key value
~new_version:version
~db_version:None
~forced:(Some true)
>>= fun () ->
Kinetic.get session conn key >>= fun vco ->
Lwt_io.printlf "vco=%s" (vco2s vco) >>= fun () ->
begin
Lwt.catch
(fun () ->
let new_version = Some "1" in
Kinetic.put session conn key "next_value"
~db_version:new_version ~new_version
~forced:None
>>= fun () ->
Lwt.return false
)
(fun exn -> Lwt.return true)
end
>>= function
| false -> Lwt.fail (Failure "bad behaviour")
| true ->
Kinetic.get session conn key >>= fun vco2 ->
Lwt_io.printlf "vco2=%s" (vco2s vco2) >>= fun () ->
Lwt.return ()

let fill session conn n =
let rec loop i =
if i = 1000
if i = n
then Lwt.return ()
else
let key = Printf.sprintf "x_%05i" i in
let value = Printf.sprintf "value_%05i" i in
Kinetic.set session conn key (Some value) >>= fun () ->
Kinetic.get session conn key >>= fun vo ->
Lwt_io.printlf "drive[%S]=%s" key (so2s vo) >>= fun () ->
assert (Some value = vo);
Kinetic.set session conn key None >>= fun () ->
Lwt_io.printlf "deleted %S" key >>= fun () ->
Kinetic.get session conn key >>= fun vo ->
Lwt_io.printlf "drive[%S]=%s" key (so2s vo) >>= fun () ->
let key = Printf.sprintf "x_%05i" i in
let v = Printf.sprintf "value_%05i" i in
Kinetic.put
session conn key v
~db_version:None
~new_version:None
~forced:(Some true)
>>= fun () ->
loop (i+1)
in
loop 0



let range_test session conn =
Kinetic.get_key_range session conn
"x" true "y" true true 20
>>= fun keys ->
Lwt_io.printlf "[%s]\n" (String.concat "; " keys)


(*
let peer2peer_test session conn =
let peer = "192.168.11.102", 8000, false in
let operations = [
Expand All @@ -52,28 +108,51 @@ let peer2peer_test session conn =
]
in
Kinetic.p2p_push session conn peer operations
*)


let () =
let make_socket_address h p = Unix.ADDR_INET(Unix.inet_addr_of_string h, p) in
let sa = make_socket_address "127.0.0.1" 8123 in
let sa = make_socket_address "127.0.0.1" 9000 in
let t =
let secret = "asdfasdf" in
let session =
Kinetic.make_session
~cluster_version:0L
~sequence:0L
~identity:1L
~secret
~connection_id:1407518469L
in
Lwt_io.with_connection
sa
let cluster_version = 0L in
Lwt_io.with_connection sa
(fun conn ->
Kinetic.noop session conn >>= fun () ->
Kinetic.handshake secret cluster_version conn
>>= fun session ->

put_get_delete_test session conn >>= fun () ->
put_version_test session conn >>= fun () ->
fill session conn 1000 >>= fun () ->
Lwt_io.printlf "range:" >>= fun () ->
range_test session conn >>= fun () ->

Kinetic.start_batch_operation session conn >>= fun batch ->
let pe = Kinetic.make_entry
~key:"xxx"
~db_version:None
~new_version:None
(Some "XXX")
in
Kinetic.batch_put batch pe ~forced:(Some true)
>>= fun () ->
let de = Kinetic.make_entry
~key:"xxx"
~db_version:None
~new_version: None
None
in
Kinetic.batch_delete batch de ~forced:(Some true)
>>= fun () ->

Kinetic.end_batch_operation batch >>= fun conn ->

(*
Kinetic.noop session conn >>= fun () ->
peer2peer_test session conn
*)
Lwt.return ()
)
in
Lwt_log.add_rule "*" Lwt_log.Debug;
Expand Down
Loading

0 comments on commit c2eaf19

Please sign in to comment.