From 23c38f7e19fe254228037f5212da16b20c5eeee5 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Thu, 12 Mar 2026 13:26:50 -0700 Subject: [PATCH 1/6] Add support for KotlinAwareInvocableHandlerMethod to pass the spring.consume context to a kotlin suspend consume fun --- .../spring/spring-messaging-4.0/build.gradle | 48 ++++++ .../KotlinAwareHandlerInstrumentation.java | 71 +++++++++ .../KafkaBatchListenerCoroutineTest.groovy | 148 ++++++++++++++++++ .../test/kotlin/KafkaBatchCoroutineConfig.kt | 88 +++++++++++ 4 files changed, 355 insertions(+) create mode 100644 dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java create mode 100644 dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy create mode 100644 dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle index 336ea73a0ec..66fc04a0396 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle @@ -1,13 +1,24 @@ +import org.jetbrains.kotlin.gradle.dsl.JvmTarget +import org.jetbrains.kotlin.gradle.dsl.KotlinVersion + +plugins { + id 'org.jetbrains.kotlin.jvm' +} + muzzle { pass { group = 'org.springframework' module = 'spring-messaging' versions = "[4.0.0.RELEASE,)" assertInverse = true + // KotlinAwareHandlerInstrumentation references Publisher from reactive-streams, + // which is not bundled in spring-messaging but is always present when Spring Kafka is. + extraDependency 'org.reactivestreams:reactive-streams:1.0.4' } } apply from: "$rootDir/gradle/java.gradle" +apply from: "$rootDir/gradle/test-with-kotlin.gradle" testJvmConstraints { minJavaVersion = JavaVersion.VERSION_17 @@ -16,13 +27,24 @@ testJvmConstraints { addTestSuiteForDir('latestDepTest', 'test') ["compileTestGroovy", "compileLatestDepTestGroovy"].each { name -> + def kotlinTaskName = name.replace("Groovy", "Kotlin") tasks.named(name, GroovyCompile) { configureCompiler(it, 17) + classpath += files(tasks.named(kotlinTaskName).map { it.destinationDirectory }) + } +} + +kotlin { + compilerOptions { + jvmTarget = JvmTarget.JVM_1_8 + apiVersion = KotlinVersion.KOTLIN_1_9 + languageVersion = KotlinVersion.KOTLIN_1_9 } } dependencies { compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE' + compileOnly 'org.reactivestreams:reactive-streams:1.0.4' testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common') // capture SQS send and receive spans, propagate trace details in messages @@ -36,6 +58,32 @@ dependencies { } testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3' + // Spring Kafka + embedded Kafka broker for coroutine tests + testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', { + exclude group: 'org.apache.kafka' + } + testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', { + exclude group: 'org.apache.kafka' + } + + // KotlinAwareHandlerInstrumentation relies on the reactive-streams instrumentation + testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0') + + testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test' + testImplementation 'org.apache.kafka:kafka-clients:3.8.0' + testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test' + testImplementation 'org.apache.kafka:kafka_2.13:3.8.0' + testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test' + + testImplementation libs.kotlin + testImplementation "org.jetbrains.kotlin:kotlin-reflect" + testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.8.+" + testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.+" + testImplementation "io.projectreactor:reactor-core:3.+" + + testRuntimeOnly project(':dd-java-agent:instrumentation:kotlin-coroutines-1.3') + testRuntimeOnly project(':dd-java-agent:instrumentation:kafka:kafka-clients-3.8') + latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '6.+', { exclude group: 'org.slf4j', module: 'slf4j-api' } diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java new file mode 100644 index 00000000000..72f2c181996 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java @@ -0,0 +1,71 @@ +package datadog.trace.instrumentation.springmessaging; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.context.Context; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import org.reactivestreams.Publisher; + +/** + * Instruments {@code KotlinAwareInvocableHandlerMethod.doInvoke()} to attach the current {@link + * Context} to the returned {@link Publisher} so that the reactive-streams instrumentation activates + * it during subscription. + * + *

When a Spring Kafka listener is a Kotlin {@code suspend fun}, {@code + * KotlinAwareInvocableHandlerMethod.doInvoke()} returns a cold {@code Mono} immediately, before the + * listener body runs. By the time the {@code Mono} is subscribed (and the underlying {@code + * AbstractCoroutine} is constructed), the {@code spring.consume} scope opened by {@link + * SpringMessageHandlerInstrumentation} has already been closed. This advice captures {@link + * Context#current()} at {@code doInvoke()} exit — while {@code spring.consume} is still active — + * and stores it on the Publisher. The reactive-streams {@code PublisherInstrumentation} then + * retrieves and activates it during subscription so that {@code DatadogThreadContextElement} picks + * up the correct parent context when the underlying {@code AbstractCoroutine} is constructed. + */ +@AutoService(InstrumenterModule.class) +public class KotlinAwareHandlerInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public KotlinAwareHandlerInstrumentation() { + super("spring-messaging", "spring-messaging-4"); + } + + @Override + public Map contextStore() { + return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName()); + } + + @Override + public List typeInstrumentations() { + return Collections.singletonList(new KotlinAwareHandlerInstrumentation()); + } + + @Override + public String instrumentedType() { + return "org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("doInvoke")), + KotlinAwareHandlerInstrumentation.class.getName() + "$DoInvokeAdvice"); + } + + public static class DoInvokeAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Return Object result) { + if (result instanceof Publisher) { + InstrumentationContext.get(Publisher.class, Context.class) + .put((Publisher) result, Context.current()); + } + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy new file mode 100644 index 00000000000..834b31f2a62 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy @@ -0,0 +1,148 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.agent.test.asserts.TraceAssert +import listener.KafkaBatchCoroutineConfig +import listener.KafkaBatchCoroutineListener +import datadog.trace.api.DDSpanTypes +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.core.DDSpan +import org.apache.kafka.clients.producer.ProducerRecord +import org.springframework.context.annotation.AnnotationConfigApplicationContext +import org.springframework.kafka.config.KafkaListenerEndpointRegistry +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.utils.ContainerTestUtils + +import java.util.concurrent.TimeUnit + +class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification { + + private static final String TOPIC = "batch-coroutine-topic" + private static final String CONSUMER_GROUP = "batch-coroutine-group" + + def "batch @KafkaListener suspend fun - spans must be in the same trace as kafka.consume"() { + setup: + def appContext = new AnnotationConfigApplicationContext(KafkaBatchCoroutineConfig) + def listener = appContext.getBean(KafkaBatchCoroutineListener) + def template = appContext.getBean(KafkaTemplate) + def broker = appContext.getBean(EmbeddedKafkaBroker) + def registry = appContext.getBean(KafkaListenerEndpointRegistry) + + // Wait until listener container has been assigned partitions before sending. + registry.listenerContainers.each { container -> + ContainerTestUtils.waitForAssignment(container, broker.partitionsPerTopic) + } + + TEST_WRITER.clear() + + when: "two messages are sent before the consumer polls so they arrive in one batch" + registry.listenerContainers.each { it.stop() } + template.send(new ProducerRecord(TOPIC, "key", "hello-batch")) + template.send(new ProducerRecord(TOPIC, "key", "hello-batch")) + template.flush() + registry.listenerContainers.each { it.start() } + + then: "the listener processes the batch within 15 s" + listener.latch.await(15, TimeUnit.SECONDS) + listener.receivedValues == ["hello-batch", "hello-batch"] + + and: "child.work is a child of spring.consume" + DDSpan produce1Span, produce2Span, springConsumeParent + assertTraces(10, SORT_TRACES_BY_ID) { + trace(1) { + produceSpan(it) + produce1Span = span(0) + } + trace(1) { + produceSpan(it) + produce2Span = span(0) + } + + trace(1) { kafkaConsumeSpan(it, produce1Span, 0) } + trace(1) { kafkaConsumeSpan(it, produce2Span, 1) } + trace(1) { kafkaConsumeSpan(it, produce1Span, 0) } + trace(1) { kafkaConsumeSpan(it, produce2Span, 1) } + + trace(1) { + // consume messages in one batch + springConsumeSpan(it) + springConsumeParent = span(0) + } + // child work span connected to the spring consume span + trace(1) { childWorkSpan(it, springConsumeParent) } + + trace(1) { kafkaConsumeSpan(it, produce1Span, 0) } + trace(1) { kafkaConsumeSpan(it, produce2Span, 1) } + } + + cleanup: + appContext.close() + } + + private static void produceSpan(TraceAssert trace) { + trace.span { + operationName "kafka.produce" + resourceName "Produce Topic $TOPIC" + spanType "queue" + errored false + measured true + parent() + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" TOPIC + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" { String } + peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) + defaultTags() + } + } + } + + private static void kafkaConsumeSpan(TraceAssert trace, DDSpan parent, int offset) { + trace.span { + operationName "kafka.consume" + resourceName "Consume Topic $TOPIC" + spanType "queue" + errored false + measured true + childOf parent + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "$InstrumentationTags.MESSAGING_DESTINATION_NAME" TOPIC + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" { String } + peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) + "$InstrumentationTags.CONSUMER_GROUP" CONSUMER_GROUP + "$InstrumentationTags.OFFSET" offset + "$InstrumentationTags.PARTITION" { Integer } + "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { Long } + "$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { Long } + defaultTags(true) + } + } + } + + private static void springConsumeSpan(TraceAssert trace) { + trace.span { + operationName "spring.consume" + resourceName "KafkaBatchCoroutineListener.consume" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + parent() + tags { + "$Tags.COMPONENT" "spring-messaging" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + defaultTags(true) + } + } + } + + private static void childWorkSpan(TraceAssert trace, DDSpan parent) { + trace.span { + operationName "child.work" + childOf parent + tags { defaultTags() } + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt new file mode 100644 index 00000000000..3272a3371a6 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt @@ -0,0 +1,88 @@ +package listener + +import datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.EmbeddedKafkaKraftBroker +import org.springframework.kafka.test.utils.KafkaTestUtils +import org.springframework.stereotype.Component +import java.util.concurrent.CountDownLatch + +const val BATCH_COROUTINE_TOPIC = "batch-coroutine-topic" + +@Configuration(proxyBeanMethods = false) +@EnableKafka +class KafkaBatchCoroutineConfig { + + @Bean(destroyMethod = "destroy") + fun embeddedKafkaBroker(): EmbeddedKafkaBroker { + val broker = EmbeddedKafkaKraftBroker(1, 2, BATCH_COROUTINE_TOPIC) + broker.afterPropertiesSet() + return broker + } + + @Bean + fun producerFactory(broker: EmbeddedKafkaBroker): DefaultKafkaProducerFactory { + val props = HashMap(KafkaTestUtils.producerProps(broker.brokersAsString)) + props["key.serializer"] = StringSerializer::class.java.name + props["value.serializer"] = StringSerializer::class.java.name + return DefaultKafkaProducerFactory(props) + } + + @Bean + fun kafkaTemplate(pf: DefaultKafkaProducerFactory) = KafkaTemplate(pf) + + @Bean + fun consumerFactory(broker: EmbeddedKafkaBroker): DefaultKafkaConsumerFactory { + val props = HashMap(KafkaTestUtils.consumerProps("batch-coroutine-group", "false", broker)) + props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name + props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name + return DefaultKafkaConsumerFactory(props) + } + + @Bean + fun batchListenerContainerFactory( + cf: DefaultKafkaConsumerFactory + ): ConcurrentKafkaListenerContainerFactory { + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = cf + factory.isBatchListener = true + return factory + } + + @Bean + fun kafkaBatchCoroutineListener() = KafkaBatchCoroutineListener() +} + +@Component +class KafkaBatchCoroutineListener { + + val latch = CountDownLatch(1) + val receivedValues = mutableListOf() + + @KafkaListener( + topics = [BATCH_COROUTINE_TOPIC], + containerFactory = "batchListenerContainerFactory" + ) + suspend fun consume(records: List>) { + Exception("consume records").printStackTrace(System.err) + // Create a child span inside the coroutine body. + // It should be linked to spring.consume, which should be linked to kafka.consume. + val childSpan = startSpan("child.work") + records.forEach { receivedValues.add(it.value()) } + childSpan.finish() + latch.countDown() + } +} From 9bdffba75e3061715810e62073bd5b6a892c791c Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Thu, 2 Apr 2026 14:27:25 -0700 Subject: [PATCH 2/6] Keep spring.consume span open for async result using AsyncResultDecorator --- .../decorator/AsyncResultDecorator.java | 18 ++++++++++++------ .../core/ReactorAsyncResultExtension.java | 2 +- .../spring/spring-messaging-4.0/build.gradle | 3 ++- .../SpringMessageHandlerInstrumentation.java | 18 +++++++++++++++--- .../KafkaBatchListenerCoroutineTest.groovy | 7 +++---- .../test/kotlin/KafkaBatchCoroutineConfig.kt | 1 - 6 files changed, 33 insertions(+), 16 deletions(-) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/AsyncResultDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/AsyncResultDecorator.java index fdcf7c60f74..41351965831 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/AsyncResultDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/AsyncResultDecorator.java @@ -22,6 +22,15 @@ protected AsyncResultExtension computeValue(Class type) { } }; + public static Object wrapAsyncResult( + final Object result, final Class resultType, final AgentSpan span) { + AsyncResultExtension extension; + if (result != null && (extension = EXTENSION_CLASS_VALUE.get(resultType)) != null) { + return extension.apply(result, span); + } + return null; + } + /** * Look for asynchronous result and decorate it with span finisher. If the result is not * asynchronous, it will be return unmodified and span will be finished. @@ -33,12 +42,9 @@ protected AsyncResultExtension computeValue(Class type) { */ public Object wrapAsyncResultOrFinishSpan( final Object result, final Class methodReturnType, final AgentSpan span) { - AsyncResultExtension extension; - if (result != null && (extension = EXTENSION_CLASS_VALUE.get(methodReturnType)) != null) { - Object applied = extension.apply(result, span); - if (applied != null) { - return applied; - } + Object applied = wrapAsyncResult(result, methodReturnType, span); + if (applied != null) { + return applied; } // If no extension was applied, immediately finish the span and return the original result span.finish(); diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorAsyncResultExtension.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorAsyncResultExtension.java index cdda0734ad5..ab16ccac7db 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorAsyncResultExtension.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorAsyncResultExtension.java @@ -23,7 +23,7 @@ public static void init() {} @Override public boolean supports(Class result) { - return result == Flux.class || result == Mono.class; + return Flux.class.isAssignableFrom(result) || Mono.class.isAssignableFrom(result); } @Override diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle index 66fc04a0396..c716d5b3d08 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle @@ -66,8 +66,9 @@ dependencies { exclude group: 'org.apache.kafka' } - // KotlinAwareHandlerInstrumentation relies on the reactive-streams instrumentation + // KotlinAwareHandlerInstrumentation relies on the reactive-streams and reactor instrumentation testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0') + testImplementation project(':dd-java-agent:instrumentation:reactor-core-3.1') testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test' testImplementation 'org.apache.kafka:kafka-clients:3.8.0' diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java index edc4bcfe589..79320bbc82b 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java @@ -17,6 +17,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; +import datadog.trace.bootstrap.instrumentation.decorator.AsyncResultDecorator; import net.bytebuddy.asm.Advice; import org.springframework.messaging.Message; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; @@ -73,16 +74,27 @@ public static AgentScope onEnter( } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void onExit(@Advice.Enter AgentScope scope, @Advice.Thrown Throwable error) { + public static void onExit( + @Advice.Enter AgentScope scope, + @Advice.Return(readOnly = false) Object result, + @Advice.Thrown Throwable error) { if (null == scope) { return; } AgentSpan span = scope.span(); + scope.close(); if (null != error) { DECORATE.onError(span, error); } - scope.close(); - DECORATE.beforeFinish(span); + if (result != null) { + Object wrappedResult = + AsyncResultDecorator.wrapAsyncResult(result, result.getClass(), span); + if (wrappedResult != null) { + result = wrappedResult; + // span will be finished by the wrapper + return; + } + } span.finish(); } } diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy index 834b31f2a62..2a24ec23368 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy @@ -48,7 +48,7 @@ class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification { and: "child.work is a child of spring.consume" DDSpan produce1Span, produce2Span, springConsumeParent - assertTraces(10, SORT_TRACES_BY_ID) { + assertTraces(9, SORT_TRACES_BY_ID) { trace(1) { produceSpan(it) produce1Span = span(0) @@ -63,13 +63,12 @@ class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification { trace(1) { kafkaConsumeSpan(it, produce1Span, 0) } trace(1) { kafkaConsumeSpan(it, produce2Span, 1) } - trace(1) { + trace(2) { // consume messages in one batch springConsumeSpan(it) springConsumeParent = span(0) + childWorkSpan(it, springConsumeParent) } - // child work span connected to the spring consume span - trace(1) { childWorkSpan(it, springConsumeParent) } trace(1) { kafkaConsumeSpan(it, produce1Span, 0) } trace(1) { kafkaConsumeSpan(it, produce2Span, 1) } diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt index 3272a3371a6..60feddfbac7 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt @@ -77,7 +77,6 @@ class KafkaBatchCoroutineListener { containerFactory = "batchListenerContainerFactory" ) suspend fun consume(records: List>) { - Exception("consume records").printStackTrace(System.err) // Create a child span inside the coroutine body. // It should be linked to spring.consume, which should be linked to kafka.consume. val childSpan = startSpan("child.work") From ace63126bf7ac365d9c942a0f183859ede910aa0 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Thu, 2 Apr 2026 14:43:18 -0700 Subject: [PATCH 3/6] Assert span length in KafkaBatchListenerCoroutineTest --- .../KafkaBatchListenerCoroutineTest.groovy | 33 +++++++++---------- .../test/kotlin/KafkaBatchCoroutineConfig.kt | 2 ++ 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy index 2a24ec23368..a2f494fac52 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy @@ -64,8 +64,21 @@ class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification { trace(1) { kafkaConsumeSpan(it, produce2Span, 1) } trace(2) { - // consume messages in one batch - springConsumeSpan(it) + // consume messages in one batch, and keep spring.consume active across suspend work + span { + operationName "spring.consume" + resourceName "KafkaBatchCoroutineListener.consume" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + parent() + assert span(0).durationNano > TimeUnit.MILLISECONDS.toNanos(500) + tags { + "$Tags.COMPONENT" "spring-messaging" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + defaultTags(true) + } + } springConsumeParent = span(0) childWorkSpan(it, springConsumeParent) } @@ -121,22 +134,6 @@ class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification { } } - private static void springConsumeSpan(TraceAssert trace) { - trace.span { - operationName "spring.consume" - resourceName "KafkaBatchCoroutineListener.consume" - spanType DDSpanTypes.MESSAGE_CONSUMER - errored false - measured true - parent() - tags { - "$Tags.COMPONENT" "spring-messaging" - "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER - defaultTags(true) - } - } - } - private static void childWorkSpan(TraceAssert trace, DDSpan parent) { trace.span { operationName "child.work" diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt index 60feddfbac7..b29756ff7a4 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt @@ -1,6 +1,7 @@ package listener import datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan +import kotlinx.coroutines.delay import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer @@ -77,6 +78,7 @@ class KafkaBatchCoroutineListener { containerFactory = "batchListenerContainerFactory" ) suspend fun consume(records: List>) { + delay(500) // Create a child span inside the coroutine body. // It should be linked to spring.consume, which should be linked to kafka.consume. val childSpan = startSpan("child.work") From 09975833fb03cbc8093e51fe7f67bc9b1129eb28 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Thu, 2 Apr 2026 14:55:07 -0700 Subject: [PATCH 4/6] Move wrapAsyncResult to AsyncResultExtensions --- .../decorator/AsyncResultDecorator.java | 22 +--------------- .../concurrent/AsyncResultExtensions.java | 26 +++++++++++++++++++ .../SpringMessageHandlerInstrumentation.java | 4 +-- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/AsyncResultDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/AsyncResultDecorator.java index 41351965831..994cf8658a8 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/AsyncResultDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/AsyncResultDecorator.java @@ -11,26 +11,6 @@ */ public abstract class AsyncResultDecorator extends BaseDecorator { - private static final ClassValue EXTENSION_CLASS_VALUE = - new ClassValue() { - @Override - protected AsyncResultExtension computeValue(Class type) { - return AsyncResultExtensions.registered().stream() - .filter(extension -> extension.supports(type)) - .findFirst() - .orElse(null); - } - }; - - public static Object wrapAsyncResult( - final Object result, final Class resultType, final AgentSpan span) { - AsyncResultExtension extension; - if (result != null && (extension = EXTENSION_CLASS_VALUE.get(resultType)) != null) { - return extension.apply(result, span); - } - return null; - } - /** * Look for asynchronous result and decorate it with span finisher. If the result is not * asynchronous, it will be return unmodified and span will be finished. @@ -42,7 +22,7 @@ public static Object wrapAsyncResult( */ public Object wrapAsyncResultOrFinishSpan( final Object result, final Class methodReturnType, final AgentSpan span) { - Object applied = wrapAsyncResult(result, methodReturnType, span); + Object applied = AsyncResultExtensions.wrapAsyncResult(result, methodReturnType, span); if (applied != null) { return applied; } diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/AsyncResultExtensions.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/AsyncResultExtensions.java index 1c33522a530..4b6c302c8fb 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/AsyncResultExtensions.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/AsyncResultExtensions.java @@ -16,6 +16,32 @@ public final class AsyncResultExtensions { private static final List EXTENSIONS = new CopyOnWriteArrayList<>(singletonList(new CompletableAsyncResultExtension())); + private static final ClassValue EXTENSION_CLASS_VALUE = + new ClassValue() { + @Override + protected AsyncResultExtension computeValue(Class type) { + return AsyncResultExtensions.registered().stream() + .filter(extension -> extension.supports(type)) + .findFirst() + .orElse(null); + } + }; + + /** + * Wraps a supported async result so the span is finished when the async computation completes. + * + * @return the wrapped async result, or {@code null} if the result type is unsupported or no + * wrapping is applied + */ + public static Object wrapAsyncResult( + final Object result, final Class resultType, final AgentSpan span) { + AsyncResultExtension extension; + if (result != null && (extension = EXTENSION_CLASS_VALUE.get(resultType)) != null) { + return extension.apply(result, span); + } + return null; + } + /** * Registers an extension to add supported async types. * diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java index b60d65d7e19..e8b862ad3d0 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java @@ -20,7 +20,7 @@ import datadog.trace.agent.tooling.annotation.AppliesOn; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import datadog.trace.bootstrap.instrumentation.decorator.AsyncResultDecorator; +import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions; import net.bytebuddy.asm.Advice; import org.springframework.messaging.Message; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; @@ -99,7 +99,7 @@ public static void onExit( } if (result != null) { Object wrappedResult = - AsyncResultDecorator.wrapAsyncResult(result, result.getClass(), span); + AsyncResultExtensions.wrapAsyncResult(result, result.getClass(), span); if (wrappedResult != null) { result = wrappedResult; // span will be finished by the wrapper From 053e9a4266522b1e2669d6167abee8271200d93d Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Thu, 2 Apr 2026 14:57:26 -0700 Subject: [PATCH 5/6] Remove unused registered() --- .../java/concurrent/AsyncResultExtensions.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/AsyncResultExtensions.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/AsyncResultExtensions.java index 4b6c302c8fb..1127ba4455b 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/AsyncResultExtensions.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/AsyncResultExtensions.java @@ -20,7 +20,7 @@ public final class AsyncResultExtensions { new ClassValue() { @Override protected AsyncResultExtension computeValue(Class type) { - return AsyncResultExtensions.registered().stream() + return EXTENSIONS.stream() .filter(extension -> extension.supports(type)) .findFirst() .orElse(null); @@ -62,11 +62,6 @@ public static void register(AsyncResultExtension extension) { } } - /** Returns the list of currently registered extensions. */ - public static List registered() { - return EXTENSIONS; - } - static final class CompletableAsyncResultExtension implements AsyncResultExtension { @Override public boolean supports(Class result) { From 7e0822ffbb8ef820dfcec48756c418865ed18ee0 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Thu, 2 Apr 2026 19:50:13 -0700 Subject: [PATCH 6/6] Strengthen spring messaging async span tests --- .../KafkaBatchListenerCoroutineTest.groovy | 14 ++++- .../test/groovy/SpringListenerSQSTest.groovy | 20 +++++-- .../test/groovy/listener/TestListener.groovy | 8 ++- .../test/kotlin/KafkaBatchCoroutineConfig.kt | 14 +++-- .../listener/AsyncObservationSupport.kt | 52 +++++++++++++++++++ 5 files changed, 97 insertions(+), 11 deletions(-) create mode 100644 dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/listener/AsyncObservationSupport.kt diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy index a2f494fac52..a82ce96c294 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/KafkaBatchListenerCoroutineTest.groovy @@ -20,13 +20,14 @@ class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification { private static final String TOPIC = "batch-coroutine-topic" private static final String CONSUMER_GROUP = "batch-coroutine-group" - def "batch @KafkaListener suspend fun - spans must be in the same trace as kafka.consume"() { + def "batch @KafkaListener suspend fun - keeps spring.consume span active during async execution"() { setup: def appContext = new AnnotationConfigApplicationContext(KafkaBatchCoroutineConfig) def listener = appContext.getBean(KafkaBatchCoroutineListener) def template = appContext.getBean(KafkaTemplate) def broker = appContext.getBean(EmbeddedKafkaBroker) def registry = appContext.getBean(KafkaListenerEndpointRegistry) + listener.prepareAsyncObservation() // Wait until listener container has been assigned partitions before sending. registry.listenerContainers.each { container -> @@ -41,6 +42,15 @@ class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification { template.send(new ProducerRecord(TOPIC, "key", "hello-batch")) template.flush() registry.listenerContainers.each { it.start() } + listener.awaitAsyncStarted() + + then: "spring.consume is still open while coroutine work is blocked" + TEST_WRITER.waitForTraces(2) + assert TEST_WRITER.flatten().every { it.operationName != "spring.consume" } + assert listener.activeParentFinished == false + + when: + listener.releaseAsyncObservation() then: "the listener processes the batch within 15 s" listener.latch.await(15, TimeUnit.SECONDS) @@ -72,7 +82,6 @@ class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification { errored false measured true parent() - assert span(0).durationNano > TimeUnit.MILLISECONDS.toNanos(500) tags { "$Tags.COMPONENT" "spring-messaging" "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER @@ -88,6 +97,7 @@ class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification { } cleanup: + listener?.releaseAsyncObservation() appContext.close() } diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/SpringListenerSQSTest.groovy b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/SpringListenerSQSTest.groovy index cb3a525ab70..45156d53802 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/SpringListenerSQSTest.groovy +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/SpringListenerSQSTest.groovy @@ -10,13 +10,12 @@ import datadog.trace.core.DDSpan import datadog.trace.instrumentation.aws.ExpectedQueryParams import io.awspring.cloud.sqs.operations.SqsTemplate import listener.Config +import listener.TestListener import org.elasticmq.rest.sqs.SQSRestServer import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.messaging.support.GenericMessage import software.amazon.awssdk.services.sqs.SqsAsyncClient -import java.util.concurrent.TimeUnit - class SpringListenerSQSTest extends InstrumentationSpecification { @@ -134,6 +133,8 @@ class SpringListenerSQSTest extends InstrumentationSpecification { def "async handler keeps spring.consume span active during CompletableFuture execution"() { setup: def context = new AnnotationConfigApplicationContext(Config) + def listener = context.getBean(TestListener) + listener.prepareAsyncObservation() def address = context.getBean(SQSRestServer).waitUntilStarted().localAddress() def template = SqsTemplate.newTemplate(context.getBean(SqsAsyncClient)) TEST_WRITER.waitForTraces(2) @@ -143,6 +144,15 @@ class SpringListenerSQSTest extends InstrumentationSpecification { TraceUtils.runUnderTrace("parent") { template.sendAsync("SpringListenerSQSAsync", "an async message").get() } + listener.awaitAsyncStarted() + + then: + TEST_WRITER.waitForTraces(2) + assert TEST_WRITER.size() == 2 + assert listener.activeParentFinished == false + + when: + listener.releaseAsyncObservation() then: def sendingSpan @@ -166,8 +176,6 @@ class SpringListenerSQSTest extends InstrumentationSpecification { errored false measured true childOf(sendingSpan) - // The span duration should be at least 500ms since the async handler sleeps 500ms - assert span(0).durationNano > TimeUnit.MILLISECONDS.toNanos(500) tags { "$Tags.COMPONENT" "spring-messaging" "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER @@ -184,6 +192,10 @@ class SpringListenerSQSTest extends InstrumentationSpecification { deleteMessageBatch(it, address, "SpringListenerSQSAsync") } } + + cleanup: + listener?.releaseAsyncObservation() + context.close() } static sendMessage(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan, String queueName = "SpringListenerSQS") { diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/listener/TestListener.groovy b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/listener/TestListener.groovy index eeaa6588995..77e07a16d2a 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/listener/TestListener.groovy +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/listener/TestListener.groovy @@ -1,15 +1,17 @@ package listener import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan +import datadog.trace.core.DDSpan import io.awspring.cloud.sqs.annotation.SqsListener import org.springframework.stereotype.Component import java.util.concurrent.CompletableFuture @Component -class TestListener { +class TestListener extends AsyncObservationSupport { @SqsListener(queueNames = "SpringListenerSQS") void observe(String message) { println "Received $message" @@ -18,7 +20,9 @@ class TestListener { @SqsListener(queueNames = "SpringListenerSQSAsync") CompletableFuture observeAsync(String message) { return CompletableFuture.runAsync { - Thread.sleep(500) + recordActiveParentFinished(((DDSpan) activeSpan()).isFinished()) + markAsyncStarted() + awaitAsyncRelease() // Asserting spring.consume root span is active during async execution def childSpan = startSpan("async.child") def childScope = activateSpan(childSpan) diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt index b29756ff7a4..71a768778f6 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/KafkaBatchCoroutineConfig.kt @@ -1,7 +1,10 @@ package listener +import datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan import datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan -import kotlinx.coroutines.delay +import datadog.trace.core.DDSpan +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer @@ -68,7 +71,7 @@ class KafkaBatchCoroutineConfig { } @Component -class KafkaBatchCoroutineListener { +class KafkaBatchCoroutineListener : AsyncObservationSupport() { val latch = CountDownLatch(1) val receivedValues = mutableListOf() @@ -78,7 +81,12 @@ class KafkaBatchCoroutineListener { containerFactory = "batchListenerContainerFactory" ) suspend fun consume(records: List>) { - delay(500) + recordActiveParentFinished((activeSpan() as DDSpan).isFinished) + // Keep the test gate from blocking the coroutine dispatcher while the test holds the listener open. + withContext(Dispatchers.IO) { + markAsyncStarted() + awaitAsyncRelease() + } // Create a child span inside the coroutine body. // It should be linked to spring.consume, which should be linked to kafka.consume. val childSpan = startSpan("child.work") diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/listener/AsyncObservationSupport.kt b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/listener/AsyncObservationSupport.kt new file mode 100644 index 00000000000..4fcf976d68e --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/kotlin/listener/AsyncObservationSupport.kt @@ -0,0 +1,52 @@ +package listener + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +open class AsyncObservationSupport { + companion object { + // Match the listener completion waits in these tests so the gate does not fail earlier than + // the enclosing async test would. + private const val TIMEOUT_SECONDS = 15L + } + + @Volatile private var asyncStarted = CountDownLatch(0) + + @Volatile private var allowAsyncCompletion = CountDownLatch(0) + + @Volatile + var activeParentFinished: Boolean? = null + private set + + fun prepareAsyncObservation() { + asyncStarted = CountDownLatch(1) + allowAsyncCompletion = CountDownLatch(1) + activeParentFinished = null + } + + @Throws(InterruptedException::class) + fun awaitAsyncStarted() { + if (!asyncStarted.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + throw AssertionError("timed out waiting for async listener to start") + } + } + + fun releaseAsyncObservation() { + allowAsyncCompletion.countDown() + } + + protected fun markAsyncStarted() { + asyncStarted.countDown() + } + + protected fun recordActiveParentFinished(activeParentFinished: Boolean) { + this.activeParentFinished = activeParentFinished + } + + @Throws(InterruptedException::class) + protected fun awaitAsyncRelease() { + if (!allowAsyncCompletion.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + throw AssertionError("timed out waiting for test to release async listener") + } + } +}