From a2f3ed0df4c6ae31aab3e6ad0ec2106c06c1bfdf Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 15 Apr 2026 11:49:27 +1000 Subject: [PATCH] fix: Replace scheduler channel with sync.Cond to prevent deadlock Workers and Submit() callers could deadlock when the workAvailable channel buffer (1024) filled up. All workers would block on the deferred channel send in runJob, preventing any reader from draining the channel. sync.Cond.Signal() never blocks, eliminating the issue. Co-Authored-By: Claude Opus 4.6 (1M context) --- Justfile | 2 +- internal/jobscheduler/jobs.go | 55 ++++++++++++++++------- internal/strategy/git/git_test.go | 4 +- internal/strategy/git/integration_test.go | 3 +- 4 files changed, 41 insertions(+), 23 deletions(-) diff --git a/Justfile b/Justfile index 7e33d38..97d95de 100644 --- a/Justfile +++ b/Justfile @@ -18,7 +18,7 @@ _help: # Run tests test: - @gotestsum --hide-summary output,skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 60s + gotestsum --hide-summary skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 60s # Lint code lint: diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index 7da63f2..b0b5739 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -75,8 +75,9 @@ func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler { } type RootScheduler struct { - workAvailable chan bool + cond *sync.Cond lock sync.Mutex + done bool queue []queueJob active map[string]string // queue -> job id activeClones int @@ -118,14 +119,22 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) { } m := newSchedulerMetrics() q := &RootScheduler{ - workAvailable: make(chan bool, 1024), active: make(map[string]string), maxCloneConcurrency: maxClones, store: store, metrics: m, } + q.cond = sync.NewCond(&q.lock) ctx, cancel := context.WithCancel(ctx) q.cancel = cancel + // Wake all workers on context cancellation so they can observe done and exit. + go func() { + <-ctx.Done() + q.lock.Lock() + q.done = true + q.lock.Unlock() + q.cond.Broadcast() + }() q.wg.Add(config.Concurrency) for id := range config.Concurrency { go q.worker(ctx, id) @@ -152,7 +161,7 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run}) q.metrics.queueDepth.Record(context.Background(), int64(len(q.queue))) q.lock.Unlock() - q.workAvailable <- true + q.cond.Signal() } func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) { @@ -205,24 +214,36 @@ func (q *RootScheduler) worker(ctx context.Context, id int) { defer q.wg.Done() logger := logging.FromContext(ctx).With("scheduler-worker", id) for { - select { - case <-ctx.Done(): + job, ok := q.waitForJob() + if !ok { logger.InfoContext(ctx, "Worker terminated") return + } + q.runJob(ctx, logger, job) + } +} - case <-q.workAvailable: - job, ok := q.takeNextJob() - if !ok { - continue - } - q.runJob(ctx, logger, job) +// waitForJob blocks until a job is available or the scheduler is shut down. +// cond.Wait() atomically releases the lock and suspends the goroutine, so the +// lock is only held during the brief check-and-take, never while sleeping. +// On context cancellation, the goroutine in New() sets done and broadcasts, +// waking all sleeping workers so they can exit. +func (q *RootScheduler) waitForJob() (queueJob, bool) { + q.lock.Lock() + defer q.lock.Unlock() + for { + if q.done { + return queueJob{}, false } + if job, ok := q.takeNextJobLocked(); ok { + return job, true + } + q.cond.Wait() } } func (q *RootScheduler) runJob(ctx context.Context, logger *slog.Logger, job queueJob) { defer q.markQueueInactive(job.queue) - defer func() { q.workAvailable <- true }() jobAttrs := attribute.String("job.type", jobType(job.id)) start := time.Now() @@ -279,6 +300,7 @@ func (q *RootScheduler) markQueueInactive(queue string) { } delete(q.active, queue) q.recordGaugesLocked() + q.cond.Signal() } // isCloneJob returns true for job IDs that represent long-running clone operations @@ -287,10 +309,9 @@ func isCloneJob(id string) bool { return strings.HasSuffix(id, "clone") || strings.HasSuffix(id, "deferred-mirror-restore") } -// Take the next job for any queue that is not already running a job. -func (q *RootScheduler) takeNextJob() (queueJob, bool) { - q.lock.Lock() - defer q.lock.Unlock() +// takeNextJobLocked takes the next job for any queue that is not already running a job. +// Must be called with q.lock held. +func (q *RootScheduler) takeNextJobLocked() (queueJob, bool) { for i, job := range q.queue { if _, active := q.active[job.queue]; active { continue @@ -299,12 +320,12 @@ func (q *RootScheduler) takeNextJob() (queueJob, bool) { continue } q.queue = append(q.queue[:i], q.queue[i+1:]...) - q.workAvailable <- true q.active[job.queue] = job.id if isCloneJob(job.id) { q.activeClones++ } q.recordGaugesLocked() + q.cond.Signal() return job, true } return queueJob{}, false diff --git a/internal/strategy/git/git_test.go b/internal/strategy/git/git_test.go index b6f4f52..8f4ac49 100644 --- a/internal/strategy/git/git_test.go +++ b/internal/strategy/git/git_test.go @@ -81,7 +81,6 @@ func TestNew(t *testing.T) { return } assert.NoError(t, err) - assert.NotZero(t, s) assert.Equal(t, "git", s.String()) // Verify handlers were registered @@ -181,9 +180,8 @@ func TestNewWithExistingCloneOnDisk(t *testing.T) { MirrorRoot: tmpDir, FetchInterval: 15, }, nil) - s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) - assert.NotZero(t, s) } func TestIntegrationWithMockUpstream(t *testing.T) { diff --git a/internal/strategy/git/integration_test.go b/internal/strategy/git/integration_test.go index 9e0a2bc..cae7271 100644 --- a/internal/strategy/git/integration_test.go +++ b/internal/strategy/git/integration_test.go @@ -117,9 +117,8 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) { mux := http.NewServeMux() memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) assert.NoError(t, err) - strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) - assert.NotZero(t, strategy) // Start a test server with logging middleware server := testServerWithLogging(ctx, mux)