[Question] Dynamic queues / Queue per user
Description
Hey there,
First of all thank you for proving such a useful tool. I've been using it in my project and Bull has been super helpful.
I am looking for suggestions/ideas on how to improve our architecture for better efficiency. This is probably trivial but hopefully this will be helpful to other people!
- We handle 150k webhooks from GitHub and GitLab per day. Each of those webhooks corresponds to an organization in our database.
- We have around 400 active organizations
- Today we have a dedicated queue for GitHub and a queue for GitLab. Each webhook event is added to one of the two queue and bull has a job.process function that handle what to do with it.
The problem with this way of doing for me is that I get many errors and some of our event are super slow to get through. Much slower than not going through any queue at all.
So far I've considered creating dynamic queues for each organization, (#issue 867, or #Issue 1567)
- Do you have any recommendations on handling this use case? Would you go for a Dynamic queue per user?
- Let me know if you see anything in my code that seem like a no go!
Thank you
Minimal, Working Test code to reproduce the issue.
(An easy to reproduce test case will dramatically decrease the resolution time.)
Here is my code:
// create-queue.js
function createQueue(name) {
return new Queue(name, {
redis: {
opts: {
createClient(type) {
switch (type) {
case 'client':
return client;
case 'subscriber':
return subscriber;
default:
return new Redis(REDIS_URL);
}
},
},
},
});
}
// github-queue.js
const webHookGitHubQueue = createQueue('webhooksGitHub');
const { handleGitHubIncomingWebHook } = require('./github-connect');
webHookGitHubQueue.process(async (job) => {
const { webhook, event } = job.data;
const { attemptsMade } = job;
try {
const res = await handleGitHubIncomingWebHook({
webhook,
event,
attempts: attemptsMade,
});
return res;
} catch (e) {
const errorMessage = `Error handling github webhook, event: ${event}, webhook id: ${webhook.id}, ${e?.message}, ${e}`;
throw new Error(errorMessage);
}
});
webHookGitHubQueue.on('completed', (job, result) => {
if (result.processed) {
job.remove();
} else {
throw new Error("Didn't process GitHub webhook correctly", result);
}
});
module.exports = {
webHookGitHubQueue,
};
// github-connect.js
await webHookGitHubQueue.add(
{
webhook,
event,
},
{
jobId: webhook.id,
delay: 0,
attempts: 3,
backoff: {
type: 'exponential',
delay: 1200,
},
removeOnComplete: {
age: 24 * 3600, // keep up to 24 hour
count: 1000, // keep up to 1000 jobs
},
removeOnFail: {
age: 7 * 24 * 3600, // We keep failed job only for 7 days max
},
},
);
Bull version
4.10.2