Skip to content

Commit

Permalink
fix(Stream|Channel): exclude Scope from the resulting effect env of…
Browse files Browse the repository at this point in the history
… all the non-scoped `.run*` methods (#3190)

Co-authored-by: Tim <[email protected]>
  • Loading branch information
2 people authored and gcanti committed Jul 8, 2024
1 parent 4dbadc3 commit f32fa39
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 61 deletions.
9 changes: 9 additions & 0 deletions .changeset/stream-channel-run-env-fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"effect": minor
---

Ensure `Scope` is excluded from `R` in the `Channel` / `Stream` `run*` functions.

This fix ensures that `Scope` is now properly excluded from the resulting effect environment.
The affected functions include `run`, `runCollect`, `runCount`, `runDrain` and other non-scoped `run*` in both `Stream` and `Channel` modules.
This fix brings the type declaration in line with the runtime implementation.
6 changes: 3 additions & 3 deletions packages/effect/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1916,7 +1916,7 @@ export const repeated: <OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>(
*/
export const run: <OutErr, InErr, OutDone, InDone, Env>(
self: Channel<never, unknown, OutErr, InErr, OutDone, InDone, Env>
) => Effect.Effect<OutDone, OutErr, Env> = channel.run
) => Effect.Effect<OutDone, OutErr, Exclude<Env, Scope.Scope>> = channel.run

/**
* Run the channel until it finishes with a done value or fails with an error
Expand All @@ -1929,7 +1929,7 @@ export const run: <OutErr, InErr, OutDone, InDone, Env>(
*/
export const runCollect: <OutElem, OutErr, InErr, OutDone, InDone, Env>(
self: Channel<OutElem, unknown, OutErr, InErr, OutDone, InDone, Env>
) => Effect.Effect<[Chunk.Chunk<OutElem>, OutDone], OutErr, Env> = channel.runCollect
) => Effect.Effect<[Chunk.Chunk<OutElem>, OutDone], OutErr, Exclude<Env, Scope.Scope>> = channel.runCollect

/**
* Runs a channel until the end is received.
Expand All @@ -1939,7 +1939,7 @@ export const runCollect: <OutElem, OutErr, InErr, OutDone, InDone, Env>(
*/
export const runDrain: <OutElem, OutErr, InErr, OutDone, InDone, Env>(
self: Channel<OutElem, unknown, OutErr, InErr, OutDone, InDone, Env>
) => Effect.Effect<OutDone, OutErr, Env> = channel.runDrain
) => Effect.Effect<OutDone, OutErr, Exclude<Env, Scope.Scope>> = channel.runDrain

/**
* Use a scoped effect to emit an output element.
Expand Down
36 changes: 24 additions & 12 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2932,7 +2932,7 @@ export const run: {
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
sink: Sink.Sink<A2, A, unknown, E2, R2>
): Effect.Effect<A2, E | E2, R | R2>
): Effect.Effect<A2, E | E2, Exclude<R | R2, Scope.Scope>>
} = internal.run

/**
Expand All @@ -2941,23 +2941,26 @@ export const run: {
* @since 2.0.0
* @category destructors
*/
export const runCollect: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Chunk.Chunk<A>, E, R> = internal.runCollect
export const runCollect: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Chunk.Chunk<A>, E, Exclude<R, Scope.Scope>> =
internal.runCollect

/**
* Runs the stream and emits the number of elements processed
*
* @since 2.0.0
* @category destructors
*/
export const runCount: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<number, E, R> = internal.runCount
export const runCount: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<number, E, Exclude<R, Scope.Scope>> =
internal.runCount

/**
* Runs the stream only for its effects. The emitted elements are discarded.
*
* @since 2.0.0
* @category destructors
*/
export const runDrain: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E, R> = internal.runDrain
export const runDrain: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E, Exclude<R, Scope.Scope>> =
internal.runDrain

/**
* Executes a pure fold over the stream of values - reduces all elements in
Expand All @@ -2967,8 +2970,8 @@ export const runDrain: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E
* @category destructors
*/
export const runFold: {
<S, A>(s: S, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, R>
<A, E, R, S>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => S): Effect.Effect<S, E, R>
<S, A>(s: S, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, Exclude<R, Scope.Scope>>
<A, E, R, S>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => S): Effect.Effect<S, E, Exclude<R, Scope.Scope>>
} = internal.runFold

/**
Expand All @@ -2981,12 +2984,12 @@ export const runFoldEffect: {
<S, A, E2, R2>(
s: S,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, R2 | R>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, Exclude<R | R2, Scope.Scope>>
<A, E, R, S, E2, R2>(
self: Stream<A, E, R>,
s: S,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): Effect.Effect<S, E | E2, R | R2>
): Effect.Effect<S, E | E2, Exclude<R | R2, Scope.Scope>>
} = internal.runFoldEffect

/**
Expand Down Expand Up @@ -3028,8 +3031,17 @@ export const runFoldScopedEffect: {
* @category destructors
*/
export const runFoldWhile: {
<S, A>(s: S, cont: Predicate<S>, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, R>
<A, E, R, S>(self: Stream<A, E, R>, s: S, cont: Predicate<S>, f: (s: S, a: A) => S): Effect.Effect<S, E, R>
<S, A>(
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => S
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, Exclude<R, Scope.Scope>>
<A, E, R, S>(
self: Stream<A, E, R>,
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => S
): Effect.Effect<S, E, Exclude<R, Scope.Scope>>
} = internal.runFoldWhile

/**
Expand All @@ -3044,13 +3056,13 @@ export const runFoldWhileEffect: {
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, R2 | R>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, Exclude<R | R2, Scope.Scope>>
<A, E, R, S, E2, R2>(
self: Stream<A, E, R>,
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): Effect.Effect<S, E | E2, R | R2>
): Effect.Effect<S, E | E2, Exclude<R | R2, Scope.Scope>>
} = internal.runFoldWhileEffect

/**
Expand Down
7 changes: 4 additions & 3 deletions packages/effect/src/internal/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2052,17 +2052,18 @@ export const repeated = <OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>(
/** @internal */
export const run = <OutErr, InErr, OutDone, InDone, Env>(
self: Channel.Channel<never, unknown, OutErr, InErr, OutDone, InDone, Env>
): Effect.Effect<OutDone, OutErr, Env> => Effect.scoped(executor.runScoped(self))
): Effect.Effect<OutDone, OutErr, Exclude<Env, Scope.Scope>> => Effect.scoped(executor.runScoped(self))

/** @internal */
export const runCollect = <OutElem, OutErr, InErr, OutDone, InDone, Env>(
self: Channel.Channel<OutElem, unknown, OutErr, InErr, OutDone, InDone, Env>
): Effect.Effect<[Chunk.Chunk<OutElem>, OutDone], OutErr, Env> => executor.run(core.collectElements(self))
): Effect.Effect<[Chunk.Chunk<OutElem>, OutDone], OutErr, Exclude<Env, Scope.Scope>> =>
executor.run(core.collectElements(self))

/** @internal */
export const runDrain = <OutElem, OutErr, InErr, OutDone, InDone, Env>(
self: Channel.Channel<OutElem, unknown, OutErr, InErr, OutDone, InDone, Env>
): Effect.Effect<OutDone, OutErr, Env> => executor.run(drain(self))
): Effect.Effect<OutDone, OutErr, Exclude<Env, Scope.Scope>> => executor.run(drain(self))

/** @internal */
export const scoped = <A, E, R>(
Expand Down
2 changes: 1 addition & 1 deletion packages/effect/src/internal/channel/channelExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ export const readUpstream = <A, E2, R, E>(
/** @internal */
export const run = <Env, InErr, InDone, OutErr, OutDone>(
self: Channel.Channel<never, unknown, OutErr, InErr, OutDone, InDone, Env>
): Effect.Effect<OutDone, OutErr, Env> => pipe(runScoped(self), Effect.scoped)
): Effect.Effect<OutDone, OutErr, Exclude<Env, Scope.Scope>> => pipe(runScoped(self), Effect.scoped)

/** @internal */
export const runScoped = <Env, InErr, InDone, OutErr, OutDone>(
Expand Down
Loading

0 comments on commit f32fa39

Please sign in to comment.