
[FLINK-39388][tests] Fix flaky DataGeneratorSourceITCase#testGatedRateLimiter #27883

Open
featzhang wants to merge 1 commit into apache:master from featzhang:fix-datagen-flaky-test

@featzhang
Member

What is the purpose of the change?

This PR fixes the flaky test DataGeneratorSourceITCase#testGatedRateLimiter by eliminating a race condition in the FirstCheckpointFilter inner class.

The root cause is that FirstCheckpointFilter previously stopped collecting elements inside snapshotState(). However, the checkpoint barrier can reach FirstCheckpointFilter before it has processed all upstream elements emitted by GatedRateLimiter in the same checkpoint cycle. The filter then discards those elements prematurely, and the assertion assertThat(results).hasSize(capacityPerCheckpoint) fails non-deterministically.

Fixes: FLINK-39388

Brief change log

  • DataGeneratorSourceITCase: Refactored FirstCheckpointFilter to implement CheckpointListener in addition to CheckpointedFunction. The element cutoff logic is moved from snapshotState() to notifyCheckpointComplete(), ensuring that collection stops only after the first checkpoint has fully completed and all in-flight elements have been processed downstream.
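
The cutoff move described above can be sketched in plain Java. This is a minimal illustration, not the code in the PR: the two interfaces are simplified, hypothetical stand-ins for Flink's CheckpointListener and CheckpointedFunction (the real snapshotState() takes a FunctionSnapshotContext rather than a checkpoint ID), and FirstCheckpointFilterSketch, its flatMap() method, and its collected() accessor are names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's CheckpointListener; only the completion callback is modeled. */
interface CheckpointListenerSketch {
    void notifyCheckpointComplete(long checkpointId);
}

/** Hypothetical stand-in for the snapshot hook of Flink's CheckpointedFunction. */
interface SnapshotHookSketch {
    void snapshotState(long checkpointId);
}

/** Forwards elements until the first checkpoint is reported complete (illustrative only). */
final class FirstCheckpointFilterSketch implements CheckpointListenerSketch, SnapshotHookSketch {
    private final List<Long> collected = new ArrayList<>();
    private volatile boolean firstCheckpointCompleted = false;

    /** Keeps the element only while the first checkpoint has not yet completed. */
    void flatMap(long value) {
        if (!firstCheckpointCompleted) {
            collected.add(value);
        }
    }

    @Override
    public void snapshotState(long checkpointId) {
        // Intentionally no cutoff here: in this sketch the barrier only marks
        // that a snapshot was taken, not that collection should stop.
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // The cutoff lives here, so collection stops only after completion is signalled.
        firstCheckpointCompleted = true;
    }

    List<Long> collected() {
        return collected;
    }
}
```

In this sketch, an element that arrives between snapshotState(1) and notifyCheckpointComplete(1) is still collected, while anything arriving after the completion callback is dropped.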

Verifying this change

This change is a test-only fix. The existing assertion assertThat(results).hasSize(capacityPerCheckpoint) continues to validate the correctness of GatedRateLimiter. The fix eliminates the non-deterministic timing window that caused intermittent failures.

  • The fix has been verified by code review of the race condition.

Does this pull request potentially affect one of the following parts?

  • Dependencies (does it add or upgrade a dependency): No
  • The public API, i.e., is any changed class annotated with @Public(Evolving): No
  • The serializers: No
  • The runtime per-record code paths (performance sensitive): No
  • Anything that affects the Flink SQL / Table API: No

Documentation

  • Does this pull request introduce a new feature? No
  • If yes, how is the feature documented? N/A

…ckpointListener to stop collecting after first checkpoint completes
@flinkbot
Collaborator

flinkbot commented Apr 2, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build
