From ad2c6e5ec4f0b355af4419207dad506962f564d0 Mon Sep 17 00:00:00 2001 From: gblanc-1a Date: Thu, 2 Oct 2025 12:25:29 +0200 Subject: [PATCH] fix: add parallel notification delivery Implements parallel notification delivery with configurable concurrency Solve performance issues described in #146 and slow or failing destinations Adds maxConcurrentNotifications config option (default: 10) Signed-off-by: gblanc-1a --- README.md | 8 + pkg/api/config.go | 39 ++- pkg/api/config_test.go | 74 +++++ pkg/controller/controller.go | 161 +++++++--- pkg/controller/controller_test.go | 494 +++++++++++++++++++++++++++++- 5 files changed, 730 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index fbc2dd30..2f18dad8 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,13 @@ data: service.slack: | token: $slack-token + + # Optional: Maximum concurrent notification deliveries (default: 10) + # Controls parallel processing of notifications to multiple destinations. + # Higher values speed up delivery when destinations are slow/timing out but increase resource usage. + # Recommended: 10-25 (small/medium clusters), 50-100 (large clusters) + # Note: Some services have rate limits; start low and increase if needed. + maxConcurrentNotifications: "25" --- apiVersion: v1 kind: Secret @@ -76,6 +83,7 @@ notifications.argoproj.io/subscriptions: | - service: slack recipients: [my-channel-21, my-channel-22] ``` + ## Getting Started Ready to add notifications to your project? Check out sample notifications for [cert-manager](./examples/certmanager/README.md) diff --git a/pkg/api/config.go b/pkg/api/config.go index b9d21301..cf1f3a19 100644 --- a/pkg/api/config.go +++ b/pkg/api/config.go @@ -29,8 +29,10 @@ type Config struct { DefaultTriggers []string // ServiceDefaultTriggers holds list of default triggers per service ServiceDefaultTriggers map[string][]string - Namespace string - IsSelfServiceConfig bool + // MaxConcurrentNotifications is the maximum number of notifications to send concurrently (default: 10) + MaxConcurrentNotifications int + Namespace string + IsSelfServiceConfig bool } // Returns list of destinations for the specified trigger @@ -71,14 +73,20 @@ func replaceStringSecret(val string, secretValues map[string][]byte) string { }) } +const ( + // DefaultMaxConcurrentNotifications is the default maximum number of concurrent notification deliveries + DefaultMaxConcurrentNotifications = 10 +) + // ParseConfig retrieves Config from given ConfigMap and Secret func ParseConfig(configMap *corev1.ConfigMap, secret *corev1.Secret) (*Config, error) { cfg := Config{ - Services: map[string]ServiceFactory{}, - Triggers: map[string][]triggers.Condition{}, - ServiceDefaultTriggers: map[string][]string{}, - Templates: map[string]services.Notification{}, - Namespace: configMap.Namespace, + Services: map[string]ServiceFactory{}, + Triggers: map[string][]triggers.Condition{}, + ServiceDefaultTriggers: map[string][]string{}, + Templates: map[string]services.Notification{}, + Namespace: configMap.Namespace, + MaxConcurrentNotifications: DefaultMaxConcurrentNotifications, } if subscriptionYaml, ok := configMap.Data["subscriptions"]; ok { if err := yaml.Unmarshal([]byte(subscriptionYaml), &cfg.Subscriptions); err != nil { @@ -92,6 +100,23 @@ func ParseConfig(configMap *corev1.ConfigMap, secret *corev1.Secret) (*Config, e } } + if maxConcurrentYaml, ok := configMap.Data["maxConcurrentNotifications"]; ok { + var maxConcurrent int + if err := yaml.Unmarshal([]byte(maxConcurrentYaml), &maxConcurrent); err != nil { + log.Warnf("Invalid maxConcurrentNotifications value '%s' (must be a positive integer), using default: %d", maxConcurrentYaml, DefaultMaxConcurrentNotifications) + } else { + switch { + case maxConcurrent <= 0: + log.Warnf("maxConcurrentNotifications must be positive, got %d, using default: %d", maxConcurrent, DefaultMaxConcurrentNotifications) + case maxConcurrent > 1000: + log.Warnf("maxConcurrentNotifications value %d is very high (>1000), consider using a lower value", maxConcurrent) + cfg.MaxConcurrentNotifications = maxConcurrent + default: + cfg.MaxConcurrentNotifications = maxConcurrent + } + } + } + for k, v := range configMap.Data { parts := strings.Split(k, ".") switch { diff --git a/pkg/api/config_test.go b/pkg/api/config_test.go index fedaf98b..857f254e 100644 --- a/pkg/api/config_test.go +++ b/pkg/api/config_test.go @@ -62,6 +62,80 @@ func TestParseConfig_DefaultServiceTriggers(t *testing.T) { }, cfg.ServiceDefaultTriggers) } +func TestParseConfig_MaxConcurrentNotifications(t *testing.T) { + tests := []struct { + name string + data map[string]string + expected int + }{ + { + name: "valid positive value", + data: map[string]string{ + "maxConcurrentNotifications": "25", + }, + expected: 25, + }, + { + name: "valid value 100", + data: map[string]string{ + "maxConcurrentNotifications": "100", + }, + expected: 100, + }, + { + name: "not set uses default", + data: map[string]string{}, + expected: DefaultMaxConcurrentNotifications, + }, + { + name: "zero uses default", + data: map[string]string{ + "maxConcurrentNotifications": "0", + }, + expected: DefaultMaxConcurrentNotifications, + }, + { + name: "negative uses default", + data: map[string]string{ + "maxConcurrentNotifications": "-10", + }, + expected: DefaultMaxConcurrentNotifications, + }, + { + name: "invalid non-numeric uses default", + data: map[string]string{ + "maxConcurrentNotifications": "invalid", + }, + expected: DefaultMaxConcurrentNotifications, + }, + { + name: "empty string uses default", + data: map[string]string{ + "maxConcurrentNotifications": "", + }, + expected: DefaultMaxConcurrentNotifications, + }, + { + name: "very high value is accepted with warning", + data: map[string]string{ + "maxConcurrentNotifications": "1500", + }, + expected: 1500, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg, err := ParseConfig(&corev1.ConfigMap{Data: tt.data}, emptySecret) + if !assert.NoError(t, err) { + return + } + assert.Equal(t, tt.expected, cfg.MaxConcurrentNotifications, + "MaxConcurrentNotifications should match expected value") + }) + } +} + func TestReplaceStringSecret_KeyPresent(t *testing.T) { val := replaceStringSecret("hello $secret-value", map[string][]byte{ "secret-value": []byte("world"), diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5439cd78..909d3c89 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "runtime/debug" + "sync" "time" log "github.com/sirupsen/logrus" @@ -21,6 +22,7 @@ import ( "github.com/argoproj/notifications-engine/pkg/api" "github.com/argoproj/notifications-engine/pkg/services" "github.com/argoproj/notifications-engine/pkg/subscriptions" + "github.com/argoproj/notifications-engine/pkg/triggers" ) // NotificationDelivery represents a notification that was delivered @@ -98,6 +100,17 @@ func WithEventCallback(f func(eventSequence NotificationEventSequence)) Opts { } } +// WithMaxConcurrentNotifications sets the maximum number of concurrent notification +// deliveries per resource. This helps prevent resource exhaustion when sending +// notifications to many destinations. Default is 10 if not specified. +func WithMaxConcurrentNotifications(maxConcurrent int) Opts { + return func(ctrl *notificationController) { + if maxConcurrent > 0 { + ctrl.maxConcurrentNotifications = maxConcurrent + } + } +} + func NewController( client dynamic.NamespaceableResourceInterface, informer cache.SharedIndexInformer, @@ -123,11 +136,12 @@ func NewController( ) ctrl := ¬ificationController{ - client: client, - informer: informer, - queue: queue, - metricsRegistry: NewMetricsRegistry(""), - apiFactory: apiFactory, + client: client, + informer: informer, + queue: queue, + metricsRegistry: NewMetricsRegistry(""), + apiFactory: apiFactory, + maxConcurrentNotifications: api.DefaultMaxConcurrentNotifications, toUnstructured: func(obj metav1.Object) (*unstructured.Unstructured, error) { res, ok := obj.(*unstructured.Unstructured) if !ok { @@ -155,16 +169,17 @@ func NewControllerWithNamespaceSupport( } type notificationController struct { - client dynamic.NamespaceableResourceInterface - informer cache.SharedIndexInformer - queue workqueue.TypedRateLimitingInterface[string] - apiFactory api.Factory - metricsRegistry *MetricsRegistry - skipProcessing func(obj metav1.Object) (bool, string) - alterDestinations func(obj metav1.Object, destinations services.Destinations, cfg api.Config) services.Destinations - toUnstructured func(obj metav1.Object) (*unstructured.Unstructured, error) - eventCallback func(eventSequence NotificationEventSequence) - namespaceSupport bool + client dynamic.NamespaceableResourceInterface + informer cache.SharedIndexInformer + queue workqueue.TypedRateLimitingInterface[string] + apiFactory api.Factory + metricsRegistry *MetricsRegistry + skipProcessing func(obj metav1.Object) (bool, string) + alterDestinations func(obj metav1.Object, destinations services.Destinations, cfg api.Config) services.Destinations + toUnstructured func(obj metav1.Object) (*unstructured.Unstructured, error) + eventCallback func(eventSequence NotificationEventSequence) + namespaceSupport bool + maxConcurrentNotifications int } func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) { @@ -187,22 +202,81 @@ func (c *notificationController) isSelfServiceConfigureApi(api api.API) bool { return c.namespaceSupport && api.GetConfig().IsSelfServiceConfig } -func (c *notificationController) processResourceWithAPI(api api.API, resource metav1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { - apiNamespace := api.GetConfig().Namespace +// notificationResult encapsulates the result of sending a single notification. +// It is used to communicate results from parallel notification goroutines back to the main processing loop. +type notificationResult struct { + success bool // success indicates whether the notification was sent successfully + err error // err contains the error if the send failed + delivery NotificationDelivery // delivery contains the notification delivery information +} + +// sendSingleNotification sends a notification to a single destination and returns the result. +// Thread-safe: designed for concurrent execution. All parameters are either read-only or thread-safe. +// The api.Send call is performed without locks to enable parallel execution. +func (c *notificationController) sendSingleNotification( + api api.API, + un *unstructured.Unstructured, + resource metav1.Object, + trigger string, + cr triggers.ConditionResult, + destination services.Destination, + templates []string, + apiNamespace string, + logEntry *log.Entry, +) notificationResult { + logEntry.Infof("Sending notification about condition '%s.%s' to '%v' using the configuration in namespace %s", + trigger, cr.Key, destination, apiNamespace) + + err := api.Send(un.Object, templates, destination) + + result := notificationResult{ + success: err == nil, + err: err, + delivery: NotificationDelivery{ + Trigger: trigger, + Destination: destination, + AlreadyNotified: false, + }, + } + + if err != nil { + logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s", + destination, resource.GetNamespace(), resource.GetName(), err, apiNamespace) + result.err = fmt.Errorf("failed to deliver notification %s to %s: %w using the configuration in namespace %s", + trigger, destination, err, apiNamespace) + } else { + logEntry.Debugf("Notification %s was sent using the configuration in namespace %s", + destination.Recipient, apiNamespace) + } + + return result +} + +func (c *notificationController) processResourceWithAPI(apiObj api.API, resource metav1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { + cfg := apiObj.GetConfig() + apiNamespace := cfg.Namespace notificationsState := NewStateFromRes(resource) - destinations := c.getDestinations(resource, api.GetConfig()) + destinations := c.getDestinations(resource, cfg) if len(destinations) == 0 { return resource.GetAnnotations(), nil } + // Determine max concurrent notifications + // Priority: 1) Programmatic config override 2) ConfigMap value 3) api.DefaultMaxConcurrentNotifications + maxConcurrent := c.maxConcurrentNotifications + if maxConcurrent == api.DefaultMaxConcurrentNotifications && cfg.MaxConcurrentNotifications > 0 { + maxConcurrent = cfg.MaxConcurrentNotifications + logEntry.Debugf("Using maxConcurrentNotifications from ConfigMap: %d", maxConcurrent) + } + un, err := c.toUnstructured(resource) if err != nil { return nil, err } for trigger, destinations := range destinations { - res, err := api.RunTrigger(trigger, un.Object) + res, err := apiObj.RunTrigger(trigger, un.Object) if err != nil { logEntry.Errorf("Failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace) eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %w using the configuration in namespace %s", trigger, err, apiNamespace)) @@ -214,13 +288,18 @@ func (c *notificationController) processResourceWithAPI(api api.API, resource me if !cr.Triggered { for _, to := range destinations { - notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, false) + notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(apiObj), apiNamespace, trigger, cr, to, false) } continue } + // send notifications in parallel using goroutines with a worker pool + var wg sync.WaitGroup + var notificationsMutex sync.Mutex + semaphore := make(chan struct{}, maxConcurrent) + for _, to := range destinations { - if changed := notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, true); !changed { + if changed := notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(apiObj), apiNamespace, trigger, cr, to, true); !changed { logEntry.Infof("Notification about condition '%s.%s' already sent to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace) eventSequence.addDelivered(NotificationDelivery{ Trigger: trigger, @@ -228,24 +307,32 @@ func (c *notificationController) processResourceWithAPI(api api.API, resource me AlreadyNotified: true, }) } else { - logEntry.Infof("Sending notification about condition '%s.%s' to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace) - if err := api.Send(un.Object, cr.Templates, to); err != nil { - logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s", - to, resource.GetNamespace(), resource.GetName(), err, apiNamespace) - notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, false) - c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false) - eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %w using the configuration in namespace %s", trigger, to, err, apiNamespace)) - } else { - logEntry.Debugf("Notification %s was sent using the configuration in namespace %s", to.Recipient, apiNamespace) - c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, true) - eventSequence.addDelivered(NotificationDelivery{ - Trigger: trigger, - Destination: to, - AlreadyNotified: false, - }) - } + wg.Add(1) + go func(destination services.Destination, templates []string) { + semaphore <- struct{}{} + defer func() { + <-semaphore + wg.Done() + }() + + result := c.sendSingleNotification(apiObj, un, resource, trigger, cr, destination, templates, apiNamespace, logEntry) + + notificationsMutex.Lock() + defer notificationsMutex.Unlock() + + if !result.success { + notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(apiObj), apiNamespace, trigger, cr, destination, false) + c.metricsRegistry.IncDeliveriesCounter(trigger, destination.Service, false) + eventSequence.addError(result.err) + } else { + c.metricsRegistry.IncDeliveriesCounter(trigger, destination.Service, true) + eventSequence.addDelivered(result.delivery) + } + }(to, cr.Templates) } } + + wg.Wait() } } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 8237b8a3..4adcce0a 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -5,6 +5,8 @@ import ( "encoding/json" "errors" "fmt" + "sync" + "sync/atomic" "testing" "time" @@ -453,13 +455,13 @@ func TestProcessItemsWithSelfService(t *testing.T) { ctrl.namespaceSupport = true // SelfService API: config has IsSelfServiceConfig set to true - apiMap["selfservice_namespace"].(*mocks.MockAPI).EXPECT().GetConfig().Return(notificationApi.Config{IsSelfServiceConfig: true, Namespace: "selfservice_namespace"}).Times(3) + apiMap["selfservice_namespace"].(*mocks.MockAPI).EXPECT().GetConfig().Return(notificationApi.Config{IsSelfServiceConfig: true, Namespace: "selfservice_namespace"}).AnyTimes() apiMap["selfservice_namespace"].(*mocks.MockAPI).EXPECT().RunTrigger(triggerName, gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil) apiMap["selfservice_namespace"].(*mocks.MockAPI).EXPECT().Send(mock.MatchedBy(func(obj map[string]interface{}) bool { return true }), []string{"test"}, destination).Return(nil).AnyTimes() - apiMap["default"].(*mocks.MockAPI).EXPECT().GetConfig().Return(notificationApi.Config{IsSelfServiceConfig: false, Namespace: "default"}).Times(3) + apiMap["default"].(*mocks.MockAPI).EXPECT().GetConfig().Return(notificationApi.Config{IsSelfServiceConfig: false, Namespace: "default"}).AnyTimes() apiMap["default"].(*mocks.MockAPI).EXPECT().RunTrigger(triggerName, gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil) apiMap["default"].(*mocks.MockAPI).EXPECT().Send(mock.MatchedBy(func(obj map[string]interface{}) bool { return true @@ -486,3 +488,491 @@ func TestProcessItemsWithSelfService(t *testing.T) { assert.Equal(t, expectedDeliveries[i].Destination, event.Destination) } } + +func TestNotificationsShouldNotBeBlockedBySlowDestinations(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Setup: 3 destinations - 1 slow (500ms) and 2 fast (50ms) + // We verify that fast notifications complete without waiting for the slow one + app := newResource("test", withAnnotations(map[string]string{ + subscriptions.SubscribeAnnotationKey("my-trigger", "webhook-slow"): "slow-recipient", + subscriptions.SubscribeAnnotationKey("my-trigger", "webhook-fast1"): "fast-recipient-1", + subscriptions.SubscribeAnnotationKey("my-trigger", "webhook-fast2"): "fast-recipient-2", + })) + + ctrl, api, err := newController(t, ctx, newFakeClient(app)) + assert.NoError(t, err) + + api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes() + api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil) + + // Track when each notification starts to verify parallel execution + sendTimes := make([]time.Time, 0) + var timesLock sync.Mutex + + const slowWebhookDelay = 500 * time.Millisecond + const fastWebhookDelay = 50 * time.Millisecond + const parallelStartThreshold = 50 * time.Millisecond + + // Mock slow webhook that times out after 500ms + api.EXPECT().Send(gomock.Any(), []string{"test"}, services.Destination{Service: "webhook-slow", Recipient: "slow-recipient"}). + DoAndReturn(func(_ map[string]interface{}, _ []string, _ services.Destination) error { + timesLock.Lock() + sendTimes = append(sendTimes, time.Now()) + timesLock.Unlock() + time.Sleep(slowWebhookDelay) + return fmt.Errorf("webhook timeout") + }) + + // Mock fast webhooks that complete quickly + api.EXPECT().Send(gomock.Any(), []string{"test"}, services.Destination{Service: "webhook-fast1", Recipient: "fast-recipient-1"}). + DoAndReturn(func(_ map[string]interface{}, _ []string, _ services.Destination) error { + timesLock.Lock() + sendTimes = append(sendTimes, time.Now()) + timesLock.Unlock() + time.Sleep(fastWebhookDelay) + return nil + }) + + api.EXPECT().Send(gomock.Any(), []string{"test"}, services.Destination{Service: "webhook-fast2", Recipient: "fast-recipient-2"}). + DoAndReturn(func(_ map[string]interface{}, _ []string, _ services.Destination) error { + timesLock.Lock() + sendTimes = append(sendTimes, time.Now()) + timesLock.Unlock() + time.Sleep(fastWebhookDelay) + return nil + }) + + // Execute and measure total time + eventSequence := &NotificationEventSequence{} + start := time.Now() + _, err = ctrl.processResourceWithAPI(api, app, logEntry, eventSequence) + elapsed := time.Since(start) + + assert.NoError(t, err) + assert.Equal(t, 3, len(sendTimes), "All 3 notifications should have been sent") + + // Verify all notifications started in parallel (within threshold) + if len(sendTimes) >= 2 { + timeBetweenFirstAndSecond := sendTimes[1].Sub(sendTimes[0]) + assert.Less(t, timeBetweenFirstAndSecond.Milliseconds(), parallelStartThreshold.Milliseconds(), + "Fast notifications should start in parallel, not wait for slow ones") + } + + if len(sendTimes) >= 3 { + timeBetweenFirstAndThird := sendTimes[2].Sub(sendTimes[0]) + assert.Less(t, timeBetweenFirstAndThird.Milliseconds(), parallelStartThreshold.Milliseconds(), + "All notifications should start in parallel") + } + + // Total time should be ~500ms (longest webhook), not 600ms (sum of all) + assert.Less(t, elapsed.Seconds(), 0.7, + "Total time should be ~0.5s (parallel), not sum of all notifications") + + // Verify error was recorded for slow webhook + assert.Greater(t, len(eventSequence.Errors), 0, "Slow webhook error should be recorded") + + // Verify 2 fast webhooks succeeded + successfulDeliveries := 0 + for _, delivery := range eventSequence.Delivered { + if !delivery.AlreadyNotified { + successfulDeliveries++ + } + } + assert.Equal(t, 2, successfulDeliveries, "Two fast webhooks should have succeeded") +} + +func TestConcurrentNotificationsLimited(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Setup: 10 destinations with a concurrency limit of 3 + // Verify that at most 3 notifications are sent concurrently + const numNotifications = 10 + const maxConcurrency = 3 + + annotations := make(map[string]string) + for i := 1; i <= numNotifications; i++ { + annotations[subscriptions.SubscribeAnnotationKey("my-trigger", fmt.Sprintf("webhook-%d", i))] = fmt.Sprintf("recipient-%d", i) + } + app := newResource("test", withAnnotations(annotations)) + + ctrl, api, err := newController(t, ctx, newFakeClient(app), WithMaxConcurrentNotifications(maxConcurrency)) + assert.NoError(t, err) + + api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes() + api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil) + + // Track concurrent execution to verify worker pool limits + var concurrentCount int32 + var maxConcurrent int32 + var countLock sync.Mutex + + for i := 1; i <= numNotifications; i++ { + api.EXPECT().Send(gomock.Any(), []string{"test"}, services.Destination{ + Service: fmt.Sprintf("webhook-%d", i), + Recipient: fmt.Sprintf("recipient-%d", i), + }).DoAndReturn(func(_ map[string]interface{}, _ []string, _ services.Destination) error { + // Increment concurrent count and track maximum + countLock.Lock() + concurrentCount++ + if concurrentCount > maxConcurrent { + maxConcurrent = concurrentCount + } + currentCount := concurrentCount + countLock.Unlock() + + // Verify we never exceed the limit + assert.LessOrEqual(t, currentCount, int32(maxConcurrency), + "Concurrent notifications should not exceed configured limit") + + // Simulate work + time.Sleep(50 * time.Millisecond) + + // Decrement concurrent count + countLock.Lock() + concurrentCount-- + countLock.Unlock() + + return nil + }) + } + + eventSequence := &NotificationEventSequence{} + _, err = ctrl.processResourceWithAPI(api, app, logEntry, eventSequence) + assert.NoError(t, err) + + // Verify the worker pool reached the configured limit + assert.Equal(t, int32(maxConcurrency), maxConcurrent, + "Worker pool should reach the maximum concurrency limit") + assert.Equal(t, numNotifications, len(eventSequence.Delivered), + "All notifications should be delivered") +} + +func TestSendNotificationsInParallel(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Setup: 3 destinations to verify they execute in parallel + // We use concurrent call counting instead of timing to avoid flaky tests + const numNotifications = 3 + + app := newResource("test", withAnnotations(map[string]string{ + subscriptions.SubscribeAnnotationKey("my-trigger", "webhook-1"): "recipient-1", + subscriptions.SubscribeAnnotationKey("my-trigger", "webhook-2"): "recipient-2", + subscriptions.SubscribeAnnotationKey("my-trigger", "webhook-3"): "recipient-3", + })) + + ctrl, api, err := newController(t, ctx, newFakeClient(app)) + assert.NoError(t, err) + + api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes() + api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil) + + // Track active concurrent calls and maximum reached + var activeCalls int32 + var maxConcurrent int32 + var countLock sync.Mutex + + // Channel signals when all notifications have started + allStarted := make(chan struct{}) + + for i := 1; i <= numNotifications; i++ { + api.EXPECT().Send(gomock.Any(), []string{"test"}, services.Destination{ + Service: fmt.Sprintf("webhook-%d", i), + Recipient: fmt.Sprintf("recipient-%d", i), + }).DoAndReturn(func(_ map[string]interface{}, _ []string, _ services.Destination) error { + // Track concurrent execution + countLock.Lock() + activeCalls++ + if activeCalls > maxConcurrent { + maxConcurrent = activeCalls + } + currentActive := activeCalls + + // Signal when all notifications are active concurrently + if currentActive == numNotifications { + close(allStarted) + } + countLock.Unlock() + + // Simulate work + time.Sleep(100 * time.Millisecond) + + countLock.Lock() + activeCalls-- + countLock.Unlock() + + return nil + }) + } + + // Execute in background to verify parallel start + eventSequence := &NotificationEventSequence{} + done := make(chan struct{}) + go func() { + _, err = ctrl.processResourceWithAPI(api, app, logEntry, eventSequence) + close(done) + }() + + // Wait for all notifications to start concurrently + select { + case <-allStarted: + // Success - all notifications started in parallel + case <-time.After(time.Second): + t.Fatal("notifications did not start in parallel within timeout") + } + + // Wait for completion + <-done + assert.NoError(t, err) + + // Verify all notifications ran concurrently + assert.Equal(t, int32(numNotifications), maxConcurrent, + "All notifications should have been active concurrently") + assert.Equal(t, numNotifications, len(eventSequence.Delivered), + "All notifications should be delivered") +} + +func TestSendSingleNotification(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Setup common test data + app := newResource("test", withAnnotations(map[string]string{ + subscriptions.SubscribeAnnotationKey("my-trigger", "webhook"): "recipient", + })) + + ctrl, api, err := newController(t, ctx, newFakeClient(app)) + assert.NoError(t, err) + + un, err := ctrl.toUnstructured(app) + assert.NoError(t, err) + + destination := services.Destination{Service: "webhook", Recipient: "recipient"} + templates := []string{"template1"} + trigger := "my-trigger" + cr := triggers.ConditionResult{Key: "test-condition"} + apiNamespace := "default" + + t.Run("success case", func(t *testing.T) { + // Mock successful send + api.EXPECT().Send(un.Object, templates, destination).Return(nil) + + // Execute + result := ctrl.sendSingleNotification(api, un, app, trigger, cr, destination, templates, apiNamespace, logEntry) + + // Verify success result + assert.True(t, result.success, "Notification should succeed") + assert.Nil(t, result.err, "No error should be returned") + assert.Equal(t, trigger, result.delivery.Trigger, "Trigger name should match") + assert.Equal(t, destination, result.delivery.Destination, "Destination should match") + assert.False(t, result.delivery.AlreadyNotified, "Should not be marked as already notified") + }) + + t.Run("error case", func(t *testing.T) { + // Mock failed send + sendErr := fmt.Errorf("network timeout") + api.EXPECT().Send(un.Object, templates, destination).Return(sendErr) + + // Execute + result := ctrl.sendSingleNotification(api, un, app, trigger, cr, destination, templates, apiNamespace, logEntry) + + // Verify error result + assert.False(t, result.success, "Notification should fail") + assert.NotNil(t, result.err, "Error should be returned") + assert.Contains(t, result.err.Error(), "network timeout", "Error should contain original error message") + assert.Contains(t, result.err.Error(), "failed to deliver notification", "Error should be wrapped with context") + }) +} + +func TestDefaultConcurrencyLimit(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Verify default concurrency limit when not explicitly configured + ctrl, _, err := newController(t, ctx, newFakeClient()) + assert.NoError(t, err) + + assert.Equal(t, notificationApi.DefaultMaxConcurrentNotifications, ctrl.maxConcurrentNotifications, + "Default concurrency limit should match constant") +} + +func TestCustomConcurrencyLimit(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Verify custom concurrency limit is applied + ctrl, _, err := newController(t, ctx, newFakeClient(), WithMaxConcurrentNotifications(25)) + assert.NoError(t, err) + + assert.Equal(t, 25, ctrl.maxConcurrentNotifications, + "Custom concurrency limit should be applied") +} + +func TestInvalidConcurrencyLimit(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Verify negative values fall back to default + ctrl, _, err := newController(t, ctx, newFakeClient(), WithMaxConcurrentNotifications(-5)) + assert.NoError(t, err) + + assert.Equal(t, notificationApi.DefaultMaxConcurrentNotifications, ctrl.maxConcurrentNotifications, + "Negative concurrency limit should fall back to default") + + // Verify zero values fall back to default + ctrl2, _, err := newController(t, ctx, newFakeClient(), WithMaxConcurrentNotifications(0)) + assert.NoError(t, err) + + assert.Equal(t, notificationApi.DefaultMaxConcurrentNotifications, ctrl2.maxConcurrentNotifications, + "Zero concurrency limit should fall back to default") +} + +func TestConcurrencyLimitFromConfig(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Setup: app with 5 destinations + // Controller has default limit of 50, but Config sets it to 2 + const numNotifications = 5 + const configMaxConcurrency = 2 + + annotations := make(map[string]string) + for i := 1; i <= numNotifications; i++ { + annotations[subscriptions.SubscribeAnnotationKey("my-trigger", fmt.Sprintf("webhook-%d", i))] = fmt.Sprintf("recipient-%d", i) + } + app := newResource("test", withAnnotations(annotations)) + + ctrl, api, err := newController(t, ctx, newFakeClient(app)) + assert.NoError(t, err) + + // Config specifies MaxConcurrentNotifications + api.EXPECT().GetConfig().Return(notificationApi.Config{ + MaxConcurrentNotifications: configMaxConcurrency, + }).AnyTimes() + api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil) + + // Track concurrent execution to verify config value is used + var concurrentCount int32 + var maxConcurrent int32 + var countLock sync.Mutex + + for i := 1; i <= numNotifications; i++ { + api.EXPECT().Send(gomock.Any(), []string{"test"}, services.Destination{ + Service: fmt.Sprintf("webhook-%d", i), + Recipient: fmt.Sprintf("recipient-%d", i), + }).DoAndReturn(func(_ map[string]interface{}, _ []string, _ services.Destination) error { + // Increment and track concurrent count + countLock.Lock() + concurrentCount++ + if concurrentCount > maxConcurrent { + maxConcurrent = concurrentCount + } + currentCount := concurrentCount + countLock.Unlock() + + // Verify we never exceed the config limit + assert.LessOrEqual(t, currentCount, int32(configMaxConcurrency), + "Concurrent notifications should not exceed config limit") + + time.Sleep(50 * time.Millisecond) + + countLock.Lock() + concurrentCount-- + countLock.Unlock() + + return nil + }) + } + + eventSequence := &NotificationEventSequence{} + _, err = ctrl.processResourceWithAPI(api, app, logEntry, eventSequence) + assert.NoError(t, err) + + // Verify the config value was used (not the controller default of 50) + assert.Equal(t, int32(configMaxConcurrency), maxConcurrent, + "Should use config's MaxConcurrentNotifications value") + assert.Equal(t, numNotifications, len(eventSequence.Delivered), + "All notifications should be delivered") +} + +func TestSequentialNotificationsWhenConcurrencyIsOne(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // --- Arrange ------------------------------------------------------------------ + const ( + triggerName = "my-trigger" + numNotifications = 5 + servicePrefix = "webhook" + recipientPrefix = "recipient" + ) + + annotations := make(map[string]string, numNotifications) + for i := 1; i <= numNotifications; i++ { + annotations[subscriptions.SubscribeAnnotationKey(triggerName, fmt.Sprintf("%s-%d", servicePrefix, i))] = fmt.Sprintf("%s-%d", recipientPrefix, i) + } + app := newResource("test", withAnnotations(annotations)) + + ctrl, api, err := newController(t, ctx, newFakeClient(app), WithMaxConcurrentNotifications(1)) + assert.NoError(t, err) + + api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes() + api.EXPECT().RunTrigger(triggerName, gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil) + + // Concurrency tracking state + var active int32 // current in-flight sends + var peakActive int32 // maximum observed simultaneous sends + var executionOrder []int // order in which destinations began sending (for permutation validation) + var orderLock sync.Mutex + + for i := 1; i <= numNotifications; i++ { + idx := i + api.EXPECT().Send(gomock.Any(), []string{"test"}, services.Destination{ + Service: fmt.Sprintf("%s-%d", servicePrefix, i), + Recipient: fmt.Sprintf("%s-%d", recipientPrefix, i), + }).DoAndReturn(func(_ map[string]interface{}, _ []string, _ services.Destination) error { + // Track entry + current := atomic.AddInt32(&active, 1) + if current > peakActive { + peakActive = current + } + orderLock.Lock() + executionOrder = append(executionOrder, idx) + orderLock.Unlock() + + // Simulate delivery time (small to keep test fast) + time.Sleep(20 * time.Millisecond) + + // Track exit + atomic.AddInt32(&active, -1) + return nil + }) + } + + // --- Act ---------------------------------------------------------------------- + eventSeq := &NotificationEventSequence{} + _, err = ctrl.processResourceWithAPI(api, app, logEntry, eventSeq) + assert.NoError(t, err) + + // --- Assert ------------------------------------------------------------------- + assert.Equal(t, int32(1), peakActive, "Concurrency=1 must enforce strictly sequential sends") + assert.Equal(t, numNotifications, len(eventSeq.Delivered), "All notifications should be delivered") + assert.Empty(t, eventSeq.Errors, "No delivery errors expected") + + // Validate executionOrder is a permutation of 1..numNotifications + if len(executionOrder) != numNotifications { + t.Fatalf("expected %d executions recorded, got %d", numNotifications, len(executionOrder)) + } + seen := make(map[int]bool, numNotifications) + for _, v := range executionOrder { + seen[v] = true + } + for i := 1; i <= numNotifications; i++ { + if !seen[i] { + t.Fatalf("missing expected index %d in execution order (concurrency=1)", i) + } + } +}