Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion

plugins {
id 'org.jetbrains.kotlin.jvm'
}

muzzle {
pass {
group = 'org.springframework'
module = 'spring-messaging'
versions = "[4.0.0.RELEASE,)"
assertInverse = true
// KotlinAwareHandlerInstrumentation references Publisher from reactive-streams,
// which is not bundled in spring-messaging but is always present when Spring Kafka is.
extraDependency 'org.reactivestreams:reactive-streams:1.0.4'
}
}

apply from: "$rootDir/gradle/java.gradle"
apply from: "$rootDir/gradle/test-with-kotlin.gradle"

testJvmConstraints {
minJavaVersion = JavaVersion.VERSION_17
Expand All @@ -16,13 +27,24 @@ testJvmConstraints {
addTestSuiteForDir('latestDepTest', 'test')

["compileTestGroovy", "compileLatestDepTestGroovy"].each { name ->
def kotlinTaskName = name.replace("Groovy", "Kotlin")
tasks.named(name, GroovyCompile) {
configureCompiler(it, 17)
classpath += files(tasks.named(kotlinTaskName).map { it.destinationDirectory })
}
}

kotlin {
compilerOptions {
jvmTarget = JvmTarget.JVM_1_8
apiVersion = KotlinVersion.KOTLIN_1_9
languageVersion = KotlinVersion.KOTLIN_1_9
}
}

dependencies {
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE'
compileOnly 'org.reactivestreams:reactive-streams:1.0.4'
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common')

// capture SQS send and receive spans, propagate trace details in messages
Expand All @@ -36,6 +58,32 @@ dependencies {
}
testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3'

// Spring Kafka + embedded Kafka broker for coroutine tests
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', {
exclude group: 'org.apache.kafka'
}
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', {
exclude group: 'org.apache.kafka'
}

// KotlinAwareHandlerInstrumentation relies on the reactive-streams instrumentation
testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0')

testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test'
testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test'
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0'
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test'

testImplementation libs.kotlin
testImplementation "org.jetbrains.kotlin:kotlin-reflect"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.8.+"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.+"
testImplementation "io.projectreactor:reactor-core:3.+"

testRuntimeOnly project(':dd-java-agent:instrumentation:kotlin-coroutines-1.3')
testRuntimeOnly project(':dd-java-agent:instrumentation:kafka:kafka-clients-3.8')

latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '6.+', {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package datadog.trace.instrumentation.springmessaging;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;

/**
* Instruments {@code KotlinAwareInvocableHandlerMethod.doInvoke()} to attach the current {@link
* Context} to the returned {@link Publisher} so that the reactive-streams instrumentation activates
* it during subscription.
*
* <p>When a Spring Kafka listener is a Kotlin {@code suspend fun}, {@code
* KotlinAwareInvocableHandlerMethod.doInvoke()} returns a cold {@code Mono} immediately, before the
* listener body runs. By the time the {@code Mono} is subscribed (and the underlying {@code
* AbstractCoroutine} is constructed), the {@code spring.consume} scope opened by {@link
* SpringMessageHandlerInstrumentation} has already been closed. This advice captures {@link
* Context#current()} at {@code doInvoke()} exit — while {@code spring.consume} is still active —
* and stores it on the Publisher. The reactive-streams {@code PublisherInstrumentation} then
* retrieves and activates it during subscription so that {@code DatadogThreadContextElement} picks
* up the correct parent context when the underlying {@code AbstractCoroutine} is constructed.
*/
@AutoService(InstrumenterModule.class)
public class KotlinAwareHandlerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public KotlinAwareHandlerInstrumentation() {
super("spring-messaging", "spring-messaging-4");
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
}

@Override
public List<Instrumenter> typeInstrumentations() {
return Collections.singletonList(new KotlinAwareHandlerInstrumentation());
}

@Override
public String instrumentedType() {
return "org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("doInvoke")),
KotlinAwareHandlerInstrumentation.class.getName() + "$DoInvokeAdvice");
}

public static class DoInvokeAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return Object result) {
if (result instanceof Publisher) {
InstrumentationContext.get(Publisher.class, Context.class)
.put((Publisher<?>) result, Context.current());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package datadog.trace.instrumentation.springmessaging;

import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.lang.reflect.Method;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public final class SpringMessageAsyncHelper {
private SpringMessageAsyncHelper() {}

private static final ClassValue<ReactorCallbackMethods> REACTOR_CALLBACK_METHODS =
new ReactorCallbacksClassValue();

public static Object wrapAsyncResult(Object result, AgentSpan span) {
if (result == null) {
return null;
}
SpanFinisher finisher = new SpanFinisher(span);
if (result instanceof CompletionStage<?>) {
return ((CompletionStage<?>) result)
.whenComplete(new CompletionStageFinishCallback(finisher));
}
ReactorCallbackMethods callbackMethods = REACTOR_CALLBACK_METHODS.get(result.getClass());
if (!callbackMethods.supported()) {
return null;
}
try {
Object wrapped = callbackMethods.doOnError.invoke(result, new ErrorCallback(finisher));
wrapped = callbackMethods.doOnTerminate.invoke(wrapped, new FinishCallback(finisher));
return callbackMethods.doOnCancel.invoke(wrapped, new FinishCallback(finisher));
} catch (Throwable ignored) {
return null;
}
}

static final class ReactorCallbacksClassValue extends ClassValue<ReactorCallbackMethods> {
@Override
protected ReactorCallbackMethods computeValue(Class<?> type) {
try {
Method doOnError = type.getMethod("doOnError", Consumer.class);
Method doOnTerminate = type.getMethod("doOnTerminate", Runnable.class);
Method doOnCancel = type.getMethod("doOnCancel", Runnable.class);
return new ReactorCallbackMethods(doOnError, doOnTerminate, doOnCancel);
} catch (Throwable ignored) {
return ReactorCallbackMethods.UNSUPPORTED;
}
}
}

static final class ReactorCallbackMethods {
static final ReactorCallbackMethods UNSUPPORTED = new ReactorCallbackMethods(null, null, null);

final Method doOnError;
final Method doOnTerminate;
final Method doOnCancel;

ReactorCallbackMethods(Method doOnError, Method doOnTerminate, Method doOnCancel) {
this.doOnError = doOnError;
this.doOnTerminate = doOnTerminate;
this.doOnCancel = doOnCancel;
}

boolean supported() {
return doOnError != null && doOnTerminate != null && doOnCancel != null;
}
}

static final class SpanFinisher {
private final AgentSpan span;
private final AtomicBoolean finished = new AtomicBoolean(false);

SpanFinisher(AgentSpan span) {
this.span = span;
}

void onError(Throwable throwable) {
DECORATE.onError(span, throwable);
}

void finish() {
if (finished.compareAndSet(false, true)) {
DECORATE.beforeFinish(span);
span.finish();
}
}
}

static final class CompletionStageFinishCallback implements BiConsumer<Object, Throwable> {
private final SpanFinisher finisher;

CompletionStageFinishCallback(SpanFinisher finisher) {
this.finisher = finisher;
}

@Override
public void accept(Object ignored, Throwable throwable) {
if (throwable != null) {
finisher.onError(unwrap(throwable));
}
finisher.finish();
}
}

static final class ErrorCallback implements Consumer<Throwable> {
private final SpanFinisher finisher;

ErrorCallback(SpanFinisher finisher) {
this.finisher = finisher;
}

@Override
public void accept(Throwable throwable) {
finisher.onError(throwable);
}
}

static final class FinishCallback implements Runnable {
private final SpanFinisher finisher;

FinishCallback(SpanFinisher finisher) {
this.finisher = finisher;
}

@Override
public void run() {
finisher.finish();
}
}

private static Throwable unwrap(Throwable throwable) {
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
Throwable cause = throwable.getCause();
if (cause != null) {
return cause;
}
}
return throwable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.agent.tooling.annotation.AppliesOn;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;
import java.util.concurrent.CompletionStage;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

Expand Down Expand Up @@ -50,12 +53,24 @@ public void methodAdvice(MethodTransformer transformer) {
SpringMessageHandlerInstrumentation.class.getName() + "$HandleMessageAdvice");
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".SpringMessageDecorator",
packageName + ".SpringMessageExtractAdapter",
packageName + ".SpringMessageExtractAdapter$1"
packageName + ".SpringMessageExtractAdapter$1",
packageName + ".SpringMessageAsyncHelper",
packageName + ".SpringMessageAsyncHelper$ReactorCallbacksClassValue",
packageName + ".SpringMessageAsyncHelper$ReactorCallbackMethods",
packageName + ".SpringMessageAsyncHelper$SpanFinisher",
packageName + ".SpringMessageAsyncHelper$CompletionStageFinishCallback",
packageName + ".SpringMessageAsyncHelper$ErrorCallback",
packageName + ".SpringMessageAsyncHelper$FinishCallback",
};
}

Expand Down Expand Up @@ -95,8 +110,20 @@ public static void onExit(
}
AgentSpan span = scope.span();
scope.close();
if (result instanceof CompletionStage) {
result = ((CompletionStage<?>) result).whenComplete(AsyncResultExtensions.finishSpan(span));
Object asyncResult = SpringMessageAsyncHelper.wrapAsyncResult(result, span);
if (asyncResult != null) {
if (result != asyncResult
&& result instanceof Publisher<?>
&& asyncResult instanceof Publisher<?>) {
Context publisherContext =
InstrumentationContext.get(Publisher.class, Context.class)
.remove((Publisher<?>) result);
if (publisherContext != null) {
InstrumentationContext.get(Publisher.class, Context.class)
.put((Publisher<?>) asyncResult, publisherContext);
}
}
result = asyncResult;
} else {
if (null != error) {
DECORATE.onError(span, error);
Expand Down
Loading
Loading