Skip to content

Commit 3eba1df

Browse files
committed
Fix and cleanup PauseWorkflowExecutionSuite
1 parent 187cb11 commit 3eba1df

File tree

1 file changed

+32
-43
lines changed

1 file changed

+32
-43
lines changed

tests/pause_workflow_execution_test.go

Lines changed: 32 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -53,25 +53,30 @@ func (s *PauseWorkflowExecutionSuite) SetupTest() {
5353
s.activityCompletedOnce = sync.Once{}
5454

5555
s.workflowFn = func(ctx workflow.Context) (string, error) {
56+
s.T().Log("workflow started")
5657
ao := workflow.ActivityOptions{
5758
StartToCloseTimeout: 5 * time.Second,
5859
ScheduleToCloseTimeout: 10 * time.Second,
5960
}
6061
ctx = workflow.WithActivityOptions(ctx, ao)
6162

6263
var activityResult string
64+
s.T().Log("executing activity")
6365
if err := workflow.ExecuteActivity(ctx, s.activityFn).Get(ctx, &activityResult); err != nil {
6466
return "", err
6567
}
6668

6769
var childResult string
70+
s.T().Log("executing child workflow")
6871
if err := workflow.ExecuteChildWorkflow(ctx, s.childWorkflowFn).Get(ctx, &childResult); err != nil {
6972
return "", err
7073
}
7174

75+
s.T().Log("waiting to receive signal to complete the workflow")
7276
signalCh := workflow.GetSignalChannel(ctx, s.testEndSignal)
7377
var signalPayload string
7478
signalCh.Receive(ctx, &signalPayload)
79+
s.T().Log("signal received to complete the workflow")
7580
return signalPayload + activityResult + childResult, nil
7681
}
7782

@@ -80,13 +85,18 @@ func (s *PauseWorkflowExecutionSuite) SetupTest() {
8085
}
8186

8287
s.activityFn = func(ctx context.Context) (string, error) {
88+
s.T().Log("activity started")
8389
s.activityCompletedOnce.Do(func() {
8490
// blocks until the test case unblocks the activity.
8591
<-s.activityCompletedCh
8692
})
8793
s.T().Log("activity completed")
8894
return "activity", nil
8995
}
96+
97+
s.Worker().RegisterWorkflow(s.workflowFn)
98+
s.Worker().RegisterWorkflow(s.childWorkflowFn)
99+
s.Worker().RegisterActivity(s.activityFn)
90100
}
91101

92102
// TestPauseUnpauseWorkflowExecution tests that the pause and unpause workflow execution APIs work as expected.
@@ -101,10 +111,6 @@ func (s *PauseWorkflowExecutionSuite) TestPauseUnpauseWorkflowExecution() {
101111
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
102112
defer cancel()
103113

104-
s.Worker().RegisterWorkflow(s.workflowFn)
105-
s.Worker().RegisterWorkflow(s.childWorkflowFn)
106-
s.Worker().RegisterActivity(s.activityFn)
107-
108114
workflowOptions := sdkclient.StartWorkflowOptions{
109115
ID: testcore.RandomizeStr("pause-wf-" + s.T().Name()),
110116
TaskQueue: s.TaskQueue(),
@@ -136,20 +142,9 @@ func (s *PauseWorkflowExecutionSuite) TestPauseUnpauseWorkflowExecution() {
136142
s.NoError(err)
137143
s.NotNil(pauseResp)
138144

139-
// unblock the activity to complete.
140-
// s.activityCompletedCh <- struct{}{}
141-
142-
// ensure that the workflow is paused even when the activity is completed.
145+
// ensure that the workflow is paused
143146
s.EventuallyWithT(func(t *assert.CollectT) {
144-
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
145-
require.NoError(t, err)
146-
info := desc.GetWorkflowExecutionInfo()
147-
require.NotNil(t, info)
148-
require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, info.GetStatus(), "workflow is not paused. Status: %s", info.GetStatus())
149-
if pauseInfo := desc.GetWorkflowExtendedInfo().GetPauseInfo(); pauseInfo != nil {
150-
require.Equal(t, s.pauseIdentity, pauseInfo.GetIdentity())
151-
require.Equal(t, s.pauseReason, pauseInfo.GetReason())
152-
}
147+
s.assertWorkflowIsPaused(t, ctx, workflowID, runID)
153148
}, 5*time.Second, 200*time.Millisecond)
154149

155150
// Send signal to the workflow to complete the workflow. Since the workflow is paused, it should stay paused.
@@ -198,8 +193,6 @@ func (s *PauseWorkflowExecutionSuite) TestQueryWorkflowWhenPaused() {
198193
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
199194
defer cancel()
200195

201-
s.Worker().RegisterWorkflow(s.workflowFn)
202-
203196
workflowOptions := sdkclient.StartWorkflowOptions{
204197
ID: testcore.RandomizeStr("pause-wf-" + s.T().Name()),
205198
TaskQueue: s.TaskQueue(),
@@ -233,15 +226,7 @@ func (s *PauseWorkflowExecutionSuite) TestQueryWorkflowWhenPaused() {
233226

234227
// Wait until paused.
235228
s.EventuallyWithT(func(t *assert.CollectT) {
236-
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
237-
require.NoError(t, err)
238-
info := desc.GetWorkflowExecutionInfo()
239-
require.NotNil(t, info)
240-
require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, info.GetStatus())
241-
if pauseInfo := desc.GetWorkflowExtendedInfo().GetPauseInfo(); pauseInfo != nil {
242-
require.Equal(t, s.pauseIdentity, pauseInfo.GetIdentity())
243-
require.Equal(t, s.pauseReason, pauseInfo.GetReason())
244-
}
229+
s.assertWorkflowIsPaused(t, ctx, workflowID, runID)
245230
}, 5*time.Second, 200*time.Millisecond)
246231

247232
// Issue a query to the paused workflow. It should return QueryRejected with WORKFLOW_EXECUTION_STATUS_PAUSED status.
@@ -261,7 +246,21 @@ func (s *PauseWorkflowExecutionSuite) TestQueryWorkflowWhenPaused() {
261246
s.NotNil(queryResp.GetQueryRejected())
262247
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, queryResp.GetQueryRejected().GetStatus())
263248

264-
// Complete the workflow to finish the test.
249+
// Unpause the workflow.
250+
unpauseRequest := &workflowservice.UnpauseWorkflowExecutionRequest{
251+
Namespace: s.Namespace().String(),
252+
WorkflowId: workflowID,
253+
RunId: runID,
254+
Identity: s.pauseIdentity,
255+
Reason: s.pauseReason,
256+
RequestId: uuid.NewString(),
257+
}
258+
unpauseResp, err := s.FrontendClient().UnpauseWorkflowExecution(ctx, unpauseRequest)
259+
s.NoError(err)
260+
s.NotNil(unpauseResp)
261+
262+
// Unblock the activity and send the signal to complete the workflow.
263+
s.activityCompletedCh <- struct{}{}
265264
err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "test end signal")
266265
s.NoError(err)
267266

@@ -417,10 +416,6 @@ func (s *PauseWorkflowExecutionSuite) TestPauseWorkflowExecutionAlreadyPaused()
417416
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
418417
defer cancel()
419418

420-
s.Worker().RegisterWorkflow(s.workflowFn)
421-
s.Worker().RegisterWorkflow(s.childWorkflowFn)
422-
s.Worker().RegisterActivity(s.activityFn)
423-
424419
workflowOptions := sdkclient.StartWorkflowOptions{
425420
ID: testcore.RandomizeStr("pause-wf-" + s.T().Name()),
426421
TaskQueue: s.TaskQueue(),
@@ -452,16 +447,9 @@ func (s *PauseWorkflowExecutionSuite) TestPauseWorkflowExecutionAlreadyPaused()
452447
s.NoError(err)
453448
s.NotNil(pauseResp)
454449

450+
// Wait until paused.
455451
s.EventuallyWithT(func(t *assert.CollectT) {
456-
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
457-
require.NoError(t, err)
458-
info := desc.GetWorkflowExecutionInfo()
459-
require.NotNil(t, info)
460-
require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, info.GetStatus())
461-
if pauseInfo := desc.GetWorkflowExtendedInfo().GetPauseInfo(); pauseInfo != nil {
462-
require.Equal(t, s.pauseIdentity, pauseInfo.GetIdentity())
463-
require.Equal(t, s.pauseReason, pauseInfo.GetReason())
464-
}
452+
s.assertWorkflowIsPaused(t, ctx, workflowID, runID)
465453
}, 5*time.Second, 200*time.Millisecond)
466454

467455
// 2nd pause request should fail with failed precondition error.
@@ -495,7 +483,8 @@ func (s *PauseWorkflowExecutionSuite) TestPauseWorkflowExecutionAlreadyPaused()
495483
require.Nil(t, desc.GetWorkflowExtendedInfo().GetPauseInfo())
496484
}, 5*time.Second, 200*time.Millisecond)
497485

498-
// For now sending this signal will complete the workflow and finish the test.
486+
// Unblock the activity and send the signal to complete the workflow.
487+
s.activityCompletedCh <- struct{}{}
499488
err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "test end signal")
500489
s.NoError(err)
501490

0 commit comments

Comments
 (0)