Skip to content

Commit ba683ce

Browse files
committed
fix: add parallel notification delivery
Implements parallel notification delivery with configurable concurrency Solve performance issues described in #146 and slow or failing destinations Adds maxConcurrentNotifications config option (default: 10) Signed-off-by: gblanc-1a <[email protected]>
1 parent da04400 commit ba683ce

File tree

5 files changed

+651
-42
lines changed

5 files changed

+651
-42
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ data:
4242
4343
service.slack: |
4444
token: $slack-token
45+
46+
# Optional: Maximum concurrent notification deliveries (default: 10)
47+
# Controls parallel processing of notifications to multiple destinations.
48+
# Higher values speed up delivery when destinations are slow/timing out but increase resource usage.
49+
# Recommended: 10-25 (small/medium clusters), 50-100 (large clusters)
50+
# Note: Some services have rate limits; start low and increase if needed.
51+
maxConcurrentNotifications: "25"
4552
---
4653
apiVersion: v1
4754
kind: Secret
@@ -76,6 +83,7 @@ notifications.argoproj.io/subscriptions: |
7683
- service: slack
7784
recipients: [my-channel-21, my-channel-22]
7885
```
86+
7987
## Getting Started
8088

8189
Ready to add notifications to your project? Check out sample notifications for [cert-manager](./examples/certmanager/README.md)

pkg/api/config.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ type Config struct {
2929
DefaultTriggers []string
3030
// ServiceDefaultTriggers holds list of default triggers per service
3131
ServiceDefaultTriggers map[string][]string
32-
Namespace string
33-
IsSelfServiceConfig bool
32+
// MaxConcurrentNotifications is the maximum number of notifications to send concurrently (default: 10)
33+
MaxConcurrentNotifications int
34+
Namespace string
35+
IsSelfServiceConfig bool
3436
}
3537

3638
// Returns list of destinations for the specified trigger
@@ -71,14 +73,20 @@ func replaceStringSecret(val string, secretValues map[string][]byte) string {
7173
})
7274
}
7375

76+
const (
77+
// DefaultMaxConcurrentNotifications is the default maximum number of concurrent notification deliveries
78+
DefaultMaxConcurrentNotifications = 10
79+
)
80+
7481
// ParseConfig retrieves Config from given ConfigMap and Secret
7582
func ParseConfig(configMap *corev1.ConfigMap, secret *corev1.Secret) (*Config, error) {
7683
cfg := Config{
77-
Services: map[string]ServiceFactory{},
78-
Triggers: map[string][]triggers.Condition{},
79-
ServiceDefaultTriggers: map[string][]string{},
80-
Templates: map[string]services.Notification{},
81-
Namespace: configMap.Namespace,
84+
Services: map[string]ServiceFactory{},
85+
Triggers: map[string][]triggers.Condition{},
86+
ServiceDefaultTriggers: map[string][]string{},
87+
Templates: map[string]services.Notification{},
88+
Namespace: configMap.Namespace,
89+
MaxConcurrentNotifications: DefaultMaxConcurrentNotifications,
8290
}
8391
if subscriptionYaml, ok := configMap.Data["subscriptions"]; ok {
8492
if err := yaml.Unmarshal([]byte(subscriptionYaml), &cfg.Subscriptions); err != nil {
@@ -92,6 +100,23 @@ func ParseConfig(configMap *corev1.ConfigMap, secret *corev1.Secret) (*Config, e
92100
}
93101
}
94102

103+
if maxConcurrentYaml, ok := configMap.Data["maxConcurrentNotifications"]; ok {
104+
var maxConcurrent int
105+
if err := yaml.Unmarshal([]byte(maxConcurrentYaml), &maxConcurrent); err != nil {
106+
log.Warnf("Invalid maxConcurrentNotifications value '%s' (must be a positive integer), using default: %d", maxConcurrentYaml, DefaultMaxConcurrentNotifications)
107+
} else {
108+
switch {
109+
case maxConcurrent <= 0:
110+
log.Warnf("maxConcurrentNotifications must be positive, got %d, using default: %d", maxConcurrent, DefaultMaxConcurrentNotifications)
111+
case maxConcurrent > 1000:
112+
log.Warnf("maxConcurrentNotifications value %d is very high (>1000), consider using a lower value", maxConcurrent)
113+
cfg.MaxConcurrentNotifications = maxConcurrent
114+
default:
115+
cfg.MaxConcurrentNotifications = maxConcurrent
116+
}
117+
}
118+
}
119+
95120
for k, v := range configMap.Data {
96121
parts := strings.Split(k, ".")
97122
switch {

pkg/api/config_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,80 @@ func TestParseConfig_DefaultServiceTriggers(t *testing.T) {
6262
}, cfg.ServiceDefaultTriggers)
6363
}
6464

65+
func TestParseConfig_MaxConcurrentNotifications(t *testing.T) {
66+
tests := []struct {
67+
name string
68+
data map[string]string
69+
expected int
70+
}{
71+
{
72+
name: "valid positive value",
73+
data: map[string]string{
74+
"maxConcurrentNotifications": "25",
75+
},
76+
expected: 25,
77+
},
78+
{
79+
name: "valid value 100",
80+
data: map[string]string{
81+
"maxConcurrentNotifications": "100",
82+
},
83+
expected: 100,
84+
},
85+
{
86+
name: "not set uses default",
87+
data: map[string]string{},
88+
expected: DefaultMaxConcurrentNotifications,
89+
},
90+
{
91+
name: "zero uses default",
92+
data: map[string]string{
93+
"maxConcurrentNotifications": "0",
94+
},
95+
expected: DefaultMaxConcurrentNotifications,
96+
},
97+
{
98+
name: "negative uses default",
99+
data: map[string]string{
100+
"maxConcurrentNotifications": "-10",
101+
},
102+
expected: DefaultMaxConcurrentNotifications,
103+
},
104+
{
105+
name: "invalid non-numeric uses default",
106+
data: map[string]string{
107+
"maxConcurrentNotifications": "invalid",
108+
},
109+
expected: DefaultMaxConcurrentNotifications,
110+
},
111+
{
112+
name: "empty string uses default",
113+
data: map[string]string{
114+
"maxConcurrentNotifications": "",
115+
},
116+
expected: DefaultMaxConcurrentNotifications,
117+
},
118+
{
119+
name: "very high value is accepted with warning",
120+
data: map[string]string{
121+
"maxConcurrentNotifications": "1500",
122+
},
123+
expected: 1500,
124+
},
125+
}
126+
127+
for _, tt := range tests {
128+
t.Run(tt.name, func(t *testing.T) {
129+
cfg, err := ParseConfig(&corev1.ConfigMap{Data: tt.data}, emptySecret)
130+
if !assert.NoError(t, err) {
131+
return
132+
}
133+
assert.Equal(t, tt.expected, cfg.MaxConcurrentNotifications,
134+
"MaxConcurrentNotifications should match expected value")
135+
})
136+
}
137+
}
138+
65139
func TestReplaceStringSecret_KeyPresent(t *testing.T) {
66140
val := replaceStringSecret("hello $secret-value", map[string][]byte{
67141
"secret-value": []byte("world"),

pkg/controller/controller.go

Lines changed: 125 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"reflect"
88
"runtime/debug"
9+
"sync"
910
"time"
1011

1112
log "github.com/sirupsen/logrus"
@@ -21,6 +22,12 @@ import (
2122
"github.com/argoproj/notifications-engine/pkg/api"
2223
"github.com/argoproj/notifications-engine/pkg/services"
2324
"github.com/argoproj/notifications-engine/pkg/subscriptions"
25+
"github.com/argoproj/notifications-engine/pkg/triggers"
26+
)
27+
28+
const (
29+
// DefaultMaxConcurrentNotifications is the default maximum number of concurrent notification deliveries
30+
DefaultMaxConcurrentNotifications = 10
2431
)
2532

2633
// NotificationDelivery represents a notification that was delivered
@@ -98,6 +105,17 @@ func WithEventCallback(f func(eventSequence NotificationEventSequence)) Opts {
98105
}
99106
}
100107

108+
// WithMaxConcurrentNotifications sets the maximum number of concurrent notification
109+
// deliveries per resource. This helps prevent resource exhaustion when sending
110+
// notifications to many destinations. Default is 50 if not specified.
111+
func WithMaxConcurrentNotifications(maxConcurrent int) Opts {
112+
return func(ctrl *notificationController) {
113+
if maxConcurrent > 0 {
114+
ctrl.maxConcurrentNotifications = maxConcurrent
115+
}
116+
}
117+
}
118+
101119
func NewController(
102120
client dynamic.NamespaceableResourceInterface,
103121
informer cache.SharedIndexInformer,
@@ -123,11 +141,12 @@ func NewController(
123141
)
124142

125143
ctrl := &notificationController{
126-
client: client,
127-
informer: informer,
128-
queue: queue,
129-
metricsRegistry: NewMetricsRegistry(""),
130-
apiFactory: apiFactory,
144+
client: client,
145+
informer: informer,
146+
queue: queue,
147+
metricsRegistry: NewMetricsRegistry(""),
148+
apiFactory: apiFactory,
149+
maxConcurrentNotifications: DefaultMaxConcurrentNotifications,
131150
toUnstructured: func(obj metav1.Object) (*unstructured.Unstructured, error) {
132151
res, ok := obj.(*unstructured.Unstructured)
133152
if !ok {
@@ -155,16 +174,17 @@ func NewControllerWithNamespaceSupport(
155174
}
156175

157176
type notificationController struct {
158-
client dynamic.NamespaceableResourceInterface
159-
informer cache.SharedIndexInformer
160-
queue workqueue.TypedRateLimitingInterface[string]
161-
apiFactory api.Factory
162-
metricsRegistry *MetricsRegistry
163-
skipProcessing func(obj metav1.Object) (bool, string)
164-
alterDestinations func(obj metav1.Object, destinations services.Destinations, cfg api.Config) services.Destinations
165-
toUnstructured func(obj metav1.Object) (*unstructured.Unstructured, error)
166-
eventCallback func(eventSequence NotificationEventSequence)
167-
namespaceSupport bool
177+
client dynamic.NamespaceableResourceInterface
178+
informer cache.SharedIndexInformer
179+
queue workqueue.TypedRateLimitingInterface[string]
180+
apiFactory api.Factory
181+
metricsRegistry *MetricsRegistry
182+
skipProcessing func(obj metav1.Object) (bool, string)
183+
alterDestinations func(obj metav1.Object, destinations services.Destinations, cfg api.Config) services.Destinations
184+
toUnstructured func(obj metav1.Object) (*unstructured.Unstructured, error)
185+
eventCallback func(eventSequence NotificationEventSequence)
186+
namespaceSupport bool
187+
maxConcurrentNotifications int
168188
}
169189

170190
func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) {
@@ -187,15 +207,74 @@ func (c *notificationController) isSelfServiceConfigureApi(api api.API) bool {
187207
return c.namespaceSupport && api.GetConfig().IsSelfServiceConfig
188208
}
189209

210+
// notificationResult encapsulates the result of sending a single notification.
211+
// It is used to communicate results from parallel notification goroutines back to the main processing loop.
212+
type notificationResult struct {
213+
success bool // success indicates whether the notification was sent successfully
214+
err error // err contains the error if the send failed
215+
delivery NotificationDelivery // delivery contains the notification delivery information
216+
}
217+
218+
// sendSingleNotification sends a notification to a single destination and returns the result.
219+
// Thread-safe: designed for concurrent execution. All parameters are either read-only or thread-safe.
220+
// The api.Send call is performed without locks to enable parallel execution.
221+
func (c *notificationController) sendSingleNotification(
222+
api api.API,
223+
un *unstructured.Unstructured,
224+
resource metav1.Object,
225+
trigger string,
226+
cr triggers.ConditionResult,
227+
destination services.Destination,
228+
templates []string,
229+
apiNamespace string,
230+
logEntry *log.Entry,
231+
) notificationResult {
232+
logEntry.Infof("Sending notification about condition '%s.%s' to '%v' using the configuration in namespace %s",
233+
trigger, cr.Key, destination, apiNamespace)
234+
235+
err := api.Send(un.Object, templates, destination)
236+
237+
result := notificationResult{
238+
success: err == nil,
239+
err: err,
240+
delivery: NotificationDelivery{
241+
Trigger: trigger,
242+
Destination: destination,
243+
AlreadyNotified: false,
244+
},
245+
}
246+
247+
if err != nil {
248+
logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s",
249+
destination, resource.GetNamespace(), resource.GetName(), err, apiNamespace)
250+
result.err = fmt.Errorf("failed to deliver notification %s to %s: %w using the configuration in namespace %s",
251+
trigger, destination, err, apiNamespace)
252+
} else {
253+
logEntry.Debugf("Notification %s was sent using the configuration in namespace %s",
254+
destination.Recipient, apiNamespace)
255+
}
256+
257+
return result
258+
}
259+
190260
func (c *notificationController) processResourceWithAPI(api api.API, resource metav1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) {
191-
apiNamespace := api.GetConfig().Namespace
261+
cfg := api.GetConfig()
262+
apiNamespace := cfg.Namespace
192263
notificationsState := NewStateFromRes(resource)
193264

194-
destinations := c.getDestinations(resource, api.GetConfig())
265+
destinations := c.getDestinations(resource, cfg)
195266
if len(destinations) == 0 {
196267
return resource.GetAnnotations(), nil
197268
}
198269

270+
// Determine max concurrent notifications
271+
// Priority: 1) Programmatic config 2) ConfigMap config 3) Default
272+
maxConcurrent := c.maxConcurrentNotifications
273+
if maxConcurrent == DefaultMaxConcurrentNotifications && cfg.MaxConcurrentNotifications != DefaultMaxConcurrentNotifications {
274+
maxConcurrent = cfg.MaxConcurrentNotifications
275+
logEntry.Debugf("Using maxConcurrentNotifications from ConfigMap: %d", maxConcurrent)
276+
}
277+
199278
un, err := c.toUnstructured(resource)
200279
if err != nil {
201280
return nil, err
@@ -219,6 +298,11 @@ func (c *notificationController) processResourceWithAPI(api api.API, resource me
219298
continue
220299
}
221300

301+
// send notifications in parallel using goroutines with a worker pool
302+
var wg sync.WaitGroup
303+
var notificationsMutex sync.Mutex
304+
semaphore := make(chan struct{}, maxConcurrent)
305+
222306
for _, to := range destinations {
223307
if changed := notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, true); !changed {
224308
logEntry.Infof("Notification about condition '%s.%s' already sent to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace)
@@ -228,24 +312,32 @@ func (c *notificationController) processResourceWithAPI(api api.API, resource me
228312
AlreadyNotified: true,
229313
})
230314
} else {
231-
logEntry.Infof("Sending notification about condition '%s.%s' to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace)
232-
if err := api.Send(un.Object, cr.Templates, to); err != nil {
233-
logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s",
234-
to, resource.GetNamespace(), resource.GetName(), err, apiNamespace)
235-
notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, false)
236-
c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false)
237-
eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %w using the configuration in namespace %s", trigger, to, err, apiNamespace))
238-
} else {
239-
logEntry.Debugf("Notification %s was sent using the configuration in namespace %s", to.Recipient, apiNamespace)
240-
c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, true)
241-
eventSequence.addDelivered(NotificationDelivery{
242-
Trigger: trigger,
243-
Destination: to,
244-
AlreadyNotified: false,
245-
})
246-
}
315+
wg.Add(1)
316+
semaphore <- struct{}{}
317+
go func(destination services.Destination, templates []string) {
318+
defer func() {
319+
<-semaphore
320+
wg.Done()
321+
}()
322+
323+
result := c.sendSingleNotification(api, un, resource, trigger, cr, destination, templates, apiNamespace, logEntry)
324+
325+
notificationsMutex.Lock()
326+
defer notificationsMutex.Unlock()
327+
328+
if !result.success {
329+
notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, destination, false)
330+
c.metricsRegistry.IncDeliveriesCounter(trigger, destination.Service, false)
331+
eventSequence.addError(result.err)
332+
} else {
333+
c.metricsRegistry.IncDeliveriesCounter(trigger, destination.Service, true)
334+
eventSequence.addDelivered(result.delivery)
335+
}
336+
}(to, cr.Templates)
247337
}
248338
}
339+
340+
wg.Wait()
249341
}
250342
}
251343

0 commit comments

Comments
 (0)