Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,6 @@
*/
public abstract class AsyncResultDecorator extends BaseDecorator {

private static final ClassValue<AsyncResultExtension> EXTENSION_CLASS_VALUE =
new ClassValue<AsyncResultExtension>() {
@Override
protected AsyncResultExtension computeValue(Class<?> type) {
return AsyncResultExtensions.registered().stream()
.filter(extension -> extension.supports(type))
.findFirst()
.orElse(null);
}
};

/**
* Look for asynchronous result and decorate it with span finisher. If the result is not
* asynchronous, it will be return unmodified and span will be finished.
Expand All @@ -33,12 +22,9 @@ protected AsyncResultExtension computeValue(Class<?> type) {
*/
public Object wrapAsyncResultOrFinishSpan(
final Object result, final Class<?> methodReturnType, final AgentSpan span) {
AsyncResultExtension extension;
if (result != null && (extension = EXTENSION_CLASS_VALUE.get(methodReturnType)) != null) {
Object applied = extension.apply(result, span);
if (applied != null) {
return applied;
}
Object applied = AsyncResultExtensions.wrapAsyncResult(result, methodReturnType, span);
if (applied != null) {
return applied;
}
// If no extension was applied, immediately finish the span and return the original result
span.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,32 @@ public final class AsyncResultExtensions {
private static final List<AsyncResultExtension> EXTENSIONS =
new CopyOnWriteArrayList<>(singletonList(new CompletableAsyncResultExtension()));

private static final ClassValue<AsyncResultExtension> EXTENSION_CLASS_VALUE =
new ClassValue<AsyncResultExtension>() {
@Override
protected AsyncResultExtension computeValue(Class<?> type) {
return EXTENSIONS.stream()
.filter(extension -> extension.supports(type))
.findFirst()
.orElse(null);
}
};

/**
* Wraps a supported async result so the span is finished when the async computation completes.
*
* @return the wrapped async result, or {@code null} if the result type is unsupported or no
* wrapping is applied
*/
public static Object wrapAsyncResult(
final Object result, final Class<?> resultType, final AgentSpan span) {
AsyncResultExtension extension;
if (result != null && (extension = EXTENSION_CLASS_VALUE.get(resultType)) != null) {
return extension.apply(result, span);
}
return null;
}

/**
* Registers an extension to add supported async types.
*
Expand All @@ -36,11 +62,6 @@ public static void register(AsyncResultExtension extension) {
}
}

/** Returns the list of currently registered extensions. */
public static List<AsyncResultExtension> registered() {
return EXTENSIONS;
}

static final class CompletableAsyncResultExtension implements AsyncResultExtension {
@Override
public boolean supports(Class<?> result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static void init() {}

@Override
public boolean supports(Class<?> result) {
return result == Flux.class || result == Mono.class;
return Flux.class.isAssignableFrom(result) || Mono.class.isAssignableFrom(result);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion

plugins {
id 'org.jetbrains.kotlin.jvm'
}

muzzle {
pass {
group = 'org.springframework'
module = 'spring-messaging'
versions = "[4.0.0.RELEASE,)"
assertInverse = true
// KotlinAwareHandlerInstrumentation references Publisher from reactive-streams,
// which is not bundled in spring-messaging but is always present when Spring Kafka is.
extraDependency 'org.reactivestreams:reactive-streams:1.0.4'
}
}

apply from: "$rootDir/gradle/java.gradle"
apply from: "$rootDir/gradle/test-with-kotlin.gradle"

testJvmConstraints {
minJavaVersion = JavaVersion.VERSION_17
Expand All @@ -16,13 +27,24 @@ testJvmConstraints {
addTestSuiteForDir('latestDepTest', 'test')

["compileTestGroovy", "compileLatestDepTestGroovy"].each { name ->
def kotlinTaskName = name.replace("Groovy", "Kotlin")
tasks.named(name, GroovyCompile) {
configureCompiler(it, 17)
classpath += files(tasks.named(kotlinTaskName).map { it.destinationDirectory })
}
}

kotlin {
compilerOptions {
jvmTarget = JvmTarget.JVM_1_8
apiVersion = KotlinVersion.KOTLIN_1_9
languageVersion = KotlinVersion.KOTLIN_1_9
}
}

dependencies {
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE'
compileOnly 'org.reactivestreams:reactive-streams:1.0.4'
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common')

// capture SQS send and receive spans, propagate trace details in messages
Expand All @@ -36,6 +58,33 @@ dependencies {
}
testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3'

// Spring Kafka + embedded Kafka broker for coroutine tests
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', {
exclude group: 'org.apache.kafka'
}
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', {
exclude group: 'org.apache.kafka'
}

// KotlinAwareHandlerInstrumentation relies on the reactive-streams and reactor instrumentation
testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0')
testImplementation project(':dd-java-agent:instrumentation:reactor-core-3.1')

testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test'
testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test'
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0'
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test'

testImplementation libs.kotlin
testImplementation "org.jetbrains.kotlin:kotlin-reflect"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.8.+"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.+"
testImplementation "io.projectreactor:reactor-core:3.+"

testRuntimeOnly project(':dd-java-agent:instrumentation:kotlin-coroutines-1.3')
testRuntimeOnly project(':dd-java-agent:instrumentation:kafka:kafka-clients-3.8')

latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '6.+', {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package datadog.trace.instrumentation.springmessaging;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;

/**
* Instruments {@code KotlinAwareInvocableHandlerMethod.doInvoke()} to attach the current {@link
* Context} to the returned {@link Publisher} so that the reactive-streams instrumentation activates
* it during subscription.
*
* <p>When a Spring Kafka listener is a Kotlin {@code suspend fun}, {@code
* KotlinAwareInvocableHandlerMethod.doInvoke()} returns a cold {@code Mono} immediately, before the
* listener body runs. By the time the {@code Mono} is subscribed (and the underlying {@code
* AbstractCoroutine} is constructed), the {@code spring.consume} scope opened by {@link
* SpringMessageHandlerInstrumentation} has already been closed. This advice captures {@link
* Context#current()} at {@code doInvoke()} exit — while {@code spring.consume} is still active —
* and stores it on the Publisher. The reactive-streams {@code PublisherInstrumentation} then
* retrieves and activates it during subscription so that {@code DatadogThreadContextElement} picks
* up the correct parent context when the underlying {@code AbstractCoroutine} is constructed.
*/
@AutoService(InstrumenterModule.class)
public class KotlinAwareHandlerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public KotlinAwareHandlerInstrumentation() {
super("spring-messaging", "spring-messaging-4");
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
}

@Override
public List<Instrumenter> typeInstrumentations() {
return Collections.singletonList(new KotlinAwareHandlerInstrumentation());
}

@Override
public String instrumentedType() {
return "org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("doInvoke")),
KotlinAwareHandlerInstrumentation.class.getName() + "$DoInvokeAdvice");
}

public static class DoInvokeAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return Object result) {
if (result instanceof Publisher) {
InstrumentationContext.get(Publisher.class, Context.class)
.put((Publisher<?>) result, Context.current());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;
import java.util.concurrent.CompletionStage;
import net.bytebuddy.asm.Advice;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
Expand Down Expand Up @@ -95,15 +94,23 @@ public static void onExit(
}
AgentSpan span = scope.span();
scope.close();
if (result instanceof CompletionStage) {
result = ((CompletionStage<?>) result).whenComplete(AsyncResultExtensions.finishSpan(span));
} else {
if (null != error) {
DECORATE.onError(span, error);
if (null != error) {
DECORATE.onError(span, error);
}
if (result != null) {
Object wrappedResult =
AsyncResultExtensions.wrapAsyncResult(result, result.getClass(), span);
if (wrappedResult != null) {
result = wrappedResult;
// span will be finished by the wrapper
return;
}
DECORATE.beforeFinish(span);
span.finish();
}
if (null != error) {
DECORATE.onError(span, error);
}
DECORATE.beforeFinish(span);
span.finish();
}
}
}
Loading
Loading