-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathjobqueue.go
309 lines (252 loc) · 7.71 KB
/
jobqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
package jobqueue
import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"time"
"github.com/dgraph-io/badger/v4"
"github.com/goccy/go-json"
"github.com/puzpuzpuz/xsync/v3"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// JobStatus is the state recorded on each job persisted in BadgerDB.
type JobStatus string

const (
	// JobStatusPending marks a job that has not been processed successfully
	// yet; fetchJobs only dispatches jobs in this state.
	JobStatusPending JobStatus = "pending"
	// JobStatusCompleted marks a finished job. Note the stored value is
	// "complete". It is not referenced elsewhere in this file (see TODO below).
	JobStatusCompleted JobStatus = "complete"
)

// TODO: Use complete status for archiving completed jobs?

// errJobChannelFull is returned by fetchJobs when the buffered job channel
// has no room for another job; the remaining jobs are retried on a later poll.
var errJobChannelFull = errors.New("job channel is full")

const (
	// defaultFetchInterval is the default delay between BadgerDB polls.
	defaultFetchInterval = 100 * time.Millisecond
	// defaultJobBufferSize is the capacity of the channel feeding the workers.
	defaultJobBufferSize = 1000
	// defaultJobIDSequenceSize is the bandwidth passed to badger's
	// GetSequence when leasing job IDs.
	defaultJobIDSequenceSize = 100
)
// JobQueue is a job queue persisted in BadgerDB. Enqueue stores jobs with
// payload type T in the database, a background poller (pollJobs) loads pending
// jobs onto the jobs channel, and worker goroutines run handler on each one.
type JobQueue[T any] struct {
	db *badger.DB
	// dbPath is the on-disk database location, used when dbInMemory is false.
	dbPath string
	// dbInMemory makes openDB create an in-memory BadgerDB instead.
	dbInMemory bool
	// wg tracks the worker goroutines; Stop waits on it.
	wg     sync.WaitGroup
	logger zerolog.Logger
	// cancel stops the background poller started in New.
	cancel  context.CancelFunc
	handler func(JobContext, T) error
	// jobID is the badger sequence ("nextJobID") that hands out job IDs.
	jobID *badger.Sequence
	// isJobIDInQueue holds IDs already sent to jobs so later polls skip them;
	// processJob removes an entry once the job succeeds.
	isJobIDInQueue *xsync.MapOf[uint64, bool]
	// jobs is the buffered channel the workers consume from.
	jobs chan *job[T]
	// Options
	// fetchInterval is the delay between polls of BadgerDB.
	fetchInterval time.Duration
}
// New creates a JobQueue named name whose jobs are persisted in BadgerDB at
// dbPath (or in memory, presumably when an Option enables it), starts the
// background poller that loads pending jobs from the database, and starts
// `workers` goroutines that run handler on each job.
//
// workers must be >= 0; with 0 workers jobs are stored but never processed
// automatically. Options are applied before the database is opened. If setup
// fails after the database was opened, the database is closed again before
// the error is returned.
func New[T any](
	dbPath string, name string, workers int, handler func(JobContext, T) error, opts ...Option[T],
) (*JobQueue[T], error) {
	if workers < 0 {
		return nil, errors.New("invalid number of workers")
	}
	if workers == 0 {
		log.Warn().Msg("Number of workers is 0, jobs will not be automatically processed")
	}

	jq := &JobQueue[T]{
		db:             nil,
		dbPath:         dbPath,
		dbInMemory:     false,
		wg:             sync.WaitGroup{},
		logger:         log.With().Str("module", "JobQueue").Str("jobName", name).Logger(),
		cancel:         nil,
		handler:        handler,
		jobID:          nil,
		isJobIDInQueue: xsync.NewMapOf[uint64, bool](),
		jobs:           make(chan *job[T], defaultJobBufferSize),
		fetchInterval:  defaultFetchInterval,
	}
	for _, opt := range opts {
		opt(jq)
	}

	db, err := jq.openDB()
	if err != nil {
		return nil, err
	}
	jq.db = db

	jq.logger.Info().Msg("Starting job queue")

	jq.jobID, err = jq.db.GetSequence([]byte("nextJobID"), defaultJobIDSequenceSize)
	if err != nil {
		// Don't leak the database handle opened above.
		if closeErr := db.Close(); closeErr != nil {
			jq.logger.Error().Err(closeErr).Msg("Failed to close Badger DB connection")
		}
		return nil, fmt.Errorf("failed to start job id sequence: %w", err)
	}

	// Load jobs from BadgerDB in the background.
	ctx, cancel := context.WithCancel(context.Background())
	pollDone := make(chan struct{})
	go func() {
		defer close(pollDone)
		jq.pollJobs(ctx)
	}()
	// Stop calls jq.cancel and then closes jq.jobs. Waiting here for the
	// poller to return guarantees it is no longer sending on jq.jobs when the
	// channel is closed (a send on a closed channel panics).
	jq.cancel = func() {
		cancel()
		<-pollDone
	}

	// Start workers
	for i := 0; i < workers; i++ {
		jq.wg.Add(1)
		go jq.worker(i)
	}
	return jq, nil
}
// Enqueue assigns payload a fresh ID from the job ID sequence, writes the
// resulting job to BadgerDB and returns that ID. Enqueue only persists the
// job; the background poller later reads it from the database and hands it
// to a worker.
func (jq *JobQueue[T]) Enqueue(payload T) (uint64, error) {
	nextID, err := jq.jobID.Next()
	if err != nil {
		return 0, fmt.Errorf("failed to get next job id: %w", err)
	}

	j := newJob(nextID, payload)
	encoded, err := json.Marshal(j)
	if err != nil {
		return 0, fmt.Errorf("failed to marshal job: %w", err)
	}

	storeJob := func(txn *badger.Txn) error {
		if setErr := txn.Set(j.dbKey(), encoded); setErr != nil {
			return fmt.Errorf("failed to store job: %w", setErr)
		}
		return nil
	}
	if err := jq.db.Update(storeJob); err != nil {
		jq.logger.Error().Err(err).Uint64("jobID", j.ID).Msg("Failed to enqueue job")
		return 0, err
	}

	jq.logger.Info().Uint64("jobID", j.ID).Msg("job enqueued successfully")
	return j.ID, nil
}
// worker is the body of a single worker goroutine. It receives jobs from
// jq.jobs until that channel is closed (Stop closes it), runs each through
// processJob and logs any failure. id only labels the log output.
func (jq *JobQueue[T]) worker(id int) {
	defer jq.wg.Done()

	wlog := jq.logger.With().Int("worker", id).Logger()
	wlog.Info().Msg("Worker started")

	for {
		j, open := <-jq.jobs
		if !open {
			break
		}
		if err := jq.processJob(j); err != nil {
			wlog.Error().Err(err).Uint64("jobID", j.ID).Msg("Error processing job")
		}
	}

	wlog.Info().Msg("Worker stopped")
}
// processJob runs the queue's handler on j. On success it deletes the job
// from BadgerDB and then drops its ID from the isJobIDInQueue index. If the
// handler fails, the job stays in the database and in the index, so this
// JobQueue instance will not fetch it again. The error is returned to the
// caller.
//
// NOTE(review): fetchJobs scans inside a badger View (snapshot) transaction.
// A scan whose snapshot predates the delete below could still see this job
// after its ID has left the index, and dispatch it a second time. Confirm
// whether duplicate processing is acceptable.
func (jq *JobQueue[T]) processJob(j *job[T]) error {
	jlog := jq.logger.With().Uint64("jobID", j.ID).Logger()

	// The payload is only logged when this logger is set to exactly debug level.
	switch jlog.GetLevel() {
	case zerolog.DebugLevel:
		jlog.Debug().Interface("jobPayload", j.Payload).Msg("Processing job")
	default:
		jlog.Info().Msg("Processing job")
	}

	if err := j.Process(jq.handler); err != nil {
		return fmt.Errorf("failed to process job: %w", err)
	}
	jlog.Info().Msg("Job processed successfully")

	// The job succeeded, so it no longer needs to be persisted.
	jq.logger.Debug().Uint64("jobID", j.ID).Msg("Removing job from BadgerDB")
	deleteErr := jq.db.Update(func(txn *badger.Txn) error {
		return txn.Delete(j.dbKey())
	})
	if deleteErr != nil {
		jlog.Error().Err(deleteErr).Msg("Failed to remove completed job from db")
		return deleteErr
	}

	jq.logger.Debug().Uint64("jobID", j.ID).Msg("Removing job from in-memory index")
	jq.isJobIDInQueue.Delete(j.ID)
	return nil
}
// Stop shuts the queue down in order:
//  1. Cancel the background poller.
//  2. Close the job channel, so workers exit after draining buffered jobs.
//  3. Wait for the workers.
//  4. Release the job ID sequence.
//  5. Close the database.
//
// A failure to release the sequence is only logged. The error from closing
// the database is logged and returned. Stop closes jq.jobs unconditionally,
// so calling it twice panics.
func (jq *JobQueue[T]) Stop() error {
	lg := jq.logger
	lg.Info().Msg("Stopping job queue")

	lg.Debug().Msg("Stopping jobs fetch from BadgerDB")
	jq.cancel()

	// Workers range over jq.jobs, so closing it is their stop signal.
	lg.Debug().Msg("Closing job channel")
	close(jq.jobs)

	lg.Debug().Msg("Waiting for workers to finish")
	jq.wg.Wait()

	lg.Debug().Msg("Closing Badger DB connection")
	if releaseErr := jq.jobID.Release(); releaseErr != nil {
		lg.Error().Err(releaseErr).Msg("Failed to release next job id sequence")
	}

	closeErr := jq.db.Close()
	if closeErr != nil {
		lg.Error().Err(closeErr).Msg("Failed to close Badger DB connection")
		return closeErr
	}

	lg.Info().Msg("Job queue stopped successfully")
	return nil
}
// pollJobs is a long-running goroutine that calls fetchJobs every
// jq.fetchInterval to move pending jobs from BadgerDB onto the worker channel.
// Fetch errors are logged and polling continues. It returns once ctx is
// cancelled.
func (jq *JobQueue[T]) pollJobs(ctx context.Context) {
	interval := jq.fetchInterval
	if interval <= 0 {
		// time.NewTicker panics on a non-positive interval, which would crash
		// the process from this goroutine.
		jq.logger.Warn().Dur("fetchInterval", interval).Msg("Invalid fetch interval, using default")
		interval = defaultFetchInterval
	}

	ticker := time.NewTicker(interval)
	// Release the ticker when polling ends (before Go 1.23 an unstopped
	// ticker is never garbage collected).
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			jq.logger.Debug().Msg("Context cancelled, stopped fetching jobs")
			return
		case <-ticker.C:
			// Both cases can be ready at once and select picks one at random;
			// don't start another scan once shutdown has begun.
			if ctx.Err() != nil {
				jq.logger.Debug().Msg("Context cancelled, stopped fetching jobs")
				return
			}
			jq.logger.Debug().Msg("Polling for new jobs")
			if err := jq.fetchJobs(ctx); err != nil {
				jq.logger.Error().Err(err).Msg("Error fetching jobs")
			}
		}
	}
}
// fetchJobs performs one scan over the keys under jobDBKeyPrefix in BadgerDB.
// Each pending job that is not already queued is sent to the workers through
// jq.jobs, and its ID is recorded in isJobIDInQueue.
//
// Sends are non-blocking: when jq.jobs is full the scan stops and
// errJobChannelFull is returned (wrapped), and the remaining jobs are picked
// up by a later poll. A value that fails to unmarshal aborts the scan with
// that error. Once ctx is cancelled the scan stops early and nil is returned.
func (jq *JobQueue[T]) fetchJobs(ctx context.Context) error { //nolint:gocognit
	prefix := []byte(jobDBKeyPrefix)
	err := jq.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.PrefetchSize = 10
		it := txn.NewIterator(opts)
		defer it.Close()

		for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
			// Check cancellation before each item. A `break` inside a select
			// only leaves the select, not this loop, and a select with both a
			// ready Done channel and a ready send picks one at random.
			if ctx.Err() != nil {
				jq.logger.Debug().Msg("Context cancelled, stopping iteration")
				return nil
			}

			item := it.Item()
			err := item.Value(func(v []byte) error {
				var job job[T]
				if err := json.Unmarshal(v, &job); err != nil {
					ev := jq.logger.Error().Err(err)
					// NOTE(review): this decodes the first 8 bytes of the raw key
					// as the ID; if dbKey() prepends jobDBKeyPrefix the logged
					// value is not the job ID. Confirm against dbKey(). The length
					// guard avoids a panic on short keys.
					if key := item.Key(); len(key) >= 8 {
						ev = ev.Uint64("jobID", binary.BigEndian.Uint64(key))
					}
					ev.Msg("Failed to unmarshal job")
					return err
				}
				if job.Status != JobStatusPending {
					return nil
				}
				// Skip jobs an earlier poll already handed to a worker.
				if _, queued := jq.isJobIDInQueue.Load(job.ID); queued {
					return nil
				}

				// Mark the job before sending. A worker may receive and finish it
				// (deleting the mark) before a Store placed after the send would
				// run, which would leave a stale entry that blocks the ID forever.
				jq.isJobIDInQueue.Store(job.ID, true)
				select {
				case jq.jobs <- &job:
					jq.logger.Debug().Uint64("jobID", job.ID).Msg("New pending job found and sent to worker")
					return nil
				default:
					jq.isJobIDInQueue.Delete(job.ID)
					jq.logger.Warn().Uint64("jobID", job.ID).Msg("Found pending jobs, but job channel is full")
					return errJobChannelFull
				}
			})
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to fetch jobs: %w", err)
	}
	return nil
}
// openDB opens the queue's BadgerDB store. It is in-memory when
// jq.dbInMemory is set, otherwise it lives at jq.dbPath. Badger's own logger
// is set to nil. Open failures are wrapped.
func (jq *JobQueue[T]) openDB() (*badger.DB, error) {
	dir := jq.dbPath
	if jq.dbInMemory {
		// In-memory databases must not be given a directory.
		dir = ""
	}
	opts := badger.DefaultOptions(dir).WithInMemory(jq.dbInMemory)
	opts.Logger = nil

	db, err := badger.Open(opts)
	if err != nil {
		return nil, fmt.Errorf("failed to open BadgerDB: %w", err)
	}
	return db, nil
}