-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy path3socialite.tex
454 lines (318 loc) · 41.7 KB
/
3socialite.tex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
\chapter{SociaLite and Datalog with recursive aggregation}\label{r:socialite}
While Datalog allows to express some of graph algorithms
in an elegant and succinct way, many practical problems cannot be efficiently solved with Datalog programs.
SociaLite \cite{socialite, distsoc} is a graph query language based on Datalog. It allows a programmer to write intuitive queries using declarative semantics, which can often be executed as efficiently as highly optimized dedicated programs. The queries can also be executed in a distributed environment.
The most significant extension over Datalog in SociaLite is the ability to combine recursive rules with aggregation. Under some conditions, such rules can be evaluated incrementally and thus as efficiently as regular recursion in Datalog.
In \cite{socialite} Seo et al. introduce \emph{Sequential SociaLite}, intended to be executed on one machine. We cover its most important feature, namely recursive aggregation, in Section \ref{s:recaggr}. Two other extensions over Datalog are tail-nested tables which improve the data layout in the memory and an option to provide hints regarding the execution order. \cite{socialite} describes the language proposition and possible optimizations, but does not define the language precisely. The paper outlines conditions for the programs to be correct, but only in a simplified way and lacks definitions of some of the notions used, which makes the exact meaning of theorems and proofs unclear.
In \cite{distsoc} Sequential Socialite is extended to \emph{Distributed SociaLite}, executable on a distributed architecture. The presented version of the interpreter for Distributed SociaLite was built as an independent implementation based directly on Hadoop. Distributed SociaLite, described in Section \ref{s:distributed}, introduces a \emph{location operator}, which controls how data is distributed among workers and describes an independent implementation of the language.
In this chapter, we give precise definitions for a Datalog language extended with recursive aggregate functions, which is the main extension over Datalog in SociaLite. We also analyze the conditions that such programs have to satisfy in order to have unambiguous solution. This supplements the original definitions and proofs from \cite{socialite, distsoc}, which contain certain simplifications and are not precise about some important details.
\section{Datalog with recursive aggregation}\label{s:recaggr}
In this section we introduce the recursive aggregation extension to Datalog. Since SociaLite consists of several extensions to Datalog, not only of recursive aggregation, we will call the language defined here \emph{Datalog with recursive aggregation}, abbreviated \datalogra.
\subsection{Motivation}
Most graph algorithms are based on some kind of iteration or recursive computation. Examples of such algorithms are the Dijkstra algorithm for single source shortest paths and PageRank. Although simple recursion can be expressed easily in Datalog, it is usually difficult or impossible to express such algorithms in Datalog efficiently, as it would require computing much more intermediate results than actually needed to obtain the solution. We will explain this on an example: a simple program that computes the shortest paths from a source node.
A straightforward program in Datalog extended with non-recursive aggregation for computing single source shortest paths starting from node $1$ is presented in Figure~\ref{ex:ssspsocialite}. Due to the limitations of Datalog, this program computes all possible path lengths from node $1$ to other nodes in the first place, and after that for each node the minimal distance is chosen. Not only does this approach result in a bad performance, but the program executes indefinitely if a loop in the graph is reachable from the source node.
\dprog{}{
& \textsc{Path} (t, d) && & \assign & && \textsc{Edge} (1, t, d). & \\
& \textsc{Path} (t, d) && & \assign & && \textsc{Path} (s, d_1), \textsc{Edge} (s, t, d_2), d = d_1 + d_2. & \\
& \textsc{MinPath} (t, \textsc{Min}(d)) && & \assign & && \textsc{Path} (t, d). &
}{Datalog query for computing the shortest paths from node 1 to other nodes.}{ex:ssspdatalog}
\subsection{A program in \datalogra}
\datalogra allows aggregation to be combined with recursion under some conditions. This allows for writing straightforward programs for problems such as single source shortest paths, which finish execution in finite time and are often much more efficient than Datalog programs. An example \datalogra program that computes shortest paths from vertex 1 to all vertices is presented in Figure~\ref{ex:ssspsocialite}.
\dprog{
$\textsc{Edge}(\text{int } \textit{src}, \text{int } \textit{sink}, \text{int } \textit{len}) $ \\
$\textsc{Path}(\text{int } \textit{target}, \text{int } \textit{dist} \text{ aggregate } \textsc{Min}) $
}{
& \textsc{Path} (t, d) && & \assign & && \textsc{Edge} (1, t, d). & \\
& \textsc{Path} (t, d) && & \assign & && \textsc{Path} (s, d_1), \textsc{Edge} (s, t, d_2), d = d_1 + d_2. &
}{\datalogra query for computing the shortest paths from node 1 to other nodes}{ex:ssspsocialite}
The relation $\textsc{Path}$ is declared so that for each \textit{target} the values in \textit{dist} column are aggregated using minimum operator \textsc{Min}, i.e. for each \textit{target} the minimum of the corresponding \textit{dist} values is chosen by applying \textsc{Min} to the set of those values.
A \datalogra program $P$ is a Datalog program, with additional aggregation function defined for selected columns of some relations.
For each relation name $R \in idb(P)$, one column $\aggcol_R(P) \in {1, \dots ar_R}$ can be chosen and assigned an aggregation function $\aggfun_R(P)$ . This column is called the \emph{aggregated column}. The remaining columns are called the \emph{qualifying columns}. Intuitively, after each step of computation, we group the facts in the relation by the qualifying columns and within each group we aggregate the values in the aggregated column using $\aggfun_R(P)$. Value $\aggcol_R(P) = \bf{none}$ means that $R$ is a regular relation with no aggregation. Program $P$ can usually be clearly determined from the context. We will then simply write $\aggcol_R, \aggfun_R$ instead of $\aggcol_R(P), \aggfun_R(P)$.
To simplify the notation, we assume that if a relation has an aggregated column, then it is always the last one: $\aggcol_R = ar_R$.
The aggregation function needs to fulfill certain requirements that will be stated in the next section.
Syntactically, we require that each \idb relation is declared at the top of the program.
In the declaration of a relation, an aggregated column can be specified by adding keyword \textit{aggregate} and the name of the aggregate function next to the column declaration. This syntax allows for providing multiple rules for each aggregated relation in the program.
An example of a program in \datalogra is shown in Figure~\ref{fig:datalograexampleprogram}.
\begin{figure}[!htbp]
\narrow{
$\textsc{P}(\text{int } \textit{a}, \text{int } \textit{b} \text{ aggregate } \textsc{F}) $\\
$\textsc{R}(\text{int } \textit{src}, \text{int } \textit{sink}, \text{int } \textit{len}) $
\begin{flalign*}
& \textsc{P} (x, y) && & \assign & && P(x, z), ~ R(x, y, z). & \\
& \textsc{R} (x, y, z) && & \assign & && R(x, y, y),~R(x, z, z). & \\
\end{flalign*}
\caption{Example program in \datalogra.
\label{fig:datalograexampleprogram}}
}
\end{figure}
\subsection{Aggregation-aware order over databases for inflationary \datalogra}
While being very useful, recursive aggregation rules not always have an unambiguous solution. This is the case only under some conditions on the rules and the aggregation function itself.
Typically, Datalog programs semantics is defined using the fixed point of the immediate consequence operator $T_P$. This definition assumes that $T_P$ is inflationary with respect to the inclusion order on database instances. This requirement means that $T_P$ only adds facts to the database instance, but never removes them. This is also the reason for which program \ref{ex:ssspdatalog} is inefficient: the inflationary semantics forces all suboptimal distances to vertices to be kept in the database and as such, used in subsequent iterations.
When recursive aggregate functions are allowed, the semantics is not inflationary with respect to the inclusion order. A fact in the database can be replaced with a different one because a better aggregated value has appeared. However, an inflationary $T_P$ operator is necessary to prove that the fixed point semantics always gives a unique solution. In order to define semantics for \datalogra in terms of a fixed point, we need to use a different order on database instances than the regular set inclusion order.
In this section, we describe the idea introduced in \cite{socialite}. However, the original description lacks precision and details. Here we give the definitions in a precise way.
First, we define what a \emph{join operation} is and show the order that it induces. Then, we show that if the aggregation function is a join operation and corresponding rules are monotone with respect to the order induced by this operation, then the result of the program is unambiguously defined by the least fixed-point. We also show that it can be computed efficiently using the semi-naive evaluation.
\cite{socialite} uses notions of \emph{meet operation} and greatest fixed point instead. We chose the dual notions of join operation and least upper bound, as we find that they give more clear definitions and proofs.
\subsubsection{Join operation and induced ordering}
We start by recalling the definitions of idempotency, commutativity and associativity for binary operations.
\begin{defn}
A binary operation $\odot: X \times X \to X$ is:
\begin{itemize}
\item \emph{idempotent} iff $x \odot x = x$ for each $x \in X$,
\item \emph{commutative} iff $x \odot y = y \odot x$ for each $x, y \in X$,
\item \emph{associative} iff $(x \odot y) \odot z = x \odot (y \odot z)$ for each $x, y, z \in X$.
\end{itemize}
\end{defn}
A core concept in \datalogra is the \emph{join operation}. Join operations have the basic properties that are sufficient for the type of aggregation that we consider here to have unambiguous semantics.
\begin{defn}[Join operation]
A binary operation is a \emph{join} operation iff it is idempotent, commutative and associative.
\end{defn}
We usually denote a join operation with the symbol $\sqcup$.
An example of a join operation is the maximum of two numbers.
\begin{exmp}
$\max(a, b)$ for $a, b \in \mathbb{N}$ is a join operation; it is:
\begin{itemize}
\item idempotent --- $\max(a, a) = a$,
\item commutative --- $\max(a, b) = \max(b, a)$,
\item associative --- $\max(a, \max(b, c)) = \max(\max(a, b), c)$.
\end{itemize}
Similarly, the minimum of two numbers is also a join operation. On the contrary, $+$ is not a join operation, since it is not idempotent: $1+1 \ne 1$.
\end{exmp}
Since the join operation is associative, we can skip parentheses and write: $ a_1 \sqcup a_2 \sqcup \dots \sqcup a_n$ instead of: $ a_1 \sqcup (a_2 \sqcup ( \dots \sqcup a_n) \dots )$.
Join operations can be extended to non-empty finite sets of values in a straightforward way:
$$\sqcup(\{a_1, \dots, a_n \}) = a_1 \sqcup a_2 \sqcup \dots \sqcup a_n$$
\subsubsection{Order induced by a join operation}
A join operation over a set $P$ induces a partial order on $P$ that has some useful properties. In particular, $P$ with this ordering is a semi-lattice. We start by recalling the definitions of a least upper bound and a semi-lattice.
\begin{defn}[Upper bound]
Given a set $S$ with a partial order $\le$ over $S$, $c$ is an \emph{upper bound} of $S$ iff $\forall_{a \in S} a \le c$.
\end{defn}
\begin{defn}[Least upper bound]
Given a set $S$ with a partial order $\le$ over $S$, $m \in S$ is the \emph{least upper bound} of $S$ iff it is an upper bound of $S$ and for each upper bound $c$ of $S$, $m \le c$.
\end{defn}
\begin{defn}[Join semi-lattice]
A set $P$ with a partial order $\le$ over $P$ is a \emph{join semi-lattice} iff every two elements of $P$ have a least upper bound with respect to $\le$.
\end{defn}
For any two elements, their least upper bound is called a \emph{join} of those elements.
A join operation $\sqcap$ induces a partial order $\le_\sqcap$ over its domain, such that $a \le_\sqcap b \iff b = a \sqcap b$. This relation is indeed a partial order, as it is:
\begin{itemize}
\item reflexive --- since $\sqcap$ is idempotent, it holds that $a = a \sqcap a$, so $a \le_\sqcap a$,
\item transitive --- if $a \le_\sqcap b$ and $b \le_\sqcap c$, then $b = a \sqcap b$ and $c = b \sqcap c$ and by combining we have that $c = a \sqcap b \sqcap c = a \sqcap c$, so $a \le_\sqcap c$,
\item antisymmetric -- if $a \le_\sqcap b$ and $b \le_\sqcap a$, then $b = a \sqcap b$ and $a = b \sqcap a$, so $a = b$, because $\sqcap$ is commutative.
\end{itemize}
A join operation $\sqcap$ defines a semi-lattice with its induced order $\le_\sqcap$, because the result of the operation for any two elements is the least upper bound of those elements with respect to $\le_\sqcap$.
For example, the join operation $\max$ over natural number induces the partial order $\le$. For any two $a, b \in \mathbb{N}$, $\max(a, b)$ is their least upper bound with respect to $\le$.
\subsubsection{Aggregation operator $g_R$}
An important step in the evaluation of a \datalogra program is grouping the facts in an instance of each relation and performing the aggregation within each group. We can put that into a formal definition as function $g_R$. $g_R$ takes as an input a relation instance that may contain multiple facts with the same values in qualifying columns and performs the aggregation on the aggregated column within each such group.
\begin{defn}[Aggregation operator over relations]\label{d:aggregationoperationgr}
For non-aggregated relations, $g_R$ is an identity function. For a relation $R$ of arity $k = ar(R)$ with an aggregated column, let us define $g_R:~\inst(R)~\to~\inst(R)$ as:
$$g_R(I) =
\Big\{(x_1, \dots, x_{k-1}, t): (x_1, \dots, x_{k-1}, x_k) \in I \wedge t = \aggfun_R(\{y: (x_1, \dots, x_{k-1}, y) \in I\})\Big\}
$$
\end{defn}
\subsubsection{Pre-order over relation instances}
We can prove that there exists a unique least fixed point for any Datalog program. The fundamental fact needed for this proof is that Datalog semantics is inflationary, that is during the iterative evaluation of any Datalog program, if the state of a relation is $I_1$ in some step and $I_2$ at a later step, we know that $I_1 \subseteq I_2$. In \datalogra this property no longer holds. A fact in $I_1$ can be replaced with a different fact with a lower value in the aggregated column. To be able to define the semantics of programs in \datalogra using a least fixed point, we need to use a custom order on relation instances. This order is built based on the function $g_R$.
\begin{defn}
Let $R$ be a relation name and $k=ar(R)$. Let us define comparison $\sqsubseteq_R$ on relation instances as follows:
$$
I_1 \sqsubseteq_R I_2 \iff
\begin{cases}
\forall_{(q_1, ..., q_{k-1}, v) \in g_R(I_1)} \exists_{(q_1, ..., q_{k-1}, v') \in g_R(I_2)} v \le_{\aggfun_R} v' & \text{ if } \aggcol_R = k \\
I_1 \subseteq I_2 & \text{ if } \aggcol_R = \bf{none}
\end{cases}
$$
\end{defn}
The relation satisfies the reflexivity and transitivity requirements, so it is a pre-order.
\begin{lem}\label{lem:preorder}
For any $R$ and $inst(R)$, $\sqsubseteq_R$ is a pre-order over $\inst(R)$.
\end{lem}
\begin{prof}
If $R$ does not have an aggregated column, $\sqsubseteq_R$ is the same as the inclusion order $\subseteq$, which is a partial order.
If $R$ does have an aggregated column, then:
\begin{itemize}
\item $\sqsubseteq_R$ is reflexive: for each $R$, $inst(R)$ and $A \in \inst(R)$, since $\le_{\aggfun_R}$ is reflexive, we have that $\forall_{(q_1, ..., q_{k-1}, v) \in g_R(A)} \Big( (q_1, ..., q_{k-1}, v) \in g_R(A)~\text{and}~v~\le_{\aggfun_R}~v \Big)$. Hence, $A \sqsubseteq_R A$.
\item $\sqsubseteq_R$ is transitive: if $A \sqsubseteq_R B$ and $B \sqsubseteq_R C$, then by definition of $\sqsubseteq_R$ we have that:
\begin{align*}
\forall_{(q_1, ..., q_{n-1}, a) \in g_R(A)} \exists_{(q_1, ..., q_{n-1}, b) \in g_R(B)}~a~\le_{\aggfun_R}~b &\text{, and } \\
\forall_{(q_1, ..., q_{n-1}, b) \in g_R(B)} \exists_{(q_1, ..., q_{n-1}, c) \in g_R(C)}~b~\le_{\aggfun_R}~c&
\end{align*}
By combining it follows that $\forall_{(q_1, ..., q_{n-1}, a) \in g(A)} \exists_{(q_1, ..., q_{n-1}, c) \in g(C)} a~\le_{\aggfun_R}~c $, since $\le_{\aggfun_R}$ is a partial order and thus transitive. Therefore, $A \sqsubseteq_R C$. \QEDA
\end{itemize}
\end{prof}
\begin{exmp}
Let $R$ be a relation with arity $3$, with the last column aggregated using the join operation $\max$.
For the operation $ \max $ in $\mathbb{N}$, $ \le_{\max} $ is the usual order $ \le $. The following hold:
\begin{itemize}
\item $\{(1, 2, 3)\} \sqsubseteq_R \{(1, 2, 5)\}$, because $3 \le 5$,
\item $\{(1, 2, 3)\} \sqsubseteq_R \{(1, 2, 5), (1, 7, 2)\}$, because $3 \le 5$,
\item $\{(1, 2, 5)\} \sqsubseteq_R \{(1, 2, 3), (1, 2, 8)\}$, since $g_R(\{(1, 2, 3), (1, 2, 8)\}) = \{(1,2,8)\}$ and $5~\le~8$,
\item $A = \{(1, 2, 3), (2, 8, 1)\}$ and $B = \{(1, 2, 5), (1, 7, 2)\}$ cannot be compared, because $\forall_x (1, 7, x) \notin A$ and $\forall_x (2, 8, x) \notin B$,
\item for any $R$ an empty relation instance $\emptyset$ is smaller under $\sqsubseteq_R$ than any other relation instance,
\item if $I = \{(1, 2, 3), (1, 2, 8)\}$ and $J = \{(1, 2, 3), (1, 2, 7)\}$, we have that $I \sqsubseteq_R J$ and $J \sqsubseteq_R I$, but clearly $I \ne J$.
\end{itemize}
\label{ex:sqsubseteqorder}
\end{exmp}
The last point of example \ref{ex:sqsubseteqorder} shows that $\sqsubseteq_R$ is not necessarily a partial order over the set of all relation instances $\inst(R)$, because it is not antisymmetric.
\subsubsection{Partial order over relation instances after aggregation}
We already know that $\sqsubseteq_R$ is a pre-order over the set of all possible relation instances $\inst(R)$, but because of the lack of antisymmetry, it is not guaranteed to be a partial order. However, if we restrict to the relation instances resulting from applying aggregation, the relation is antisymmetric.
For any $\textsc{R}$ let $Z_\textsc{R}$ denote the set of relation instances that can be obtained by application of $g_\textsc{R}$ to some instance of $\textsc{R}$:
$$Z_\textsc{R} = \{g_\textsc{R}(I): I \in \inst(\textsc{R})\}$$
We will refer to elements of $Z_\textsc{R}$ as \emph{relations after aggregation}.
\begin{lem}\label{lem:fixgr}
For any $R$ such that $k=ar(R)$ and $\aggcol_R \ne {\bf none}$, if $I \in Z_R$, then $g_R(I) = I$ and for each $x_1, \dots, x_{n-1}$ there is at most one $x_n$ such that $(x_1, \dots, x_{n-1}, x_n) \in I$.
\end{lem}
\begin{prof}
Since we know that $I = g_R(I')$ for some $I'$, there is at most one fact $R(x_1, \dots, x_n)$ in $I$ for each $(x_1, \dots, x_{n-1})$, because $g_R$ by definition groups facts by qualifying columns $x_1, \dots, x_{n-1}$ and computes a single aggregated value $x_n$ for each such group. Hence, after the next application of $g_R$ to $I$ the aggregated value for each $x_1, \dots, x_{n-1}$ is simply $x_n$, so $g_R(I) = I$.
\QEDA
\end{prof}
\begin{thm}
For any $R$ such that $k=ar(R)$, $\sqsubseteq_R$ is a partial order over $Z_R$.
\end{thm}
\begin{prof}
If $R$ does not have an aggregated column, $\sqsubseteq_R$ is the same as inclusion order $\subseteq$, which is a partial order.
If $R$ has an aggregated column, then by Lemma \ref{lem:preorder}, $\sqsubseteq_R$ is a pre-order over $\inst(R)$. This implies that it is also a pre-order over $Z_R$, since it is a subset of $\inst(R)$, so it only remains to be shown that $\sqsubseteq_R$ is antisymmetric over $Z_R$.
Let $A, B$ be any relation instances from $Z_R$. Let us suppose that $A \sqsubseteq_R B$ and $B \sqsubseteq_R A$. To prove antisymmetry, we need to show that $A = B$. By definition of $\sqsubseteq_R$, we have that:
\begin{align*}
\forall_{(q_1, ..., q_{n-1}, a) \in g_R(A)} \exists_{(q_1, ..., q_{n-1}, b) \in g_R(B)} a \le_{\aggfun_R} b & \text{, and } \\
\forall_{(q_1, ..., q_{n-1}, b) \in g_R(B)} \exists_{(q_1, ..., q_{n-1}, a) \in g_R(A)} b \le_{\aggfun_R} a &
\end{align*}
Since $A, B \in Z_R$ we know that there exist $A', B'$ such that $A = g_R(A'), B = g_R(B')$. By Lemma \ref{lem:fixgr} $g_R(g_R(A')) = g_R(A')$ and thus $g_R(A) = g_R(g_R(A')) = g_R(A') = A$, and similarly $g_R(B) = B$, so the formulas above are equivalent to:
\begin{align*}
\forall_{(q_1, ..., q_{n-1}, a) \in A} \exists_{(q_1, ..., q_{n-1}, b) \in B}~a~\le_{\aggfun_R}~b &\text{, and } \\
\forall_{(q_1, ..., q_{n-1}, b) \in B} \exists_{(q_1, ..., q_{n-1}, a) \in A}~b~\le_{\aggfun_R}~a&
\end{align*}
Let $t = R(x_1, \dots, x_{n-1}, a)$ be any fact in $A$. We know that there exists $b$ such that $(x_1, ..., x_{n-1}, b) \in B$ and $a~\le_{\aggfun_R}~b$. Further, we know that there exists $(x_1, ..., x_{n-1}, a') \in A$ such that $b~\le_{\aggfun_R}~a'$. By Lemma \ref{lem:fixgr}, it must hold that $a = a'$, so $a \le_{\aggfun_R} b \le_{\aggfun_R} a$, and because $\le_{\aggfun_R}$ is a partial order we have that $a = b$, so $t \in B$. Therefore, $A \subseteq B$, because $t$ was chosen as an arbitrary element of $A$. By symmetry of the proof, it also holds that $B \subseteq A$, so $A = B$. This means that $\sqsubseteq_R$ is indeed antisymmetric.
As $\sqsubseteq_R$ is an antisymmetric pre-order over $Z_R$, it is a partial order over this set.
\QEDA
\end{prof}
\subsubsection{Order over database instances}
In regular Datalog, database instances can be compared using the inclusion order. We can extend the custom order $\sqsubseteq_R$ defined on relation instances to an order on database instances in a straightforward way, by comparing the databases relation-by-relation:
\begin{defn}
Let $\sigma$ be a database schema and $\textbf{K}, \textbf{L}$ be database instances over $\sigma$. Let $R_1, \dots, R_n$ be relation names in $\sigma$. By definition, $\textbf{K}$ is a union of relation instances $I_1, \dots I_n$ over $R_1, \dots, R_n$, respectively. Similarly, $\textbf{L}$ is a union of relation instances $J_1, \dots J_n$ over $R_1, \dots, R_n$, respectively. Let the order $\sqsubseteq_\sigma$ on database instances over $\sigma$ be defined as:
$$\textbf{K} \sqsubseteq_\sigma \textbf{L} \iff \forall_{i=1, \dots, n}~I_i \sqsubseteq_{R_i} J_i$$
\end{defn}
\subsection{Semantics for \datalogra programs}\label{ss:semdra}
In this subsection we will that the semantics of a \datalogra program can be unambiguously defined using least fixed point in the introduced order, as long as it satisfies some conditions. Intuitively, the rules should be monotone with respect to the order implied by aggregations.
Let $P$ be a \datalogra program, with $w$ \idb relations $R_1, R_2, \dots, R_w$ of arities $k_1, k_2, \dots, k_w$, respectively. Program $P$ has the following form:
\narrow{
$R_1(x_1, \dots, x_{{k_1}-1}, x_{k_1}~[\text{ aggregate } F_1]) $\\
\dots \\
$R_w(x_1, \dots, x_{{k_w}-1}, x_{k_w}~[\text{ aggregate } F_w]) $\\
\begin{flalign*}
& R_1 (x_1, \dots, x_{k_1}) && & \assign & && Q_{1,1}(x_1, \dots, x_{k_1}). & \\
& && & \dots & && & \\
& R_1 (x_1, \dots, x_{k_1}) && & \assign & && Q_{1,{m_1}}(x_1, \dots, x_{k_1}). & \\
& && & \dots & && & \\
& R_w (x_1, \dots, x_{k_w}) && & \assign & && Q_{w, 1}(x_1, \dots, x_{k_w}), & \\
& && & \dots & && & \\
& R_w (x_1, \dots, x_{k_w}) && & \assign & && Q_{w, {m_w}}(x_1, \dots, x_{k_w}). & \\
\end{flalign*}
}
The program has $m_i$ rules for computing $R_i$, for each $i$. $Q_{1, 1}, \dots, Q_{1, m_i}$ for $i = 1, \dots, w$ are bodies of these rules, with free variables $x_1, \dots, x_{k_i}$. Each relation $R_i$ may have an aggregation function $\aggfun_{R_i}$ defined for column $k_i$. Each such $\aggfun_{R_i}$ is required to be a join operation.
The subgoals in the rule bodies may refer to any relation of an arbitrary set of \edb relations $\edb(P)$, which are constant during the evaluation, and to any of the \idb relations, $R_1, \dots, R_w$. The \edb relations are not declared in the program.
By definition, $P$ is a program $P'$ in regular Datalog, with $\aggfun$ additionally defined. $P$ can be treated as a pair $(P', \aggfun)$. The immediate consequence operator $T_{P'}$ for the regular Datalog program $P'$ will be the base for defining the semantics for \datalogra program $P$. Schema $\sch(P)$ for the \datalogra program $P$ is defined as the schema $\sch(P')$ of the corresponding Datalog program $P'$.
We start by we extending aggregation operator $g_R$ over relation instances to aggregation over database instances. Intuitively, it applies the corresponding aggregation operator $g_R$ to each relation instance $I$ of relation $R$ in a database instance.
\begin{defn}[Aggregation operator over databases]
Let $P$ be a program in \datalogra and $\textbf{K}$ be a database instance over $\sch(P)$, and $\textbf{K}$ be a union of relation instances $I_1, \dots I_n$ over $R_1, \dots, R_n$ respectively, where $R_1, \dots, R_n$ are relation names in $\sch(P)$.
Let the \emph{aggregation operator} $G_P$ for $\textbf{K}$ be defined as:
$$G_P(\textbf{K}) = \bigcup_{i = 1}^n g_{R_i}(I_i)$$
\end{defn}
Similarly to relations after aggregation, we will call a database $\textbf{K}$, such that for some $\textbf{L}$ $\textbf{K} = G_P(\textbf{L})$, a \emph{database after aggregation}.
\begin{lem}\label{gpmonotonedatabase}
$G_P$ is monotone with respect to $\sqsubseteq_{\sch(P)}$ on all databases over $\sch(P)$.
\end{lem}
\begin{prof}
Since $G_P$ applies $g_R$ to each relation $R$ in the database and $\sqsubseteq_{\sch(P)}$ compares databases relation-by-relation using $\sqsubseteq_R$, it is sufficient to show that $g_R$ is monotone with respect to $\sqsubseteq_R$ for each $R \in \sch(P)$.
Let $R$ be any relation name in $\sch(P)$ and $A, B$ be relation instances over $R$ such that $A \sqsubseteq_R B$. If $R$ is not aggregated, then $g_R$ is an identity function, so it is monotone. If $R$ is aggregated, $A \sqsubseteq_R B$ is equivalent to $\forall_{(q_1, ..., q_{k-1}, v) \in g_R(A)} \exists_{(q_1, ..., q_{k-1}, v') \in g_R(B)} v \le_{\aggfun_R} v'$. As it holds by definition of $g_R$ that $g_R(g_R(A)) = g_R(A)$ and $g_R(g_R(B)) = g_R(B)$, we have that $\forall_{(q_1, ..., q_{k-1}, v) \in g_R(g_R(A))} \exists_{(q_1, ..., q_{k-1}, v') \in g_R(g_R(B))} v \le_{\aggfun_R} v'$, which is equivalent to $g_R(A) \sqsubseteq_R g_R(B)$. Therefore, $g_R$ is monotone with respect to $\sqsubseteq_R$ and $G_P$ is monotone with respect to $\sqsubseteq_{\sch(P)}$. \QEDA
\end{prof}
We can now define the immediate consequence operator for \datalogra programs.
\begin{defn}[Immediate consequence operator for \datalogra]
The \emph{immediate consequence operator} for a \datalogra program $P = (P', \aggfun)$, where $P'$ is a program in Datalog, is a function $T_P: \inst(\sch(P)) \to \inst(\sch(P))$:
$$T_P = G_P \circ T_{P'}$$
\end{defn}
The immediate consequence operator can be used to define semantics for a \datalogra program as its fix-point, similarly to the definition of Datalog's semantics.
\begin{thm}
Let $P$ be a program in \datalogra and $P = (P', \aggfun)$ where $P'$ is a program in Datalog. Let $\textbf{K}$ be a database instance over $\edb(P)$. Let $\sigma = \sch(P)$.
If $T_{P'}$ is monotone with respect to $\sqsubseteq_\sigma$ on databases over $\sigma$ after aggregation, then there exists a finite minimal fix-point of $T_P$ containing $\textbf{K}$. We denote this fix-point by $P(\textbf{K})$.
\end{thm}
\begin{prof}
$G_P$ is by Lemma \ref{gpmonotonedatabase} monotone with respect to $\sqsubseteq_\sigma$ on all databases over $\sigma$. Since it is assumed that $T_{P'}$ is monotone with respect to $\sqsubseteq_\sigma$ on databases after aggregation, $T_P = G_P \circ T_{P'}$ is also monotone with respect to $\sqsubseteq_\sigma$, as a composition of two monotone functions.
$\textbf{K}$ is a database instance over $\edb(P)$, so it is a database after aggregation, because $\textbf{K} = G_P(\textbf{K})$. $T_P^i(\textbf{K}) = G_P(T_{P'}(T_P^{i-1}(\textbf{K})))$ is also a database after aggregation for each $i > 0$.
As it holds that $\adom(P) \cup \adom(\textbf{K})$ and the database schema $\sch(P)$ are all finite, there is a finite number $n$ of database instances over $\sch(P)$ that can be reached by iteratively applying $T_P$ to $\textbf{K}$. Hence, because of the monotonicity of $T_P$ on databases after aggregation, we have inductively that $T_P^i(\textbf{K}) \sqsubseteq_\sigma T_P^{i+1}(\textbf{K})$ for each $i \ge 0$. Therefore, the sequence $\{T_P^i(\textbf{K})\}_i$ reaches a fix-point: $T_P^n(\textbf{K}) = T_P^{n+1}(\textbf{K})$. Let us denote this fix-point by $T_P^*(\textbf{K})$.
$$\textbf{K} \sqsubseteq_\sigma T_P(\textbf{K}) \sqsubseteq_\sigma T_P^2(\textbf{K}) \sqsubseteq_\sigma T_P^3(\textbf{K}) \sqsubseteq_\sigma \dots \sqsubseteq_\sigma T_P^*(\textbf{K}) $$
We will now prove that this is the minimum fix-point of $T_P$ containing $\textbf{K}$. Let us suppose that $\textbf{J}$ is a fix-point of $T_P$ containing $\textbf{K}$: $\textbf{K} \sqsubseteq_\sigma \textbf{J}$. By applying $T_P$ $n$ times to both sides of the inequality, we have that $T_P^*(\textbf{K}) = T_P^n(\textbf{K}) \sqsubseteq_\sigma \textbf T_P^n(\textbf{J}) = \textbf{J}$. Hence, $T_P^*(\textbf{K})$ is the minimum fix-point of $T_P$ containing $\textbf{K}$.
\QEDA
\end{prof}
\begin{exmp}
As an example, let us recall the program for computing the shortest paths from a selected source to other vertices of a graph, which we repeat in Figure~\ref{ssspexampledatalogra}.
\dprog{
$\textsc{Edge}(\text{int } \textit{src}, \text{int } \textit{sink}, \text{int } \textit{len}) $ \\
$\textsc{Path}(\text{int } \textit{target}, \text{int } \textit{dist} \text{ aggregate } \textsc{Min}) $
}{
& \textsc{Path} (t, d) && & \assign & && t = 1, d = 0. & \\
& \textsc{Path} (t, d) && & \assign & && \textsc{Path} (s, d_1), \textsc{Edge} (s, t, d_2), d = d_1 + d_2. &
}{\datalogra query for computing the shortest paths from node 1 to other nodes.}{ssspexampledatalogra}
In this program, we the following aggregation function for relation $\textsc{Path}$ is defined:
$$ \aggfun_\textsc{Path} = \lambda x, y. \min(x, y) $$
$\aggfun_\textsc{Path}$ is clearly a join operation. It induces a partial order $\ge$: for $a, b \in \mathbb{N}$, $\min(a, b)$ is the least upper bound of $a$ and $b$ with respect to $\ge$.
The corresponding aggregation operator for \textsc{Path} is:
$$g_\textsc{Path}(I) = \Big\{(x, t): (x, t) \in I \wedge t = \min(\{y: (x, y) \in I\})\Big\} $$
The aggregation-aware ordering on instances of \textsc{Path} is:
$$ I_1 \sqsubseteq_\textsc{Path} I_2 \iff \forall_{(x, d) \in g_\textsc{Path}(I_1)} \exists_{(x, d') \in g_\textsc{Path}(I_2)} d \ge d' $$
For \textsc{Edge}, the order on relation instances is simply given by inclusion, i.e. $\sqsubseteq_\textsc{Edge}$ is by definition $\subseteq$.
The order $\sqsubseteq$ on database instances over $\sch(P) = \{\textsc{Path}, \textsc{Edge}\}$ is as follows. Let \textbf{K}$_1$ and \textbf{K}$_2$ be any such database instances. By definition, $\textbf{K}_1 = P_1 \cup E_1$ and $\textbf{K}_2 = P_2 \cup E_2$, where $P_1, P_2$ are relations over \textsc{Path} and $E_1, E_2$ are relations over \textsc{Edge}.
$$ K_1 \sqsubseteq K_2 \iff P_1 \sqsubseteq_\textsc{Path} P_2 \wedge E_1 \subseteq E_2 $$
$P$ can be viewed as a Datalog program $P'$ with \aggfun additionally defined. As for any Datalog program, the immediate consequence operator for $T_{P'}(\textbf{K})$ is defined as the set of facts that are immediate consequences for $P'$ and \textbf{K}.
For $P$ to be a \datalogra program, $T_{P'}$ is required to be monotone with respect to $\sqsubseteq$. As ensuring this is the responsibility of the programmer, we will now show that this is the case.
\begin{prof}
Let \textbf{K}$_1$ and \textbf{K}$_2$ be database instances after aggregation over $\sch(P)$, such that $\textbf{K}_1 \sqsubseteq \textbf{K}_2$. By definition, $\textbf{K}_1 = P_1 \cup E_1$ and $\textbf{K}_2 = P_2 \cup E_2$, where $P_1, P_2$ are relations after aggregation over \textsc{Path} and $E_1, E_2$ are relations over \textsc{Edge}. $\textbf{K}_1 \sqsubseteq \textbf{K}_2$ implies that $E_1 \subseteq E_2$ and $P_1 \sqsubseteq_\textsc{Path} P_2$. We need to show that $T_{P'}(\textbf{K}_1) \sqsubseteq T_{P'}(\textbf{K}_2)$. This by definition is equivalent to $E'_1 \subseteq E'_2 \wedge P'_1 \sqsubseteq_\textsc{Path} P'_2$, where $E'_1, E'_2, P'_1, P'_2$ are such that for $i = 1, 2$, $T_{P'}(\textbf{K}_i) = E'_i \cup P'_i$, $P'_i$ is a relation instance over \textsc{Path} and $E'_i$ is a relation instance over \textsc{Edge}.
For any $\textsc{Edge}{(x, y, z)} \in E'_1$, it must be true that $\textsc{Edge}{(x, y, z)} \in E_1$, since there are no rules with \textsc{Edge} in head. Hence, as $E_1 \subseteq E_2$, \relat{Edge}{(x, y, z)}$ \in E_2$ and consequently \relat{Edge}{(x, y, z)}$ \in E'_2$, so $E'_1 \subseteq E'_2$.
$P'_1 \sqsubseteq_\textsc{Path} P'_2$ is by definition equivalent to $\forall_{(v, d) \in g_\textsc{Path}(P'_1)} \exists_{(v, d') \in g_\textsc{Path}(P'_2)} d \ge d' $. To prove this, let us suppose by contradiction that this does not hold, that is that for some $(v, d) \in g_\textsc{Path}(P'_1)$, for all $d'$ such that $(v, d') \in g_\textsc{Path}(P'_2)$, it holds that $d < d'$.
$P_1, P_2$ are relations after aggregation, so $P_1 = g_\textsc{Path}(P_1)$ and $P_2 = g_\textsc{Path}(P_2)$. Since $g_\textsc{Path}$ groups by the first column and aggregates the second column with $\min$, we know that $(v, d) \in P'_1$. By definition of immediate consequence, this means that at least one of the following is true:
\begin{enumerate}
\item $(v, d) \in P_1$,
\item there is a corresponding instantiation of the first rule in $\textbf{K}_1$,
\item there is a corresponding instantiation of the second rule in $\textbf{K}_1$.
\end{enumerate}
Let us consider each one of these possibilities.
If $(v, d) \in P_1$, as it holds that $P_1 \sqsubseteq_\textsc{Path} P_2$ and $P_1 = g_\textsc{Path}(P_1)$, there exists $d^* \le d$ such that $(v, d^*) \in g_\textsc{Path}(P_2)$. By definition of $g_\textsc{Path}$ we have that $(v, d^*) \in P_2$. As it follows from the definition of immediate consequence, $(v, d^*) \in P'_2$, so it cannot hold that $d < d'$ for all $d'$ such that $(v, d') \in g_\textsc{Path}(P'_2)$.
If there exists a corresponding instantiation of the first rule in $\textbf{K}_1$, i.e. $(v, d) = (1, 0)$, we also have the same instantiation in $\textbf{K}_2$, so $(1, 0) \in P'_2$ and it also cannot hold that $d < d'$ for all $d'$ such that $(v, d') \in g_\textsc{Path}(P'_2)$.
The last possibility is that there is a corresponding instantiation of the second rule in $\textbf{K}_1$, i.e. there exist $s, d_1, d_2$ such that $d = d_1 + d_2$, $(s, d_1) \in P_1$ and $(s, v, d_2) \in E_1$. Since $\textsc{Path}(s, d_1) \in P_1 = g_\textsc{Path}(P_1)$ and $P_1 \sqsubseteq_\textsc{Path} P_2$, there exists $d^*_1 \le d_1$ such that $(s, d^*_1) \in g_\textsc{Path}(P_2)$. By definition of $g_\textsc{Path}$, $(s, d^*_1) \in P_2$. Additionally, $(s, v, d_2) \in E_1 \subseteq E_2$. Therefore, $\textsc{Path} (v, d^*) \assign \textsc{Path} (s, d^*_1), \textsc{Edge} (s, v, d_2), d^* = d^*_1 + d_2$, where $d^* = d^*_1 + d_2 \le d_1 + d_2 = d$ is an instantiation of the second rule in $\textbf{K}_2$. This means that $(v, d^*) \in P_2$, so again it cannot hold that $d < d'$ for all $d'$ such that $(v, d') \in g_\textsc{Path}(P'_2)$.
Since each of the three possibilities results in a contradiction, it follows that $P'_1 \sqsubseteq_\textsc{Path} P'_2$. Therefore $T_{P'}(\textbf{K}_1) \sqsubseteq T_{P'}(\textbf{K}_2)$, which proves that $T_{P'}$ is monotone with respect to $\sqsubseteq$.
\QEDA
\end{prof}
\end{exmp}
\subsection{Evaluation}
A straightforward way to evaluate $P(\textbf{K})$, i.e.\ to compute the minimal fix-point of $T_P$ containing $\textbf{K}$, is to iteratively apply $T_P$ to $\textbf{K}$ until a fix-point is reached. This algorithm, used in Datalog and described in detail in Section \ref{ss:datalognaiveeval}, can also be directly applied in \datalogra. The only difference is that the immediate consequence operator which is used needs to take into account the aggregations.
\subsubsection{Semi-naive evaluation} Semi-naive evaluation, the most basic optimization technique used in Datalog evaluation, can be easily adopted in \datalogra.
In semi-naive evaluation of Datalog, which is described in Section \ref{ss:seminaiveevaldatalog}, $T_P$ is computed in a more efficient way than in the naive evaluation. To achieve this, a $T^\Delta_P$ function is used. $T_P^\Delta(I, \Delta)$ evaluates the rules of program $P$ on database instance $I$ and the set of new facts from the last iteration $\Delta$, so that at least one new fact is used in the application of each rule. In each iteration, $T^\Delta_P$ is applied to the full database and $\Delta$ from in the last step and the output is merged with the current database instance.
In the evaluation of a program $P$ in \datalogra, such that $P = (P', \aggfun)$ where $P'$ is a Datalog program, we can use this technique to efficiently compute $T_{P'}$. The full algorithm is presented in pseudocode in Figure~\ref{psc:seminaiveevaldatalogra}.
\begin{figure}[!htbp]
\begin{codebox}
\Procname{$\proc{Semi-naive-Evaluate-DatalogRA}(P,~\textbf{K})$}
\li $I_0 \leftarrow K$, $\Delta_0 \leftarrow K$
\li $i \leftarrow 0$
\li \Repeat
\li $i \leftarrow i + 1$
\li $C_i \leftarrow T_P^\Delta(I_{i-1}, \Delta_{i-1})$
\li $I_i \leftarrow G_P(C_i \cup I_{i-1})$
\li $\Delta_i \leftarrow I_i - I_{i-1}$
\li \Until $\Delta_i = \emptyset$
\li \Return $I_i$
\end{codebox}
\caption{Semi-naive evaluation algorithm for \datalogra}\label{psc:seminaiveevaldatalogra}
\end{figure}
The only difference between this algorithm and the semi-naive evaluation algorithm for Datalog described in Section \ref{ss:seminaiveevaldatalog} is that in each step $G_P$ is applied to the newly computed database instance.
\subsection{Stratified \datalogra}
Negation can be introduced to \datalogra in the same way as to regular Datalog --- by stratification. Intuitively, if there are no negated recursive calls, then the program can be divided into smaller subprograms, called \emph{strata}, so that there is no cyclic dependency between relations defined in different strata. The whole program can be then evaluated by evaluating the subprograms for each stratum in the topological order. Stratification and semantics for \datalogneg are described in detail in Subsection \ref{ss:datalogneg}.
We have already defined the semantics for \datalogra without negation. It can be naturally extended to the semantics of semi-positive \datalogra programs, i.e.\ programs in \datalogra that can also use \edb relations in a negated subgoal. This defines the semantics for an individual stratum.
To obtain the semantics of a general program in \datalogra with negation, let us notice that stratification can be applied to \datalogra programs with negation just like to regular \datalogneg programs. The only difference is that when $P$ is partitioned into the strata $P_1, \dots, P_n$, each stratum $P_i$ can contain recursive aggregation. In other words, it each $P_i$ is a semi-positive \datalogra program. Therefore, each $P_i$ is evaluated using the \datalogra semantics, instead of the regular Datalog semantics.
\section{Distributed SociaLite}\label{s:distributed}
Running a program on a single machine significantly limits the scale of problems which can be addressed. Large datasets will not fit into one computer's memory, and the running time is usually too large for practical applications. It is not possible to perform computations on large datasets, unless they are distributed across multiple machines. This rule also applies to programs in Datalog and its variants such as \datalogra.
The second paper on SociaLite \cite{distsoc} describes one of the possible ways of distributing evaluation of such programs. It proposes that each relation is distributed across workers based on the values in its first column. The facts from relations are sharded either with a hash function, or by dividing the possible value ranges into equal parts. The first column, by which the sharding is done, is distinguished by writing it separately in square brackets, for example the following rule for each fact $(1, v, d)$ in \textsc{Edge} adds a fact $(v, d)$ to \textsc{Path}:
$$\textsc{Path}[v](d) :- \textsc{Edge}[1](v, d)$$
The evaluation is performed on the workers, which are coordinated by a \emph{master node} and distribute between themselves the facts they need. Fault tolerance is achieved by checkpointing the state of the computation on distributed file system every few evaluation steps.
In Chapter \ref{r:implementation} we present an alternative way of parallelizing Datalog evaluation --- by translating the program into a set of Apache Spark's native operations on RDDs.
\section{Differences from original SociaLite formulation}
The version of Datalog presented here differs from the original description found in \cite{socialite, distsoc}.
We presented a precise definition of a \datalogra program and the well-defined conditions for it to have an unambiguous solution. While the original papers contains these conditions, they lack precise definitions of the notions used, such as the order on database instances.
In our definition, the syntax for declaring the aggregated column and function is to place them in the preamble, while in the original paper the aggregations are placed in the rules' heads. We believe that our approach can enhance the readability of \datalogra programs.
In \cite{socialite, distsoc} recursive aggregate functions are defined using \emph{meet operation}, which is dual to join operation, but induces a partial order in which the outcome of the operation is the greatest lower bound instead of the least upper bound. This requires the semantics of a program to be defined using the greatest fixed point. We decided to use join operations, as it allows the semantics to be defined with the least fixed point, consistently with regular Datalog.