Skip to content

Commit cc1a3ee

Browse files
authored
Add testing helper to safe message handlers sample (#423)
* Add in maxHistoryLength for testing * use undefined, not null * forgot a spot
1 parent 6123449 commit cc1a3ee

File tree

4 files changed

+26
-5
lines changed

4 files changed

+26
-5
lines changed

message-passing/safe-message-handlers/src/client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ export async function startClusterManager(): Promise<WorkflowHandle<typeof clust
99
return client.workflow.start(clusterManagerWorkflow, {
1010
taskQueue: 'safe-message-handlers-task-queue',
1111
workflowId: `cm-${nanoid()}`,
12+
args: [{ testContinueAsNew: false }],
1213
});
1314
}

message-passing/safe-message-handlers/src/cluster-manager.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
ClusterState,
88
ClusterManagerStateSummary,
99
DeleteJobUpdateInput,
10+
ClusterManagerInput,
1011
} from './types';
1112

1213
const { assignNodesToJob, unassignNodesForJob, startCluster, shutdownCluster } = wf.proxyActivities<typeof activities>({
@@ -25,15 +26,17 @@ export class ClusterManager {
2526
state: ClusterManagerState;
2627
seenJobs: Set<string>;
2728
nodesMutex: Mutex;
29+
private maxHistoryLength?: number;
2830

29-
constructor(state?: ClusterManagerState) {
30-
this.state = state ?? {
31+
constructor(input: ClusterManagerInput = {}) {
32+
this.state = input.state ?? {
3133
clusterState: ClusterState.NOT_STARTED,
3234
nodes: new Map<string, string | null>(),
3335
maxAssignedNodes: 0,
3436
};
3537
this.nodesMutex = new Mutex();
3638
this.seenJobs = new Set<string>();
39+
this.maxHistoryLength = input.testContinueAsNew ? 120 : undefined;
3740
}
3841

3942
async startCluster(): Promise<void> {
@@ -145,4 +148,17 @@ export class ClusterManager {
145148
})
146149
);
147150
}
151+
152+
shouldContinueAsNew(): boolean {
153+
if (wf.workflowInfo().continueAsNewSuggested) {
154+
return true;
155+
}
156+
157+
// This is just for ease-of-testing. In production, we trust temporal to tell us when to continue-as-new.
158+
if (this.maxHistoryLength !== undefined && wf.workflowInfo().historyLength > this.maxHistoryLength) {
159+
return true;
160+
}
161+
162+
return false;
163+
}
148164
}

message-passing/safe-message-handlers/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export interface ClusterManagerState {
66

77
export interface ClusterManagerInput {
88
state?: ClusterManagerState;
9+
testContinueAsNew?: boolean;
910
}
1011

1112
export interface ClusterManagerStateSummary {

message-passing/safe-message-handlers/src/workflows.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export const deleteJobUpdate = wf.defineUpdate<void, [DeleteJobUpdateInput]>('de
1717
export const getClusterStatusQuery = wf.defineQuery<ClusterManagerStateSummary>('getClusterStatus');
1818

1919
export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): Promise<ClusterManagerStateSummary> {
20-
const manager = new ClusterManager(input.state);
20+
const manager = new ClusterManager(input);
2121
//
2222
// Message-handling API
2323
//
@@ -52,7 +52,7 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P
5252
// continue-as-new.
5353
await wf.condition(() => manager.state.clusterState === ClusterState.STARTED);
5454
await wf.condition(
55-
() => manager.state.clusterState === ClusterState.SHUTTING_DOWN || wf.workflowInfo().continueAsNewSuggested
55+
() => manager.state.clusterState === ClusterState.SHUTTING_DOWN || manager.shouldContinueAsNew()
5656
);
5757
if (manager.state.clusterState !== ClusterState.SHUTTING_DOWN) {
5858
// You should typically wait for all async handlers to finish before
@@ -62,7 +62,10 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P
6262
// new. This sample does not schedule any activities or child workflows, so
6363
// it is sufficient just to wait for handlers to finish.
6464
await wf.condition(wf.allHandlersFinished);
65-
return await wf.continueAsNew<typeof clusterManagerWorkflow>({ state: manager.getState() });
65+
return await wf.continueAsNew<typeof clusterManagerWorkflow>({
66+
state: manager.getState(),
67+
testContinueAsNew: input.testContinueAsNew
68+
});
6669
} else {
6770
return manager.getStateSummary();
6871
}

0 commit comments

Comments
 (0)