Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forked EventEmitter stream requires two yields #4197

Closed
danielnixon opened this issue Dec 28, 2024 · 3 comments
Closed

Forked EventEmitter stream requires two yields #4197

danielnixon opened this issue Dec 28, 2024 · 3 comments

Comments

@danielnixon
Copy link

What version of Effect is running?

3.12.0

What steps can reproduce the bug?

Please see a minimal reproducer below.

Basically, it does the following:

  1. Creates an EventEmitter
  2. Creates a stream using Stream.async that subscribes to an event from that emitter
  3. Forks, then runs that stream to a simple logging effect
  4. Immediately yields, per advice in https://effect.website/docs/concurrency/fibers/#when-do-fibers-run
  5. Immediately yields a second time
  6. Emits a test event
  7. Interrupt the forked fiber
/* eslint-disable @typescript-eslint/no-floating-promises */
/* eslint-disable functional/no-expression-statements */
/* eslint-disable functional/no-return-void */
import type { StreamEmit } from "effect";
import { Effect, Stream, Chunk, Fiber } from "effect";
import { EventEmitter } from "node:events";

export const example = Effect.gen(function* () {
  const emitter = new EventEmitter();

  const stream = Stream.async(
    (emit: StreamEmit.Emit<never, never, unknown, void>) => {
      const callback = (a: unknown): void => {
        emit(Effect.succeed(Chunk.of(a)));
      };
      emitter.on("test", callback);
    },
  );

  const streamEffect = Stream.runForEach(stream, (a) => {
    return Effect.logWarning(a);
  });

  const fiber = yield* Effect.fork(streamEffect);

  // yield now to let the fiber begin execution
  yield* Effect.yieldNow();
  // Comment out this second yield to reproduce the issue
  yield* Effect.yieldNow();

  emitter.emit("test", "test event");

  yield* Fiber.interrupt(fiber);
});

If you run this example effect in a wider program, e.g. via yield* example; in a generator, you'll see the expected logging output:

[16:04:28.376] WARN (#37): test event

If you comment out the second yield, you'll see no such logging. This is the part that is surprising to me.

I've experimented with omitting the interrupt, Stream.asyncEffect instead of Stream.async, using forkDaemon or forkScoped (with a wide, long-lived scope) and a handful of other things but the only way I've been able to make this work is by adding the second yield (or with a Effect.sleep(100) in place of the second yield).

I think I have an intuition for what's going wrong - pumping the event loop just once isn't enough for the stream initialization to subscribe to the event emitter before the event is emitted. The second yield (or sleep) puts the main fiber back on the end of the event loop, giving the stream initialization a chance to run before the main fiber emits the event.

What I can't quite work out is what I'm doing wrong or how to make this work without the kludge.

What is the expected behavior?

Stream subscribes to events before the first event is emitted, with a single yieldNow at most.

What do you see instead?

Two yieldNows required.

Additional information

Node v23.3.0 + these additional Effect packages:

    "@effect/opentelemetry": "^0.42.0",
    "@effect/platform": "^0.72.0",
    "@effect/platform-node": "^0.68.0",
@danielnixon danielnixon added the bug Something isn't working label Dec 28, 2024
@mikearnaldi mikearnaldi added working as intended and removed bug Something isn't working labels Dec 28, 2024
@mikearnaldi
Copy link
Member

When fibers run is not deterministic and many different factors can play a role. You should not rely on fibers starting after a single yield.

@danielnixon
Copy link
Author

Thanks @mikearnaldi. Using a Deferred to synchronize the two fibers is indeed the answer here - no yieldNow required at all. I wonder if https://effect.website/docs/concurrency/fibers/#when-do-fibers-run deserves a note. Happy to close this one in any case.

@mikearnaldi
Copy link
Member

Thanks @mikearnaldi. Using a Deferred to synchronize the two fibers is indeed the answer here - no yieldNow required at all. I wonder if https://effect.website/docs/concurrency/fibers/#when-do-fibers-run deserves a note. Happy to close this one in any case.

probably deserves an update, yield was used as an example to give the idea of "sometime in the future" but I think we can improve. cc @gcanti

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants