Mirror of https://github.com/maybe-finance/maybe.git
Tyler Myracle 2024-01-20 23:12:25 -06:00
parent 0d5d7d5a7f
commit 75e90014e3
14 changed files with 300 additions and 36 deletions

View file

@@ -31,6 +31,8 @@ NX_DATABASE_SECRET=
 # We use Polygon.io for market data. You can sign up for a free account
 # and get an API key for individual use at https://polygon.io
 NX_POLYGON_API_KEY=
+# Basic is free tier, see: https://polygon.io/pricing
+NX_POLYGON_TIER=basic
 
 # Automated banking data
 # We use Teller.io for automated banking data. You can sign up for a free

View file

@@ -44,6 +44,7 @@ const envSchema = z.object({
     NX_SENTRY_ENV: z.string().optional(),
     NX_POLYGON_API_KEY: z.string().default(''),
+    NX_POLYGON_TIER: z.string().default('basic'),
     NX_PORT: z.string().default('3333'),
     NX_CORS_ORIGINS: z.string().default('https://localhost.maybe.co').transform(toOriginArray),

View file

@@ -20,6 +20,7 @@ const envSchema = z.object({
     NX_REDIS_URL: z.string().default('redis://localhost:6379'),
     NX_POLYGON_API_KEY: z.string().default(''),
+    NX_POLYGON_TIER: z.string().default('basic'),
     NX_POSTMARK_FROM_ADDRESS: z.string().default('account@maybe.co'),
     NX_POSTMARK_REPLY_TO_ADDRESS: z.string().default('support@maybe.co'),

View file

@@ -16,7 +16,8 @@ import {
     workerErrorHandlerService,
 } from './app/lib/di'
 import env from './env'
-import { cleanUpOutdatedJobs } from './utils'
+import { cleanUpOutdatedJobs, stopJobsWithName } from './utils'
+import { SecurityProvider } from '@prisma/client'
 
 // Defaults from quickstart - https://docs.sentry.io/platforms/node/
 Sentry.init({
@@ -39,6 +40,11 @@ const purgeUserQueue = queueService.getQueue('purge-user')
 const syncInstitutionQueue = queueService.getQueue('sync-institution')
 const sendEmailQueue = queueService.getQueue('send-email')
 
+// Replace any jobs whose cron schedules have changed and ensure that only
+// one repeatable job of each type is running
+stopJobsWithName(syncSecurityQueue, 'sync-all-securities')
+stopJobsWithName(syncSecurityQueue, 'sync-us-stock-tickers')
+
 syncUserQueue.process(
     'sync-user',
     async (job) => {
@@ -80,6 +86,14 @@ syncSecurityQueue.process(
     async () => await securityPricingProcessor.syncAll()
 )
 
+/**
+ * sync-us-stock-tickers queue
+ */
+syncSecurityQueue.process(
+    'sync-us-stock-tickers',
+    async () => await securityPricingProcessor.syncUSStockTickers()
+)
+
 /**
  * purge-user queue
  */
@@ -94,7 +108,10 @@ purgeUserQueue.process(
 /**
  * sync-all-securities queue
  */
-// Start repeated job for syncing securities (Bull won't duplicate it as long as the repeat options are the same)
+// Start repeated job for syncing securities
+// (Bull won't duplicate it as long as the repeat options are the same)
+// Do not run on the free tier (rate limits)
+if (env.NX_POLYGON_TIER !== 'basic') {
 syncSecurityQueue.add(
     'sync-all-securities',
     {},
@@ -103,6 +120,38 @@ syncSecurityQueue.add(
         jobId: Date.now().toString(),
     }
 )
+}
+
+// If no securities exist yet, sync them immediately, then schedule the job
+// to run every 24 hours. Use the same jobId to prevent duplicates and rate limiting.
+async function setupJobs() {
+    const count = await prisma.security.count({
+        where: {
+            providerName: SecurityProvider.polygon,
+        },
+    })
+
+    if (count === 0) {
+        await syncSecurityQueue.add(
+            'sync-us-stock-tickers',
+            {},
+            { jobId: 'sync-us-stock-tickers', removeOnComplete: true }
+        )
+    }
+
+    // Then schedule it to run every 24 hours
+    await syncSecurityQueue.add(
+        'sync-us-stock-tickers',
+        {},
+        {
+            jobId: 'sync-us-stock-tickers',
+            repeat: {
+                cron: '0 0 * * *', // At 00:00 (midnight) every day
+            },
+        }
+    )
+}
+
+setupJobs()
+
 /**
  * sync-institution queue

View file

@@ -43,4 +43,22 @@ function filterOutdatedJobs(jobs: JobInformation[]) {
     })
 }
 
-export default cleanUpOutdatedJobs
+export async function stopJobsWithName(queue, jobName) {
+    // Get all jobs that might be in a state that allows them to be stopped
+    const jobs = await queue.getJobs(['active', 'waiting', 'delayed', 'paused'])
+
+    // Filter jobs by name
+    const jobsToStop = jobs.filter((job) => job.name === jobName)
+
+    // Process each job to stop it
+    for (const job of jobsToStop) {
+        if (await job.isActive()) {
+            await job.moveToFailed(new Error('Job stopped'), true)
+            // For active jobs, you might need to implement a soft stop mechanism:
+            // set a flag that your job processing logic checks so it can stop safely
+        } else {
+            // Non-active jobs can be removed or failed directly
+            await job.remove() // or job.discard() or job.moveToFailed(new Error('Job stopped'), true)
+        }
+    }
+}
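
Note: the comments above mention a "soft stop mechanism" without implementing one. A hypothetical sketch of what that could look like — not part of this commit, and assuming Bull plus ioredis — is to have the processor poll a Redis flag between units of work and return early once the flag is set:

import Bull from 'bull'
import Redis from 'ioredis'

const redisUrl = process.env.NX_REDIS_URL ?? 'redis://localhost:6379'
const redis = new Redis(redisUrl)
const queue = new Bull('sync-security', redisUrl)

// Ask a running job to wind down gracefully instead of failing it outright
export async function requestSoftStop(jobName: string) {
    await redis.set(`soft-stop:${jobName}`, '1', 'EX', 3600)
}

queue.process('sync-us-stock-tickers', async (job) => {
    const chunks: string[][] = [['AAPL', 'MSFT'], ['GOOG']] // placeholder work units
    for (const chunk of chunks) {
        // Check the flag between chunks so the job never stops mid-write
        if (await redis.get(`soft-stop:${job.name}`)) return
        await job.log(`processing ${chunk.length} tickers`) // placeholder for the real work
    }
})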

View file

@@ -100,10 +100,7 @@ export class TellerETL implements IETL<Connection, TellerRawData, TellerData> {
         const accounts = await this._extractAccounts(accessToken)
 
-        const transactions = await this._extractTransactions(
-            accessToken,
-            accounts.map((a) => a.id)
-        )
+        const transactions = await this._extractTransactions(accessToken, accounts)
 
         this.logger.info(
             `Extracted Teller data for customer ${user.tellerUserId} accounts=${accounts.length} transactions=${transactions.length}`,
@@ -144,6 +141,7 @@ export class TellerETL implements IETL<Connection, TellerRawData, TellerData> {
             // upsert accounts
             ...accounts.map((tellerAccount) => {
                 const type = TellerUtil.getType(tellerAccount.type)
+                const categoryProvider = TellerUtil.tellerTypesToCategory(tellerAccount.type)
                 const classification = AccountUtil.getClassification(type)
 
                 return this.prisma.account.upsert({
@@ -154,9 +152,9 @@ export class TellerETL implements IETL<Connection, TellerRawData, TellerData> {
                         },
                     },
                     create: {
-                        type: TellerUtil.getType(tellerAccount.type),
+                        type,
                         provider: 'teller',
-                        categoryProvider: TellerUtil.tellerTypesToCategory(tellerAccount.type),
+                        categoryProvider,
                         subcategoryProvider: tellerAccount.subtype ?? 'other',
                         accountConnectionId: connection.id,
                         userId: connection.userId,
@@ -196,26 +194,26 @@ export class TellerETL implements IETL<Connection, TellerRawData, TellerData> {
         ]
     }
 
-    private async _extractTransactions(accessToken: string, accountIds: string[]) {
+    private async _extractTransactions(
+        accessToken: string,
+        accounts: TellerTypes.AccountWithBalances[]
+    ) {
         const accountTransactions = await Promise.all(
-            accountIds.map(async (accountId) => {
-                const account = await this.prisma.account.findFirst({
-                    where: {
-                        tellerAccountId: accountId,
-                    },
-                })
-
+            accounts.map(async (tellerAccount) => {
                 const transactions = await SharedUtil.withRetry(
                     () =>
                         this.teller.getTransactions({
-                            accountId,
+                            accountId: tellerAccount.id,
                             accessToken,
                         }),
                     {
                         maxRetries: 3,
                     }
                 )
 
-                if (account!.classification === AccountClassification.asset) {
+                const type = TellerUtil.getType(tellerAccount.type)
+                const classification = AccountUtil.getClassification(type)
+
+                if (classification === AccountClassification.asset) {
                     transactions.forEach((t) => {
                         t.amount = String(Number(t.amount) * -1)
                     })
@@ -266,7 +264,11 @@ export class TellerETL implements IETL<Connection, TellerRawData, TellerData> {
                         ${details.counterparty?.name ?? ''},
                         ${type},
                         ${details.category ?? ''},
-                        ${maybeCategoryByTellerCategory[details.category ?? ''] ?? 'Other'}
+                        ${
+                            details.category
+                                ? maybeCategoryByTellerCategory[details.category]
+                                : 'Other'
+                        }
                     )`
                 })
             )}
@@ -277,7 +279,7 @@ export class TellerETL implements IETL<Connection, TellerRawData, TellerData> {
                     pending = EXCLUDED.pending,
                     merchant_name = EXCLUDED.merchant_name,
                     teller_type = EXCLUDED.teller_type,
-                    teller_category = EXCLUDED.teller_category;
+                    teller_category = EXCLUDED.teller_category,
+                    category = EXCLUDED.category;
                 `
             })

View file

@@ -4,6 +4,7 @@ import type { ISecurityPricingService } from './security-pricing.service'
 export interface ISecurityPricingProcessor {
     syncAll(jobData?: SyncSecurityQueueJobData): Promise<void>
+    syncUSStockTickers(jobData?: SyncSecurityQueueJobData): Promise<void>
 }
 
 export class SecurityPricingProcessor implements ISecurityPricingProcessor {
@@ -15,4 +16,8 @@ export class SecurityPricingProcessor implements ISecurityPricingProcessor {
     async syncAll(_jobData?: SyncSecurityQueueJobData) {
         await this.securityPricingService.syncAll()
     }
+
+    async syncUSStockTickers(_jobData?: SyncSecurityQueueJobData) {
+        await this.securityPricingService.syncUSStockTickers()
+    }
 }

View file

@@ -1,13 +1,16 @@
 import type { PrismaClient, Security } from '@prisma/client'
+import { SecurityProvider } from '@prisma/client'
 import type { IMarketDataService } from '@maybe-finance/server/shared'
 import type { Logger } from 'winston'
 import { Prisma } from '@prisma/client'
 import { DateTime } from 'luxon'
 import { SharedUtil } from '@maybe-finance/shared'
+import _ from 'lodash'
 
 export interface ISecurityPricingService {
     sync(security: Pick<Security, 'id' | 'symbol' | 'plaidType'>, syncStart?: string): Promise<void>
     syncAll(): Promise<void>
+    syncUSStockTickers(): Promise<void>
 }
 
 export class SecurityPricingService implements ISecurityPricingService {
@@ -154,4 +157,49 @@ export class SecurityPricingService implements ISecurityPricingService {
         profiler.done({ message: 'Synced all securities' })
     }
+
+    async syncUSStockTickers() {
+        const profiler = this.logger.startTimer()
+
+        const usStockTickers = await this.marketDataService.getUSStockTickers()
+        if (!usStockTickers.length) return
+
+        this.logger.debug(`fetched ${usStockTickers.length} stock tickers`)
+
+        _.chunk(usStockTickers, 1_000).map((chunk) => {
+            return this.prisma.$transaction([
+                this.prisma.$executeRaw`
+                    INSERT INTO security (name, symbol, currency_code, exchange_acronym, exchange_mic, exchange_name, provider_name)
+                    VALUES
+                        ${Prisma.join(
+                            chunk.map(
+                                ({
+                                    name,
+                                    ticker,
+                                    currency_name,
+                                    exchangeAcronym,
+                                    exchangeMic,
+                                    exchangeName,
+                                }) =>
+                                    Prisma.sql`(
+                                        ${name},
+                                        ${ticker},
+                                        ${currency_name?.toUpperCase()},
+                                        ${exchangeAcronym},
+                                        ${exchangeMic},
+                                        ${exchangeName},
+                                        ${SecurityProvider.polygon}
+                                    )`
+                            )
+                        )}
+                    ON CONFLICT (symbol) DO UPDATE
+                    SET
+                        name = EXCLUDED.name,
+                        currency_code = EXCLUDED.currency_code;
+                `,
+            ])
+        })
+
+        profiler.done({ message: 'Synced US stock tickers' })
+    }
 }

View file

@@ -9,6 +9,7 @@ import type { SharedType } from '@maybe-finance/shared'
 import { MarketUtil, SharedUtil } from '@maybe-finance/shared'
 import type { CacheService } from '.'
 import { toDecimal } from '../utils/db-utils'
+import type { ITickersResults } from '@polygon.io/client-js/lib/rest/reference/tickers'
 
 type DailyPricing = {
     date: DateTime
@@ -62,6 +63,17 @@ export interface IMarketDataService {
     getSecurityDetails(
         security: Pick<Security, 'symbol' | 'plaidType' | 'currencyCode'>
     ): Promise<SharedType.SecurityDetails>
+
+    /**
+     * fetches all US stock tickers
+     */
+    getUSStockTickers(): Promise<
+        (ITickersResults & {
+            exchangeAcronym: string
+            exchangeMic: string
+            exchangeName: string
+        })[]
+    >
 }
 
 export class PolygonMarketDataService implements IMarketDataService {
@@ -284,6 +296,66 @@ export class PolygonMarketDataService implements IMarketDataService {
         return {}
     }
 
+    async getUSStockTickers(): Promise<
+        (ITickersResults & {
+            exchangeAcronym: string
+            exchangeMic: string
+            exchangeName: string
+        })[]
+    > {
+        const exchanges = await this.api.reference.exchanges({ locale: 'us' })
+
+        const tickers: (ITickersResults & {
+            exchangeAcronym: string
+            exchangeMic: string
+            exchangeName: string
+        })[] = []
+
+        for (const exchange of exchanges.results) {
+            const exchangeTickers: (ITickersResults & {
+                exchangeAcronym: string
+                exchangeMic: string
+                exchangeName: string
+            })[] = await SharedUtil.paginateWithNextUrl({
+                pageSize: 1000,
+                delay:
+                    process.env.NX_POLYGON_TIER === 'basic'
+                        ? {
+                              onDelay: (message: string) => this.logger.debug(message),
+                              milliseconds: 25_000, // Basic accounts rate limited at 5 calls / minute
+                          }
+                        : undefined,
+                fetchData: async (limit, nextCursor) => {
+                    try {
+                        const { results, next_url } = await SharedUtil.withRetry(
+                            () =>
+                                this.api.reference.tickers({
+                                    market: 'stocks',
+                                    exchange: exchange.mic,
+                                    cursor: nextCursor,
+                                    limit: limit,
+                                }),
+                            { maxRetries: 1, delay: 25_000 }
+                        )
+                        const tickersWithExchange = results.map((ticker) => {
+                            return {
+                                ...ticker,
+                                exchangeAcronym: exchange.acronymstring ?? '',
+                                exchangeMic: exchange.mic ?? '',
+                                exchangeName: exchange.name,
+                            }
+                        })
+                        return { data: tickersWithExchange, nextUrl: next_url }
+                    } catch (err) {
+                        this.logger.error('Error while fetching tickers', err)
+                        return { data: [], nextUrl: undefined }
+                    }
+                },
+            })
+            tickers.push(...exchangeTickers)
+        }
+
+        return tickers
+    }
+
     private async _snapshotStocks(tickers: string[]) {
         /**
          * https://polygon.io/docs/stocks/get_v2_snapshot_locale_us_markets_stocks_tickers

View file

@@ -67,7 +67,10 @@ export type SendEmailQueueJobData =
 export type SyncUserQueue = IQueue<SyncUserQueueJobData, 'sync-user'>
 export type SyncAccountQueue = IQueue<SyncAccountQueueJobData, 'sync-account'>
 export type SyncConnectionQueue = IQueue<SyncConnectionQueueJobData, 'sync-connection'>
-export type SyncSecurityQueue = IQueue<SyncSecurityQueueJobData, 'sync-all-securities'>
+export type SyncSecurityQueue = IQueue<
+    SyncSecurityQueueJobData,
+    'sync-all-securities' | 'sync-us-stock-tickers'
+>
 export type PurgeUserQueue = IQueue<{ userId: User['id'] }, 'purge-user'>
 export type SyncInstitutionQueue = IQueue<
     {},

View file

@@ -51,6 +51,51 @@ export async function paginate<TData>({
     return result
 }
 
+/**
+ * Helper function for paginating data with a next-page URL
+ */
+export async function paginateWithNextUrl<TData>({
+    fetchData,
+    pageSize,
+    delay,
+}: {
+    fetchData: (
+        limit: number,
+        nextCursor: string | undefined
+    ) => Promise<{ data: TData[]; nextUrl: string | undefined }>
+    pageSize: number
+    delay?: { onDelay: (message: string) => void; milliseconds: number }
+}): Promise<TData[]> {
+    let hasNextPage = true
+    let nextCursor: string | undefined = undefined
+    const result: TData[] = []
+
+    while (hasNextPage) {
+        // Fetch one page of data
+        const response: { data: TData[]; nextUrl: string | undefined } = await fetchData(
+            pageSize,
+            nextCursor
+        )
+        const data = response.data
+        const nextUrl: string | undefined = response.nextUrl ?? undefined
+        nextCursor = nextUrl ? new URL(nextUrl).searchParams.get('cursor') ?? undefined : undefined
+
+        // Add fetched data to the result
+        result.push(...data)
+
+        // Determine if there is a next page
+        hasNextPage = !!nextCursor
+
+        // Delay the next request if needed
+        if (delay) {
+            delay.onDelay(`Waiting ${delay.milliseconds / 1000} seconds`)
+            await new Promise((resolve) => setTimeout(resolve, delay.milliseconds))
+        }
+    }
+
+    return result
+}
+
 /**
  * Helper function for paginating data using a generator (typically from an API)
  */
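
Note: a minimal usage sketch of the new paginateWithNextUrl helper. The endpoint, response shape, and wrapper function (fetchAllItems) below are hypothetical and not part of this commit; the key contract is that the returned nextUrl must carry a `cursor` query parameter for paging to continue.

import { SharedUtil } from '@maybe-finance/shared'

type Item = { id: string; name: string }

async function fetchAllItems(): Promise<Item[]> {
    return SharedUtil.paginateWithNextUrl<Item>({
        pageSize: 1000,
        // Optional throttle between pages, e.g. for rate-limited APIs
        delay: { onDelay: (msg) => console.log(msg), milliseconds: 25_000 },
        fetchData: async (limit, nextCursor) => {
            const url = new URL('https://api.example.com/items') // hypothetical endpoint
            url.searchParams.set('limit', String(limit))
            if (nextCursor) url.searchParams.set('cursor', nextCursor)

            const res = await fetch(url)
            const body = (await res.json()) as { results: Item[]; next_url?: string }

            return { data: body.results, nextUrl: body.next_url }
        },
    })
}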

View file

@@ -0,0 +1,5 @@
+-- CreateEnum
+CREATE TYPE "SecurityProvider" AS ENUM ('polygon', 'other');
+
+-- AlterTable
+ALTER TABLE "security" ADD COLUMN "provider_name" "SecurityProvider" DEFAULT 'other';

View file

@@ -0,0 +1,4 @@
+-- AlterTable
+ALTER TABLE "security" ADD COLUMN "exchange_acronym" TEXT,
+ADD COLUMN "exchange_mic" TEXT,
+ADD COLUMN "exchange_name" TEXT;

View file

@@ -243,6 +243,11 @@ model InvestmentTransaction {
     @@map("investment_transaction")
 }
 
+enum SecurityProvider {
+    polygon
+    other
+}
+
 model Security {
     id        Int      @id @default(autoincrement())
     createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(6)
@@ -255,6 +260,10 @@ model Security {
     currencyCode        String            @default("USD") @map("currency_code")
     pricingLastSyncedAt DateTime?         @map("pricing_last_synced_at") @db.Timestamptz(6)
     isBrokerageCash     Boolean           @default(false) @map("is_brokerage_cash")
+    exchangeAcronym     String?           @map("exchange_acronym")
+    exchangeMic         String?           @map("exchange_mic")
+    exchangeName        String?           @map("exchange_name")
+    providerName        SecurityProvider? @default(other) @map("provider_name")
 
     // plaid data
     plaidSecurityId     String?           @unique @map("plaid_security_id")