mirror of
https://github.com/maybe-finance/maybe.git
synced 2025-08-02 20:15:22 +02:00
Market data sync refinements (#2252)
* Exchange rate syncer implementation * Security price syncer * Fix issues with provider API * Add back prod schedule * Add back price and exchange rate syncs to account syncs * Remove unused stock_exchanges table
This commit is contained in:
parent
6917cecf33
commit
6dc1d22672
38 changed files with 1206 additions and 1615 deletions
|
@ -1,7 +1,20 @@
|
|||
# This job runs daily at market close. See config/schedule.yml for details.
|
||||
#
|
||||
# The primary purpose of this job is to:
|
||||
# 1. Determine what exchange rate pairs, security prices, and other market data all of our users need to view historical account balance data
|
||||
# 2. For each needed rate/price, fetch from our data provider and upsert to our database
|
||||
#
|
||||
# Each individual account sync will still fetch any missing market data that isn't yet synced, but by running
|
||||
# this job daily, we significantly reduce overlapping account syncs that both need the same market data (e.g. common security like `AAPL`)
|
||||
#
|
||||
class SyncMarketDataJob < ApplicationJob
|
||||
queue_as :scheduled
|
||||
|
||||
def perform
|
||||
MarketDataSyncer.new.sync_all
|
||||
def perform(opts)
|
||||
opts = opts.symbolize_keys
|
||||
mode = opts.fetch(:mode, :full)
|
||||
clear_cache = opts.fetch(:clear_cache, false)
|
||||
|
||||
MarketDataSyncer.new(mode: mode, clear_cache: clear_cache).sync
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
class Account < ApplicationRecord
|
||||
include Syncable, Monetizable, Chartable, Linkable, Convertible, Enrichable
|
||||
include Syncable, Monetizable, Chartable, Linkable, Enrichable
|
||||
|
||||
validates :name, :balance, :currency, presence: true
|
||||
|
||||
|
|
|
@ -1,27 +0,0 @@
|
|||
module Account::Convertible
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
def sync_required_exchange_rates
|
||||
unless requires_exchange_rates?
|
||||
Rails.logger.info("No exchange rate sync needed for account #{id}")
|
||||
return
|
||||
end
|
||||
|
||||
affected_row_count = ExchangeRate.sync_provider_rates(
|
||||
from: currency,
|
||||
to: target_currency,
|
||||
start_date: start_date,
|
||||
)
|
||||
|
||||
Rails.logger.info("Synced #{affected_row_count} exchange rates for account #{id}")
|
||||
end
|
||||
|
||||
private
|
||||
def target_currency
|
||||
family.currency
|
||||
end
|
||||
|
||||
def requires_exchange_rates?
|
||||
currency != target_currency
|
||||
end
|
||||
end
|
82
app/models/account/market_data_syncer.rb
Normal file
82
app/models/account/market_data_syncer.rb
Normal file
|
@ -0,0 +1,82 @@
|
|||
class Account::MarketDataSyncer
|
||||
attr_reader :account
|
||||
|
||||
def initialize(account)
|
||||
@account = account
|
||||
end
|
||||
|
||||
def sync_market_data
|
||||
sync_exchange_rates
|
||||
sync_security_prices
|
||||
end
|
||||
|
||||
private
|
||||
def sync_exchange_rates
|
||||
return unless needs_exchange_rates?
|
||||
return unless ExchangeRate.provider
|
||||
|
||||
pair_dates = {}
|
||||
|
||||
# 1. ENTRY-BASED PAIRS – currencies that differ from the account currency
|
||||
account.entries
|
||||
.where.not(currency: account.currency)
|
||||
.group(:currency)
|
||||
.minimum(:date)
|
||||
.each do |source_currency, date|
|
||||
key = [ source_currency, account.currency ]
|
||||
pair_dates[key] = [ pair_dates[key], date ].compact.min
|
||||
end
|
||||
|
||||
# 2. ACCOUNT-BASED PAIR – convert the account currency to the family currency (if different)
|
||||
if foreign_account?
|
||||
key = [ account.currency, account.family.currency ]
|
||||
pair_dates[key] = [ pair_dates[key], account.start_date ].compact.min
|
||||
end
|
||||
|
||||
pair_dates.each do |(source, target), start_date|
|
||||
ExchangeRate.sync_provider_rates(
|
||||
from: source,
|
||||
to: target,
|
||||
start_date: start_date,
|
||||
end_date: Date.current
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def sync_security_prices
|
||||
return unless Security.provider
|
||||
|
||||
account_securities = account.trades.map(&:security).uniq
|
||||
|
||||
return if account_securities.empty?
|
||||
|
||||
account_securities.each do |security|
|
||||
security.sync_provider_prices(
|
||||
start_date: first_required_price_date(security),
|
||||
end_date: Date.current
|
||||
)
|
||||
|
||||
security.sync_provider_details
|
||||
end
|
||||
end
|
||||
|
||||
# Calculates the first date we require a price for the given security scoped to this account
|
||||
def first_required_price_date(security)
|
||||
account.trades.with_entry
|
||||
.where(security: security)
|
||||
.where(entries: { account_id: account.id })
|
||||
.minimum("entries.date")
|
||||
end
|
||||
|
||||
def needs_exchange_rates?
|
||||
has_multi_currency_entries? || foreign_account?
|
||||
end
|
||||
|
||||
def has_multi_currency_entries?
|
||||
account.entries.where.not(currency: account.currency).exists?
|
||||
end
|
||||
|
||||
def foreign_account?
|
||||
account.currency != account.family.currency
|
||||
end
|
||||
end
|
|
@ -7,6 +7,7 @@ class Account::Syncer
|
|||
|
||||
def perform_sync(sync)
|
||||
Rails.logger.info("Processing balances (#{account.linked? ? 'reverse' : 'forward'})")
|
||||
sync_market_data
|
||||
sync_balances
|
||||
end
|
||||
|
||||
|
@ -19,4 +20,18 @@ class Account::Syncer
|
|||
strategy = account.linked? ? :reverse : :forward
|
||||
Balance::Syncer.new(account, strategy: strategy).sync_balances
|
||||
end
|
||||
|
||||
# Syncs all the exchange rates + security prices this account needs to display historical chart data
|
||||
#
|
||||
# This is a *supplemental* sync. The daily market data sync should have already populated
|
||||
# a majority or all of this data, so this is often a no-op.
|
||||
#
|
||||
# We rescue errors here because if this operation fails, we don't want to fail the entire sync since
|
||||
# we have reasonable fallbacks for missing market data.
|
||||
def sync_market_data
|
||||
Account::MarketDataSyncer.new(account).sync_market_data
|
||||
rescue => e
|
||||
Rails.logger.error("Error syncing market data for account #{account.id}: #{e.message}")
|
||||
Sentry.capture_exception(e)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -19,8 +19,6 @@ class Balance::Syncer
|
|||
if strategy == :forward
|
||||
update_account_info
|
||||
end
|
||||
|
||||
account.sync_required_exchange_rates
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -27,29 +27,21 @@ module ExchangeRate::Provided
|
|||
rate
|
||||
end
|
||||
|
||||
def sync_provider_rates(from:, to:, start_date:, end_date: Date.current)
|
||||
# @return [Integer] The number of exchange rates synced
|
||||
def sync_provider_rates(from:, to:, start_date:, end_date:, clear_cache: false)
|
||||
unless provider.present?
|
||||
Rails.logger.warn("No provider configured for ExchangeRate.sync_provider_rates")
|
||||
return 0
|
||||
end
|
||||
|
||||
fetched_rates = provider.fetch_exchange_rates(from: from, to: to, start_date: start_date, end_date: end_date)
|
||||
|
||||
unless fetched_rates.success?
|
||||
Rails.logger.error("Provider error for ExchangeRate.sync_provider_rates: #{fetched_rates.error}")
|
||||
return 0
|
||||
end
|
||||
|
||||
rates_data = fetched_rates.data.map do |rate|
|
||||
{
|
||||
from_currency: rate.from,
|
||||
to_currency: rate.to,
|
||||
date: rate.date,
|
||||
rate: rate.rate
|
||||
}
|
||||
end
|
||||
|
||||
ExchangeRate.upsert_all(rates_data, unique_by: %i[from_currency to_currency date])
|
||||
ExchangeRate::Syncer.new(
|
||||
exchange_rate_provider: provider,
|
||||
from: from,
|
||||
to: to,
|
||||
start_date: start_date,
|
||||
end_date: end_date,
|
||||
clear_cache: clear_cache
|
||||
).sync_provider_rates
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
156
app/models/exchange_rate/syncer.rb
Normal file
156
app/models/exchange_rate/syncer.rb
Normal file
|
@ -0,0 +1,156 @@
|
|||
class ExchangeRate::Syncer
|
||||
MissingExchangeRateError = Class.new(StandardError)
|
||||
MissingStartRateError = Class.new(StandardError)
|
||||
|
||||
def initialize(exchange_rate_provider:, from:, to:, start_date:, end_date:, clear_cache: false)
|
||||
@exchange_rate_provider = exchange_rate_provider
|
||||
@from = from
|
||||
@to = to
|
||||
@start_date = start_date
|
||||
@end_date = normalize_end_date(end_date)
|
||||
@clear_cache = clear_cache
|
||||
end
|
||||
|
||||
# Constructs a daily series of rates for the given currency pair for date range
|
||||
def sync_provider_rates
|
||||
if !clear_cache && all_rates_exist?
|
||||
Rails.logger.info("No new rates to sync for #{from} to #{to} between #{start_date} and #{end_date}, skipping")
|
||||
return
|
||||
end
|
||||
|
||||
if clear_cache && provider_rates.empty?
|
||||
Rails.logger.warn("Could not clear cache for #{from} to #{to} between #{start_date} and #{end_date} because provider returned no rates")
|
||||
return
|
||||
end
|
||||
|
||||
prev_rate_value = start_rate_value
|
||||
|
||||
unless prev_rate_value.present?
|
||||
error = MissingStartRateError.new("Could not find a start rate for #{from} to #{to} between #{start_date} and #{end_date}")
|
||||
Rails.logger.error(error.message)
|
||||
Sentry.capture_exception(error)
|
||||
return
|
||||
end
|
||||
|
||||
gapfilled_rates = effective_start_date.upto(end_date).map do |date|
|
||||
db_rate_value = db_rates[date]&.rate
|
||||
provider_rate_value = provider_rates[date]&.rate
|
||||
|
||||
chosen_rate = if clear_cache
|
||||
provider_rate_value || db_rate_value # overwrite when possible
|
||||
else
|
||||
db_rate_value || provider_rate_value # fill gaps
|
||||
end
|
||||
|
||||
# Gapfill with LOCF strategy (last observation carried forward)
|
||||
if chosen_rate.nil?
|
||||
chosen_rate = prev_rate_value
|
||||
end
|
||||
|
||||
prev_rate_value = chosen_rate
|
||||
|
||||
{
|
||||
from_currency: from,
|
||||
to_currency: to,
|
||||
date: date,
|
||||
rate: chosen_rate
|
||||
}
|
||||
end
|
||||
|
||||
upsert_rows(gapfilled_rates)
|
||||
end
|
||||
|
||||
private
|
||||
attr_reader :exchange_rate_provider, :from, :to, :start_date, :end_date, :clear_cache
|
||||
|
||||
def upsert_rows(rows)
|
||||
batch_size = 200
|
||||
|
||||
total_upsert_count = 0
|
||||
|
||||
rows.each_slice(batch_size) do |batch|
|
||||
upserted_ids = ExchangeRate.upsert_all(
|
||||
batch,
|
||||
unique_by: %i[from_currency to_currency date],
|
||||
returning: [ "id" ]
|
||||
)
|
||||
|
||||
total_upsert_count += upserted_ids.count
|
||||
end
|
||||
|
||||
total_upsert_count
|
||||
end
|
||||
|
||||
# Since provider may not return values on weekends and holidays, we grab the first rate from the provider that is on or before the start date
|
||||
def start_rate_value
|
||||
provider_rate_value = provider_rates.select { |date, _| date <= start_date }.max_by { |date, _| date }&.last
|
||||
db_rate_value = db_rates[start_date]&.rate
|
||||
provider_rate_value || db_rate_value
|
||||
end
|
||||
|
||||
# No need to fetch/upsert rates for dates that we already have in the DB
|
||||
def effective_start_date
|
||||
return start_date if clear_cache
|
||||
|
||||
first_missing_date = nil
|
||||
|
||||
start_date.upto(end_date) do |date|
|
||||
unless db_rates.key?(date)
|
||||
first_missing_date = date
|
||||
break
|
||||
end
|
||||
end
|
||||
|
||||
first_missing_date || end_date
|
||||
end
|
||||
|
||||
def provider_rates
|
||||
@provider_rates ||= begin
|
||||
# Always fetch with a 5 day buffer to ensure we have a starting rate (for weekends and holidays)
|
||||
provider_fetch_start_date = effective_start_date - 5.days
|
||||
|
||||
provider_response = exchange_rate_provider.fetch_exchange_rates(
|
||||
from: from,
|
||||
to: to,
|
||||
start_date: provider_fetch_start_date,
|
||||
end_date: end_date
|
||||
)
|
||||
|
||||
if provider_response.success?
|
||||
provider_response.data.index_by(&:date)
|
||||
else
|
||||
message = "#{exchange_rate_provider.class.name} could not fetch exchange rate pair from: #{from} to: #{to} between: #{effective_start_date} and: #{Date.current}. Provider error: #{provider_response.error.message}"
|
||||
Rails.logger.warn(message)
|
||||
Sentry.capture_exception(MissingExchangeRateError.new(message))
|
||||
{}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def all_rates_exist?
|
||||
db_count == expected_count
|
||||
end
|
||||
|
||||
def expected_count
|
||||
(start_date..end_date).count
|
||||
end
|
||||
|
||||
def db_count
|
||||
db_rates.count
|
||||
end
|
||||
|
||||
def db_rates
|
||||
@db_rates ||= ExchangeRate.where(from_currency: from, to_currency: to, date: start_date..end_date)
|
||||
.order(:date)
|
||||
.to_a
|
||||
.index_by(&:date)
|
||||
end
|
||||
|
||||
# Normalizes an end date so that it never exceeds today's date in the
|
||||
# America/New_York timezone. If the caller passes a future date we clamp
|
||||
# it to today so that upstream provider calls remain valid and predictable.
|
||||
def normalize_end_date(requested_end_date)
|
||||
today_est = Date.current.in_time_zone("America/New_York").to_date
|
||||
[ requested_end_date, today_est ].min
|
||||
end
|
||||
end
|
|
@ -1,196 +1,132 @@
|
|||
class MarketDataSyncer
|
||||
DEFAULT_HISTORY_DAYS = 30
|
||||
RATE_PROVIDER_NAME = :synth
|
||||
PRICE_PROVIDER_NAME = :synth
|
||||
# By default, our graphs show 1M as the view, so by fetching 31 days,
|
||||
# we ensure we can always show an accurate default graph
|
||||
SNAPSHOT_DAYS = 31
|
||||
|
||||
MissingExchangeRateError = Class.new(StandardError)
|
||||
InvalidExchangeRateDataError = Class.new(StandardError)
|
||||
MissingSecurityPriceError = Class.new(StandardError)
|
||||
InvalidSecurityPriceDataError = Class.new(StandardError)
|
||||
InvalidModeError = Class.new(StandardError)
|
||||
|
||||
class << self
|
||||
def for(family: nil, account: nil)
|
||||
new(family: family, account: account)
|
||||
end
|
||||
def initialize(mode: :full, clear_cache: false)
|
||||
@mode = set_mode!(mode)
|
||||
@clear_cache = clear_cache
|
||||
end
|
||||
|
||||
# Syncer can optionally be scoped. Otherwise, it syncs all user data
|
||||
def initialize(family: nil, account: nil)
|
||||
@family = family
|
||||
@account = account
|
||||
def sync
|
||||
sync_prices
|
||||
sync_exchange_rates
|
||||
end
|
||||
|
||||
def sync_all(full_history: false)
|
||||
sync_exchange_rates(full_history: full_history)
|
||||
sync_prices(full_history: full_history)
|
||||
end
|
||||
|
||||
def sync_exchange_rates(full_history: false)
|
||||
unless rate_provider
|
||||
Rails.logger.warn("No rate provider configured for MarketDataSyncer.sync_exchange_rates, skipping sync")
|
||||
# Syncs historical security prices (and details)
|
||||
def sync_prices
|
||||
unless Security.provider
|
||||
Rails.logger.warn("No provider configured for MarketDataSyncer.sync_prices, skipping sync")
|
||||
return
|
||||
end
|
||||
|
||||
# Finds distinct currency pairs
|
||||
entry_pairs = entries_scope.joins(:account)
|
||||
.where.not("entries.currency = accounts.currency")
|
||||
.select("entries.currency as source, accounts.currency as target")
|
||||
.distinct
|
||||
Security.where.not(exchange_operating_mic: nil).find_each do |security|
|
||||
security.sync_provider_prices(
|
||||
start_date: get_first_required_price_date(security),
|
||||
end_date: end_date,
|
||||
clear_cache: clear_cache
|
||||
)
|
||||
|
||||
# All accounts in currency not equal to the family currency require exchange rates to show a normalized historical graph
|
||||
account_pairs = accounts_scope.joins(:family)
|
||||
.where.not("families.currency = accounts.currency")
|
||||
.select("accounts.currency as source, families.currency as target")
|
||||
.distinct
|
||||
|
||||
pairs = (entry_pairs + account_pairs).uniq
|
||||
|
||||
pairs.each do |pair|
|
||||
sync_exchange_rate(from: pair.source, to: pair.target, full_history: full_history)
|
||||
security.sync_provider_details(clear_cache: clear_cache)
|
||||
end
|
||||
end
|
||||
|
||||
def sync_prices(full_history: false)
|
||||
unless price_provider
|
||||
Rails.logger.warn("No price provider configured for MarketDataSyncer.sync_prices, skipping sync")
|
||||
nil
|
||||
def sync_exchange_rates
|
||||
unless ExchangeRate.provider
|
||||
Rails.logger.warn("No provider configured for MarketDataSyncer.sync_exchange_rates, skipping sync")
|
||||
return
|
||||
end
|
||||
|
||||
securities_scope.each do |security|
|
||||
sync_security_price(security: security, full_history: full_history)
|
||||
required_exchange_rate_pairs.each do |pair|
|
||||
# pair is a Hash with keys :source, :target, and :start_date
|
||||
start_date = snapshot? ? default_start_date : pair[:start_date]
|
||||
|
||||
ExchangeRate.sync_provider_rates(
|
||||
from: pair[:source],
|
||||
to: pair[:target],
|
||||
start_date: start_date,
|
||||
end_date: end_date,
|
||||
clear_cache: clear_cache
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
attr_reader :family, :account
|
||||
attr_reader :mode, :clear_cache
|
||||
|
||||
def accounts_scope
|
||||
return Account.where(id: account.id) if account
|
||||
return family.accounts if family
|
||||
Account.all
|
||||
def snapshot?
|
||||
mode.to_sym == :snapshot
|
||||
end
|
||||
|
||||
def entries_scope
|
||||
account&.entries || family&.entries || Entry.all
|
||||
end
|
||||
# Builds a unique list of currency pairs with the earliest date we need
|
||||
# exchange rates for.
|
||||
#
|
||||
# Returns: Array of Hashes – [{ source:, target:, start_date: }, ...]
|
||||
def required_exchange_rate_pairs
|
||||
pair_dates = {} # { [source, target] => earliest_date }
|
||||
|
||||
def securities_scope
|
||||
if account
|
||||
account.trades.joins(:security).where.not(securities: { exchange_operating_mic: nil })
|
||||
elsif family
|
||||
family.trades.joins(:security).where.not(securities: { exchange_operating_mic: nil })
|
||||
else
|
||||
Security.where.not(exchange_operating_mic: nil)
|
||||
# 1. ENTRY-BASED PAIRS – we need rates from the first entry date
|
||||
Entry.joins(:account)
|
||||
.where.not("entries.currency = accounts.currency")
|
||||
.group("entries.currency", "accounts.currency")
|
||||
.minimum("entries.date")
|
||||
.each do |(source, target), date|
|
||||
key = [ source, target ]
|
||||
pair_dates[key] = [ pair_dates[key], date ].compact.min
|
||||
end
|
||||
|
||||
# 2. ACCOUNT-BASED PAIRS – use the account's oldest entry date
|
||||
account_first_entry_dates = Entry.group(:account_id).minimum(:date)
|
||||
|
||||
Account.joins(:family)
|
||||
.where.not("families.currency = accounts.currency")
|
||||
.select("accounts.id, accounts.currency AS source, families.currency AS target")
|
||||
.find_each do |account|
|
||||
earliest_entry_date = account_first_entry_dates[account.id]
|
||||
|
||||
chosen_date = [ earliest_entry_date, default_start_date ].compact.min
|
||||
|
||||
key = [ account.source, account.target ]
|
||||
pair_dates[key] = [ pair_dates[key], chosen_date ].compact.min
|
||||
end
|
||||
|
||||
# Convert to array of hashes for ease of use
|
||||
pair_dates.map do |(source, target), date|
|
||||
{ source: source, target: target, start_date: date }
|
||||
end
|
||||
end
|
||||
|
||||
def sync_security_price(security:, full_history:)
|
||||
start_date = full_history ? find_oldest_required_price(security: security) : default_start_date
|
||||
def get_first_required_price_date(security)
|
||||
return default_start_date if snapshot?
|
||||
|
||||
Rails.logger.info("Syncing security price for: #{security.ticker}, start_date: #{start_date}, end_date: #{end_date}")
|
||||
|
||||
fetched_prices = price_provider.fetch_security_prices(
|
||||
security,
|
||||
start_date: start_date,
|
||||
end_date: end_date
|
||||
)
|
||||
|
||||
unless fetched_prices.success?
|
||||
error = MissingSecurityPriceError.new(
|
||||
"#{PRICE_PROVIDER_NAME} could not fetch security price for: #{security.ticker} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_prices.error.message}"
|
||||
)
|
||||
|
||||
Rails.logger.warn(error.message)
|
||||
Sentry.capture_exception(error, level: :warning)
|
||||
|
||||
return
|
||||
end
|
||||
|
||||
prices_for_upsert = fetched_prices.data.map do |price|
|
||||
if price.security.nil? || price.date.nil? || price.price.nil? || price.currency.nil?
|
||||
error = InvalidSecurityPriceDataError.new(
|
||||
"#{PRICE_PROVIDER_NAME} returned invalid price data for security: #{security.ticker} on: #{price.date}. Price data: #{price.inspect}"
|
||||
)
|
||||
|
||||
Rails.logger.warn(error.message)
|
||||
Sentry.capture_exception(error, level: :warning)
|
||||
|
||||
next
|
||||
end
|
||||
|
||||
{
|
||||
security_id: price.security.id,
|
||||
date: price.date,
|
||||
price: price.price,
|
||||
currency: price.currency
|
||||
}
|
||||
end.compact
|
||||
|
||||
Security::Price.upsert_all(
|
||||
prices_for_upsert,
|
||||
unique_by: %i[security_id date currency]
|
||||
)
|
||||
Trade.with_entry.where(security: security).minimum(:date)
|
||||
end
|
||||
|
||||
def sync_exchange_rate(from:, to:, full_history:)
|
||||
start_date = full_history ? find_oldest_required_rate(from_currency: from) : default_start_date
|
||||
# An approximation that grabs more than we likely need, but simplifies the logic
|
||||
def get_first_required_exchange_rate_date(from_currency:)
|
||||
return default_start_date if snapshot?
|
||||
|
||||
Rails.logger.info("Syncing exchange rate from: #{from}, to: #{to}, start_date: #{start_date}, end_date: #{end_date}")
|
||||
|
||||
fetched_rates = rate_provider.fetch_exchange_rates(
|
||||
from: from,
|
||||
to: to,
|
||||
start_date: start_date,
|
||||
end_date: end_date
|
||||
)
|
||||
|
||||
unless fetched_rates.success?
|
||||
message = "#{RATE_PROVIDER_NAME} could not fetch exchange rate pair from: #{from} to: #{to} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_rates.error.message}"
|
||||
Rails.logger.warn(message)
|
||||
Sentry.capture_exception(MissingExchangeRateError.new(message))
|
||||
return
|
||||
end
|
||||
|
||||
rates_for_upsert = fetched_rates.data.map do |rate|
|
||||
if rate.from.nil? || rate.to.nil? || rate.date.nil? || rate.rate.nil?
|
||||
message = "#{RATE_PROVIDER_NAME} returned invalid rate data for pair from: #{from} to: #{to} on: #{rate.date}. Rate data: #{rate.inspect}"
|
||||
Rails.logger.warn(message)
|
||||
Sentry.capture_exception(InvalidExchangeRateDataError.new(message))
|
||||
next
|
||||
end
|
||||
|
||||
{
|
||||
from_currency: rate.from,
|
||||
to_currency: rate.to,
|
||||
date: rate.date,
|
||||
rate: rate.rate
|
||||
}
|
||||
end.compact
|
||||
|
||||
ExchangeRate.upsert_all(
|
||||
rates_for_upsert,
|
||||
unique_by: %i[from_currency to_currency date]
|
||||
)
|
||||
end
|
||||
|
||||
def rate_provider
|
||||
Provider::Registry.for_concept(:exchange_rates).get_provider(RATE_PROVIDER_NAME)
|
||||
end
|
||||
|
||||
def price_provider
|
||||
Provider::Registry.for_concept(:securities).get_provider(PRICE_PROVIDER_NAME)
|
||||
end
|
||||
|
||||
def find_oldest_required_rate(from_currency:)
|
||||
entries_scope.where(currency: from_currency).minimum(:date) || default_start_date
|
||||
Entry.where(currency: from_currency).minimum(:date)
|
||||
end
|
||||
|
||||
def default_start_date
|
||||
DEFAULT_HISTORY_DAYS.days.ago.to_date
|
||||
SNAPSHOT_DAYS.days.ago.to_date
|
||||
end
|
||||
|
||||
# Since we're querying market data from a US-based API, end date should always be today (EST)
|
||||
def end_date
|
||||
Date.current.in_time_zone("America/New_York").to_date
|
||||
end
|
||||
|
||||
def set_mode!(mode)
|
||||
valid_modes = [ :full, :snapshot ]
|
||||
|
||||
unless valid_modes.include?(mode.to_sym)
|
||||
raise InvalidModeError, "Invalid mode for MarketDataSyncer, can only be :full or :snapshot, but was #{mode}"
|
||||
end
|
||||
|
||||
mode.to_sym
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,22 +2,22 @@ module Provider::SecurityConcept
|
|||
extend ActiveSupport::Concern
|
||||
|
||||
Security = Data.define(:symbol, :name, :logo_url, :exchange_operating_mic)
|
||||
SecurityInfo = Data.define(:symbol, :name, :links, :logo_url, :description, :kind)
|
||||
Price = Data.define(:security, :date, :price, :currency)
|
||||
SecurityInfo = Data.define(:symbol, :name, :links, :logo_url, :description, :kind, :exchange_operating_mic)
|
||||
Price = Data.define(:symbol, :date, :price, :currency, :exchange_operating_mic)
|
||||
|
||||
def search_securities(symbol, country_code: nil, exchange_operating_mic: nil)
|
||||
raise NotImplementedError, "Subclasses must implement #search_securities"
|
||||
end
|
||||
|
||||
def fetch_security_info(security)
|
||||
def fetch_security_info(symbol:, exchange_operating_mic:)
|
||||
raise NotImplementedError, "Subclasses must implement #fetch_security_info"
|
||||
end
|
||||
|
||||
def fetch_security_price(security, date:)
|
||||
def fetch_security_price(symbol:, exchange_operating_mic:, date:)
|
||||
raise NotImplementedError, "Subclasses must implement #fetch_security_price"
|
||||
end
|
||||
|
||||
def fetch_security_prices(security, start_date:, end_date:)
|
||||
def fetch_security_prices(symbol:, exchange_operating_mic:, start_date:, end_date:)
|
||||
raise NotImplementedError, "Subclasses must implement #fetch_security_prices"
|
||||
end
|
||||
end
|
||||
|
|
|
@ -3,6 +3,8 @@ class Provider::Synth < Provider
|
|||
|
||||
# Subclass so errors caught in this provider are raised as Provider::Synth::Error
|
||||
Error = Class.new(Provider::Error)
|
||||
InvalidExchangeRateError = Class.new(Error)
|
||||
InvalidSecurityPriceError = Class.new(Error)
|
||||
|
||||
def initialize(api_key)
|
||||
@api_key = api_key
|
||||
|
@ -48,7 +50,7 @@ class Provider::Synth < Provider
|
|||
|
||||
rates = JSON.parse(response.body).dig("data", "rates")
|
||||
|
||||
Rate.new(date:, from:, to:, rate: rates.dig(to))
|
||||
Rate.new(date: date.to_date, from:, to:, rate: rates.dig(to))
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -65,8 +67,18 @@ class Provider::Synth < Provider
|
|||
end
|
||||
|
||||
data.paginated.map do |rate|
|
||||
Rate.new(date: rate.dig("date"), from:, to:, rate: rate.dig("rates", to))
|
||||
end
|
||||
date = rate.dig("date")
|
||||
rate = rate.dig("rates", to)
|
||||
|
||||
if date.nil? || rate.nil?
|
||||
message = "#{self.class.name} returned invalid rate data for pair from: #{from} to: #{to} on: #{date}. Rate data: #{rate.inspect}"
|
||||
Rails.logger.warn(message)
|
||||
Sentry.capture_exception(InvalidExchangeRateError.new(message), level: :warning)
|
||||
next
|
||||
end
|
||||
|
||||
Rate.new(date: date.to_date, from:, to:, rate:)
|
||||
end.compact
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -97,65 +109,73 @@ class Provider::Synth < Provider
|
|||
end
|
||||
end
|
||||
|
||||
def fetch_security_info(security)
|
||||
def fetch_security_info(symbol:, exchange_operating_mic:)
|
||||
with_provider_response do
|
||||
response = client.get("#{base_url}/tickers/#{security.ticker}") do |req|
|
||||
req.params["mic_code"] = security.exchange_mic if security.exchange_mic.present?
|
||||
req.params["operating_mic"] = security.exchange_operating_mic if security.exchange_operating_mic.present?
|
||||
response = client.get("#{base_url}/tickers/#{symbol}") do |req|
|
||||
req.params["operating_mic"] = exchange_operating_mic
|
||||
end
|
||||
|
||||
data = JSON.parse(response.body).dig("data")
|
||||
|
||||
SecurityInfo.new(
|
||||
symbol: data.dig("ticker"),
|
||||
symbol: symbol,
|
||||
name: data.dig("name"),
|
||||
links: data.dig("links"),
|
||||
logo_url: data.dig("logo_url"),
|
||||
description: data.dig("description"),
|
||||
kind: data.dig("kind")
|
||||
kind: data.dig("kind"),
|
||||
exchange_operating_mic: exchange_operating_mic
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def fetch_security_price(security, date:)
|
||||
def fetch_security_price(symbol:, exchange_operating_mic:, date:)
|
||||
with_provider_response do
|
||||
historical_data = fetch_security_prices(security, start_date: date, end_date: date)
|
||||
historical_data = fetch_security_prices(symbol:, exchange_operating_mic:, start_date: date, end_date: date)
|
||||
|
||||
raise ProviderError, "No prices found for security #{security.ticker} on date #{date}" if historical_data.data.empty?
|
||||
raise ProviderError, "No prices found for security #{symbol} on date #{date}" if historical_data.data.empty?
|
||||
|
||||
historical_data.data.first
|
||||
end
|
||||
end
|
||||
|
||||
def fetch_security_prices(security, start_date:, end_date:)
|
||||
def fetch_security_prices(symbol:, exchange_operating_mic:, start_date:, end_date:)
|
||||
with_provider_response do
|
||||
params = {
|
||||
start_date: start_date,
|
||||
end_date: end_date
|
||||
end_date: end_date,
|
||||
operating_mic_code: exchange_operating_mic
|
||||
}
|
||||
|
||||
params[:operating_mic_code] = security.exchange_operating_mic if security.exchange_operating_mic.present?
|
||||
|
||||
data = paginate(
|
||||
"#{base_url}/tickers/#{security.ticker}/open-close",
|
||||
"#{base_url}/tickers/#{symbol}/open-close",
|
||||
params
|
||||
) do |body|
|
||||
body.dig("prices")
|
||||
end
|
||||
|
||||
currency = data.first_page.dig("currency")
|
||||
country_code = data.first_page.dig("exchange", "country_code")
|
||||
exchange_mic = data.first_page.dig("exchange", "mic_code")
|
||||
exchange_operating_mic = data.first_page.dig("exchange", "operating_mic_code")
|
||||
|
||||
data.paginated.map do |price|
|
||||
date = price.dig("date")
|
||||
price = price.dig("close") || price.dig("open")
|
||||
|
||||
if date.nil? || price.nil?
|
||||
message = "#{self.class.name} returned invalid price data for security #{symbol} on: #{date}. Price data: #{price.inspect}"
|
||||
Rails.logger.warn(message)
|
||||
Sentry.capture_exception(InvalidSecurityPriceError.new(message), level: :warning)
|
||||
next
|
||||
end
|
||||
|
||||
Price.new(
|
||||
security: security,
|
||||
date: price.dig("date"),
|
||||
price: price.dig("close") || price.dig("open"),
|
||||
currency: currency
|
||||
symbol: symbol,
|
||||
date: date.to_date,
|
||||
price: price,
|
||||
currency: currency,
|
||||
exchange_operating_mic: exchange_operating_mic
|
||||
)
|
||||
end
|
||||
end.compact
|
||||
end
|
||||
end
|
||||
|
||||
|
|
145
app/models/security/price/syncer.rb
Normal file
145
app/models/security/price/syncer.rb
Normal file
|
@ -0,0 +1,145 @@
|
|||
class Security::Price::Syncer
|
||||
MissingSecurityPriceError = Class.new(StandardError)
|
||||
MissingStartPriceError = Class.new(StandardError)
|
||||
|
||||
def initialize(security:, security_provider:, start_date:, end_date:, clear_cache: false)
|
||||
@security = security
|
||||
@security_provider = security_provider
|
||||
@start_date = start_date
|
||||
@end_date = normalize_end_date(end_date)
|
||||
@clear_cache = clear_cache
|
||||
end
|
||||
|
||||
# Constructs a daily series of prices for a single security over the date range.
|
||||
# Returns the number of rows upserted.
|
||||
def sync_provider_prices
|
||||
if !clear_cache && all_prices_exist?
|
||||
Rails.logger.info("No new prices to sync for #{security.ticker} between #{start_date} and #{end_date}, skipping")
|
||||
return 0
|
||||
end
|
||||
|
||||
if clear_cache && provider_prices.empty?
|
||||
Rails.logger.warn("Could not clear cache for #{security.ticker} between #{start_date} and #{end_date} because provider returned no prices")
|
||||
return 0
|
||||
end
|
||||
|
||||
prev_price_value = start_price_value
|
||||
|
||||
unless prev_price_value.present?
|
||||
error = MissingStartPriceError.new("Could not find a start price for #{security.ticker} on or before #{start_date}")
|
||||
Rails.logger.error(error.message)
|
||||
Sentry.capture_exception(error)
|
||||
return 0
|
||||
end
|
||||
|
||||
gapfilled_prices = effective_start_date.upto(end_date).map do |date|
|
||||
db_price_value = db_prices[date]&.price
|
||||
provider_price_value = provider_prices[date]&.price
|
||||
provider_currency = provider_prices[date]&.currency
|
||||
|
||||
chosen_price = if clear_cache
|
||||
provider_price_value || db_price_value # overwrite when possible
|
||||
else
|
||||
db_price_value || provider_price_value # fill gaps
|
||||
end
|
||||
|
||||
# Gap-fill using LOCF (last observation carried forward)
|
||||
chosen_price ||= prev_price_value
|
||||
prev_price_value = chosen_price
|
||||
|
||||
{
|
||||
security_id: security.id,
|
||||
date: date,
|
||||
price: chosen_price,
|
||||
currency: provider_currency || prev_price_currency || db_price_currency || "USD"
|
||||
}
|
||||
end
|
||||
|
||||
upsert_rows(gapfilled_prices)
|
||||
end
|
||||
|
||||
private
|
||||
attr_reader :security, :security_provider, :start_date, :end_date, :clear_cache
|
||||
|
||||
def provider_prices
|
||||
@provider_prices ||= begin
|
||||
provider_fetch_start_date = effective_start_date - 5.days
|
||||
|
||||
response = security_provider.fetch_security_prices(
|
||||
symbol: security.ticker,
|
||||
exchange_operating_mic: security.exchange_operating_mic,
|
||||
start_date: provider_fetch_start_date,
|
||||
end_date: end_date
|
||||
)
|
||||
|
||||
if response.success?
|
||||
response.data.index_by(&:date)
|
||||
else
|
||||
msg = "#{security_provider.class.name} could not fetch prices for #{security.ticker} between #{provider_fetch_start_date} and #{end_date}. Provider error: #{response.error.message}"
|
||||
Rails.logger.warn(msg)
|
||||
Sentry.capture_exception(MissingSecurityPriceError.new(msg))
|
||||
{}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def db_prices
|
||||
@db_prices ||= Security::Price.where(security_id: security.id, date: start_date..end_date)
|
||||
.order(:date)
|
||||
.to_a
|
||||
.index_by(&:date)
|
||||
end
|
||||
|
||||
def all_prices_exist?
|
||||
db_prices.count == expected_count
|
||||
end
|
||||
|
||||
def expected_count
|
||||
(start_date..end_date).count
|
||||
end
|
||||
|
||||
# Skip over ranges that already exist unless clearing cache
|
||||
def effective_start_date
|
||||
return start_date if clear_cache
|
||||
|
||||
(start_date..end_date).detect { |d| !db_prices.key?(d) } || end_date
|
||||
end
|
||||
|
||||
def start_price_value
|
||||
provider_price_value = provider_prices.select { |date, _| date <= start_date }
|
||||
.max_by { |date, _| date }
|
||||
&.last&.price
|
||||
db_price_value = db_prices[start_date]&.price
|
||||
provider_price_value || db_price_value
|
||||
end
|
||||
|
||||
def upsert_rows(rows)
|
||||
batch_size = 200
|
||||
total_upsert_count = 0
|
||||
|
||||
rows.each_slice(batch_size) do |batch|
|
||||
ids = Security::Price.upsert_all(
|
||||
batch,
|
||||
unique_by: %i[security_id date currency],
|
||||
returning: [ "id" ]
|
||||
)
|
||||
total_upsert_count += ids.count
|
||||
end
|
||||
|
||||
total_upsert_count
|
||||
end
|
||||
|
||||
def db_price_currency
|
||||
db_prices.values.first&.currency
|
||||
end
|
||||
|
||||
def prev_price_currency
|
||||
@prev_price_currency ||= provider_prices.values.first&.currency
|
||||
end
|
||||
|
||||
# Clamp to today (EST) so we never call our price API for a future date (our API is in EST/EDT timezone)
|
||||
def normalize_end_date(requested_end_date)
|
||||
today_est = Date.current.in_time_zone("America/New_York").to_date
|
||||
[ requested_end_date, today_est ].min
|
||||
end
|
||||
end
|
|
@ -49,6 +49,48 @@ module Security::Provided
|
|||
price
|
||||
end
|
||||
|
||||
def sync_provider_details(clear_cache: false)
|
||||
unless provider.present?
|
||||
Rails.logger.warn("No provider configured for Security.sync_provider_details")
|
||||
return
|
||||
end
|
||||
|
||||
if self.name.present? && self.logo_url.present? && !clear_cache
|
||||
return
|
||||
end
|
||||
|
||||
response = provider.fetch_security_info(
|
||||
symbol: ticker,
|
||||
exchange_operating_mic: exchange_operating_mic
|
||||
)
|
||||
|
||||
if response.success?
|
||||
update(
|
||||
name: response.data.name,
|
||||
logo_url: response.data.logo_url,
|
||||
)
|
||||
else
|
||||
err = StandardError.new("Failed to fetch security info for #{ticker} from #{provider.class.name}: #{response.error.message}")
|
||||
Rails.logger.warn(err.message)
|
||||
Sentry.capture_exception(err, level: :warning)
|
||||
end
|
||||
end
|
||||
|
||||
def sync_provider_prices(start_date:, end_date:, clear_cache: false)
|
||||
unless provider.present?
|
||||
Rails.logger.warn("No provider configured for Security.sync_provider_prices")
|
||||
return 0
|
||||
end
|
||||
|
||||
Security::Price::Syncer.new(
|
||||
security: self,
|
||||
security_provider: provider,
|
||||
start_date: start_date,
|
||||
end_date: end_date,
|
||||
clear_cache: clear_cache
|
||||
).sync_provider_prices
|
||||
end
|
||||
|
||||
private
|
||||
def provider
|
||||
self.class.provider
|
||||
|
|
|
@ -1,3 +0,0 @@
|
|||
class StockExchange < ApplicationRecord
|
||||
scope :in_country, ->(country_code) { where(country_code: country_code) }
|
||||
end
|
|
@ -129,13 +129,9 @@ class TradeBuilder
|
|||
def security
|
||||
ticker_symbol, exchange_operating_mic = ticker.present? ? ticker.split("|") : [ manual_ticker, nil ]
|
||||
|
||||
security = Security.find_or_create_by!(
|
||||
Security.find_or_create_by!(
|
||||
ticker: ticker_symbol,
|
||||
exchange_operating_mic: exchange_operating_mic
|
||||
)
|
||||
|
||||
FetchSecurityInfoJob.perform_later(security.id)
|
||||
|
||||
security
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue