Skip to content

Commit deb0a43

Browse files
committed
plugin: Move reconcile queue callbacks to metrics
This removes the layer of indirection through the PluginState methods: the reconcile queue callbacks now live on the metrics types, so we no longer have to require that the plugin state is initialized before any of the callbacks are invoked.
1 parent c0fe02b commit deb0a43

File tree

5 files changed

+52
-56
lines changed

5 files changed

+52
-56
lines changed

pkg/plugin/entrypoint.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ func NewAutoscaleEnforcerPlugin(
5454
promReg := prometheus.NewRegistry()
5555
metrics.RegisterDefaultCollectors(promReg)
5656

57+
pluginMetrics := metrics.BuildPluginMetrics(promReg, config.NodeMetricLabels)
58+
5759
// pre-define this so that we can reference it in the handlers, knowing that it won't be used
5860
// until we start the workers (which we do *after* we've set this value).
5961
var pluginState *PluginState
@@ -78,21 +80,13 @@ func NewAutoscaleEnforcerPlugin(
7880
},
7981
reconcile.WithBaseContext(ctx),
8082
reconcile.WithMiddleware(initEvents),
81-
// Note: we need one layer of indirection for callbacks referencing pluginState, because
82-
// it's initialized later, so directly referencing the methods at this point will use the
83-
// nil pluginState and panic on use.
84-
reconcile.WithQueueWaitDurationCallback(func(duration time.Duration) {
85-
pluginState.reconcileQueueWaitCallback(duration)
86-
}),
87-
reconcile.WithResultCallback(func(params reconcile.ObjectParams, duration time.Duration, err error) {
88-
pluginState.reconcileResultCallback(params, duration, err)
89-
}),
83+
reconcile.WithQueueWaitDurationCallback(pluginMetrics.Reconcile.QueueWaitDurationCallback),
84+
reconcile.WithResultCallback(pluginMetrics.Reconcile.ResultCallback),
9085
reconcile.WithErrorStatsCallback(func(params reconcile.ObjectParams, stats reconcile.ErrorStats) {
91-
pluginState.reconcileErrorStatsCallback(logger, params, stats)
92-
}),
93-
reconcile.WithPanicCallback(func(params reconcile.ObjectParams) {
94-
pluginState.reconcilePanicCallback(params)
86+
pluginMetrics.Reconcile.ErrorStatsCallback(params, stats)
87+
logSuccessiveReconcileFailure(logger, params, stats, config.LogSuccessiveFailuresThreshold)
9588
}),
89+
reconcile.WithPanicCallback(pluginMetrics.Reconcile.PanicCallback),
9690
)
9791
if err != nil {
9892
return nil, fmt.Errorf("could not setup reconcile queue: %w", err)
@@ -123,7 +117,7 @@ func NewAutoscaleEnforcerPlugin(
123117
return nil, fmt.Errorf("could not start watch on VirtualMachineMigration events: %w", err)
124118
}
125119

126-
pluginState = NewPluginState(*config, vmClient, promReg, podStore, nodeStore)
120+
pluginState = NewPluginState(*config, vmClient, pluginMetrics, podStore, nodeStore)
127121

128122
// Start the workers for the queue. We can't do these earlier because our handlers depend on the
129123
// PluginState that only exists now.

pkg/plugin/globalstate.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"sync"
1111
"time"
1212

13-
"github.com/prometheus/client_golang/prometheus"
1413
"go.uber.org/zap"
1514

1615
corev1 "k8s.io/api/core/v1"
@@ -53,7 +52,7 @@ type PluginState struct {
5352
// We use this when scoring pod placements.
5453
maxNodeMem api.Bytes
5554

56-
metrics metrics.Plugin
55+
metrics *metrics.Plugin
5756

5857
requeuePod func(uid types.UID) error
5958
requeueNode func(nodeName string) error
@@ -81,16 +80,14 @@ type nodeState struct {
8180
func NewPluginState(
8281
config Config,
8382
vmClient vmclient.Interface,
84-
reg prometheus.Registerer,
83+
metrics *metrics.Plugin,
8584
podWatchStore *watch.Store[corev1.Pod],
8685
nodeWatchStore *watch.Store[corev1.Node],
8786
) *PluginState {
8887
crudTimeout := time.Second * time.Duration(config.K8sCRUDTimeoutSeconds)
8988

9089
indexedNodeStore := watch.NewIndexedStore(nodeWatchStore, watch.NewFlatNameIndex[corev1.Node]())
9190

92-
metrics := metrics.BuildPluginMetrics(reg, config.NodeMetricLabels)
93-
9491
return &PluginState{
9592
mu: sync.Mutex{},
9693

pkg/plugin/metrics/metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ type Plugin struct {
2929
K8sOps *prometheus.CounterVec
3030
}
3131

32-
func BuildPluginMetrics(reg prometheus.Registerer, nodeMetricLabels map[string]string) Plugin {
32+
func BuildPluginMetrics(reg prometheus.Registerer, nodeMetricLabels map[string]string) *Plugin {
3333
nodeLabels := buildNodeLabels(nodeMetricLabels)
3434

35-
return Plugin{
35+
return &Plugin{
3636
nodeLabels: nodeLabels,
3737
Framework: buildSchedFrameworkMetrics(reg, nodeLabels),
3838
Nodes: buildNodeMetrics(reg, nodeLabels),

pkg/plugin/metrics/reconcile.go

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11
package metrics
22

33
import (
4+
"time"
5+
46
"github.com/prometheus/client_golang/prometheus"
57

8+
"github.com/neondatabase/autoscaling/pkg/plugin/reconcile"
69
"github.com/neondatabase/autoscaling/pkg/util"
710
)
811

912
// Reconcile groups the Prometheus collectors that describe the reconcile queue:
//   - waitDurations: how long items wait in the queue before being picked up
//   - processDurations: how long items take to be reconciled, by kind and outcome
//   - failing: number of objects currently failing to be reconciled, by kind
//   - panics: number of times reconcile operations have panicked
//
// The collectors are created and registered by buildReconcileMetrics.
type Reconcile struct {
10-
WaitDurations prometheus.Histogram
11-
ProcessDurations *prometheus.HistogramVec
12-
Failing *prometheus.GaugeVec
13-
Panics *prometheus.CounterVec
13+
waitDurations prometheus.Histogram
14+
processDurations *prometheus.HistogramVec
15+
failing *prometheus.GaugeVec
16+
panics *prometheus.CounterVec
1417
}
1518

1619
func buildReconcileMetrics(reg prometheus.Registerer) Reconcile {
1720
return Reconcile{
18-
WaitDurations: util.RegisterMetric(reg, prometheus.NewHistogram(
21+
waitDurations: util.RegisterMetric(reg, prometheus.NewHistogram(
1922
prometheus.HistogramOpts{
2023
Name: "autoscaling_plugin_reconcile_queue_wait_durations",
2124
Help: "Duration that items in the reconcile queue are waiting to be picked up",
@@ -29,7 +32,7 @@ func buildReconcileMetrics(reg prometheus.Registerer) Reconcile {
2932
},
3033
},
3134
)),
32-
ProcessDurations: util.RegisterMetric(reg, prometheus.NewHistogramVec(
35+
processDurations: util.RegisterMetric(reg, prometheus.NewHistogramVec(
3336
prometheus.HistogramOpts{
3437
Name: "autoscaling_plugin_reconcile_duration_seconds",
3538
Help: "Duration that items take to be reconciled",
@@ -44,14 +47,14 @@ func buildReconcileMetrics(reg prometheus.Registerer) Reconcile {
4447
},
4548
[]string{"kind", "outcome"},
4649
)),
47-
Failing: util.RegisterMetric(reg, prometheus.NewGaugeVec(
50+
failing: util.RegisterMetric(reg, prometheus.NewGaugeVec(
4851
prometheus.GaugeOpts{
4952
Name: "autoscaling_plugin_reconcile_failing_objects",
5053
Help: "Number of objects currently failing to be reconciled",
5154
},
5255
[]string{"kind"},
5356
)),
54-
Panics: util.RegisterMetric(reg, prometheus.NewCounterVec(
57+
panics: util.RegisterMetric(reg, prometheus.NewCounterVec(
5558
prometheus.CounterOpts{
5659
Name: "autoscaling_plugin_reconcile_panics_count",
5760
Help: "Number of times reconcile operations have panicked",
@@ -60,3 +63,24 @@ func buildReconcileMetrics(reg prometheus.Registerer) Reconcile {
6063
)),
6164
}
6265
}
66+
67+
// QueueWaitDurationCallback records duration, converted to seconds, as one
// observation in the waitDurations histogram. It has the signature expected by
// reconcile.WithQueueWaitDurationCallback.
func (r Reconcile) QueueWaitDurationCallback(duration time.Duration) {
68+
r.waitDurations.Observe(duration.Seconds())
69+
}
70+
71+
// ResultCallback records duration, converted to seconds, in the
// processDurations histogram. The observation is labelled with the object's
// kind (params.GVK.Kind) and an outcome of "success" when err is nil or
// "failure" otherwise.
func (r Reconcile) ResultCallback(params reconcile.ObjectParams, duration time.Duration, err error) {
72+
outcome := "success"
73+
if err != nil {
74+
outcome = "failure"
75+
}
76+
r.processDurations.WithLabelValues(params.GVK.Kind, outcome).Observe(duration.Seconds())
77+
}
78+
79+
// ErrorStatsCallback sets the failing gauge for the object's kind
// (params.GVK.Kind) to stats.TypedCount. It only updates the metric and does
// no logging.
func (r Reconcile) ErrorStatsCallback(params reconcile.ObjectParams, stats reconcile.ErrorStats) {
80+
// update count of current failing objects
81+
r.failing.WithLabelValues(params.GVK.Kind).Set(float64(stats.TypedCount))
82+
}
83+
84+
// PanicCallback increments the panics counter for the object's kind
// (params.GVK.Kind). It has the signature expected by
// reconcile.WithPanicCallback.
func (r Reconcile) PanicCallback(params reconcile.ObjectParams) {
85+
r.panics.WithLabelValues(params.GVK.Kind).Inc()
86+
}

pkg/plugin/reconcile.go

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,48 +3,29 @@ package plugin
33
import (
44
"context"
55
"fmt"
6-
"time"
76

87
"go.uber.org/zap"
98

109
"github.com/neondatabase/autoscaling/pkg/plugin/reconcile"
1110
)
1211

13-
func (s *PluginState) reconcileQueueWaitCallback(duration time.Duration) {
14-
s.metrics.Reconcile.WaitDurations.Observe(duration.Seconds())
15-
}
16-
17-
func (s *PluginState) reconcileResultCallback(params reconcile.ObjectParams, duration time.Duration, err error) {
18-
outcome := "success"
19-
if err != nil {
20-
outcome = "failure"
21-
}
22-
s.metrics.Reconcile.ProcessDurations.
23-
WithLabelValues(params.GVK.Kind, outcome).
24-
Observe(duration.Seconds())
25-
}
26-
27-
func (s *PluginState) reconcileErrorStatsCallback(logger *zap.Logger, params reconcile.ObjectParams, stats reconcile.ErrorStats) {
28-
// update count of current failing objects
29-
s.metrics.Reconcile.Failing.
30-
WithLabelValues(params.GVK.Kind).
31-
Set(float64(stats.TypedCount))
32-
12+
// logSuccessiveReconcileFailure emits a warning through logger whenever
// stats.SuccessiveFailures is at or above threshold. The warning carries the
// object's kind, the successive failure count, the event kind, and the
// object's metadata field. Below the threshold it does nothing.
//
// It is a free function rather than a PluginState method, so it reads the
// threshold from its argument instead of from plugin state.
func logSuccessiveReconcileFailure(
13+
logger *zap.Logger,
14+
params reconcile.ObjectParams,
15+
stats reconcile.ErrorStats,
16+
threshold int,
17+
) {
3318
// Make sure that repeatedly failing objects are sufficiently noisy
34-
if stats.SuccessiveFailures >= s.config.LogSuccessiveFailuresThreshold {
19+
if stats.SuccessiveFailures >= threshold {
3520
logger.Warn(
36-
fmt.Sprintf("%s has failed to reconcile >%d times in a row", params.GVK.Kind, s.config.LogSuccessiveFailuresThreshold),
21+
// NOTE(review): the guard above is ">=" but the message says ">%d"; at
// exactly threshold failures the text overstates the count by one.
// Consider ">=%d" if the wording matters to whoever reads these logs.
fmt.Sprintf("%s has failed to reconcile >%d times in a row", params.GVK.Kind, threshold),
3722
zap.Int("SuccessiveFailures", stats.SuccessiveFailures),
3823
zap.String("EventKind", string(params.EventKind)),
3924
reconcile.ObjectMetaLogField(params.GVK.Kind, params.Obj),
4025
)
4126
}
4227
}
4328

44-
func (s *PluginState) reconcilePanicCallback(params reconcile.ObjectParams) {
45-
s.metrics.Reconcile.Panics.WithLabelValues(params.GVK.Kind).Inc()
46-
}
47-
4829
func reconcileWorker(ctx context.Context, logger *zap.Logger, queue *reconcile.Queue) {
4930
wait := queue.WaitChan()
5031
for {

0 commit comments

Comments
 (0)