diff --git a/.scripts/list-of-samples.json b/.scripts/list-of-samples.json index bf1f908b..4ea67bb5 100644 --- a/.scripts/list-of-samples.json +++ b/.scripts/list-of-samples.json @@ -3,6 +3,7 @@ "activities-cancellation-heartbeating", "activities-dependency-injection", "activities-examples", + "batch", "child-workflows", "continue-as-new", "cron-workflows", diff --git a/batch/.eslintignore b/batch/.eslintignore new file mode 100644 index 00000000..7bd99a41 --- /dev/null +++ b/batch/.eslintignore @@ -0,0 +1,3 @@ +node_modules +lib +.eslintrc.js \ No newline at end of file diff --git a/batch/.eslintrc.js b/batch/.eslintrc.js new file mode 100644 index 00000000..b8251a06 --- /dev/null +++ b/batch/.eslintrc.js @@ -0,0 +1,48 @@ +const { builtinModules } = require('module'); + +const ALLOWED_NODE_BUILTINS = new Set(['assert']); + +module.exports = { + root: true, + parser: '@typescript-eslint/parser', + parserOptions: { + project: './tsconfig.json', + tsconfigRootDir: __dirname, + }, + plugins: ['@typescript-eslint', 'deprecation'], + extends: [ + 'eslint:recommended', + 'plugin:@typescript-eslint/eslint-recommended', + 'plugin:@typescript-eslint/recommended', + 'prettier', + ], + rules: { + // recommended for safety + '@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad + 'deprecation/deprecation': 'warn', + + // code style preference + 'object-shorthand': ['error', 'always'], + + // relaxed rules, for convenience + '@typescript-eslint/no-unused-vars': [ + 'warn', + { + argsIgnorePattern: '^_', + varsIgnorePattern: '^_', + }, + ], + '@typescript-eslint/no-explicit-any': 'off', + }, + overrides: [ + { + files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'], + rules: { + 'no-restricted-imports': [ + 'error', + ...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]), + ], + }, + }, + ], +}; diff --git a/batch/.gitignore b/batch/.gitignore new file mode 100644 
index 00000000..a9f4ed54 --- /dev/null +++ b/batch/.gitignore @@ -0,0 +1,2 @@ +lib +node_modules \ No newline at end of file diff --git a/batch/.npmrc b/batch/.npmrc new file mode 100644 index 00000000..9cf94950 --- /dev/null +++ b/batch/.npmrc @@ -0,0 +1 @@ +package-lock=false \ No newline at end of file diff --git a/batch/.nvmrc b/batch/.nvmrc new file mode 100644 index 00000000..b6a7d89c --- /dev/null +++ b/batch/.nvmrc @@ -0,0 +1 @@ +16 diff --git a/batch/.post-create b/batch/.post-create new file mode 100644 index 00000000..a682bb78 --- /dev/null +++ b/batch/.post-create @@ -0,0 +1,18 @@ +To begin development, install the Temporal CLI: + + Mac: {cyan brew install temporal} + Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest + +Start Temporal Server: + + {cyan temporal server start-dev} + +Use Node version 16+: + + Mac: {cyan brew install node@16} + Other: https://nodejs.org/en/download/ + +Then, in the project directory, using two other shells, run these commands: + + {cyan npm run start-iterator.watch} + {cyan npm run workflow-iterator} diff --git a/batch/.prettierignore b/batch/.prettierignore new file mode 100644 index 00000000..7951405f --- /dev/null +++ b/batch/.prettierignore @@ -0,0 +1 @@ +lib \ No newline at end of file diff --git a/batch/.prettierrc b/batch/.prettierrc new file mode 100644 index 00000000..965d50bf --- /dev/null +++ b/batch/.prettierrc @@ -0,0 +1,2 @@ +printWidth: 120 +singleQuote: true diff --git a/batch/README.md b/batch/README.md new file mode 100644 index 00000000..dfb86c08 --- /dev/null +++ b/batch/README.md @@ -0,0 +1,3 @@ +# Batch Examples + +## [Iterator](./src/iterator/README.md) diff --git a/batch/package.json b/batch/package.json new file mode 100644 index 00000000..3903fb7f --- /dev/null +++ b/batch/package.json @@ -0,0 +1,42 @@ +{ + "name": "child-workflows", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "tsc --build", + "build.watch": "tsc --build --watch", + "lint": 
"eslint .", + "start-iterator": "ts-node src/iterator/worker.ts", + "start-iterator.watch": "nodemon src/iterator/worker.ts", + "workflow-iterator": "ts-node src/iterator/client.ts", + "format": "prettier --config .prettierrc 'src/**/*.ts' --write" + }, + "nodemonConfig": { + "execMap": { + "ts": "ts-node" + }, + "ext": "ts", + "watch": [ + "src" + ] + }, + "dependencies": { + "@temporalio/activity": "^1.10.1", + "@temporalio/client": "^1.10.1", + "@temporalio/worker": "^1.10.1", + "@temporalio/workflow": "^1.10.1" + }, + "devDependencies": { + "@tsconfig/node16": "^1.0.0", + "@types/node": "^16.11.43", + "@typescript-eslint/eslint-plugin": "^5.0.0", + "@typescript-eslint/parser": "^5.0.0", + "eslint": "^7.32.0", + "eslint-config-prettier": "^8.3.0", + "eslint-plugin-deprecation": "^1.2.1", + "nodemon": "^2.0.22", + "prettier": "^2.8.8", + "ts-node": "^10.9.2", + "typescript": "^4.4.2" + } +} diff --git a/batch/src/iterator/README.md b/batch/src/iterator/README.md new file mode 100644 index 00000000..87189263 --- /dev/null +++ b/batch/src/iterator/README.md @@ -0,0 +1,21 @@ +# Batch Iterator + +A sample implementation of the Workflow iterator pattern. + +A workflow starts a configured number of Child Workflows in parallel. Each child processes a single record. +After all children close (complete or fail), the parent calls continue-as-new and starts the children for the next page of records. + +The parent tracks and returns the total number of records processed and the number of failed ones. + +This allows processing a set of records of any size. The advantage of this approach is simplicity. +The main disadvantage is that it processes records in batches, with each batch waiting for the slowest child workflow. + +A variation of this pattern runs activities instead of child workflows. + +## Running this sample + +1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation). +2. 
/**
 * A single unit of work in the batch.
 *
 * `description` is derived from `id` at construction time.
 */
export class Record {
  public readonly id: number;
  public readonly description: string;

  constructor(id: number) {
    this.id = id;
    this.description = 'record number ' + this.id;
  }
}

/**
 * Activity that loads one page of records.
 *
 * This sample implementation always serves exactly two pages; a real
 * implementation would iterate over an existing dataset or file.
 *
 * @param pageSize - Number of records in a page.
 * @param offset - Id of the first record of the requested page.
 * @returns `pageSize` records with ids `offset .. offset + pageSize - 1`, or
 *   an empty array once `offset` is past the second page.
 */
export async function getRecords(pageSize: number, offset: number): Promise<Record[]> {
  const PAGE_COUNT = 2;
  const result: Record[] = [];

  // Past the last page: signal "no more data" with an empty page.
  if (offset >= pageSize * PAGE_COUNT) {
    return result;
  }

  for (let i = 0; i < pageSize; i++) {
    result.push(new Record(offset + i));
  }
  return result;
}
import {
  ApplicationFailure,
  ChildWorkflowHandle,
  continueAsNew,
  log,
  proxyActivities,
  sleep,
  startChild,
  workflowInfo,
} from '@temporalio/workflow';
// Type-only imports: workflow code must not pull the activity implementations
// into the workflow bundle, it only needs their signatures.
import type * as activities from './activities';
import type { Record } from './activities';

const { getRecords } = proxyActivities<typeof activities>({
  startToCloseTimeout: '1 minute',
});

/** Describes the page of records a single `processBatch` run handles. */
export class Batch {
  public readonly pageSize: number;
  public readonly offset: number;

  constructor(pageSize: number, offset: number) {
    this.pageSize = pageSize;
    this.offset = offset;
  }
}

/** Running totals carried from one `processBatch` run to the next. */
export interface Result {
  totalProcessedRecords: number;
  failedRecords: number;
}

/**
 * Processes one page of records, then continues-as-new for the next page.
 *
 * One child workflow is started per record. After every child has closed
 * (completed or failed), the workflow either returns the accumulated totals
 * (when the page was empty) or continues-as-new with the next offset.
 *
 * @param batch - Page size and offset of the page to process in this run.
 * @param previousExecutionResult - Totals accumulated by earlier runs in the
 *   continue-as-new chain; omitted on the first run.
 * @returns The accumulated totals once an empty page is reached.
 */
export async function processBatch(batch: Batch, previousExecutionResult?: Result): Promise<Result> {
  // Load the records to process in this batch.
  const records: Record[] = await getRecords(batch.pageSize, batch.offset);

  // Start one child per record; all children run concurrently.
  const handles: Array<ChildWorkflowHandle<typeof recordProcessor>> = await Promise.all(
    records.map((record) =>
      startChild(recordProcessor, {
        workflowId: `${workflowInfo().workflowId}/child-${record.id}`,
        args: [record],
      })
    )
  );

  const totalProcessedRecords = (previousExecutionResult?.totalProcessedRecords ?? 0) + handles.length;
  let failedRecords = previousExecutionResult?.failedRecords ?? 0;

  // Wait for all child workflows to close. The children are already running,
  // so awaiting them one by one does not serialize their execution.
  for (const handle of handles) {
    try {
      await handle.result();
    } catch {
      // A failed child is counted rather than propagated, so one bad record
      // does not fail the whole batch.
      failedRecords++;
    }
  }

  const executionResult: Result = {
    totalProcessedRecords,
    failedRecords,
  };

  // Complete the workflow if there are no more records to process.
  if (records.length === 0) {
    return executionResult;
  }

  // Continue as new to process the next batch.
  return continueAsNew<typeof processBatch>(
    {
      pageSize: batch.pageSize,
      offset: batch.offset + records.length,
    },
    executionResult
  );
}

/**
 * Child workflow that processes a single record.
 *
 * Sleeps between 1 and 2 seconds to simulate work, then fails on purpose for
 * every record whose id is a multiple of 5.
 *
 * @param record - The record to process.
 * @throws ApplicationFailure (non-retryable) when `record.id` is a multiple of 5.
 */
export async function recordProcessor(record: Record): Promise<void> {
  log.info(`Processing record ${JSON.stringify(record)} in child workflow `);

  const maxSleep = 2000;
  const minSleep = 1000;

  // Sleep to simulate record processing.
  await sleep(Math.floor(Math.random() * (maxSleep - minSleep + 1) + minSleep));

  // Intentionally fail 1 in 5 child workflows.
  if (record.id % 5 === 0) {
    throw ApplicationFailure.nonRetryable(
      `Intentionally failing the child workflow with input ${JSON.stringify(record)}`
    );
  }
}