-
Notifications
You must be signed in to change notification settings - Fork 0
/
2019-03-03-000-conduit-and-postgresql.html
108 lines (97 loc) · 11.3 KB
/
2019-03-03-000-conduit-and-postgresql.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<link rel="alternate"
type="application/rss+xml"
href="https://magnus.therning.org/feed.xml"
title="RSS feed for https://magnus.therning.org/">
<title>Conduit and PostgreSQL</title>
<meta name="author" content="Magnus Therning"><meta name="referrer" content="no-referrer"><link href= "static/style.css" rel="stylesheet" type="text/css" /><link href= "static/htmlize.css" rel="stylesheet" type="text/css" /><link href= "static/extra_style.css" rel="stylesheet" type="text/css" /></head>
<body>
<div id="preamble" class="status"><div class="nav-bar"><a class="nav-link" href="./index.html">Top</a><a class="nav-link" href="./archive.html">Archive</a><a class="nav-link align-right" href="./feed.xml"><img src="static/rss-feed-icon.png" style="height: 24px;" /></a></div></div>
<div id="content">
<div class="post-date">03 Mar 2019</div><h1 class="post-title"><a href="https://magnus.therning.org/2019-03-03-000-conduit-and-postgresql.html">Conduit and PostgreSQL</a></h1>
<p>
For a while now I've been playing around with an event-drive software design
(<a href="https://en.wikipedia.org/wiki/Event-driven_architecture">EDA</a>) using <a href="https://hackage.haskell.org/package/conduit"><code>conduit</code></a> for processing of events. For this post the processing can
basically be viewed as the following diagram
</p>
<pre class="example" id="org234dea9">
+-----------+ +------------+ +---------+
| | | | | |
| PG source |-->| Processing |-->| PG sink |
| | | | | |
+-----------+ +------------+ +---------+
^ |
| +------+ |
| | | |
| | PG | |
+------------| DB |<-----------+
| |
+------+
</pre>
<p>
I started out looking for Conduit components for PostgreSQL on <a href="https://hackage.haskell.org/">Hackage</a> but
failed to find something fitting so I started looking into writing them myself
using <a href="https://hackage.haskell.org/package/postgresql-simple"><code>postgresql-simple</code></a>.
</p>
<p>
The sink wasn't much of a problem, use <code>await</code> to get an event (a tuple) and
write it to the database. My almost complete ignorance of using databases
resulted in a first version of the source was rather naive and used
busy-waiting. Then I stumbled on PostgreSQL's support for notifications through
the <code>LISTEN</code> and <code>NOTIFY</code> commands. I rather like the result and it seems to
work well.<sup><a id="fnr.1" class="footref" href="#fn.1">1</a></sup>
</p>
<p>
It looks like this
</p>
<div class="org-src-container">
<pre class="src src-haskell"><span class="org-haskell-keyword">import</span> <span class="org-haskell-constructor">Control.Monad.IO.Class</span> <span class="org-rainbow-delimiters-depth-1">(</span><span class="org-haskell-constructor">MonadIO</span>, liftIO<span class="org-rainbow-delimiters-depth-1">)</span>
<span class="org-haskell-keyword">import</span> <span class="org-haskell-constructor">Data.Aeson</span> <span class="org-rainbow-delimiters-depth-1">(</span><span class="org-haskell-constructor">Value</span><span class="org-rainbow-delimiters-depth-1">)</span>
<span class="org-haskell-keyword">import</span> <span class="org-haskell-keyword">qualified</span> <span class="org-haskell-constructor">Data.Conduit</span> <span class="org-haskell-keyword">as</span> <span class="org-haskell-constructor">C</span>
<span class="org-haskell-keyword">import</span> <span class="org-haskell-keyword">qualified</span> <span class="org-haskell-constructor">Data.Conduit.Combinators</span> <span class="org-haskell-keyword">as</span> <span class="org-haskell-constructor">CC</span>
<span class="org-haskell-keyword">import</span> <span class="org-haskell-constructor">Data.Text</span> <span class="org-rainbow-delimiters-depth-1">(</span><span class="org-haskell-constructor">Text</span><span class="org-rainbow-delimiters-depth-1">)</span>
<span class="org-haskell-keyword">import</span> <span class="org-haskell-constructor">Data.Time.Clock</span> <span class="org-rainbow-delimiters-depth-1">(</span><span class="org-haskell-constructor">UTCTime</span><span class="org-rainbow-delimiters-depth-1">)</span>
<span class="org-haskell-keyword">import</span> <span class="org-haskell-constructor">Data.UUID</span> <span class="org-rainbow-delimiters-depth-1">(</span><span class="org-haskell-constructor">UUID</span><span class="org-rainbow-delimiters-depth-1">)</span>
<span class="org-haskell-keyword">import</span> <span class="org-haskell-constructor">Database.PostgreSQL.Simple</span> <span class="org-rainbow-delimiters-depth-1">(</span><span class="org-haskell-constructor">Connection</span>, <span class="org-haskell-constructor">Only</span><span class="org-rainbow-delimiters-depth-2">(</span><span class="org-haskell-operator">..</span><span class="org-rainbow-delimiters-depth-2">)</span>, execute, execute_, query<span class="org-rainbow-delimiters-depth-1">)</span>
<span class="org-haskell-keyword">import</span> <span class="org-haskell-constructor">Database.PostgreSQL.Simple.Notification</span> <span class="org-rainbow-delimiters-depth-1">(</span>getNotification<span class="org-rainbow-delimiters-depth-1">)</span>
<span class="org-haskell-definition">fst8</span> <span class="org-haskell-operator">::</span> <span class="org-rainbow-delimiters-depth-1">(</span>a, b, c, d, e, f, g, h<span class="org-rainbow-delimiters-depth-1">)</span> <span class="org-haskell-operator">-></span> a
<span class="org-haskell-definition">fst8</span> <span class="org-rainbow-delimiters-depth-1">(</span>a, <span class="org-haskell-keyword">_</span>, <span class="org-haskell-keyword">_</span>, <span class="org-haskell-keyword">_</span>, <span class="org-haskell-keyword">_</span>, <span class="org-haskell-keyword">_</span>, <span class="org-haskell-keyword">_</span>, <span class="org-haskell-keyword">_</span><span class="org-rainbow-delimiters-depth-1">)</span> <span class="org-haskell-operator">=</span> a
<span class="org-haskell-definition">dbSource</span> <span class="org-haskell-operator">::</span> <span class="org-haskell-type">MonadIO</span> m <span class="org-haskell-operator">=></span> <span class="org-haskell-type">Connection</span> <span class="org-haskell-operator">-></span> <span class="org-haskell-type">Int</span> <span class="org-haskell-operator">-></span> <span class="org-haskell-type">C.ConduitT</span> <span class="org-haskell-constructor"><span class="org-rainbow-delimiters-depth-1">()</span></span> <span class="org-rainbow-delimiters-depth-1">(</span><span class="org-haskell-type">Int</span>, <span class="org-haskell-type">UTCTime</span>, <span class="org-haskell-type">Int</span>, <span class="org-haskell-type">Int</span>, <span class="org-haskell-type">Bool</span>, <span class="org-haskell-type">UUID</span>, <span class="org-haskell-type">Text</span>, <span class="org-haskell-type">Value</span><span class="org-rainbow-delimiters-depth-1">)</span> m <span class="org-haskell-constructor"><span class="org-rainbow-delimiters-depth-1">()</span></span>
<span class="org-haskell-definition">dbSource</span> conn ver <span class="org-haskell-operator">=</span> <span class="org-haskell-keyword">do</span>
res <span class="org-haskell-operator"><-</span> liftIO <span class="org-haskell-operator">$</span> query conn <span class="org-string">"SELECT * from events where id > (?) ORDER BY id"</span> <span class="org-rainbow-delimiters-depth-1">(</span><span class="org-haskell-constructor">Only</span> ver<span class="org-rainbow-delimiters-depth-1">)</span>
<span class="org-haskell-keyword">case</span> res <span class="org-haskell-keyword">of</span>
<span class="org-haskell-constructor"><span class="org-rainbow-delimiters-depth-1">[]</span></span> <span class="org-haskell-operator">-></span> <span class="org-haskell-keyword">do</span>
liftIO <span class="org-haskell-operator">$</span> execute_ conn <span class="org-string">"LISTEN MyEvent"</span>
liftIO <span class="org-haskell-operator">$</span> getNotification conn
dbSource conn ver
<span class="org-haskell-keyword">_</span> <span class="org-haskell-operator">-></span> <span class="org-haskell-keyword">do</span>
<span class="org-haskell-keyword">let</span> ver' <span class="org-haskell-operator">=</span> maximum <span class="org-haskell-operator">$</span> map fst8 res
CC.yieldMany res
dbSource conn ver'
<span class="org-haskell-definition">dbSink</span> <span class="org-haskell-operator">::</span> <span class="org-haskell-type">MonadIO</span> m <span class="org-haskell-operator">=></span> <span class="org-haskell-type">Connection</span> <span class="org-haskell-operator">-></span> <span class="org-haskell-type">C.ConduitT</span> <span class="org-rainbow-delimiters-depth-1">(</span><span class="org-haskell-type">Int</span>, <span class="org-haskell-type">Int</span>, <span class="org-haskell-type">Bool</span>, <span class="org-haskell-type">UUID</span>, <span class="org-haskell-type">Text</span>, <span class="org-haskell-type">Value</span><span class="org-rainbow-delimiters-depth-1">)</span> <span class="org-haskell-type">C.Void</span> m <span class="org-haskell-constructor"><span class="org-rainbow-delimiters-depth-1">()</span></span>
<span class="org-haskell-definition">dbSink</span> conn <span class="org-haskell-operator">=</span> <span class="org-haskell-keyword">do</span>
evt <span class="org-haskell-operator"><-</span> C.await
<span class="org-haskell-keyword">case</span> evt <span class="org-haskell-keyword">of</span>
<span class="org-haskell-constructor">Nothing</span> <span class="org-haskell-operator">-></span> return <span class="org-haskell-constructor"><span class="org-rainbow-delimiters-depth-1">()</span></span>
<span class="org-haskell-constructor">Just</span> event <span class="org-haskell-operator">-></span> <span class="org-haskell-keyword">do</span>
liftIO <span class="org-haskell-operator">$</span> execute conn <span class="org-string">"INSERT INTO events \</span>
<span class="org-string"> \(srv_id, stream_id, cmd, cmd_id, correlation_id, event_data) \</span>
<span class="org-string"> \VALUES (?, ?, ?, ?, ?, ?)"</span> event
liftIO <span class="org-haskell-operator">$</span> execute_ conn <span class="org-string">"NOTIFY MyEvent"</span>
dbSink conn
</pre>
</div>
<div id="footnotes">
<h2 class="footnotes">Footnotes: </h2>
<div id="text-footnotes">
<div class="footdef"><sup><a id="fn.1" class="footnum" href="#fnr.1">1</a></sup> <div class="footpara"><p class="footpara">
If I've missed something crucial I would of course love to hear about it.
</p></div></div>
</div>
</div><div class="taglist"><a href="https://magnus.therning.org/tags.html">Tags</a>: <a href="https://magnus.therning.org/tag-conduit.html">conduit</a> <a href="https://magnus.therning.org/tag-event_sourcing.html">event_sourcing</a> <a href="https://magnus.therning.org/tag-haskell.html">haskell</a> <a href="https://magnus.therning.org/tag-postgresql.html">postgresql</a> </div></div>
<div id="postamble" class="status"><!-- org-static-blog-page-postamble --></div>
</body>
</html>