Skip to content

Commit

Permalink
switch to object options for Stream.async apis (#3213)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored and effect-bot committed Dec 21, 2024
1 parent a33a7c0 commit 062c4d2
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 20 deletions.
24 changes: 24 additions & 0 deletions .changeset/chilled-ducks-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"effect": major
---

Use object options for Stream.async apis

Instead of:

```ts
Stream.async((emit) => {
//...
}, 16);
```

You can now write:

```ts
Stream.async(
(emit) => {
//...
},
{ bufferSize: 16 },
);
```
6 changes: 3 additions & 3 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ export const as: {

const _async: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<void, never, R> | void,
bufferSize?: number | "unbounded" | {
options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
Expand Down Expand Up @@ -375,7 +375,7 @@ export {
*/
export const asyncEffect: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
bufferSize?: number | "unbounded" | {
options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
Expand Down Expand Up @@ -434,7 +434,7 @@ export const asyncPush: <A, E = never, R = never>(
*/
export const asyncScoped: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
bufferSize?: number | "unbounded" | {
options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
Expand Down
34 changes: 20 additions & 14 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -454,23 +454,23 @@ export const as = dual<
>(2, <A, E, R, B>(self: Stream.Stream<A, E, R>, value: B): Stream.Stream<B, E, R> => map(self, () => value))

const queueFromBufferOptions = <A, E>(
bufferSize?: number | "unbounded" | {
options?: {
readonly bufferSize: "unbounded"
} | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Effect.Effect<Queue.Queue<Take.Take<A, E>>> => {
if (bufferSize === "unbounded") {
if (options?.bufferSize === "unbounded") {
return Queue.unbounded()
} else if (typeof bufferSize === "number" || bufferSize === undefined) {
return Queue.bounded(bufferSize ?? 16)
}
switch (bufferSize.strategy) {
switch (options?.strategy) {
case "dropping":
return Queue.dropping(bufferSize.bufferSize ?? 16)
return Queue.dropping(options.bufferSize ?? 16)
case "sliding":
return Queue.sliding(bufferSize.bufferSize ?? 16)
return Queue.sliding(options.bufferSize ?? 16)
default:
return Queue.bounded(bufferSize.bufferSize ?? 16)
return Queue.bounded(options?.bufferSize ?? 16)
}
}

Expand All @@ -479,13 +479,15 @@ export const _async = <A, E = never, R = never>(
register: (
emit: Emit.Emit<R, E, A, void>
) => Effect.Effect<void, never, R> | void,
bufferSize?: number | "unbounded" | {
options?: {
readonly bufferSize: "unbounded"
} | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream<A, E, R> =>
Effect.acquireRelease(
queueFromBufferOptions<A, E>(bufferSize),
queueFromBufferOptions<A, E>(options),
(queue) => Queue.shutdown(queue)
).pipe(
Effect.flatMap((output) =>
Expand Down Expand Up @@ -534,14 +536,16 @@ export const _async = <A, E = never, R = never>(
/** @internal */
export const asyncEffect = <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
bufferSize?: number | "unbounded" | {
options?: {
readonly bufferSize: "unbounded"
} | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream<A, E, R> =>
pipe(
Effect.acquireRelease(
queueFromBufferOptions<A, E>(bufferSize),
queueFromBufferOptions<A, E>(options),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((output) =>
Expand Down Expand Up @@ -637,14 +641,16 @@ export const asyncPush = <A, E = never, R = never>(
/** @internal */
export const asyncScoped = <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
bufferSize?: number | "unbounded" | {
options?: {
readonly bufferSize: "unbounded"
} | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream<A, E, Exclude<R, Scope.Scope>> =>
pipe(
Effect.acquireRelease(
queueFromBufferOptions<A, E>(bufferSize),
queueFromBufferOptions<A, E>(options),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((output) =>
Expand Down
6 changes: 3 additions & 3 deletions packages/effect/test/Stream/async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ describe("Stream", () => {
)
)
return Effect.void
}, 5)
}, { bufferSize: 5 })
const sink = pipe(Sink.take<number>(1), Sink.zipRight(Sink.never))
const fiber = yield* $(stream, Stream.run(sink), Effect.fork)
yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 }))
Expand Down Expand Up @@ -205,7 +205,7 @@ describe("Stream", () => {
)
)
return Effect.void
}, 5)
}, { bufferSize: 5 })
const sink = pipe(Sink.take<number>(1), Sink.zipRight(Sink.never))
const fiber = yield* $(stream, Stream.run(sink), Effect.fork)
yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 }))
Expand Down Expand Up @@ -398,7 +398,7 @@ describe("Stream", () => {
)
)
return Effect.void
}, 5)
}, { bufferSize: 5 })
const sink = pipe(Sink.take<number>(1), Sink.zipRight(Sink.never))
const fiber = yield* $(stream, Stream.run(sink), Effect.fork)
yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 }))
Expand Down

0 comments on commit 062c4d2

Please sign in to comment.