From ad35bd61c016d5155acde7d3fd87ce2f619c2884 Mon Sep 17 00:00:00 2001 From: Brandur Date: Tue, 24 Mar 2026 20:36:54 -0700 Subject: [PATCH] Try to start the queue maintainer multiple times with backoff This one's aimed at addressing #1161. `HookPeriodicJobsStart.Start` may return an error that causes the queue maintainer not to start, and there are a few other intermittent errors that may cause it not to start (say in the case of a transient DB problem). If this were to occur, the course of action currently is for the client to just spit an error to logs and not try any additional remediation, which could have the effect of leaving the queue maintainer offline for extended periods. Here, try to address this broadly by allowing the queue maintainer a few attempts at starting, and with our standard exponential backoff (1s, 2s, 4s, 8s, etc.). In case a queue maintainer fails to start completely, the client requests resignation and hands leadership off to another client to see if it can start successfully. I think this is an okay compromise because in case of a non-transient fundamental error (say `HookPeriodicJobsStart.Start` always returns an error), we don't go into a hot loop that starts hammering things. Instead, we'll get a reasonably responsible slow backoff that gives things a chance to recover, and which should be very visible in logs. Fixes #1161. --- CHANGELOG.md | 4 + client.go | 163 +++++++++++++++--- client_test.go | 25 +++ internal/maintenance/periodic_job_enqueuer.go | 53 +++--- internal/maintenance/queue_maintainer.go | 2 + 5 files changed, 199 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bf5e4e26..ed2d4a97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. 
[PR #1184](https://github.com/riverqueue/river/pull/1184). + ## [0.33.0] - 2026-04-03 ### Changed diff --git a/client.go b/client.go index c07c70d4..13fb1653 100644 --- a/client.go +++ b/client.go @@ -33,6 +33,7 @@ import ( "github.com/riverqueue/river/rivershared/testsignal" "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/maputil" + "github.com/riverqueue/river/rivershared/util/serviceutil" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivershared/util/testutil" "github.com/riverqueue/river/rivershared/util/valutil" @@ -606,11 +607,20 @@ type Client[TTx any] struct { pilot riverpilot.Pilot producersByQueueName map[string]*producer queueMaintainer *maintenance.QueueMaintainer - queues *QueueBundle - services []startstop.Service - stopped <-chan struct{} - subscriptionManager *subscriptionManager - testSignals clientTestSignals + + // queueMaintainerEpoch is incremented each time leadership is gained, + // giving each tryStartQueueMaintainer goroutine a term number. + // queueMaintainerMu serializes epoch checks with Stop calls so that a + // stale goroutine from an older term cannot tear down a maintainer + // started by a newer term. + queueMaintainerEpoch int64 + queueMaintainerMu sync.Mutex + + queues *QueueBundle + services []startstop.Service + stopped <-chan struct{} + subscriptionManager *subscriptionManager + testSignals clientTestSignals // workCancel cancels the context used for all work goroutines. Normal Stop // does not cancel that context. @@ -619,7 +629,9 @@ type Client[TTx any] struct { // Test-only signals. 
type clientTestSignals struct { - electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader + electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader + queueMaintainerStartError testsignal.TestSignal[error] // notifies on each failed queue maintainer start attempt + queueMaintainerStartRetriesExhausted testsignal.TestSignal[struct{}] // notifies when leader resignation is requested after all queue maintainer start retries have been exhausted jobCleaner *maintenance.JobCleanerTestSignals jobRescuer *maintenance.JobRescuerTestSignals @@ -631,6 +643,8 @@ type clientTestSignals struct { func (ts *clientTestSignals) Init(tb testutil.TestingTB) { ts.electedLeader.Init(tb) + ts.queueMaintainerStartError.Init(tb) + ts.queueMaintainerStartRetriesExhausted.Init(tb) if ts.jobCleaner != nil { ts.jobCleaner.Init(tb) @@ -1279,26 +1293,6 @@ func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, starte } func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStart bool, started, stopped func()) error { - handleLeadershipChange := func(ctx context.Context, notification *leadership.Notification) { - c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received", - slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader)) - - switch { - case notification.IsLeader: - // Starting the queue maintainer can take a little time so send to - // this test signal _first_ so tests waiting on it can finish, - // cancel the queue maintainer start, and overall run much faster. 
- c.testSignals.electedLeader.Signal(struct{}{}) - - if err := c.queueMaintainer.Start(ctx); err != nil { - c.baseService.Logger.ErrorContext(ctx, "Error starting queue maintainer", slog.String("err", err.Error())) - } - - default: - c.queueMaintainer.Stop() - } - } - if !shouldStart { return nil } @@ -1310,13 +1304,56 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar sub := c.elector.Listen() defer sub.Unlisten() + // Cancel function for an in-progress tryStartQueueMaintainer. If + // leadership is lost while the start process is still retrying, used to + // abort it promptly instead of waiting for retries to finish. + var cancelQueueMaintainerStart context.CancelCauseFunc = func(_ error) {} + for { select { case <-ctx.Done(): + cancelQueueMaintainerStart(context.Cause(ctx)) return case notification := <-sub.C(): - handleLeadershipChange(ctx, notification) + c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received", + slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader)) + + switch { + case notification.IsLeader: + // Starting the queue maintainer takes time, so send the + // test signal first. Tests waiting on it can receive it, + // cancel the queue maintainer start, and finish faster. + c.testSignals.electedLeader.Signal(struct{}{}) + + // Start the queue maintainer with retries and exponential + // backoff in a separate goroutine so the leadership change + // loop remains responsive to new notifications. startCtx is + // used for cancellation in case leadership is lost while + // retries are in progress. + // + // Epoch is incremented so stale tryStartQueueMaintainer + // goroutines from a previous term cannot call Stop after a + // new term has begun. 
+ var startCtx context.Context + startCtx, cancelQueueMaintainerStart = context.WithCancelCause(ctx) + + c.queueMaintainerMu.Lock() + c.queueMaintainerEpoch++ + epoch := c.queueMaintainerEpoch + c.queueMaintainerMu.Unlock() + + go c.tryStartQueueMaintainer(startCtx, epoch) + + default: + // Cancel any in-progress start attempts before stopping. + // Send a startstop.ErrStop to make sure services like + // Reindexer run any specific cleanup code for stops. + cancelQueueMaintainerStart(startstop.ErrStop) + cancelQueueMaintainerStart = func(_ error) {} + + c.queueMaintainer.Stop() + } } } }() @@ -1324,6 +1361,78 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar return nil } +// Tries to start the queue maintainer after gaining leadership. We allow some +// retries with exponential backoff in case of failure, and in case the queue +// maintainer can't be started, we request resignation to allow another client +// to try and take over. +func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context, epoch int64) { + const maxStartAttempts = 3 + + ctxCancelled := func() bool { + if ctx.Err() != nil { + c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Queue maintainer start cancelled") + return true + } + return false + } + + // stopIfCurrentEpoch atomically checks whether this goroutine's epoch is + // still the active one and calls Stop only if it is. Combined with the + // epoch increment in handleLeadershipChangeLoop, prevents stale goroutine + // from stopping a maintainer started by a newer leadership term. 
+ stopIfCurrentEpoch := func() bool { + c.queueMaintainerMu.Lock() + defer c.queueMaintainerMu.Unlock() + + if c.queueMaintainerEpoch != epoch { + return false + } + + c.queueMaintainer.Stop() + return true + } + + var lastErr error + for attempt := 1; attempt <= maxStartAttempts; attempt++ { + if ctxCancelled() { + return + } + + if lastErr = c.queueMaintainer.Start(ctx); lastErr == nil { + return + } + + c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error starting queue maintainer", + slog.String("err", lastErr.Error()), slog.Int("attempt", attempt)) + + c.testSignals.queueMaintainerStartError.Signal(lastErr) + + // Stop the queue maintainer to fully reset its state (and any + // sub-services) before retrying. The epoch check ensures a stale + // goroutine cannot stop a maintainer from a newer leadership term. + if !stopIfCurrentEpoch() { + return + } + + if attempt < maxStartAttempts { + serviceutil.CancellableSleep(ctx, serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault)) + } + } + + if ctxCancelled() { + return + } + + c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Queue maintainer failed to start after all attempts, requesting leader resignation", + slog.String("err", lastErr.Error())) + + c.testSignals.queueMaintainerStartRetriesExhausted.Signal(struct{}{}) + + if err := c.clientNotifyBundle.RequestResign(ctx); err != nil { + c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error requesting leader resignation", slog.String("err", err.Error())) + } +} + // Driver exposes the underlying driver used by the client. // // API is not stable. DO NOT USE. 
diff --git a/client_test.go b/client_test.go index cb5eed0f..5a446068 100644 --- a/client_test.go +++ b/client_test.go @@ -5145,6 +5145,31 @@ func Test_Client_Maintenance(t *testing.T) { require.True(t, svc.RemoveByID("new_periodic_job")) }) + t.Run("QueueMaintainerStartRetriesAndResigns", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + config.Hooks = []rivertype.Hook{ + HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { + return errors.New("hook start error") + }), + } + + client, _ := setup(t, config) + + startClient(ctx, t, client) + client.testSignals.electedLeader.WaitOrTimeout() + + // Wait for all 3 retry attempts to fail. + for range 3 { + err := client.testSignals.queueMaintainerStartError.WaitOrTimeout() + require.EqualError(t, err, "hook start error") + } + + // After all retries exhausted, the client should request resignation. + client.testSignals.queueMaintainerStartRetriesExhausted.WaitOrTimeout() + }) + t.Run("PeriodicJobEnqueuerWithInsertOpts", func(t *testing.T) { t.Parallel() diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index 8ea0c042..54e8fe53 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -318,31 +318,42 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { s.StaggerStart(ctx) - initialPeriodicJobs, err := s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{ - Schema: s.Config.Schema, - }) - if err != nil { - return err - } + var ( + initialPeriodicJobs []*riverpilot.PeriodicJob + subServices []startstop.Service + ) + if err := func() error { + var err error + initialPeriodicJobs, err = s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{ + Schema: s.Config.Schema, + }) + if err != nil { + return err + } - for _, hook := range 
s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) { - if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert - DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob { - return (*rivertype.DurablePeriodicJob)(job) - }), - }); err != nil { + for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) { + if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert + DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob { + return (*rivertype.DurablePeriodicJob)(job) + }), + }); err != nil { + return err + } + } + + subServices = []startstop.Service{ + startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically), + } + stopServicesOnError := func() { + startstop.StopAllParallel(subServices...) + } + if err := startstop.StartAll(ctx, subServices...); err != nil { + stopServicesOnError() return err } - } - subServices := []startstop.Service{ - startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically), - } - stopServicesOnError := func() { - startstop.StopAllParallel(subServices...) - } - if err := startstop.StartAll(ctx, subServices...); err != nil { - stopServicesOnError() + return nil + }(); err != nil { stopped() return err } diff --git a/internal/maintenance/queue_maintainer.go b/internal/maintenance/queue_maintainer.go index 5c780ce0..142bf283 100644 --- a/internal/maintenance/queue_maintainer.go +++ b/internal/maintenance/queue_maintainer.go @@ -50,6 +50,8 @@ func (m *QueueMaintainer) Start(ctx context.Context) error { for _, service := range m.servicesByName { if err := service.Start(ctx); err != nil { + startstop.StopAllParallel(maputil.Values(m.servicesByName)...) + stopped() return err } }