All files / src/queues utils.ts

5.55% Statements 1/18
100% Branches 0/0
0% Functions 0/5
5.55% Lines 1/18

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44            1x                                                                          
import { Job, JobsOptions } from "bullmq"
import { pino } from "pino"
 
import { BaseJob, BudgetJob, EndpointJob, TransactionJob } from "./jobs/BaseJob"
import { getQueue } from "./queue"
 
const logger = pino()
 
/**
 * Builds the options object handed to `queue.add` for every job in this module.
 *
 * @param delay - Start delay forwarded unchanged as the `delay` option
 *   (callers log it as `delay / 1000` seconds, i.e. it is treated as milliseconds).
 * @returns Options with `removeOnComplete` disabled, `removeOnFail` enabled and the given delay.
 */
function queueConfig(delay: number): JobsOptions {
  const options: JobsOptions = {
    delay,
    removeOnFail: true,
    removeOnComplete: false,
  }
  return options
}
 
/**
 * Enqueues a transaction-scoped job.
 *
 * The queue entry is named after `job.id` and its payload carries the job id
 * together with the transaction id.
 *
 * @param job - Job definition; supplies the queue name (`id`) and the start delay.
 * @param transactionId - Identifier stored in the payload and included in the log line.
 * @returns The value returned by `queue.add`.
 */
export async function addTransactionJobToQueue(job: TransactionJob, transactionId: string): Promise<Job> {
  const targetQueue = await getQueue()
  const startDelay = job.getStartDelay()
  const delaySeconds = startDelay / 1000

  logger.info("Adding job to queue: %s for transactionId: %s with delay: %d seconds", job.id, transactionId, delaySeconds)

  const payload = { job: job.id, transactionId }
  return targetQueue.add(job.id, payload, queueConfig(startDelay))
}
 
/**
 * Enqueues an endpoint job along with an arbitrary data payload.
 *
 * The queue entry is named after `job.id`; its payload carries the job id,
 * the transaction id and `data` as given.
 *
 * @param job - Job definition; supplies the queue name (`id`) and the start delay.
 * @param transactionId - Identifier stored in the payload and included in the log line.
 * @param data - Opaque value passed through into the payload untouched.
 * @returns The value returned by `queue.add`.
 */
export async function addEndpointJobToQueue(job: EndpointJob, transactionId: string, data: unknown): Promise<Job> {
  const targetQueue = await getQueue()
  const startDelay = job.getStartDelay()
  const delaySeconds = startDelay / 1000

  logger.info("Adding endpoint job to queue: %s for transactionId: %s with delay: %d seconds", job.id, transactionId, delaySeconds)

  const payload = { job: job.id, transactionId, data }
  return targetQueue.add(job.id, payload, queueConfig(startDelay))
}
 
/**
 * Enqueues a budget-scoped job.
 *
 * The queue entry is named after `job.id` and its payload carries the job id
 * together with the budget id.
 *
 * @param job - Job definition; supplies the queue name (`id`) and the start delay.
 * @param budgetId - Identifier stored in the payload and included in the log line.
 * @returns The value returned by `queue.add`.
 */
export async function addBudgetJobToQueue(job: BudgetJob, budgetId: string): Promise<Job> {
  const targetQueue = await getQueue()
  const startDelay = job.getStartDelay()
  const delaySeconds = startDelay / 1000

  logger.info("Adding job to queue: %s for budgetId: %s with delay: %d seconds", job.id, budgetId, delaySeconds)

  const payload = { job: job.id, budgetId }
  return targetQueue.add(job.id, payload, queueConfig(startDelay))
}
 
/**
 * Enqueues a job that needs no extra context beyond its own id.
 *
 * @param job - Job definition; supplies the queue name (`id`) and the start delay.
 * @param asap - Optional flag forwarded as-is to `job.getStartDelay`; its effect
 *   on the delay is decided by the job implementation.
 * @returns The value returned by `queue.add`.
 */
export async function addJobToQueue(job: BaseJob, asap?: boolean): Promise<Job> {
  const targetQueue = await getQueue()
  const startDelay = job.getStartDelay(asap)
  const delaySeconds = startDelay / 1000

  logger.info("Adding job to queue: %s with delay: %d seconds", job.id, delaySeconds)

  const payload = { job: job.id }
  return targetQueue.add(job.id, payload, queueConfig(startDelay))
}