diff --git a/Justfile b/Justfile index 7e33d38..97d95de 100644 --- a/Justfile +++ b/Justfile @@ -18,7 +18,7 @@ _help: # Run tests test: - @gotestsum --hide-summary output,skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 60s + gotestsum --hide-summary skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 60s # Lint code lint: diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index 7da63f2..b0b5739 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -75,8 +75,9 @@ func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler { } type RootScheduler struct { - workAvailable chan bool + cond *sync.Cond lock sync.Mutex + done bool queue []queueJob active map[string]string // queue -> job id activeClones int @@ -118,14 +119,22 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) { } m := newSchedulerMetrics() q := &RootScheduler{ - workAvailable: make(chan bool, 1024), active: make(map[string]string), maxCloneConcurrency: maxClones, store: store, metrics: m, } + q.cond = sync.NewCond(&q.lock) ctx, cancel := context.WithCancel(ctx) q.cancel = cancel + // Wake all workers on context cancellation so they can observe done and exit. + go func() { + <-ctx.Done() + q.lock.Lock() + q.done = true + q.lock.Unlock() + q.cond.Broadcast() + }() q.wg.Add(config.Concurrency) for id := range config.Concurrency { go q.worker(ctx, id) @@ -152,7 +161,7 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run}) q.metrics.queueDepth.Record(context.Background(), int64(len(q.queue))) q.lock.Unlock() - q.workAvailable <- true + q.cond.Signal() } func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) { @@ -205,24 +214,36 @@ func (q *RootScheduler) worker(ctx context.Context, id int) { defer q.wg.Done() logger := logging.FromContext(ctx).With("scheduler-worker", id) for { - select { - case <-ctx.Done(): + job, ok := q.waitForJob() + if !ok { logger.InfoContext(ctx, "Worker terminated") return + } + q.runJob(ctx, logger, job) + } +} - case <-q.workAvailable: - job, ok := q.takeNextJob() - if !ok { - continue - } - q.runJob(ctx, logger, job) +// waitForJob blocks until a job is available or the scheduler is shut down. +// cond.Wait() atomically releases the lock and suspends the goroutine, so the +// lock is only held during the brief check-and-take, never while sleeping. +// On context cancellation, the goroutine in New() sets done and broadcasts, +// waking all sleeping workers so they can exit. +func (q *RootScheduler) waitForJob() (queueJob, bool) { + q.lock.Lock() + defer q.lock.Unlock() + for { + if q.done { + return queueJob{}, false } + if job, ok := q.takeNextJobLocked(); ok { + return job, true + } + q.cond.Wait() } } func (q *RootScheduler) runJob(ctx context.Context, logger *slog.Logger, job queueJob) { defer q.markQueueInactive(job.queue) - defer func() { q.workAvailable <- true }() jobAttrs := attribute.String("job.type", jobType(job.id)) start := time.Now() @@ -279,6 +300,7 @@ func (q *RootScheduler) markQueueInactive(queue string) { } delete(q.active, queue) q.recordGaugesLocked() + q.cond.Signal() } // isCloneJob returns true for job IDs that represent long-running clone operations @@ -287,10 +309,9 @@ func isCloneJob(id string) bool { return strings.HasSuffix(id, "clone") || strings.HasSuffix(id, "deferred-mirror-restore") } -// Take the next job for any queue that is not already running a job. -func (q *RootScheduler) takeNextJob() (queueJob, bool) { - q.lock.Lock() - defer q.lock.Unlock() +// takeNextJobLocked takes the next job for any queue that is not already running a job. +// Must be called with q.lock held. +func (q *RootScheduler) takeNextJobLocked() (queueJob, bool) { for i, job := range q.queue { if _, active := q.active[job.queue]; active { continue @@ -299,12 +320,12 @@ func (q *RootScheduler) takeNextJob() (queueJob, bool) { continue } q.queue = append(q.queue[:i], q.queue[i+1:]...) - q.workAvailable <- true q.active[job.queue] = job.id if isCloneJob(job.id) { q.activeClones++ } q.recordGaugesLocked() + q.cond.Signal() return job, true } return queueJob{}, false diff --git a/internal/strategy/git/git_test.go b/internal/strategy/git/git_test.go index b6f4f52..8f4ac49 100644 --- a/internal/strategy/git/git_test.go +++ b/internal/strategy/git/git_test.go @@ -81,7 +81,6 @@ func TestNew(t *testing.T) { return } assert.NoError(t, err) - assert.NotZero(t, s) assert.Equal(t, "git", s.String()) // Verify handlers were registered @@ -181,9 +180,8 @@ func TestNewWithExistingCloneOnDisk(t *testing.T) { MirrorRoot: tmpDir, FetchInterval: 15, }, nil) - s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) - assert.NotZero(t, s) } func TestIntegrationWithMockUpstream(t *testing.T) { diff --git a/internal/strategy/git/integration_test.go b/internal/strategy/git/integration_test.go index 9e0a2bc..cae7271 100644 --- a/internal/strategy/git/integration_test.go +++ b/internal/strategy/git/integration_test.go @@ -117,9 +117,8 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) { mux := http.NewServeMux() memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) assert.NoError(t, err) - strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + _, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) - assert.NotZero(t, strategy) // Start a test server with logging middleware server := testServerWithLogging(ctx, mux)