Pipelines
This assumes you’ve already read the Channels wiki entry. If you haven’t, go there first.
The thread-per-request model makes certain things very simple. Consider a simple query where we receive a request, query a database, and then respond with a formatted version of the database results.
(defn handler [request]
  (->> request
    query-db
    transform-results))
This assumes that the entire request-response process takes place on a single thread. To make this asynchronous, the handler can’t return the response; instead, we must enqueue our response into a channel. This response can be enqueued at any time, from any thread. This isn’t too much more complicated.
(defn handler [channel request]
  (->> request
    query-db
    transform-results
    (enqueue channel)))
But what if we also want our query to the database to be asynchronous?
(defn handler [channel request]
  (let [results (query-db request)]
    (on-success results
      #(->> %
         transform-results
         (enqueue channel)))))
But what if the query fails? In the thread-per-request model, we can just throw an exception, but here we need to hook into the error event as well.
(defn handler [channel request]
  (let [results (query-db request)]
    (on-error results
      #(log-error %))
    (on-success results
      #(->> %
         transform-results
         (enqueue-and-close channel)))))
(Note: the above is not working code; it just illustrates how a callback-based system could work.)
Now imagine chaining together two or three of these events, each time checking for both success and error outcomes. Clearly, this is not a fun way to write a program.
Ideally, we could just use something like the first or second example, and have the details handled for us. This is what pipelines are designed to do. Pipelines are functions which chain together stages, each of which represents a synchronous or asynchronous operation. Since asynchronous calls can fail asynchronously, we can’t represent their result with a single channel. Instead, asynchronous operations return a hash of two constant channels, :success and :error. These are called result channels, and are created using (result-channel).
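Since a result channel is just that hash of two ordinary channels, both halves can be worked with using the functions from the Channels entry. This is only a sketch of the idea:

```clojure
;; a sketch: a result channel is a hash of two constant channels
(def result (result-channel))

;; :success and :error are ordinary channels, so we can subscribe to them
(receive (:success result)
  (fn [value] (println "got" value)))

;; enqueueing into :success delivers the value to any subscriber
(enqueue (:success result) 42)
```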
Each stage is simply a function which takes a single parameter, and either returns a normal value or a result channel. Every pipeline is itself a function which returns a result channel. A pipeline for the above example is simple.
(pipeline
  query-db
  transform-results
  #(enqueue ch %))
This returns a function, into which we pass request. Executing the function will return a result channel, which will emit a result once the pipeline has completed, or an error if the query failed.
query-db is an asynchronous operation, so it returns a result channel. When a result channel is returned, further execution is paused until it emits a result or error. The other two functions are synchronous, and execute immediately once the query result has come in.
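To make that concrete, here is one way query-db itself might be written so that it returns a result channel. This is a sketch: run-query is a hypothetical blocking function, and the exact way lamina expects errors to be reported into :error may differ.

```clojure
(defn query-db [request]
  (let [result (result-channel)]
    (future                                             ; do the blocking work off-thread
      (enqueue (:success result) (run-query request)))  ; run-query is hypothetical
    result))  ; returned immediately; the pipeline pauses on it
```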
The fact that pipelines return result channels means that they can be nested. These two pipelines are equivalent:
(pipeline
  a
  b
  c)

(pipeline
  a
  (pipeline
    b
    (pipeline
      c)))
Here’s one way to use pipelines to handle incoming server requests:
(defn request-pipeline [channel]
  (pipeline
    query-db
    transform-results
    #(enqueue channel %)))

(defn handler [channel request]
  ((request-pipeline channel) request))
But this is a little awkward. We don’t always want to have to write something like ((pipeline stage-a stage-b) initial-value). Instead, we can use run-pipeline:
(defn handler [channel request]
  (run-pipeline request
    query-db
    transform-results
    #(enqueue-and-close channel %)))
This is simpler, but also more limited; a call to run-pipeline cannot be nested within another pipeline.
In order to create a pipeline stage that consumes a message from a basic channel, use read-channel.
> (def ch (channel))
#'ch
> (enqueue ch 3)
nil
> (wait-for-result
    (run-pipeline ch
      read-channel))
3
You can optionally define a timeout via (read-channel channel timeout). If the timeout elapses without receiving a message, the pipeline will fail.
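For example, a stage can give up if no message arrives within five seconds. This is a sketch: handle-message is a hypothetical next stage, and the timeout value is arbitrary.

```clojure
;; sketch: fail the pipeline if nothing arrives on ch within five seconds
(run-pipeline ch
  #(read-channel % 5000)
  handle-message)  ; handle-message is hypothetical
```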
Let’s assume that we want to create a chat client. Per our protocol, the first message sent by each client is their name, and the second is the chat room they want to join. A non-pipeline implementation for our chat client would look like this:
(defn chat-handler [ch]
  (receive ch
    (fn [name]
      (receive ch
        (fn [chatroom-name]
          (let [chatroom (named-channel chatroom-name)]
            (siphon (map* #(str name ": " %) ch) chatroom)
            (siphon chatroom ch)))))))
This works, but leaves a lot to be desired. It’s heavily nested, which makes it difficult to modify. Edge cases are also completely unhandled; there’s no way for us to know if the connection dies midway through the handshake, there’s no way for us to impose a timeout on the handshake, and so on.
Pipelines seem like the solution, but this problem is actually pretty difficult to solve with the tools introduced so far. Each stage takes in a single input, and produces a single output. For the first stage of our pipeline, this is fine; our input is the channel, and our output is the client’s name. But in the second stage, we have two inputs: the channel and the client’s name. To handle this, we need a new function: read-merge.
read-merge takes two parameters, a read function and a merge function. The read function takes zero parameters, and returns either a value or a result channel. That output is piped into the merge function, which takes two parameters: the output of the previous stage, and the output of the read function. The value returned by the merge function is passed into the next stage.
The pipeline equivalent of the above code, then, looks like this:
(defn chatroom-handler [ch]
  (run-pipeline {}
    (read-merge #(read-channel ch) #(assoc %1 :name %2))
    (read-merge #(read-channel ch) #(assoc %1 :room %2))
    (fn [params]
      (let [chatroom-channel (named-channel (:room params))]
        (siphon (map* #(str (:name params) ": " %) ch) chatroom-channel)
        (siphon chatroom-channel ch)))))
This is a much cleaner approach. The protocol can be easily modified, our calls to read-channel can be given timeouts so that we don’t keep silent connections open forever, and the flow of information is easy to reason about.
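For instance, giving each handshake read a timeout only requires touching the read functions. A sketch (the ten-second figure is arbitrary):

```clojure
;; sketch: same handshake, but each read now fails after ten seconds of silence
(defn chatroom-handler [ch]
  (run-pipeline {}
    (read-merge #(read-channel ch 10000) #(assoc %1 :name %2))
    (read-merge #(read-channel ch 10000) #(assoc %1 :room %2))
    (fn [params]
      (let [chatroom-channel (named-channel (:room params))]
        (siphon (map* #(str (:name params) ": " %) ch) chatroom-channel)
        (siphon chatroom-channel ch)))))
```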
To synchronously get a result from a result channel, use wait-for-result. This will halt the thread until a result or error is emitted. It will either return the result, or rethrow the exception that caused the error.
> (wait-for-result
    (run-pipeline 0 inc inc inc))
3
Since a result channel is just a hash, it works seamlessly with poll.
> (wait-for-message
    (poll
      (run-pipeline 0 inc inc inc)))
[:success 3]
You can also hook into the success and error outcomes by calling receive on the :success and :error channels.
(receive (:success pipeline)
  (fn [result]
    (println "Success!")))

(receive (:error pipeline)
  (fn [intermediate-result exception]
    (println "Error!")))
In the case of success, we only get the final result. In the case of an error, we receive both the result from the last successful stage, and the exception that was thrown.
Everything described so far works great, as long as you have a single pipeline that describes everything you want to do. But consider a pipeline with stages a, b, and c, where a might return either a number or a string. b will have to handle both these cases, and unless it has a predictable output no matter what it’s given, so will c. This greatly reduces our ability to rearrange and compose pipelines, and generally makes our code fragile.
Instead of returning different values into the same pipeline, a should return different values into different pipelines. This is easy to do: in addition to values and result channels, stages can also return a redirect signal. A redirect signal is created using (redirect pipeline value). If value is not specified, it defaults to the value passed into the current stage.
Consider this implementation of the Collatz conjecture using redirected pipelines:
(declare collatz)

(def three-n-plus-1
  (pipeline
    #(redirect collatz (+ 1 (* 3 %)))))

(def divide-by-two
  (pipeline
    #(redirect collatz (/ % 2))))

(def collatz
  (pipeline
    (fn [x]
      (cond
        (= 1 x) 1
        (even? x) (redirect divide-by-two)
        :else (redirect three-n-plus-1)))))
Redirection is tail recursive, so even thousands of mutual redirections won’t cause any issues. Also note that even though we’re redirecting to a new pipeline, the outer result channel returned by the invocation of the original pipeline will still contain the final result.
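Since that outer result channel carries the final value, the pipeline can be invoked and waited on directly (a sketch):

```clojure
;; collatz is a pipeline, i.e. a function returning a result channel
(wait-for-result (collatz 6))  ; blocks until the chain of redirections reaches 1
```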
A special case of redirection is (restart value), which redirects to the beginning of the current pipeline. If value is not specified, it will default to the value initially passed into the pipeline.
This pipeline will count to five:
(run-pipeline 0
  inc
  #(if (< % 5)
     (restart %)
     %))
Notice that restart can be used with run-pipeline. This is not true of redirect, since run-pipeline never exposes a pipeline that can be directly referenced.
While it’s useful that any error inside the pipeline will be surfaced, sometimes just halting the pipeline isn’t the right thing to do. Any pipeline can define a special error handler, like so:
(pipeline
  :error-handler (fn [intermediate-value exception]
                   (redirect log-error-pipeline exception))
  something-sure-to-succeed
  something-sure-to-fail)
The error handler is given the value returned by the last successful stage, and the exception that was thrown. If the error handler returns a redirect signal, the pipeline will not terminate. Among other things, this can be used to retry failed operations. This pipeline will try, try again:
(run-pipeline 0
  :error-handler (fn [_ _] (restart))
  inc
  something-a-little-risky)
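If retrying forever is too optimistic, the same idea extends to a bounded retry. This is a sketch: the atom-based counter is our own addition, not part of the pipeline API, and it relies on the rule that a non-redirect return from the error handler lets the error propagate.

```clojure
(def attempts (atom 0))

(run-pipeline 0
  :error-handler (fn [_ _]
                   (when (< (swap! attempts inc) 3)  ; give up after three failures
                     (restart)))
  inc
  something-a-little-risky)
```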