diff --git a/apps/workers/src/main.ts b/apps/workers/src/main.ts index 94f6e95d..ef2809d4 100644 --- a/apps/workers/src/main.ts +++ b/apps/workers/src/main.ts @@ -16,8 +16,7 @@ import { workerErrorHandlerService, } from './app/lib/di' import env from './env' -import { cleanUpOutdatedJobs, stopJobsWithName } from './utils' -import { SecurityProvider } from '@prisma/client' +import { cleanUpOutdatedJobs } from './utils' // Defaults from quickstart - https://docs.sentry.io/platforms/node/ Sentry.init({ @@ -40,11 +39,6 @@ const purgeUserQueue = queueService.getQueue('purge-user') const syncInstitutionQueue = queueService.getQueue('sync-institution') const sendEmailQueue = queueService.getQueue('send-email') -// Replace any jobs that have changed cron schedules and ensures only -// one repeatable jobs for each type is running -stopJobsWithName(syncSecurityQueue, 'sync-all-securities') -stopJobsWithName(syncSecurityQueue, 'sync-us-stock-tickers') - syncUserQueue.process( 'sync-user', async (job) => { @@ -125,33 +119,34 @@ if (env.NX_POLYGON_TIER !== 'basic') { // If no securities exist, sync them immediately // Otherwise, schedule the job to run every 24 hours // Use same jobID to prevent duplicates and rate limiting -async function setupJobs() { - const count = await prisma.security.count({ - where: { - providerName: SecurityProvider.polygon, - }, - }) - if (count === 0) { - await syncSecurityQueue.add( - 'sync-us-stock-tickers', - {}, - { jobId: 'sync-us-stock-tickers', removeOnComplete: true } - ) - } - // Then schedule it to run every 24 hours - await syncSecurityQueue.add( - 'sync-us-stock-tickers', - {}, - { - jobId: 'sync-us-stock-tickers', - repeat: { - cron: '0 0 * * *', // At 00:00 (midnight) every day +syncSecurityQueue.cancelJobs().then(() => { + prisma.security + .count({ + where: { + providerName: 'polygon', }, - } - ) -} + }) + .then((count) => { + if (count === 0) { + syncSecurityQueue.add( + 'sync-us-stock-tickers', + {}, + { jobId: Date.now().toString(), removeOnComplete: false, delay: 5000 } + ) + } else { + syncSecurityQueue.add( + 'sync-us-stock-tickers', + {}, + { + repeat: { cron: '0 */24 * * *' }, // Run every 24 hours + jobId: Date.now().toString(), + } + ) + } + }) +}) -setupJobs() +// Then schedule it to run every 24 hours /** * sync-institution queue diff --git a/apps/workers/src/utils.ts b/apps/workers/src/utils.ts index 877aa615..4f500eab 100644 --- a/apps/workers/src/utils.ts +++ b/apps/workers/src/utils.ts @@ -42,23 +42,3 @@ function filterOutdatedJobs(jobs: JobInformation[]) { return job.id === null || job.id !== mostRecentId }) } - -export async function stopJobsWithName(queue, jobName) { - // Get all jobs that might be in a state that allows them to be stopped - const jobs = await queue.getJobs(['active', 'waiting', 'delayed', 'paused']) - - // Filter jobs by name - const jobsToStop = jobs.filter((job) => job.name === jobName) - - // Process each job to stop it - for (const job of jobsToStop) { - if (job.isActive()) { - job.moveToFailed(new Error('Job stopped'), true) - // For active jobs, you might need to implement a soft stop mechanism - // This could involve setting a flag in your job processing logic to stop the job safely - } else { - // For non-active jobs, you can directly remove or fail them - await job.remove() // or job.discard() or job.moveToFailed(new Error('Job stopped'), true) - } - } -} diff --git a/libs/server/features/src/security-pricing/security-pricing.service.ts b/libs/server/features/src/security-pricing/security-pricing.service.ts index 37254e62..db658b8a 100644 --- a/libs/server/features/src/security-pricing/security-pricing.service.ts +++ b/libs/server/features/src/security-pricing/security-pricing.service.ts @@ -169,34 +169,34 @@ export class SecurityPricingService implements ISecurityPricingService { _.chunk(usStockTickers, 1_000).map((chunk) => { return this.prisma.$transaction([ this.prisma.$executeRaw` - INSERT INTO security (name, symbol, currency_code, exchange_acronym, exchange_mic, exchange_name, provider_name) - VALUES - ${Prisma.join( - chunk.map( - ({ - name, - ticker, - currency_name, - exchangeAcronym, - exchangeMic, - exchangeName, - }) => - Prisma.sql`( - ${name}, - ${ticker}, - ${currency_name?.toUpperCase()}, - ${exchangeAcronym}, - ${exchangeMic}, - ${exchangeName}, - ${SecurityProvider.polygon}, - )` - ) - )} - ON CONFLICT (symbol) DO UPDATE - SET - name = EXCLUDED.name, - currency_code = EXCLUDED.currency_code; - `, + INSERT INTO security (name, symbol, currency_code, exchange_acronym, exchange_mic, exchange_name, provider_name) + VALUES + ${Prisma.join( + chunk.map( + ({ + name, + ticker, + currency_name, + exchangeAcronym, + exchangeMic, + exchangeName, + }) => + Prisma.sql`( + ${name}, + ${ticker}, + ${currency_name?.toUpperCase()}, + ${exchangeAcronym}, + ${exchangeMic}, + ${exchangeName}, + ${SecurityProvider.polygon}::"SecurityProvider" + )` + ) + )} + ON CONFLICT (symbol, exchange_mic) DO UPDATE + SET + name = EXCLUDED.name, + currency_code = EXCLUDED.currency_code; + `, ]) }) diff --git a/libs/server/shared/src/services/market-data.service.ts b/libs/server/shared/src/services/market-data.service.ts index b95ba411..5bece227 100644 --- a/libs/server/shared/src/services/market-data.service.ts +++ b/libs/server/shared/src/services/market-data.service.ts @@ -303,7 +303,11 @@ export class PolygonMarketDataService implements IMarketDataService { exchangeName: string })[] > { - const exchanges = await this.api.reference.exchanges({ locale: 'us' }) + const shouldRateLimit = process.env.NX_POLYGON_TIER === 'basic' + const exchanges = await this.api.reference.exchanges({ + locale: 'us', + asset_class: 'stocks', + }) const tickers: (ITickersResults & { exchangeAcronym: string @@ -317,13 +321,12 @@ export class PolygonMarketDataService implements IMarketDataService { exchangeName: string })[] = await SharedUtil.paginateWithNextUrl({ pageSize: 1000, - delay: - process.env.NX_POLYGON_TIER === 'basic' - ? { - onDelay: (message: string) => this.logger.debug(message), - milliseconds: 25_000, // Basic accounts rate limited at 5 calls / minute - } - : undefined, + delay: shouldRateLimit + ? { + onDelay: (message: string) => this.logger.debug(message), + milliseconds: 15_000, // Basic accounts rate limited at 5 calls / minute + } + : undefined, fetchData: async (limit, nextCursor) => { try { const { results, next_url } = await SharedUtil.withRetry( @@ -334,7 +337,7 @@ export class PolygonMarketDataService implements IMarketDataService { cursor: nextCursor, limit: limit, }), - { maxRetries: 1, delay: 25_000 } + { maxRetries: 1, delay: shouldRateLimit ? 15_000 : 0 } ) const tickersWithExchange = results.map((ticker) => { return { diff --git a/prisma/migrations/20240121084645_create_unique_fields_for_security/migration.sql b/prisma/migrations/20240121084645_create_unique_fields_for_security/migration.sql new file mode 100644 index 00000000..1cbadc3f --- /dev/null +++ b/prisma/migrations/20240121084645_create_unique_fields_for_security/migration.sql @@ -0,0 +1,2 @@ +ALTER TABLE security +ADD UNIQUE (symbol, exchange_mic); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index d6b31524..74641024 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -283,6 +283,7 @@ model Security { investmentTransactions InvestmentTransaction[] pricing SecurityPricing[] + @@unique([symbol, exchangeMic]) @@map("security") }