Skip to content

Commit f6bdbb0

Browse files
committed
fix: add parallel notification delivery
Implements parallel notification delivery with configurable concurrency Solve performance issues described in #146 and slow or failing destinations Adds maxConcurrentNotifications config option (default: 10) Signed-off-by: gblanc-1a <[email protected]>
1 parent da04400 commit f6bdbb0

File tree

5 files changed

+731
-46
lines changed

5 files changed

+731
-46
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ data:
4242
4343
service.slack: |
4444
token: $slack-token
45+
46+
# Optional: Maximum concurrent notification deliveries (default: 10)
47+
# Controls parallel processing of notifications to multiple destinations.
48+
# Higher values speed up delivery when destinations are slow/timing out but increase resource usage.
49+
# Recommended: 10-25 (small/medium clusters), 50-100 (large clusters)
50+
# Note: Some services have rate limits; start low and increase if needed.
51+
maxConcurrentNotifications: "25"
4552
---
4653
apiVersion: v1
4754
kind: Secret
@@ -76,6 +83,7 @@ notifications.argoproj.io/subscriptions: |
7683
- service: slack
7784
recipients: [my-channel-21, my-channel-22]
7885
```
86+
7987
## Getting Started
8088

8189
Ready to add notifications to your project? Check out sample notifications for [cert-manager](./examples/certmanager/README.md)

pkg/api/config.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ type Config struct {
2929
DefaultTriggers []string
3030
// ServiceDefaultTriggers holds list of default triggers per service
3131
ServiceDefaultTriggers map[string][]string
32-
Namespace string
33-
IsSelfServiceConfig bool
32+
// MaxConcurrentNotifications is the maximum number of notifications to send concurrently (default: 10)
33+
MaxConcurrentNotifications int
34+
Namespace string
35+
IsSelfServiceConfig bool
3436
}
3537

3638
// Returns list of destinations for the specified trigger
@@ -71,14 +73,20 @@ func replaceStringSecret(val string, secretValues map[string][]byte) string {
7173
})
7274
}
7375

76+
const (
77+
// DefaultMaxConcurrentNotifications is the default maximum number of concurrent notification deliveries
78+
DefaultMaxConcurrentNotifications = 10
79+
)
80+
7481
// ParseConfig retrieves Config from given ConfigMap and Secret
7582
func ParseConfig(configMap *corev1.ConfigMap, secret *corev1.Secret) (*Config, error) {
7683
cfg := Config{
77-
Services: map[string]ServiceFactory{},
78-
Triggers: map[string][]triggers.Condition{},
79-
ServiceDefaultTriggers: map[string][]string{},
80-
Templates: map[string]services.Notification{},
81-
Namespace: configMap.Namespace,
84+
Services: map[string]ServiceFactory{},
85+
Triggers: map[string][]triggers.Condition{},
86+
ServiceDefaultTriggers: map[string][]string{},
87+
Templates: map[string]services.Notification{},
88+
Namespace: configMap.Namespace,
89+
MaxConcurrentNotifications: DefaultMaxConcurrentNotifications,
8290
}
8391
if subscriptionYaml, ok := configMap.Data["subscriptions"]; ok {
8492
if err := yaml.Unmarshal([]byte(subscriptionYaml), &cfg.Subscriptions); err != nil {
@@ -92,6 +100,23 @@ func ParseConfig(configMap *corev1.ConfigMap, secret *corev1.Secret) (*Config, e
92100
}
93101
}
94102

103+
if maxConcurrentYaml, ok := configMap.Data["maxConcurrentNotifications"]; ok {
104+
var maxConcurrent int
105+
if err := yaml.Unmarshal([]byte(maxConcurrentYaml), &maxConcurrent); err != nil {
106+
log.Warnf("Invalid maxConcurrentNotifications value '%s' (must be a positive integer), using default: %d", maxConcurrentYaml, DefaultMaxConcurrentNotifications)
107+
} else {
108+
switch {
109+
case maxConcurrent <= 0:
110+
log.Warnf("maxConcurrentNotifications must be positive, got %d, using default: %d", maxConcurrent, DefaultMaxConcurrentNotifications)
111+
case maxConcurrent > 1000:
112+
log.Warnf("maxConcurrentNotifications value %d is very high (>1000), consider using a lower value", maxConcurrent)
113+
cfg.MaxConcurrentNotifications = maxConcurrent
114+
default:
115+
cfg.MaxConcurrentNotifications = maxConcurrent
116+
}
117+
}
118+
}
119+
95120
for k, v := range configMap.Data {
96121
parts := strings.Split(k, ".")
97122
switch {

pkg/api/config_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,80 @@ func TestParseConfig_DefaultServiceTriggers(t *testing.T) {
6262
}, cfg.ServiceDefaultTriggers)
6363
}
6464

65+
func TestParseConfig_MaxConcurrentNotifications(t *testing.T) {
66+
tests := []struct {
67+
name string
68+
data map[string]string
69+
expected int
70+
}{
71+
{
72+
name: "valid positive value",
73+
data: map[string]string{
74+
"maxConcurrentNotifications": "25",
75+
},
76+
expected: 25,
77+
},
78+
{
79+
name: "valid value 100",
80+
data: map[string]string{
81+
"maxConcurrentNotifications": "100",
82+
},
83+
expected: 100,
84+
},
85+
{
86+
name: "not set uses default",
87+
data: map[string]string{},
88+
expected: DefaultMaxConcurrentNotifications,
89+
},
90+
{
91+
name: "zero uses default",
92+
data: map[string]string{
93+
"maxConcurrentNotifications": "0",
94+
},
95+
expected: DefaultMaxConcurrentNotifications,
96+
},
97+
{
98+
name: "negative uses default",
99+
data: map[string]string{
100+
"maxConcurrentNotifications": "-10",
101+
},
102+
expected: DefaultMaxConcurrentNotifications,
103+
},
104+
{
105+
name: "invalid non-numeric uses default",
106+
data: map[string]string{
107+
"maxConcurrentNotifications": "invalid",
108+
},
109+
expected: DefaultMaxConcurrentNotifications,
110+
},
111+
{
112+
name: "empty string uses default",
113+
data: map[string]string{
114+
"maxConcurrentNotifications": "",
115+
},
116+
expected: DefaultMaxConcurrentNotifications,
117+
},
118+
{
119+
name: "very high value is accepted with warning",
120+
data: map[string]string{
121+
"maxConcurrentNotifications": "1500",
122+
},
123+
expected: 1500,
124+
},
125+
}
126+
127+
for _, tt := range tests {
128+
t.Run(tt.name, func(t *testing.T) {
129+
cfg, err := ParseConfig(&corev1.ConfigMap{Data: tt.data}, emptySecret)
130+
if !assert.NoError(t, err) {
131+
return
132+
}
133+
assert.Equal(t, tt.expected, cfg.MaxConcurrentNotifications,
134+
"MaxConcurrentNotifications should match expected value")
135+
})
136+
}
137+
}
138+
65139
func TestReplaceStringSecret_KeyPresent(t *testing.T) {
66140
val := replaceStringSecret("hello $secret-value", map[string][]byte{
67141
"secret-value": []byte("world"),

0 commit comments

Comments
 (0)