diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index 5a28ced19b..1e8fe01001 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -10,6 +10,7 @@ from_fun/1, from_list/1, to_list/1, + append/2, foreach/2, fold/3, map/2, % Alias for lazy_map. @@ -21,8 +22,6 @@ sample/2 ]). --define(T, ?MODULE). - -type reservoir(A) :: #{pos_integer() => A}. -type filter(A, B) @@ -32,20 +31,22 @@ -type next(A) :: fun(() -> none | {some, {A, next(A)}}). --record(?T, { +-record(stream, { next :: next(any()), filters :: [filter(any(), any())] }). --opaque t(A) :: +-type stream(A) :: %% XXX Record syntax does not support type parameters, so we get around it with desugaring. %% XXX Ensure the field order is the same as in the corresponding record! { - ?T, + stream, next(A), [filter(A, any())] }. +-opaque t(A) :: [stream(A)]. + -record(sched, { id :: reference(), producers :: [{pid(), reference()}], @@ -59,18 +60,22 @@ -spec from_fun(next(A)) -> t(A). from_fun(Next) -> - #?T{ - next = Next, - filters = [] - }. + [#stream{next = Next, filters = []}]. + +-spec append(t(A), t(A)) -> t(A). +append([#stream{} | _]=TA, [#stream{} | _]=TB) -> + TA ++ TB. -spec next(t(A)) -> none | {some, {A, t(A)}}. -next(#?T{next=Next0, filters=Filters}=T0) when is_function(Next0) -> +next([#stream{next=Next0, filters=Filters}=S | Streams]) when is_function(Next0) -> case Next0() of none -> - none; + case Streams of + [] -> none; + [_|_] -> next(Streams) + end; {some, {X, Next1}} when is_function(Next1) -> - T1 = T0#?T{next=Next1}, + T1 = [S#stream{next=Next1} | Streams], case filters_apply(X, Filters) of none -> next(T1); @@ -86,12 +91,12 @@ filter(T, F) -> lazy_filter(T, F). -spec lazy_map(t(A), fun((A) -> B)) -> t(B). -lazy_map(#?T{filters=Filters}=T, F) -> - T#?T{filters=Filters ++ [{map, F}]}. +lazy_map([#stream{filters=Filters}=S | Streams], F) -> + [S#stream{filters=Filters ++ [{map, F}]} | Streams]. -spec lazy_filter(t(A), fun((A) -> boolean())) -> t(A). -lazy_filter(#?T{filters=Filters}=T, F) -> - T#?T{filters=Filters ++ [{test, F}]}. +lazy_filter([#stream{filters=Filters}=S | Streams], F) -> + [S#stream{filters=Filters ++ [{test, F}]} | Streams]. -spec fold(t(A), B, fun((A, B) -> B)) -> B. fold(T0, Acc, F) -> @@ -485,6 +490,43 @@ fold_test_() -> ] ]. +append_test_() -> + [ + ?_assertEqual( + [1, 2, 3, 4, 5], + to_list(append(from_list([1, 2]), from_list([3, 4, 5]))) + ), + ?_assertEqual( + [1, 2, 3, 4, 5, 6, 7, 8], + to_list( + append( + append(from_list([1, 2]), from_list([3, 4, 5])), + from_list([6, 7, 8])) + ) + ), + ?_assertEqual( + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + to_list( + append( + append( + from_list([1, 2]), + append( + append(from_list([3]), from_list([4])), + from_list([5]) + ) + ), + append( + from_list([6, 7]), + append( + from_list([8]), + from_list([9, 10]) + ) + ) + ) + ) + ) + ]. + random_elements_test_() -> TestCases = [