Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ _help:

# Run tests
test:
@gotestsum --hide-summary output,skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 60s
gotestsum --hide-summary skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 60s

# Lint code
lint:
Expand Down
55 changes: 38 additions & 17 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
}

type RootScheduler struct {
workAvailable chan bool
cond *sync.Cond
lock sync.Mutex
done bool
queue []queueJob
active map[string]string // queue -> job id
activeClones int
Expand Down Expand Up @@ -118,14 +119,22 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
}
m := newSchedulerMetrics()
q := &RootScheduler{
workAvailable: make(chan bool, 1024),
active: make(map[string]string),
maxCloneConcurrency: maxClones,
store: store,
metrics: m,
}
q.cond = sync.NewCond(&q.lock)
ctx, cancel := context.WithCancel(ctx)
q.cancel = cancel
// Wake all workers on context cancellation so they can observe done and exit.
go func() {
<-ctx.Done()
q.lock.Lock()
q.done = true
q.lock.Unlock()
q.cond.Broadcast()
}()
q.wg.Add(config.Concurrency)
for id := range config.Concurrency {
go q.worker(ctx, id)
Expand All @@ -152,7 +161,7 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e
q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run})
q.metrics.queueDepth.Record(context.Background(), int64(len(q.queue)))
q.lock.Unlock()
q.workAvailable <- true
q.cond.Signal()
}

func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
Expand Down Expand Up @@ -205,24 +214,36 @@ func (q *RootScheduler) worker(ctx context.Context, id int) {
defer q.wg.Done()
logger := logging.FromContext(ctx).With("scheduler-worker", id)
for {
select {
case <-ctx.Done():
job, ok := q.waitForJob()
if !ok {
logger.InfoContext(ctx, "Worker terminated")
return
}
q.runJob(ctx, logger, job)
}
}

case <-q.workAvailable:
job, ok := q.takeNextJob()
if !ok {
continue
}
q.runJob(ctx, logger, job)
// waitForJob parks the calling worker until either a runnable job is
// available or the scheduler has been shut down. It returns the claimed job
// and true, or the zero job and false once shutdown is observed.
//
// The condition-variable pattern keeps q.lock held only for the brief
// check-and-take critical section: cond.Wait() atomically releases the lock
// while the goroutine sleeps and reacquires it on wakeup. Shutdown is
// delivered by the goroutine started in New(), which sets q.done under the
// lock and broadcasts, waking every parked worker so it can exit.
func (q *RootScheduler) waitForJob() (queueJob, bool) {
	q.lock.Lock()
	defer q.lock.Unlock()
	for !q.done {
		if job, ok := q.takeNextJobLocked(); ok {
			return job, true
		}
		// Nothing runnable right now; sleep until Submit,
		// markQueueInactive, or shutdown signals the cond.
		q.cond.Wait()
	}
	return queueJob{}, false
}

func (q *RootScheduler) runJob(ctx context.Context, logger *slog.Logger, job queueJob) {
defer q.markQueueInactive(job.queue)
defer func() { q.workAvailable <- true }()

jobAttrs := attribute.String("job.type", jobType(job.id))
start := time.Now()
Expand Down Expand Up @@ -279,6 +300,7 @@ func (q *RootScheduler) markQueueInactive(queue string) {
}
delete(q.active, queue)
q.recordGaugesLocked()
q.cond.Signal()
}

// isCloneJob returns true for job IDs that represent long-running clone operations
Expand All @@ -287,10 +309,9 @@ func isCloneJob(id string) bool {
return strings.HasSuffix(id, "clone") || strings.HasSuffix(id, "deferred-mirror-restore")
}

// Take the next job for any queue that is not already running a job.
func (q *RootScheduler) takeNextJob() (queueJob, bool) {
q.lock.Lock()
defer q.lock.Unlock()
// takeNextJobLocked takes the next job for any queue that is not already running a job.
// Must be called with q.lock held.
func (q *RootScheduler) takeNextJobLocked() (queueJob, bool) {
for i, job := range q.queue {
if _, active := q.active[job.queue]; active {
continue
Expand All @@ -299,12 +320,12 @@ func (q *RootScheduler) takeNextJob() (queueJob, bool) {
continue
}
q.queue = append(q.queue[:i], q.queue[i+1:]...)
q.workAvailable <- true
q.active[job.queue] = job.id
if isCloneJob(job.id) {
q.activeClones++
}
q.recordGaugesLocked()
q.cond.Signal()
return job, true
}
return queueJob{}, false
Expand Down
4 changes: 1 addition & 3 deletions internal/strategy/git/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func TestNew(t *testing.T) {
return
}
assert.NoError(t, err)
assert.NotZero(t, s)
assert.Equal(t, "git", s.String())

// Verify handlers were registered
Expand Down Expand Up @@ -181,9 +180,8 @@ func TestNewWithExistingCloneOnDisk(t *testing.T) {
MirrorRoot: tmpDir,
FetchInterval: 15,
}, nil)
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)
assert.NotZero(t, s)
}

func TestIntegrationWithMockUpstream(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions internal/strategy/git/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) {
mux := http.NewServeMux()
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)
assert.NotZero(t, strategy)

// Start a test server with logging middleware
server := testServerWithLogging(ctx, mux)
Expand Down