Skip to content

Commit b716917

Browse files
Use pointer when passing TransformContext around or calling into (#44753)
Continuation of #44541 for Log/Metric/DataPoint/SpanEvent context. --------- Signed-off-by: Bogdan Drutu <[email protected]> Co-authored-by: Evan Bradley <[email protected]>
1 parent 64babf8 commit b716917

File tree

92 files changed

+1809
-1332
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

92 files changed

+1809
-1332
lines changed

.chloggen/use-pointer-for-context.yaml

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,31 @@ subtext: |
1919
Change Expr/Parser/Getter/Setter and all ottl related funcs to accept pointers to avoid unnecessary copy of a large
2020
TransformContext(96B). Avoid allocating a new pcommon.Map every time a new context is created by using a Borrow/Return
2121
pattern and reuse objects between calls. Deprecated funcs are:
22-
- `ottlspan.NewTransformContext` in favor of `ottlspan.BorrowContext`;
23-
- `filtermprocessor.DefaultSpanFunctions` in favor of `filtermprocessor.DefaultSpanFunctionsNew`
24-
- `filtermprocessor.WithSpanFunctions` in favor of `filtermprocessor.WithSpanFunctionsNew`
22+
- `ottldatapoint.NewTransformContext` in favor of `ottldatapoint.NewTransformContextPtr`;
23+
- `ottllog.NewTransformContext` in favor of `ottllog.NewTransformContextPtr`;
24+
- `ottlmetric.NewTransformContext` in favor of `ottlmetric.NewTransformContextPtr`;
25+
- `ottlspan.NewTransformContext` in favor of `ottlspan.NewTransformContextPtr`;
26+
- `ottlspanevent.NewTransformContext` in favor of `ottlspanevent.NewTransformContextPtr`;
27+
- `filterprocessor.DefaultDataPointFunctions` in favor of `filtermprocessor.DefaultDataPointFunctionsNew`
28+
- `filterprocessor.WithDataPointFunctions` in favor of `filterprocessor.WithDataPointFunctionsNew`
29+
- `filterprocessor.DefaultLogFunctions` in favor of `filterprocessor.DefaultLogFunctionsNew`
30+
- `filterprocessor.WithLogFunctions` in favor of `filterprocessor.WithLogFunctionsNew`
31+
- `filterprocessor.DefaultMetricFunctions` in favor of `filterprocessor.DefaultMetricFunctionsNew`
32+
- `filterprocessor.WithMetricFunctions` in favor of `filterprocessor.WithMetricFunctionsNew`
33+
- `filterprocessor.DefaultSpanFunctions` in favor of `filterprocessor.DefaultSpanFunctionsNew`
34+
- `filterprocessor.WithSpanFunctions` in favor of `filterprocessor.WithSpanFunctionsNew`
35+
- `filtermprocessor.DefaultSpanEventFunctions` in favor of `filtermprocessor.DefaultSpanEventFunctionsNew`
36+
- `filtermprocessor.WithSpanEventFunctions` in favor of `filtermprocessor.WithSpanEventFunctionsNew`
37+
- `transformprocessor.DefaultDataPointFunctions` in favor of `transformprocessor.DefaultDataPointFunctionsNew`
38+
- `transformprocessor.WithDataPointFunctions` in favor of `transformprocessor.WithDataPointFunctionsNew`
39+
- `transformprocessor.DefaultLogFunctions` in favor of `transformprocessor.DefaultLogFunctionsNew`
40+
- `transformprocessor.WithLogFunctions` in favor of `transformprocessor.WithLogFunctionsNew`
41+
- `transformprocessor.DefaultMetricFunctions` in favor of `transformprocessor.DefaultMetricFunctionsNew`
42+
- `transformprocessor.WithMetricFunctions` in favor of `transformprocessor.WithMetricFunctionsNew`
2543
- `transformprocessor.DefaultSpanFunctions` in favor of `transformprocessor.DefaultSpanFunctionsNew`
2644
- `transformprocessor.WithSpanFunctions` in favor of `transformprocessor.WithSpanFunctionsNew`
45+
- `transformprocessor.DefaultSpanEventFunctions` in favor of `transformprocessor.DefaultSpanEventFunctionsNew`
46+
- `transformprocessor.WithSpanEventFunctions` in favor of `transformprocessor.WithSpanEventFunctionsNew`
2747
2848
# If your change doesn't affect end users or the exported elements of any package,
2949
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

connector/countconnector/connector.go

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ type count struct {
3333
component.ShutdownFunc
3434

3535
spansMetricDefs map[string]metricDef[*ottlspan.TransformContext]
36-
spanEventsMetricDefs map[string]metricDef[ottlspanevent.TransformContext]
37-
metricsMetricDefs map[string]metricDef[ottlmetric.TransformContext]
38-
dataPointsMetricDefs map[string]metricDef[ottldatapoint.TransformContext]
39-
logsMetricDefs map[string]metricDef[ottllog.TransformContext]
36+
spanEventsMetricDefs map[string]metricDef[*ottlspanevent.TransformContext]
37+
metricsMetricDefs map[string]metricDef[*ottlmetric.TransformContext]
38+
dataPointsMetricDefs map[string]metricDef[*ottldatapoint.TransformContext]
39+
logsMetricDefs map[string]metricDef[*ottllog.TransformContext]
4040
profilesMetricDefs map[string]metricDef[ottlprofile.TransformContext]
4141
}
4242

@@ -52,7 +52,7 @@ func (c *count) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
5252
resourceSpan := td.ResourceSpans().At(i)
5353
resourceAttrs := resourceSpan.Resource().Attributes()
5454
spansCounter := newCounter[*ottlspan.TransformContext](c.spansMetricDefs)
55-
spanEventsCounter := newCounter[ottlspanevent.TransformContext](c.spanEventsMetricDefs)
55+
spanEventsCounter := newCounter[*ottlspanevent.TransformContext](c.spanEventsMetricDefs)
5656

5757
for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
5858
scopeSpan := resourceSpan.ScopeSpans().At(j)
@@ -69,8 +69,9 @@ func (c *count) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
6969
for l := 0; l < span.Events().Len(); l++ {
7070
event := span.Events().At(l)
7171
spanEventsCounter.updateTimestamp(event.Timestamp())
72-
eCtx := ottlspanevent.NewTransformContext(event, span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
72+
eCtx := ottlspanevent.NewTransformContextPtr(event, span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
7373
multiError = errors.Join(multiError, spanEventsCounter.update(ctx, event.Attributes(), scopeAttrs, resourceAttrs, eCtx))
74+
eCtx.Close()
7475
}
7576
}
7677
}
@@ -102,17 +103,18 @@ func (c *count) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
102103
for i := 0; i < md.ResourceMetrics().Len(); i++ {
103104
resourceMetric := md.ResourceMetrics().At(i)
104105
resourceAttrs := resourceMetric.Resource().Attributes()
105-
metricsCounter := newCounter[ottlmetric.TransformContext](c.metricsMetricDefs)
106-
dataPointsCounter := newCounter[ottldatapoint.TransformContext](c.dataPointsMetricDefs)
106+
metricsCounter := newCounter[*ottlmetric.TransformContext](c.metricsMetricDefs)
107+
dataPointsCounter := newCounter[*ottldatapoint.TransformContext](c.dataPointsMetricDefs)
107108

108109
for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
109110
scopeMetrics := resourceMetric.ScopeMetrics().At(j)
110111
scopeAttrs := scopeMetrics.Scope().Attributes()
111112

112113
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
113114
metric := scopeMetrics.Metrics().At(k)
114-
mCtx := ottlmetric.NewTransformContext(metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
115+
mCtx := ottlmetric.NewTransformContextPtr(metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
115116
multiError = errors.Join(multiError, metricsCounter.update(ctx, pcommon.NewMap(), scopeAttrs, resourceAttrs, mCtx))
117+
mCtx.Close()
116118

117119
//exhaustive:enforce
118120
switch metric.Type() {
@@ -121,40 +123,45 @@ func (c *count) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
121123
for i := 0; i < dps.Len(); i++ {
122124
dp := dps.At(i)
123125
dataPointsCounter.updateTimestamp(dp.Timestamp())
124-
dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
126+
dCtx := ottldatapoint.NewTransformContextPtr(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
125127
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), scopeAttrs, resourceAttrs, dCtx))
128+
dCtx.Close()
126129
}
127130
case pmetric.MetricTypeSum:
128131
dps := metric.Sum().DataPoints()
129132
for i := 0; i < dps.Len(); i++ {
130133
dp := dps.At(i)
131134
dataPointsCounter.updateTimestamp(dp.Timestamp())
132-
dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
135+
dCtx := ottldatapoint.NewTransformContextPtr(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
133136
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), scopeAttrs, resourceAttrs, dCtx))
137+
dCtx.Close()
134138
}
135139
case pmetric.MetricTypeSummary:
136140
dps := metric.Summary().DataPoints()
137141
for i := 0; i < dps.Len(); i++ {
138142
dp := dps.At(i)
139143
dataPointsCounter.updateTimestamp(dp.Timestamp())
140-
dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
144+
dCtx := ottldatapoint.NewTransformContextPtr(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
141145
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), scopeAttrs, resourceAttrs, dCtx))
146+
dCtx.Close()
142147
}
143148
case pmetric.MetricTypeHistogram:
144149
dps := metric.Histogram().DataPoints()
145150
for i := 0; i < dps.Len(); i++ {
146151
dp := dps.At(i)
147152
dataPointsCounter.updateTimestamp(dp.Timestamp())
148-
dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
153+
dCtx := ottldatapoint.NewTransformContextPtr(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
149154
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), scopeAttrs, resourceAttrs, dCtx))
155+
dCtx.Close()
150156
}
151157
case pmetric.MetricTypeExponentialHistogram:
152158
dps := metric.ExponentialHistogram().DataPoints()
153159
for i := 0; i < dps.Len(); i++ {
154160
dp := dps.At(i)
155161
dataPointsCounter.updateTimestamp(dp.Timestamp())
156-
dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
162+
dCtx := ottldatapoint.NewTransformContextPtr(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
157163
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), scopeAttrs, resourceAttrs, dCtx))
164+
dCtx.Close()
158165
}
159166
case pmetric.MetricTypeEmpty:
160167
multiError = errors.Join(multiError, fmt.Errorf("metric %q: invalid metric type: %v", metric.Name(), metric.Type()))
@@ -189,7 +196,7 @@ func (c *count) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
189196
for i := 0; i < ld.ResourceLogs().Len(); i++ {
190197
resourceLog := ld.ResourceLogs().At(i)
191198
resourceAttrs := resourceLog.Resource().Attributes()
192-
counter := newCounter[ottllog.TransformContext](c.logsMetricDefs)
199+
counter := newCounter[*ottllog.TransformContext](c.logsMetricDefs)
193200

194201
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
195202
scopeLogs := resourceLog.ScopeLogs().At(j)
@@ -198,8 +205,9 @@ func (c *count) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
198205
for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
199206
logRecord := scopeLogs.LogRecords().At(k)
200207
counter.updateTimestamp(logRecord.Timestamp())
201-
lCtx := ottllog.NewTransformContext(logRecord, scopeLogs.Scope(), resourceLog.Resource(), scopeLogs, resourceLog)
208+
lCtx := ottllog.NewTransformContextPtr(logRecord, scopeLogs.Scope(), resourceLog.Resource(), scopeLogs, resourceLog)
202209
multiError = errors.Join(multiError, counter.update(ctx, logRecord.Attributes(), scopeAttrs, resourceAttrs, lCtx))
210+
lCtx.Close()
203211
}
204212
}
205213

connector/countconnector/factory.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ func createTracesToMetrics(
6464
spanMetricDefs[name] = md
6565
}
6666

67-
spanEventMetricDefs := make(map[string]metricDef[ottlspanevent.TransformContext], len(c.SpanEvents))
67+
spanEventMetricDefs := make(map[string]metricDef[*ottlspanevent.TransformContext], len(c.SpanEvents))
6868
for name, info := range c.SpanEvents {
69-
md := metricDef[ottlspanevent.TransformContext]{
69+
md := metricDef[*ottlspanevent.TransformContext]{
7070
desc: info.Description,
7171
attrs: info.Attributes,
7272
}
@@ -94,9 +94,9 @@ func createMetricsToMetrics(
9494
) (connector.Metrics, error) {
9595
c := cfg.(*Config)
9696

97-
metricMetricDefs := make(map[string]metricDef[ottlmetric.TransformContext], len(c.Metrics))
97+
metricMetricDefs := make(map[string]metricDef[*ottlmetric.TransformContext], len(c.Metrics))
9898
for name, info := range c.Metrics {
99-
md := metricDef[ottlmetric.TransformContext]{
99+
md := metricDef[*ottlmetric.TransformContext]{
100100
desc: info.Description,
101101
}
102102
if len(info.Conditions) > 0 {
@@ -107,9 +107,9 @@ func createMetricsToMetrics(
107107
metricMetricDefs[name] = md
108108
}
109109

110-
dataPointMetricDefs := make(map[string]metricDef[ottldatapoint.TransformContext], len(c.DataPoints))
110+
dataPointMetricDefs := make(map[string]metricDef[*ottldatapoint.TransformContext], len(c.DataPoints))
111111
for name, info := range c.DataPoints {
112-
md := metricDef[ottldatapoint.TransformContext]{
112+
md := metricDef[*ottldatapoint.TransformContext]{
113113
desc: info.Description,
114114
attrs: info.Attributes,
115115
}
@@ -137,9 +137,9 @@ func createLogsToMetrics(
137137
) (connector.Logs, error) {
138138
c := cfg.(*Config)
139139

140-
metricDefs := make(map[string]metricDef[ottllog.TransformContext], len(c.Logs))
140+
metricDefs := make(map[string]metricDef[*ottllog.TransformContext], len(c.Logs))
141141
for name, info := range c.Logs {
142-
md := metricDef[ottllog.TransformContext]{
142+
md := metricDef[*ottllog.TransformContext]{
143143
desc: info.Description,
144144
attrs: info.Attributes,
145145
}

connector/routingconnector/logs.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
8787
case "log":
8888
plogutil.MoveRecordsWithContextIf(ld, matched,
8989
func(rl plog.ResourceLogs, sl plog.ScopeLogs, lr plog.LogRecord) bool {
90-
ltx := ottllog.NewTransformContext(lr, sl.Scope(), rl.Resource(), sl, rl)
90+
ltx := ottllog.NewTransformContextPtr(lr, sl.Scope(), rl.Resource(), sl, rl)
9191
_, isMatch, err := route.logStatement.Execute(ctx, ltx)
92+
ltx.Close()
9293
// If error during statement evaluation consider it as not a match.
9394
if err != nil {
9495
errs = errors.Join(errs, err)

connector/routingconnector/logs_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,9 +437,9 @@ func TestLogsConnectorDetailed(t *testing.T) {
437437

438438
// IsMap and IsString are just candidate for Standard Converter Function to prevent any unknown regressions for this component
439439
isBodyString := `IsString(body) == true`
440-
require.Contains(t, standardFunctions[ottllog.TransformContext](), "IsString")
440+
require.Contains(t, standardFunctions[*ottllog.TransformContext](), "IsString")
441441
isBodyMap := `IsMap(body) == true`
442-
require.Contains(t, standardFunctions[ottllog.TransformContext](), "IsMap")
442+
require.Contains(t, standardFunctions[*ottllog.TransformContext](), "IsMap")
443443

444444
isScopeCFromLowerContext := `instrumentation_scope.name == "scopeC"`
445445
isScopeDFromLowerContext := `instrumentation_scope.name == "scopeD"`

connector/routingconnector/metrics.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,9 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
8888
case "metric":
8989
pmetricutil.MoveMetricsWithContextIf(md, matched,
9090
func(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) bool {
91-
mtx := ottlmetric.NewTransformContext(m, sm.Metrics(), sm.Scope(), rm.Resource(), sm, rm)
91+
mtx := ottlmetric.NewTransformContextPtr(m, sm.Metrics(), sm.Scope(), rm.Resource(), sm, rm)
9292
_, isMatch, err := route.metricStatement.Execute(ctx, mtx)
93+
mtx.Close()
9394
// If error during statement evaluation consider it as not a match.
9495
if err != nil {
9596
errs = errors.Join(errs, err)
@@ -101,8 +102,9 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
101102
case "datapoint":
102103
pmetricutil.MoveDataPointsWithContextIf(md, matched,
103104
func(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric, dp any) bool {
104-
dptx := ottldatapoint.NewTransformContext(dp, m, sm.Metrics(), sm.Scope(), rm.Resource(), sm, rm)
105+
dptx := ottldatapoint.NewTransformContextPtr(dp, m, sm.Metrics(), sm.Scope(), rm.Resource(), sm, rm)
105106
_, isMatch, err := route.dataPointStatement.Execute(ctx, dptx)
107+
dptx.Close()
106108
// If error during statement evaluation consider it as not a match.
107109
if err != nil {
108110
errs = errors.Join(errs, err)

connector/routingconnector/router.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ type consumerProvider[C any] func(...pipeline.ID) (C, error)
3333
type router[C any] struct {
3434
resourceParser ottl.Parser[ottlresource.TransformContext]
3535
spanParser ottl.Parser[*ottlspan.TransformContext]
36-
metricParser ottl.Parser[ottlmetric.TransformContext]
37-
dataPointParser ottl.Parser[ottldatapoint.TransformContext]
38-
logParser ottl.Parser[ottllog.TransformContext]
36+
metricParser ottl.Parser[*ottlmetric.TransformContext]
37+
dataPointParser ottl.Parser[*ottldatapoint.TransformContext]
38+
logParser ottl.Parser[*ottllog.TransformContext]
3939
defaultConsumer C
4040
logger *zap.Logger
4141
routes map[string]routingItem[C]
@@ -75,9 +75,9 @@ type routingItem[C any] struct {
7575
requestCondition *requestCondition
7676
resourceStatement *ottl.Statement[ottlresource.TransformContext]
7777
spanStatement *ottl.Statement[*ottlspan.TransformContext]
78-
metricStatement *ottl.Statement[ottlmetric.TransformContext]
79-
dataPointStatement *ottl.Statement[ottldatapoint.TransformContext]
80-
logStatement *ottl.Statement[ottllog.TransformContext]
78+
metricStatement *ottl.Statement[*ottlmetric.TransformContext]
79+
dataPointStatement *ottl.Statement[*ottldatapoint.TransformContext]
80+
logStatement *ottl.Statement[*ottllog.TransformContext]
8181
statementContext string
8282
}
8383

@@ -123,7 +123,7 @@ func (r *router[C]) buildParsers(table []RoutingTableItem, settings component.Te
123123
}
124124
if buildMetric {
125125
parser, err := ottlmetric.NewParser(
126-
standardFunctions[ottlmetric.TransformContext](),
126+
standardFunctions[*ottlmetric.TransformContext](),
127127
settings,
128128
)
129129
if err == nil {
@@ -134,7 +134,7 @@ func (r *router[C]) buildParsers(table []RoutingTableItem, settings component.Te
134134
}
135135
if buildDataPoint {
136136
parser, err := ottldatapoint.NewParser(
137-
standardFunctions[ottldatapoint.TransformContext](),
137+
standardFunctions[*ottldatapoint.TransformContext](),
138138
settings,
139139
)
140140
if err == nil {
@@ -145,7 +145,7 @@ func (r *router[C]) buildParsers(table []RoutingTableItem, settings component.Te
145145
}
146146
if buildLog {
147147
parser, err := ottllog.NewParser(
148-
standardFunctions[ottllog.TransformContext](),
148+
standardFunctions[*ottllog.TransformContext](),
149149
settings,
150150
)
151151
if err == nil {

0 commit comments

Comments
 (0)