Skip to content

Commit 187cb11

Browse files
committed
Handle paused workflows in EventsReapplierImpl.ReapplyEvents()
1 parent 532e3cf commit 187cb11

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

service/history/ndc/events_reapplier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (r *EventsReapplierImpl) ReapplyEvents(
7676
return reappliedEvents, nil
7777
}
7878

79-
if !ms.HasPendingWorkflowTask() {
79+
if !ms.HasPendingWorkflowTask() && !ms.IsWorkflowExecutionStatusPaused() {
8080
if _, err := ms.AddWorkflowTaskScheduledEvent(
8181
false,
8282
enumsspb.WORKFLOW_TASK_TYPE_NORMAL,

service/history/ndc/events_reapplier_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,7 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent_NoPendingWor
465465
msCurrent.EXPECT().IsResourceDuplicated(dedupResource).Return(false)
466466
msCurrent.EXPECT().UpdateDuplicatedResource(dedupResource)
467467
msCurrent.EXPECT().HasPendingWorkflowTask().Return(false)
468+
msCurrent.EXPECT().IsWorkflowExecutionStatusPaused().Return(false)
468469
msCurrent.EXPECT().AddWorkflowTaskScheduledEvent(
469470
false,
470471
enumsspb.WORKFLOW_TASK_TYPE_NORMAL,
@@ -477,3 +478,53 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent_NoPendingWor
477478
s.NoError(err)
478479
s.Equal(1, len(appliedEvent))
479480
}
481+
482+
// Reapplies a signal event to a paused workflow
483+
// Asserts that AddWorkflowTaskScheduledEvent() is NOT called
484+
485+
func (s *nDCEventReapplicationSuite) TestReapplyEvents_PausedWorkflow_NoWorkflowTaskScheduled() {
486+
runID := uuid.NewString()
487+
execution := &persistencespb.WorkflowExecutionInfo{
488+
NamespaceId: uuid.NewString(),
489+
}
490+
event := &historypb.HistoryEvent{
491+
EventId: 1,
492+
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED,
493+
Attributes: &historypb.HistoryEvent_WorkflowExecutionSignaledEventAttributes{WorkflowExecutionSignaledEventAttributes: &historypb.WorkflowExecutionSignaledEventAttributes{
494+
Identity: "test",
495+
SignalName: "signal",
496+
Input: payloads.EncodeBytes([]byte{}),
497+
Header: &commonpb.Header{Fields: map[string]*commonpb.Payload{"myheader": {Data: []byte("myheader")}}},
498+
}},
499+
}
500+
attr := event.GetWorkflowExecutionSignaledEventAttributes()
501+
502+
msCurrent := historyi.NewMockMutableState(s.controller)
503+
msCurrent.EXPECT().VisitUpdates(gomock.Any()).Return()
504+
msCurrent.EXPECT().GetCurrentVersion().Return(int64(0))
505+
updateRegistry := update.NewRegistry(msCurrent)
506+
msCurrent.EXPECT().IsWorkflowExecutionRunning().Return(true).Times(2)
507+
msCurrent.EXPECT().GetExecutionInfo().Return(execution).AnyTimes()
508+
msCurrent.EXPECT().AddWorkflowExecutionSignaled(
509+
attr.GetSignalName(),
510+
attr.GetInput(),
511+
attr.GetIdentity(),
512+
attr.GetHeader(),
513+
event.Links,
514+
).Return(event, nil)
515+
msCurrent.EXPECT().HSM().Return(s.hsmNode).AnyTimes()
516+
msCurrent.EXPECT().IsWorkflowPendingOnWorkflowTaskBackoff().Return(false)
517+
dedupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion())
518+
msCurrent.EXPECT().IsResourceDuplicated(dedupResource).Return(false)
519+
msCurrent.EXPECT().UpdateDuplicatedResource(dedupResource)
520+
msCurrent.EXPECT().HasPendingWorkflowTask().Return(false)
521+
// Workflow is paused, so AddWorkflowTaskScheduledEvent should NOT be called.
522+
msCurrent.EXPECT().IsWorkflowExecutionStatusPaused().Return(true)
523+
events := []*historypb.HistoryEvent{
524+
{EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED},
525+
event,
526+
}
527+
appliedEvent, err := s.nDCReapplication.ReapplyEvents(context.Background(), msCurrent, updateRegistry, events, runID)
528+
s.NoError(err)
529+
s.Equal(1, len(appliedEvent))
530+
}

0 commit comments

Comments
 (0)