Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ linters-settings:
- '^github\.com/docker/docker/api/types/container\.Config$'
- '^github\.com/docker/docker/api/types\.\w+Options$'
- '^github\.com/opencontainers/runtime-spec/specs-go\.\w+$' # Exempt the entire package. Too many big structs.
- '^github\.com/prometheus/client_golang/prometheus(/.*)?\.\w+Opts$'
- '^github\.com/prometheus/client_golang/prometheus(/.*)?\.\w*Opts$'
- '^github\.com/tychoish/fun/pubsub\.BrokerOptions$'
- '^github\.com/vishvananda/netlink\.\w+$' # Exempt the entire package. Too many big structs.
# vmapi.{VirtualMachine,VirtualMachineSpec,VirtualMachineMigration,VirtualMachineMigrationSpec}
Expand Down
3 changes: 2 additions & 1 deletion pkg/plugin/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewAutoscaleEnforcerPlugin(
promReg := prometheus.NewRegistry()
metrics.RegisterDefaultCollectors(promReg)

pluginMetrics := metrics.BuildPluginMetrics(promReg, config.NodeMetricLabels)
pluginMetrics := metrics.BuildPluginMetrics(promReg, config.NodeMetricLabels, config.ReconcileWorkers)

// pre-define this so that we can reference it in the handlers, knowing that it won't be used
// until we start the workers (which we do *after* we've set this value).
Expand All @@ -81,6 +81,7 @@ func NewAutoscaleEnforcerPlugin(
reconcile.WithBaseContext(ctx),
reconcile.WithMiddleware(initEvents),
reconcile.WithQueueWaitDurationCallback(pluginMetrics.Reconcile.QueueWaitDurationCallback),
reconcile.WithQueueSizeCallback(pluginMetrics.Reconcile.QueueSizeCallback),
reconcile.WithResultCallback(pluginMetrics.Reconcile.ResultCallback),
reconcile.WithErrorStatsCallback(func(params reconcile.ObjectParams, stats reconcile.ErrorStats) {
pluginMetrics.Reconcile.ErrorStatsCallback(params, stats)
Expand Down
6 changes: 4 additions & 2 deletions pkg/plugin/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ func TestNormalizeScoreRandomization(t *testing.T) {
registry := prometheus.NewRegistry()

// Create a metrics plugin using the exported BuildPluginMetrics function
metricsPlugin := metrics.BuildPluginMetrics(registry, nil)
reconcileWorkers := 1
metricsPlugin := metrics.BuildPluginMetrics(registry, nil, reconcileWorkers)

// Create a minimal enforcer with just enough dependencies to not crash
//nolint:exhaustruct // Only initializing fields needed for the test
Expand Down Expand Up @@ -110,7 +111,8 @@ func TestMultipleNormalizeScoreRuns(t *testing.T) {
registry := prometheus.NewRegistry()

// Create a metrics plugin using the exported BuildPluginMetrics function
metricsPlugin := metrics.BuildPluginMetrics(registry, nil)
reconcileWorkers := 1
metricsPlugin := metrics.BuildPluginMetrics(registry, nil, reconcileWorkers)

// Create a minimal enforcer with just enough dependencies to not crash
//nolint:exhaustruct // Only initializing fields needed for the test
Expand Down
8 changes: 6 additions & 2 deletions pkg/plugin/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ type Plugin struct {
K8sOps *prometheus.CounterVec
}

func BuildPluginMetrics(reg prometheus.Registerer, nodeMetricLabels map[string]string) *Plugin {
func BuildPluginMetrics(
reg prometheus.Registerer,
nodeMetricLabels map[string]string,
reconcileWorkers int,
) *Plugin {
nodeLabels := buildNodeLabels(nodeMetricLabels)

return &Plugin{
nodeLabels: nodeLabels,
Framework: buildSchedFrameworkMetrics(reg, nodeLabels),
Nodes: buildNodeMetrics(reg, nodeLabels),
Reconcile: buildReconcileMetrics(reg),
Reconcile: buildReconcileMetrics(reg, reconcileWorkers),

ResourceRequests: util.RegisterMetric(reg, prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down
24 changes: 23 additions & 1 deletion pkg/plugin/metrics/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@ import (

type Reconcile struct {
waitDurations prometheus.Histogram
waitingTimer *WaitingTimer
processDurations *prometheus.HistogramVec
failing *prometheus.GaugeVec
panics *prometheus.CounterVec
}

func buildReconcileMetrics(reg prometheus.Registerer) Reconcile {
func buildReconcileMetrics(reg prometheus.Registerer, numWorkers int) Reconcile {
// No need to return this - it won't change:
workers := util.RegisterMetric(reg, prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "autoscaling_plugin_reconcile_workers",
Help: "Number of worker threads used for reconcile operations",
},
))
workers.Set(float64(numWorkers))

return Reconcile{
waitDurations: util.RegisterMetric(reg, prometheus.NewHistogram(
prometheus.HistogramOpts{
Expand All @@ -32,6 +42,12 @@ func buildReconcileMetrics(reg prometheus.Registerer) Reconcile {
},
},
)),
waitingTimer: util.RegisterMetric(reg, NewWaitingTimer(
prometheus.Opts{
Name: "autoscaling_plugin_reconcile_waiting_seconds_total",
Help: "Total duration where there were some reconcile operations waiting to be picked up",
},
)),
processDurations: util.RegisterMetric(reg, prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "autoscaling_plugin_reconcile_duration_seconds",
Expand Down Expand Up @@ -68,6 +84,12 @@ func (r Reconcile) QueueWaitDurationCallback(duration time.Duration) {
r.waitDurations.Observe(duration.Seconds())
}

func (r Reconcile) QueueSizeCallback(size int) {
// update the timer so we record the amount of time during which at least some items were
// waiting in the queue.
r.waitingTimer.SetWaiting(size != 0)
}

func (r Reconcile) ResultCallback(params reconcile.ObjectParams, duration time.Duration, err error) {
outcome := "success"
if err != nil {
Expand Down
83 changes: 83 additions & 0 deletions pkg/plugin/metrics/waitingtimer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package metrics

// Helper for reconcile metrics

import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

// WaitingTimer records the amount of time spent waiting, given occasional state transitions between
// "waiting" / "not waiting".
//
// This is useful to measure the saturation of our reconcile workers because it helps isolate the
// impact in aggregate metrics from groups of operations that are all waiting at the same time.
//
// (In essense, it's like the difference between load average and Linux PSI metrics.)
type WaitingTimer struct {
mu sync.Mutex

waiting bool
lastTransition time.Time
totalTime time.Duration

gauge prometheus.GaugeFunc
}

func NewWaitingTimer(opts prometheus.Opts) *WaitingTimer {
t := &WaitingTimer{
mu: sync.Mutex{},
waiting: false,
lastTransition: time.Now(),
totalTime: 0,
gauge: nil, // set below
}

t.gauge = prometheus.NewGaugeFunc(prometheus.GaugeOpts(opts), func() float64 {
return t.getTotal().Seconds()
})

return t
}

func (t *WaitingTimer) SetWaiting(waiting bool) {
t.mu.Lock()
defer t.mu.Unlock()

if t.waiting == waiting {
return
}

now := time.Now()
// If it was previously marked as waiting (but now no longer), then add the time since we
// started waiting to the running total.
if t.waiting {
t.totalTime += now.Sub(t.lastTransition)
}
// Otherwise, we'll mark ourselves as waiting (or not), and set the last change to now so that
// the next switch records the time since now.
t.waiting = waiting
t.lastTransition = now
}

func (t *WaitingTimer) getTotal() time.Duration {
t.mu.Lock()
defer t.mu.Unlock()

total := t.totalTime
if t.waiting {
total += time.Since(t.lastTransition)
}
return total
}

// Describe implements prometheus.Collector
func (t *WaitingTimer) Describe(ch chan<- *prometheus.Desc) {
t.gauge.Describe(ch)
}

func (t *WaitingTimer) Collect(ch chan<- prometheus.Metric) {
t.gauge.Collect(ch)
}
19 changes: 19 additions & 0 deletions pkg/plugin/reconcile/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type queueSettings struct {
baseContext context.Context
middleware []Middleware
waitCallback QueueWaitDurationCallback
sizeCallback QueueSizeCallback
resultCallback ResultCallback
errorCallback ErrorStatsCallback
panicCallback PanicCallback
Expand All @@ -26,6 +27,7 @@ func defaultQueueSettings() *queueSettings {
baseContext: context.Background(),
middleware: []Middleware{},
waitCallback: nil,
sizeCallback: nil,
resultCallback: nil,
errorCallback: nil,
panicCallback: nil,
Expand Down Expand Up @@ -68,6 +70,23 @@ func WithQueueWaitDurationCallback(cb QueueWaitDurationCallback) QueueOption {
}
}

// QueueSizeCallback represents the signature of the callback that may be provided to add
// observability to the ongoing length of the queue.
//
// Whenever the number of queued items changes, this function will be called with the new length of
// the queue (potentially zero, if there are no items).
type QueueSizeCallback = func(size int)

// WithQueueSizeCallback sets the QueueSizeCallback that will be called with the current size of the
// queue whenever the number of waiting items changes.
func WithQueueSizeCallback(cb QueueSizeCallback) QueueOption {
return QueueOption{
apply: func(s *queueSettings) {
s.sizeCallback = cb
},
}
}

// WithResultCallback sets the ResultCallback to provide to the LogMiddleware.
//
// It will be called after every reconcile operation completes with the relevant information about
Expand Down
12 changes: 12 additions & 0 deletions pkg/plugin/reconcile/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type Queue struct {

// if not nil, a callback that records how long each item was waiting to be reconciled
queueWaitCallback QueueWaitDurationCallback

// if not nil, a callback that records whether there are currently any items in the queue
queueSizeCallback QueueSizeCallback
}

type kv struct {
Expand Down Expand Up @@ -151,6 +154,7 @@ func NewQueue(handlers map[Object]HandlerFunc, opts ...QueueOption) (*Queue, err
handlers: enrichedHandlers,

queueWaitCallback: settings.waitCallback,
queueSizeCallback: settings.sizeCallback,
}

go q.handleNotifications(ctx, next, enqueuedRcvr)
Expand Down Expand Up @@ -304,6 +308,10 @@ func (q *Queue) Enqueue(eventKind EventKind, obj Object) {
} else {
q.enqueueInactive(k, v)
}

if q.queueSizeCallback != nil {
q.queueSizeCallback(q.queue.Len())
}
}

// Next returns a callback to execute the next waiting reconcile operation in the queue, or false if
Expand All @@ -326,6 +334,10 @@ func (q *Queue) Next() (_ ReconcileCallback, ok bool) {
q.reconcile(logger, kv.k, kv.v)
}

if q.queueSizeCallback != nil {
q.queueSizeCallback(q.queue.Len())
}

return callback, true
}

Expand Down
Loading