diff --git a/.env.example b/.env.example index 31348766..cd000ff0 100644 --- a/.env.example +++ b/.env.example @@ -31,6 +31,8 @@ NX_DATABASE_SECRET= # We use Polygon.io for market data. You can sign up for a free account # and get an API key for individual use at https://polygon.io NX_POLYGON_API_KEY= +# Basic is free tier, see: https://polygon.io/pricing +NX_POLYGON_TIER=basic # Automated banking data # We use Teller.io for automated banking data. You can sign up for a free diff --git a/apps/server/src/env.ts b/apps/server/src/env.ts index 716e6934..7b8986c9 100644 --- a/apps/server/src/env.ts +++ b/apps/server/src/env.ts @@ -44,6 +44,7 @@ const envSchema = z.object({ NX_SENTRY_ENV: z.string().optional(), NX_POLYGON_API_KEY: z.string().default(''), + NX_POLYGON_TIER: z.string().default('basic'), NX_PORT: z.string().default('3333'), NX_CORS_ORIGINS: z.string().default('https://localhost.maybe.co').transform(toOriginArray), diff --git a/apps/workers/src/env.ts b/apps/workers/src/env.ts index 1b3a651c..bc5d1677 100644 --- a/apps/workers/src/env.ts +++ b/apps/workers/src/env.ts @@ -20,6 +20,7 @@ const envSchema = z.object({ NX_REDIS_URL: z.string().default('redis://localhost:6379'), NX_POLYGON_API_KEY: z.string().default(''), + NX_POLYGON_TIER: z.string().default('basic'), NX_POSTMARK_FROM_ADDRESS: z.string().default('account@maybe.co'), NX_POSTMARK_REPLY_TO_ADDRESS: z.string().default('support@maybe.co'), diff --git a/apps/workers/src/main.ts b/apps/workers/src/main.ts index b28299a8..94f6e95d 100644 --- a/apps/workers/src/main.ts +++ b/apps/workers/src/main.ts @@ -16,7 +16,8 @@ import { workerErrorHandlerService, } from './app/lib/di' import env from './env' -import { cleanUpOutdatedJobs } from './utils' +import { cleanUpOutdatedJobs, stopJobsWithName } from './utils' +import { SecurityProvider } from '@prisma/client' // Defaults from quickstart - https://docs.sentry.io/platforms/node/ Sentry.init({ @@ -39,6 +40,11 @@ const purgeUserQueue = queueService.getQueue('purge-user') const syncInstitutionQueue = queueService.getQueue('sync-institution') const sendEmailQueue = queueService.getQueue('send-email') +// Replace any jobs that have changed cron schedules and ensures only +// one repeatable jobs for each type is running +stopJobsWithName(syncSecurityQueue, 'sync-all-securities') +stopJobsWithName(syncSecurityQueue, 'sync-us-stock-tickers') + syncUserQueue.process( 'sync-user', async (job) => { @@ -80,6 +86,14 @@ syncSecurityQueue.process( async () => await securityPricingProcessor.syncAll() ) +/** + * sync-us-stock-ticker queue + */ +syncSecurityQueue.process( + 'sync-us-stock-tickers', + async () => await securityPricingProcessor.syncUSStockTickers() +) + /** * purge-user queue */ @@ -94,15 +108,50 @@ purgeUserQueue.process( /** * sync-all-securities queue */ -// Start repeated job for syncing securities (Bull won't duplicate it as long as the repeat options are the same) -syncSecurityQueue.add( - 'sync-all-securities', - {}, - { - repeat: { cron: '*/5 * * * *' }, // Run every 5 minutes - jobId: Date.now().toString(), +// Start repeated job for syncing securities +// (Bull won't duplicate it as long as the repeat options are the same) +// Do not run if on the free tier (rate limits) +if (env.NX_POLYGON_TIER !== 'basic') { + syncSecurityQueue.add( + 'sync-all-securities', + {}, + { + repeat: { cron: '*/5 * * * *' }, // Run every 5 minutes + jobId: Date.now().toString(), + } + ) +} + +// If no securities exist, sync them immediately +// Otherwise, schedule the job to run every 24 hours +// Use same jobID to prevent duplicates and rate limiting +async function setupJobs() { + const count = await prisma.security.count({ + where: { + providerName: SecurityProvider.polygon, + }, + }) + if (count === 0) { + await syncSecurityQueue.add( + 'sync-us-stock-tickers', + {}, + { jobId: 'sync-us-stock-tickers', removeOnComplete: true } + ) } -) + // Then schedule it to run every 24 hours + await syncSecurityQueue.add( + 'sync-us-stock-tickers', + {}, + { + jobId: 'sync-us-stock-tickers', + repeat: { + cron: '0 0 * * *', // At 00:00 (midnight) every day + }, + } + ) +} + +setupJobs() /** * sync-institution queue diff --git a/apps/workers/src/utils.ts b/apps/workers/src/utils.ts index de615175..877aa615 100644 --- a/apps/workers/src/utils.ts +++ b/apps/workers/src/utils.ts @@ -43,4 +43,22 @@ function filterOutdatedJobs(jobs: JobInformation[]) { }) } -export default cleanUpOutdatedJobs +export async function stopJobsWithName(queue, jobName) { + // Get all jobs that might be in a state that allows them to be stopped + const jobs = await queue.getJobs(['active', 'waiting', 'delayed', 'paused']) + + // Filter jobs by name + const jobsToStop = jobs.filter((job) => job.name === jobName) + + // Process each job to stop it + for (const job of jobsToStop) { + if (job.isActive()) { + job.moveToFailed(new Error('Job stopped'), true) + // For active jobs, you might need to implement a soft stop mechanism + // This could involve setting a flag in your job processing logic to stop the job safely + } else { + // For non-active jobs, you can directly remove or fail them + await job.remove() // or job.discard() or job.moveToFailed(new Error('Job stopped'), true) + } + } +} diff --git a/libs/server/features/src/providers/teller/teller.etl.ts b/libs/server/features/src/providers/teller/teller.etl.ts index 406f70c3..c817f47e 100644 --- a/libs/server/features/src/providers/teller/teller.etl.ts +++ b/libs/server/features/src/providers/teller/teller.etl.ts @@ -100,10 +100,7 @@ export class TellerETL implements IETL { const accounts = await this._extractAccounts(accessToken) - const transactions = await this._extractTransactions( - accessToken, - accounts.map((a) => a.id) - ) + const transactions = await this._extractTransactions(accessToken, accounts) this.logger.info( `Extracted Teller data for customer ${user.tellerUserId} accounts=${accounts.length} transactions=${transactions.length}`, @@ -144,6 +141,7 @@ export class TellerETL implements IETL { // upsert accounts ...accounts.map((tellerAccount) => { const type = TellerUtil.getType(tellerAccount.type) + const categoryProvider = TellerUtil.tellerTypesToCategory(tellerAccount.type) const classification = AccountUtil.getClassification(type) return this.prisma.account.upsert({ @@ -154,9 +152,9 @@ export class TellerETL implements IETL { }, }, create: { - type: TellerUtil.getType(tellerAccount.type), + type, provider: 'teller', - categoryProvider: TellerUtil.tellerTypesToCategory(tellerAccount.type), + categoryProvider, subcategoryProvider: tellerAccount.subtype ?? 'other', accountConnectionId: connection.id, userId: connection.userId, @@ -196,26 +194,26 @@ export class TellerETL implements IETL { ] } - private async _extractTransactions(accessToken: string, accountIds: string[]) { + private async _extractTransactions( + accessToken: string, + accounts: TellerTypes.AccountWithBalances[] + ) { const accountTransactions = await Promise.all( - accountIds.map(async (accountId) => { - const account = await this.prisma.account.findFirst({ - where: { - tellerAccountId: accountId, - }, - }) - + accounts.map(async (tellerAccount) => { const transactions = await SharedUtil.withRetry( () => this.teller.getTransactions({ - accountId, + accountId: tellerAccount.id, accessToken, }), { maxRetries: 3, } ) - if (account!.classification === AccountClassification.asset) { + const type = TellerUtil.getType(tellerAccount.type) + const classification = AccountUtil.getClassification(type) + + if (classification === AccountClassification.asset) { transactions.forEach((t) => { t.amount = String(Number(t.amount) * -1) }) @@ -266,7 +264,11 @@ export class TellerETL implements IETL { ${details.counterparty?.name ?? ''}, ${type}, ${details.category ?? ''}, - ${maybeCategoryByTellerCategory[details.category ?? ''] ?? 'Other'} + ${ + details.category + ? maybeCategoryByTellerCategory[details.category] + : 'Other' + } )` }) )} @@ -277,7 +279,7 @@ export class TellerETL implements IETL { pending = EXCLUDED.pending, merchant_name = EXCLUDED.merchant_name, teller_type = EXCLUDED.teller_type, - teller_category = EXCLUDED.teller_category; + teller_category = EXCLUDED.teller_category, category = EXCLUDED.category; ` }) diff --git a/libs/server/features/src/security-pricing/security-pricing.processor.ts b/libs/server/features/src/security-pricing/security-pricing.processor.ts index 9188265d..3f3ddd52 100644 --- a/libs/server/features/src/security-pricing/security-pricing.processor.ts +++ b/libs/server/features/src/security-pricing/security-pricing.processor.ts @@ -4,6 +4,7 @@ import type { ISecurityPricingService } from './security-pricing.service' export interface ISecurityPricingProcessor { syncAll(jobData?: SyncSecurityQueueJobData): Promise + syncUSStockTickers(jobData?: SyncSecurityQueueJobData): Promise } export class SecurityPricingProcessor implements ISecurityPricingProcessor { @@ -15,4 +16,8 @@ export class SecurityPricingProcessor implements ISecurityPricingProcessor { async syncAll(_jobData?: SyncSecurityQueueJobData) { await this.securityPricingService.syncAll() } + + async syncUSStockTickers(_jobData?: SyncSecurityQueueJobData) { + await this.securityPricingService.syncUSStockTickers() + } } diff --git a/libs/server/features/src/security-pricing/security-pricing.service.ts b/libs/server/features/src/security-pricing/security-pricing.service.ts index 02918f02..37254e62 100644 --- a/libs/server/features/src/security-pricing/security-pricing.service.ts +++ b/libs/server/features/src/security-pricing/security-pricing.service.ts @@ -1,13 +1,16 @@ import type { PrismaClient, Security } from '@prisma/client' +import { SecurityProvider } from '@prisma/client' import type { IMarketDataService } from '@maybe-finance/server/shared' import type { Logger } from 'winston' import { Prisma } from '@prisma/client' import { DateTime } from 'luxon' import { SharedUtil } from '@maybe-finance/shared' +import _ from 'lodash' export interface ISecurityPricingService { sync(security: Pick, syncStart?: string): Promise syncAll(): Promise + syncUSStockTickers(): Promise } export class SecurityPricingService implements ISecurityPricingService { @@ -154,4 +157,49 @@ export class SecurityPricingService implements ISecurityPricingService { profiler.done({ message: 'Synced all securities' }) } + + async syncUSStockTickers() { + const profiler = this.logger.startTimer() + const usStockTickers = await this.marketDataService.getUSStockTickers() + + if (!usStockTickers.length) return + + this.logger.debug(`fetched ${usStockTickers.length} stock tickers`) + + _.chunk(usStockTickers, 1_000).map((chunk) => { + return this.prisma.$transaction([ + this.prisma.$executeRaw` + INSERT INTO security (name, symbol, currency_code, exchange_acronym, exchange_mic, exchange_name, provider_name) + VALUES + ${Prisma.join( + chunk.map( + ({ + name, + ticker, + currency_name, + exchangeAcronym, + exchangeMic, + exchangeName, + }) => + Prisma.sql`( + ${name}, + ${ticker}, + ${currency_name?.toUpperCase()}, + ${exchangeAcronym}, + ${exchangeMic}, + ${exchangeName}, + ${SecurityProvider.polygon}, + )` + ) + )} + ON CONFLICT (symbol) DO UPDATE + SET + name = EXCLUDED.name, + currency_code = EXCLUDED.currency_code; + `, + ]) + }) + + profiler.done({ message: 'Synced US stock tickers' }) + } } diff --git a/libs/server/shared/src/services/market-data.service.ts b/libs/server/shared/src/services/market-data.service.ts index ba443867..b95ba411 100644 --- a/libs/server/shared/src/services/market-data.service.ts +++ b/libs/server/shared/src/services/market-data.service.ts @@ -9,6 +9,7 @@ import type { SharedType } from '@maybe-finance/shared' import { MarketUtil, SharedUtil } from '@maybe-finance/shared' import type { CacheService } from '.' import { toDecimal } from '../utils/db-utils' +import type { ITickersResults } from '@polygon.io/client-js/lib/rest/reference/tickers' type DailyPricing = { date: DateTime @@ -62,6 +63,17 @@ export interface IMarketDataService { getSecurityDetails( security: Pick ): Promise + + /** + * fetches all US stock tickers + */ + getUSStockTickers(): Promise< + (ITickersResults & { + exchangeAcronym: string + exchangeMic: string + exchangeName: string + })[] + > } export class PolygonMarketDataService implements IMarketDataService { @@ -284,6 +296,66 @@ export class PolygonMarketDataService implements IMarketDataService { return {} } + async getUSStockTickers(): Promise< + (ITickersResults & { + exchangeAcronym: string + exchangeMic: string + exchangeName: string + })[] + > { + const exchanges = await this.api.reference.exchanges({ locale: 'us' }) + + const tickers: (ITickersResults & { + exchangeAcronym: string + exchangeMic: string + exchangeName: string + })[] = [] + for (const exchange of exchanges.results) { + const exchangeTickers: (ITickersResults & { + exchangeAcronym: string + exchangeMic: string + exchangeName: string + })[] = await SharedUtil.paginateWithNextUrl({ + pageSize: 1000, + delay: + process.env.NX_POLYGON_TIER === 'basic' + ? { + onDelay: (message: string) => this.logger.debug(message), + milliseconds: 25_000, // Basic accounts rate limited at 5 calls / minute + } + : undefined, + fetchData: async (limit, nextCursor) => { + try { + const { results, next_url } = await SharedUtil.withRetry( + () => + this.api.reference.tickers({ + market: 'stocks', + exchange: exchange.mic, + cursor: nextCursor, + limit: limit, + }), + { maxRetries: 1, delay: 25_000 } + ) + const tickersWithExchange = results.map((ticker) => { + return { + ...ticker, + exchangeAcronym: exchange.acronymstring ?? '', + exchangeMic: exchange.mic ?? '', + exchangeName: exchange.name, + } + }) + return { data: tickersWithExchange, nextUrl: next_url } + } catch (err) { + this.logger.error('Error while fetching tickers', err) + return { data: [], nextUrl: undefined } + } + }, + }) + tickers.push(...exchangeTickers) + } + return tickers + } + private async _snapshotStocks(tickers: string[]) { /** * https://polygon.io/docs/stocks/get_v2_snapshot_locale_us_markets_stocks_tickers diff --git a/libs/server/shared/src/services/queue.service.ts b/libs/server/shared/src/services/queue.service.ts index aef47110..6a88c7b4 100644 --- a/libs/server/shared/src/services/queue.service.ts +++ b/libs/server/shared/src/services/queue.service.ts @@ -67,7 +67,10 @@ export type SendEmailQueueJobData = export type SyncUserQueue = IQueue export type SyncAccountQueue = IQueue export type SyncConnectionQueue = IQueue -export type SyncSecurityQueue = IQueue +export type SyncSecurityQueue = IQueue< + SyncSecurityQueueJobData, + 'sync-all-securities' | 'sync-us-stock-tickers' +> export type PurgeUserQueue = IQueue<{ userId: User['id'] }, 'purge-user'> export type SyncInstitutionQueue = IQueue< {}, diff --git a/libs/shared/src/utils/shared-utils.ts b/libs/shared/src/utils/shared-utils.ts index 85cbfb3a..f13cbe65 100644 --- a/libs/shared/src/utils/shared-utils.ts +++ b/libs/shared/src/utils/shared-utils.ts @@ -51,6 +51,51 @@ export async function paginate({ return result } +/** + * Helper function for paginating data with a next data url + */ +export async function paginateWithNextUrl({ + fetchData, + pageSize, + delay, +}: { + fetchData: ( + limit: number, + nextCursor: string | undefined + ) => Promise<{ data: TData[]; nextUrl: string | undefined }> + pageSize: number + delay?: { onDelay: (message: string) => void; milliseconds: number } +}): Promise { + let hasNextPage = true + let nextCursor: string | undefined = undefined + const result: TData[] = [] + + while (hasNextPage) { + // Fetch one page of data + const response: { data: TData[]; nextUrl: string | undefined } = await fetchData( + pageSize, + nextCursor + ) + const data = response.data + const nextUrl: string | undefined = response.nextUrl ?? undefined + nextCursor = nextUrl ? new URL(nextUrl).searchParams.get('cursor') ?? undefined : undefined + + // Add fetched data to the result + result.push(...data) + + // Determine if there is a next page + hasNextPage = !!nextCursor + + // Delay the next request if needed + if (delay) { + delay.onDelay(`Waiting ${delay.milliseconds / 1000} seconds`) + await new Promise((resolve) => setTimeout(resolve, delay.milliseconds)) + } + } + + return result +} + /** * Helper function for paginating data using a generator (typically from an API) */ diff --git a/prisma/migrations/20240121011219_add_provider_name_to_security/migration.sql b/prisma/migrations/20240121011219_add_provider_name_to_security/migration.sql new file mode 100644 index 00000000..b45d461a --- /dev/null +++ b/prisma/migrations/20240121011219_add_provider_name_to_security/migration.sql @@ -0,0 +1,5 @@ +-- CreateEnum +CREATE TYPE "SecurityProvider" AS ENUM ('polygon', 'other'); + +-- AlterTable +ALTER TABLE "security" ADD COLUMN "provider_name" "SecurityProvider" DEFAULT 'other'; diff --git a/prisma/migrations/20240121013630_add_exchange_info_to_security/migration.sql b/prisma/migrations/20240121013630_add_exchange_info_to_security/migration.sql new file mode 100644 index 00000000..46a5180e --- /dev/null +++ b/prisma/migrations/20240121013630_add_exchange_info_to_security/migration.sql @@ -0,0 +1,4 @@ +-- AlterTable +ALTER TABLE "security" ADD COLUMN "exchange_acronym" TEXT, +ADD COLUMN "exchange_mic" TEXT, +ADD COLUMN "exchange_name" TEXT; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index cccdd299..349852d3 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -243,18 +243,27 @@ model InvestmentTransaction { @@map("investment_transaction") } +enum SecurityProvider { + polygon + other +} + model Security { - id Int @id @default(autoincrement()) - createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(6) - updatedAt DateTime @default(now()) @updatedAt @map("updated_at") @db.Timestamptz(6) + id Int @id @default(autoincrement()) + createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(6) + updatedAt DateTime @default(now()) @updatedAt @map("updated_at") @db.Timestamptz(6) name String? symbol String? cusip String? isin String? - sharesPerContract Decimal? @map("shares_per_contract") @db.Decimal(36, 18) - currencyCode String @default("USD") @map("currency_code") - pricingLastSyncedAt DateTime? @map("pricing_last_synced_at") @db.Timestamptz(6) - isBrokerageCash Boolean @default(false) @map("is_brokerage_cash") + sharesPerContract Decimal? @map("shares_per_contract") @db.Decimal(36, 18) + currencyCode String @default("USD") @map("currency_code") + pricingLastSyncedAt DateTime? @map("pricing_last_synced_at") @db.Timestamptz(6) + isBrokerageCash Boolean @default(false) @map("is_brokerage_cash") + exchangeAcroynm String? @map("exchange_acronym") + exchangeMic String? @map("exchange_mic") + exchangeName String? @map("exchange_name") + providerName SecurityProvider? @default(other) @map("provider_name") // plaid data plaidSecurityId String? @unique @map("plaid_security_id")