1
0
Fork 0
mirror of https://github.com/maybe-finance/maybe.git synced 2025-08-09 15:35:22 +02:00
This commit is contained in:
Tyler Myracle 2024-01-21 03:13:06 -06:00
parent 4f22541669
commit 3eee3e1494
6 changed files with 70 additions and 89 deletions

View file

@ -16,8 +16,7 @@ import {
workerErrorHandlerService, workerErrorHandlerService,
} from './app/lib/di' } from './app/lib/di'
import env from './env' import env from './env'
import { cleanUpOutdatedJobs, stopJobsWithName } from './utils' import { cleanUpOutdatedJobs } from './utils'
import { SecurityProvider } from '@prisma/client'
// Defaults from quickstart - https://docs.sentry.io/platforms/node/ // Defaults from quickstart - https://docs.sentry.io/platforms/node/
Sentry.init({ Sentry.init({
@ -40,11 +39,6 @@ const purgeUserQueue = queueService.getQueue('purge-user')
const syncInstitutionQueue = queueService.getQueue('sync-institution') const syncInstitutionQueue = queueService.getQueue('sync-institution')
const sendEmailQueue = queueService.getQueue('send-email') const sendEmailQueue = queueService.getQueue('send-email')
// Replace any jobs that have changed cron schedules and ensures only
// one repeatable jobs for each type is running
stopJobsWithName(syncSecurityQueue, 'sync-all-securities')
stopJobsWithName(syncSecurityQueue, 'sync-us-stock-tickers')
syncUserQueue.process( syncUserQueue.process(
'sync-user', 'sync-user',
async (job) => { async (job) => {
@ -125,33 +119,34 @@ if (env.NX_POLYGON_TIER !== 'basic') {
// If no securities exist, sync them immediately // If no securities exist, sync them immediately
// Otherwise, schedule the job to run every 24 hours // Otherwise, schedule the job to run every 24 hours
// Use same jobID to prevent duplicates and rate limiting // Use same jobID to prevent duplicates and rate limiting
async function setupJobs() { syncSecurityQueue.cancelJobs().then(() => {
const count = await prisma.security.count({ prisma.security
.count({
where: { where: {
providerName: SecurityProvider.polygon, providerName: 'polygon',
}, },
}) })
.then((count) => {
if (count === 0) { if (count === 0) {
await syncSecurityQueue.add( syncSecurityQueue.add(
'sync-us-stock-tickers', 'sync-us-stock-tickers',
{}, {},
{ jobId: 'sync-us-stock-tickers', removeOnComplete: true } { jobId: Date.now().toString(), removeOnComplete: false, delay: 5000 }
) )
} } else {
// Then schedule it to run every 24 hours syncSecurityQueue.add(
await syncSecurityQueue.add(
'sync-us-stock-tickers', 'sync-us-stock-tickers',
{}, {},
{ {
jobId: 'sync-us-stock-tickers', repeat: { cron: '0 */24 * * *' }, // Run every 24 hours
repeat: { jobId: Date.now().toString(),
cron: '0 0 * * *', // At 00:00 (midnight) every day
},
} }
) )
} }
})
})
setupJobs() // Then schedule it to run every 24 hours
/** /**
* sync-institution queue * sync-institution queue

View file

@ -42,23 +42,3 @@ function filterOutdatedJobs(jobs: JobInformation[]) {
return job.id === null || job.id !== mostRecentId return job.id === null || job.id !== mostRecentId
}) })
} }
export async function stopJobsWithName(queue, jobName) {
// Get all jobs that might be in a state that allows them to be stopped
const jobs = await queue.getJobs(['active', 'waiting', 'delayed', 'paused'])
// Filter jobs by name
const jobsToStop = jobs.filter((job) => job.name === jobName)
// Process each job to stop it
for (const job of jobsToStop) {
if (job.isActive()) {
job.moveToFailed(new Error('Job stopped'), true)
// For active jobs, you might need to implement a soft stop mechanism
// This could involve setting a flag in your job processing logic to stop the job safely
} else {
// For non-active jobs, you can directly remove or fail them
await job.remove() // or job.discard() or job.moveToFailed(new Error('Job stopped'), true)
}
}
}

View file

@ -188,11 +188,11 @@ export class SecurityPricingService implements ISecurityPricingService {
${exchangeAcronym}, ${exchangeAcronym},
${exchangeMic}, ${exchangeMic},
${exchangeName}, ${exchangeName},
${SecurityProvider.polygon}, ${SecurityProvider.polygon}::"SecurityProvider"
)` )`
) )
)} )}
ON CONFLICT (symbol) DO UPDATE ON CONFLICT (symbol, exchange_mic) DO UPDATE
SET SET
name = EXCLUDED.name, name = EXCLUDED.name,
currency_code = EXCLUDED.currency_code; currency_code = EXCLUDED.currency_code;

View file

@ -303,7 +303,11 @@ export class PolygonMarketDataService implements IMarketDataService {
exchangeName: string exchangeName: string
})[] })[]
> { > {
const exchanges = await this.api.reference.exchanges({ locale: 'us' }) const shouldRateLimit = process.env.NX_POLYGON_TIER === 'basic'
const exchanges = await this.api.reference.exchanges({
locale: 'us',
asset_class: 'stocks',
})
const tickers: (ITickersResults & { const tickers: (ITickersResults & {
exchangeAcronym: string exchangeAcronym: string
@ -317,11 +321,10 @@ export class PolygonMarketDataService implements IMarketDataService {
exchangeName: string exchangeName: string
})[] = await SharedUtil.paginateWithNextUrl({ })[] = await SharedUtil.paginateWithNextUrl({
pageSize: 1000, pageSize: 1000,
delay: delay: shouldRateLimit
process.env.NX_POLYGON_TIER === 'basic'
? { ? {
onDelay: (message: string) => this.logger.debug(message), onDelay: (message: string) => this.logger.debug(message),
milliseconds: 25_000, // Basic accounts rate limited at 5 calls / minute milliseconds: 15_000, // Basic accounts rate limited at 5 calls / minute
} }
: undefined, : undefined,
fetchData: async (limit, nextCursor) => { fetchData: async (limit, nextCursor) => {
@ -334,7 +337,7 @@ export class PolygonMarketDataService implements IMarketDataService {
cursor: nextCursor, cursor: nextCursor,
limit: limit, limit: limit,
}), }),
{ maxRetries: 1, delay: 25_000 } { maxRetries: 1, delay: shouldRateLimit ? 15_000 : 0 }
) )
const tickersWithExchange = results.map((ticker) => { const tickersWithExchange = results.map((ticker) => {
return { return {

View file

@ -0,0 +1,2 @@
ALTER TABLE security
ADD UNIQUE (symbol, exchange_mic);

View file

@ -283,6 +283,7 @@ model Security {
investmentTransactions InvestmentTransaction[] investmentTransactions InvestmentTransaction[]
pricing SecurityPricing[] pricing SecurityPricing[]
@@unique([symbol, exchangeMic])
@@map("security") @@map("security")
} }