How to run jobs concurrently ? #542

Open
narbek-abd opened this issue Feb 12, 2025 · 3 comments

Comments

narbek-abd commented Feb 12, 2025

Hi.

    const queue = 'readme-queue'

    await boss.createQueue(queue)

    const id = await boss.send(queue, { arg1: 'read me' })

    setTimeout(async () => {
      const id2 = await boss.send(queue, { arg1: 'read me2' })
    }, 2000)

    await boss.work(queue, { batchSize: 2 }, async ([job]) => {
      const delay = (ms) => new Promise((res) => setTimeout(res, ms))
      await delay(60000)
      console.log(`received job ${job.id} with data ${JSON.stringify(job.data)}`)
    })

The second job becomes active only after the first job completes. batchSize only takes effect when I send the jobs immediately one after another.

If each job takes 5 minutes and the jobs don't run concurrently, the 10th job won't complete until 50 minutes have passed.

In the previous version (pg-boss v9) we could do this using {teamSize: 2, teamConcurrency: 2, teamRefill: true}. Without teamRefill the jobs didn't run concurrently; do you know why?

Owner

timgit commented Feb 14, 2025

You will need to handle concurrency yourself in your handler. A simple example would be something like the following.

async function handler(job) {
 // todo
}

await boss.work(queue, { batchSize: 2 }, (jobs) => Promise.allSettled(jobs.map(handler)))

Other packages, such as p-map, add further options, including concurrency control.
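For illustration, the kind of concurrency control mentioned here can be sketched in plain JavaScript without an extra dependency. `mapWithConcurrency` is a hypothetical helper written for this example; it is not part of pg-boss or p-map, and it reports results in the same shape as Promise.allSettled.

```javascript
// Hypothetical helper (not part of pg-boss or p-map): runs `handler` over
// `items` with at most `limit` calls in flight, and reports each result in
// the same shape as Promise.allSettled.
async function mapWithConcurrency(items, limit, handler) {
  const results = new Array(items.length);
  let next = 0;

  // Each runner repeatedly claims the next unprocessed index until none remain.
  async function runner() {
    while (next < items.length) {
      const i = next++;
      try {
        results[i] = { status: 'fulfilled', value: await handler(items[i]) };
      } catch (reason) {
        results[i] = { status: 'rejected', reason };
      }
    }
  }

  const runnerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: runnerCount }, runner));
  return results;
}
```

Assuming the same `boss.work` call as above, it could be dropped in as `(jobs) => mapWithConcurrency(jobs, 2, handler)` with a larger batchSize, so that only two jobs from the batch run at any moment.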

jlalmes commented Feb 22, 2025

Hi @timgit, could you explain further how you would implement concurrency with pg-boss? Using batchSize: 2 with Promise.allSettled lets the jobs in a batch run concurrently, but the worker seems to wait for the whole batch to finish before fetching and running new jobs.
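The cost of waiting on a whole batch can be estimated without a database. The functions below are a simplified model for illustration only, not pg-boss's actual fetch logic; the function names and the sample durations are made up for this sketch.

```javascript
// Simplified model, not pg-boss's actual fetch logic: a worker takes
// `batchSize` jobs, runs them concurrently, and only then takes more,
// so each batch costs as much as its slowest job.
function batchedMakespan(durations, batchSize) {
  let total = 0;
  for (let i = 0; i < durations.length; i += batchSize) {
    total += Math.max(...durations.slice(i, i + batchSize));
  }
  return total;
}

// For comparison: `slots` jobs in flight, and the next job (in queue order)
// starts as soon as any slot frees up.
function slidingMakespan(durations, slots) {
  const freeAt = new Array(slots).fill(0);
  for (const d of durations) {
    const earliest = freeAt.indexOf(Math.min(...freeAt));
    freeAt[earliest] += d;
  }
  return Math.max(...freeAt);
}

const minutes = [1, 5, 1, 1]; // hypothetical job durations
console.log(batchedMakespan(minutes, 2)); // 6
console.log(slidingMakespan(minutes, 2)); // 5
```

In this model the two short jobs in the second batch cannot start until the 5-minute job finishes, which is the waiting described above.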

Owner

timgit commented Feb 25, 2025

You're right, each worker will wait until the entire batch is processed before pulling more jobs down. Another option is to create multiple workers, as in the following example where 3 workers are created. Workers will not block each other, so you could use something like this if needed.

await boss.work(queue, { batchSize: 2 }, (jobs) => Promise.allSettled(jobs.map(handler)))
await boss.work(queue, { batchSize: 2 }, (jobs) => Promise.allSettled(jobs.map(handler)))
await boss.work(queue, { batchSize: 2 }, (jobs) => Promise.allSettled(jobs.map(handler)))
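The repeated calls could also be generated in a loop. `startWorkers` below is a hypothetical helper, not a pg-boss function; it assumes only that `boss.work(queue, options, handler)` behaves as in the snippet above.

```javascript
// Hypothetical helper: registers `count` workers on the same queue.
// Assumes `boss.work(queue, options, handler)` as used in this thread.
async function startWorkers(boss, queue, count, handler, options = { batchSize: 2 }) {
  const registered = [];
  for (let i = 0; i < count; i++) {
    registered.push(
      await boss.work(queue, options, (jobs) => Promise.allSettled(jobs.map(handler)))
    );
  }
  return registered; // whatever boss.work resolves to for each worker
}
```

Calling `await startWorkers(boss, queue, 3, handler)` would register the same three workers as above. If workers do not block each other, that allows up to 3 × 2 = 6 jobs in flight at once.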
