Skip to content
GitLab
Projects Groups Snippets
  • /
  • Help
    • Help
    • Support
    • Community forum
    • Submit feedback
    • Contribute to GitLab
  • Sign in / Register
  • B bull
  • Project information
    • Project information
    • Activity
    • Labels
    • Members
  • Repository
    • Repository
    • Files
    • Commits
    • Branches
    • Tags
    • Contributors
    • Graph
    • Compare
  • Issues 175
    • Issues 175
    • List
    • Boards
    • Service Desk
    • Milestones
  • Merge requests 9
    • Merge requests 9
  • CI/CD
    • CI/CD
    • Pipelines
    • Jobs
    • Schedules
  • Deployments
    • Deployments
    • Environments
    • Releases
  • Packages and registries
    • Packages and registries
    • Package Registry
    • Infrastructure Registry
  • Monitor
    • Monitor
    • Incidents
  • Analytics
    • Analytics
    • Value stream
    • CI/CD
    • Repository
  • Wiki
    • Wiki
  • Snippets
    • Snippets
  • Activity
  • Graph
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
Collapse sidebar
  • OptimalBits
  • bull
  • Issues
  • #2527
Closed
Open
Issue created Jan 25, 2023 by Sydney Cohen@cosydney

[Question] Dynamic queues / Queue per user

Description

Hey there,

First of all thank you for proving such a useful tool. I've been using it in my project and Bull has been super helpful.

I am looking for suggestions/ideas on how to improve our architecture for better efficiency. This is probably trivial but hopefully this will be helpful to other people!

  • We handle 150k webhooks from GitHub and GitLab per day. Each of those webhooks corresponds to an organization in our database.
  • We have around 400 active organizations
  • Today we have a dedicated queue for GitHub and a queue for GitLab. Each webhook event is added to one of the two queue and bull has a job.process function that handle what to do with it.

The problem with this way of doing for me is that I get many errors and some of our event are super slow to get through. Much slower than not going through any queue at all.

So far I've considered creating dynamic queues for each organization, (#issue 867, or #Issue 1567)

  • Do you have any recommendations on handling this use case? Would you go for a Dynamic queue per user?
  • Let me know if you see anything in my code that seem like a no go!

Thank you

Minimal, Working Test code to reproduce the issue.

(An easy to reproduce test case will dramatically decrease the resolution time.)

Here is my code:

// create-queue.js

function createQueue(name) {
  return new Queue(name, {
    redis: {
      opts: {
        createClient(type) {
          switch (type) {
            case 'client':
              return client;
            case 'subscriber':
              return subscriber;
            default:
              return new Redis(REDIS_URL);
          }
        },
      },
    },
  });
}
// github-queue.js

const webHookGitHubQueue = createQueue('webhooksGitHub');
const { handleGitHubIncomingWebHook } = require('./github-connect');

webHookGitHubQueue.process(async (job) => {
  const { webhook, event } = job.data;
  const { attemptsMade } = job;
  try {
    const res = await handleGitHubIncomingWebHook({
      webhook,
      event,
      attempts: attemptsMade,
    });
    return res;
  } catch (e) {
    const errorMessage = `Error handling github webhook, event: ${event}, webhook id: ${webhook.id}, ${e?.message}, ${e}`;
    throw new Error(errorMessage);
  }
});

webHookGitHubQueue.on('completed', (job, result) => {
  if (result.processed) {
    job.remove();
  } else {
    throw new Error("Didn't process GitHub webhook correctly", result);
  }
});

module.exports = {
  webHookGitHubQueue,
};
 // github-connect.js 
 
await webHookGitHubQueue.add(
    {
      webhook,
      event,
    },
    {
      jobId: webhook.id,
      delay: 0,
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 1200,
      },
      removeOnComplete: {
        age: 24 * 3600, // keep up to 24 hour
        count: 1000, // keep up to 1000 jobs
      },
      removeOnFail: {
        age: 7 * 24 * 3600, // We keep failed job only for 7 days max
      },
    },
  );

Bull version

4.10.2

Additional information

Assignee
Assign to
Time tracking