diff --git a/new_samples/branch/branch.go b/new_samples/branch/branch.go new file mode 100644 index 0000000..f0de59d --- /dev/null +++ b/new_samples/branch/branch.go @@ -0,0 +1,45 @@ +package main + +import ( + "fmt" + "time" + + "go.uber.org/cadence/workflow" +) + +const totalBranches = 3 + +// BranchWorkflow executes multiple activities in parallel and waits for all. +func BranchWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + logger.Info("BranchWorkflow started") + + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + var futures []workflow.Future + for i := 1; i <= totalBranches; i++ { + input := fmt.Sprintf("branch %d of %d", i, totalBranches) + future := workflow.ExecuteActivity(ctx, BranchActivity, input) + futures = append(futures, future) + } + + for _, f := range futures { + if err := f.Get(ctx, nil); err != nil { + return err + } + } + + logger.Info("BranchWorkflow completed") + return nil +} + +// BranchActivity processes a single branch. +func BranchActivity(input string) (string, error) { + fmt.Printf("BranchActivity: %s\n", input) + return "Result_" + input, nil +} + diff --git a/new_samples/branch/generator/README.md b/new_samples/branch/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/branch/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/branch/generator/README_specific.md b/new_samples/branch/generator/README_specific.md new file mode 100644 index 0000000..1caabfb --- /dev/null +++ b/new_samples/branch/generator/README_specific.md @@ -0,0 +1,29 @@ +## Branch Workflow Sample + +This sample demonstrates **parallel activity execution** using Futures. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.BranchWorkflow +``` + +### Key Concept: Parallel Execution with Futures + +```go +var futures []workflow.Future +for i := 1; i <= 3; i++ { + future := workflow.ExecuteActivity(ctx, BranchActivity, input) + futures = append(futures, future) +} +// Wait for all +for _, f := range futures { + f.Get(ctx, nil) +} +``` + diff --git a/new_samples/branch/generator/generate.go b/new_samples/branch/generator/generate.go new file mode 100644 index 0000000..bdc7512 --- /dev/null +++ b/new_samples/branch/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Branch", + Workflows: []string{"BranchWorkflow"}, + Activities: []string{"BranchActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/branch/main.go b/new_samples/branch/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/branch/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/branch/worker.go b/new_samples/branch/worker.go new file mode 100644 index 0000000..417b3f1 --- /dev/null +++ b/new_samples/branch/worker.go @@ -0,0 +1,101 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(BranchWorkflow, workflow.RegisterOptions{Name: "cadence_samples.BranchWorkflow"}) + w.RegisterActivityWithOptions(BranchActivity, activity.RegisterOptions{Name: "cadence_samples.BranchActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/cancelactivity/cancelactivity.go b/new_samples/cancelactivity/cancelactivity.go new file mode 100644 index 0000000..7cef31f --- /dev/null +++ b/new_samples/cancelactivity/cancelactivity.go @@ -0,0 +1,93 @@ +package main + +import ( + "context" + "fmt" + "time" + + "go.uber.org/cadence" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +/** + * This is the cancel activity workflow sample. + */ + +// ApplicationName is the task list for this sample +const ApplicationName = "cancelGroup" + +// CancelWorkflow workflow decider +func CancelWorkflow(ctx workflow.Context) (retError error) { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute * 30, + HeartbeatTimeout: time.Second * 5, + WaitForCancellation: true, + } + ctx = workflow.WithActivityOptions(ctx, ao) + logger := workflow.GetLogger(ctx) + logger.Info("cancel workflow started") + + defer func() { + if cadence.IsCanceledError(retError) { + // When workflow is canceled, it has to get a new disconnected context to execute any activities + newCtx, _ := workflow.NewDisconnectedContext(ctx) + err := workflow.ExecuteActivity(newCtx, CleanupActivity).Get(ctx, nil) + if err != nil { + logger.Error("Cleanup activity failed", zap.Error(err)) + retError = err + return + } + retError = nil + logger.Info("Workflow completed.") + } + }() + + var result string + err := workflow.ExecuteActivity(ctx, LongRunningActivity).Get(ctx, &result) + if err != nil && !cadence.IsCanceledError(err) { + logger.Error("Error from LongRunningActivity", zap.Error(err)) + return err + } + logger.Info(fmt.Sprintf("LongRunningActivity returns %v, %v", result, err)) + + // Execute activity using a canceled ctx, + // activity won't be scheduled and an cancelled error will be returned + err = workflow.ExecuteActivity(ctx, SkippedActivity).Get(ctx, nil) + if err != nil && !cadence.IsCanceledError(err) { + logger.Error("Error from SkippedActivity", zap.Error(err)) + } + + return err +} + +func LongRunningActivity(ctx context.Context) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("activity started, to cancel workflow, use ./cancelactivity -m cancel -w or CLI: 'cadence --do default wf cancel -w ' to cancel") + for { + select { + case <-time.After(1 * time.Second): + logger.Info("heartbeating...") + activity.RecordHeartbeat(ctx, "") + case <-ctx.Done(): + logger.Info("context is cancelled") + // returned canceled error here so that in workflow history we can see ActivityTaskCanceled event + // or if not cancelled, return timeout error + return "I am canceled by Done", ctx.Err() + } + } +} + +func CleanupActivity(ctx context.Context) error { + logger := activity.GetLogger(ctx) + logger.Info("CleanupActivity started") + return nil +} + +func SkippedActivity(ctx context.Context) error { + logger := activity.GetLogger(ctx) + logger.Info("this activity will be skipped due to cancellation") + return nil +} diff --git a/new_samples/cancelactivity/generator/README.md b/new_samples/cancelactivity/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/cancelactivity/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/cancelactivity/generator/README_specific.md b/new_samples/cancelactivity/generator/README_specific.md new file mode 100644 index 0000000..c39cdf1 --- /dev/null +++ b/new_samples/cancelactivity/generator/README_specific.md @@ -0,0 +1,53 @@ +## How It Works + +This sample demonstrates graceful workflow cancellation with cleanup: + +``` +┌──────────────────────┐ +│ CancelWorkflow │ +│ │ +│ ┌────────────────┐ │ Cancel Signal +│ │ LongRunning │◀─┼───────────────────── +│ │ Activity │ │ +│ │ (heartbeating) │ │ +│ └───────┬────────┘ │ +│ │ │ +│ On Cancel: │ +│ ▼ │ +│ ┌────────────────┐ │ +│ │ CleanupActivity│ │ ← Runs in disconnected context +│ └────────────────┘ │ +└──────────────────────┘ +``` + +Key concepts: +- **WaitForCancellation**: Activity option that waits for activity to acknowledge cancel +- **NewDisconnectedContext**: Creates a context unaffected by workflow cancellation +- **IsCanceledError**: Check if an error is due to cancellation + +## Running the Sample + +Start the worker: +```bash +go run . +``` + +Trigger a workflow: +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --workflow_type cadence_samples.CancelWorkflow \ + --tl cadence-samples-worker \ + --et 600 +``` + +Cancel the workflow (copy workflow ID from above): +```bash +cadence --env development \ + --domain cadence-samples \ + workflow cancel \ + --wid +``` + +Watch the worker logs to see the cleanup activity run. diff --git a/new_samples/cancelactivity/generator/generate.go b/new_samples/cancelactivity/generator/generate.go new file mode 100644 index 0000000..78ff065 --- /dev/null +++ b/new_samples/cancelactivity/generator/generate.go @@ -0,0 +1,12 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Cancel Activity", + Workflows: []string{"CancelWorkflow"}, + Activities: []string{"LongRunningActivity", "CleanupActivity", "SkippedActivity"}, + } + template.GenerateAll(data) +} diff --git a/new_samples/cancelactivity/main.go b/new_samples/cancelactivity/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/cancelactivity/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/cancelactivity/worker.go b/new_samples/cancelactivity/worker.go new file mode 100644 index 0000000..615b40a --- /dev/null +++ b/new_samples/cancelactivity/worker.go @@ -0,0 +1,103 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(CancelWorkflow, workflow.RegisterOptions{Name: "cadence_samples.CancelWorkflow"}) + w.RegisterActivityWithOptions(LongRunningActivity, activity.RegisterOptions{Name: "cadence_samples.LongRunningActivity"}) + w.RegisterActivityWithOptions(CleanupActivity, activity.RegisterOptions{Name: "cadence_samples.CleanupActivity"}) + w.RegisterActivityWithOptions(SkippedActivity, activity.RegisterOptions{Name: "cadence_samples.SkippedActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/choice/choice.go b/new_samples/choice/choice.go new file mode 100644 index 0000000..ecb6917 --- /dev/null +++ b/new_samples/choice/choice.go @@ -0,0 +1,48 @@ +package main + +import ( + "fmt" + "math/rand" + "time" + + "go.uber.org/cadence/workflow" +) + +var orderChoices = []string{"apple", "banana", "orange"} + +// ChoiceWorkflow demonstrates conditional execution. +func ChoiceWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + logger.Info("ChoiceWorkflow started") + + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + var order string + if err := workflow.ExecuteActivity(ctx, GetOrderActivity).Get(ctx, &order); err != nil { + return err + } + + switch order { + case "apple": + workflow.ExecuteActivity(ctx, ProcessAppleActivity, order).Get(ctx, nil) + case "banana": + workflow.ExecuteActivity(ctx, ProcessBananaActivity, order).Get(ctx, nil) + case "orange": + workflow.ExecuteActivity(ctx, ProcessOrangeActivity, order).Get(ctx, nil) + } + + logger.Info("ChoiceWorkflow completed") + return nil +} + +func GetOrderActivity() (string, error) { + return orderChoices[rand.Intn(len(orderChoices))], nil +} +func ProcessAppleActivity(order string) error { fmt.Println("Processing apple"); return nil } +func ProcessBananaActivity(order string) error { fmt.Println("Processing banana"); return nil } +func ProcessOrangeActivity(order string) error { fmt.Println("Processing orange"); return nil } + diff --git a/new_samples/choice/generator/README.md b/new_samples/choice/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/choice/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/choice/generator/README_specific.md b/new_samples/choice/generator/README_specific.md new file mode 100644 index 0000000..4e30ea1 --- /dev/null +++ b/new_samples/choice/generator/README_specific.md @@ -0,0 +1,28 @@ +## Choice Workflow Sample + +This sample demonstrates **conditional execution** based on activity results. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.ChoiceWorkflow +``` + +### Key Concept: Conditional Branching + +```go +var order string +workflow.ExecuteActivity(ctx, GetOrderActivity).Get(ctx, &order) +switch order { +case "apple": + workflow.ExecuteActivity(ctx, ProcessAppleActivity) +case "banana": + workflow.ExecuteActivity(ctx, ProcessBananaActivity) +} +``` + diff --git a/new_samples/choice/generator/generate.go b/new_samples/choice/generator/generate.go new file mode 100644 index 0000000..26de64e --- /dev/null +++ b/new_samples/choice/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Choice", + Workflows: []string{"ChoiceWorkflow"}, + Activities: []string{"GetOrderActivity", "ProcessAppleActivity", "ProcessBananaActivity", "ProcessOrangeActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/choice/main.go b/new_samples/choice/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/choice/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/choice/worker.go b/new_samples/choice/worker.go new file mode 100644 index 0000000..c694839 --- /dev/null +++ b/new_samples/choice/worker.go @@ -0,0 +1,104 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(ChoiceWorkflow, workflow.RegisterOptions{Name: "cadence_samples.ChoiceWorkflow"}) + w.RegisterActivityWithOptions(GetOrderActivity, activity.RegisterOptions{Name: "cadence_samples.GetOrderActivity"}) + w.RegisterActivityWithOptions(ProcessAppleActivity, activity.RegisterOptions{Name: "cadence_samples.ProcessAppleActivity"}) + w.RegisterActivityWithOptions(ProcessBananaActivity, activity.RegisterOptions{Name: "cadence_samples.ProcessBananaActivity"}) + w.RegisterActivityWithOptions(ProcessOrangeActivity, activity.RegisterOptions{Name: "cadence_samples.ProcessOrangeActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/delaystart/delaystart.go b/new_samples/delaystart/delaystart.go new file mode 100644 index 0000000..5aeda01 --- /dev/null +++ b/new_samples/delaystart/delaystart.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +/** + * This is the hello world workflow sample. + */ + +// ApplicationName is the task list for this sample +const ApplicationName = "delaystartGroup" + +const DelayStartWorkflowName = "DelayStartWorkflow" + +// helloWorkflow workflow decider +func DelayStartWorkflow(ctx workflow.Context, delayStart time.Duration) error { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + logger := workflow.GetLogger(ctx) + logger.Info("delaystart workflow started after waiting for " + delayStart.String()) + var helloworldResult string + err := workflow.ExecuteActivity(ctx, DelayStartActivity, delayStart).Get(ctx, &helloworldResult) + if err != nil { + logger.Error("Activity failed after waiting for "+delayStart.String(), zap.Error(err)) + return err + } + + // Adding a new activity to the workflow will result in a non-determinstic change for the workflow + // Please check https://cadenceworkflow.io/docs/go-client/workflow-versioning/ for more information + // + // Un-commenting the following code and the TestReplayWorkflowHistoryFromFile in replay_test.go + // will fail due to the non-determinstic change + // + // If you have a completed workflow execution without the following code and run the + // TestWorkflowShadowing in shadow_test.go or start the worker in shadow mode (using -m shadower) + // those two shadowing check will also fail due to the non-deterministic change + // + // err := workflow.ExecuteActivity(ctx, helloWorldActivity, name).Get(ctx, &helloworldResult) + // if err != nil { + // logger.Error("Activity failed.", zap.Error(err)) + // return err + // } + + logger.Info("Workflow completed.", zap.String("Result", helloworldResult)) + + return nil +} + +func DelayStartActivity(ctx context.Context, delayStart time.Duration) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("DelayStartActivity started after " + delayStart.String()) + return "Activity started after " + delayStart.String(), nil +} diff --git a/new_samples/delaystart/generator/README.md b/new_samples/delaystart/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/delaystart/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/delaystart/generator/README_specific.md b/new_samples/delaystart/generator/README_specific.md new file mode 100644 index 0000000..ba6fe24 --- /dev/null +++ b/new_samples/delaystart/generator/README_specific.md @@ -0,0 +1,44 @@ +## How It Works + +This sample demonstrates deferred workflow execution using `DelayStart` option: + +``` +workflow start --delay_start 30s + │ + ▼ +┌───────────────────┐ +│ Workflow waits │ ← Cadence delays start by 30s +│ in pending state │ +└───────────────────┘ + │ + ▼ (after delay) +┌───────────────────┐ +│DelayStartWorkflow │ +│ │ │ +│ ▼ │ +│DelayStartActivity │ +└───────────────────┘ +``` + +The delay is handled by Cadence, not by the workflow code. + +## Running the Sample + +Start the worker: +```bash +go run . +``` + +Start a workflow with 30-second delay: +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --workflow_type cadence_samples.DelayStartWorkflow \ + --tl cadence-samples-worker \ + --et 600 \ + --delay_start 30s \ + --input '"30s"' +``` + +The workflow will remain in "pending" state for 30 seconds before starting. diff --git a/new_samples/delaystart/generator/generate.go b/new_samples/delaystart/generator/generate.go new file mode 100644 index 0000000..39090cf --- /dev/null +++ b/new_samples/delaystart/generator/generate.go @@ -0,0 +1,12 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Delay Start", + Workflows: []string{"DelayStartWorkflow"}, + Activities: []string{"DelayStartActivity"}, + } + template.GenerateAll(data) +} diff --git a/new_samples/delaystart/main.go b/new_samples/delaystart/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/delaystart/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/delaystart/worker.go b/new_samples/delaystart/worker.go new file mode 100644 index 0000000..dbfd99b --- /dev/null +++ b/new_samples/delaystart/worker.go @@ -0,0 +1,101 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(DelayStartWorkflow, workflow.RegisterOptions{Name: "cadence_samples.DelayStartWorkflow"}) + w.RegisterActivityWithOptions(DelayStartActivity, activity.RegisterOptions{Name: "cadence_samples.DelayStartActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/dynamic/dynamic.go b/new_samples/dynamic/dynamic.go new file mode 100644 index 0000000..946726e --- /dev/null +++ b/new_samples/dynamic/dynamic.go @@ -0,0 +1,79 @@ +package main + +import ( + "fmt" + "time" + + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +/** + * The purpose of this sample is to demonstrate invocation of workflows and activities using name rather than strongly + * typed function. + */ + +// ApplicationName is the task list for this sample +const ApplicationName = "dynamicGroup" + +// GreetingsWorkflowName name used when workflow function is registered during init. We use the fully qualified name to function +const GreetingsWorkflowName = "main.DynamicGreetingsWorkflow" + +// Activity names used when activity function is registered during init. We use the fully qualified name to function +const GetNameActivityName = "main.GetNameActivity" +const GetGreetingActivityName = "main.GetGreetingActivity" +const SayGreetingActivityName = "main.SayGreetingActivity" + +// DynamicGreetingsWorkflow Workflow Decider. +func DynamicGreetingsWorkflow(ctx workflow.Context) error { + // Get Greeting. + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + logger := workflow.GetLogger(ctx) + var greetResult string + err := workflow.ExecuteActivity(ctx, GetGreetingActivityName).Get(ctx, &greetResult) + if err != nil { + logger.Error("Get greeting failed.", zap.Error(err)) + return err + } + + // Get Name. + var nameResult string + err = workflow.ExecuteActivity(ctx, GetNameActivityName).Get(ctx, &nameResult) + if err != nil { + logger.Error("Get name failed.", zap.Error(err)) + return err + } + + // Say Greeting. + var sayResult string + err = workflow.ExecuteActivity(ctx, SayGreetingActivityName, greetResult, nameResult).Get(ctx, &sayResult) + if err != nil { + logger.Error("Marshalling failed with error.", zap.Error(err)) + return err + } + + logger.Info("Workflow completed.", zap.String("Result", sayResult)) + return nil +} + +// Get Name Activity. +func GetNameActivity() (string, error) { + return "Cadence", nil +} + +// Get Greeting Activity. +func GetGreetingActivity() (string, error) { + return "Hello", nil +} + +// Say Greeting Activity. +func SayGreetingActivity(greeting string, name string) (string, error) { + result := fmt.Sprintf("Greeting: %s %s!\n", greeting, name) + return result, nil +} diff --git a/new_samples/dynamic/generator/README.md b/new_samples/dynamic/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/dynamic/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/dynamic/generator/README_specific.md b/new_samples/dynamic/generator/README_specific.md new file mode 100644 index 0000000..6b5e613 --- /dev/null +++ b/new_samples/dynamic/generator/README_specific.md @@ -0,0 +1,49 @@ +## How It Works + +This sample demonstrates invoking activities by **string name** rather than function reference: + +```go +// Instead of: +workflow.ExecuteActivity(ctx, GetGreetingActivity) + +// Use string name: +workflow.ExecuteActivity(ctx, "main.getGreetingActivity") +``` + +This enables: +- Plugin architectures where activities are loaded at runtime +- Configuration-driven workflows +- Cross-language activity invocation + +``` +┌─────────────────────────┐ +│ DynamicGreetingsWorkflow│ +│ │ +│ ExecuteActivity(ctx, │ +│ "main.getGreeting") │──▶ GetGreetingActivity +│ │ │ +│ ExecuteActivity(ctx, │ +│ "main.getName") │──▶ GetNameActivity +│ │ │ +│ ExecuteActivity(ctx, │ +│ "main.sayGreeting") │──▶ SayGreetingActivity +└─────────────────────────┘ +``` + +## Running the Sample + +Start the worker: +```bash +go run . +``` + +Trigger the workflow: +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --workflow_type cadence_samples.DynamicGreetingsWorkflow \ + --tl cadence-samples-worker \ + --et 60 +``` + diff --git a/new_samples/dynamic/generator/generate.go b/new_samples/dynamic/generator/generate.go new file mode 100644 index 0000000..9f340c0 --- /dev/null +++ b/new_samples/dynamic/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Dynamic Invocation", + Workflows: []string{"DynamicGreetingsWorkflow"}, + Activities: []string{"GetNameActivity", "GetGreetingActivity", "SayGreetingActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/dynamic/main.go b/new_samples/dynamic/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/dynamic/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/dynamic/worker.go b/new_samples/dynamic/worker.go new file mode 100644 index 0000000..1d3e88e --- /dev/null +++ b/new_samples/dynamic/worker.go @@ -0,0 +1,103 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(DynamicGreetingsWorkflow, workflow.RegisterOptions{Name: "cadence_samples.DynamicGreetingsWorkflow"}) + w.RegisterActivityWithOptions(GetNameActivity, activity.RegisterOptions{Name: "cadence_samples.GetNameActivity"}) + w.RegisterActivityWithOptions(GetGreetingActivity, activity.RegisterOptions{Name: "cadence_samples.GetGreetingActivity"}) + w.RegisterActivityWithOptions(SayGreetingActivity, activity.RegisterOptions{Name: "cadence_samples.SayGreetingActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/greetings/generator/README.md b/new_samples/greetings/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/greetings/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/greetings/generator/README_specific.md b/new_samples/greetings/generator/README_specific.md new file mode 100644 index 0000000..e72eae1 --- /dev/null +++ b/new_samples/greetings/generator/README_specific.md @@ -0,0 +1,24 @@ +## Greetings Workflow Sample + +This sample demonstrates **sequential activity execution**. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.GreetingsWorkflow +``` + +### Key Concept: Sequential Execution + +```go +// Each .Get() blocks until complete +greeting, _ := workflow.ExecuteActivity(ctx, GetGreetingActivity).Get(ctx, &greeting) +name, _ := workflow.ExecuteActivity(ctx, GetNameActivity).Get(ctx, &name) +workflow.ExecuteActivity(ctx, SayGreetingActivity, greeting, name).Get(ctx, &result) +``` + diff --git a/new_samples/greetings/generator/generate.go b/new_samples/greetings/generator/generate.go new file mode 100644 index 0000000..e4846d9 --- /dev/null +++ b/new_samples/greetings/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Greetings", + Workflows: []string{"GreetingsWorkflow"}, + Activities: []string{"GetGreetingActivity", "GetNameActivity", "SayGreetingActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/greetings/greetings.go b/new_samples/greetings/greetings.go new file mode 100644 index 0000000..28b10cb --- /dev/null +++ b/new_samples/greetings/greetings.go @@ -0,0 +1,46 @@ +package main + +import ( + "fmt" + "time" + + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// GreetingsWorkflow executes activities sequentially. +func GreetingsWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + logger.Info("GreetingsWorkflow started") + + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + var greeting string + if err := workflow.ExecuteActivity(ctx, GetGreetingActivity).Get(ctx, &greeting); err != nil { + return err + } + + var name string + if err := workflow.ExecuteActivity(ctx, GetNameActivity).Get(ctx, &name); err != nil { + return err + } + + var result string + if err := workflow.ExecuteActivity(ctx, SayGreetingActivity, greeting, name).Get(ctx, &result); err != nil { + return err + } + + logger.Info("GreetingsWorkflow completed", zap.String("result", result)) + return nil +} + +func GetGreetingActivity() (string, error) { return "Hello", nil } +func GetNameActivity() (string, error) { return "Cadence", nil } +func SayGreetingActivity(greeting, name string) (string, error) { + return fmt.Sprintf("%s %s!", greeting, name), nil +} + diff --git a/new_samples/greetings/main.go b/new_samples/greetings/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/greetings/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/greetings/worker.go b/new_samples/greetings/worker.go new file mode 100644 index 0000000..0eb049e --- /dev/null +++ b/new_samples/greetings/worker.go @@ -0,0 +1,103 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(GreetingsWorkflow, workflow.RegisterOptions{Name: "cadence_samples.GreetingsWorkflow"}) + w.RegisterActivityWithOptions(GetGreetingActivity, activity.RegisterOptions{Name: "cadence_samples.GetGreetingActivity"}) + w.RegisterActivityWithOptions(GetNameActivity, activity.RegisterOptions{Name: "cadence_samples.GetNameActivity"}) + w.RegisterActivityWithOptions(SayGreetingActivity, activity.RegisterOptions{Name: "cadence_samples.SayGreetingActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/pickfirst/generator/README.md b/new_samples/pickfirst/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/pickfirst/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/pickfirst/generator/README_specific.md b/new_samples/pickfirst/generator/README_specific.md new file mode 100644 index 0000000..7bb0fdb --- /dev/null +++ b/new_samples/pickfirst/generator/README_specific.md @@ -0,0 +1,25 @@ +## Pick First Workflow Sample + +This sample demonstrates **racing activities** and using the first result. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 60 \ + --workflow_type cadence_samples.PickFirstWorkflow +``` + +### Key Concept: Race and Cancel + +```go +childCtx, cancelHandler := workflow.WithCancel(ctx) +selector := workflow.NewSelector(ctx) +selector.AddFuture(f1, handler).AddFuture(f2, handler) +selector.Select(ctx) // Wait for first +cancelHandler() // Cancel others +``` + diff --git a/new_samples/pickfirst/generator/generate.go b/new_samples/pickfirst/generator/generate.go new file mode 100644 index 0000000..07a697e --- /dev/null +++ b/new_samples/pickfirst/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Pick First", + Workflows: []string{"PickFirstWorkflow"}, + Activities: []string{"RaceActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/pickfirst/main.go b/new_samples/pickfirst/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/pickfirst/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/pickfirst/pickfirst.go b/new_samples/pickfirst/pickfirst.go new file mode 100644 index 0000000..398ff43 --- /dev/null +++ b/new_samples/pickfirst/pickfirst.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "fmt" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" +) + +// PickFirstWorkflow races activities and uses first result. +func PickFirstWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + logger.Info("PickFirstWorkflow started") + + childCtx, cancelHandler := workflow.WithCancel(ctx) + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + WaitForCancellation: true, + } + childCtx = workflow.WithActivityOptions(childCtx, ao) + + selector := workflow.NewSelector(ctx) + var firstResponse string + + f1 := workflow.ExecuteActivity(childCtx, RaceActivity, 0, time.Second*2) + f2 := workflow.ExecuteActivity(childCtx, RaceActivity, 1, time.Second*10) + pendingFutures := []workflow.Future{f1, f2} + + selector.AddFuture(f1, func(f workflow.Future) { f.Get(ctx, &firstResponse) }) + selector.AddFuture(f2, func(f workflow.Future) { f.Get(ctx, &firstResponse) }) + + selector.Select(ctx) + cancelHandler() + + for _, f := range pendingFutures { + f.Get(ctx, nil) + } + + logger.Info("PickFirstWorkflow completed") + return nil +} + +// RaceActivity runs for specified duration with heartbeats. +func RaceActivity(ctx context.Context, branchID int, duration time.Duration) (string, error) { + elapsed := time.Duration(0) + for elapsed < duration { + time.Sleep(time.Second) + elapsed += time.Second + activity.RecordHeartbeat(ctx, "progress") + select { + case <-ctx.Done(): + return fmt.Sprintf("Branch %d cancelled", branchID), ctx.Err() + default: + } + } + return fmt.Sprintf("Branch %d done in %s", branchID, duration), nil +} + diff --git a/new_samples/pickfirst/worker.go b/new_samples/pickfirst/worker.go new file mode 100644 index 0000000..8b69135 --- /dev/null +++ b/new_samples/pickfirst/worker.go @@ -0,0 +1,101 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(PickFirstWorkflow, workflow.RegisterOptions{Name: "cadence_samples.PickFirstWorkflow"}) + w.RegisterActivityWithOptions(RaceActivity, activity.RegisterOptions{Name: "cadence_samples.RaceActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/signalcounter/generator/README.md b/new_samples/signalcounter/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/signalcounter/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/signalcounter/generator/README_specific.md b/new_samples/signalcounter/generator/README_specific.md new file mode 100644 index 0000000..7482ff0 --- /dev/null +++ b/new_samples/signalcounter/generator/README_specific.md @@ -0,0 +1,65 @@ +## How It Works + +This sample demonstrates handling multiple signals with **ContinueAsNew** to prevent history bloat: + +``` +┌──────────────────────────┐ +│ SignalCounterWorkflow │ +│ │ +│ ┌────────────────────┐ │ +│ │ Listen on channelA │◀─┼── signal channelA --input 5 +│ │ Listen on channelB │◀─┼── signal channelB --input 10 +│ └─────────┬──────────┘ │ +│ │ │ +│ counter += signal value │ +│ │ │ +│ if signals >= 3: │ +│ ContinueAsNew ──────┼──▶ New execution with counter +│ │ +└──────────────────────────┘ +``` + +Key concepts: +- **Multiple signal channels**: Workflow listens on both channelA and channelB +- **ContinueAsNew**: Restarts workflow with current state to prevent history growth +- **MaxSignalsPerExecution**: Limits signals before ContinueAsNew (default: 3 for demo) + +## Running the Sample + +Start the worker: +```bash +go run . +``` + +Start the workflow: +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --workflow_type cadence_samples.SignalCounterWorkflow \ + --tl cadence-samples-worker \ + --et 600 \ + --input '0' +``` + +Send signals (copy workflow ID from above): +```bash +# Signal on channelA +cadence --env development \ + --domain cadence-samples \ + workflow signal \ + --wid \ + --name channelA \ + --input '5' + +# Signal on channelB +cadence --env development \ + --domain cadence-samples \ + workflow signal \ + --wid \ + --name channelB \ + --input '10' +``` + +After 3 signals, the workflow will ContinueAsNew with the accumulated counter. + diff --git a/new_samples/signalcounter/generator/generate.go b/new_samples/signalcounter/generator/generate.go new file mode 100644 index 0000000..ddaf9d6 --- /dev/null +++ b/new_samples/signalcounter/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Signal Counter", + Workflows: []string{"SignalCounterWorkflow"}, + Activities: []string{}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/signalcounter/main.go b/new_samples/signalcounter/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/signalcounter/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/signalcounter/signalcounter.go b/new_samples/signalcounter/signalcounter.go new file mode 100644 index 0000000..675e377 --- /dev/null +++ b/new_samples/signalcounter/signalcounter.go @@ -0,0 +1,65 @@ +package main + +import ( + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +/** + * This sample workflow continuously counting signals and do continue as new + */ + +// ApplicationName is the task list for this sample +const ApplicationName = "signal_counter" + +// A workflow execution cannot receive infinite number of signals due to history limit +// By default 10000 is MaximumSignalsPerExecution which can be configured by DynamicConfig of Cadence cluster. +// But it's recommended to do continueAsNew after receiving certain number of signals(in production, use a number <1000) +const maxSignalsPerExecution = 3 + +// SignalCounterWorkflow Workflow Decider. +func SignalCounterWorkflow(ctx workflow.Context, counter int) error { + + var drainedAllSignals bool + signalsPerExecution := 0 + + logger := workflow.GetLogger(ctx) + logger.Info("Started SignalCounterWorkflow") + + for { + s := workflow.NewSelector(ctx) + s.AddReceive(workflow.GetSignalChannel(ctx, "channelA"), func(c workflow.Channel, ok bool) { + if ok { + var i int + c.Receive(ctx, &i) + counter += i + signalsPerExecution += 1 + logger.Info("Received signal on channelA.", zap.Int("Counter", i)) + } + }) + s.AddReceive(workflow.GetSignalChannel(ctx, "channelB"), func(c workflow.Channel, ok bool) { + if ok { + var i int + c.Receive(ctx, &i) + counter += i + signalsPerExecution += 1 + logger.Info("Received signal on channelB.", zap.Int("Counter", i)) + } + }) + + if signalsPerExecution >= maxSignalsPerExecution { + s.AddDefault(func() { + // this indicate that we have drained all signals within the decision task, and it's safe to do a continueAsNew + drainedAllSignals = true + logger.Info("Reached maxSignalsPerExecution limit") + }) + } + + s.Select(ctx) + + if drainedAllSignals { + logger.Info("Returning ContinueAsNewError") + return workflow.NewContinueAsNewError(ctx, SignalCounterWorkflow, counter) + } + } +} diff --git a/new_samples/signalcounter/worker.go b/new_samples/signalcounter/worker.go new file mode 100644 index 0000000..361e6ac --- /dev/null +++ b/new_samples/signalcounter/worker.go @@ -0,0 +1,99 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(SignalCounterWorkflow, workflow.RegisterOptions{Name: "cadence_samples.SignalCounterWorkflow"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/splitmerge/generator/README.md b/new_samples/splitmerge/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/splitmerge/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/splitmerge/generator/README_specific.md b/new_samples/splitmerge/generator/README_specific.md new file mode 100644 index 0000000..7abe200 --- /dev/null +++ b/new_samples/splitmerge/generator/README_specific.md @@ -0,0 +1,51 @@ +## How It Works + +This sample demonstrates the **split-merge** pattern using Cadence coroutines: + +``` + ┌─────────────────┐ + │SplitMergeWorkflow│ + │ (workerCount=3) │ + └────────┬────────┘ + │ + ┌──────────────┼──────────────┐ + ▼ ▼ ▼ + workflow.Go workflow.Go workflow.Go + │ │ │ + ▼ ▼ ▼ + ┌─────────┐ ┌─────────┐ ┌─────────┐ + │ Chunk 1 │ │ Chunk 2 │ │ Chunk 3 │ + │Activity │ │Activity │ │Activity │ + └────┬────┘ └────┬────┘ └────┬────┘ + │ │ │ + └──────────────┼──────────────┘ + ▼ + ┌─────────────────┐ + │ Merge Results │ + │ (totalSum, etc) │ + └─────────────────┘ +``` + +Key concepts: +- **workflow.Go**: Launch concurrent coroutines (NOT goroutines) +- **workflow.NewChannel**: Create channels for coroutine communication +- Results are collected and merged after all chunks complete + +## Running the Sample + +Start the worker: +```bash +go run . +``` + +Trigger the workflow with 3 parallel workers: +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --workflow_type cadence_samples.SplitMergeWorkflow \ + --tl cadence-samples-worker \ + --et 60 \ + --input '3' +``` + diff --git a/new_samples/splitmerge/generator/generate.go b/new_samples/splitmerge/generator/generate.go new file mode 100644 index 0000000..64008fc --- /dev/null +++ b/new_samples/splitmerge/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Split Merge", + Workflows: []string{"SplitMergeWorkflow"}, + Activities: []string{"ChunkProcessingActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/splitmerge/main.go b/new_samples/splitmerge/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/splitmerge/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/splitmerge/splitmerge.go b/new_samples/splitmerge/splitmerge.go new file mode 100644 index 0000000..2cd9d7a --- /dev/null +++ b/new_samples/splitmerge/splitmerge.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +/** +* This sample workflow demonstrates how to use multiple Cadence corotinues (instead of native goroutine) to process a +* chunk of a large work item in parallel, and then merge the intermediate result to generate the final result. +* In cadence workflow, you should not use go routine. Instead, you use corotinue via workflow.Go method. + */ + +// ApplicationName is the task list for this sample +const ApplicationName = "splitmergeGroup" + +type ( + // ChunkResult contains the result for this sample + ChunkResult struct { + NumberOfItemsInChunk int + SumInChunk int + } +) + +// SplitMergeWorkflow workflow decider +func SplitMergeWorkflow(ctx workflow.Context, workerCount int) (ChunkResult, error) { + chunkResultChannel := workflow.NewChannel(ctx) + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + for i := 1; i <= workerCount; i++ { + chunkID := i + workflow.Go(ctx, func(ctx workflow.Context) { + var result ChunkResult + err := workflow.ExecuteActivity(ctx, ChunkProcessingActivity, chunkID).Get(ctx, &result) + if err == nil { + chunkResultChannel.Send(ctx, result) + } else { + chunkResultChannel.Send(ctx, err) + } + }) + } + + var totalItemCount, totalSum int + for i := 1; i <= workerCount; i++ { + var v interface{} + chunkResultChannel.Receive(ctx, &v) + switch r := v.(type) { + case error: + // failed to process this chunk + // some proper error handling code here + case ChunkResult: + totalItemCount += r.NumberOfItemsInChunk + totalSum += r.SumInChunk + } + } + + workflow.GetLogger(ctx).Info("Workflow completed.") + + return ChunkResult{totalItemCount, totalSum}, nil +} + +func ChunkProcessingActivity(ctx context.Context, chunkID int) (result ChunkResult, err error) { + // some fake processing logic here + numberOfItemsInChunk := chunkID + sumInChunk := chunkID * chunkID + + activity.GetLogger(ctx).Info("Chunck processed", zap.Int("chunkID", chunkID)) + return ChunkResult{numberOfItemsInChunk, sumInChunk}, nil +} diff --git a/new_samples/splitmerge/worker.go b/new_samples/splitmerge/worker.go new file mode 100644 index 0000000..12fb83c --- /dev/null +++ b/new_samples/splitmerge/worker.go @@ -0,0 +1,101 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(SplitMergeWorkflow, workflow.RegisterOptions{Name: "cadence_samples.SplitMergeWorkflow"}) + w.RegisterActivityWithOptions(ChunkProcessingActivity, activity.RegisterOptions{Name: "cadence_samples.ChunkProcessingActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +} diff --git a/new_samples/timer/generator/README.md b/new_samples/timer/generator/README.md new file mode 100644 index 0000000..1da3502 --- /dev/null +++ b/new_samples/timer/generator/README.md @@ -0,0 +1,23 @@ + + + +# Sample Generator + +This folder is NOT part of the actual sample. It exists only for contributors who work on this sample. Please disregard it if you are trying to learn about Cadence. + +To create a better learning experience for Cadence users, each sample folder is designed to be self contained. Users can view every part of writing and running workflows, including: + +* Cadence client initialization +* Worker with workflow and activity registrations +* Workflow starter +* and the workflow code itself + +Some samples may have more or fewer parts depending on what they need to demonstrate. + +In most cases, the workflow code (e.g. `workflow.go`) is the part that users care about. The rest is boilerplate needed to run that workflow. For each sample folder, the workflow code should be written by hand. The boilerplate can be generated. Keeping all parts inside one folder gives early learners more value because they can see everything together rather than jumping across directories. + +## Contributing + +* When creating a new sample, follow the steps mentioned in the README file in the main samples folder. +* To update the sample workflow code, edit the workflow file directly. +* To update the worker, client, or other boilerplate logic, edit the generator file. If your change applies to all samples, update the common generator file inside the `template` folder. Edit the generator file in this folder only when the change should affect this sample alone. diff --git a/new_samples/timer/generator/README_specific.md b/new_samples/timer/generator/README_specific.md new file mode 100644 index 0000000..92dd2bd --- /dev/null +++ b/new_samples/timer/generator/README_specific.md @@ -0,0 +1,32 @@ +## Timer Workflow Sample + +This sample demonstrates **timers** for timeouts and delayed notifications. + +### Start the Workflow + +```bash +cadence --env development \ + --domain cadence-samples \ + workflow start \ + --tl cadence-samples-worker \ + --et 120 \ + --workflow_type cadence_samples.TimerWorkflow \ + --input '5000000000' +``` + +### Key Concept: Timer with Selector + +```go +childCtx, cancelHandler := workflow.WithCancel(ctx) +timerFuture := workflow.NewTimer(childCtx, threshold) + +selector := workflow.NewSelector(ctx) +selector.AddFuture(activityFuture, func(f workflow.Future) { + cancelHandler() // Cancel timer if activity completes first +}) +selector.AddFuture(timerFuture, func(f workflow.Future) { + // Timer fired - send notification +}) +selector.Select(ctx) +``` + diff --git a/new_samples/timer/generator/generate.go b/new_samples/timer/generator/generate.go new file mode 100644 index 0000000..82991a7 --- /dev/null +++ b/new_samples/timer/generator/generate.go @@ -0,0 +1,13 @@ +package main + +import "github.com/uber-common/cadence-samples/new_samples/template" + +func main() { + data := template.TemplateData{ + SampleName: "Timer", + Workflows: []string{"TimerWorkflow"}, + Activities: []string{"OrderProcessingActivity", "SendEmailActivity"}, + } + template.GenerateAll(data) +} + diff --git a/new_samples/timer/main.go b/new_samples/timer/main.go new file mode 100644 index 0000000..5893999 --- /dev/null +++ b/new_samples/timer/main.go @@ -0,0 +1,20 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + StartWorker() + + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT) + fmt.Println("Cadence worker started, press ctrl+c to terminate...") + <-done +} diff --git a/new_samples/timer/timer.go b/new_samples/timer/timer.go new file mode 100644 index 0000000..45be4fd --- /dev/null +++ b/new_samples/timer/timer.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "math/rand" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// TimerWorkflow demonstrates using timers for timeout notifications. +func TimerWorkflow(ctx workflow.Context, processingTimeThreshold time.Duration) error { + logger := workflow.GetLogger(ctx) + logger.Info("TimerWorkflow started") + + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + childCtx, cancelHandler := workflow.WithCancel(ctx) + selector := workflow.NewSelector(ctx) + + var processingDone bool + f := workflow.ExecuteActivity(ctx, OrderProcessingActivity) + selector.AddFuture(f, func(f workflow.Future) { + processingDone = true + cancelHandler() + }) + + timerFuture := workflow.NewTimer(childCtx, processingTimeThreshold) + selector.AddFuture(timerFuture, func(f workflow.Future) { + if !processingDone { + workflow.ExecuteActivity(ctx, SendEmailActivity).Get(ctx, nil) + } + }) + + selector.Select(ctx) + if !processingDone { + selector.Select(ctx) + } + + logger.Info("TimerWorkflow completed") + return nil +} + +// OrderProcessingActivity simulates order processing (random 0-10s). +func OrderProcessingActivity(ctx context.Context) error { + logger := activity.GetLogger(ctx) + duration := time.Second * time.Duration(rand.Intn(10)) + logger.Info("OrderProcessingActivity started", zap.Duration("duration", duration)) + time.Sleep(duration) + return nil +} + +// SendEmailActivity sends notification when processing takes too long. +func SendEmailActivity(ctx context.Context) error { + activity.GetLogger(ctx).Info("SendEmailActivity: Sending delay notification") + return nil +} + diff --git a/new_samples/timer/worker.go b/new_samples/timer/worker.go new file mode 100644 index 0000000..aa716cb --- /dev/null +++ b/new_samples/timer/worker.go @@ -0,0 +1,102 @@ +// THIS IS A GENERATED FILE +// PLEASE DO NOT EDIT + +// Package worker implements a Cadence worker with basic configurations. +package main + +import ( + "github.com/uber-go/tally" + apiv1 "github.com/uber/cadence-idl/go/proto/api/v1" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/compatibility" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/yarpc" + "go.uber.org/yarpc/peer" + yarpchostport "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + HostPort = "127.0.0.1:7833" + Domain = "cadence-samples" + // TaskListName identifies set of client workflows, activities, and workers. + // It could be your group or client or application name. + TaskListName = "cadence-samples-worker" + ClientName = "cadence-samples-worker" + CadenceService = "cadence-frontend" +) + +// StartWorker creates and starts a basic Cadence worker. +func StartWorker() { + logger, cadenceClient := BuildLogger(), BuildCadenceClient() + workerOptions := worker.Options{ + Logger: logger, + MetricsScope: tally.NewTestScope(TaskListName, nil), + } + + w := worker.New( + cadenceClient, + Domain, + TaskListName, + workerOptions) + // HelloWorld workflow registration + w.RegisterWorkflowWithOptions(TimerWorkflow, workflow.RegisterOptions{Name: "cadence_samples.TimerWorkflow"}) + w.RegisterActivityWithOptions(OrderProcessingActivity, activity.RegisterOptions{Name: "cadence_samples.OrderProcessingActivity"}) + w.RegisterActivityWithOptions(SendEmailActivity, activity.RegisterOptions{Name: "cadence_samples.SendEmailActivity"}) + + err := w.Start() + if err != nil { + panic("Failed to start worker: " + err.Error()) + } + logger.Info("Started Worker.", zap.String("worker", TaskListName)) + +} + +func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface { + grpcTransport := grpc.NewTransport() + // Create a single peer chooser that identifies the host/port and configures + // a gRPC dialer with TLS credentials + myChooser := peer.NewSingle( + yarpchostport.Identify(HostPort), + grpcTransport.NewDialer(dialOptions...), + ) + outbound := grpcTransport.NewOutbound(myChooser) + + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: ClientName, + Outbounds: yarpc.Outbounds{ + CadenceService: {Unary: outbound}, + }, + }) + if err := dispatcher.Start(); err != nil { + panic("Failed to start dispatcher: " + err.Error()) + } + + clientConfig := dispatcher.ClientConfig(CadenceService) + + // Create a compatibility adapter that wraps proto-based YARPC clients + // to provide a unified interface for domain, workflow, worker, and visibility APIs + return compatibility.NewThrift2ProtoAdapter( + apiv1.NewDomainAPIYARPCClient(clientConfig), + apiv1.NewWorkflowAPIYARPCClient(clientConfig), + apiv1.NewWorkerAPIYARPCClient(clientConfig), + apiv1.NewVisibilityAPIYARPCClient(clientConfig), + ) +} + +func BuildLogger() *zap.Logger { + config := zap.NewDevelopmentConfig() + config.Level.SetLevel(zapcore.InfoLevel) + + var err error + logger, err := config.Build() + if err != nil { + panic("Failed to setup logger: " + err.Error()) + } + + return logger +}