From ccf1874bcb6d54ff4f9f70a018fd289fa696a273 Mon Sep 17 00:00:00 2001 From: Tyler Myracle Date: Thu, 18 Jan 2024 15:22:41 -0600 Subject: [PATCH] add back bullmq dashboard and prevent queues from stacking when cron config is changed --- apps/server/src/app/app.ts | 2 + apps/server/src/app/routes/admin.router.ts | 33 +++++++++++++ apps/server/src/app/routes/index.ts | 1 + apps/workers/src/main.ts | 32 +++++++++---- apps/workers/src/utils.ts | 46 +++++++++++++++++++ .../shared/src/services/queue.service.ts | 4 +- .../shared/src/services/queue/bull-queue.ts | 8 ++++ .../src/services/queue/in-memory-queue.ts | 8 ++++ 8 files changed, 124 insertions(+), 10 deletions(-) create mode 100644 apps/server/src/app/routes/admin.router.ts create mode 100644 apps/workers/src/utils.ts diff --git a/apps/server/src/app/app.ts b/apps/server/src/app/app.ts index 06c0870c..4383a53e 100644 --- a/apps/server/src/app/app.ts +++ b/apps/server/src/app/app.ts @@ -44,6 +44,7 @@ import { toolsRouter, publicRouter, e2eRouter, + adminRouter, } from './routes' import env from '../env' @@ -93,6 +94,7 @@ app.use(cors({ origin, credentials: true })) app.options('*', cors() as RequestHandler) app.set('view engine', 'ejs').set('views', __dirname + '/app/admin/views') +app.use('/admin', adminRouter) app.use( morgan(env.NX_MORGAN_LOG_LEVEL, { diff --git a/apps/server/src/app/routes/admin.router.ts b/apps/server/src/app/routes/admin.router.ts new file mode 100644 index 00000000..dd17b6d9 --- /dev/null +++ b/apps/server/src/app/routes/admin.router.ts @@ -0,0 +1,33 @@ +import { Router } from 'express' +import { auth, claimCheck } from 'express-openid-connect' +import { createBullBoard } from '@bull-board/api' +import { BullAdapter } from '@bull-board/api/bullAdapter' +import { ExpressAdapter } from '@bull-board/express' +import { AuthUtil, BullQueue } from '@maybe-finance/server/shared' +import { SharedType } from '@maybe-finance/shared' +import { queueService } from '../lib/endpoint' +import env from '../../env' +import { validateAuthJwt } from '../middleware' + +const router = Router() + +const serverAdapter = new ExpressAdapter().setBasePath('/admin/bullmq') + +createBullBoard({ + queues: queueService.allQueues + .filter((q): q is BullQueue => q instanceof BullQueue) + .map((q) => new BullAdapter(q.queue)), + serverAdapter, +}) + +router.get('/', validateAuthJwt, (req, res) => { + res.render('pages/dashboard', { + user: req.user?.name, + role: 'Admin', + }) +}) + +// Visit /admin/bullmq to see BullMQ Dashboard +router.use('/bullmq', validateAuthJwt, serverAdapter.getRouter()) + +export default router diff --git a/apps/server/src/app/routes/index.ts b/apps/server/src/app/routes/index.ts index 40eb5d16..30601f84 100644 --- a/apps/server/src/app/routes/index.ts +++ b/apps/server/src/app/routes/index.ts @@ -15,3 +15,4 @@ export { default as plansRouter } from './plans.router' export { default as toolsRouter } from './tools.router' export { default as publicRouter } from './public.router' export { default as e2eRouter } from './e2e.router' +export { default as adminRouter } from './admin.router' diff --git a/apps/workers/src/main.ts b/apps/workers/src/main.ts index 890b16d5..f6795dc6 100644 --- a/apps/workers/src/main.ts +++ b/apps/workers/src/main.ts @@ -16,6 +16,7 @@ import { workerErrorHandlerService, } from './app/lib/di' import env from './env' +import { cleanUpOutdatedJobs } from './utils' // Defaults from quickstart - https://docs.sentry.io/platforms/node/ Sentry.init({ @@ -99,6 +100,7 @@ syncSecurityQueue.add( {}, { repeat: { cron: '*/5 * * * *' }, // Run every 5 minutes + jobId: Date.now().toString(), } ) @@ -125,6 +127,7 @@ syncInstitutionQueue.add( {}, { repeat: { cron: '0 */24 * * *' }, // Run every 24 hours + jobId: Date.now().toString(), } ) @@ -133,6 +136,7 @@ syncInstitutionQueue.add( {}, { repeat: { cron: '0 */24 * * *' }, // Run every 24 hours + jobId: Date.now().toString(), } ) @@ -141,6 +145,7 @@ syncInstitutionQueue.add( {}, { repeat: { cron: '0 */24 * * *' }, // Run every 24 hours + jobId: Date.now().toString(), } ) @@ -169,6 +174,11 @@ process.on( await workerErrorHandlerService.handleWorkersError({ variant: 'unhandled', error }) ) +// Replace any jobs that have changed cron schedules and ensures only +// one repeatable jobs for each type is running +const queues = [syncSecurityQueue, syncInstitutionQueue] +cleanUpOutdatedJobs(queues) + const app = express() app.use(cors()) @@ -194,20 +204,24 @@ const server = app.listen(env.NX_PORT, () => { logger.info(`Worker health server started on port ${env.NX_PORT}`) }) -function onShutdown() { +async function onShutdown() { logger.info('[shutdown.start]') - server.close() + await new Promise((resolve) => server.close(resolve)) // shutdown queues - Promise.allSettled( - queueService.allQueues - .filter((q): q is BullQueue => q instanceof BullQueue) - .map((q) => q.queue.close()) - ).finally(() => { + try { + await Promise.allSettled( + queueService.allQueues + .filter((q): q is BullQueue => q instanceof BullQueue) + .map((q) => q.queue.close()) + ) + } catch (error) { + logger.error('[shutdown.error]', error) + } finally { logger.info('[shutdown.complete]') - process.exit() - }) + process.exitCode = 0 + } } process.on('SIGINT', onShutdown) diff --git a/apps/workers/src/utils.ts b/apps/workers/src/utils.ts new file mode 100644 index 00000000..de615175 --- /dev/null +++ b/apps/workers/src/utils.ts @@ -0,0 +1,46 @@ +import type { IQueue } from '@maybe-finance/server/shared' +import type { JobInformation } from 'bull' + +export async function cleanUpOutdatedJobs(queues: IQueue[]) { + for (const queue of queues) { + const repeatedJobs = await queue.getRepeatableJobs() + + const outdatedJobs = filterOutdatedJobs(repeatedJobs) + for (const job of outdatedJobs) { + await queue.removeRepeatableByKey(job.key) + } + } +} + +function filterOutdatedJobs(jobs: JobInformation[]) { + const jobGroups = new Map() + + jobs.forEach((job) => { + if (!jobGroups.has(job.name)) { + jobGroups.set(job.name, []) + } + jobGroups.get(job.name).push(job) + }) + + const mostRecentJobs = new Map() + jobGroups.forEach((group, name) => { + const mostRecentJob = group.reduce((mostRecent, current) => { + if (current.id === null) return mostRecent + const currentIdTime = current.id + const mostRecentIdTime = mostRecent ? mostRecent.id : 0 + + return currentIdTime > mostRecentIdTime ? current : mostRecent + }, null) + + if (mostRecentJob) { + mostRecentJobs.set(name, mostRecentJob.id) + } + }) + + return jobs.filter((job: JobInformation) => { + const mostRecentId = mostRecentJobs.get(job.name) + return job.id === null || job.id !== mostRecentId + }) +} + +export default cleanUpOutdatedJobs diff --git a/libs/server/shared/src/services/queue.service.ts b/libs/server/shared/src/services/queue.service.ts index c0c1d8c7..22355114 100644 --- a/libs/server/shared/src/services/queue.service.ts +++ b/libs/server/shared/src/services/queue.service.ts @@ -1,6 +1,6 @@ import type { AccountConnection, User, Account } from '@prisma/client' import type { Logger } from 'winston' -import type { Job, JobOptions } from 'bull' +import type { Job, JobOptions, JobInformation } from 'bull' import type { SharedType } from '@maybe-finance/shared' export type IJob = Pick, 'id' | 'name' | 'data' | 'progress'> @@ -20,6 +20,8 @@ export type IQueue = {}, TJobName extends stri options?: { concurrency: number } ): Promise getActiveJobs(): Promise[]> + getRepeatableJobs(): Promise + removeRepeatableByKey(key: string): Promise cancelJobs(): Promise } diff --git a/libs/server/shared/src/services/queue/bull-queue.ts b/libs/server/shared/src/services/queue/bull-queue.ts index 3fcd5194..91d2fa8c 100644 --- a/libs/server/shared/src/services/queue/bull-queue.ts +++ b/libs/server/shared/src/services/queue/bull-queue.ts @@ -120,6 +120,14 @@ export class BullQueue = any, TJobName extends return this.queue.getActive() } + async getRepeatableJobs(): Promise { + return this.queue.getRepeatableJobs() + } + + async removeRepeatableByKey(key: string) { + return this.queue.removeRepeatableByKey(key) + } + async cancelJobs() { await this.queue.pause(true, true) await this.queue.removeJobs('*') diff --git a/libs/server/shared/src/services/queue/in-memory-queue.ts b/libs/server/shared/src/services/queue/in-memory-queue.ts index 08bcdc76..f10ff9e0 100644 --- a/libs/server/shared/src/services/queue/in-memory-queue.ts +++ b/libs/server/shared/src/services/queue/in-memory-queue.ts @@ -53,6 +53,14 @@ export class InMemoryQueue< return [] } + async getRepeatableJobs() { + return [] + } + + async removeRepeatableByKey(_key: string) { + // no-op + } + async cancelJobs() { // no-op }