Skip to content

Commit 74f16c0

Browse files
github-actions[bot]Mateusz Rzeszutek
andauthored
Spring-kafka single record instrumentation (#5904) (#5907)
Co-authored-by: Mateusz Rzeszutek <[email protected]>
1 parent d018791 commit 74f16c0

File tree

20 files changed

+504
-226
lines changed

20 files changed

+504
-226
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessTracing.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@ public final class KafkaClientsConsumerProcessTracing {
1414

1515
private KafkaClientsConsumerProcessTracing() {}
1616

17-
public static void enableWrapping() {
18-
wrappingEnabled.set(true);
19-
}
20-
21-
public static void disableWrapping() {
22-
wrappingEnabled.set(false);
17+
public static boolean setEnabled(boolean enabled) {
18+
boolean previous = wrappingEnabled.get();
19+
wrappingEnabled.set(enabled);
20+
return previous;
2321
}
2422

2523
public static boolean wrappingEnabled() {
26-
return wrappingEnabled.get() == true;
24+
return wrappingEnabled.get();
2725
}
2826
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/kafka/KafkaClientsConsumerProcessWrapper.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
import io.opentelemetry.instrumentation.api.field.VirtualField;
1818
import io.opentelemetry.instrumentation.kafka.internal.ReceivedRecords;
1919
import io.opentelemetry.instrumentation.kafka.internal.Timer;
20+
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
2021
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2122
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
2223
import java.time.Duration;
2324
import net.bytebuddy.asm.Advice;
2425
import net.bytebuddy.description.type.TypeDescription;
2526
import net.bytebuddy.matcher.ElementMatcher;
27+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2628
import org.apache.kafka.clients.consumer.ConsumerRecords;
2729

2830
public class KafkaConsumerInstrumentation implements TypeInstrumentation {
@@ -74,6 +76,18 @@ public static void onExit(
7476
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
7577
VirtualField.find(ConsumerRecords.class, Context.class);
7678
consumerRecordsContext.set(records, context);
79+
80+
// disable process tracing and store the receive span for each individual record too
81+
boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
82+
try {
83+
VirtualField<ConsumerRecord<?, ?>, Context> consumerRecordContext =
84+
VirtualField.find(ConsumerRecord.class, Context.class);
85+
for (ConsumerRecord<?, ?> record : records) {
86+
consumerRecordContext.set(record, context);
87+
}
88+
} finally {
89+
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
90+
}
7791
}
7892
}
7993
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,11 @@
77

88
import io.opentelemetry.context.Context;
99
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
10-
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
1110
import java.util.Iterator;
1211
import javax.annotation.Nullable;
1312
import org.apache.kafka.clients.consumer.ConsumerRecord;
1413

15-
public class TracingIterable<K, V>
16-
implements Iterable<ConsumerRecord<K, V>>,
17-
KafkaClientsConsumerProcessWrapper<Iterable<ConsumerRecord<K, V>>> {
14+
public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
1815
private final Iterable<ConsumerRecord<K, V>> delegate;
1916
@Nullable private final Context receiveContext;
2017
private boolean firstIterator = true;
@@ -48,9 +45,4 @@ public Iterator<ConsumerRecord<K, V>> iterator() {
4845

4946
return it;
5047
}
51-
52-
@Override
53-
public Iterable<ConsumerRecord<K, V>> unwrap() {
54-
return delegate;
55-
}
5648
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,11 @@
1010
import io.opentelemetry.context.Context;
1111
import io.opentelemetry.context.Scope;
1212
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
13-
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
1413
import java.util.Iterator;
1514
import javax.annotation.Nullable;
1615
import org.apache.kafka.clients.consumer.ConsumerRecord;
1716

18-
public class TracingIterator<K, V>
19-
implements Iterator<ConsumerRecord<K, V>>,
20-
KafkaClientsConsumerProcessWrapper<Iterator<ConsumerRecord<K, V>>> {
17+
public class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {
2118
private final Iterator<ConsumerRecord<K, V>> delegateIterator;
2219
private final Context parentContext;
2320

@@ -79,9 +76,4 @@ private void closeScopeAndEndSpan() {
7976
public void remove() {
8077
delegateIterator.remove();
8178
}
82-
83-
@Override
84-
public Iterator<ConsumerRecord<K, V>> unwrap() {
85-
return delegateIterator;
86-
}
8779
}

instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.opentelemetry.api.OpenTelemetry;
1010
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
1111
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
12+
import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;
1213
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
1314
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
1415
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
@@ -81,14 +82,35 @@ public static Instrumenter<ReceivedRecords, Void> createConsumerReceiveInstrumen
8182
instrumentationName,
8283
GlobalOpenTelemetry.get(),
8384
MessageOperation.PROCESS,
84-
Collections.emptyList());
85+
Collections.emptyList(),
86+
ErrorCauseExtractor.jdk());
87+
}
88+
89+
public static Instrumenter<ConsumerRecord<?, ?>, Void> createConsumerProcessInstrumenter(
90+
String instrumentationName, ErrorCauseExtractor errorCauseExtractor) {
91+
return createConsumerOperationInstrumenter(
92+
instrumentationName,
93+
GlobalOpenTelemetry.get(),
94+
MessageOperation.PROCESS,
95+
Collections.emptyList(),
96+
errorCauseExtractor);
8597
}
8698

8799
public static Instrumenter<ConsumerRecord<?, ?>, Void> createConsumerOperationInstrumenter(
88100
String instrumentationName,
89101
OpenTelemetry openTelemetry,
90102
MessageOperation operation,
91103
Iterable<AttributesExtractor<ConsumerRecord<?, ?>, Void>> extractors) {
104+
return createConsumerOperationInstrumenter(
105+
instrumentationName, openTelemetry, operation, extractors, ErrorCauseExtractor.jdk());
106+
}
107+
108+
private static Instrumenter<ConsumerRecord<?, ?>, Void> createConsumerOperationInstrumenter(
109+
String instrumentationName,
110+
OpenTelemetry openTelemetry,
111+
MessageOperation operation,
112+
Iterable<AttributesExtractor<ConsumerRecord<?, ?>, Void>> extractors,
113+
ErrorCauseExtractor errorCauseExtractor) {
92114

93115
KafkaConsumerAttributesGetter getter = KafkaConsumerAttributesGetter.INSTANCE;
94116

@@ -99,7 +121,8 @@ public static Instrumenter<ReceivedRecords, Void> createConsumerReceiveInstrumen
99121
MessagingSpanNameExtractor.create(getter, operation))
100122
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
101123
.addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor())
102-
.addAttributesExtractors(extractors);
124+
.addAttributesExtractors(extractors)
125+
.setErrorCauseExtractor(errorCauseExtractor);
103126
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
104127
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
105128
}

instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,15 @@
55

66
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
77

8-
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
98
import static net.bytebuddy.matcher.ElementMatchers.named;
10-
import static net.bytebuddy.matcher.ElementMatchers.returns;
119

12-
import io.opentelemetry.context.Context;
13-
import io.opentelemetry.instrumentation.api.field.VirtualField;
1410
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
1511
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1612
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1713
import net.bytebuddy.asm.Advice;
1814
import net.bytebuddy.description.type.TypeDescription;
1915
import net.bytebuddy.matcher.ElementMatcher;
20-
import org.apache.kafka.clients.consumer.ConsumerRecord;
21-
import org.apache.kafka.clients.consumer.ConsumerRecords;
2216

23-
// This instrumentation copies the receive CONSUMER span context from the ConsumerRecords aggregate
24-
// object to each individual record
2517
public class StreamThreadInstrumentation implements TypeInstrumentation {
2618

2719
@Override
@@ -31,47 +23,20 @@ public ElementMatcher<TypeDescription> typeMatcher() {
3123

3224
@Override
3325
public void transform(TypeTransformer transformer) {
34-
transformer.applyAdviceToMethod(
35-
named("pollRequests")
36-
.and(isPrivate())
37-
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
38-
this.getClass().getName() + "$PollRecordsAdvice");
3926
transformer.applyAdviceToMethod(named("runLoop"), this.getClass().getName() + "$RunLoopAdvice");
4027
}
4128

42-
@SuppressWarnings("unused")
43-
public static class PollRecordsAdvice {
44-
@Advice.OnMethodExit(suppress = Throwable.class)
45-
public static void onExit(@Advice.Return ConsumerRecords<?, ?> records) {
46-
if (records.isEmpty()) {
47-
return;
48-
}
49-
50-
Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records);
51-
if (receiveContext == null) {
52-
return;
53-
}
54-
55-
VirtualField<ConsumerRecord<?, ?>, Context> singleRecordReceiveContext =
56-
VirtualField.find(ConsumerRecord.class, Context.class);
57-
58-
for (ConsumerRecord<?, ?> record : records) {
59-
singleRecordReceiveContext.set(record, receiveContext);
60-
}
61-
}
62-
}
63-
6429
// this advice suppresses the CONSUMER spans created by the kafka-clients instrumentation
6530
@SuppressWarnings("unused")
6631
public static class RunLoopAdvice {
6732
@Advice.OnMethodEnter(suppress = Throwable.class)
68-
public static void onEnter() {
69-
KafkaClientsConsumerProcessTracing.disableWrapping();
33+
public static boolean onEnter() {
34+
return KafkaClientsConsumerProcessTracing.setEnabled(false);
7035
}
7136

7237
@Advice.OnMethodExit(suppress = Throwable.class)
73-
public static void onExit() {
74-
KafkaClientsConsumerProcessTracing.enableWrapping();
38+
public static void onExit(@Advice.Enter boolean previousValue) {
39+
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
7540
}
7641
}
7742
}

instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
import net.bytebuddy.asm.Advice;
1818
import net.bytebuddy.description.type.TypeDescription;
1919
import net.bytebuddy.matcher.ElementMatcher;
20+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2021
import org.apache.kafka.clients.consumer.ConsumerRecords;
2122
import org.springframework.kafka.listener.BatchInterceptor;
23+
import org.springframework.kafka.listener.RecordInterceptor;
2224

2325
public class AbstractMessageListenerContainerInstrumentation implements TypeInstrumentation {
26+
2427
@Override
2528
public ElementMatcher<TypeDescription> typeMatcher() {
2629
return named("org.springframework.kafka.listener.AbstractMessageListenerContainer");
@@ -36,20 +39,48 @@ public void transform(TypeTransformer transformer) {
3639
.and(takesArguments(0))
3740
.and(returns(named("org.springframework.kafka.listener.BatchInterceptor"))),
3841
this.getClass().getName() + "$GetBatchInterceptorAdvice");
42+
// getRecordInterceptor() is called internally by AbstractMessageListenerContainer
43+
// implementations
44+
transformer.applyAdviceToMethod(
45+
named("getRecordInterceptor")
46+
.and(isProtected())
47+
.and(takesArguments(0))
48+
.and(returns(named("org.springframework.kafka.listener.RecordInterceptor"))),
49+
this.getClass().getName() + "$GetRecordInterceptorAdvice");
3950
}
4051

4152
@SuppressWarnings("unused")
4253
public static class GetBatchInterceptorAdvice {
54+
4355
@Advice.OnMethodExit(suppress = Throwable.class)
4456
public static <K, V> void onExit(
4557
@Advice.Return(readOnly = false) BatchInterceptor<K, V> interceptor) {
58+
4659
if (!(interceptor instanceof InstrumentedBatchInterceptor)) {
47-
VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField =
60+
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField =
4861
VirtualField.find(ConsumerRecords.class, Context.class);
49-
VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore =
62+
VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField =
5063
VirtualField.find(ConsumerRecords.class, State.class);
5164
interceptor =
52-
new InstrumentedBatchInterceptor<>(receiveContextVirtualField, stateStore, interceptor);
65+
new InstrumentedBatchInterceptor<>(receiveContextField, stateField, interceptor);
66+
}
67+
}
68+
}
69+
70+
@SuppressWarnings("unused")
71+
public static class GetRecordInterceptorAdvice {
72+
73+
@Advice.OnMethodExit(suppress = Throwable.class)
74+
public static <K, V> void onExit(
75+
@Advice.Return(readOnly = false) RecordInterceptor<K, V> interceptor) {
76+
77+
if (!(interceptor instanceof InstrumentedRecordInterceptor)) {
78+
VirtualField<ConsumerRecord<K, V>, Context> receiveContextField =
79+
VirtualField.find(ConsumerRecord.class, Context.class);
80+
VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField =
81+
VirtualField.find(ConsumerRecord.class, State.class);
82+
interceptor =
83+
new InstrumentedRecordInterceptor<>(receiveContextField, stateField, interceptor);
5384
}
5485
}
5586
}

instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
package io.opentelemetry.javaagent.instrumentation.spring.kafka;
77

8-
import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter;
8+
import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.batchProcessInstrumenter;
99

1010
import io.opentelemetry.context.Context;
1111
import io.opentelemetry.context.Scope;
@@ -16,34 +16,35 @@
1616
import org.springframework.kafka.listener.BatchInterceptor;
1717

1818
public final class InstrumentedBatchInterceptor<K, V> implements BatchInterceptor<K, V> {
19-
private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField;
20-
private final VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore;
19+
20+
private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextField;
21+
private final VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField;
2122
@Nullable private final BatchInterceptor<K, V> decorated;
2223

2324
public InstrumentedBatchInterceptor(
24-
VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField,
25-
VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore,
25+
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField,
26+
VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField,
2627
@Nullable BatchInterceptor<K, V> decorated) {
27-
this.receiveContextVirtualField = receiveContextVirtualField;
28-
this.stateStore = stateStore;
28+
this.receiveContextField = receiveContextField;
29+
this.stateField = stateField;
2930
this.decorated = decorated;
3031
}
3132

3233
@Override
3334
public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
3435
Context parentContext = getParentContext(records);
3536

36-
if (processInstrumenter().shouldStart(parentContext, records)) {
37-
Context context = processInstrumenter().start(parentContext, records);
37+
if (batchProcessInstrumenter().shouldStart(parentContext, records)) {
38+
Context context = batchProcessInstrumenter().start(parentContext, records);
3839
Scope scope = context.makeCurrent();
39-
stateStore.set(records, State.create(records, context, scope));
40+
stateField.set(records, State.create(records, context, scope));
4041
}
4142

4243
return decorated == null ? records : decorated.intercept(records, consumer);
4344
}
4445

4546
private Context getParentContext(ConsumerRecords<K, V> records) {
46-
Context receiveContext = receiveContextVirtualField.get(records);
47+
Context receiveContext = receiveContextField.get(records);
4748

4849
// use the receive CONSUMER span as parent if it's available
4950
return receiveContext != null ? receiveContext : Context.current();
@@ -66,11 +67,11 @@ public void failure(ConsumerRecords<K, V> records, Exception exception, Consumer
6667
}
6768

6869
private void end(ConsumerRecords<K, V> records, @Nullable Throwable error) {
69-
State<K, V> state = stateStore.get(records);
70-
stateStore.set(records, null);
70+
State<ConsumerRecords<K, V>> state = stateField.get(records);
71+
stateField.set(records, null);
7172
if (state != null) {
7273
state.scope().close();
73-
processInstrumenter().end(state.context(), state.request(), null, error);
74+
batchProcessInstrumenter().end(state.context(), state.request(), null, error);
7475
}
7576
}
7677
}

0 commit comments

Comments
 (0)