1
0
Fork 0
mirror of https://github.com/maybe-finance/maybe.git synced 2025-08-09 07:25:19 +02:00

Merge pull request #153 from tmyracle/bring-back-bullmq-dashboard

Bring back BullMQ dashboard and fix stacking jobs
This commit is contained in:
Josh Pigford 2024-01-18 15:35:56 -06:00 committed by GitHub
commit 7d46d22152
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 124 additions and 10 deletions

View file

@ -44,6 +44,7 @@ import {
toolsRouter, toolsRouter,
publicRouter, publicRouter,
e2eRouter, e2eRouter,
adminRouter,
} from './routes' } from './routes'
import env from '../env' import env from '../env'
@ -93,6 +94,7 @@ app.use(cors({ origin, credentials: true }))
app.options('*', cors() as RequestHandler) app.options('*', cors() as RequestHandler)
app.set('view engine', 'ejs').set('views', __dirname + '/app/admin/views') app.set('view engine', 'ejs').set('views', __dirname + '/app/admin/views')
app.use('/admin', adminRouter)
app.use( app.use(
morgan(env.NX_MORGAN_LOG_LEVEL, { morgan(env.NX_MORGAN_LOG_LEVEL, {

View file

@ -0,0 +1,33 @@
import { Router } from 'express'
import { auth, claimCheck } from 'express-openid-connect'
import { createBullBoard } from '@bull-board/api'
import { BullAdapter } from '@bull-board/api/bullAdapter'
import { ExpressAdapter } from '@bull-board/express'
import { AuthUtil, BullQueue } from '@maybe-finance/server/shared'
import { SharedType } from '@maybe-finance/shared'
import { queueService } from '../lib/endpoint'
import env from '../../env'
import { validateAuthJwt } from '../middleware'
const router = Router()
const serverAdapter = new ExpressAdapter().setBasePath('/admin/bullmq')
createBullBoard({
queues: queueService.allQueues
.filter((q): q is BullQueue => q instanceof BullQueue)
.map((q) => new BullAdapter(q.queue)),
serverAdapter,
})
router.get('/', validateAuthJwt, (req, res) => {
res.render('pages/dashboard', {
user: req.user?.name,
role: 'Admin',
})
})
// Visit /admin/bullmq to see BullMQ Dashboard
router.use('/bullmq', validateAuthJwt, serverAdapter.getRouter())
export default router

View file

@ -15,3 +15,4 @@ export { default as plansRouter } from './plans.router'
export { default as toolsRouter } from './tools.router' export { default as toolsRouter } from './tools.router'
export { default as publicRouter } from './public.router' export { default as publicRouter } from './public.router'
export { default as e2eRouter } from './e2e.router' export { default as e2eRouter } from './e2e.router'
export { default as adminRouter } from './admin.router'

View file

@ -16,6 +16,7 @@ import {
workerErrorHandlerService, workerErrorHandlerService,
} from './app/lib/di' } from './app/lib/di'
import env from './env' import env from './env'
import { cleanUpOutdatedJobs } from './utils'
// Defaults from quickstart - https://docs.sentry.io/platforms/node/ // Defaults from quickstart - https://docs.sentry.io/platforms/node/
Sentry.init({ Sentry.init({
@ -99,6 +100,7 @@ syncSecurityQueue.add(
{}, {},
{ {
repeat: { cron: '*/5 * * * *' }, // Run every 5 minutes repeat: { cron: '*/5 * * * *' }, // Run every 5 minutes
jobId: Date.now().toString(),
} }
) )
@ -125,6 +127,7 @@ syncInstitutionQueue.add(
{}, {},
{ {
repeat: { cron: '0 */24 * * *' }, // Run every 24 hours repeat: { cron: '0 */24 * * *' }, // Run every 24 hours
jobId: Date.now().toString(),
} }
) )
@ -133,6 +136,7 @@ syncInstitutionQueue.add(
{}, {},
{ {
repeat: { cron: '0 */24 * * *' }, // Run every 24 hours repeat: { cron: '0 */24 * * *' }, // Run every 24 hours
jobId: Date.now().toString(),
} }
) )
@ -141,6 +145,7 @@ syncInstitutionQueue.add(
{}, {},
{ {
repeat: { cron: '0 */24 * * *' }, // Run every 24 hours repeat: { cron: '0 */24 * * *' }, // Run every 24 hours
jobId: Date.now().toString(),
} }
) )
@ -169,6 +174,11 @@ process.on(
await workerErrorHandlerService.handleWorkersError({ variant: 'unhandled', error }) await workerErrorHandlerService.handleWorkersError({ variant: 'unhandled', error })
) )
// Replace any jobs that have changed cron schedules and ensures only
// one repeatable jobs for each type is running
const queues = [syncSecurityQueue, syncInstitutionQueue]
cleanUpOutdatedJobs(queues)
const app = express() const app = express()
app.use(cors()) app.use(cors())
@ -194,20 +204,24 @@ const server = app.listen(env.NX_PORT, () => {
logger.info(`Worker health server started on port ${env.NX_PORT}`) logger.info(`Worker health server started on port ${env.NX_PORT}`)
}) })
function onShutdown() { async function onShutdown() {
logger.info('[shutdown.start]') logger.info('[shutdown.start]')
server.close() await new Promise((resolve) => server.close(resolve))
// shutdown queues // shutdown queues
Promise.allSettled( try {
queueService.allQueues await Promise.allSettled(
.filter((q): q is BullQueue => q instanceof BullQueue) queueService.allQueues
.map((q) => q.queue.close()) .filter((q): q is BullQueue => q instanceof BullQueue)
).finally(() => { .map((q) => q.queue.close())
)
} catch (error) {
logger.error('[shutdown.error]', error)
} finally {
logger.info('[shutdown.complete]') logger.info('[shutdown.complete]')
process.exit() process.exitCode = 0
}) }
} }
process.on('SIGINT', onShutdown) process.on('SIGINT', onShutdown)

46
apps/workers/src/utils.ts Normal file
View file

@ -0,0 +1,46 @@
import type { IQueue } from '@maybe-finance/server/shared'
import type { JobInformation } from 'bull'
export async function cleanUpOutdatedJobs(queues: IQueue[]) {
for (const queue of queues) {
const repeatedJobs = await queue.getRepeatableJobs()
const outdatedJobs = filterOutdatedJobs(repeatedJobs)
for (const job of outdatedJobs) {
await queue.removeRepeatableByKey(job.key)
}
}
}
function filterOutdatedJobs(jobs: JobInformation[]) {
const jobGroups = new Map()
jobs.forEach((job) => {
if (!jobGroups.has(job.name)) {
jobGroups.set(job.name, [])
}
jobGroups.get(job.name).push(job)
})
const mostRecentJobs = new Map()
jobGroups.forEach((group, name) => {
const mostRecentJob = group.reduce((mostRecent, current) => {
if (current.id === null) return mostRecent
const currentIdTime = current.id
const mostRecentIdTime = mostRecent ? mostRecent.id : 0
return currentIdTime > mostRecentIdTime ? current : mostRecent
}, null)
if (mostRecentJob) {
mostRecentJobs.set(name, mostRecentJob.id)
}
})
return jobs.filter((job: JobInformation) => {
const mostRecentId = mostRecentJobs.get(job.name)
return job.id === null || job.id !== mostRecentId
})
}
export default cleanUpOutdatedJobs

View file

@ -1,6 +1,6 @@
import type { AccountConnection, User, Account } from '@prisma/client' import type { AccountConnection, User, Account } from '@prisma/client'
import type { Logger } from 'winston' import type { Logger } from 'winston'
import type { Job, JobOptions } from 'bull' import type { Job, JobOptions, JobInformation } from 'bull'
import type { SharedType } from '@maybe-finance/shared' import type { SharedType } from '@maybe-finance/shared'
export type IJob<T> = Pick<Job<T>, 'id' | 'name' | 'data' | 'progress'> export type IJob<T> = Pick<Job<T>, 'id' | 'name' | 'data' | 'progress'>
@ -20,6 +20,8 @@ export type IQueue<TData extends Record<string, any> = {}, TJobName extends stri
options?: { concurrency: number } options?: { concurrency: number }
): Promise<void> ): Promise<void>
getActiveJobs(): Promise<IJob<TData>[]> getActiveJobs(): Promise<IJob<TData>[]>
getRepeatableJobs(): Promise<JobInformation[]>
removeRepeatableByKey(key: string): Promise<void>
cancelJobs(): Promise<void> cancelJobs(): Promise<void>
} }

View file

@ -120,6 +120,14 @@ export class BullQueue<TData extends Record<string, any> = any, TJobName extends
return this.queue.getActive() return this.queue.getActive()
} }
async getRepeatableJobs(): Promise<Queue.JobInformation[]> {
return this.queue.getRepeatableJobs()
}
async removeRepeatableByKey(key: string) {
return this.queue.removeRepeatableByKey(key)
}
async cancelJobs() { async cancelJobs() {
await this.queue.pause(true, true) await this.queue.pause(true, true)
await this.queue.removeJobs('*') await this.queue.removeJobs('*')

View file

@ -53,6 +53,14 @@ export class InMemoryQueue<
return [] return []
} }
async getRepeatableJobs() {
return []
}
async removeRepeatableByKey(_key: string) {
// no-op
}
async cancelJobs() { async cancelJobs() {
// no-op // no-op
} }