WorkflowClient#execute doesn't pair correctly with workflowStub.getResult() #856

Open
Spikhalskiy opened this issue Nov 3, 2021 · 1 comment

Spikhalskiy commented Nov 3, 2021

Actual behavior

Spikhalskiy@40c0d62#diff-7cfba47d2337ff3ee746b09a4d916e5e839f9b8bff45a26e588727667437c160R94

  @Test
  public void executeAndGetResultFromStub() throws InterruptedException, ExecutionException {
    TestNoArgsWorkflowProc stubP =
        testWorkflowRule.newWorkflowStubTimeoutOptions(TestNoArgsWorkflowProc.class);
    WorkflowStub workflowStub = WorkflowStub.fromTyped(stubP);
    CompletableFuture<Void> executeCF = WorkflowClient.execute(stubP::proc);

    // This test hangs (times out), but uncommenting either of these two lines makes it
    // pass, which doesn't make much sense
    // sleep(1000);
    // executeCF.get();
    workflowStub.getResult(Void.class);
  }

This unit test for a trivial workflow that finishes immediately hangs.
Uncommenting either the sleep or the wait on the CompletableFuture makes it pass.
Replacing execute with start (which returns a WorkflowExecution) also makes the test pass.
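
To reproduce the hang without waiting for the test runner's own timeout, the blocking call can be given an explicit deadline. The following is a minimal sketch in plain Java, not part of the Temporal SDK: `BoundedWait` and `callWithDeadline` are hypothetical names introduced here for illustration only. It does not fix the underlying problem; it only turns an indefinite hang into a `TimeoutException`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (an assumption for illustration, not a Temporal API).
public final class BoundedWait {
  private BoundedWait() {}

  /**
   * Runs a blocking call on a helper thread and waits at most timeoutMillis for it.
   * Throws TimeoutException if the call has not returned by then.
   */
  public static <T> T callWithDeadline(Callable<T> blockingCall, long timeoutMillis)
      throws InterruptedException, ExecutionException, TimeoutException {
    ExecutorService executor =
        Executors.newSingleThreadExecutor(
            r -> {
              Thread t = new Thread(r, "bounded-wait");
              t.setDaemon(true); // do not keep the JVM alive if the call never returns
              return t;
            });
    Future<T> future = executor.submit(blockingCall);
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } finally {
      // Interrupts the helper thread if it is still blocked; harmless otherwise.
      executor.shutdownNow();
    }
  }
}
```

In the test above, the last line could then be written as `BoundedWait.callWithDeadline(() -> workflowStub.getResult(Void.class), 5_000);`, assuming the test method also declares `TimeoutException`.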

Expected behavior

The test passes.

@Spikhalskiy Spikhalskiy added this to the 1.6.0 milestone Nov 3, 2021
@Spikhalskiy Spikhalskiy added the bug Something isn't working label Nov 3, 2021
Spikhalskiy added a commit to Spikhalskiy/java-sdk that referenced this issue Nov 4, 2021
@Spikhalskiy Spikhalskiy modified the milestones: 1.6.0, 1.7.0 Dec 6, 2021
@Spikhalskiy Spikhalskiy modified the milestones: 1.7.0, 1.8.0 Jan 10, 2022
@tsurdilo tsurdilo self-assigned this Jan 21, 2022
@Spikhalskiy Spikhalskiy modified the milestones: 1.8.0, 1.9.0 Feb 3, 2022

joelmarty commented Feb 10, 2022

I am seeing a similar issue with WorkflowClient.execute(), but I'm not sure it is exactly the same situation, as neither calling Thread.sleep() nor waiting on the future fixes it.

The use case looks like this:

class EventConsumer {

  private WorkflowClient workflowClient;

  public EventConsumer(WorkflowClient client) {
    this.workflowClient = client;
  }

  public void accept(Stream<SomeEvent> eventStream) {
    eventStream.forEach(event -> {
      final var workflowOptions = WorkflowOptions.newBuilder()
        .setTaskQueue("someQueue")
        .setWorkflowId(event.getId())
        .setWorkflowIdReusePolicy(WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY)
        .build();
      final var workflow = workflowClient.newWorkflowStub(MyWorkflow.class, workflowOptions);
      WorkflowClient.execute(workflow::processEvent, event)
        .whenComplete((v, throwable) -> log.info("completed"));
    });
  }
}

The test case uses the JUnit 5 extension:

class EventConsumerTest {
  @RegisterExtension
  public static final TestWorkflowExtension testWorkflowExtension = TestWorkflowExtension.newBuilder()
    .setWorkflowClientOptions(WorkflowClientOptions.newBuilder()
      .setNamespace("namespace")
      .build())
    .setWorkflowTypes(MyWorkflow.class)
    .setDoNotStart(true)
    .build();

  @Test
  void startWorkflow(TestWorkflowEnvironment testEnv, Worker worker) {
    MyActivity activity = mock(MyActivity.class, withSettings().withoutAnnotations());
    worker.registerActivitiesImplementations(activity);
    testEnv.start();

    EventConsumer consumer = new EventConsumer(testEnv.getWorkflowClient());

    // assertJ assertion
    assertThatCode(() -> consumer.accept(Stream.of(event))).doesNotThrowAnyException();

    // mockito verification
    then(activity).should(timeout(5000)).doSomething();
  }
}

The test times out after 5s, as specified in the Mockito verification, and the (anonymized) log trace looks like this:

16:11:05.761 [main] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=1, target=directaddress:///3c1827fc-4a62-4d7d-a725-efe16cfef991}}
16:11:05.777 [main] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - Created GRPC client for channel: ManagedChannelOrphanWrapper{delegate=ManagedChannelImpl{logId=5, target=directaddress:///06e8951f-ef30-489d-80fb-ec06add9150e}}
16:11:05.881 [main] INFO io.temporal.internal.worker.Poller - start: Poller{name=Workflow Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace", identity=37008@myhostname}
16:11:05.885 [main] INFO io.temporal.internal.worker.Poller - start: Poller{name=Local Activity Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace", identity=37008@myhostname}
16:11:05.887 [main] INFO io.temporal.internal.worker.Poller - start: Poller{name=Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", identity=37008@myhostname}
16:11:05.888 [main] INFO io.temporal.internal.worker.Poller - start: Poller{name=Host Local Workflow Poller, identity=90621874-bf17-42a0-b6b4-16b5f6433a3b}
16:11:11.266 [main] INFO io.temporal.worker.WorkerFactory - shutdownNow: WorkerFactory{identity=37008@myhostname, uniqueId=90621874-bf17-42a0-b6b4-16b5f6433a3b}
16:11:11.266 [main] INFO io.temporal.internal.worker.Poller - shutdown: Poller{name=Host Local Workflow Poller, identity=90621874-bf17-42a0-b6b4-16b5f6433a3b}
16:11:11.272 [main] INFO io.temporal.worker.WorkerFactory - awaitTermination begin: WorkerFactory{identity=37008@myhostname, uniqueId=90621874-bf17-42a0-b6b4-16b5f6433a3b}
16:11:11.272 [Host Local Workflow Poller: 3] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@2f410cb5
16:11:11.272 [Host Local Workflow Poller: 4] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@1da8bb99
16:11:11.272 [Host Local Workflow Poller: 2] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@368de08f
16:11:11.272 [Host Local Workflow Poller: 1] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@56e21162
16:11:11.272 [Host Local Workflow Poller: 5] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@18cc9d5b
16:11:11.526 [TemporalShutdownManager: 1] INFO io.temporal.internal.worker.Poller - shutdown: Poller{name=Workflow Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace", identity=37008@myhostname}
16:11:11.527 [Workflow Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace": 1] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@1d1ce3ca
16:11:11.527 [Workflow Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace": 2] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@61c24b97
16:11:11.529 [TemporalShutdownManager: 1] INFO io.temporal.internal.worker.Poller - shutdown: Poller{name=Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", identity=37008@myhostname}
16:11:11.529 [TemporalShutdownManager: 1] INFO io.temporal.internal.worker.Poller - shutdown: Poller{name=Local Activity Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace", identity=37008@myhostname}
16:11:11.530 [Local Activity Poller taskQueue="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]", namespace="namespace": 1] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@775ac2f4
16:11:11.530 [Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]": 3] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@13f7bf0c
16:11:11.530 [Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]": 5] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@17e59124
16:11:11.530 [Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]": 2] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@1604dc95
16:11:11.530 [Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]": 1] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@130e4636
16:11:11.530 [Activity Poller taskQueue="namespace", namespace="WorkflowTest-startWorkflow(TestWorkflowEnvironment, Worker)-[engine:junit-jupiter]/[class:com.xyz.EventConsumerTest]/[method:startWorkflow(io.temporal.testing.TestWorkflowEnvironment, io.temporal.worker.Worker)]": 4] INFO io.temporal.internal.worker.Poller - poll loop is terminated: io.temporal.internal.worker.Poller$PollLoopTask@72885c3d
16:11:11.783 [main] INFO io.temporal.worker.WorkerFactory - awaitTermination done: WorkerFactory{identity=37008@myhostname, uniqueId=90621874-bf17-42a0-b6b4-16b5f6433a3b}
16:11:11.784 [main] INFO io.temporal.serviceclient.WorkflowServiceStubsImpl - shutdownNow
16:11:11.790 [ForkJoinPool.commonPool-worker-19] DEBUG io.temporal.internal.retryer.GrpcAsyncRetryer - Retrying after failure
io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked

@Spikhalskiy Spikhalskiy modified the milestones: 1.9.0, 1.10.0 Mar 18, 2022
@Spikhalskiy Spikhalskiy modified the milestones: 1.10.0, Next Apr 13, 2022