Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Support chaining streams
Browse files Browse the repository at this point in the history
  • Loading branch information
xandkar committed Jun 26, 2022
1 parent 92ebb1d commit c38973f
Showing 1 changed file with 58 additions and 16 deletions.
74 changes: 58 additions & 16 deletions src/data/data_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from_fun/1,
from_list/1,
to_list/1,
append/2,
foreach/2,
fold/3,
map/2, % Alias for lazy_map.
Expand All @@ -21,8 +22,6 @@
sample/2
]).

-define(T, ?MODULE).

-type reservoir(A) :: #{pos_integer() => A}.

-type filter(A, B)
Expand All @@ -32,20 +31,22 @@

-type next(A) :: fun(() -> none | {some, {A, next(A)}}).

-record(?T, {
-record(stream, {
next :: next(any()),
filters :: [filter(any(), any())]
}).

-opaque t(A) ::
-type stream(A) ::
%% XXX Record syntax does not support type parameters, so we get around it with desugaring.
%% XXX Ensure the field order is the same as in the corresponding record!
{
?T,
stream,
next(A),
[filter(A, any())]
}.

-opaque t(A) :: [stream(A)].

-record(sched, {
id :: reference(),
producers :: [{pid(), reference()}],
Expand All @@ -59,18 +60,22 @@

-spec from_fun(next(A)) -> t(A).
from_fun(Next) ->
#?T{
next = Next,
filters = []
}.
[#stream{next = Next, filters = []}].

-spec append(t(A), t(A)) -> t(A).
append([#stream{} | _]=TA, [#stream{} | _]=TB) ->
TA ++ TB.

-spec next(t(A)) -> none | {some, {A, t(A)}}.
next(#?T{next=Next0, filters=Filters}=T0) when is_function(Next0) ->
next([#stream{next=Next0, filters=Filters}=S | Streams]) when is_function(Next0) ->
case Next0() of
none ->
none;
case Streams of
[] -> none;
[_|_] -> next(Streams)
end;
{some, {X, Next1}} when is_function(Next1) ->
T1 = T0#?T{next=Next1},
T1 = [S#stream{next=Next1} | Streams],
case filters_apply(X, Filters) of
none ->
next(T1);
Expand All @@ -86,12 +91,12 @@ filter(T, F) ->
lazy_filter(T, F).

-spec lazy_map(t(A), fun((A) -> B)) -> t(B).
lazy_map(#?T{filters=Filters}=T, F) ->
T#?T{filters=Filters ++ [{map, F}]}.
lazy_map([#stream{filters=Filters}=S | Streams], F) ->
[S#stream{filters=Filters ++ [{map, F}]} | Streams].

-spec lazy_filter(t(A), fun((A) -> boolean())) -> t(A).
lazy_filter(#?T{filters=Filters}=T, F) ->
T#?T{filters=Filters ++ [{test, F}]}.
lazy_filter([#stream{filters=Filters}=S | Streams], F) ->
[S#stream{filters=Filters ++ [{test, F}]} | Streams].

-spec fold(t(A), B, fun((A, B) -> B)) -> B.
fold(T0, Acc, F) ->
Expand Down Expand Up @@ -485,6 +490,43 @@ fold_test_() ->
]
].

append_test_() ->
[
?_assertEqual(
[1, 2, 3, 4, 5],
to_list(append(from_list([1, 2]), from_list([3, 4, 5])))
),
?_assertEqual(
[1, 2, 3, 4, 5, 6, 7, 8],
to_list(
append(
append(from_list([1, 2]), from_list([3, 4, 5])),
from_list([6, 7, 8]))
)
),
?_assertEqual(
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
to_list(
append(
append(
from_list([1, 2]),
append(
append(from_list([3]), from_list([4])),
from_list([5])
)
),
append(
from_list([6, 7]),
append(
from_list([8]),
from_list([9, 10])
)
)
)
)
)
].

random_elements_test_() ->
TestCases =
[
Expand Down

0 comments on commit c38973f

Please sign in to comment.