1
0
Fork 0
mirror of https://github.com/maybe-finance/maybe.git synced 2025-08-08 15:05:22 +02:00

rollbacks for merge conflicts

This commit is contained in:
Lorenzo Palaia 2024-01-19 00:39:13 +01:00
parent dc50dcc808
commit 0c9bea6787
6 changed files with 88 additions and 10 deletions

View file

@ -93,6 +93,7 @@ app.use(cors({ origin, credentials: true }))
app.options('*', cors() as RequestHandler)
app.set('view engine', 'ejs').set('views', __dirname + '/app/admin/views')
app.use('/admin', adminRouter)
app.use(
morgan(env.NX_MORGAN_LOG_LEVEL, {

View file

@ -100,6 +100,7 @@ syncSecurityQueue.add(
{},
{
repeat: { cron: '*/5 * * * *' }, // Run every 5 minutes
jobId: Date.now().toString(),
}
)
@ -121,6 +122,7 @@ syncInstitutionQueue.add(
{},
{
repeat: { cron: '0 */24 * * *' }, // Run every 24 hours
jobId: Date.now().toString(),
}
)
@ -129,6 +131,7 @@ syncInstitutionQueue.add(
{},
{
repeat: { cron: '0 */24 * * *' }, // Run every 24 hours
jobId: Date.now().toString(),
}
)
@ -157,6 +160,11 @@ process.on(
await workerErrorHandlerService.handleWorkersError({ variant: 'unhandled', error })
)
// Replace any jobs that have changed cron schedules and ensures only
// one repeatable jobs for each type is running
const queues = [syncSecurityQueue, syncInstitutionQueue]
cleanUpOutdatedJobs(queues)
const app = express()
app.use(cors())
@ -182,20 +190,24 @@ const server = app.listen(env.NX_PORT, () => {
logger.info(`Worker health server started on port ${env.NX_PORT}`)
})
function onShutdown() {
async function onShutdown() {
logger.info('[shutdown.start]')
server.close()
await new Promise((resolve) => server.close(resolve))
// shutdown queues
Promise.allSettled(
queueService.allQueues
.filter((q): q is BullQueue => q instanceof BullQueue)
.map((q) => q.queue.close())
).finally(() => {
try {
await Promise.allSettled(
queueService.allQueues
.filter((q): q is BullQueue => q instanceof BullQueue)
.map((q) => q.queue.close())
)
} catch (error) {
logger.error('[shutdown.error]', error)
} finally {
logger.info('[shutdown.complete]')
process.exit()
})
process.exitCode = 0
}
}
process.on('SIGINT', onShutdown)

46
apps/workers/src/utils.ts Normal file
View file

@ -0,0 +1,46 @@
import type { IQueue } from '@maybe-finance/server/shared'
import type { JobInformation } from 'bull'
export async function cleanUpOutdatedJobs(queues: IQueue[]) {
for (const queue of queues) {
const repeatedJobs = await queue.getRepeatableJobs()
const outdatedJobs = filterOutdatedJobs(repeatedJobs)
for (const job of outdatedJobs) {
await queue.removeRepeatableByKey(job.key)
}
}
}
function filterOutdatedJobs(jobs: JobInformation[]) {
const jobGroups = new Map()
jobs.forEach((job) => {
if (!jobGroups.has(job.name)) {
jobGroups.set(job.name, [])
}
jobGroups.get(job.name).push(job)
})
const mostRecentJobs = new Map()
jobGroups.forEach((group, name) => {
const mostRecentJob = group.reduce((mostRecent, current) => {
if (current.id === null) return mostRecent
const currentIdTime = current.id
const mostRecentIdTime = mostRecent ? mostRecent.id : 0
return currentIdTime > mostRecentIdTime ? current : mostRecent
}, null)
if (mostRecentJob) {
mostRecentJobs.set(name, mostRecentJob.id)
}
})
return jobs.filter((job: JobInformation) => {
const mostRecentId = mostRecentJobs.get(job.name)
return job.id === null || job.id !== mostRecentId
})
}
export default cleanUpOutdatedJobs

View file

@ -1,6 +1,6 @@
import type { AccountConnection, User, Account } from '@prisma/client'
import type { Logger } from 'winston'
import type { Job, JobOptions } from 'bull'
import type { Job, JobOptions, JobInformation } from 'bull'
import type { SharedType } from '@maybe-finance/shared'
export type IJob<T> = Pick<Job<T>, 'id' | 'name' | 'data' | 'progress'>
@ -20,6 +20,8 @@ export type IQueue<TData extends Record<string, any> = {}, TJobName extends stri
options?: { concurrency: number }
): Promise<void>
getActiveJobs(): Promise<IJob<TData>[]>
getRepeatableJobs(): Promise<JobInformation[]>
removeRepeatableByKey(key: string): Promise<void>
cancelJobs(): Promise<void>
}

View file

@ -120,6 +120,14 @@ export class BullQueue<TData extends Record<string, any> = any, TJobName extends
return this.queue.getActive()
}
async getRepeatableJobs(): Promise<Queue.JobInformation[]> {
return this.queue.getRepeatableJobs()
}
async removeRepeatableByKey(key: string) {
return this.queue.removeRepeatableByKey(key)
}
async cancelJobs() {
await this.queue.pause(true, true)
await this.queue.removeJobs('*')

View file

@ -53,6 +53,14 @@ export class InMemoryQueue<
return []
}
async getRepeatableJobs() {
return []
}
async removeRepeatableByKey(_key: string) {
// no-op
}
async cancelJobs() {
// no-op
}
@ -63,6 +71,7 @@ export class InMemoryQueueFactory implements IQueueFactory {
private readonly ignoreJobNames: string[] = [
'sync-all-securities',
'sync-plaid-institutions',
'sync-finicity-institutions',
'trial-reminders',
'send-email',
]