diff --git a/Gemfile b/Gemfile index 2a32b38b..3ad4f0a8 100644 --- a/Gemfile +++ b/Gemfile @@ -29,6 +29,7 @@ gem "hotwire_combobox" # Background Jobs gem "sidekiq" +gem "sidekiq-cron" # Monitoring gem "vernier" diff --git a/Gemfile.lock b/Gemfile.lock index 81c6ccb0..a8c86724 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -144,6 +144,9 @@ GEM bigdecimal rexml crass (1.0.6) + cronex (0.15.0) + tzinfo + unicode (>= 0.4.4.5) css_parser (1.21.1) addressable csv (3.3.4) @@ -165,6 +168,8 @@ GEM rubocop (>= 1) smart_properties erubi (1.13.1) + et-orbi (1.2.11) + tzinfo event_stream_parser (1.0.0) faker (3.5.1) i18n (>= 1.8.11, < 2) @@ -187,6 +192,9 @@ GEM ffi (1.17.2-x86_64-linux-gnu) ffi (1.17.2-x86_64-linux-musl) foreman (0.88.1) + fugit (1.11.1) + et-orbi (~> 1, >= 1.2.11) + raabro (~> 1.4) globalid (1.2.1) activesupport (>= 6.1) hashdiff (1.1.2) @@ -351,6 +359,7 @@ GEM public_suffix (6.0.1) puma (6.6.0) nio4r (~> 2.0) + raabro (1.4.0) racc (1.8.1) rack (3.1.13) rack-mini-profiler (3.3.1) @@ -491,6 +500,11 @@ GEM logger (>= 1.6.2) rack (>= 3.1.0) redis-client (>= 0.23.2) + sidekiq-cron (2.2.0) + cronex (>= 0.13.0) + fugit (~> 1.8, >= 1.11.1) + globalid (>= 1.0.1) + sidekiq (>= 6.5.0) simplecov (0.22.0) docile (~> 1.1) simplecov-html (~> 0.11) @@ -524,6 +538,7 @@ GEM railties (>= 7.1.0) tzinfo (2.0.6) concurrent-ruby (~> 1.0) + unicode (0.4.4.5) unicode-display_width (3.1.4) unicode-emoji (~> 4.0, >= 4.0.4) unicode-emoji (4.0.4) @@ -619,6 +634,7 @@ DEPENDENCIES sentry-ruby sentry-sidekiq sidekiq + sidekiq-cron simplecov skylight stimulus-rails diff --git a/app/jobs/sync_market_data_job.rb b/app/jobs/sync_market_data_job.rb new file mode 100644 index 00000000..4c911593 --- /dev/null +++ b/app/jobs/sync_market_data_job.rb @@ -0,0 +1,9 @@ +class SyncMarketDataJob < ApplicationJob + queue_as :scheduled + + def perform(*args) + syncer = MarketDataSyncer.new + syncer.sync_exchange_rates + syncer.sync_prices + end +end diff --git a/app/models/holding/forward_calculator.rb b/app/models/holding/forward_calculator.rb index 22759416..43f91f7a 100644 --- a/app/models/holding/forward_calculator.rb +++ b/app/models/holding/forward_calculator.rb @@ -7,7 +7,17 @@ class Holding::ForwardCalculator def calculate Rails.logger.tagged("Holding::ForwardCalculator") do - holdings = calculate_holdings + current_portfolio = generate_starting_portfolio + next_portfolio = {} + holdings = [] + + account.start_date.upto(Date.current).each do |date| + trades = portfolio_cache.get_trades(date: date) + next_portfolio = transform_portfolio(current_portfolio, trades, direction: :forward) + holdings += build_holdings(next_portfolio, date) + current_portfolio = next_portfolio + end + Holding.gapfill(holdings) end end @@ -59,19 +69,4 @@ class Holding::ForwardCalculator ) end.compact end - - def calculate_holdings - current_portfolio = generate_starting_portfolio - next_portfolio = {} - holdings = [] - - account.start_date.upto(Date.current).each do |date| - trades = portfolio_cache.get_trades(date: date) - next_portfolio = transform_portfolio(current_portfolio, trades, direction: :forward) - holdings += build_holdings(next_portfolio, date) - current_portfolio = next_portfolio - end - - holdings - end end diff --git a/app/models/holding/portfolio_cache.rb b/app/models/holding/portfolio_cache.rb index 2d67a1d8..9ffed15b 100644 --- a/app/models/holding/portfolio_cache.rb +++ b/app/models/holding/portfolio_cache.rb @@ -83,9 +83,6 @@ class Holding::PortfolioCache securities.each do |security| Rails.logger.info "Loading security: ID=#{security.id} Ticker=#{security.ticker}" - # Load prices from provider to DB - security.sync_provider_prices(start_date: account.start_date) - # High priority prices from DB (synced from provider) db_prices = security.prices.where(date: account.start_date..Date.current).map do |price| PriceWithPriority.new( diff --git a/app/models/market_data_syncer.rb b/app/models/market_data_syncer.rb new file mode 100644 index 00000000..cd9c382f --- /dev/null +++ b/app/models/market_data_syncer.rb @@ -0,0 +1,183 @@ +class MarketDataSyncer + DEFAULT_HISTORY_DAYS = 30 + RATE_PROVIDER_NAME = :synth + PRICE_PROVIDER_NAME = :synth + + MissingExchangeRateError = Class.new(StandardError) + InvalidExchangeRateDataError = Class.new(StandardError) + MissingSecurityPriceError = Class.new(StandardError) + InvalidSecurityPriceDataError = Class.new(StandardError) + + class << self + def for(family: nil, account: nil) + new(family: family, account: account) + end + end + + # Syncer can optionally be scoped. Otherwise, it syncs all user data + def initialize(family: nil, account: nil) + @family = family + @account = account + end + + def sync_exchange_rates(full_history: false) + unless rate_provider + Rails.logger.warn("No rate provider configured for MarketDataSyncer.sync_exchange_rates, skipping sync") + return + end + + # Finds distinct currency pairs + entry_pairs = entries_scope.joins(:account) + .where.not("entries.currency = accounts.currency") + .select("entries.currency as source, accounts.currency as target") + .distinct + + # All accounts in currency not equal to the family currency require exchange rates to show a normalized historical graph + account_pairs = accounts_scope.joins(:family) + .where.not("families.currency = accounts.currency") + .select("accounts.currency as source, families.currency as target") + .distinct + + pairs = (entry_pairs + account_pairs).uniq + + pairs.each do |pair| + sync_exchange_rate(from: pair.source, to: pair.target, full_history: full_history) + end + end + + def sync_prices(full_history: false) + unless price_provider + Rails.logger.warn("No price provider configured for MarketDataSyncer.sync_prices, skipping sync") + nil + end + + securities_scope.each do |security| + sync_security_price(security: security, full_history: full_history) + end + end + + private + attr_reader :family, :account + + def accounts_scope + return Account.where(id: account.id) if account + return family.accounts if family + Account.all + end + + def entries_scope + account&.entries || family&.entries || Entry.all + end + + def securities_scope + if account + account.trades.joins(:security).where.not(securities: { exchange_operating_mic: nil }) + elsif family + family.trades.joins(:security).where.not(securities: { exchange_operating_mic: nil }) + else + Security.where.not(exchange_operating_mic: nil) + end + end + + def sync_security_price(security:, full_history:) + start_date = full_history ? find_oldest_required_price(security: security) : default_start_date + + Rails.logger.info("Syncing security price for: #{security.ticker}, start_date: #{start_date}, end_date: #{end_date}") + + fetched_prices = price_provider.fetch_security_prices( + security.ticker, + start_date: start_date, + end_date: end_date + ) + + unless fetched_prices.success? + message = "#{PRICE_PROVIDER_NAME} could not fetch security price for: #{security.ticker} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_prices.error.message}" + Rails.logger.warn(message) + Sentry.capture_exception(MissingSecurityPriceError.new(message)) + return + end + + prices_for_upsert = fetched_prices.data.map do |price| + if price.security.nil? || price.date.nil? || price.price.nil? || price.currency.nil? + message = "#{PRICE_PROVIDER_NAME} returned invalid price data for security: #{security.ticker} on: #{price.date}. Price data: #{price.inspect}" + Rails.logger.warn(message) + Sentry.capture_exception(InvalidSecurityPriceDataError.new(message)) + next + end + + { + security_id: price.security.id, + date: price.date, + price: price.price, + currency: price.currency + } + end.compact + + Security::Price.upsert_all( + prices_for_upsert, + unique_by: %i[security_id date currency] + ) + end + + def sync_exchange_rate(from:, to:, full_history:) + start_date = full_history ? find_oldest_required_rate(from_currency: from) : default_start_date + + Rails.logger.info("Syncing exchange rate from: #{from}, to: #{to}, start_date: #{start_date}, end_date: #{end_date}") + + fetched_rates = rate_provider.fetch_exchange_rates( + from: from, + to: to, + start_date: start_date, + end_date: end_date + ) + + unless fetched_rates.success? + message = "#{RATE_PROVIDER_NAME} could not fetch exchange rate pair from: #{from} to: #{to} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_rates.error.message}" + Rails.logger.warn(message) + Sentry.capture_exception(MissingExchangeRateError.new(message)) + return + end + + rates_for_upsert = fetched_rates.data.map do |rate| + if rate.from.nil? || rate.to.nil? || rate.date.nil? || rate.rate.nil? + message = "#{RATE_PROVIDER_NAME} returned invalid rate data for pair from: #{from} to: #{to} on: #{rate.date}. Rate data: #{rate.inspect}" + Rails.logger.warn(message) + Sentry.capture_exception(InvalidExchangeRateDataError.new(message)) + next + end + + { + from_currency: rate.from, + to_currency: rate.to, + date: rate.date, + rate: rate.rate + } + end.compact + + ExchangeRate.upsert_all( + rates_for_upsert, + unique_by: %i[from_currency to_currency date] + ) + end + + def rate_provider + Provider::Registry.for_concept(:exchange_rates).get_provider(RATE_PROVIDER_NAME) + end + + def price_provider + Provider::Registry.for_concept(:securities).get_provider(PRICE_PROVIDER_NAME) + end + + def find_oldest_required_rate(from_currency:) + entries_scope.where(currency: from_currency).minimum(:date) || default_start_date + end + + def default_start_date + DEFAULT_HISTORY_DAYS.days.ago.to_date + end + + # Since we're querying market data from a US-based API, end date should always be today (EST) + def end_date + Date.current.in_time_zone("America/New_York").to_date + end +end diff --git a/app/models/provider.rb b/app/models/provider.rb index c90866e7..e9702349 100644 --- a/app/models/provider.rb +++ b/app/models/provider.rb @@ -36,8 +36,6 @@ class Provider default_error_transformer(error) end - Sentry.capture_exception(transformed_error) - Response.new( success?: false, data: nil, diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 1209a4fa..9040f864 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -7,3 +7,8 @@ Sidekiq::Web.use(Rack::Auth::Basic) do |username, password| ActiveSupport::SecurityUtils.secure_compare(::Digest::SHA256.hexdigest(username), configured_username) && ActiveSupport::SecurityUtils.secure_compare(::Digest::SHA256.hexdigest(password), configured_password) end + +Sidekiq::Cron.configure do |config| + # 10 min "catch-up" window in case worker process is re-deploying when cron tick occurs + config.reschedule_grace_period = 600 +end diff --git a/config/routes.rb b/config/routes.rb index 8384d116..e44265f6 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -1,4 +1,5 @@ require "sidekiq/web" +require "sidekiq/cron/web" Rails.application.routes.draw do # MFA routes diff --git a/config/schedule.yml b/config/schedule.yml new file mode 100644 index 00000000..561e2327 --- /dev/null +++ b/config/schedule.yml @@ -0,0 +1,5 @@ +sync_market_data: + cron: "0 17 * * 1-5" # 5:00 PM EST (1 hour after market close) + class: "SyncMarketDataJob" + queue: "scheduled" + description: "Syncs market data daily at 5:00 PM EST (1 hour after market close)" diff --git a/config/sidekiq.yml b/config/sidekiq.yml index 20343e8c..4fce6f00 100644 --- a/config/sidekiq.yml +++ b/config/sidekiq.yml @@ -1,6 +1,7 @@ concurrency: <%= ENV.fetch("RAILS_MAX_THREADS") { 3 } %> queues: - - [high_priority, 6] + - [scheduled, 10] # For cron-like jobs (e.g. "daily market data sync") + - [high_priority, 4] - [medium_priority, 2] - [low_priority, 1] - [default, 1] diff --git a/test/models/holding/portfolio_cache_test.rb b/test/models/holding/portfolio_cache_test.rb index 2518e2b7..4677fc41 100644 --- a/test/models/holding/portfolio_cache_test.rb +++ b/test/models/holding/portfolio_cache_test.rb @@ -28,37 +28,18 @@ class Holding::PortfolioCacheTest < ActiveSupport::TestCase price: db_price ) - expect_provider_prices([], start_date: @account.start_date) - cache = Holding::PortfolioCache.new(@account) assert_equal db_price, cache.get_price(@security.id, Date.current).price end - test "if no price in DB, try fetching from provider" do - Security::Price.delete_all - - provider_price = Security::Price.new( - security: @security, - date: Date.current, - price: 220, - currency: "USD" - ) - - expect_provider_prices([ provider_price ], start_date: @account.start_date) - - cache = Holding::PortfolioCache.new(@account) - assert_equal provider_price.price, cache.get_price(@security.id, Date.current).price - end - - test "if no price from db or provider, try getting the price from trades" do + test "if no price from db, try getting the price from trades" do Security::Price.destroy_all - expect_provider_prices([], start_date: @account.start_date) cache = Holding::PortfolioCache.new(@account) assert_equal @trade.price, cache.get_price(@security.id, @trade.entry.date).price end - test "if no price from db, provider, or trades, search holdings" do + test "if no price from db or trades, search holdings" do Security::Price.delete_all Entry.delete_all @@ -72,16 +53,7 @@ class Holding::PortfolioCacheTest < ActiveSupport::TestCase currency: "USD" ) - expect_provider_prices([], start_date: @account.start_date) - cache = Holding::PortfolioCache.new(@account, use_holdings: true) assert_equal holding.price, cache.get_price(@security.id, holding.date).price end - - private - def expect_provider_prices(prices, start_date:, end_date: Date.current) - @provider.expects(:fetch_security_prices) - .with(@security, start_date: start_date, end_date: end_date) - .returns(provider_success_response(prices)) - end end diff --git a/test/models/market_data_syncer_test.rb b/test/models/market_data_syncer_test.rb new file mode 100644 index 00000000..8f00e753 --- /dev/null +++ b/test/models/market_data_syncer_test.rb @@ -0,0 +1,71 @@ +require "test_helper" +require "ostruct" + +class MarketDataSyncerTest < ActiveSupport::TestCase + include EntriesTestHelper, ProviderTestHelper + + test "syncs exchange rates with upsert" do + empty_db + + family1 = Family.create!(name: "Family 1", currency: "USD") + account1 = family1.accounts.create!(name: "Account 1", currency: "USD", balance: 100, accountable: Depository.new) + account2 = family1.accounts.create!(name: "Account 2", currency: "CAD", balance: 100, accountable: Depository.new) + + family2 = Family.create!(name: "Family 2", currency: "EUR") + account3 = family2.accounts.create!(name: "Account 3", currency: "EUR", balance: 100, accountable: Depository.new) + account4 = family2.accounts.create!(name: "Account 4", currency: "USD", balance: 100, accountable: Depository.new) + + mock_provider = mock + Provider::Registry.any_instance.expects(:get_provider).with(:synth).returns(mock_provider).at_least_once + + start_date = 1.month.ago.to_date + end_date = Date.current.in_time_zone("America/New_York").to_date + + # Put an existing rate in DB to test upsert + ExchangeRate.create!(from_currency: "CAD", to_currency: "USD", date: start_date, rate: 2.0) + + mock_provider.expects(:fetch_exchange_rates) + .with(from: "CAD", to: "USD", start_date: start_date, end_date: end_date) + .returns(provider_success_response([ OpenStruct.new(from: "CAD", to: "USD", date: start_date, rate: 1.0) ])) + + mock_provider.expects(:fetch_exchange_rates) + .with(from: "USD", to: "EUR", start_date: start_date, end_date: end_date) + .returns(provider_success_response([ OpenStruct.new(from: "USD", to: "EUR", date: start_date, rate: 1.0) ])) + + assert_difference "ExchangeRate.count", 1 do + MarketDataSyncer.new.sync_exchange_rates + end + + assert_equal 1.0, ExchangeRate.where(from_currency: "CAD", to_currency: "USD", date: start_date).first.rate + end + + test "syncs security prices with upsert" do + empty_db + + aapl = Security.create!(ticker: "AAPL", exchange_operating_mic: "XNAS") + + family = Family.create!(name: "Family 1", currency: "USD") + account = family.accounts.create!(name: "Account 1", currency: "USD", balance: 100, accountable: Investment.new) + + mock_provider = mock + Provider::Registry.any_instance.expects(:get_provider).with(:synth).returns(mock_provider).at_least_once + + start_date = 1.month.ago.to_date + end_date = Date.current.in_time_zone("America/New_York").to_date + + mock_provider.expects(:fetch_security_prices) + .with("AAPL", start_date: start_date, end_date: end_date) + .returns(provider_success_response([ OpenStruct.new(security: aapl, date: start_date, price: 100, currency: "USD") ])) + + assert_difference "Security::Price.count", 1 do + MarketDataSyncer.new.sync_prices + end + end + + private + def empty_db + Invitation.destroy_all + Family.destroy_all + Security.destroy_all + end +end