diff --git a/app/jobs/sync_market_data_job.rb b/app/jobs/sync_market_data_job.rb index 074cda9f..d7bdc15d 100644 --- a/app/jobs/sync_market_data_job.rb +++ b/app/jobs/sync_market_data_job.rb @@ -1,7 +1,16 @@ +# This job runs daily at market close. See config/schedule.yml for details. +# +# The primary purpose of this job is to: +# 1. Determine what exchange rate pairs, security prices, and other market data all of our users need to view historical account balance data +# 2. For each needed rate/price, fetch from our data provider and upsert to our database +# +# Each individual account sync will still fetch any missing market data that isn't yet synced, but by running +# this job daily, we significantly reduce overlapping account syncs that both need the same market data (e.g. common security like `AAPL`) +# class SyncMarketDataJob < ApplicationJob queue_as :scheduled - def perform - MarketDataSyncer.new.sync_all + def perform(mode: :full, clear_cache: false) + MarketDataSyncer.new.sync_all(mode: mode, clear_cache: clear_cache) end end diff --git a/app/models/exchange_rate/provided.rb b/app/models/exchange_rate/provided.rb index dbe87133..5237ea39 100644 --- a/app/models/exchange_rate/provided.rb +++ b/app/models/exchange_rate/provided.rb @@ -27,29 +27,20 @@ module ExchangeRate::Provided rate end - def sync_provider_rates(from:, to:, start_date:, end_date: Date.current) + def sync_provider_rates(from:, to:, start_date:, end_date:, clear_cache: false) unless provider.present? Rails.logger.warn("No provider configured for ExchangeRate.sync_provider_rates") return 0 end - fetched_rates = provider.fetch_exchange_rates(from: from, to: to, start_date: start_date, end_date: end_date) - - unless fetched_rates.success? - Rails.logger.error("Provider error for ExchangeRate.sync_provider_rates: #{fetched_rates.error}") - return 0 - end - - rates_data = fetched_rates.data.map do |rate| - { - from_currency: rate.from, - to_currency: rate.to, - date: rate.date, - rate: rate.rate - } - end - - ExchangeRate.upsert_all(rates_data, unique_by: %i[from_currency to_currency date]) + ExchangeRate::Syncer.new( + exchange_rate_provider: provider, + from: from, + to: to, + start_date: start_date, + end_date: end_date, + clear_cache: clear_cache + ).sync_provider_rates end end end diff --git a/app/models/exchange_rate/syncer.rb b/app/models/exchange_rate/syncer.rb new file mode 100644 index 00000000..69962a22 --- /dev/null +++ b/app/models/exchange_rate/syncer.rb @@ -0,0 +1,150 @@ +class ExchangeRate::Syncer + MissingExchangeRateError = Class.new(StandardError) + MissingStartRateError = Class.new(StandardError) + + def initialize(exchange_rate_provider:, from:, to:, start_date:, end_date:, clear_cache: false) + @exchange_rate_provider = exchange_rate_provider + @from = from + @to = to + @start_date = start_date + @end_date = normalize_end_date(end_date) + @clear_cache = clear_cache + end + + # Constructs a daily series of rates for the given currency pair for date range + def sync_provider_rates + if !clear_cache && all_rates_exist? + Rails.logger.info("No new rates to sync for #{from} to #{to} between #{start_date} and #{end_date}, skipping") + return + end + + if clear_cache && provider_rates.empty? + Rails.logger.warn("Could not clear cache for #{from} to #{to} between #{start_date} and #{end_date} because provider returned no rates") + return + end + + prev_rate_value = start_rate_value + + unless prev_rate_value.present? + error = MissingStartRateError.new("Could not find a start rate for #{from} to #{to} between #{start_date} and #{end_date}") + Rails.logger.error(error.message) + Sentry.capture_exception(error) + return + end + + gapfilled_rates = effective_start_date.upto(end_date).map do |date| + db_rate_value = db_rates[date]&.rate + provider_rate_value = provider_rates[date]&.rate + + chosen_rate = if clear_cache + provider_rate_value || db_rate_value # overwrite when possible + else + db_rate_value || provider_rate_value # fill gaps + end + + # Gapfill with LOCF strategy (last observation carried forward) + if chosen_rate.nil? + chosen_rate = prev_rate_value + end + + prev_rate_value = chosen_rate + + { + from_currency: from, + to_currency: to, + date: date, + rate: chosen_rate + } + end + + upsert_rows(gapfilled_rates) + end + + private + attr_reader :exchange_rate_provider, :from, :to, :start_date, :end_date, :clear_cache + + def upsert_rows(rows) + batch_size = 200 + + rows.each_slice(batch_size) do |batch| + ExchangeRate.upsert_all( + batch, + unique_by: %i[from_currency to_currency date], + returning: false + ) + end + end + + # Since provider may not return values on weekends and holidays, we grab the first rate from the provider that is on or before the start date + def start_rate_value + provider_rate_value = provider_rates.select { |date, _| date <= start_date }.max_by { |date, _| date }&.last + db_rate_value = db_rates[start_date]&.rate + provider_rate_value || db_rate_value + end + + # No need to fetch/upsert rates for dates that we already have in the DB + def effective_start_date + return start_date if clear_cache + + first_missing_date = nil + + start_date.upto(end_date) do |date| + unless db_rates.key?(date) + first_missing_date = date + break + end + end + + first_missing_date || end_date + end + + def provider_rates + @provider_rates ||= begin + # Always fetch with a 5 day buffer to ensure we have a starting rate (for weekends and holidays) + provider_fetch_start_date = effective_start_date - 5.days + + provider_response = exchange_rate_provider.fetch_exchange_rates( + from: from, + to: to, + start_date: provider_fetch_start_date, + end_date: end_date + ) + + if provider_response.success? + provider_response.data.index_by(&:date) + else + message = "#{exchange_rate_provider.class.name} could not fetch exchange rate pair from: #{from} to: #{to} between: #{effective_start_date} and: #{Date.current}. Provider error: #{provider_response.error.message}" + Rails.logger.warn(message) + Sentry.capture_exception(MissingExchangeRateError.new(message)) + {} + end + end + end + + def all_rates_exist? + db_count == expected_count + end + + def expected_count + (start_date..end_date).count + end + + def db_count + db_rates.count + end + + def db_rates + @db_rates ||= ExchangeRate.where(from_currency: from, to_currency: to, date: start_date..end_date) + .order(:date) + .to_a + .index_by(&:date) + end + + # Normalizes an end date so that it never exceeds today's date in the + # America/New_York timezone. If the caller passes a future date we clamp + # it to today so that upstream provider calls remain valid and predictable. + def normalize_end_date(requested_end_date) + today_est = Date.current.in_time_zone("America/New_York").to_date + [ requested_end_date, today_est ].min + end +end diff --git a/app/models/market_data_syncer.rb b/app/models/market_data_syncer.rb index d634cd45..fe0db445 100644 --- a/app/models/market_data_syncer.rb +++ b/app/models/market_data_syncer.rb @@ -8,24 +8,18 @@ class MarketDataSyncer MissingSecurityPriceError = Class.new(StandardError) InvalidSecurityPriceDataError = Class.new(StandardError) - class << self - def for(family: nil, account: nil) - new(family: family, account: account) - end - end - # Syncer can optionally be scoped. Otherwise, it syncs all user data def initialize(family: nil, account: nil) @family = family @account = account end - def sync_all(full_history: false) - sync_exchange_rates(full_history: full_history) - sync_prices(full_history: full_history) + def sync_all(full_history: false, clear_cache: false) + sync_exchange_rates(full_history: full_history, clear_cache: clear_cache) + sync_prices(full_history: full_history, clear_cache: clear_cache) end - def sync_exchange_rates(full_history: false) + def sync_exchange_rates(full_history: false, clear_cache: false) unless rate_provider Rails.logger.warn("No rate provider configured for MarketDataSyncer.sync_exchange_rates, skipping sync") return @@ -50,7 +44,7 @@ class MarketDataSyncer end end - def sync_prices(full_history: false) + def sync_prices(full_history: false, clear_cache: false) unless price_provider Rails.logger.warn("No price provider configured for MarketDataSyncer.sync_prices, skipping sync") nil @@ -61,6 +55,95 @@ class MarketDataSyncer end end + def sync_security_price(security:, full_history:, clear_cache:) + start_date = full_history ? find_oldest_required_price(security: security) : default_start_date + + Rails.logger.info("Syncing security price for: #{security.ticker}, start_date: #{start_date}, end_date: #{end_date}") + + fetched_prices = price_provider.fetch_security_prices( + security, + start_date: start_date, + end_date: end_date + ) + + unless fetched_prices.success? + error = MissingSecurityPriceError.new( + "#{PRICE_PROVIDER_NAME} could not fetch security price for: #{security.ticker} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_prices.error.message}" + ) + + Rails.logger.warn(error.message) + Sentry.capture_exception(error, level: :warning) + + return + end + + prices_for_upsert = fetched_prices.data.map do |price| + if price.security.nil? || price.date.nil? || price.price.nil? || price.currency.nil? + error = InvalidSecurityPriceDataError.new( + "#{PRICE_PROVIDER_NAME} returned invalid price data for security: #{security.ticker} on: #{price.date}. Price data: #{price.inspect}" + ) + + Rails.logger.warn(error.message) + Sentry.capture_exception(error, level: :warning) + + next + end + + { + security_id: price.security.id, + date: price.date, + price: price.price, + currency: price.currency + } + end.compact + + Security::Price.upsert_all( + prices_for_upsert, + unique_by: %i[security_id date currency] + ) + end + + def sync_exchange_rate(from:, to:, full_history:, clear_cache:) + start_date = full_history ? find_oldest_required_rate(from_currency: from) : default_start_date + + Rails.logger.info("Syncing exchange rate from: #{from}, to: #{to}, start_date: #{start_date}, end_date: #{end_date}") + + fetched_rates = rate_provider.fetch_exchange_rates( + from: from, + to: to, + start_date: start_date, + end_date: end_date + ) + + unless fetched_rates.success? + message = "#{RATE_PROVIDER_NAME} could not fetch exchange rate pair from: #{from} to: #{to} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_rates.error.message}" + Rails.logger.warn(message) + Sentry.capture_exception(MissingExchangeRateError.new(message)) + return + end + + rates_for_upsert = fetched_rates.data.map do |rate| + if rate.from.nil? || rate.to.nil? || rate.date.nil? || rate.rate.nil? + message = "#{RATE_PROVIDER_NAME} returned invalid rate data for pair from: #{from} to: #{to} on: #{rate.date}. Rate data: #{rate.inspect}" + Rails.logger.warn(message) + Sentry.capture_exception(InvalidExchangeRateDataError.new(message)) + next + end + + { + from_currency: rate.from, + to_currency: rate.to, + date: rate.date, + rate: rate.rate + } + end.compact + + ExchangeRate.upsert_all( + rates_for_upsert, + unique_by: %i[from_currency to_currency date] + ) + end + private attr_reader :family, :account @@ -84,95 +167,6 @@ class MarketDataSyncer end end - def sync_security_price(security:, full_history:) - start_date = full_history ? find_oldest_required_price(security: security) : default_start_date - - Rails.logger.info("Syncing security price for: #{security.ticker}, start_date: #{start_date}, end_date: #{end_date}") - - fetched_prices = price_provider.fetch_security_prices( - security, - start_date: start_date, - end_date: end_date - ) - - unless fetched_prices.success? - error = MissingSecurityPriceError.new( - "#{PRICE_PROVIDER_NAME} could not fetch security price for: #{security.ticker} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_prices.error.message}" - ) - - Rails.logger.warn(error.message) - Sentry.capture_exception(error, level: :warning) - - return - end - - prices_for_upsert = fetched_prices.data.map do |price| - if price.security.nil? || price.date.nil? || price.price.nil? || price.currency.nil? - error = InvalidSecurityPriceDataError.new( - "#{PRICE_PROVIDER_NAME} returned invalid price data for security: #{security.ticker} on: #{price.date}. Price data: #{price.inspect}" - ) - - Rails.logger.warn(error.message) - Sentry.capture_exception(error, level: :warning) - - next - end - - { - security_id: price.security.id, - date: price.date, - price: price.price, - currency: price.currency - } - end.compact - - Security::Price.upsert_all( - prices_for_upsert, - unique_by: %i[security_id date currency] - ) - end - - def sync_exchange_rate(from:, to:, full_history:) - start_date = full_history ? find_oldest_required_rate(from_currency: from) : default_start_date - - Rails.logger.info("Syncing exchange rate from: #{from}, to: #{to}, start_date: #{start_date}, end_date: #{end_date}") - - fetched_rates = rate_provider.fetch_exchange_rates( - from: from, - to: to, - start_date: start_date, - end_date: end_date - ) - - unless fetched_rates.success? - message = "#{RATE_PROVIDER_NAME} could not fetch exchange rate pair from: #{from} to: #{to} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_rates.error.message}" - Rails.logger.warn(message) - Sentry.capture_exception(MissingExchangeRateError.new(message)) - return - end - - rates_for_upsert = fetched_rates.data.map do |rate| - if rate.from.nil? || rate.to.nil? || rate.date.nil? || rate.rate.nil? - message = "#{RATE_PROVIDER_NAME} returned invalid rate data for pair from: #{from} to: #{to} on: #{rate.date}. Rate data: #{rate.inspect}" - Rails.logger.warn(message) - Sentry.capture_exception(InvalidExchangeRateDataError.new(message)) - next - end - - { - from_currency: rate.from, - to_currency: rate.to, - date: rate.date, - rate: rate.rate - } - end.compact - - ExchangeRate.upsert_all( - rates_for_upsert, - unique_by: %i[from_currency to_currency date] - ) - end - def rate_provider Provider::Registry.for_concept(:exchange_rates).get_provider(RATE_PROVIDER_NAME) end diff --git a/app/models/provider/synth.rb b/app/models/provider/synth.rb index ff75ff49..95ddf908 100644 --- a/app/models/provider/synth.rb +++ b/app/models/provider/synth.rb @@ -3,6 +3,8 @@ class Provider::Synth < Provider # Subclass so errors caught in this provider are raised as Provider::Synth::Error Error = Class.new(Provider::Error) + InvalidExchangeRateError = Class.new(Error) + InvalidSecurityPriceError = Class.new(Error) def initialize(api_key) @api_key = api_key @@ -65,8 +67,18 @@ class Provider::Synth < Provider end data.paginated.map do |rate| - Rate.new(date: rate.dig("date"), from:, to:, rate: rate.dig("rates", to)) - end + date = rate.dig("date") + rate = rate.dig("rates", to) + + if date.nil? || rate.nil? + message = "#{self.class.name} returned invalid rate data for pair from: #{from} to: #{to} on: #{date}. Rate data: #{rate.inspect}" + Rails.logger.warn(message) + Sentry.capture_exception(InvalidExchangeRateError.new(message), level: :warning) + next + end + + Rate.new(date:, from:, to:, rate:) + end.compact end end @@ -149,13 +161,23 @@ class Provider::Synth < Provider exchange_operating_mic = data.first_page.dig("exchange", "operating_mic_code") data.paginated.map do |price| + date = price.dig("date") + price = price.dig("close") || price.dig("open") + + if date.nil? || price.nil? + message = "#{self.class.name} returned invalid price data for security #{security.ticker} on: #{date}. Price data: #{price.inspect}" + Rails.logger.warn(message) + Sentry.capture_exception(InvalidSecurityPriceError.new(message), level: :warning) + next + end + Price.new( security: security, - date: price.dig("date"), - price: price.dig("close") || price.dig("open"), + date: date, + price: price, currency: currency ) - end + end.compact end end diff --git a/config/schedule.yml b/config/schedule.yml index 561e2327..6e0c4f52 100644 --- a/config/schedule.yml +++ b/config/schedule.yml @@ -1,5 +1,5 @@ sync_market_data: - cron: "0 17 * * 1-5" # 5:00 PM EST (1 hour after market close) + cron: "0 22 * * 1-5" # 5:00 PM EST / 6:00 PM EDT (NY time) class: "SyncMarketDataJob" queue: "scheduled" description: "Syncs market data daily at 5:00 PM EST (1 hour after market close)" diff --git a/test/models/exchange_rate/syncer_test.rb b/test/models/exchange_rate/syncer_test.rb new file mode 100644 index 00000000..58818834 --- /dev/null +++ b/test/models/exchange_rate/syncer_test.rb @@ -0,0 +1,148 @@ +require "test_helper" +require "ostruct" + +class ExchangeRate::SyncerTest < ActiveSupport::TestCase + include ProviderTestHelper + + setup do + @provider = mock + end + + test "syncs missing rates from provider" do + ExchangeRate.delete_all + + provider_response = provider_success_response([ + OpenStruct.new(from: "USD", to: "EUR", date: 2.days.ago.to_date, rate: 1.3), + OpenStruct.new(from: "USD", to: "EUR", date: 1.day.ago.to_date, rate: 1.4), + OpenStruct.new(from: "USD", to: "EUR", date: Date.current, rate: 1.5) + ]) + + @provider.expects(:fetch_exchange_rates) + .with(from: "USD", to: "EUR", start_date: get_provider_fetch_start_date(2.days.ago.to_date), end_date: Date.current) + .returns(provider_response) + + ExchangeRate::Syncer.new( + exchange_rate_provider: @provider, + from: "USD", + to: "EUR", + start_date: 2.days.ago.to_date, + end_date: Date.current + ).sync_provider_rates + + db_rates = ExchangeRate.where(from_currency: "USD", to_currency: "EUR", date: 2.days.ago.to_date..Date.current) + .order(:date) + + assert_equal 3, db_rates.count + assert_equal 1.3, db_rates[0].rate + assert_equal 1.4, db_rates[1].rate + assert_equal 1.5, db_rates[2].rate + end + + test "syncs diff when some rates already exist" do + ExchangeRate.delete_all + + # Pre-populate DB with the first two days + ExchangeRate.create!(from_currency: "USD", to_currency: "EUR", date: 3.days.ago.to_date, rate: 1.2) + ExchangeRate.create!(from_currency: "USD", to_currency: "EUR", date: 2.days.ago.to_date, rate: 1.25) + + provider_response = provider_success_response([ + OpenStruct.new(from: "USD", to: "EUR", date: 1.day.ago.to_date, rate: 1.3) + ]) + + @provider.expects(:fetch_exchange_rates) + .with(from: "USD", to: "EUR", start_date: get_provider_fetch_start_date(1.day.ago.to_date), end_date: Date.current) + .returns(provider_response) + + ExchangeRate::Syncer.new( + exchange_rate_provider: @provider, + from: "USD", + to: "EUR", + start_date: 3.days.ago.to_date, + end_date: Date.current + ).sync_provider_rates + + db_rates = ExchangeRate.order(:date) + assert_equal 4, db_rates.count + assert_equal [ 1.2, 1.25, 1.3, 1.3 ], db_rates.map(&:rate) + end + + test "no provider calls when all rates exist" do + ExchangeRate.delete_all + + (3.days.ago.to_date..Date.current).each_with_index do |date, idx| + ExchangeRate.create!(from_currency: "USD", to_currency: "EUR", date:, rate: 1.2 + idx * 0.01) + end + + @provider.expects(:fetch_exchange_rates).never + + ExchangeRate::Syncer.new( + exchange_rate_provider: @provider, + from: "USD", + to: "EUR", + start_date: 3.days.ago.to_date, + end_date: Date.current + ).sync_provider_rates + end + + # A helpful "reset" option for when we need to refresh provider data + test "full upsert if clear_cache is true" do + ExchangeRate.delete_all + + # Seed DB with stale data + (2.days.ago.to_date..Date.current).each do |date| + ExchangeRate.create!(from_currency: "USD", to_currency: "EUR", date:, rate: 1.0) + end + + provider_response = provider_success_response([ + OpenStruct.new(from: "USD", to: "EUR", date: 2.days.ago.to_date, rate: 1.3), + OpenStruct.new(from: "USD", to: "EUR", date: 1.day.ago.to_date, rate: 1.4), + OpenStruct.new(from: "USD", to: "EUR", date: Date.current, rate: 1.5) + ]) + + @provider.expects(:fetch_exchange_rates) + .with(from: "USD", to: "EUR", start_date: get_provider_fetch_start_date(2.days.ago.to_date), end_date: Date.current) + .returns(provider_response) + + ExchangeRate::Syncer.new( + exchange_rate_provider: @provider, + from: "USD", + to: "EUR", + start_date: 2.days.ago.to_date, + end_date: Date.current, + clear_cache: true + ).sync_provider_rates + + db_rates = ExchangeRate.where(from_currency: "USD", to_currency: "EUR").order(:date) + assert_equal [ 1.3, 1.4, 1.5 ], db_rates.map(&:rate) + end + + test "clamps end_date to today when future date is provided" do + ExchangeRate.delete_all + + future_date = Date.current + 3.days + + provider_response = provider_success_response([ + OpenStruct.new(from: "USD", to: "EUR", date: Date.current, rate: 1.6) + ]) + + @provider.expects(:fetch_exchange_rates) + .with(from: "USD", to: "EUR", start_date: get_provider_fetch_start_date(Date.current), end_date: Date.current) + .returns(provider_response) + + ExchangeRate::Syncer.new( + exchange_rate_provider: @provider, + from: "USD", + to: "EUR", + start_date: Date.current, + end_date: future_date + ).sync_provider_rates + + assert_equal 1, ExchangeRate.count + end + + private + def get_provider_fetch_start_date(start_date) + # We fetch with a 5 day buffer to account for weekends and holidays + start_date - 5.days + end +end diff --git a/test/models/exchange_rate_test.rb b/test/models/exchange_rate_test.rb index 64fc328b..021b4edf 100644 --- a/test/models/exchange_rate_test.rb +++ b/test/models/exchange_rate_test.rb @@ -67,26 +67,4 @@ class ExchangeRateTest < ActiveSupport::TestCase assert_nil ExchangeRate.find_or_fetch_rate(from: "USD", to: "EUR", date: Date.current, cache: true) end - - test "upserts rates for currency pair and date range" do - ExchangeRate.delete_all - - ExchangeRate.create!(date: 1.day.ago.to_date, from_currency: "USD", to_currency: "EUR", rate: 0.9) - - provider_response = provider_success_response([ - OpenStruct.new(from: "USD", to: "EUR", date: Date.current, rate: 1.3), - OpenStruct.new(from: "USD", to: "EUR", date: 1.day.ago.to_date, rate: 1.4), - OpenStruct.new(from: "USD", to: "EUR", date: 2.days.ago.to_date, rate: 1.5) - ]) - - @provider.expects(:fetch_exchange_rates) - .with(from: "USD", to: "EUR", start_date: 2.days.ago.to_date, end_date: Date.current) - .returns(provider_response) - - ExchangeRate.sync_provider_rates(from: "USD", to: "EUR", start_date: 2.days.ago.to_date) - - assert_equal 1.3, ExchangeRate.find_by(from_currency: "USD", to_currency: "EUR", date: Date.current).rate - assert_equal 1.4, ExchangeRate.find_by(from_currency: "USD", to_currency: "EUR", date: 1.day.ago.to_date).rate - assert_equal 1.5, ExchangeRate.find_by(from_currency: "USD", to_currency: "EUR", date: 2.days.ago.to_date).rate - end end diff --git a/test/models/security/syncer_test.rb b/test/models/security/syncer_test.rb new file mode 100644 index 00000000..c2040690 --- /dev/null +++ b/test/models/security/syncer_test.rb @@ -0,0 +1,25 @@ +require "test_helper" + +class Security::SyncerTest < ActiveSupport::TestCase + include ProviderTestHelper + + setup do + @provider = mock + end + + test "syncs missing securities from provider" do + # TODO + end + + test "syncs diff when some securities already exist" do + # TODO + end + + test "no provider calls when all securities exist" do + # TODO + end + + test "full upsert if clear_cache is true" do + # TODO + end +end