Skip to content

Commit a2f3ed0

Browse files
alecthomasclaude
andcommitted
fix: Replace scheduler channel with sync.Cond to prevent deadlock
Workers and Submit() callers could deadlock when the workAvailable channel buffer (1024) filled up. All workers would block on the deferred channel send in runJob, preventing any reader from draining the channel. sync.Cond.Signal() never blocks, eliminating the issue. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 890a605 commit a2f3ed0

4 files changed

Lines changed: 41 additions & 23 deletions

File tree

Justfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ _help:
1818

1919
# Run tests
2020
test:
21-
@gotestsum --hide-summary output,skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 60s
21+
gotestsum --hide-summary skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 60s
2222

2323
# Lint code
2424
lint:

internal/jobscheduler/jobs.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
7575
}
7676

7777
type RootScheduler struct {
78-
workAvailable chan bool
78+
cond *sync.Cond
7979
lock sync.Mutex
80+
done bool
8081
queue []queueJob
8182
active map[string]string // queue -> job id
8283
activeClones int
@@ -118,14 +119,22 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
118119
}
119120
m := newSchedulerMetrics()
120121
q := &RootScheduler{
121-
workAvailable: make(chan bool, 1024),
122122
active: make(map[string]string),
123123
maxCloneConcurrency: maxClones,
124124
store: store,
125125
metrics: m,
126126
}
127+
q.cond = sync.NewCond(&q.lock)
127128
ctx, cancel := context.WithCancel(ctx)
128129
q.cancel = cancel
130+
// Wake all workers on context cancellation so they can observe done and exit.
131+
go func() {
132+
<-ctx.Done()
133+
q.lock.Lock()
134+
q.done = true
135+
q.lock.Unlock()
136+
q.cond.Broadcast()
137+
}()
129138
q.wg.Add(config.Concurrency)
130139
for id := range config.Concurrency {
131140
go q.worker(ctx, id)
@@ -152,7 +161,7 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e
152161
q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run})
153162
q.metrics.queueDepth.Record(context.Background(), int64(len(q.queue)))
154163
q.lock.Unlock()
155-
q.workAvailable <- true
164+
q.cond.Signal()
156165
}
157166

158167
func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
@@ -205,24 +214,36 @@ func (q *RootScheduler) worker(ctx context.Context, id int) {
205214
defer q.wg.Done()
206215
logger := logging.FromContext(ctx).With("scheduler-worker", id)
207216
for {
208-
select {
209-
case <-ctx.Done():
217+
job, ok := q.waitForJob()
218+
if !ok {
210219
logger.InfoContext(ctx, "Worker terminated")
211220
return
221+
}
222+
q.runJob(ctx, logger, job)
223+
}
224+
}
212225

213-
case <-q.workAvailable:
214-
job, ok := q.takeNextJob()
215-
if !ok {
216-
continue
217-
}
218-
q.runJob(ctx, logger, job)
226+
// waitForJob blocks until a job is available or the scheduler is shut down.
227+
// cond.Wait() atomically releases the lock and suspends the goroutine, so the
228+
// lock is only held during the brief check-and-take, never while sleeping.
229+
// On context cancellation, the goroutine in New() sets done and broadcasts,
230+
// waking all sleeping workers so they can exit.
231+
func (q *RootScheduler) waitForJob() (queueJob, bool) {
232+
q.lock.Lock()
233+
defer q.lock.Unlock()
234+
for {
235+
if q.done {
236+
return queueJob{}, false
219237
}
238+
if job, ok := q.takeNextJobLocked(); ok {
239+
return job, true
240+
}
241+
q.cond.Wait()
220242
}
221243
}
222244

223245
func (q *RootScheduler) runJob(ctx context.Context, logger *slog.Logger, job queueJob) {
224246
defer q.markQueueInactive(job.queue)
225-
defer func() { q.workAvailable <- true }()
226247

227248
jobAttrs := attribute.String("job.type", jobType(job.id))
228249
start := time.Now()
@@ -279,6 +300,7 @@ func (q *RootScheduler) markQueueInactive(queue string) {
279300
}
280301
delete(q.active, queue)
281302
q.recordGaugesLocked()
303+
q.cond.Signal()
282304
}
283305

284306
// isCloneJob returns true for job IDs that represent long-running clone operations
@@ -287,10 +309,9 @@ func isCloneJob(id string) bool {
287309
return strings.HasSuffix(id, "clone") || strings.HasSuffix(id, "deferred-mirror-restore")
288310
}
289311

290-
// Take the next job for any queue that is not already running a job.
291-
func (q *RootScheduler) takeNextJob() (queueJob, bool) {
292-
q.lock.Lock()
293-
defer q.lock.Unlock()
312+
// takeNextJobLocked takes the next job for any queue that is not already running a job.
313+
// Must be called with q.lock held.
314+
func (q *RootScheduler) takeNextJobLocked() (queueJob, bool) {
294315
for i, job := range q.queue {
295316
if _, active := q.active[job.queue]; active {
296317
continue
@@ -299,12 +320,12 @@ func (q *RootScheduler) takeNextJob() (queueJob, bool) {
299320
continue
300321
}
301322
q.queue = append(q.queue[:i], q.queue[i+1:]...)
302-
q.workAvailable <- true
303323
q.active[job.queue] = job.id
304324
if isCloneJob(job.id) {
305325
q.activeClones++
306326
}
307327
q.recordGaugesLocked()
328+
q.cond.Signal()
308329
return job, true
309330
}
310331
return queueJob{}, false

internal/strategy/git/git_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ func TestNew(t *testing.T) {
8181
return
8282
}
8383
assert.NoError(t, err)
84-
assert.NotZero(t, s)
8584
assert.Equal(t, "git", s.String())
8685

8786
// Verify handlers were registered
@@ -181,9 +180,8 @@ func TestNewWithExistingCloneOnDisk(t *testing.T) {
181180
MirrorRoot: tmpDir,
182181
FetchInterval: 15,
183182
}, nil)
184-
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
183+
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
185184
assert.NoError(t, err)
186-
assert.NotZero(t, s)
187185
}
188186

189187
func TestIntegrationWithMockUpstream(t *testing.T) {

internal/strategy/git/integration_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,8 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) {
117117
mux := http.NewServeMux()
118118
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
119119
assert.NoError(t, err)
120-
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
120+
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
121121
assert.NoError(t, err)
122-
assert.NotZero(t, strategy)
123122

124123
// Start a test server with logging middleware
125124
server := testServerWithLogging(ctx, mux)

0 commit comments

Comments
 (0)