From af3a22c1d7e6a50884bb44bda29fea4d0b797e03 Mon Sep 17 00:00:00 2001 From: Matthew McKeen Date: Mon, 27 Oct 2025 17:41:54 -0700 Subject: [PATCH] feat: expire stale advanced metrics after not being updated for some time Signed-off-by: Matthew McKeen --- .../v1alpha1/metricsconfiguration_types.go | 4 + .../validate_metricconfiguration.go | 15 +- .../validate_metricconfiguration_test.go | 159 ++++++++++++++++++ crd/api/v1alpha1/zz_generated.deepcopy.go | 5 + .../helm/retina/crds/retina.sh_captures.yaml | 33 ++-- .../crds/retina.sh_metricsconfigurations.yaml | 12 +- .../crds/retina.sh_retinaendpoints.yaml | 2 +- .../crds/retina.sh_tracesconfigurations.yaml | 2 +- docs/03-Metrics/configuration.md | 3 +- docs/05-Concepts/CRDs/MetricsConfiguration.md | 2 + pkg/metrics/interfaces.go | 6 + pkg/metrics/metrics.go | 7 + pkg/metrics/mock_types.go | 77 +++++++++ pkg/metrics/types.go | 3 + pkg/module/metrics/basemetricsobject.go | 96 ++++++++++- pkg/module/metrics/dns.go | 29 +++- pkg/module/metrics/drops.go | 27 ++- pkg/module/metrics/drops_test.go | 3 +- pkg/module/metrics/forward.go | 26 ++- pkg/module/metrics/forward_test.go | 3 +- pkg/module/metrics/metrics_module.go | 20 ++- pkg/module/metrics/tcpflags.go | 31 +++- pkg/module/metrics/tcpflags_test.go | 3 +- pkg/module/metrics/tcpretrans.go | 31 +++- pkg/utils/attr_utils.go | 1 + 25 files changed, 541 insertions(+), 59 deletions(-) diff --git a/crd/api/v1alpha1/metricsconfiguration_types.go b/crd/api/v1alpha1/metricsconfiguration_types.go index 18902c6fa6..0044493a4d 100644 --- a/crd/api/v1alpha1/metricsconfiguration_types.go +++ b/crd/api/v1alpha1/metricsconfiguration_types.go @@ -41,6 +41,10 @@ type MetricsContextOptions struct { // +optional // +listType=set AdditionalLabels []string `json:"additionalLabels,omitempty"` + // TTL represents the time-to-live of the metrics collected + // Metrics which have not been updated within the TTL will be removed from export + // +optional + TTL string `json:"ttl,omitempty"` } // MetricsNamespaces indicates the namespaces to include or exclude in metric collection diff --git a/crd/api/v1alpha1/validations/validate_metricconfiguration.go b/crd/api/v1alpha1/validations/validate_metricconfiguration.go index 0a7a456d40..740691e0d6 100644 --- a/crd/api/v1alpha1/validations/validate_metricconfiguration.go +++ b/crd/api/v1alpha1/validations/validate_metricconfiguration.go @@ -7,6 +7,7 @@ package validations import ( "fmt" + "time" "github.com/microsoft/retina/crd/api/v1alpha1" "github.com/microsoft/retina/pkg/utils" @@ -40,6 +41,15 @@ func MetricsSpec(metricsSpec v1alpha1.MetricsSpec) error { if !utils.IsAdvancedMetric(contextOption.MetricName) { return fmt.Errorf("%s is not a valid metric", contextOption.MetricName) } + if contextOption.TTL != "" { + ttl, err := time.ParseDuration(contextOption.TTL) + if err != nil { + return fmt.Errorf("invalid TTL format for metric %s: %v", contextOption.MetricName, err) + } + if ttl < 0 { + return fmt.Errorf("TTL cannot be negative for metric %s", contextOption.MetricName) + } + } } err := MetricsNamespaces(metricsSpec.Namespaces) @@ -152,10 +162,13 @@ func MetricsContextOptionsCompare(old, new []v1alpha1.MetricsContextOptions) boo return false } - if !utils.CompareStringSlice(oldContextOption.AdditionalLabels, newContextOption.AdditionalLabels) { + if oldContextOption.TTL != newContextOption.TTL { return false } + if !utils.CompareStringSlice(oldContextOption.AdditionalLabels, newContextOption.AdditionalLabels) { + return false + } } return true diff --git a/crd/api/v1alpha1/validations/validate_metricconfiguration_test.go b/crd/api/v1alpha1/validations/validate_metricconfiguration_test.go index d5197facb0..f1109ee195 100644 --- a/crd/api/v1alpha1/validations/validate_metricconfiguration_test.go +++ b/crd/api/v1alpha1/validations/validate_metricconfiguration_test.go @@ -98,6 +98,46 @@ func TestMetricsConfiguration(t *testing.T) { }, wantErr: false, }, + { + name: "valid metrics crd with TTL", + obj: &v1alpha1.MetricsConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metricsconfig", + }, + Spec: v1alpha1.MetricsSpec{ + ContextOptions: []v1alpha1.MetricsContextOptions{ + { + MetricName: "drop_count", + TTL: "24h", + }, + }, + Namespaces: v1alpha1.MetricsNamespaces{ + Exclude: []string{"kube-system"}, + }, + }, + }, + wantErr: false, + }, + { + name: "invalid metrics crd with TTL", + obj: &v1alpha1.MetricsConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metricsconfig", + }, + Spec: v1alpha1.MetricsSpec{ + ContextOptions: []v1alpha1.MetricsContextOptions{ + { + MetricName: "drop_count", + TTL: "24", + }, + }, + Namespaces: v1alpha1.MetricsNamespaces{ + Exclude: []string{"kube-system"}, + }, + }, + }, + wantErr: true, + }, { name: "invalid metrics crd with random metric name", obj: &v1alpha1.MetricsConfiguration{ @@ -348,6 +388,125 @@ func TestCompare(t *testing.T) { }, equal: true, }, + { + name: "valid test 6", + old: &v1alpha1.MetricsConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metricsconfig", + }, + Spec: v1alpha1.MetricsSpec{ + ContextOptions: []v1alpha1.MetricsContextOptions{ + { + MetricName: "drop_count", + SourceLabels: []string{"ns", "ip", "port"}, + TTL: "24h", + }, + }, + Namespaces: v1alpha1.MetricsNamespaces{ + Include: []string{"default", "test"}, + Exclude: []string{"kube-system"}, + }, + }, + }, + new: &v1alpha1.MetricsConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metricsconfig", + }, + Spec: v1alpha1.MetricsSpec{ + ContextOptions: []v1alpha1.MetricsContextOptions{ + { + MetricName: "drop_count", + SourceLabels: []string{"ip", "port", "ns"}, + }, + }, + Namespaces: v1alpha1.MetricsNamespaces{ + Include: []string{"default", "test"}, + Exclude: []string{"kube-system"}, + }, + }, + }, + equal: false, + }, + { + name: "valid test 7", + old: &v1alpha1.MetricsConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metricsconfig", + }, + Spec: v1alpha1.MetricsSpec{ + ContextOptions: []v1alpha1.MetricsContextOptions{ + { + MetricName: "drop_count", + SourceLabels: []string{"ns", "ip", "port"}, + TTL: "24h", + }, + }, + Namespaces: v1alpha1.MetricsNamespaces{ + Include: []string{"default", "test"}, + Exclude: []string{"kube-system"}, + }, + }, + }, + new: &v1alpha1.MetricsConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metricsconfig", + }, + Spec: v1alpha1.MetricsSpec{ + ContextOptions: []v1alpha1.MetricsContextOptions{ + { + MetricName: "drop_count", + SourceLabels: []string{"ip", "port", "ns"}, + TTL: "24h", + }, + }, + Namespaces: v1alpha1.MetricsNamespaces{ + Include: []string{"default", "test"}, + Exclude: []string{"kube-system"}, + }, + }, + }, + equal: true, + }, + { + name: "valid test 8", + old: &v1alpha1.MetricsConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metricsconfig", + }, + Spec: v1alpha1.MetricsSpec{ + ContextOptions: []v1alpha1.MetricsContextOptions{ + { + MetricName: "drop_count", + SourceLabels: []string{"ns", "ip", "port"}, + TTL: "24h", + }, + }, + Namespaces: v1alpha1.MetricsNamespaces{ + Include: []string{"default", "test"}, + Exclude: []string{"kube-system"}, + }, + }, + }, + new: &v1alpha1.MetricsConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metricsconfig", + }, + Spec: v1alpha1.MetricsSpec{ + ContextOptions: []v1alpha1.MetricsContextOptions{ + { + MetricName: "drop_count", + SourceLabels: []string{"ip", "port", "ns"}, + TTL: "12h", + }, + }, + Namespaces: v1alpha1.MetricsNamespaces{ + Include: []string{"default", "test"}, + Exclude: []string{"kube-system"}, + }, + }, + }, + equal: false, + }, } for _, tt := range tests { diff --git a/crd/api/v1alpha1/zz_generated.deepcopy.go b/crd/api/v1alpha1/zz_generated.deepcopy.go index 48b9c3bdfb..08e56f9d0c 100644 --- a/crd/api/v1alpha1/zz_generated.deepcopy.go +++ b/crd/api/v1alpha1/zz_generated.deepcopy.go @@ -138,6 +138,11 @@ func (in *CaptureOption) DeepCopyInto(out *CaptureOption) { *out = new(int) **out = **in } + if in.Interfaces != nil { + in, out := &in.Interfaces, &out.Interfaces + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CaptureOption. diff --git a/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_captures.yaml b/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_captures.yaml index 749d2f3cfb..6429f13009 100644 --- a/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_captures.yaml +++ b/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_captures.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: captures.retina.sh spec: group: retina.sh @@ -53,6 +53,14 @@ spec: should continue for. pattern: ^([0-9]+(\.[0-9]+)?(ns|us|µs|ms|s|m|h))+$ type: string + interfaces: + description: |- + Interfaces specifies the network interfaces on which to capture packets. + If specified, captures only on the listed interfaces. + If empty, captures on all interfaces by default. + items: + type: string + type: array maxCaptureSize: default: 100 description: MaxCaptureSize limits the capture file to MB @@ -290,10 +298,14 @@ spec: description: SecretName is the name of secret which stores S3 compliant storage access key and secret key. type: string + required: + - bucket + - secretName type: object type: object required: - captureConfiguration + - outputConfiguration type: object status: description: CaptureStatus describes the status of the capture. @@ -310,16 +322,8 @@ spec: type: string conditions: items: - description: "Condition contains details for one aspect of the current - state of this API Resource.\n---\nThis struct is intended for - direct use as an array at the field path .status.conditions. For - example,\n\n\n\ttype FooStatus struct{\n\t // Represents the - observations of a foo's current state.\n\t // Known .status.conditions.type - are: \"Available\", \"Progressing\", and \"Degraded\"\n\t // - +patchMergeKey=type\n\t // +patchStrategy=merge\n\t // +listType=map\n\t - \ // +listMapKey=type\n\t Conditions []metav1.Condition `json:\"conditions,omitempty\" - patchStrategy:\"merge\" patchMergeKey:\"type\" protobuf:\"bytes,1,rep,name=conditions\"`\n\n\n\t - \ // other fields\n\t}" + description: Condition contains details for one aspect of the current + state of this API Resource. properties: lastTransitionTime: description: |- @@ -360,12 +364,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_metricsconfigurations.yaml b/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_metricsconfigurations.yaml index b0453ebf81..123fe4c108 100644 --- a/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_metricsconfigurations.yaml +++ b/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_metricsconfigurations.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: metricsconfigurations.retina.sh spec: group: retina.sh @@ -75,6 +75,11 @@ spec: type: string type: array x-kubernetes-list-type: set + ttl: + description: |- + TTL represents the time-to-live of the metrics collected + Metrics which have not been updated within the TTL will be removed from export + type: string required: - metricName type: object @@ -136,6 +141,11 @@ spec: type: string type: array x-kubernetes-list-type: set + ttl: + description: |- + TTL represents the time-to-live of the metrics collected + Metrics which have not been updated within the TTL will be removed from export + type: string required: - metricName type: object diff --git a/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_retinaendpoints.yaml b/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_retinaendpoints.yaml index a9b94e5999..f09c04ad0f 100644 --- a/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_retinaendpoints.yaml +++ b/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_retinaendpoints.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: retinaendpoints.retina.sh spec: group: retina.sh diff --git a/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_tracesconfigurations.yaml b/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_tracesconfigurations.yaml index ee00af15ae..d1850bfe44 100644 --- a/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_tracesconfigurations.yaml +++ b/deploy/standard/manifests/controller/helm/retina/crds/retina.sh_tracesconfigurations.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: tracesconfigurations.retina.sh spec: group: retina.sh diff --git a/docs/03-Metrics/configuration.md b/docs/03-Metrics/configuration.md index 87cd3e80ba..3c7ee9421e 100644 --- a/docs/03-Metrics/configuration.md +++ b/docs/03-Metrics/configuration.md @@ -5,6 +5,7 @@ You can enable/disable metrics by including/omitting their Plugin from `enabledP Via [MetricsConfiguration CRD](../05-Concepts/CRDs/MetricsConfiguration.md), you can further customize the following for your enabled plugins: - Which metrics to include -- Which metadata to include for a metric. +- Which metadata to include for a metric +- Time-to-live for a metric **Note**: If you enable [Annotations](./annotations.md), you cannot use the `MetricsConfiguration` CRD to specify which Pods to observe. diff --git a/docs/05-Concepts/CRDs/MetricsConfiguration.md b/docs/05-Concepts/CRDs/MetricsConfiguration.md index 2b59205c59..91a341fafa 100644 --- a/docs/05-Concepts/CRDs/MetricsConfiguration.md +++ b/docs/05-Concepts/CRDs/MetricsConfiguration.md @@ -24,6 +24,7 @@ The `MetricsConfiguration` CRD is defined with the following specifications: - `destinationLabels`: Represents the destination context labels, such as IP, Pod, port, workload (deployment/replicaset/statefulset/daemonset). - `metricName`: Indicates the name of the metric. - `sourceLabels`: Represents the source context labels, such as IP, Pod, port. + - `ttl`: Represents the time-to-live for the metric. If there are no metric updates for a particular set of context labels for this duration the metric will be removed from export. The value of `ttl` must be a valid Golang `time.Duration` string and non-negative. A zero `ttl` (the default) means that metrics are never removed from export. - **spec.namespaces:** Specifies the namespaces to include or exclude in metric collection. It includes the following properties: - `exclude`: Specifies namespaces to be excluded from metric collection. @@ -51,6 +52,7 @@ spec: - port additionalLabels: - direction + ttl: 24h - metricName: forward_count sourceLabels: - ip diff --git a/pkg/metrics/interfaces.go b/pkg/metrics/interfaces.go index ddb9a07a5b..cb556e82ac 100644 --- a/pkg/metrics/interfaces.go +++ b/pkg/metrics/interfaces.go @@ -9,12 +9,18 @@ import ( //go:generate go run go.uber.org/mock/mockgen@v0.4.0 -source=interfaces.go -destination=mock_types.go -package=metrics +type MetricVec interface { + DeleteLabelValues(lvs ...string) bool +} + type CounterVec interface { + MetricVec WithLabelValues(lvs ...string) prometheus.Counter GetMetricWithLabelValues(lvs ...string) (prometheus.Counter, error) } type GaugeVec interface { + MetricVec WithLabelValues(lvs ...string) prometheus.Gauge GetMetricWithLabelValues(lvs ...string) (prometheus.Gauge, error) } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 9169a08d8d..c40c2c39da 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -194,6 +194,13 @@ func InitializeMetrics() { parsedPacketsCounterDescription, ) + MetricsExpiredCounter = exporter.CreatePrometheusCounterVecForControlPlaneMetric( + exporter.DefaultRegistry, + expiredMetricsCounterName, + expiredMetricsCounterDescription, + utils.Metric, + ) + isInitialized = true metricsLogger.Info("Metrics initialized") } diff --git a/pkg/metrics/mock_types.go b/pkg/metrics/mock_types.go index 5d80bb22da..4dbc97c1ab 100644 --- a/pkg/metrics/mock_types.go +++ b/pkg/metrics/mock_types.go @@ -17,6 +17,47 @@ import ( gomock "go.uber.org/mock/gomock" ) +// MockMetricVec is a mock of MetricVec interface. +type MockMetricVec struct { + ctrl *gomock.Controller + recorder *MockMetricVecMockRecorder +} + +// MockMetricVecMockRecorder is the mock recorder for MockMetricVec. +type MockMetricVecMockRecorder struct { + mock *MockMetricVec +} + +// NewMockMetricVec creates a new mock instance. +func NewMockMetricVec(ctrl *gomock.Controller) *MockMetricVec { + mock := &MockMetricVec{ctrl: ctrl} + mock.recorder = &MockMetricVecMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMetricVec) EXPECT() *MockMetricVecMockRecorder { + return m.recorder +} + +// DeleteLabelValues mocks base method. +func (m *MockMetricVec) DeleteLabelValues(lvs ...string) bool { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range lvs { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteLabelValues", varargs...) + ret0, _ := ret[0].(bool) + return ret0 +} + +// DeleteLabelValues indicates an expected call of DeleteLabelValues. +func (mr *MockMetricVecMockRecorder) DeleteLabelValues(lvs ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteLabelValues", reflect.TypeOf((*MockMetricVec)(nil).DeleteLabelValues), lvs...) +} + // MockCounterVec is a mock of CounterVec interface. type MockCounterVec struct { ctrl *gomock.Controller @@ -40,6 +81,24 @@ func (m *MockCounterVec) EXPECT() *MockCounterVecMockRecorder { return m.recorder } +// DeleteLabelValues mocks base method. +func (m *MockCounterVec) DeleteLabelValues(lvs ...string) bool { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range lvs { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteLabelValues", varargs...) + ret0, _ := ret[0].(bool) + return ret0 +} + +// DeleteLabelValues indicates an expected call of DeleteLabelValues. +func (mr *MockCounterVecMockRecorder) DeleteLabelValues(lvs ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteLabelValues", reflect.TypeOf((*MockCounterVec)(nil).DeleteLabelValues), lvs...) +} + // GetMetricWithLabelValues mocks base method. func (m *MockCounterVec) GetMetricWithLabelValues(lvs ...string) (prometheus.Counter, error) { m.ctrl.T.Helper() @@ -100,6 +159,24 @@ func (m *MockGaugeVec) EXPECT() *MockGaugeVecMockRecorder { return m.recorder } +// DeleteLabelValues mocks base method. +func (m *MockGaugeVec) DeleteLabelValues(lvs ...string) bool { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range lvs { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteLabelValues", varargs...) + ret0, _ := ret[0].(bool) + return ret0 +} + +// DeleteLabelValues indicates an expected call of DeleteLabelValues. +func (mr *MockGaugeVecMockRecorder) DeleteLabelValues(lvs ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteLabelValues", reflect.TypeOf((*MockGaugeVec)(nil).DeleteLabelValues), lvs...) +} + // GetMetricWithLabelValues mocks base method. func (m *MockGaugeVec) GetMetricWithLabelValues(lvs ...string) (prometheus.Gauge, error) { m.ctrl.T.Helper() diff --git a/pkg/metrics/types.go b/pkg/metrics/types.go index 0803f62a71..cbc76f46ad 100644 --- a/pkg/metrics/types.go +++ b/pkg/metrics/types.go @@ -13,6 +13,7 @@ const ( pluginManagerFailedToReconcileCounterName = "plugin_manager_failed_to_reconcile" lostEventsCounterName = "lost_events_counter" parsedPacketsCounterName = "parsed_packets_counter" + expiredMetricsCounterName = "expired_metrics_counter" // Windows hnsStats = "windows_hns_stats" @@ -45,6 +46,7 @@ const ( pluginManagerFailedToReconcileCounterDescription = "Number of times the plugin manager failed to reconcile the plugins" lostEventsCounterDescription = "Number of events lost in control plane" parsedPacketsCounterDescription = "Number of packets parsed by the packetparser plugin" + expiredMetricsCounterDescription = "Number of metrics expired due to lack of updates and no longer exported" // Conntrack metrics ConntrackPacketTxDescription = "Number of tx packets" @@ -93,6 +95,7 @@ var ( PluginManagerFailedToReconcileCounter CounterVec LostEventsCounter CounterVec ParsedPacketsCounter CounterVec + MetricsExpiredCounter CounterVec // DNS Metrics. DNSRequestCounter CounterVec diff --git a/pkg/module/metrics/basemetricsobject.go b/pkg/module/metrics/basemetricsobject.go index 19f0b9e675..57864318b2 100644 --- a/pkg/module/metrics/basemetricsobject.go +++ b/pkg/module/metrics/basemetricsobject.go @@ -3,25 +3,119 @@ package metrics import ( + "bytes" + "crypto/sha256" + "fmt" + "sync" + "time" + api "github.com/microsoft/retina/crd/api/v1alpha1" "github.com/microsoft/retina/pkg/log" + "go.uber.org/zap" ) +type expireFn func(lbs []string) bool + +type updated struct { + t time.Time + lbs []string +} + type baseMetricObject struct { + *sync.Mutex advEnable bool contextMode enrichmentContext ctxOptions *api.MetricsContextOptions srcCtx ContextOptionsInterface dstCtx ContextOptionsInterface l *log.ZapLogger + lastUpdated map[string]updated + expireFn expireFn } -func newBaseMetricsObject(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext) baseMetricObject { +func (b *baseMetricObject) expire(ttl time.Duration) int { + if b.expireFn == nil { + return 0 + } + + b.Lock() + defer b.Unlock() + + var expired int + n := make(map[string]updated) + + for h, u := range b.lastUpdated { + if time.Since(u.t) >= ttl { + d := b.expireFn(u.lbs) + if d { + expired++ + } + } else { + n[h] = u + } + } + + b.lastUpdated = n + + return expired +} + +func (b *baseMetricObject) updated(lbs []string) { + // no expiration function is defined, so we don't need to track updates + if b.expireFn == nil { + return + } + + var bf bytes.Buffer + + for _, l := range lbs { + bf.WriteString(l) + } + + h := sha256.New() + h.Write(bf.Bytes()) + + s := string(h.Sum(nil)) + + b.Lock() + defer b.Unlock() + + b.lastUpdated[s] = updated{ + t: time.Now(), + lbs: lbs, + } +} + +func newBaseMetricsObject(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext, expire expireFn, ttl time.Duration) baseMetricObject { + expireOrInfiniteTTL := expire + if ttl <= 0 { + // infinite TTL, so make sure the expiration function is unset + expireOrInfiniteTTL = nil + } + b := baseMetricObject{ + Mutex: &sync.Mutex{}, advEnable: ctxOptions.IsAdvanced(), ctxOptions: ctxOptions, l: fl, contextMode: isLocalContext, + lastUpdated: make(map[string]updated), + expireFn: expireOrInfiniteTTL, + } + + if expireOrInfiniteTTL != nil { + go func() { + for { + b.l.Debug(fmt.Sprintf("Expiring metrics: %s", ctxOptions.MetricName)) + n := b.expire(ttl) + b.l.Debug( + fmt.Sprintf("Metric expiration finished: %s", ctxOptions.MetricName), + zap.Time("next_expiration", time.Now().Add(ttl)), + zap.Int("expired", n), + ) + time.Sleep(ttl) + } + }() } b.populateCtxOptions(ctxOptions) diff --git a/pkg/module/metrics/dns.go b/pkg/module/metrics/dns.go index 003d14cb3d..efce458bee 100644 --- a/pkg/module/metrics/dns.go +++ b/pkg/module/metrics/dns.go @@ -7,6 +7,7 @@ import ( "fmt" "strconv" "strings" + "time" v1 "github.com/cilium/cilium/api/v1/flow" api "github.com/microsoft/retina/crd/api/v1alpha1" @@ -35,16 +36,16 @@ type DNSMetrics struct { metricName string } -func NewDNSMetrics(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext) *DNSMetrics { +func NewDNSMetrics(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext, ttl time.Duration) *DNSMetrics { if ctxOptions == nil || !strings.Contains(strings.ToLower(ctxOptions.MetricName), "dns") { return nil } fl = fl.Named("dns-metricsmodule") fl.Info("Creating DNS count metrics", zap.Any("options", ctxOptions)) - return &DNSMetrics{ - baseMetricObject: newBaseMetricsObject(ctxOptions, fl, isLocalContext), - } + d := &DNSMetrics{} + d.baseMetricObject = newBaseMetricsObject(ctxOptions, fl, isLocalContext, d.expire, ttl) + return d } func (d *DNSMetrics) Init(metricName string) { @@ -197,7 +198,7 @@ func (d *DNSMetrics) ProcessFlow(flow *v1.Flow) { } } - d.dnsMetrics.WithLabelValues(labels...).Inc() + d.update(labels) d.l.Debug("Update dns metric in remote ctx", zap.Any("metric", d.dnsMetrics), zap.Any("labels", labels)) } @@ -233,10 +234,26 @@ func (d *DNSMetrics) processLocalCtxFlow(flow *v1.Flow) { } else { return } - d.dnsMetrics.WithLabelValues(labels...).Inc() + d.update(labels) d.l.Debug("Update dns metric in local ctx", zap.Any("metric", d.dnsMetrics), zap.Any("labels", labels)) } +func (d *DNSMetrics) expire(labels []string) bool { + var del bool + if d.dnsMetrics != nil { + del = d.dnsMetrics.DeleteLabelValues(labels...) + if del { + metricsinit.MetricsExpiredCounter.WithLabelValues(d.metricName).Inc() + } + } + return del +} + +func (d *DNSMetrics) update(labels []string) { + d.dnsMetrics.WithLabelValues(labels...).Inc() + d.updated(labels) +} + func (d *DNSMetrics) Clean() { exporter.UnregisterMetric(exporter.AdvancedRegistry, metricsinit.ToPrometheusType(d.dnsMetrics)) } diff --git a/pkg/module/metrics/drops.go b/pkg/module/metrics/drops.go index cf5db7d7a1..e6d7814c20 100644 --- a/pkg/module/metrics/drops.go +++ b/pkg/module/metrics/drops.go @@ -5,12 +5,14 @@ package metrics import ( "strings" + "time" v1 "github.com/cilium/cilium/api/v1/flow" api "github.com/microsoft/retina/crd/api/v1alpha1" "github.com/microsoft/retina/pkg/exporter" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" + metricsinit "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/utils" "go.uber.org/zap" ) @@ -29,16 +31,16 @@ type DropCountMetrics struct { metricName string } -func NewDropCountMetrics(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext) *DropCountMetrics { +func NewDropCountMetrics(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext, ttl time.Duration) *DropCountMetrics { if ctxOptions == nil || !strings.Contains(strings.ToLower(ctxOptions.MetricName), "drop") { return nil } fl = fl.Named("dropreason-metricsmodule") fl.Info("Creating drop count metrics", zap.Any("options", ctxOptions)) - return &DropCountMetrics{ - baseMetricObject: newBaseMetricsObject(ctxOptions, fl, isLocalContext), - } + d := &DropCountMetrics{} + d.baseMetricObject = newBaseMetricsObject(ctxOptions, fl, isLocalContext, d.expire, ttl) + return d } func (d *DropCountMetrics) Init(metricName string) { @@ -161,11 +163,28 @@ func (d *DropCountMetrics) processLocalCtxFlow(flow *v1.Flow) { } } +func (d *DropCountMetrics) expire(labels []string) bool { + var del bool + if d.dropMetric != nil { + del = d.dropMetric.DeleteLabelValues(labels...) + if del { + metricsinit.MetricsExpiredCounter.WithLabelValues(d.metricName).Inc() + } + } + return del +} + func (d *DropCountMetrics) update(fl *v1.Flow, labels []string) { + var updated bool switch d.metricName { case utils.DroppedPacketsGaugeName: + updated = true d.dropMetric.WithLabelValues(labels...).Inc() case utils.DropBytesGaugeName: + updated = true d.dropMetric.WithLabelValues(labels...).Add(float64(utils.PacketSize(fl))) } + if updated { + d.updated(labels) + } } diff --git a/pkg/module/metrics/drops_test.go b/pkg/module/metrics/drops_test.go index bc2e9f707f..978d6bb803 100644 --- a/pkg/module/metrics/drops_test.go +++ b/pkg/module/metrics/drops_test.go @@ -6,6 +6,7 @@ package metrics import ( "testing" + "time" "github.com/cilium/cilium/api/v1/flow" "github.com/microsoft/retina/crd/api/v1alpha1" @@ -270,7 +271,7 @@ func TestNewDrop(t *testing.T) { for _, metricName := range []string{"drop_count", "drop_bytes"} { log.Logger().Info("Running test name", zap.String("name", tc.name), zap.String("metricName", metricName)) ctrl := gomock.NewController(t) - f := NewDropCountMetrics(tc.opts, log.Logger(), tc.localContext) + f := NewDropCountMetrics(tc.opts, log.Logger(), tc.localContext, time.Duration(0)) if tc.nilObj { assert.Nil(t, f, "drop metrics should be nil Test Name: %s", tc.name) continue diff --git a/pkg/module/metrics/forward.go b/pkg/module/metrics/forward.go index 9ed694b2f3..f4f781f574 100644 --- a/pkg/module/metrics/forward.go +++ b/pkg/module/metrics/forward.go @@ -7,6 +7,7 @@ import ( "slices" "strconv" "strings" + "time" v1 "github.com/cilium/cilium/api/v1/flow" api "github.com/microsoft/retina/crd/api/v1alpha1" @@ -34,16 +35,16 @@ type ForwardMetrics struct { metricName string } -func NewForwardCountMetrics(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext) *ForwardMetrics { +func NewForwardCountMetrics(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext, ttl time.Duration) *ForwardMetrics { if ctxOptions == nil || !strings.Contains(strings.ToLower(ctxOptions.MetricName), "forward") { return nil } l := fl.Named("forward-metricsmodule") l.Info("Creating forward count metrics", zap.Any("options", ctxOptions)) - return &ForwardMetrics{ - baseMetricObject: newBaseMetricsObject(ctxOptions, fl, isLocalContext), - } + fm := ForwardMetrics{} + fm.baseMetricObject = newBaseMetricsObject(ctxOptions, fl, isLocalContext, fm.expire, ttl) + return &fm } func (f *ForwardMetrics) Init(metricName string) { @@ -167,11 +168,28 @@ func (f *ForwardMetrics) processLocalCtxFlow(flow *v1.Flow) { } } +func (f *ForwardMetrics) expire(labels []string) bool { + var d bool + if f.forwardMetric != nil { + d = f.forwardMetric.DeleteLabelValues(labels...) + if d { + metricsinit.MetricsExpiredCounter.WithLabelValues(f.metricName).Inc() + } + } + return d +} + func (f *ForwardMetrics) update(fl *v1.Flow, labels []string) { + var updated bool switch f.metricName { case utils.ForwardPacketsGaugeName: + updated = true f.forwardMetric.WithLabelValues(labels...).Add(float64(utils.PreviouslyObservedPackets(fl) + 1)) case utils.ForwardBytesGaugeName: + updated = true f.forwardMetric.WithLabelValues(labels...).Add(float64(utils.PacketSize(fl) + utils.PreviouslyObservedBytes(fl))) } + if updated { + f.updated(labels) + } } diff --git a/pkg/module/metrics/forward_test.go b/pkg/module/metrics/forward_test.go index 1af7558483..8965654459 100644 --- a/pkg/module/metrics/forward_test.go +++ b/pkg/module/metrics/forward_test.go @@ -6,6 +6,7 @@ package metrics import ( "testing" + "time" "github.com/cilium/cilium/api/v1/flow" "github.com/microsoft/retina/crd/api/v1alpha1" @@ -309,7 +310,7 @@ func TestNewForward(t *testing.T) { l.Info("Running test", zap.String("name", tc.name), zap.String("metricName", metricName)) ctrl := gomock.NewController(t) - f := NewForwardCountMetrics(tc.opts, log.Logger(), tc.localContext) + f := NewForwardCountMetrics(tc.opts, log.Logger(), tc.localContext, time.Duration(0)) if tc.nilObj { assert.Nil(t, f, "forward metrics should be nil Test Name: %s", tc.name) continue diff --git a/pkg/module/metrics/metrics_module.go b/pkg/module/metrics/metrics_module.go index 0d38c06522..617ce74475 100644 --- a/pkg/module/metrics/metrics_module.go +++ b/pkg/module/metrics/metrics_module.go @@ -221,23 +221,33 @@ func (m *Module) updateMetricsContexts(spec *api.MetricsSpec) { } for _, ctxOption := range spec.ContextOptions { + var ttl time.Duration + var err error + if ctxOption.TTL != "" { + ttl, err = time.ParseDuration(ctxOption.TTL) + // this shouldn't happen since we've already validated the CRD, but put some safety here just in case + if err != nil { + m.l.Error("Invalid TTL format", zap.String("metricName", ctxOption.MetricName), zap.Error(err)) + continue + } + } switch { case strings.Contains(ctxOption.MetricName, forward): - fm := NewForwardCountMetrics(&ctxOption, m.l, ctxType) + fm := NewForwardCountMetrics(&ctxOption, m.l, ctxType, ttl) if fm != nil { m.registry[ctxOption.MetricName] = fm } case strings.Contains(ctxOption.MetricName, drop): - dm := NewDropCountMetrics(&ctxOption, m.l, ctxType) + dm := NewDropCountMetrics(&ctxOption, m.l, ctxType, ttl) if dm != nil { m.registry[ctxOption.MetricName] = dm } case strings.Contains(ctxOption.MetricName, tcp): - tm := NewTCPMetrics(&ctxOption, m.l, ctxType) + tm := NewTCPMetrics(&ctxOption, m.l, ctxType, ttl) if tm != nil { m.registry[ctxOption.MetricName] = tm } - tr := NewTCPRetransMetrics(&ctxOption, m.l, ctxType) + tr := NewTCPRetransMetrics(&ctxOption, m.l, ctxType, ttl) if tr != nil { m.registry[ctxOption.MetricName] = tr } @@ -249,7 +259,7 @@ func (m *Module) updateMetricsContexts(spec *api.MetricsSpec) { m.registry[nodeApiserver] = lm } case strings.Contains(ctxOption.MetricName, dns) || strings.Contains(ctxOption.MetricName, pktmon): - dm := NewDNSMetrics(&ctxOption, m.l, ctxType) + dm := NewDNSMetrics(&ctxOption, m.l, ctxType, ttl) if dm != nil { m.registry[ctxOption.MetricName] = dm } diff --git a/pkg/module/metrics/tcpflags.go b/pkg/module/metrics/tcpflags.go index c6a24a8635..ff2136869b 100644 --- a/pkg/module/metrics/tcpflags.go +++ b/pkg/module/metrics/tcpflags.go @@ -5,6 +5,7 @@ package metrics import ( "strings" + "time" v1 "github.com/cilium/cilium/api/v1/flow" api "github.com/microsoft/retina/crd/api/v1alpha1" @@ -28,16 +29,16 @@ type TCPMetrics struct { tcpFlagsMetrics metricsinit.GaugeVec } -func NewTCPMetrics(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext) *TCPMetrics { +func NewTCPMetrics(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext, ttl time.Duration) *TCPMetrics { if ctxOptions == nil || !strings.Contains(strings.ToLower(ctxOptions.MetricName), "flag") { return nil } fl = fl.Named("tcpflags-metricsmodule") fl.Info("Creating TCP Flags count metrics", zap.Any("options", ctxOptions)) - return &TCPMetrics{ - baseMetricObject: newBaseMetricsObject(ctxOptions, fl, isLocalContext), - } + t := &TCPMetrics{} + t.baseMetricObject = newBaseMetricsObject(ctxOptions, fl, isLocalContext, t.expire, ttl) + return t } func (t *TCPMetrics) Init(metricName string) { @@ -124,7 +125,7 @@ func (t *TCPMetrics) ProcessFlow(flow *v1.Flow) { for flag, count := range combineFlagsWithPrevious(flags, flow) { labels := append([]string{flag}, srcLabels...) labels = append(labels, dstLabels...) - t.tcpFlagsMetrics.WithLabelValues(labels...).Add(float64(count)) + t.update(labels, count) t.l.Debug("TCP flag metric", zap.String("flag", flag), zap.Strings("labels", labels), zap.Uint32("count", count)) } } @@ -141,7 +142,7 @@ func (t *TCPMetrics) processLocalCtxFlow(flow *v1.Flow, flags []string) { if l := len(labelValuesMap[ingress]); l > 0 { for flag, count := range combinedFlags { labels := append([]string{flag}, labelValuesMap[ingress]...) - t.tcpFlagsMetrics.WithLabelValues(labels...).Add(float64(count)) + t.update(labels, count) t.l.Debug("TCP flag metric", zap.String("flag", flag), zap.Strings("labels", labels), zap.Uint32("count", count)) } } @@ -149,12 +150,28 @@ func (t *TCPMetrics) processLocalCtxFlow(flow *v1.Flow, flags []string) { if l := len(labelValuesMap[egress]); l > 0 { for flag, count := range combinedFlags { labels := append([]string{flag}, labelValuesMap[egress]...) - t.tcpFlagsMetrics.WithLabelValues(labels...).Add(float64(count)) + t.update(labels, count) t.l.Debug("TCP flag metric", zap.String("flag", flag), zap.Strings("labels", labels), zap.Uint32("count", count)) } } } +func (t *TCPMetrics) expire(labels []string) bool { + var d bool + if t.tcpFlagsMetrics != nil { + d = t.tcpFlagsMetrics.DeleteLabelValues(labels...) + if d { + metricsinit.MetricsExpiredCounter.WithLabelValues(TCPFlagsCountName).Inc() + } + } + return d +} + +func (t *TCPMetrics) update(labels []string, count uint32) { + t.tcpFlagsMetrics.WithLabelValues(labels...).Add(float64(count)) + t.updated(labels) +} + func (t *TCPMetrics) getFlagValues(flags *v1.TCPFlags) []string { f := make([]string, 0) if flags == nil { diff --git a/pkg/module/metrics/tcpflags_test.go b/pkg/module/metrics/tcpflags_test.go index 8b845df6bb..8c7175b47f 100644 --- a/pkg/module/metrics/tcpflags_test.go +++ b/pkg/module/metrics/tcpflags_test.go @@ -6,6 +6,7 @@ package metrics import ( "testing" + "time" "github.com/cilium/cilium/api/v1/flow" "github.com/microsoft/retina/crd/api/v1alpha1" @@ -471,7 +472,7 @@ func TestNewTCPMetrics(t *testing.T) { log.Logger().Info("Running test name", zap.String("name", tc.name)) ctrl := gomock.NewController(t) - tcp := NewTCPMetrics(tc.opts, log.Logger(), tc.localContext) + tcp := NewTCPMetrics(tc.opts, log.Logger(), tc.localContext, time.Duration(0)) if tc.nilObj { assert.Nil(t, tcp, "forward metrics should be nil Test Name: %s", tc.name) continue diff --git a/pkg/module/metrics/tcpretrans.go b/pkg/module/metrics/tcpretrans.go index 3e307eabf4..2dfbc56888 100644 --- a/pkg/module/metrics/tcpretrans.go +++ b/pkg/module/metrics/tcpretrans.go @@ -5,6 +5,7 @@ package metrics import ( "strings" + "time" v1 "github.com/cilium/cilium/api/v1/flow" api "github.com/microsoft/retina/crd/api/v1alpha1" @@ -28,16 +29,16 @@ type TCPRetransMetrics struct { tcpRetransMetrics metricsinit.GaugeVec } -func NewTCPRetransMetrics(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext) *TCPRetransMetrics { +func NewTCPRetransMetrics(ctxOptions *api.MetricsContextOptions, fl *log.ZapLogger, isLocalContext enrichmentContext, ttl time.Duration) *TCPRetransMetrics { if ctxOptions == nil || !strings.Contains(strings.ToLower(ctxOptions.MetricName), "retrans") { return nil } fl = fl.Named("tcpretrans-metricsmodule") fl.Info("Creating TCP retransmit count metrics", zap.Any("options", ctxOptions)) - return &TCPRetransMetrics{ - baseMetricObject: newBaseMetricsObject(ctxOptions, fl, isLocalContext), - } + t := &TCPRetransMetrics{} + t.baseMetricObject = newBaseMetricsObject(ctxOptions, fl, isLocalContext, t.expire, ttl) + return t } func (t *TCPRetransMetrics) Init(metricName string) { @@ -96,7 +97,7 @@ func (t *TCPRetransMetrics) ProcessFlow(flow *v1.Flow) { } } - t.tcpRetransMetrics.WithLabelValues(labels...).Inc() + t.update(labels) } func (t *TCPRetransMetrics) processLocalCtxFlow(flow *v1.Flow) { @@ -107,17 +108,33 @@ func (t *TCPRetransMetrics) processLocalCtxFlow(flow *v1.Flow) { if len(labelValuesMap[ingress]) > 0 { labels := append([]string{ingress}, labelValuesMap[ingress]...) - t.tcpRetransMetrics.WithLabelValues(labels...).Inc() + t.update(labels) t.l.Debug("tcp retransmission count metric in INGRESS in local ctx", zap.Any("labels", labels)) } if len(labelValuesMap[egress]) > 0 { labels := append([]string{egress}, labelValuesMap[egress]...) - t.tcpRetransMetrics.WithLabelValues(labels...).Inc() + t.update(labels) t.l.Debug("tcp retransmission count metric in EGRESS in local ctx", zap.Any("labels", labels)) } } +func (t *TCPRetransMetrics) expire(labels []string) bool { + var d bool + if t.tcpRetransMetrics != nil { + d = t.tcpRetransMetrics.DeleteLabelValues(labels...) + if d { + metricsinit.MetricsExpiredCounter.WithLabelValues(TCPRetransCountName).Inc() + } + } + return d +} + +func (t *TCPRetransMetrics) update(labels []string) { + t.tcpRetransMetrics.WithLabelValues(labels...).Inc() + t.updated(labels) +} + func (t *TCPRetransMetrics) Clean() { exporter.UnregisterMetric(exporter.AdvancedRegistry, metricsinit.ToPrometheusType(t.tcpRetransMetrics)) } diff --git a/pkg/utils/attr_utils.go b/pkg/utils/attr_utils.go index eac816e88f..5223b34772 100644 --- a/pkg/utils/attr_utils.go +++ b/pkg/utils/attr_utils.go @@ -47,6 +47,7 @@ var ( AclRule = "aclrule" Active = "ACTIVE" Device = "device" + Metric = "metric" // TCP Connection Statistic Names ResetCount = "ResetCount"