Enqueue job when id is same and active is running, nothing in the waiting.
Description
I have a scenario to explain the need for it.
Let's say I have a task which takes about 5 minutes to finish (ex: updating the db records one by one). This task may be triggered multiple times, but at a time only one should be running and subsequent triggers must be ignored (as I don't want to duplicate the work). One easy solution I found is that I ensure the jobId
is same and all subsequent requests are automatically ignored.
Now the tricky part here is that, in the middle of the task execution, the db state might have changed and ignoring all the subsequent pushes will miss this state change to be handled. So I am expecting one extra job to be enqueued and all remaining ones should be ignored.
That is: when active count > 0 and waiting count = 0, add
should be allowed with same jobId (as if when job is executing, it's not in the queue, so add one).
Looking at the references, it doesn't seems to be this way. it considers even the active ones while ensuring the jobId uniqueness.
Minimal, Working Test code to reproduce the issue.
const Queue = require('bull')
const queue = new Queue('eventsua', 'redis://127.0.0.1:6379')
queue.process(async (job) => {
console.log('===>>>> Job started execution', job.jobId)
await new Promise((resolve) => setTimeout(resolve, 60000))
console.log('Job finished execution', job.jobId)
})
queue.on('active', job => {
console.info(`Job(${job.id}) started`, job.data)
})
queue.on('completed', (job, results) => {
const duration = job.finishedOn - job.processedOn
console.info(`Job(${job.id}) finished in ${duration}ms`, results)
})
queue.on('error', err => {
console.error(err)
})
queue.on('failed', (job, err) => {
console.info(`Job(${job.id}) failed`, err)
})
queue.on('stalled', (job) => {
console.info(`Job(${job.id}) stalled`)
})
const test = async () => {
for (let index = 0; index < 10; index++) {
console.log('Pushing', index)
const job = await queue.add({ event: 'refresh' }, { jobId: 'refresh', removeOnComplete: true })
console.log('Pushed', index, job.id)
await new Promise((resolve) => setTimeout(resolve, 2000))
console.log('Waiting count: ', await queue.count())
console.log('Job Counts', await queue.getJobCounts())
}
}
test().catch(console.error)
I am expecting the task to run two times as I am pushing some tasks during the middle execution of the first execution and still they are being ignored for jobId being same.
I can do the custom implementations like with Redis GETSET and INCR. and I understand this is as per BullJS design but just trying to see if there is any workaround or design pattern to achieve my need within BullJS.