-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathqueue.go
More file actions
222 lines (190 loc) · 4.85 KB
/
queue.go
File metadata and controls
222 lines (190 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
// Package bqueue is a "in memory" queue.
package bqueue
import (
"context"
"fmt"
"log"
"runtime"
"sync"
)
const (
	// DefaultLimit is the default job queue limit (the buffer size of the
	// jobs channel created by New when Limit is not used).
	DefaultLimit = 128
	// UnlimitedWorkers can be passed to Workers when running in dynamic
	// mode (the default) to use an unlimited number of workers.
	UnlimitedWorkers = -1
)
// Queue processes jobs.
type Queue struct {
	limiter chan struct{} // semaphore bounding in-flight goroutines in dynamicLimited mode
	workers int           // worker count, or UnlimitedWorkers; default runtime.NumCPU()
	limit   int           // buffer capacity of jobs; default DefaultLimit
	run     func() error  // worker-spawn strategy: q.dynamic (default) or q.static
	jobs    chan Job      // queued jobs; closed by Stop
	log     Logger        // destination for informational messages; default log.Default()
	wg      sync.WaitGroup // tracks all spawned goroutines so Stop can wait for them
}
// Option represents a queue option. Options are applied by New in order
// and may return an error to abort construction.
type Option func(q *Queue) error
// Workers sets the number of workers which will process jobs from the queue.
// The constant UnlimitedWorkers may be combined with dynamic workers, the
// default mode, to remove the cap on concurrent workers entirely.
// Default is runtime.NumCPU() workers.
func Workers(count int) Option {
	return func(q *Queue) error {
		if count >= UnlimitedWorkers {
			q.workers = count
			return nil
		}
		return fmt.Errorf("workers must be at least %d, got %d", UnlimitedWorkers, count)
	}
}
// Limit sets the maximum number of jobs queued before an error is returned.
// Default is DefaultLimit maximum queued jobs.
func Limit(count int) Option {
	return func(q *Queue) error {
		if count >= 0 {
			q.limit = count
			return nil
		}
		return fmt.Errorf("limit must be at least 0, got %d", count)
	}
}
// Log sets the logger used by the Queue for informational messages.
// Default is log.Default().
func Log(l Logger) Option {
	return func(q *Queue) error {
		q.log = l
		return nil // a logger is always acceptable, including the caller's own.
	}
}
// Static switches the Queue to a fixed pool of pre-spawned workers rather
// than the default dynamic workers. This can be beneficial when individual
// jobs are inexpensive, as no goroutine is started per job.
func Static() Option {
	return func(q *Queue) error {
		// Swap the spawn strategy; New invokes q.run after options apply.
		q.run = q.static
		return nil
	}
}
// New creates a fully initialised Queue with the given options.
// Defaults: runtime.NumCPU() dynamic workers, a DefaultLimit job buffer,
// and log.Default() for logging. The worker strategy is started before
// New returns; a failing option or strategy yields a nil Queue and the error.
func New(options ...Option) (*Queue, error) {
	q := new(Queue)
	q.workers = runtime.NumCPU()
	q.limit = DefaultLimit
	q.log = log.Default()
	q.run = q.dynamic // dynamic workers unless Static() overrides this.
	for _, apply := range options {
		if err := apply(q); err != nil {
			return nil, err
		}
	}
	// The channel is sized after options so Limit takes effect.
	q.jobs = make(chan Job, q.limit)
	if err := q.run(); err != nil {
		return nil, err
	}
	return q, nil
}
// static spawns a fixed pool of q.workers goroutines, each consuming from
// the shared jobs channel until it is closed.
func (q *Queue) static() error {
	if q.workers < 1 {
		return fmt.Errorf("workers must be at least 1, got %d", q.workers)
	}
	q.log.Printf("Spawning %d static workers...", q.workers)
	// Register all workers up front so Stop waits for every one of them.
	q.wg.Add(q.workers)
	for n := q.workers; n > 0; n-- {
		w := newWorker(q.jobs)
		go func() {
			defer q.wg.Done()
			w.run()
		}()
	}
	return nil
}
// dynamic starts processing jobs in per-job goroutines, either without a
// bound (UnlimitedWorkers) or capped at q.workers. This optimises for the
// case where not all goroutines are needed all the time, at the expense of
// starting a goroutine for each job.
func (q *Queue) dynamic() error {
	if q.workers == UnlimitedWorkers {
		q.dynamicUnlimited()
		return nil
	}
	if q.workers < 1 {
		return fmt.Errorf("workers must be at least 1, got %d", q.workers)
	}
	q.dynamicLimited()
	return nil
}
// dynamicUnlimited processes jobs with an unlimited number of goroutines.
// This optimises for the case where not all goroutines are needed all the time
// at the expense of having to start goroutines for each job.
//
// A single dispatcher goroutine drains q.jobs until Stop closes it, and each
// job runs in its own goroutine; all are tracked by q.wg so Stop can wait.
func (q *Queue) dynamicUnlimited() {
	// Fix: the original passed q.workers to a format string with no verb,
	// which printed "...%!(EXTRA int=-1)" (flagged by go vet's printf check).
	q.log.Printf("Using unlimited dynamic workers...")
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		for j := range q.jobs {
			q.wg.Add(1)
			go func(j Job) {
				defer q.wg.Done()
				j.Process()
			}(j)
		}
	}()
}
// dynamicLimited processes jobs in at most q.workers concurrent goroutines.
// This optimises for the case where not all goroutines are needed all the time
// at the expense of having to start goroutines for each job.
func (q *Queue) dynamicLimited() {
	q.log.Printf("Using up to %d dynamic workers...", q.workers)
	// The limiter acts as a counting semaphore capping in-flight jobs.
	q.limiter = make(chan struct{}, q.workers)
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		for job := range q.jobs {
			q.limiter <- struct{}{} // acquire a slot; blocks at capacity.
			q.wg.Add(1)
			go func(job Job) {
				defer func() {
					<-q.limiter // release the slot once the job is done.
					q.wg.Done()
				}()
				job.Process()
			}(job)
		}
	}()
}
// Queue queues a job in blocking mode.
// It blocks while the buffer (see Limit) is full, and panics if called
// after Stop, since Stop closes the jobs channel.
func (q *Queue) Queue(job Job) {
	q.jobs <- job
}
// QueueNonBlocking queues a job in non blocking mode.
// If the maximum buffer as defined by Limit is already filled an error will
// be returned instead of waiting for space.
func (q *Queue) QueueNonBlocking(job Job) error {
	select {
	case q.jobs <- job:
		// Accepted; fall through to the success return.
	default:
		return fmt.Errorf("too busy, already have %d queued jobs", cap(q.jobs))
	}
	return nil
}
// Stop stops processing and returns once all jobs have been completed
// or the context indicates done.
// The queue should not be used after calling Stop; calling Queue or
// QueueNonBlocking afterwards will panic on the closed channel.
func (q *Queue) Stop(ctx context.Context) error {
	close(q.jobs) // signals dispatchers/workers to drain and exit.
	finished := make(chan struct{})
	go func() {
		q.wg.Wait()
		close(finished)
	}()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-finished:
		return nil
	}
}