Skip to content

Commit

Permalink
lister
Browse files Browse the repository at this point in the history
  • Loading branch information
ychebotarev committed Jan 4, 2025
1 parent 6ffd9b6 commit f470265
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions tests/xdc/activity_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (s *ActivityApiStateReplicationSuite) TestPauseActivityFailover() {
s.NoError(worker1.Start())
defer worker1.Stop()

// start a workflow
workflowOptions := sdkclient.StartWorkflowOptions{
ID: testcore.RandomizeStr("wfid-" + s.T().Name()),
TaskQueue: taskQueue,
Expand Down Expand Up @@ -137,14 +138,16 @@ func (s *ActivityApiStateReplicationSuite) TestPauseActivityFailover() {
s.NoError(err)
s.NotNil(pauseResp)

//verify activity is paused
// verify activity is paused is cluster 1
description, err := activeSDKClient.DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID())
s.NoError(err)
s.Equal(1, len(description.PendingActivities))
s.True(description.PendingActivities[0].Paused)

worker1.Stop() // stop worker1 so cluster 1 won't make any progress
// stop worker1 so cluster 1 won't make any progress on the activity (just in case)
worker1.Stop()

// failover to standby cluster
s.failover(ns, s.clusterNames[1], int64(2), s.cluster1.FrontendClient())

// verify things are replicated over
Expand All @@ -164,27 +167,28 @@ func (s *ActivityApiStateReplicationSuite) TestPauseActivityFailover() {
s.True(standbyAckInfo.AckedTaskVisibilityTime.AsTime().Before(time.Now()))
s.True(standbyAckInfo.AckedTaskVisibilityTime.AsTime().After(startTime))

// get standby client
standbyClient, err := sdkclient.Dial(sdkclient.Options{
HostPort: s.cluster2.Host().FrontendGRPCAddress(),
Namespace: ns,
})
s.NoError(err)
s.NotNil(standbyClient)

//verify activity is still paused
// verify activity is still paused in cluster 2
description, err = standbyClient.DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID())
s.NoError(err)
s.Equal(1, len(description.PendingActivities))
s.True(description.PendingActivities[0].Paused)

// start worker2 in standby cluster
// start worker2
worker2 := sdkworker.New(standbyClient, taskQueue, sdkworker.Options{})
worker2.RegisterWorkflow(workflowFn)
worker2.RegisterActivity(activityFunction)
s.NoError(worker2.Start())
defer worker2.Stop()

// let the activity make progress and finish
// let the activity make progress once unpaused
activityWasPaused.Store(true)

// unpause the activity in cluster 2
Expand All @@ -205,7 +209,7 @@ func (s *ActivityApiStateReplicationSuite) TestPauseActivityFailover() {
// unblock the activity
activityPausedCn <- struct{}{}

// let activity finish
// wait for activity to finish
var out string
err = workflowRun.Get(ctx, &out)

Expand Down

0 comments on commit f470265

Please sign in to comment.