From 32e24bf207b5ff5eb4cc2c5bf30e5767a1e0d5d8 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 3 Apr 2026 18:34:17 +0200 Subject: [PATCH 1/2] 4.x: Detect un-awaited CompletionStageDisposables during tests --- .../core/CompletionStageDisposable.java | 109 +++++++++++++++++- .../streamable/StreamableBaseTest.java | 74 ++++++++++++ .../operators/streamable/StreamableTest.java | 37 +++--- 3 files changed, 200 insertions(+), 20 deletions(-) create mode 100644 src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableBaseTest.java diff --git a/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java b/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java index 616d021e5a..feb7fc2087 100644 --- a/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java +++ b/src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java @@ -13,24 +13,77 @@ package io.reactivex.rxjava4.core; +import java.lang.ref.Cleaner; +import java.util.Objects; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import io.reactivex.rxjava4.annotations.NonNull; import io.reactivex.rxjava4.disposables.*; +import io.reactivex.rxjava4.plugins.RxJavaPlugins; /** * Consist of a terminal stage and a disposable to be able to cancel a sequence. * @param the return and element type of the various stages - * @param stage the embedded stage to work with - * @param disposable the way to cancel the stage concurrently * @since 4.0.0 */ -public record CompletionStageDisposable(@NonNull CompletionStage stage, @NonNull Disposable disposable) { +public final class CompletionStageDisposable implements AutoCloseable { + // record classes can't have extra fields, why? + // also I have to write out the constructor instead of declaring it in the record definition, FFS + + static final Cleaner cleaner = Cleaner.create(); + + static volatile Consumer trackAllocations; + + static final class State extends AtomicBoolean implements Runnable { + + /** */ + private static final long serialVersionUID = 262854674341831347L; + + Throwable allocationTrace; + + @Override + public void run() { + if (!get()) { + RxJavaPlugins.onError( + new IllegalStateException("CompletionStageDisposable was not awaited or ignored explicitly", + allocationTrace)); + } + } + + } + + final CompletionStage stage; + final Disposable disposable; + final State state; + final Cleaner.Cleanable cleanable; + + /** + * Construct an instance with parameters + * @param stage the stage to be awaited + * @param disposable the disposable to cancel asynchronously + */ + public CompletionStageDisposable(@NonNull CompletionStage stage, @NonNull Disposable disposable) { + Objects.requireNonNull(stage, "stage is null"); + Objects.requireNonNull(disposable, "disposable is null"); + this.stage = stage; + this.disposable = disposable; + this.state = new State(); + this.cleanable = cleaner.register(this, state); + if (trackAllocations != null) { + state.allocationTrace = new StackOverflowError("CompletionStageDisposable::AllocationTrace"); + trackAllocations.accept(this.cleanable); + } else { + state.allocationTrace = null; + } + } /** * Await the completion of the current stage. */ public void await() { + state.lazySet(true);; Streamer.await(stage); } @@ -39,6 +92,56 @@ public void await() { * @param canceller the canceller link */ public void await(DisposableContainer canceller) { + state.lazySet(true);; Streamer.await(stage, canceller); } + + /** + * Indicate this instance is deliberately not awaiting its stage. + */ + public void ignore() { + state.lazySet(true);; + } + + @Override + public void close() { + try { + state.lazySet(true); + disposable.dispose(); + } finally { + cleanable.clean(); + } + } + + /** + * Set an allocator tracer callback to track where CompletionStageDisposables are leaking. + * @param callback the callback to call when a new trace is being established + */ + public static void setAllocationTrace(Consumer callback) { + trackAllocations = callback; + } + + /** + * Returns the current allocation stacktrace capturing consumer. + * @return the current allocation stacktrace capturing consumer. + */ + public static Consumer getAllocationTrace() { + return trackAllocations; + } + + /*** + * Returns the associated completion stage value. + * @return the associated completion stage value. + */ + public CompletionStage stage() { + return stage; + } + + /** + * Returns the associated disposable value. + * @return the associated disposable value. + */ + public Disposable disposable() { + return disposable; + } } diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableBaseTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableBaseTest.java new file mode 100644 index 0000000000..042f18d396 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableBaseTest.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.lang.ref.Cleaner; +import java.util.*; + +import org.junit.jupiter.api.*; + +import io.reactivex.rxjava4.core.CompletionStageDisposable; +import io.reactivex.rxjava4.exceptions.CompositeException; +import io.reactivex.rxjava4.functions.Consumer; +import io.reactivex.rxjava4.plugins.RxJavaPlugins; + +public abstract class StreamableBaseTest { + + protected java.util.function.Consumer stageTrackingState; + + protected Consumer oldHandler; + + protected List errors; + + protected List cleaners; + + protected volatile boolean undeliverablesExpected; + + @BeforeEach + protected final void beforeTest() { + errors = Collections.synchronizedList(new ArrayList<>()); + cleaners = Collections.synchronizedList(new ArrayList<>()); + undeliverablesExpected = false; + + stageTrackingState = CompletionStageDisposable.getAllocationTrace(); + CompletionStageDisposable.setAllocationTrace(cleaners::add); + + oldHandler = RxJavaPlugins.getErrorHandler(); + RxJavaPlugins.setErrorHandler(e -> { + if (!undeliverablesExpected) { + errors.add(e); + } + if (oldHandler != null) { + oldHandler.accept(e); + } + }); + } + + @AfterEach + protected final void afterTest(TestInfo testInfo) { + CompletionStageDisposable.setAllocationTrace(stageTrackingState); + for (var c : cleaners) { + c.clean(); + } + if (errors.size() != 0) { + throw new AssertionError("Undeliverable exceptions during test detected: " + testInfo.getDisplayName(), + new CompositeException(errors)); + } + } + + protected final void setUndeliverablesExpected(boolean isExpected) { + undeliverablesExpected = isExpected; + } + +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java index f06f62189a..126ee906f7 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java @@ -13,8 +13,7 @@ package io.reactivex.rxjava4.internal.operators.streamable; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -29,7 +28,7 @@ import io.reactivex.rxjava4.testsupport.TestHelper; @Isolated -public class StreamableTest { +public class StreamableTest extends StreamableBaseTest { @Test public void empty() throws Throwable { @@ -38,16 +37,18 @@ public void empty() throws Throwable { TestSubscriber ts = new TestSubscriber(); ts.onSubscribe(EmptySubscription.INSTANCE); - var comp = Streamable.empty().forEach(e -> { ts.onError(new TestException("Element produced? " + e)); }, exec); + try (var comp = Streamable.empty().forEach(e -> { ts.onError(new TestException("Element produced? " + e)); }, exec)) { - comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete()).exceptionally(e -> { ts.onError(e); return null; }).join(); + comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete()) + .exceptionally(e -> { ts.onError(e); return null; }); - ts - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(); + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); - assertFalse(exec.isShutdown(), "Exec::IsShutdown"); - assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + } }); } @@ -58,16 +59,18 @@ public void just() throws Throwable { TestSubscriber ts = new TestSubscriber(); ts.onSubscribe(EmptySubscription.INSTANCE); - var comp = Streamable.just(1).forEach(e -> { ts.onNext(e); }, exec); + try (var comp = Streamable.just(1).forEach(e -> { ts.onNext(e); }, exec)) { - comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete()).exceptionally(e -> { ts.onError(e); return null; }).join(); + comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete()) + .exceptionally(e -> { ts.onError(e); return null; }).join(); - ts - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); - assertFalse(exec.isShutdown(), "Exec::IsShutdown"); - assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + assertFalse(exec.isShutdown(), "Exec::IsShutdown"); + assertFalse(exec.isTerminated(), "Exec::IsTerminated"); + } }); } From aedc4b3ffd762b325a90d54c4920fbcbc1ca0cf0 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 3 Apr 2026 18:40:20 +0200 Subject: [PATCH 2/2] Promote the Cleaner api use in readme. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 33c42ab63a..7f441940d1 100644 --- a/README.md +++ b/README.md @@ -19,12 +19,12 @@ It extends the [observer pattern](http://en.wikipedia.org/wiki/Observer_pattern) - :+1: `java.util.concurrent.Flow`-based implementation. - :+1: Virtual Thread support; `virtualCreate()`, `virtualTransform()`, :eye: `Schedulers.virtual()`. - :+1: New `Streamable` built around Virtual Threads & virtual blocking. Think `IAsyncEnumerable` for Java. :satellite: in progress. +- :+1: Using Java Cleaner API to detect resource leaks and using it for adaptive cleanups. - :information_source: Reactive Streams Test Compatibility Kit usage; [Reactive-Streams](https://github.com/reactive-streams/reactive-streams-jvm). - :satellite: Rewamp of the javadoc bloat in the base types via `sealed` interfaces. - :satellite: Reduce overload bloat by using `record`-based configurations. - :satellite: Internal optimizations now that I have the master :key:. - :eye: Possible usages for Scoped variables for context and per-item resource management. -- :eye: Possible use for the Java Cleaner API. - :eye: Possible inclusion of 2nd and 3rd party operators. - :eye: Possible inclusion of the Iterable Extensions (Ix) 2nd party library. ju.Stream is sh|t wrt interfacing and composability. - :question: Android compatibility depends on your API level and what desugaring is available.