diff --git a/.scripts/list-of-samples.json b/.scripts/list-of-samples.json index 84c016ab..68e48045 100644 --- a/.scripts/list-of-samples.json +++ b/.scripts/list-of-samples.json @@ -4,6 +4,7 @@ "activities-dependency-injection", "activities-examples", "child-workflows", + "context-propagation", "continue-as-new", "cron-workflows", "custom-logger", diff --git a/README.md b/README.md index 8280b04a..9192afb4 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,10 @@ and you'll be given the list of sample options. - **Interceptors**: - [**OpenTelemetry**](./interceptors-opentelemetry): Use the Interceptors feature to add OpenTelemetry metrics reporting to your workflows. + - [**Context Propagation**](./context-propagation): Uses SDK interceptors to propagate contextual data from the client, to workflows, child workflows and activities, and automatically include these as metadata emitted log messages. + Two variants are presented: + - [**Traditional**](./context-propagation/traditional): Uses the `AsyncLocalStorage` API to implicitly propagate context. This is the recommended approach. + - [**Go-Style**](./context-propagation/go-style): Propagate context using an explicit `ctx` argument on workflow and activity functions, following the Go style. This is a less common approach, but demonstrated for the benefit of teams that have strong reasons to prefer it. - [**Query Subscriptions**](./query-subscriptions): Use Redis Streams, Immer, and SDK Interceptors to subscribe to Workflow state. - [**gRPC calls**](./grpc-calls): Make raw gRPC calls for advanced queries not covered by the WorkflowClient API. diff --git a/context-propagation/go-style-context/.eslintignore b/context-propagation/go-style-context/.eslintignore new file mode 100644 index 00000000..7bd99a41 --- /dev/null +++ b/context-propagation/go-style-context/.eslintignore @@ -0,0 +1,3 @@ +node_modules +lib +.eslintrc.js \ No newline at end of file diff --git a/context-propagation/go-style-context/.eslintrc.js b/context-propagation/go-style-context/.eslintrc.js new file mode 100644 index 00000000..b8251a06 --- /dev/null +++ b/context-propagation/go-style-context/.eslintrc.js @@ -0,0 +1,48 @@ +const { builtinModules } = require('module'); + +const ALLOWED_NODE_BUILTINS = new Set(['assert']); + +module.exports = { + root: true, + parser: '@typescript-eslint/parser', + parserOptions: { + project: './tsconfig.json', + tsconfigRootDir: __dirname, + }, + plugins: ['@typescript-eslint', 'deprecation'], + extends: [ + 'eslint:recommended', + 'plugin:@typescript-eslint/eslint-recommended', + 'plugin:@typescript-eslint/recommended', + 'prettier', + ], + rules: { + // recommended for safety + '@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad + 'deprecation/deprecation': 'warn', + + // code style preference + 'object-shorthand': ['error', 'always'], + + // relaxed rules, for convenience + '@typescript-eslint/no-unused-vars': [ + 'warn', + { + argsIgnorePattern: '^_', + varsIgnorePattern: '^_', + }, + ], + '@typescript-eslint/no-explicit-any': 'off', + }, + overrides: [ + { + files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'], + rules: { + 'no-restricted-imports': [ + 'error', + ...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]), + ], + }, + }, + ], +}; diff --git a/context-propagation/go-style-context/.gitignore b/context-propagation/go-style-context/.gitignore new file mode 100644 index 00000000..a9f4ed54 --- /dev/null +++ b/context-propagation/go-style-context/.gitignore @@ -0,0 +1,2 @@ +lib +node_modules \ No newline at end of file diff --git a/context-propagation/go-style-context/.npmrc b/context-propagation/go-style-context/.npmrc new file mode 100644 index 00000000..9cf94950 --- /dev/null +++ b/context-propagation/go-style-context/.npmrc @@ -0,0 +1 @@ +package-lock=false \ No newline at end of file diff --git a/context-propagation/go-style-context/.nvmrc b/context-propagation/go-style-context/.nvmrc new file mode 100644 index 00000000..b6a7d89c --- /dev/null +++ b/context-propagation/go-style-context/.nvmrc @@ -0,0 +1 @@ +16 diff --git a/context-propagation/go-style-context/.post-create b/context-propagation/go-style-context/.post-create new file mode 100644 index 00000000..a682bb78 --- /dev/null +++ b/context-propagation/go-style-context/.post-create @@ -0,0 +1,18 @@ +To begin development, install the Temporal CLI: + + Mac: {cyan brew install temporal} + Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest + +Start Temporal Server: + + {cyan temporal server start-dev} + +Use Node version 16+: + + Mac: {cyan brew install node@16} + Other: https://nodejs.org/en/download/ + +Then, in the project directory, using two other shells, run these commands: + + {cyan npm run start.watch} + {cyan npm run workflow} diff --git a/context-propagation/go-style-context/.prettierignore b/context-propagation/go-style-context/.prettierignore new file mode 100644 index 00000000..7951405f --- /dev/null +++ b/context-propagation/go-style-context/.prettierignore @@ -0,0 +1 @@ +lib \ No newline at end of file diff --git a/context-propagation/go-style-context/.prettierrc b/context-propagation/go-style-context/.prettierrc new file mode 100644 index 00000000..965d50bf --- /dev/null +++ b/context-propagation/go-style-context/.prettierrc @@ -0,0 +1,2 @@ +printWidth: 120 +singleQuote: true diff --git a/context-propagation/go-style-context/README.md b/context-propagation/go-style-context/README.md new file mode 100644 index 00000000..9a99d383 --- /dev/null +++ b/context-propagation/go-style-context/README.md @@ -0,0 +1,175 @@ +# Go-Style Context Propagation + +This project demonstrate an advanced use case where interceptors are used to propagate some contextual data from client to workflow, to child workflow, and to activities. + +In this demo, that contextual data is inject custom log +attributes on log entries produced from workflow and activities. + +In particular, this sample demonstrate: + +- Using + attributes, + +### Running this sample + +1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation). +1. `npm install` to install dependencies. +1. `npm run start.watch` to start the Worker. +1. In another shell, `npm run workflow` to run the Workflow. + +
+ +Sample worker output + + +``` +2023-09-05T18:19:39.646Z [worker] info: Creating worker { + options: { + namespace: 'default', + identity: '1234@MyComputer', + useVersioning: false, + buildId: undefined, + shutdownGraceTime: 0, + maxConcurrentLocalActivityExecutions: 100, + enableNonLocalActivities: true, + maxConcurrentWorkflowTaskPolls: 10, + maxConcurrentActivityTaskPolls: 10, + stickyQueueScheduleToStartTimeout: '10s', + maxHeartbeatThrottleInterval: '60s', + defaultHeartbeatThrottleInterval: '30s', + isolateExecutionTimeout: '5s', + workflowThreadPoolSize: 2, + maxCachedWorkflows: 914, + showStackTraceSources: false, + debugMode: false, + workflowsPath: '/Users/user/samples-typescript/custom-logger/src/workflows/index.ts', + activities: { greet: [AsyncFunction: greet] }, + taskQueue: 'custom-logger', + maxConcurrentWorkflowTaskExecutions: 40, + maxConcurrentActivityTaskExecutions: 100, + shutdownGraceTimeMs: 0, + shutdownForceTimeMs: undefined, + stickyQueueScheduleToStartTimeoutMs: 10000, + isolateExecutionTimeoutMs: 5000, + maxHeartbeatThrottleIntervalMs: 60000, + defaultHeartbeatThrottleIntervalMs: 30000, + loadedDataConverter: { + payloadConverter: DefaultPayloadConverter { + converterByEncoding: Map(3) { + 'binary/null' => [UndefinedPayloadConverter], + 'binary/plain' => [BinaryPayloadConverter], + 'json/plain' => [JsonPayloadConverter] + }, + converters: [ + [UndefinedPayloadConverter], + [BinaryPayloadConverter], + [JsonPayloadConverter] + ] + }, + failureConverter: DefaultFailureConverter { + options: { encodeCommonAttributes: false } + }, + payloadCodecs: [] + } + } +} +2023-09-05T18:19:40.084Z [worker] info: asset workflow-bundle-95e3c04c487ab5112957.js 755 KiB [emitted] [immutable] (name: main) +2023-09-05T18:19:40.084Z [worker] info: runtime modules 937 bytes 4 modules +2023-09-05T18:19:40.084Z [worker] info: modules by path ./node_modules/@temporalio/ 182 KiB +2023-09-05T18:19:40.084Z [worker] info: modules by path ./node_modules/@temporalio/common/lib/ 77.9 KiB 21 modules +2023-09-05T18:19:40.084Z [worker] info: modules by path ./node_modules/@temporalio/workflow/ 102 KiB +2023-09-05T18:19:40.084Z [worker] info: ./node_modules/@temporalio/workflow/lib/worker-interface.js 11 KiB [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: + 13 modules +2023-09-05T18:19:40.084Z [worker] info: ./node_modules/@temporalio/worker/lib/workflow-log-interceptor.js 2.42 KiB [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: modules by path ./src/workflows/ 878 bytes +2023-09-05T18:19:40.084Z [worker] info: ./src/workflows/index-autogenerated-entrypoint.cjs 597 bytes [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: ./src/workflows/index.ts 281 bytes [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: __temporal_custom_payload_converter (ignored) 15 bytes [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: __temporal_custom_failure_converter (ignored) 15 bytes [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: ./node_modules/long/umd/index.js 43.1 KiB [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: ./node_modules/ms/dist/index.cjs 3.41 KiB [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: webpack 5.88.2 compiled successfully in 264 ms +2023-09-05T18:19:40.085Z [worker] info: Workflow bundle created { size: '0.74MB' } +2023-09-05T18:19:40.165Z [worker] info: Initializing worker +2023-09-05T18:19:40.166Z [worker] info: Worker state changed { state: 'RUNNING' } +2023-09-05T18:19:51.963Z [worker] debug: New WFT +2023-09-05T18:19:51.963Z [worker] debug: Applying new workflow task from server +2023-09-05T18:19:51.963Z [worker] debug: Driven WF start +2023-09-05T18:19:51.963Z [worker] debug: Sending activation to lang +2023-09-05T18:19:51.987Z [workflow] debug: Workflow started { + namespace: 'default', + taskQueue: 'custom-logger', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + runId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow' +} +2023-09-05T18:19:51.987Z [worker] debug: wf bridge iteration fetch +2023-09-05T18:19:51.987Z [worker] debug: handling command +2023-09-05T18:19:51.987Z [worker] debug: prepared commands +2023-09-05T18:19:51.987Z [worker] debug: Sending responses to server +2023-09-05T18:19:51.989Z [worker] debug: Server returned 1 fewer activities for eager execution than we requested +2023-09-05T18:19:51.989Z [worker] debug: Marking WFT completed +2023-09-05T18:19:51.994Z [workflow] debug: Activity started { + isLocal: false, + attempt: 1, + namespace: 'default', + taskToken: 'CiRjZGNlMGM2Mi1mYzdjLTQyODctOGM2MC1kNWYwOWIzNDhjMWESHndvcmtmbG93LTJZcmlPbG1nSzJ2TVQtcmhSRlFRcBokZmEwYjQxNWItZWY4OS00MzRkLTgzOWMtZWMwMzFkMzE2NjhlIAUoATIBMUIFZ3JlZXRKCAgBEPKUQxgB', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + workflowRunId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow', + activityId: '1', + activityType: 'greet', + taskQueue: 'custom-logger' +} +2023-09-05T18:19:51.994Z [activity] info: Log from activity { + isLocal: false, + attempt: 1, + namespace: 'default', + taskToken: 'CiRjZGNlMGM2Mi1mYzdjLTQyODctOGM2MC1kNWYwOWIzNDhjMWESHndvcmtmbG93LTJZcmlPbG1nSzJ2TVQtcmhSRlFRcBokZmEwYjQxNWItZWY4OS00MzRkLTgzOWMtZWMwMzFkMzE2NjhlIAUoATIBMUIFZ3JlZXRKCAgBEPKUQxgB', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + workflowRunId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow', + activityId: '1', + activityType: 'greet', + taskQueue: 'custom-logger', + name: 'Temporal' +} +2023-09-05T18:19:51.994Z [activity] debug: Activity completed { + isLocal: false, + attempt: 1, + namespace: 'default', + taskToken: 'CiRjZGNlMGM2Mi1mYzdjLTQyODctOGM2MC1kNWYwOWIzNDhjMWESHndvcmtmbG93LTJZcmlPbG1nSzJ2TVQtcmhSRlFRcBokZmEwYjQxNWItZWY4OS00MzRkLTgzOWMtZWMwMzFkMzE2NjhlIAUoATIBMUIFZ3JlZXRKCAgBEPKUQxgB', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + workflowRunId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow', + activityId: '1', + activityType: 'greet', + taskQueue: 'custom-logger', + durationMs: 0 +} +2023-09-05T18:19:51.999Z [workerd] debug: New WFT +2023-09-05T18:19:51.999Z [workerd] debug: Applying new workflow task from server +2023-09-05T18:19:51.999Z [workerd] debug: Sending activation to lang +2023-09-05T18:19:52.001Z [workflow] info: Greeted { + namespace: 'default', + taskQueue: 'custom-logger', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + runId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow', + greeting: 'Hello, Temporal!' +} +2023-09-05T18:19:52.001Z [workflow] debug: Workflow completed { + namespace: 'default', + taskQueue: 'custom-logger', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + runId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow' +} +2023-09-05T18:19:52.001Z [worker] debug: wf bridge iteration fetch +2023-09-05T18:19:52.001Z [worker] debug: handling command +2023-09-05T18:19:52.001Z [worker] debug: prepared commands +2023-09-05T18:19:52.001Z [worker] debug: Sending responses to server +2023-09-05T18:19:52.004Z [worker] debug: Marking WFT completed +``` + +
diff --git a/context-propagation/go-style-context/package.json b/context-propagation/go-style-context/package.json new file mode 100644 index 00000000..18cb3cd4 --- /dev/null +++ b/context-propagation/go-style-context/package.json @@ -0,0 +1,43 @@ +{ + "name": "go-style-context-propagation", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "tsc --build", + "build.watch": "tsc --build --watch", + "lint": "eslint .", + "start": "ts-node src/worker.ts", + "start.watch": "nodemon src/worker.ts", + "workflow": "ts-node src/client.ts" + }, + "nodemonConfig": { + "execMap": { + "ts": "ts-node" + }, + "ext": "ts", + "watch": [ + "src" + ] + }, + "dependencies": { + "@temporalio/activity": "^1.12.2", + "@temporalio/client": "^1.12.2", + "@temporalio/worker": "^1.12.2", + "@temporalio/workflow": "^1.12.2", + "nanoid": "3.x" + }, + "devDependencies": { + "@tsconfig/node16": "^1.0.0", + "@types/node": "^16.11.43", + "@types/triple-beam": "^1.3.2", + "@typescript-eslint/eslint-plugin": "^5.0.0", + "@typescript-eslint/parser": "^5.0.0", + "eslint": "^7.32.0", + "eslint-config-prettier": "^8.3.0", + "eslint-plugin-deprecation": "^1.2.1", + "nodemon": "^2.0.12", + "prettier": "^2.8.8", + "ts-node": "^10.2.1", + "typescript": "^4.4.2" + } +} diff --git a/context-propagation/go-style-context/src/activities/index.ts b/context-propagation/go-style-context/src/activities/index.ts new file mode 100644 index 00000000..50ee1483 --- /dev/null +++ b/context-propagation/go-style-context/src/activities/index.ts @@ -0,0 +1,8 @@ +import { GoStyleContext } from '../context/context-type'; + +export async function greet(ctx: GoStyleContext, name: string): Promise { + ctx.info(`Log from activity with customer ${ctx.customer ?? 'unknown'}`, { + name, + }); + return `Hello, ${name}!`; +} diff --git a/context-propagation/go-style-context/src/client.ts b/context-propagation/go-style-context/src/client.ts new file mode 100644 index 00000000..fab06f79 --- /dev/null +++ b/context-propagation/go-style-context/src/client.ts @@ -0,0 +1,33 @@ +import { nanoid } from 'nanoid'; +import { Connection, Client } from '@temporalio/client'; +import { sampleWorkflow } from './workflows'; +import { ContextClientInterceptor } from './context/temporal/client-interceptors'; +import { NormalContext } from './context/context-type'; + +async function run() { + const connection = await Connection.connect({ + address: 'localhost:7233', + // If appropriate, configure TLS and other settings. + // This is optional but we leave this here to remind you there is a gRPC connection being established. + }); + + const clientInterceptor = new ContextClientInterceptor(); + const client = new Client({ + connection, + namespace: 'default', + interceptors: { workflow: [clientInterceptor], schedule: [clientInterceptor] }, + }); + + const ctx = new NormalContext('Acme Inc.'); + + await client.workflow.execute(sampleWorkflow, { + taskQueue: 'context-propagation', + workflowId: 'workflow-' + nanoid(), + args: [ctx], + }); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/context-propagation/go-style-context/src/context/context-type.ts b/context-propagation/go-style-context/src/context/context-type.ts new file mode 100644 index 00000000..59aecaa2 --- /dev/null +++ b/context-propagation/go-style-context/src/context/context-type.ts @@ -0,0 +1,31 @@ +/** + * This is the Context interface you will use everywhere. + */ +export interface GoStyleContext { + customer?: string; + + error: (message: string, metadata?: Record) => void; + warn: (message: string, metadata?: Record) => void; + info: (message: string, metadata?: Record) => void; + debug: (message: string, metadata?: Record) => void; +} + +/** + * This is the Context implementation you would originally create in your normal code. + */ +export class NormalContext implements GoStyleContext { + constructor(public readonly customer?: string) {} + + info(message: string, metadata?: Record): void { + console.info(message, metadata); + } + warn(message: string, metadata?: Record): void { + console.warn(message, metadata); + } + error(message: string, metadata?: Record): void { + console.error(message, metadata); + } + debug(message: string, metadata?: Record): void { + console.debug(message, metadata); + } +} diff --git a/context-propagation/go-style-context/src/context/temporal/activity-interceptors.ts b/context-propagation/go-style-context/src/context/temporal/activity-interceptors.ts new file mode 100644 index 00000000..cd94b93f --- /dev/null +++ b/context-propagation/go-style-context/src/context/temporal/activity-interceptors.ts @@ -0,0 +1,78 @@ +import { + ActivityExecuteInput, + ActivityInboundCallsInterceptor, + ActivityInterceptors, + ActivityOutboundCallsInterceptor, + GetLogAttributesInput, + GetMetricTagsInput, + Next, +} from '@temporalio/worker'; +import { log } from '@temporalio/activity'; +import { MetricTags } from '@temporalio/common'; +import { GoStyleContext } from '../context-type'; +import { extractContextHeader, SerializedContext } from './header-injection'; + +class GoStyleContextActivityInterceptor implements ActivityInboundCallsInterceptor, ActivityOutboundCallsInterceptor { + async execute(input: ActivityExecuteInput, next: Next): Promise { + const inboundContext = extractContextHeader(input.headers); + const ctx = new ActivityContext(inboundContext); + return await next({ + ...input, + args: [ctx, ...input.args], + }); + } + + // FIXME: add the context to the log attributes + getLogAttributes( + input: GetLogAttributesInput, + next: Next + ): Record { + return next({ + input, + }); + } + + getMetricTags(input: GetMetricTagsInput, next: Next): MetricTags { + // FIXME: determine how context needs to affect metric tags + return next(input); + } +} + +/** + * Intercepts activity start, to restore the context received through headers. + * This interceptor also add the content of the context as log metadata. + */ +export function newContextActivityInterceptor(): ActivityInterceptors { + const interceptor = new GoStyleContextActivityInterceptor(); + return { + inbound: interceptor, + outbound: interceptor, + }; +} + +/** + * This is the Context implementation that will be used in Temporal Activity. + * + * It is initialized with properties from the serialized context received through headers, + * and sends logs through the Temporal Activity logger. + */ +class ActivityContext implements GoStyleContext { + public readonly customer?: string; + + constructor(serializedContext: SerializedContext | undefined) { + this.customer = serializedContext?.customer; + } + + info(message: string, metadata?: Record): void { + log.info(message, metadata); + } + warn(message: string, metadata?: Record): void { + log.warn(message, metadata); + } + error(message: string, metadata?: Record): void { + log.error(message, metadata); + } + debug(message: string, metadata?: Record): void { + log.debug(message, metadata); + } +} diff --git a/context-propagation/go-style-context/src/context/temporal/client-interceptors.ts b/context-propagation/go-style-context/src/context/temporal/client-interceptors.ts new file mode 100644 index 00000000..08784704 --- /dev/null +++ b/context-propagation/go-style-context/src/context/temporal/client-interceptors.ts @@ -0,0 +1,96 @@ +import { + CreateScheduleInput, + CreateScheduleOutput, + Next, + ScheduleClientInterceptor, + WorkflowClientInterceptor, + WorkflowQueryInput, + WorkflowSignalInput, + WorkflowSignalWithStartInput, + WorkflowStartInput, + WorkflowStartUpdateInput, + WorkflowStartUpdateOutput, +} from '@temporalio/client'; +import { GoStyleContext } from '../context-type'; +import { injectContextHeader } from './header-injection'; + +/** + * Intercepts calls to start a Workflow or a schedule, passing the current + * context via headers. + */ +export class ContextClientInterceptor implements WorkflowClientInterceptor, ScheduleClientInterceptor { + public async start(input: WorkflowStartInput, next: Next): Promise { + const [ctx, ...args] = input.options.args; + return await next({ + ...input, + options: { + ...input.options, + args, + }, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } + + public async signal(input: WorkflowSignalInput, next: Next): Promise { + const [ctx, ...args] = input.args; + return await next({ + ...input, + args, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } + + public async signalWithStart( + input: WorkflowSignalWithStartInput, + next: Next + ): Promise { + const [ctx, ...args] = input.options.args; + return await next({ + ...input, + options: { + ...input.options, + args, + }, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } + + public async startUpdate( + input: WorkflowStartUpdateInput, + next: Next + ): Promise { + const [ctx, ...args] = input.args; + return await next({ + ...input, + args, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } + + public async query(input: WorkflowQueryInput, next: Next): Promise { + const [ctx, ...args] = input.args; + return await next({ + ...input, + args, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } + + public async create( + input: CreateScheduleInput, + next: Next + ): Promise { + const [ctx, ...args] = input.options.action.args; + return await next({ + ...input, + options: { + ...input.options, + action: { + ...input.options.action, + args, + }, + }, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } +} diff --git a/context-propagation/go-style-context/src/context/temporal/context-storage.ts b/context-propagation/go-style-context/src/context/temporal/context-storage.ts new file mode 100644 index 00000000..2223c490 --- /dev/null +++ b/context-propagation/go-style-context/src/context/temporal/context-storage.ts @@ -0,0 +1,15 @@ +// import { AsyncLocalStorage } from 'async_hooks'; + +// const contextStorage: AsyncLocalStorage = new AsyncLocalStorage(); +// export function withContext( +// extraContext: PropagatedContext | undefined, +// fn: (context: PropagatedContext) => Ret +// ): Ret { +// if (!extraContext) return fn(getContext()); +// const newContext = { ...contextStorage.getStore(), ...extraContext }; +// return contextStorage.run(newContext, () => fn(newContext)); +// } + +// export function getContext(): PropagatedContext { +// return contextStorage.getStore() ?? {}; +// } diff --git a/context-propagation/go-style-context/src/context/temporal/header-injection.ts b/context-propagation/go-style-context/src/context/temporal/header-injection.ts new file mode 100644 index 00000000..cbfe72ee --- /dev/null +++ b/context-propagation/go-style-context/src/context/temporal/header-injection.ts @@ -0,0 +1,29 @@ +import { defaultPayloadConverter, type Headers } from '@temporalio/common'; + +/** + * This is the serialized context that is passed between the client and the server. + */ +export interface SerializedContext { + spanId?: string; + customer?: string; +} + +const CONTEXT_HEADER_NAME = 'Context'; + +// Headers needs to be converted to payload. By default, we use the default payload converter. +// You may override this if you need to use a custom payload converter. +const payloadConverter = defaultPayloadConverter; + +export function injectContextHeader(headers: Headers, context: SerializedContext | undefined): Headers { + if (!context) return headers; + return { + ...headers, + [CONTEXT_HEADER_NAME]: payloadConverter.toPayload(context), + }; +} + +export function extractContextHeader(headers: Headers): SerializedContext | undefined { + const contextHeader = headers[CONTEXT_HEADER_NAME]; + if (!contextHeader) return undefined; + return payloadConverter.fromPayload(contextHeader) as SerializedContext; +} diff --git a/context-propagation/go-style-context/src/context/temporal/workflow-interceptors.ts b/context-propagation/go-style-context/src/context/temporal/workflow-interceptors.ts new file mode 100644 index 00000000..2758eee4 --- /dev/null +++ b/context-propagation/go-style-context/src/context/temporal/workflow-interceptors.ts @@ -0,0 +1,181 @@ +import { DisposeInput, SignalWorkflowInput } from '@temporalio/workflow'; +import { + ActivityInput, + ContinueAsNewInput, + GetLogAttributesInput, + LocalActivityInput, + Next, + QueryInput, + SignalInput, + StartChildWorkflowExecutionInput, + UpdateInput, + WorkflowExecuteInput, + WorkflowInboundCallsInterceptor, + WorkflowInterceptors, + WorkflowOutboundCallsInterceptor, + WorkflowInternalsInterceptor, +} from '@temporalio/workflow'; +import { GoStyleContext } from '../context-type'; +import { extractContextHeader, injectContextHeader, SerializedContext } from './header-injection'; + +class GoStyleContextWorklfowInterceptor + implements WorkflowInboundCallsInterceptor, WorkflowOutboundCallsInterceptor, WorkflowInternalsInterceptor +{ + async execute(input: WorkflowExecuteInput, next: Next): Promise { + const inboundContext = extractContextHeader(input.headers); + const ctx = new WorkflowContext(inboundContext); + return next({ + ...input, + args: [ctx, ...input.args], + }); + } + + async handleSignal(input: SignalInput, next: Next): Promise { + const inboundContext = extractContextHeader(input.headers); + const ctx = new WorkflowContext(inboundContext); + return next({ + ...input, + args: [ctx, ...input.args], + }); + } + + async handleQuery(input: QueryInput, next: Next): Promise { + const inboundContext = extractContextHeader(input.headers); + const ctx = new WorkflowContext(inboundContext); + return next({ + ...input, + args: [ctx, ...input.args], + }); + } + + async handleUpdate( + input: UpdateInput, + next: Next + ): Promise { + const inboundContext = extractContextHeader(input.headers); + const ctx = new WorkflowContext(inboundContext); + return next({ + ...input, + args: [ctx, ...input.args], + }); + } + + validateUpdate(input: UpdateInput, next: Next) { + const inboundContext = extractContextHeader(input.headers); + const ctx = new WorkflowContext(inboundContext); + return next({ + ...input, + args: [ctx, ...input.args], + }); + } + + async scheduleActivity( + input: ActivityInput, + next: Next + ): Promise { + const [ctx, ...args] = input.args; + return await next({ + ...input, + args, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } + + async scheduleLocalActivity( + input: LocalActivityInput, + next: Next + ): Promise { + const [ctx, ...args] = input.args; + return await next({ + ...input, + args, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } + + async startChildWorkflowExecution( + input: StartChildWorkflowExecutionInput, + next: Next + ): Promise<[Promise, Promise]> { + const [ctx, ...args] = input.options.args; + return await next({ + ...input, + options: { + ...input.options, + args, + }, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } + + async continueAsNew( + input: ContinueAsNewInput, + next: Next + ): Promise { + const [ctx, ...args] = input.args; + return await next({ + ...input, + args, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } + + signalWorkflow( + input: SignalWorkflowInput, + next: Next + ): Promise { + const [ctx, ...args] = input.args; + return next({ + ...input, + args, + headers: injectContextHeader(input.headers, ctx as GoStyleContext), + }); + } + + // FIXME: add the context to the log attributes + getLogAttributes( + input: GetLogAttributesInput, + next: Next + ): Record { + return next({ + input, + }); + } + + // FIXME: This will be required to propagate log attributes + // eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-empty-function + dispose(input: DisposeInput, next: Next): void {} +} + +/** + * This is the Context implementation you would in your workflow code. + */ +class WorkflowContext implements GoStyleContext { + public readonly customer?: string; + + constructor(serializedContext: SerializedContext | undefined) { + this.customer = serializedContext?.customer; + } + + info(message: string, metadata?: Record): void { + console.info(message, metadata); + } + warn(message: string, metadata?: Record): void { + console.warn(message, metadata); + } + error(message: string, metadata?: Record): void { + console.error(message, metadata); + } + debug(message: string, metadata?: Record): void { + console.debug(message, metadata); + } +} + +export const interceptors = (): WorkflowInterceptors => { + const interceptor = new GoStyleContextWorklfowInterceptor(); + return { + inbound: [interceptor], + outbound: [interceptor], + // internals: [interceptor], + }; +}; diff --git a/context-propagation/go-style-context/src/worker.ts b/context-propagation/go-style-context/src/worker.ts new file mode 100644 index 00000000..40400f6b --- /dev/null +++ b/context-propagation/go-style-context/src/worker.ts @@ -0,0 +1,25 @@ +import { Worker } from '@temporalio/worker'; +import * as activities from './activities'; +import { newContextActivityInterceptor } from './context/temporal/activity-interceptors'; + +async function main() { + // Create a worker that uses the Runtime instance installed above + const worker = await Worker.create({ + workflowsPath: require.resolve('./workflows'), + activities, + taskQueue: 'context-propagation', + interceptors: { + activity: [newContextActivityInterceptor], + workflowModules: [require.resolve('./context/temporal/workflow-interceptors')], + }, + }); + await worker.run(); +} + +main().then( + () => void process.exit(0), + (err) => { + console.error(err); + process.exit(1); + } +); diff --git a/context-propagation/go-style-context/src/workflows/index.ts b/context-propagation/go-style-context/src/workflows/index.ts new file mode 100644 index 00000000..3f091059 --- /dev/null +++ b/context-propagation/go-style-context/src/workflows/index.ts @@ -0,0 +1,12 @@ +import { proxyActivities } from '@temporalio/workflow'; +import type { GoStyleContext } from '../context/context-type'; +import type * as activities from '../activities'; + +const { greet } = proxyActivities({ + startToCloseTimeout: '5 minutes', +}); + +export async function sampleWorkflow(ctx: GoStyleContext): Promise { + const greeting = await greet(ctx, 'Temporal'); + ctx.info('Greeted', { greeting }); +} diff --git a/context-propagation/go-style-context/tsconfig.json b/context-propagation/go-style-context/tsconfig.json new file mode 100644 index 00000000..6ff187f6 --- /dev/null +++ b/context-propagation/go-style-context/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "@tsconfig/node16/tsconfig.json", + "version": "4.4.2", + "compilerOptions": { + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "rootDir": "./src", + "outDir": "./lib" + }, + "include": ["src/**/*.ts"] +} diff --git a/context-propagation/traditionnal/.eslintignore b/context-propagation/traditionnal/.eslintignore new file mode 100644 index 00000000..7bd99a41 --- /dev/null +++ b/context-propagation/traditionnal/.eslintignore @@ -0,0 +1,3 @@ +node_modules +lib +.eslintrc.js \ No newline at end of file diff --git a/context-propagation/traditionnal/.eslintrc.js b/context-propagation/traditionnal/.eslintrc.js new file mode 100644 index 00000000..b8251a06 --- /dev/null +++ b/context-propagation/traditionnal/.eslintrc.js @@ -0,0 +1,48 @@ +const { builtinModules } = require('module'); + +const ALLOWED_NODE_BUILTINS = new Set(['assert']); + +module.exports = { + root: true, + parser: '@typescript-eslint/parser', + parserOptions: { + project: './tsconfig.json', + tsconfigRootDir: __dirname, + }, + plugins: ['@typescript-eslint', 'deprecation'], + extends: [ + 'eslint:recommended', + 'plugin:@typescript-eslint/eslint-recommended', + 'plugin:@typescript-eslint/recommended', + 'prettier', + ], + rules: { + // recommended for safety + '@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad + 'deprecation/deprecation': 'warn', + + // code style preference + 'object-shorthand': ['error', 'always'], + + // relaxed rules, for convenience + '@typescript-eslint/no-unused-vars': [ + 'warn', + { + argsIgnorePattern: '^_', + varsIgnorePattern: '^_', + }, + ], + '@typescript-eslint/no-explicit-any': 'off', + }, + overrides: [ + { + files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'], + rules: { + 'no-restricted-imports': [ + 'error', + ...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]), + ], + }, + }, + ], +}; diff --git a/context-propagation/traditionnal/.gitignore b/context-propagation/traditionnal/.gitignore new file mode 100644 index 00000000..a9f4ed54 --- /dev/null +++ b/context-propagation/traditionnal/.gitignore @@ -0,0 +1,2 @@ +lib +node_modules \ No newline at end of file diff --git a/context-propagation/traditionnal/.npmrc b/context-propagation/traditionnal/.npmrc new file mode 100644 index 00000000..9cf94950 --- /dev/null +++ b/context-propagation/traditionnal/.npmrc @@ -0,0 +1 @@ +package-lock=false \ No newline at end of file diff --git a/context-propagation/traditionnal/.nvmrc b/context-propagation/traditionnal/.nvmrc new file mode 100644 index 00000000..b6a7d89c --- /dev/null +++ b/context-propagation/traditionnal/.nvmrc @@ -0,0 +1 @@ +16 diff --git a/context-propagation/traditionnal/.post-create b/context-propagation/traditionnal/.post-create new file mode 100644 index 00000000..a682bb78 --- /dev/null +++ b/context-propagation/traditionnal/.post-create @@ -0,0 +1,18 @@ +To begin development, install the Temporal CLI: + + Mac: {cyan brew install temporal} + Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest + +Start Temporal Server: + + {cyan temporal server start-dev} + +Use Node version 16+: + + Mac: {cyan brew install node@16} + Other: https://nodejs.org/en/download/ + +Then, in the project directory, using two other shells, run these commands: + + {cyan npm run start.watch} + {cyan npm run workflow} diff --git a/context-propagation/traditionnal/.prettierignore b/context-propagation/traditionnal/.prettierignore new file mode 100644 index 00000000..7951405f --- /dev/null +++ b/context-propagation/traditionnal/.prettierignore @@ -0,0 +1 @@ +lib \ No newline at end of file diff --git a/context-propagation/traditionnal/.prettierrc b/context-propagation/traditionnal/.prettierrc new file mode 100644 index 00000000..965d50bf --- /dev/null +++ b/context-propagation/traditionnal/.prettierrc @@ -0,0 +1,2 @@ +printWidth: 120 +singleQuote: true diff --git a/context-propagation/traditionnal/README.md b/context-propagation/traditionnal/README.md new file mode 100644 index 00000000..0d63d593 --- /dev/null +++ b/context-propagation/traditionnal/README.md @@ -0,0 +1,175 @@ +# Context Propagation demo + +This project demonstrate an advanced use case where interceptors are used to propagate some contextual data from client to workflow, to child workflow, and to activities. + +In this demo, that contextual data is inject custom log +attributes on log entries produced from workflow and activities. + +In particular, this sample demonstrate: + +- Using + attributes, + +### Running this sample + +1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation). +1. `npm install` to install dependencies. +1. `npm run start.watch` to start the Worker. +1. In another shell, `npm run workflow` to run the Workflow. + +
+ +Sample worker output + + +``` +2023-09-05T18:19:39.646Z [worker] info: Creating worker { + options: { + namespace: 'default', + identity: '1234@MyComputer', + useVersioning: false, + buildId: undefined, + shutdownGraceTime: 0, + maxConcurrentLocalActivityExecutions: 100, + enableNonLocalActivities: true, + maxConcurrentWorkflowTaskPolls: 10, + maxConcurrentActivityTaskPolls: 10, + stickyQueueScheduleToStartTimeout: '10s', + maxHeartbeatThrottleInterval: '60s', + defaultHeartbeatThrottleInterval: '30s', + isolateExecutionTimeout: '5s', + workflowThreadPoolSize: 2, + maxCachedWorkflows: 914, + showStackTraceSources: false, + debugMode: false, + workflowsPath: '/Users/user/samples-typescript/custom-logger/src/workflows/index.ts', + activities: { greet: [AsyncFunction: greet] }, + taskQueue: 'custom-logger', + maxConcurrentWorkflowTaskExecutions: 40, + maxConcurrentActivityTaskExecutions: 100, + shutdownGraceTimeMs: 0, + shutdownForceTimeMs: undefined, + stickyQueueScheduleToStartTimeoutMs: 10000, + isolateExecutionTimeoutMs: 5000, + maxHeartbeatThrottleIntervalMs: 60000, + defaultHeartbeatThrottleIntervalMs: 30000, + loadedDataConverter: { + payloadConverter: DefaultPayloadConverter { + converterByEncoding: Map(3) { + 'binary/null' => [UndefinedPayloadConverter], + 'binary/plain' => [BinaryPayloadConverter], + 'json/plain' => [JsonPayloadConverter] + }, + converters: [ + [UndefinedPayloadConverter], + [BinaryPayloadConverter], + [JsonPayloadConverter] + ] + }, + failureConverter: DefaultFailureConverter { + options: { encodeCommonAttributes: false } + }, + payloadCodecs: [] + } + } +} +2023-09-05T18:19:40.084Z [worker] info: asset workflow-bundle-95e3c04c487ab5112957.js 755 KiB [emitted] [immutable] (name: main) +2023-09-05T18:19:40.084Z [worker] info: runtime modules 937 bytes 4 modules +2023-09-05T18:19:40.084Z [worker] info: modules by path ./node_modules/@temporalio/ 182 KiB +2023-09-05T18:19:40.084Z [worker] info: modules by path ./node_modules/@temporalio/common/lib/ 77.9 KiB 21 modules +2023-09-05T18:19:40.084Z [worker] info: modules by path ./node_modules/@temporalio/workflow/ 102 KiB +2023-09-05T18:19:40.084Z [worker] info: ./node_modules/@temporalio/workflow/lib/worker-interface.js 11 KiB [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: + 13 modules +2023-09-05T18:19:40.084Z [worker] info: ./node_modules/@temporalio/worker/lib/workflow-log-interceptor.js 2.42 KiB [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: modules by path ./src/workflows/ 878 bytes +2023-09-05T18:19:40.084Z [worker] info: ./src/workflows/index-autogenerated-entrypoint.cjs 597 bytes [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: ./src/workflows/index.ts 281 bytes [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: __temporal_custom_payload_converter (ignored) 15 bytes [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: __temporal_custom_failure_converter (ignored) 15 bytes [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: ./node_modules/long/umd/index.js 43.1 KiB [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: ./node_modules/ms/dist/index.cjs 3.41 KiB [built] [code generated] +2023-09-05T18:19:40.084Z [worker] info: webpack 5.88.2 compiled successfully in 264 ms +2023-09-05T18:19:40.085Z [worker] info: Workflow bundle created { size: '0.74MB' } +2023-09-05T18:19:40.165Z [worker] info: Initializing worker +2023-09-05T18:19:40.166Z [worker] info: Worker state changed { state: 'RUNNING' } +2023-09-05T18:19:51.963Z [worker] debug: New WFT +2023-09-05T18:19:51.963Z [worker] debug: Applying new workflow task from server +2023-09-05T18:19:51.963Z [worker] debug: Driven WF start +2023-09-05T18:19:51.963Z [worker] debug: Sending activation to lang +2023-09-05T18:19:51.987Z [workflow] debug: Workflow started { + namespace: 'default', + taskQueue: 'custom-logger', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + runId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow' +} +2023-09-05T18:19:51.987Z [worker] debug: wf bridge iteration fetch +2023-09-05T18:19:51.987Z [worker] debug: handling command +2023-09-05T18:19:51.987Z [worker] debug: prepared commands +2023-09-05T18:19:51.987Z [worker] debug: Sending responses to server +2023-09-05T18:19:51.989Z [worker] debug: Server returned 1 fewer activities for eager execution than we requested +2023-09-05T18:19:51.989Z [worker] debug: Marking WFT completed +2023-09-05T18:19:51.994Z [workflow] debug: Activity started { + isLocal: false, + attempt: 1, + namespace: 'default', + taskToken: 'CiRjZGNlMGM2Mi1mYzdjLTQyODctOGM2MC1kNWYwOWIzNDhjMWESHndvcmtmbG93LTJZcmlPbG1nSzJ2TVQtcmhSRlFRcBokZmEwYjQxNWItZWY4OS00MzRkLTgzOWMtZWMwMzFkMzE2NjhlIAUoATIBMUIFZ3JlZXRKCAgBEPKUQxgB', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + workflowRunId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow', + activityId: '1', + activityType: 'greet', + taskQueue: 'custom-logger' +} +2023-09-05T18:19:51.994Z [activity] info: Log from activity { + isLocal: false, + attempt: 1, + namespace: 'default', + taskToken: 'CiRjZGNlMGM2Mi1mYzdjLTQyODctOGM2MC1kNWYwOWIzNDhjMWESHndvcmtmbG93LTJZcmlPbG1nSzJ2TVQtcmhSRlFRcBokZmEwYjQxNWItZWY4OS00MzRkLTgzOWMtZWMwMzFkMzE2NjhlIAUoATIBMUIFZ3JlZXRKCAgBEPKUQxgB', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + workflowRunId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow', + activityId: '1', + activityType: 'greet', + taskQueue: 'custom-logger', + name: 'Temporal' +} +2023-09-05T18:19:51.994Z [activity] debug: Activity completed { + isLocal: false, + attempt: 1, + namespace: 'default', + taskToken: 'CiRjZGNlMGM2Mi1mYzdjLTQyODctOGM2MC1kNWYwOWIzNDhjMWESHndvcmtmbG93LTJZcmlPbG1nSzJ2TVQtcmhSRlFRcBokZmEwYjQxNWItZWY4OS00MzRkLTgzOWMtZWMwMzFkMzE2NjhlIAUoATIBMUIFZ3JlZXRKCAgBEPKUQxgB', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + workflowRunId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow', + activityId: '1', + activityType: 'greet', + taskQueue: 'custom-logger', + durationMs: 0 +} +2023-09-05T18:19:51.999Z [workerd] debug: New WFT +2023-09-05T18:19:51.999Z [workerd] debug: Applying new workflow task from server +2023-09-05T18:19:51.999Z [workerd] debug: Sending activation to lang +2023-09-05T18:19:52.001Z [workflow] info: Greeted { + namespace: 'default', + taskQueue: 'custom-logger', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + runId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow', + greeting: 'Hello, Temporal!' +} +2023-09-05T18:19:52.001Z [workflow] debug: Workflow completed { + namespace: 'default', + taskQueue: 'custom-logger', + workflowId: 'workflow-2YriOlmgK2vMT-rhRFQQp', + runId: 'fa0b415b-ef89-434d-839c-ec031d31668e', + workflowType: 'logSampleWorkflow' +} +2023-09-05T18:19:52.001Z [worker] debug: wf bridge iteration fetch +2023-09-05T18:19:52.001Z [worker] debug: handling command +2023-09-05T18:19:52.001Z [worker] debug: prepared commands +2023-09-05T18:19:52.001Z [worker] debug: Sending responses to server +2023-09-05T18:19:52.004Z [worker] debug: Marking WFT completed +``` + +
diff --git a/context-propagation/traditionnal/package.json b/context-propagation/traditionnal/package.json new file mode 100644 index 00000000..78b80497 --- /dev/null +++ b/context-propagation/traditionnal/package.json @@ -0,0 +1,43 @@ +{ + "name": "temporal-context-propagation", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "tsc --build", + "build.watch": "tsc --build --watch", + "lint": "eslint .", + "start": "ts-node src/worker.ts", + "start.watch": "nodemon src/worker.ts", + "workflow": "ts-node src/client.ts" + }, + "nodemonConfig": { + "execMap": { + "ts": "ts-node" + }, + "ext": "ts", + "watch": [ + "src" + ] + }, + "dependencies": { + "@temporalio/activity": "^1.12.2", + "@temporalio/client": "^1.12.2", + "@temporalio/worker": "^1.12.2", + "@temporalio/workflow": "^1.12.2", + "nanoid": "3.x" + }, + "devDependencies": { + "@tsconfig/node16": "^1.0.0", + "@types/node": "^16.11.43", + "@types/triple-beam": "^1.3.2", + "@typescript-eslint/eslint-plugin": "^5.0.0", + "@typescript-eslint/parser": "^5.0.0", + "eslint": "^7.32.0", + "eslint-config-prettier": "^8.3.0", + "eslint-plugin-deprecation": "^1.2.1", + "nodemon": "^2.0.12", + "prettier": "^2.8.8", + "ts-node": "^10.2.1", + "typescript": "^4.4.2" + } +} diff --git a/context-propagation/traditionnal/src/activities/index.ts b/context-propagation/traditionnal/src/activities/index.ts new file mode 100644 index 00000000..d5b64c5f --- /dev/null +++ b/context-propagation/traditionnal/src/activities/index.ts @@ -0,0 +1,8 @@ +import { log } from '@temporalio/activity'; +import { getContext } from '../context/context-injection'; + +export async function greet(name: string): Promise { + const propagatedContext = getContext(); + log.info(`Log from activity with customer ${propagatedContext.customer ?? 'unknown'}`); + return `Hello, ${name}!`; +} diff --git a/context-propagation/traditionnal/src/client.ts b/context-propagation/traditionnal/src/client.ts new file mode 100644 index 00000000..c070a592 --- /dev/null +++ b/context-propagation/traditionnal/src/client.ts @@ -0,0 +1,32 @@ +import { nanoid } from 'nanoid'; +import { Connection, Client } from '@temporalio/client'; +import { sampleWorkflow } from './workflows'; +import { ContextClientInterceptor } from './context/client-interceptors'; +import { withContext } from './context/context-injection'; + +async function run() { + const connection = await Connection.connect({ + address: 'localhost:7233', + // If appropriate, configure TLS and other settings. + // This is optional but we leave this here to remind you there is a gRPC connection being established. + }); + + const clientInterceptor = new ContextClientInterceptor(); + const client = new Client({ + connection, + namespace: 'default', + interceptors: { workflow: [clientInterceptor], schedule: [clientInterceptor] }, + }); + + await withContext({ customer: 'Acme Inc.' }, async () => { + await client.workflow.execute(sampleWorkflow, { + taskQueue: 'context-propagation', + workflowId: 'workflow-' + nanoid(), + }); + }); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/context-propagation/traditionnal/src/context/activity-interceptors.ts b/context-propagation/traditionnal/src/context/activity-interceptors.ts new file mode 100644 index 00000000..94a90571 --- /dev/null +++ b/context-propagation/traditionnal/src/context/activity-interceptors.ts @@ -0,0 +1,48 @@ +import { + ActivityExecuteInput, + ActivityInboundCallsInterceptor, + ActivityInterceptors, + ActivityOutboundCallsInterceptor, + GetLogAttributesInput, + GetMetricTagsInput, + Next, +} from '@temporalio/worker'; +import { MetricTags } from '@temporalio/common'; +import { extractContextHeader } from './context-type'; +import { withContext, getContext } from './context-injection'; + +class ContextActivityInterceptor implements ActivityInboundCallsInterceptor, ActivityOutboundCallsInterceptor { + async execute(input: ActivityExecuteInput, next: Next): Promise { + const inboundContext = extractContextHeader(input.headers); + return await withContext(inboundContext ?? {}, () => { + return next(input); + }); + } + + getLogAttributes( + input: GetLogAttributesInput, + next: Next + ): Record { + return next({ + input, + ...getContext(), + }); + } + + getMetricTags(input: GetMetricTagsInput, next: Next): MetricTags { + // FIXME: determine how context needs to affect metric tags + return next(input); + } +} + +/** + * Intercepts activity start, to restore the context received through headers. + * This interceptor also add the content of the context as log metadata. + */ +export function newContextActivityInterceptor(): ActivityInterceptors { + const interceptor = new ContextActivityInterceptor(); + return { + inbound: interceptor, + outbound: interceptor, + }; +} diff --git a/context-propagation/traditionnal/src/context/client-interceptors.ts b/context-propagation/traditionnal/src/context/client-interceptors.ts new file mode 100644 index 00000000..69b5cce9 --- /dev/null +++ b/context-propagation/traditionnal/src/context/client-interceptors.ts @@ -0,0 +1,80 @@ +import { + CreateScheduleInput, + CreateScheduleOutput, + Next, + ScheduleClientInterceptor, + WorkflowClientInterceptor, + WorkflowQueryInput, + WorkflowSignalInput, + WorkflowSignalWithStartInput, + WorkflowStartInput, + WorkflowStartUpdateInput, + WorkflowStartUpdateOutput, +} from '@temporalio/client'; +import { injectContextHeader } from './context-type'; +import { getContext } from './context-injection'; + +/** + * Intercepts calls to start a Workflow or a schedule, passing the current + * context via headers. + */ +export class ContextClientInterceptor implements WorkflowClientInterceptor, ScheduleClientInterceptor { + public async start(input: WorkflowStartInput, next: Next): Promise { + return await next({ + ...input, + headers: injectContextHeader(input.headers, getContext()), + }); + } + + public async signal(input: WorkflowSignalInput, next: Next): Promise { + return await next({ + ...input, + headers: injectContextHeader(input.headers, getContext()), + }); + } + + public async signalWithStart( + input: WorkflowSignalWithStartInput, + next: Next + ): Promise { + return await next({ + ...input, + headers: injectContextHeader(input.headers, getContext()), + }); + } + + public async startUpdate( + input: WorkflowStartUpdateInput, + next: Next + ): Promise { + return await next({ + ...input, + headers: injectContextHeader(input.headers, getContext()), + }); + } + + public async query(input: WorkflowQueryInput, next: Next): Promise { + return await next({ + ...input, + headers: injectContextHeader(input.headers, getContext()), + }); + } + + // Note that you may also intercept calls to 'cancel', 'terminate', and 'describe'. + + public async create( + input: CreateScheduleInput, + next: Next + ): Promise { + return await next({ + ...input, + options: { + ...input.options, + action: { + ...input.options.action, + }, + }, + headers: injectContextHeader(input.headers, getContext()), + }); + } +} diff --git a/context-propagation/traditionnal/src/context/context-injection.ts b/context-propagation/traditionnal/src/context/context-injection.ts new file mode 100644 index 00000000..6b4e5d0d --- /dev/null +++ b/context-propagation/traditionnal/src/context/context-injection.ts @@ -0,0 +1,17 @@ +import { AsyncLocalStorage } from 'async_hooks'; +import { PropagatedContext } from './context-type'; + +const contextStorage: AsyncLocalStorage = new AsyncLocalStorage(); + +export function withContext( + extraContext: PropagatedContext | undefined, + fn: (context: PropagatedContext) => Ret +): Ret { + if (!extraContext) return fn(getContext()); + const newContext = { ...contextStorage.getStore(), ...extraContext }; + return contextStorage.run(newContext, () => fn(newContext)); +} + +export function getContext(): PropagatedContext { + return contextStorage.getStore() ?? {}; +} diff --git a/context-propagation/traditionnal/src/context/context-type.ts b/context-propagation/traditionnal/src/context/context-type.ts new file mode 100644 index 00000000..b6c75e64 --- /dev/null +++ b/context-propagation/traditionnal/src/context/context-type.ts @@ -0,0 +1,27 @@ +import { Headers, defaultPayloadConverter } from '@temporalio/common'; + +// Must be JSON serializable +export interface PropagatedContext { + customer?: string; +} + +const CONTEXT_HEADER_NAME = 'Context'; + +// Headers needs to be converted to payload. By default, we use the default payload converter. +// You may override this if you need to use a custom payload converter. +const payloadConverter = defaultPayloadConverter; + +export function injectContextHeader(headers: Headers, context: PropagatedContext | undefined): Headers { + if (!context) return headers; + + return { + ...headers, + [CONTEXT_HEADER_NAME]: payloadConverter.toPayload(context), + }; +} + +export function extractContextHeader(headers: Headers): PropagatedContext | undefined { + const contextHeader = headers[CONTEXT_HEADER_NAME]; + if (!contextHeader) return undefined; + return payloadConverter.fromPayload(contextHeader) as PropagatedContext; +} diff --git a/context-propagation/traditionnal/src/context/workflow-interceptors.ts b/context-propagation/traditionnal/src/context/workflow-interceptors.ts new file mode 100644 index 00000000..dca9f6d9 --- /dev/null +++ b/context-propagation/traditionnal/src/context/workflow-interceptors.ts @@ -0,0 +1,151 @@ +import { + AsyncLocalStorage, + DisposeInput, + GetMetricTagsInput, + WorkflowInternalsInterceptor, +} from '@temporalio/workflow'; +import { + ActivityInput, + ContinueAsNewInput, + GetLogAttributesInput, + LocalActivityInput, + Next, + QueryInput, + SignalInput, + StartChildWorkflowExecutionInput, + UpdateInput, + WorkflowExecuteInput, + WorkflowInboundCallsInterceptor, + WorkflowInterceptors, + WorkflowOutboundCallsInterceptor, +} from '@temporalio/workflow'; +import { MetricTags } from '@temporalio/common'; +import { extractContextHeader, injectContextHeader, PropagatedContext } from './context-type'; + +const contextStorage = new AsyncLocalStorage(); + +export function withContext( + extraContext: PropagatedContext | undefined, + fn: (context: PropagatedContext) => Ret +): Ret { + if (!extraContext) return fn(getContext()); + const newContext = { ...contextStorage.getStore(), ...extraContext }; + return contextStorage.run(newContext, () => fn(newContext)); +} + +export function getContext(): PropagatedContext { + return contextStorage.getStore() ?? {}; +} + +class ContextWorklfowInterceptor + implements WorkflowInboundCallsInterceptor, WorkflowOutboundCallsInterceptor, WorkflowInternalsInterceptor +{ + private executionContext: PropagatedContext | undefined; + + async execute(input: WorkflowExecuteInput, next: Next): Promise { + this.executionContext = extractContextHeader(input.headers); + return withContext(this.executionContext, () => next(input)); + } + + async handleSignal(input: SignalInput, next: Next): Promise { + const inboundContext = extractContextHeader(input.headers); + return withContext({ ...this.executionContext, ...inboundContext }, () => next(input)); + } + + async handleQuery(input: QueryInput, next: Next): Promise { + const inboundContext = extractContextHeader(input.headers); + return withContext({ ...this.executionContext, ...inboundContext }, () => next(input)); + } + + async handleUpdate( + input: UpdateInput, + next: Next + ): Promise { + const inboundContext = extractContextHeader(input.headers); + return withContext({ ...this.executionContext, ...inboundContext }, () => next(input)); + } + + validateUpdate(input: UpdateInput, next: Next) { + const inboundContext = extractContextHeader(input.headers); + return withContext({ ...this.executionContext, ...inboundContext }, () => next(input)); + } + + async scheduleActivity( + input: ActivityInput, + next: Next + ): Promise { + return await next({ + ...input, + headers: injectContextHeader(input.headers, getContext()), + }); + } + + async scheduleLocalActivity( + input: LocalActivityInput, + next: Next + ): Promise { + return await next({ + ...input, + headers: injectContextHeader(input.headers, getContext()), + }); + } + + async startChildWorkflowExecution( + input: StartChildWorkflowExecutionInput, + next: Next + ): Promise<[Promise, Promise]> { + return await next({ + ...input, + headers: injectContextHeader(input.headers, getContext()), + }); + } + + async continueAsNew( + input: ContinueAsNewInput, + next: Next + ): Promise { + return await next({ + ...input, + headers: injectContextHeader(input.headers, getContext()), + }); + } + + // Note that we do not propagate context to signals sent from this workflow, as this might create + // confusion in some cases (i.e. in the signal handler of the target workflow, context values + // coming from _this workflow_ would override context values defined when that workflow was sent). + // If that's the intended behavior, simply add intercept the signalWorkflow method. + + getLogAttributes( + input: GetLogAttributesInput, + next: Next + ): Record { + return next({ + input, + ...getContext(), + }); + } + + getMetricTags(input: GetMetricTagsInput, next: Next): MetricTags { + // FIXME: determine how context needs to affect metric tags + return next(input); + } + + dispose(input: DisposeInput, next: Next): void { + // This is very important. Due to how node implements AsyncLocalStorage, the storage is not tied + // to the present execution context, and will survive eviction of this workflow out of the + // worker's workflow cache, causing memory leaks. Always disable AsyncLocalStorage you created + // yourself when the workflow is disposed. + contextStorage.disable(); + + next(input); + } +} + +export const interceptors = (): WorkflowInterceptors => { + const interceptor = new ContextWorklfowInterceptor(); + return { + inbound: [interceptor], + outbound: [interceptor], + internals: [interceptor], + }; +}; diff --git a/context-propagation/traditionnal/src/worker.ts b/context-propagation/traditionnal/src/worker.ts new file mode 100644 index 00000000..f922e004 --- /dev/null +++ b/context-propagation/traditionnal/src/worker.ts @@ -0,0 +1,25 @@ +import { Worker } from '@temporalio/worker'; +import * as activities from './activities'; +import { newContextActivityInterceptor } from './context/activity-interceptors'; + +async function main() { + // Create a worker that uses the Runtime instance installed above + const worker = await Worker.create({ + workflowsPath: require.resolve('./workflows'), + activities, + taskQueue: 'context-propagation', + interceptors: { + activity: [newContextActivityInterceptor], + workflowModules: [require.resolve('./context/workflow-interceptors')], + }, + }); + await worker.run(); +} + +main().then( + () => void process.exit(0), + (err) => { + console.error(err); + process.exit(1); + } +); diff --git a/context-propagation/traditionnal/src/workflows/index.ts b/context-propagation/traditionnal/src/workflows/index.ts new file mode 100644 index 00000000..ce332f68 --- /dev/null +++ b/context-propagation/traditionnal/src/workflows/index.ts @@ -0,0 +1,11 @@ +import { log, proxyActivities } from '@temporalio/workflow'; +import type * as activities from '../activities'; + +const { greet } = proxyActivities({ + startToCloseTimeout: '5 minutes', +}); + +export async function sampleWorkflow(): Promise { + const greeting = await greet('Temporal'); + log.info('Greeted', { greeting }); +} diff --git a/context-propagation/traditionnal/tsconfig.json b/context-propagation/traditionnal/tsconfig.json new file mode 100644 index 00000000..6ff187f6 --- /dev/null +++ b/context-propagation/traditionnal/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "@tsconfig/node16/tsconfig.json", + "version": "4.4.2", + "compilerOptions": { + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "rootDir": "./src", + "outDir": "./lib" + }, + "include": ["src/**/*.ts"] +} diff --git a/custom-logger/README.md b/custom-logger/README.md index 9ef26879..c90be879 100644 --- a/custom-logger/README.md +++ b/custom-logger/README.md @@ -1,4 +1,4 @@ -# Customer Logger demo +# Custom Logger demo This project demonstrates how to capture log entries produced by Workers, Workflows and Activities, and process them using a third party logging libraries (in this case, the [Winston](https://github.com/winstonjs/winston) package).