1
0
Fork 0
mirror of https://github.com/maybe-finance/maybe.git synced 2025-08-08 23:15:24 +02:00

Basic market data sync cron

This commit is contained in:
Zach Gollwitzer 2025-05-12 10:40:43 -04:00
parent 5c8bca31ec
commit 0fbafceea9
13 changed files with 306 additions and 52 deletions

View file

@ -29,6 +29,7 @@ gem "hotwire_combobox"
# Background Jobs
gem "sidekiq"
gem "sidekiq-cron"
# Monitoring
gem "vernier"

View file

@ -144,6 +144,9 @@ GEM
bigdecimal
rexml
crass (1.0.6)
cronex (0.15.0)
tzinfo
unicode (>= 0.4.4.5)
css_parser (1.21.1)
addressable
csv (3.3.4)
@ -165,6 +168,8 @@ GEM
rubocop (>= 1)
smart_properties
erubi (1.13.1)
et-orbi (1.2.11)
tzinfo
event_stream_parser (1.0.0)
faker (3.5.1)
i18n (>= 1.8.11, < 2)
@ -187,6 +192,9 @@ GEM
ffi (1.17.2-x86_64-linux-gnu)
ffi (1.17.2-x86_64-linux-musl)
foreman (0.88.1)
fugit (1.11.1)
et-orbi (~> 1, >= 1.2.11)
raabro (~> 1.4)
globalid (1.2.1)
activesupport (>= 6.1)
hashdiff (1.1.2)
@ -351,6 +359,7 @@ GEM
public_suffix (6.0.1)
puma (6.6.0)
nio4r (~> 2.0)
raabro (1.4.0)
racc (1.8.1)
rack (3.1.13)
rack-mini-profiler (3.3.1)
@ -491,6 +500,11 @@ GEM
logger (>= 1.6.2)
rack (>= 3.1.0)
redis-client (>= 0.23.2)
sidekiq-cron (2.2.0)
cronex (>= 0.13.0)
fugit (~> 1.8, >= 1.11.1)
globalid (>= 1.0.1)
sidekiq (>= 6.5.0)
simplecov (0.22.0)
docile (~> 1.1)
simplecov-html (~> 0.11)
@ -524,6 +538,7 @@ GEM
railties (>= 7.1.0)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
unicode (0.4.4.5)
unicode-display_width (3.1.4)
unicode-emoji (~> 4.0, >= 4.0.4)
unicode-emoji (4.0.4)
@ -619,6 +634,7 @@ DEPENDENCIES
sentry-ruby
sentry-sidekiq
sidekiq
sidekiq-cron
simplecov
skylight
stimulus-rails

View file

@ -0,0 +1,9 @@
class SyncMarketDataJob < ApplicationJob
queue_as :scheduled
def perform(*args)
syncer = MarketDataSyncer.new
syncer.sync_exchange_rates
syncer.sync_prices
end
end

View file

@ -7,7 +7,17 @@ class Holding::ForwardCalculator
def calculate
Rails.logger.tagged("Holding::ForwardCalculator") do
holdings = calculate_holdings
current_portfolio = generate_starting_portfolio
next_portfolio = {}
holdings = []
account.start_date.upto(Date.current).each do |date|
trades = portfolio_cache.get_trades(date: date)
next_portfolio = transform_portfolio(current_portfolio, trades, direction: :forward)
holdings += build_holdings(next_portfolio, date)
current_portfolio = next_portfolio
end
Holding.gapfill(holdings)
end
end
@ -59,19 +69,4 @@ class Holding::ForwardCalculator
)
end.compact
end
def calculate_holdings
current_portfolio = generate_starting_portfolio
next_portfolio = {}
holdings = []
account.start_date.upto(Date.current).each do |date|
trades = portfolio_cache.get_trades(date: date)
next_portfolio = transform_portfolio(current_portfolio, trades, direction: :forward)
holdings += build_holdings(next_portfolio, date)
current_portfolio = next_portfolio
end
holdings
end
end

View file

@ -83,9 +83,6 @@ class Holding::PortfolioCache
securities.each do |security|
Rails.logger.info "Loading security: ID=#{security.id} Ticker=#{security.ticker}"
# Load prices from provider to DB
security.sync_provider_prices(start_date: account.start_date)
# High priority prices from DB (synced from provider)
db_prices = security.prices.where(date: account.start_date..Date.current).map do |price|
PriceWithPriority.new(

View file

@ -0,0 +1,183 @@
class MarketDataSyncer
DEFAULT_HISTORY_DAYS = 30
RATE_PROVIDER_NAME = :synth
PRICE_PROVIDER_NAME = :synth
MissingExchangeRateError = Class.new(StandardError)
InvalidExchangeRateDataError = Class.new(StandardError)
MissingSecurityPriceError = Class.new(StandardError)
InvalidSecurityPriceDataError = Class.new(StandardError)
class << self
def for(family: nil, account: nil)
new(family: family, account: account)
end
end
# Syncer can optionally be scoped. Otherwise, it syncs all user data
def initialize(family: nil, account: nil)
@family = family
@account = account
end
def sync_exchange_rates(full_history: false)
unless rate_provider
Rails.logger.warn("No rate provider configured for MarketDataSyncer.sync_exchange_rates, skipping sync")
return
end
# Finds distinct currency pairs
entry_pairs = entries_scope.joins(:account)
.where.not("entries.currency = accounts.currency")
.select("entries.currency as source, accounts.currency as target")
.distinct
# All accounts in currency not equal to the family currency require exchange rates to show a normalized historical graph
account_pairs = accounts_scope.joins(:family)
.where.not("families.currency = accounts.currency")
.select("accounts.currency as source, families.currency as target")
.distinct
pairs = (entry_pairs + account_pairs).uniq
pairs.each do |pair|
sync_exchange_rate(from: pair.source, to: pair.target, full_history: full_history)
end
end
def sync_prices(full_history: false)
unless price_provider
Rails.logger.warn("No price provider configured for MarketDataSyncer.sync_prices, skipping sync")
nil
end
securities_scope.each do |security|
sync_security_price(security: security, full_history: full_history)
end
end
private
attr_reader :family, :account
def accounts_scope
return Account.where(id: account.id) if account
return family.accounts if family
Account.all
end
def entries_scope
account&.entries || family&.entries || Entry.all
end
def securities_scope
if account
account.trades.joins(:security).where.not(securities: { exchange_operating_mic: nil })
elsif family
family.trades.joins(:security).where.not(securities: { exchange_operating_mic: nil })
else
Security.where.not(exchange_operating_mic: nil)
end
end
def sync_security_price(security:, full_history:)
start_date = full_history ? find_oldest_required_price(security: security) : default_start_date
Rails.logger.info("Syncing security price for: #{security.ticker}, start_date: #{start_date}, end_date: #{end_date}")
fetched_prices = price_provider.fetch_security_prices(
security.ticker,
start_date: start_date,
end_date: end_date
)
unless fetched_prices.success?
message = "#{PRICE_PROVIDER_NAME} could not fetch security price for: #{security.ticker} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_prices.error.message}"
Rails.logger.warn(message)
Sentry.capture_exception(MissingSecurityPriceError.new(message))
return
end
prices_for_upsert = fetched_prices.data.map do |price|
if price.security.nil? || price.date.nil? || price.price.nil? || price.currency.nil?
message = "#{PRICE_PROVIDER_NAME} returned invalid price data for security: #{security.ticker} on: #{price.date}. Price data: #{price.inspect}"
Rails.logger.warn(message)
Sentry.capture_exception(InvalidSecurityPriceDataError.new(message))
next
end
{
security_id: price.security.id,
date: price.date,
price: price.price,
currency: price.currency
}
end.compact
Security::Price.upsert_all(
prices_for_upsert,
unique_by: %i[security_id date currency]
)
end
def sync_exchange_rate(from:, to:, full_history:)
start_date = full_history ? find_oldest_required_rate(from_currency: from) : default_start_date
Rails.logger.info("Syncing exchange rate from: #{from}, to: #{to}, start_date: #{start_date}, end_date: #{end_date}")
fetched_rates = rate_provider.fetch_exchange_rates(
from: from,
to: to,
start_date: start_date,
end_date: end_date
)
unless fetched_rates.success?
message = "#{RATE_PROVIDER_NAME} could not fetch exchange rate pair from: #{from} to: #{to} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_rates.error.message}"
Rails.logger.warn(message)
Sentry.capture_exception(MissingExchangeRateError.new(message))
return
end
rates_for_upsert = fetched_rates.data.map do |rate|
if rate.from.nil? || rate.to.nil? || rate.date.nil? || rate.rate.nil?
message = "#{RATE_PROVIDER_NAME} returned invalid rate data for pair from: #{from} to: #{to} on: #{rate.date}. Rate data: #{rate.inspect}"
Rails.logger.warn(message)
Sentry.capture_exception(InvalidExchangeRateDataError.new(message))
next
end
{
from_currency: rate.from,
to_currency: rate.to,
date: rate.date,
rate: rate.rate
}
end.compact
ExchangeRate.upsert_all(
rates_for_upsert,
unique_by: %i[from_currency to_currency date]
)
end
def rate_provider
Provider::Registry.for_concept(:exchange_rates).get_provider(RATE_PROVIDER_NAME)
end
def price_provider
Provider::Registry.for_concept(:securities).get_provider(PRICE_PROVIDER_NAME)
end
def find_oldest_required_rate(from_currency:)
entries_scope.where(currency: from_currency).minimum(:date) || default_start_date
end
def default_start_date
DEFAULT_HISTORY_DAYS.days.ago.to_date
end
# Since we're querying market data from a US-based API, end date should always be today (EST)
def end_date
Date.current.in_time_zone("America/New_York").to_date
end
end

View file

@ -36,8 +36,6 @@ class Provider
default_error_transformer(error)
end
Sentry.capture_exception(transformed_error)
Response.new(
success?: false,
data: nil,

View file

@ -7,3 +7,8 @@ Sidekiq::Web.use(Rack::Auth::Basic) do |username, password|
ActiveSupport::SecurityUtils.secure_compare(::Digest::SHA256.hexdigest(username), configured_username) &&
ActiveSupport::SecurityUtils.secure_compare(::Digest::SHA256.hexdigest(password), configured_password)
end
Sidekiq::Cron.configure do |config|
# 10 min "catch-up" window in case worker process is re-deploying when cron tick occurs
config.reschedule_grace_period = 600
end

View file

@ -1,4 +1,5 @@
require "sidekiq/web"
require "sidekiq/cron/web"
Rails.application.routes.draw do
# MFA routes

5
config/schedule.yml Normal file
View file

@ -0,0 +1,5 @@
sync_market_data:
cron: "0 17 * * 1-5" # 5:00 PM EST (1 hour after market close)
class: "SyncMarketDataJob"
queue: "scheduled"
description: "Syncs market data daily at 5:00 PM EST (1 hour after market close)"

View file

@ -1,6 +1,7 @@
concurrency: <%= ENV.fetch("RAILS_MAX_THREADS") { 3 } %>
queues:
- [high_priority, 6]
- [scheduled, 10] # For cron-like jobs (e.g. "daily market data sync")
- [high_priority, 4]
- [medium_priority, 2]
- [low_priority, 1]
- [default, 1]

View file

@ -28,37 +28,18 @@ class Holding::PortfolioCacheTest < ActiveSupport::TestCase
price: db_price
)
expect_provider_prices([], start_date: @account.start_date)
cache = Holding::PortfolioCache.new(@account)
assert_equal db_price, cache.get_price(@security.id, Date.current).price
end
test "if no price in DB, try fetching from provider" do
Security::Price.delete_all
provider_price = Security::Price.new(
security: @security,
date: Date.current,
price: 220,
currency: "USD"
)
expect_provider_prices([ provider_price ], start_date: @account.start_date)
cache = Holding::PortfolioCache.new(@account)
assert_equal provider_price.price, cache.get_price(@security.id, Date.current).price
end
test "if no price from db or provider, try getting the price from trades" do
test "if no price from db, try getting the price from trades" do
Security::Price.destroy_all
expect_provider_prices([], start_date: @account.start_date)
cache = Holding::PortfolioCache.new(@account)
assert_equal @trade.price, cache.get_price(@security.id, @trade.entry.date).price
end
test "if no price from db, provider, or trades, search holdings" do
test "if no price from db or trades, search holdings" do
Security::Price.delete_all
Entry.delete_all
@ -72,16 +53,7 @@ class Holding::PortfolioCacheTest < ActiveSupport::TestCase
currency: "USD"
)
expect_provider_prices([], start_date: @account.start_date)
cache = Holding::PortfolioCache.new(@account, use_holdings: true)
assert_equal holding.price, cache.get_price(@security.id, holding.date).price
end
private
def expect_provider_prices(prices, start_date:, end_date: Date.current)
@provider.expects(:fetch_security_prices)
.with(@security, start_date: start_date, end_date: end_date)
.returns(provider_success_response(prices))
end
end

View file

@ -0,0 +1,71 @@
require "test_helper"
require "ostruct"
class MarketDataSyncerTest < ActiveSupport::TestCase
include EntriesTestHelper, ProviderTestHelper
test "syncs exchange rates with upsert" do
empty_db
family1 = Family.create!(name: "Family 1", currency: "USD")
account1 = family1.accounts.create!(name: "Account 1", currency: "USD", balance: 100, accountable: Depository.new)
account2 = family1.accounts.create!(name: "Account 2", currency: "CAD", balance: 100, accountable: Depository.new)
family2 = Family.create!(name: "Family 2", currency: "EUR")
account3 = family2.accounts.create!(name: "Account 3", currency: "EUR", balance: 100, accountable: Depository.new)
account4 = family2.accounts.create!(name: "Account 4", currency: "USD", balance: 100, accountable: Depository.new)
mock_provider = mock
Provider::Registry.any_instance.expects(:get_provider).with(:synth).returns(mock_provider).at_least_once
start_date = 1.month.ago.to_date
end_date = Date.current.in_time_zone("America/New_York").to_date
# Put an existing rate in DB to test upsert
ExchangeRate.create!(from_currency: "CAD", to_currency: "USD", date: start_date, rate: 2.0)
mock_provider.expects(:fetch_exchange_rates)
.with(from: "CAD", to: "USD", start_date: start_date, end_date: end_date)
.returns(provider_success_response([ OpenStruct.new(from: "CAD", to: "USD", date: start_date, rate: 1.0) ]))
mock_provider.expects(:fetch_exchange_rates)
.with(from: "USD", to: "EUR", start_date: start_date, end_date: end_date)
.returns(provider_success_response([ OpenStruct.new(from: "USD", to: "EUR", date: start_date, rate: 1.0) ]))
assert_difference "ExchangeRate.count", 1 do
MarketDataSyncer.new.sync_exchange_rates
end
assert_equal 1.0, ExchangeRate.where(from_currency: "CAD", to_currency: "USD", date: start_date).first.rate
end
test "syncs security prices with upsert" do
empty_db
aapl = Security.create!(ticker: "AAPL", exchange_operating_mic: "XNAS")
family = Family.create!(name: "Family 1", currency: "USD")
account = family.accounts.create!(name: "Account 1", currency: "USD", balance: 100, accountable: Investment.new)
mock_provider = mock
Provider::Registry.any_instance.expects(:get_provider).with(:synth).returns(mock_provider).at_least_once
start_date = 1.month.ago.to_date
end_date = Date.current.in_time_zone("America/New_York").to_date
mock_provider.expects(:fetch_security_prices)
.with("AAPL", start_date: start_date, end_date: end_date)
.returns(provider_success_response([ OpenStruct.new(security: aapl, date: start_date, price: 100, currency: "USD") ]))
assert_difference "Security::Price.count", 1 do
MarketDataSyncer.new.sync_prices
end
end
private
def empty_db
Invitation.destroy_all
Family.destroy_all
Security.destroy_all
end
end