In this chapter, our goal is to implement a distributed, transactional key-value store. To make things a little more interesting, we'll make our values lists of integers, and our writes appends of integers. This data model will enable Maelstrom to perform more sophisticated safety analyses, and make it easier to debug when things go wrong. The transaction system we build will support arbitrary operations: there's nothing here which is specific to lists or appends.
The workload we'll use for this chapter is called txn-list-append. Our servers need to support a single RPC type, txn, which takes an incomplete transaction (e.g. one where reads have no values) to execute, and returns the completed version of that transaction (i.e. with reads filled in).
{"type": "txn",
"txn": a_transaction,
"msg_id": 123}
Which returns
{"type": "txn_ok",
"txn": that_transaction_completed,
"in_reply_to": 123}
Each transaction is a list of micro-operations.
[op1, op2, op3]
And each micro-operation is of the form [function, key, value]:
["r", 1, [3, 4, 5]] denotes a read of key 1 observing the value [3, 4, 5]. ["append", 1, 6] denotes appending 6 to key 1. If we executed these operations in order, the resulting value of key 1 would be [3, 4, 5, 6].
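Putting the message format and the micro-operations together, here's what a concrete exchange might look like (the key and values here are illustrative). A client could send a request in which reads carry null placeholders:

{"type": "txn",
 "txn": [["r", 1, null], ["append", 1, 6]],
 "msg_id": 1}

And the server, after executing the transaction, would reply with the reads filled in:

{"type": "txn_ok",
 "txn": [["r", 1, [3, 4, 5]], ["append", 1, 6]],
 "in_reply_to": 1}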
Maelstrom can check a broad range of consistency models for this workload, but the property that we'll aim for is one of the strongest: strict serializability. Informally, this means we need to ensure two things:
- Transactions appear to execute in some total order, as if they had been performed one after the next.
- That order is consistent with the real-time order of events: if we return txn_ok for transaction T1, and a client later submits transaction T2, then T1 must appear to execute before T2.
With this in mind, let's get started!
Let's start by creating a new file for our transactional server: datomic.rb.
We'll follow the same approach we used in previous chapters: creating a node and calling its mainloop.
#!/usr/bin/env ruby

require_relative 'node.rb'

class Transactor
  attr_reader :node

  def initialize
    @node = Node.new
    @lock = Mutex.new
  end
end

Transactor.new.node.main!
$ chmod +x datomic.rb
Now, let's fire up Maelstrom, and see what kind of messages we get from our workload.
$ ./maelstrom test -w txn-list-append --bin datomic.rb --time-limit 10 --log-stderr --node-count 1
...
INFO [2021-02-26 10:01:36,788] n3 stderr - maelstrom.process /home/aphyr/maelstrom/node.rb:120:in `block in main!': No handler for {:dest=>"n3", :body=>{:txn=>[["r", 9, nil], ["r", 8, nil]], :type=>"txn", :msg_id=>1}, :src=>"c14", :id=>14} (RuntimeError)
Right, so we're going to need a handler for the txn message type. Let's add one. We'll just... send back exactly the same transaction we got, without doing anything to it. See what happens.
@node.on 'txn' do |msg|
  txn = msg[:body][:txn]
  @node.log "\nTxn: #{txn}"
  @node.reply! msg, type: "txn_ok", txn: txn
end
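With this stub, the reply simply echoes the request's transaction, null reads and all. For the transaction we saw in the error above, the reply body would be roughly:

{"type": "txn_ok",
 "txn": [["r", 9, null], ["r", 8, null]],
 "in_reply_to": 1}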
$ ./maelstrom test -w txn-list-append --bin datomic.rb --time-limit 10 --log-stderr --node-count 1
...
INFO [2021-02-26 10:04:41,678] n3 stderr - maelstrom.process Txn: [["append", 9, 13], ["r", 7, nil]]
OK, so there's a transaction with two operations: an append and a read. We're going to need something that can go through and apply those micro-ops to our state. How did this test run end, anyway?
:workload {:valid? false,
:anomaly-types (:internal),
:anomalies {:internal ({:op {:type :ok,
:f :txn,
:value [[:append 9 6]
[:r 9 nil]],
:time 3361561215,
:process 3,
:index 35},
:mop [:r 9 nil],
:expected [... 6]}
{:op {:type :ok,
:f :txn,
:value [[:append 9 12]
[:r 9 nil]],
:time 5249106084,
:process 0,
:index 53},
:mop [:r 9 nil],
:expected [... 12]}
{:op {:type :ok,
:f :txn,
:value [[:append 9 16]
[:r 9 nil]],
:time 9097287963,
:process 4,
:index 87},
:mop [:r 9 nil],
:expected [... 16]})},
:not #{:read-atomic},
:also-not #{:ROLA
:causal-cerone
:parallel-snapshot-isolation
:prefix
:serializable
:snapshot-isolation
:strict-serializable
:strong-session-serializable
:strong-session-snapshot-isolation
:strong-snapshot-isolation}},
:valid? false}
Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
OK, neat! So this is not strict-serializable. But it's also not a bunch of other things--in particular, this history wasn't even read-atomic. We know that because it exhibited internal consistency violations--anomalies within the scope of a single transaction. For example, we executed:
{:op {:type :ok,
:f :txn,
:value [[:append 9 6]
[:r 9 nil]],
:time 3361561215,
:process 3,
:index 35},
:mop [:r 9 nil],
:expected [... 6]}
This transaction appended 6 to key 9, and then read key 9, and observed its state to be... nil. That's got to be wrong! In particular, the micro-op (mop) [:r 9 nil] was invalid: it should have seen some kind of list ending in a 6. This happened because we didn't actually do any kind of read: we just returned the transaction exactly as it arrived, with all read values left nil.
So, we're going to need state. Local state, to start; we'll worry about sharing it between nodes later. Let's create a State class to store our state. Inside, we'll store a mutable hashmap of keys to values.
class State
  def initialize
    @state = {}
  end
Next we need a method, State.transact!, that can execute a transaction, mutating the state and returning a completed version of that transaction with the values of any reads filled in. We'll run through the transaction's operations sequentially.
  def transact!(txn)
    txn2 = []
    txn.each do |op|
      f, k, v = op
      case f
      when 'r'
        txn2 << [f, k, (@state[k] or [])]
      when 'append'
        txn2 << op
        list = (@state[k] or [])
        list << v
        @state[k] = list
      end
    end
    txn2
  end
end
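As a quick sanity check--not part of datomic.rb--we can poke at this class by hand in irb, assuming the State definition above is loaded:

state = State.new
state.transact! [["append", 1, 3], ["append", 1, 4]]
# => [["append", 1, 3], ["append", 1, 4]]
state.transact! [["r", 1, nil]]
# => [["r", 1, [3, 4]]]

Appends come back unchanged, and the read is filled in with the list built by the earlier appends.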
Now we can create a State object in our Transactor, and use its transact! method to update the state and produce a resulting transaction.
class Transactor
  attr_reader :node

  def initialize
    @node = Node.new
    @lock = Mutex.new
    @state = State.new

    @node.on 'txn' do |msg|
      txn = msg[:body][:txn]
      @node.log "\nTxn: #{txn}"
      txn2 = @lock.synchronize do
        @state.transact! txn
      end
      @node.reply! msg, type: "txn_ok", txn: txn2
    end
  end
end
$ ./maelstrom test -w txn-list-append --bin datomic.rb --time-limit 10 --node-count 1
...
Everything looks good! ヽ(‘ー`)ノ
Our code actually contains a bug. Can you guess what it might be? Let's try upping the number of concurrent clients and the request rate, and running for a tad longer:
$ ./maelstrom test -w txn-list-append --bin datomic.rb --time-limit 30 --node-count 1 --concurrency 10n --rate 100
...
:workload {:valid? false,
:anomaly-types (:internal),
:anomalies {:internal ({:op {:type :ok,
:f :txn,
:value [[:r
9
[1 2 3 4 5 6 7]]
[:append 9 6]
[:append 9 7]
[:r
9
[1 2 3 4 5 6 7]]],
:time 56728387,
:process 8,
:index 13},
:mop [:r 9 [1 2 3 4 5 6 7]],
Well THAT'S neat! A single transaction executed a read of key 9, observing [1 2 3 4 5 6 7]. It then appended 6 and 7 to key 9, and read... [1 2 3 4 5 6 7] again. Did we fail to append values? Or somehow read them from the future? Let's look at the full history for key 9:
$ cat store/latest/history.txt | grep :ok | egrep --color '(append|r) 9 '
8 :ok :txn [[:r 6 nil] [:append 9 1] [:append 9 2]]
0 :ok :txn [[:append 7 1] [:r 8 nil] [:r 9 [1 2]]]
1 :ok :txn [[:r 7 [1]] [:append 9 3]]
9 :ok :txn [[:append 9 4] [:r 9 [1 2 3 4]]]
3 :ok :txn [[:r 9 [1 2 3 4]] [:r 7 [1]] [:r 7 [1]] [:r 8 nil]]
0 :ok :txn [[:r 9 [1 2 3 4 5]] [:r 6 nil] [:append 9 5]]
8 :ok :txn [[:r 9 [1 2 3 4 5 6 7]] [:append 9 6] [:append 9 7] [:r 9 [1 2 3 4 5 6 7]]]
This actually happened several times--and every time, we saw a value that was theoretically appended later in the history. We've been bitten by mutability!
class State
  def transact!(txn)
    ...
      when 'append'
        txn2 << op
        list = (@state[k] or [])
        list << v
        @state[k] = list
      end
The expression list << v mutates list in place, which retroactively changes the value held by every earlier read of that key--those reads returned the very same array object. We actually want to construct a copy of the array instead.
      when 'append'
        txn2 << op
        list = (@state[k].clone or [])
        list << v
        @state[k] = list
      end
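If the aliasing is hard to picture, here's a standalone sketch of the difference in plain irb (not part of datomic.rb):

a = [1, 2]
b = a          # b and a are the same array object
b << 3
a              # => [1, 2, 3] -- a changed too

c = a.clone    # c is an independent copy
c << 4
a              # => [1, 2, 3] -- a is untouched

With clone in place, earlier reads keep their own snapshot of the list. Let's re-run the longer test: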
$ ./maelstrom test -w txn-list-append --bin datomic.rb --time-limit 30 --node-count 1 --concurrency 10n --rate 100
...
Everything looks good! ヽ(‘ー`)ノ
Excellent. What happens when we add more nodes?
$ ./maelstrom test -w txn-list-append --bin datomic.rb --time-limit 10 --node-count 2
...
:not #{:read-atomic :read-committed},
:also-not #{:ROLA
:causal-cerone
:consistent-view
:cursor-stability
:forward-consistent-view
:monotonic-atomic-view
:monotonic-snapshot-read
:monotonic-view
:parallel-snapshot-isolation
:prefix
:repeatable-read
:serializable
:snapshot-isolation
:strict-serializable
:strong-session-serializable
:strong-session-snapshot-isolation
:strong-snapshot-isolation
:update-serializable}},
Well at least we don't have any internal consistency anomalies, but we're still not read-atomic, or read-committed, for that matter. Why? Because we observed incompatible orders.
:incompatible-order ({:key 7,
:values [[3 4] [1 2]]}
{:key 9,
:values [[4 5 7]
[1 2 3]]}
{:key 10,
:values [[8] [1]]}
{:key 8,
:values [[3 4] [1 2 5]]})},
At various points in the history, a read of key 7 returned the values [3 4], but also [1 2]. That can't possibly be true if there actually were a single list for key 7, and we only appended to it. And indeed, there isn't a single key 7 any more: we have a completely different state on each node. What we need to do is share that state somehow.
Next: Shared State.