1
0
Fork 0
mirror of https://github.com/maybe-finance/maybe.git synced 2025-08-04 21:15:19 +02:00

Improve account sync performance, handle concurrent market data syncing (#2236)

* PlaidConnectable concern

* Remove bad abstraction

* Put sync implementations in own concerns

* Sync strategies

* Move sync orchestration to Sync class

* Clean up sync class, add state machine

* Basic market data sync cron

* Fix price sync

* Improve sync window column names, add timestamps

* 30 day syncs by default

* Clean up market data methods

* Report high duplicate sync counts to Sentry

* Add sync states throughout app

* account tab session

* Persistent account tab selections

* Remove manual sleep

* Add migration to clear stale syncs on self hosted apps

* Tweak sync states

* Sync completion event broadcasts

* Fix timezones in tests

* Cleanup

* More cleanup

* Plaid item UI broadcasts for sync

* Fix account ID namespace conflict

* Sync broadcasters

* Smoother account sync refreshes

* Remove test sync delay
This commit is contained in:
Zach Gollwitzer 2025-05-15 10:19:56 -04:00 committed by GitHub
parent 9793cc74f9
commit 10dd9e061a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
97 changed files with 1837 additions and 949 deletions

View file

@ -61,6 +61,18 @@ class Account < ApplicationRecord
end
end
def syncing?
self_syncing = syncs.incomplete.any?
# Since Plaid Items sync as a "group", if the item is syncing, even if the account
# sync hasn't yet started (i.e. we're still fetching the Plaid data), show it as syncing in UI.
if linked?
plaid_account&.plaid_item&.syncing? || self_syncing
else
self_syncing
end
end
def institution_domain
url_string = plaid_account&.plaid_item&.institution_url
return nil unless url_string.present?
@ -81,21 +93,6 @@ class Account < ApplicationRecord
DestroyJob.perform_later(self)
end
def sync_data(sync, start_date: nil)
Rails.logger.info("Processing balances (#{linked? ? 'reverse' : 'forward'})")
sync_balances
end
def post_sync(sync)
family.remove_syncing_notice!
accountable.post_sync(sync)
unless sync.child?
family.auto_match_transfers!
end
end
def current_holdings
holdings.where(currency: currency, date: holdings.maximum(:date)).order(amount: :desc)
end
@ -172,10 +169,4 @@ class Account < ApplicationRecord
def long_subtype_label
accountable_class.long_subtype_label_for(subtype) || accountable_class.display_name
end
private
def sync_balances
strategy = linked? ? :reverse : :forward
Balance::Syncer.new(self, strategy: strategy).sync_balances
end
end

View file

@ -0,0 +1,54 @@
class Account::SyncCompleteEvent
attr_reader :account
def initialize(account)
@account = account
end
def broadcast
# Replace account row in accounts list
account.broadcast_replace_to(
account.family,
target: "account_#{account.id}",
partial: "accounts/account",
locals: { account: account }
)
# Replace the groups this account belongs to in the sidebar
account_group_ids.each do |id|
account.broadcast_replace_to(
account.family,
target: id,
partial: "accounts/accountable_group",
locals: { account_group: account_group, open: true }
)
end
# If this is a manual, unlinked account (i.e. not part of a Plaid Item),
# trigger the family sync complete broadcast so net worth graph is updated
unless account.linked?
account.family.broadcast_sync_complete
end
# Refresh entire account page (only applies if currently viewing this account)
account.broadcast_refresh
end
private
# The sidebar will show the account in both its classification tab and the "all" tab,
# so we need to broadcast to both.
def account_group_ids
id = account_group.id
[ id, "#{account_group.classification}_#{id}" ]
end
def account_group
family_balance_sheet.account_groups.find do |group|
group.accounts.any? { |a| a.id == account.id }
end
end
def family_balance_sheet
account.family.balance_sheet
end
end

View file

@ -0,0 +1,22 @@
class Account::Syncer
attr_reader :account
def initialize(account)
@account = account
end
def perform_sync(sync)
Rails.logger.info("Processing balances (#{account.linked? ? 'reverse' : 'forward'})")
sync_balances
end
def perform_post_sync
account.family.auto_match_transfers!
end
private
def sync_balances
strategy = account.linked? ? :reverse : :forward
Balance::Syncer.new(account, strategy: strategy).sync_balances
end
end

View file

@ -1,35 +0,0 @@
class Balance::BaseCalculator
attr_reader :account
def initialize(account)
@account = account
end
def calculate
Rails.logger.tagged(self.class.name) do
calculate_balances
end
end
private
def sync_cache
@sync_cache ||= Balance::SyncCache.new(account)
end
def build_balance(date, cash_balance, holdings_value)
Balance.new(
account_id: account.id,
date: date,
balance: holdings_value + cash_balance,
cash_balance: cash_balance,
currency: account.currency
)
end
def calculate_next_balance(prior_balance, transactions, direction: :forward)
flows = transactions.sum(&:amount)
negated = direction == :forward ? account.asset? : account.liability?
flows *= -1 if negated
prior_balance + flows
end
end

View file

@ -1,4 +1,16 @@
class Balance::ForwardCalculator < Balance::BaseCalculator
class Balance::ForwardCalculator
attr_reader :account
def initialize(account)
@account = account
end
def calculate
Rails.logger.tagged("Balance::ForwardCalculator") do
calculate_balances
end
end
private
def calculate_balances
current_cash_balance = 0
@ -25,4 +37,25 @@ class Balance::ForwardCalculator < Balance::BaseCalculator
@balances
end
def sync_cache
@sync_cache ||= Balance::SyncCache.new(account)
end
def build_balance(date, cash_balance, holdings_value)
Balance.new(
account_id: account.id,
date: date,
balance: holdings_value + cash_balance,
cash_balance: cash_balance,
currency: account.currency
)
end
def calculate_next_balance(prior_balance, transactions, direction: :forward)
flows = transactions.sum(&:amount)
negated = direction == :forward ? account.asset? : account.liability?
flows *= -1 if negated
prior_balance + flows
end
end

View file

@ -1,4 +1,16 @@
class Balance::ReverseCalculator < Balance::BaseCalculator
class Balance::ReverseCalculator
attr_reader :account
def initialize(account)
@account = account
end
def calculate
Rails.logger.tagged("Balance::ReverseCalculator") do
calculate_balances
end
end
private
def calculate_balances
current_cash_balance = account.cash_balance
@ -35,4 +47,25 @@ class Balance::ReverseCalculator < Balance::BaseCalculator
@balances
end
def sync_cache
@sync_cache ||= Balance::SyncCache.new(account)
end
def build_balance(date, cash_balance, holdings_value)
Balance.new(
account_id: account.id,
date: date,
balance: holdings_value + cash_balance,
cash_balance: cash_balance,
currency: account.currency
)
end
def calculate_next_balance(prior_balance, transactions, direction: :forward)
flows = transactions.sum(&:amount)
negated = direction == :forward ? account.asset? : account.liability?
flows *= -1 if negated
prior_balance + flows
end
end

View file

@ -22,20 +22,25 @@ class BalanceSheet
end
def classification_groups
asset_groups = account_groups("asset")
liability_groups = account_groups("liability")
[
ClassificationGroup.new(
key: "asset",
display_name: "Assets",
icon: "plus",
total_money: total_assets_money,
account_groups: account_groups("asset")
account_groups: asset_groups,
syncing?: asset_groups.any?(&:syncing?)
),
ClassificationGroup.new(
key: "liability",
display_name: "Debts",
icon: "minus",
total_money: total_liabilities_money,
account_groups: account_groups("liability")
account_groups: liability_groups,
syncing?: liability_groups.any?(&:syncing?)
)
]
end
@ -43,13 +48,17 @@ class BalanceSheet
def account_groups(classification = nil)
classification_accounts = classification ? totals_query.filter { |t| t.classification == classification } : totals_query
classification_total = classification_accounts.sum(&:converted_balance)
account_groups = classification_accounts.group_by(&:accountable_type).transform_keys { |k| Accountable.from_type(k) }
account_groups = classification_accounts.group_by(&:accountable_type)
.transform_keys { |k| Accountable.from_type(k) }
account_groups.map do |accountable, accounts|
groups = account_groups.map do |accountable, accounts|
group_total = accounts.sum(&:converted_balance)
key = accountable.model_name.param_key
AccountGroup.new(
key: accountable.model_name.param_key,
id: classification ? "#{classification}_#{key}_group" : "#{key}_group",
key: key,
name: accountable.display_name,
classification: accountable.classification,
total: group_total,
@ -57,6 +66,7 @@ class BalanceSheet
weight: classification_total.zero? ? 0 : group_total / classification_total.to_d * 100,
missing_rates?: accounts.any? { |a| a.missing_rates? },
color: accountable.color,
syncing?: accounts.any?(&:is_syncing),
accounts: accounts.map do |account|
account.define_singleton_method(:weight) do
classification_total.zero? ? 0 : account.converted_balance / classification_total.to_d * 100
@ -65,7 +75,13 @@ class BalanceSheet
account
end.sort_by(&:weight).reverse
)
end.sort_by(&:weight).reverse
end
groups.sort_by do |group|
manual_order = Accountable::TYPES
type_name = group.key.camelize
manual_order.index(type_name) || Float::INFINITY
end
end
def net_worth_series(period: Period.last_30_days)
@ -76,9 +92,13 @@ class BalanceSheet
family.currency
end
def syncing?
classification_groups.any? { |group| group.syncing? }
end
private
ClassificationGroup = Struct.new(:key, :display_name, :icon, :total_money, :account_groups, keyword_init: true)
AccountGroup = Struct.new(:key, :name, :accountable_type, :classification, :total, :total_money, :weight, :accounts, :color, :missing_rates?, keyword_init: true)
ClassificationGroup = Struct.new(:key, :display_name, :icon, :total_money, :account_groups, :syncing?, keyword_init: true)
AccountGroup = Struct.new(:id, :key, :name, :accountable_type, :classification, :total, :total_money, :weight, :accounts, :color, :missing_rates?, :syncing?, keyword_init: true)
def active_accounts
family.accounts.active.with_attached_logo
@ -87,9 +107,11 @@ class BalanceSheet
def totals_query
@totals_query ||= active_accounts
.joins(ActiveRecord::Base.sanitize_sql_array([ "LEFT JOIN exchange_rates ON exchange_rates.date = CURRENT_DATE AND accounts.currency = exchange_rates.from_currency AND exchange_rates.to_currency = ?", currency ]))
.joins("LEFT JOIN syncs ON syncs.syncable_id = accounts.id AND syncs.syncable_type = 'Account' AND (syncs.status = 'pending' OR syncs.status = 'syncing')")
.select(
"accounts.*",
"SUM(accounts.balance * COALESCE(exchange_rates.rate, 1)) as converted_balance",
"COUNT(syncs.id) > 0 as is_syncing",
ActiveRecord::Base.sanitize_sql_array([ "COUNT(CASE WHEN accounts.currency <> ? AND exchange_rates.rate IS NULL THEN 1 END) as missing_rates", currency ])
)
.group(:classification, :accountable_type, :id)

View file

@ -68,15 +68,6 @@ module Accountable
end
end
def post_sync(sync)
broadcast_replace_to(
account,
target: "chart_account_#{account.id}",
partial: "accounts/show/chart",
locals: { account: account }
)
end
def display_name
self.class.display_name
end

View file

@ -6,24 +6,24 @@ module Syncable
end
def syncing?
syncs.where(status: [ :syncing, :pending ]).any?
raise NotImplementedError, "Subclasses must implement the syncing? method"
end
def sync_later(start_date: nil, parent_sync: nil)
new_sync = syncs.create!(start_date: start_date, parent: parent_sync)
def sync_later(parent_sync: nil, window_start_date: nil, window_end_date: nil)
new_sync = syncs.create!(parent: parent_sync, window_start_date: window_start_date, window_end_date: window_end_date)
SyncJob.perform_later(new_sync)
end
def sync(start_date: nil)
syncs.create!(start_date: start_date).perform
def perform_sync(sync)
syncer.perform_sync(sync)
end
def sync_data(sync, start_date: nil)
raise NotImplementedError, "Subclasses must implement the `sync_data` method"
def perform_post_sync
syncer.perform_post_sync
end
def post_sync(sync)
# no-op, syncable can optionally provide implementation
def broadcast_sync_complete
sync_broadcaster.broadcast
end
def sync_error
@ -31,7 +31,7 @@ module Syncable
end
def last_synced_at
latest_sync&.last_ran_at
latest_sync&.completed_at
end
def last_sync_created_at
@ -40,6 +40,14 @@ module Syncable
private
def latest_sync
syncs.order(created_at: :desc).first
syncs.ordered.first
end
def syncer
self.class::Syncer.new(self)
end
def sync_broadcaster
self.class::SyncCompleteEvent.new(self)
end
end

View file

@ -45,7 +45,7 @@ class Entry < ApplicationRecord
def sync_account_later
sync_start_date = [ date_previously_was, date ].compact.min unless destroyed?
account.sync_later(start_date: sync_start_date)
account.sync_later(window_start_date: sync_start_date)
end
def entryable_name_short

View file

@ -1,5 +1,5 @@
class Family < ApplicationRecord
include Syncable, AutoTransferMatchable, Subscribeable
include PlaidConnectable, Syncable, AutoTransferMatchable, Subscribeable
DATE_FORMATS = [
[ "MM-DD-YYYY", "%m-%d-%Y" ],
@ -15,7 +15,6 @@ class Family < ApplicationRecord
has_many :users, dependent: :destroy
has_many :accounts, dependent: :destroy
has_many :plaid_items, dependent: :destroy
has_many :invitations, dependent: :destroy
has_many :imports, dependent: :destroy
@ -36,6 +35,15 @@ class Family < ApplicationRecord
validates :locale, inclusion: { in: I18n.available_locales.map(&:to_s) }
validates :date_format, inclusion: { in: DATE_FORMATS.map(&:last) }
# If any accounts or plaid items are syncing, the family is also syncing, even if a formal "Family Sync" is not running.
def syncing?
Sync.joins("LEFT JOIN plaid_items ON plaid_items.id = syncs.syncable_id AND syncs.syncable_type = 'PlaidItem'")
.joins("LEFT JOIN accounts ON accounts.id = syncs.syncable_id AND syncs.syncable_type = 'Account'")
.where("syncs.syncable_id = ? OR accounts.family_id = ? OR plaid_items.family_id = ?", id, id, id)
.incomplete
.exists?
end
def assigned_merchants
merchant_ids = transactions.where.not(merchant_id: nil).pluck(:merchant_id).uniq
Merchant.where(id: merchant_ids)
@ -65,64 +73,10 @@ class Family < ApplicationRecord
@income_statement ||= IncomeStatement.new(self)
end
def sync_data(sync, start_date: nil)
# We don't rely on this value to guard the app, but keep it eventually consistent
sync_trial_status!
Rails.logger.info("Syncing accounts for family #{id}")
accounts.manual.each do |account|
account.sync_later(start_date: start_date, parent_sync: sync)
end
Rails.logger.info("Applying rules for family #{id}")
rules.each do |rule|
rule.apply_later
end
end
def remove_syncing_notice!
broadcast_remove target: "syncing-notice"
end
def post_sync(sync)
auto_match_transfers!
broadcast_refresh
end
# If family has any syncs pending/syncing within the last 10 minutes, we show a persistent "syncing" notice.
# Ignore syncs older than 10 minutes as they are considered "stale"
def syncing?
Sync.where(
"(syncable_type = 'Family' AND syncable_id = ?) OR
(syncable_type = 'Account' AND syncable_id IN (SELECT id FROM accounts WHERE family_id = ? AND plaid_account_id IS NULL)) OR
(syncable_type = 'PlaidItem' AND syncable_id IN (SELECT id FROM plaid_items WHERE family_id = ?))",
id, id, id
).where(status: [ "pending", "syncing" ], created_at: 10.minutes.ago..).exists?
end
def eu?
country != "US" && country != "CA"
end
def get_link_token(webhooks_url:, redirect_url:, accountable_type: nil, region: :us, access_token: nil)
provider = if region.to_sym == :eu
Provider::Registry.get_provider(:plaid_eu)
else
Provider::Registry.get_provider(:plaid_us)
end
# early return when no provider
return nil unless provider
provider.get_link_token(
user_id: id,
webhooks_url: webhooks_url,
redirect_url: redirect_url,
accountable_type: accountable_type,
access_token: access_token
).link_token
end
def requires_data_provider?
# If family has any trades, they need a provider for historical prices
return true if trades.any?

View file

@ -0,0 +1,51 @@
module Family::PlaidConnectable
extend ActiveSupport::Concern
included do
has_many :plaid_items, dependent: :destroy
end
def create_plaid_item!(public_token:, item_name:, region:)
provider = plaid_provider_for_region(region)
public_token_response = provider.exchange_public_token(public_token)
plaid_item = plaid_items.create!(
name: item_name,
plaid_id: public_token_response.item_id,
access_token: public_token_response.access_token,
plaid_region: region
)
plaid_item.sync_later
plaid_item
end
def get_link_token(webhooks_url:, redirect_url:, accountable_type: nil, region: :us, access_token: nil)
return nil unless plaid_us || plaid_eu
provider = plaid_provider_for_region(region)
provider.get_link_token(
user_id: self.id,
webhooks_url: webhooks_url,
redirect_url: redirect_url,
accountable_type: accountable_type,
access_token: access_token
).link_token
end
private
def plaid_us
@plaid ||= Provider::Registry.get_provider(:plaid_us)
end
def plaid_eu
@plaid_eu ||= Provider::Registry.get_provider(:plaid_eu)
end
def plaid_provider_for_region(region)
region.to_sym == :eu ? plaid_eu : plaid_us
end
end

View file

@ -72,10 +72,9 @@ module Family::Subscribeable
(1 - days_left_in_trial.to_f / Subscription::TRIAL_DAYS) * 100
end
private
def sync_trial_status!
if subscription&.status == "trialing" && days_left_in_trial < 0
subscription.update!(status: "paused")
end
def sync_trial_status!
if subscription&.status == "trialing" && days_left_in_trial < 0
subscription.update!(status: "paused")
end
end
end

View file

@ -0,0 +1,21 @@
class Family::SyncCompleteEvent
attr_reader :family
def initialize(family)
@family = family
end
def broadcast
family.broadcast_replace(
target: "balance-sheet",
partial: "pages/dashboard/balance_sheet",
locals: { balance_sheet: family.balance_sheet }
)
family.broadcast_replace(
target: "net-worth-chart",
partial: "pages/dashboard/net_worth_chart",
locals: { balance_sheet: family.balance_sheet, period: Period.last_30_days }
)
end
end

View file

@ -0,0 +1,31 @@
class Family::Syncer
attr_reader :family
def initialize(family)
@family = family
end
def perform_sync(sync)
# We don't rely on this value to guard the app, but keep it eventually consistent
family.sync_trial_status!
Rails.logger.info("Applying rules for family #{family.id}")
family.rules.each do |rule|
rule.apply_later
end
# Schedule child syncs
child_syncables.each do |syncable|
syncable.sync_later(parent_sync: sync, window_start_date: sync.window_start_date, window_end_date: sync.window_end_date)
end
end
def perform_post_sync
family.auto_match_transfers!
end
private
def child_syncables
family.plaid_items + family.accounts.manual
end
end

View file

@ -1,62 +0,0 @@
class Holding::BaseCalculator
attr_reader :account
def initialize(account)
@account = account
end
def calculate
Rails.logger.tagged(self.class.name) do
holdings = calculate_holdings
Holding.gapfill(holdings)
end
end
private
def portfolio_cache
@portfolio_cache ||= Holding::PortfolioCache.new(account)
end
def empty_portfolio
securities = portfolio_cache.get_securities
securities.each_with_object({}) { |security, hash| hash[security.id] = 0 }
end
def generate_starting_portfolio
empty_portfolio
end
def transform_portfolio(previous_portfolio, trade_entries, direction: :forward)
new_quantities = previous_portfolio.dup
trade_entries.each do |trade_entry|
trade = trade_entry.entryable
security_id = trade.security_id
qty_change = trade.qty
qty_change = qty_change * -1 if direction == :reverse
new_quantities[security_id] = (new_quantities[security_id] || 0) + qty_change
end
new_quantities
end
def build_holdings(portfolio, date, price_source: nil)
portfolio.map do |security_id, qty|
price = portfolio_cache.get_price(security_id, date, source: price_source)
if price.nil?
next
end
Holding.new(
account_id: account.id,
security_id: security_id,
date: date,
qty: qty,
price: price.price,
currency: price.currency,
amount: qty * price.price
)
end.compact
end
end

View file

@ -1,10 +1,12 @@
class Holding::ForwardCalculator < Holding::BaseCalculator
private
def portfolio_cache
@portfolio_cache ||= Holding::PortfolioCache.new(account)
end
class Holding::ForwardCalculator
attr_reader :account
def calculate_holdings
def initialize(account)
@account = account
end
def calculate
Rails.logger.tagged("Holding::ForwardCalculator") do
current_portfolio = generate_starting_portfolio
next_portfolio = {}
holdings = []
@ -16,6 +18,55 @@ class Holding::ForwardCalculator < Holding::BaseCalculator
current_portfolio = next_portfolio
end
holdings
Holding.gapfill(holdings)
end
end
private
def portfolio_cache
@portfolio_cache ||= Holding::PortfolioCache.new(account)
end
def empty_portfolio
securities = portfolio_cache.get_securities
securities.each_with_object({}) { |security, hash| hash[security.id] = 0 }
end
def generate_starting_portfolio
empty_portfolio
end
def transform_portfolio(previous_portfolio, trade_entries, direction: :forward)
new_quantities = previous_portfolio.dup
trade_entries.each do |trade_entry|
trade = trade_entry.entryable
security_id = trade.security_id
qty_change = trade.qty
qty_change = qty_change * -1 if direction == :reverse
new_quantities[security_id] = (new_quantities[security_id] || 0) + qty_change
end
new_quantities
end
def build_holdings(portfolio, date, price_source: nil)
portfolio.map do |security_id, qty|
price = portfolio_cache.get_price(security_id, date, source: price_source)
if price.nil?
next
end
Holding.new(
account_id: account.id,
security_id: security_id,
date: date,
qty: qty,
price: price.price,
currency: price.currency,
amount: qty * price.price
)
end.compact
end
end

View file

@ -83,9 +83,6 @@ class Holding::PortfolioCache
securities.each do |security|
Rails.logger.info "Loading security: ID=#{security.id} Ticker=#{security.ticker}"
# Load prices from provider to DB
security.sync_provider_prices(start_date: account.start_date)
# High priority prices from DB (synced from provider)
db_prices = security.prices.where(date: account.start_date..Date.current).map do |price|
PriceWithPriority.new(

View file

@ -1,4 +1,17 @@
class Holding::ReverseCalculator < Holding::BaseCalculator
class Holding::ReverseCalculator
attr_reader :account
def initialize(account)
@account = account
end
def calculate
Rails.logger.tagged("Holding::ReverseCalculator") do
holdings = calculate_holdings
Holding.gapfill(holdings)
end
end
private
# Reverse calculators will use the existing holdings as a source of security ids and prices
# since it is common for a provider to supply "current day" holdings but not all the historical
@ -25,6 +38,11 @@ class Holding::ReverseCalculator < Holding::BaseCalculator
holdings
end
def empty_portfolio
securities = portfolio_cache.get_securities
securities.each_with_object({}) { |security, hash| hash[security.id] = 0 }
end
# Since this is a reverse sync, we start with today's holdings
def generate_starting_portfolio
holding_quantities = empty_portfolio
@ -37,4 +55,38 @@ class Holding::ReverseCalculator < Holding::BaseCalculator
holding_quantities
end
def transform_portfolio(previous_portfolio, trade_entries, direction: :forward)
new_quantities = previous_portfolio.dup
trade_entries.each do |trade_entry|
trade = trade_entry.entryable
security_id = trade.security_id
qty_change = trade.qty
qty_change = qty_change * -1 if direction == :reverse
new_quantities[security_id] = (new_quantities[security_id] || 0) + qty_change
end
new_quantities
end
def build_holdings(portfolio, date, price_source: nil)
portfolio.map do |security_id, qty|
price = portfolio_cache.get_price(security_id, date, source: price_source)
if price.nil?
next
end
Holding.new(
account_id: account.id,
security_id: security_id,
date: date,
qty: qty,
price: price.price,
currency: price.currency,
amount: qty * price.price
)
end.compact
end
end

View file

@ -62,7 +62,7 @@ class Import < ApplicationRecord
def publish
import!
family.sync
family.sync_later
update! status: :complete
rescue => error

View file

@ -0,0 +1,196 @@
class MarketDataSyncer
DEFAULT_HISTORY_DAYS = 30
RATE_PROVIDER_NAME = :synth
PRICE_PROVIDER_NAME = :synth
MissingExchangeRateError = Class.new(StandardError)
InvalidExchangeRateDataError = Class.new(StandardError)
MissingSecurityPriceError = Class.new(StandardError)
InvalidSecurityPriceDataError = Class.new(StandardError)
class << self
def for(family: nil, account: nil)
new(family: family, account: account)
end
end
# Syncer can optionally be scoped. Otherwise, it syncs all user data
def initialize(family: nil, account: nil)
@family = family
@account = account
end
def sync_all(full_history: false)
sync_exchange_rates(full_history: full_history)
sync_prices(full_history: full_history)
end
def sync_exchange_rates(full_history: false)
unless rate_provider
Rails.logger.warn("No rate provider configured for MarketDataSyncer.sync_exchange_rates, skipping sync")
return
end
# Finds distinct currency pairs
entry_pairs = entries_scope.joins(:account)
.where.not("entries.currency = accounts.currency")
.select("entries.currency as source, accounts.currency as target")
.distinct
# All accounts in currency not equal to the family currency require exchange rates to show a normalized historical graph
account_pairs = accounts_scope.joins(:family)
.where.not("families.currency = accounts.currency")
.select("accounts.currency as source, families.currency as target")
.distinct
pairs = (entry_pairs + account_pairs).uniq
pairs.each do |pair|
sync_exchange_rate(from: pair.source, to: pair.target, full_history: full_history)
end
end
def sync_prices(full_history: false)
unless price_provider
Rails.logger.warn("No price provider configured for MarketDataSyncer.sync_prices, skipping sync")
nil
end
securities_scope.each do |security|
sync_security_price(security: security, full_history: full_history)
end
end
private
attr_reader :family, :account
def accounts_scope
return Account.where(id: account.id) if account
return family.accounts if family
Account.all
end
def entries_scope
account&.entries || family&.entries || Entry.all
end
def securities_scope
if account
account.trades.joins(:security).where.not(securities: { exchange_operating_mic: nil })
elsif family
family.trades.joins(:security).where.not(securities: { exchange_operating_mic: nil })
else
Security.where.not(exchange_operating_mic: nil)
end
end
def sync_security_price(security:, full_history:)
start_date = full_history ? find_oldest_required_price(security: security) : default_start_date
Rails.logger.info("Syncing security price for: #{security.ticker}, start_date: #{start_date}, end_date: #{end_date}")
fetched_prices = price_provider.fetch_security_prices(
security,
start_date: start_date,
end_date: end_date
)
unless fetched_prices.success?
error = MissingSecurityPriceError.new(
"#{PRICE_PROVIDER_NAME} could not fetch security price for: #{security.ticker} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_prices.error.message}"
)
Rails.logger.warn(error.message)
Sentry.capture_exception(error, level: :warning)
return
end
prices_for_upsert = fetched_prices.data.map do |price|
if price.security.nil? || price.date.nil? || price.price.nil? || price.currency.nil?
error = InvalidSecurityPriceDataError.new(
"#{PRICE_PROVIDER_NAME} returned invalid price data for security: #{security.ticker} on: #{price.date}. Price data: #{price.inspect}"
)
Rails.logger.warn(error.message)
Sentry.capture_exception(error, level: :warning)
next
end
{
security_id: price.security.id,
date: price.date,
price: price.price,
currency: price.currency
}
end.compact
Security::Price.upsert_all(
prices_for_upsert,
unique_by: %i[security_id date currency]
)
end
def sync_exchange_rate(from:, to:, full_history:)
start_date = full_history ? find_oldest_required_rate(from_currency: from) : default_start_date
Rails.logger.info("Syncing exchange rate from: #{from}, to: #{to}, start_date: #{start_date}, end_date: #{end_date}")
fetched_rates = rate_provider.fetch_exchange_rates(
from: from,
to: to,
start_date: start_date,
end_date: end_date
)
unless fetched_rates.success?
message = "#{RATE_PROVIDER_NAME} could not fetch exchange rate pair from: #{from} to: #{to} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_rates.error.message}"
Rails.logger.warn(message)
Sentry.capture_exception(MissingExchangeRateError.new(message))
return
end
rates_for_upsert = fetched_rates.data.map do |rate|
if rate.from.nil? || rate.to.nil? || rate.date.nil? || rate.rate.nil?
message = "#{RATE_PROVIDER_NAME} returned invalid rate data for pair from: #{from} to: #{to} on: #{rate.date}. Rate data: #{rate.inspect}"
Rails.logger.warn(message)
Sentry.capture_exception(InvalidExchangeRateDataError.new(message))
next
end
{
from_currency: rate.from,
to_currency: rate.to,
date: rate.date,
rate: rate.rate
}
end.compact
ExchangeRate.upsert_all(
rates_for_upsert,
unique_by: %i[from_currency to_currency date]
)
end
def rate_provider
Provider::Registry.for_concept(:exchange_rates).get_provider(RATE_PROVIDER_NAME)
end
def price_provider
Provider::Registry.for_concept(:securities).get_provider(PRICE_PROVIDER_NAME)
end
def find_oldest_required_rate(from_currency:)
entries_scope.where(currency: from_currency).minimum(:date) || default_start_date
end
def default_start_date
DEFAULT_HISTORY_DAYS.days.ago.to_date
end
# Since we're querying market data from a US-based API, end date should always be today (EST)
def end_date
Date.current.in_time_zone("America/New_York").to_date
end
end

View file

@ -1,5 +1,5 @@
class PlaidItem < ApplicationRecord
include Provided, Syncable
include Syncable
enum :plaid_region, { us: "us", eu: "eu" }
enum :status, { good: "good", requires_update: "requires_update" }, default: :good
@ -22,39 +22,6 @@ class PlaidItem < ApplicationRecord
scope :ordered, -> { order(created_at: :desc) }
scope :needs_update, -> { where(status: :requires_update) }
class << self
def create_from_public_token(token, item_name:, region:)
response = plaid_provider_for_region(region).exchange_public_token(token)
new_plaid_item = create!(
name: item_name,
plaid_id: response.item_id,
access_token: response.access_token,
plaid_region: region
)
new_plaid_item.sync_later
end
end
def sync_data(sync, start_date: nil)
begin
Rails.logger.info("Fetching and loading Plaid data")
fetch_and_load_plaid_data(sync)
update!(status: :good) if requires_update?
# Schedule account syncs
accounts.each do |account|
account.sync_later(start_date: start_date, parent_sync: sync)
end
Rails.logger.info("Plaid data fetched and loaded")
rescue Plaid::ApiError => e
handle_plaid_error(e)
raise e
end
end
def get_update_link_token(webhooks_url:, redirect_url:)
begin
family.get_link_token(
@ -76,9 +43,8 @@ class PlaidItem < ApplicationRecord
end
end
def post_sync(sync)
auto_match_categories!
family.broadcast_refresh
def build_category_alias_matcher(user_categories)
Provider::Plaid::CategoryAliasMatcher.new(user_categories)
end
def destroy_later
@ -86,6 +52,14 @@ class PlaidItem < ApplicationRecord
DestroyJob.perform_later(self)
end
def syncing?
Sync.joins("LEFT JOIN accounts a ON a.id = syncs.syncable_id AND syncs.syncable_type = 'Account'")
.joins("LEFT JOIN plaid_accounts pa ON pa.id = a.plaid_account_id")
.where("syncs.syncable_id = ? OR pa.plaid_item_id = ?", id, id)
.incomplete
.exists?
end
def auto_match_categories!
if family.categories.none?
family.categories.bootstrap!
@ -117,123 +91,11 @@ class PlaidItem < ApplicationRecord
end
private
def fetch_and_load_plaid_data(sync)
data = {}
# Log what we're about to fetch
Rails.logger.info "Starting Plaid data fetch (accounts, transactions, investments, liabilities)"
item = plaid_provider.get_item(access_token).item
update!(available_products: item.available_products, billed_products: item.billed_products)
# Institution details
if item.institution_id.present?
begin
Rails.logger.info "Fetching Plaid institution details for #{item.institution_id}"
institution = plaid_provider.get_institution(item.institution_id)
update!(
institution_id: item.institution_id,
institution_url: institution.institution.url,
institution_color: institution.institution.primary_color
)
rescue Plaid::ApiError => e
Rails.logger.warn "Failed to fetch Plaid institution details: #{e.message}"
end
end
# Accounts
fetched_accounts = plaid_provider.get_item_accounts(self).accounts
data[:accounts] = fetched_accounts || []
sync.update!(data: data)
Rails.logger.info "Processing Plaid accounts (count: #{fetched_accounts.size})"
internal_plaid_accounts = fetched_accounts.map do |account|
internal_plaid_account = plaid_accounts.find_or_create_from_plaid_data!(account, family)
internal_plaid_account.sync_account_data!(account)
internal_plaid_account
end
# Transactions
fetched_transactions = safe_fetch_plaid_data(:get_item_transactions)
data[:transactions] = fetched_transactions || []
sync.update!(data: data)
if fetched_transactions
Rails.logger.info "Processing Plaid transactions (added: #{fetched_transactions.added.size}, modified: #{fetched_transactions.modified.size}, removed: #{fetched_transactions.removed.size})"
transaction do
internal_plaid_accounts.each do |internal_plaid_account|
added = fetched_transactions.added.select { |t| t.account_id == internal_plaid_account.plaid_id }
modified = fetched_transactions.modified.select { |t| t.account_id == internal_plaid_account.plaid_id }
removed = fetched_transactions.removed.select { |t| t.account_id == internal_plaid_account.plaid_id }
internal_plaid_account.sync_transactions!(added:, modified:, removed:)
end
update!(next_cursor: fetched_transactions.cursor)
end
end
# Investments
fetched_investments = safe_fetch_plaid_data(:get_item_investments)
data[:investments] = fetched_investments || []
sync.update!(data: data)
if fetched_investments
Rails.logger.info "Processing Plaid investments (transactions: #{fetched_investments.transactions.size}, holdings: #{fetched_investments.holdings.size}, securities: #{fetched_investments.securities.size})"
transaction do
internal_plaid_accounts.each do |internal_plaid_account|
transactions = fetched_investments.transactions.select { |t| t.account_id == internal_plaid_account.plaid_id }
holdings = fetched_investments.holdings.select { |h| h.account_id == internal_plaid_account.plaid_id }
securities = fetched_investments.securities
internal_plaid_account.sync_investments!(transactions:, holdings:, securities:)
end
end
end
# Liabilities
fetched_liabilities = safe_fetch_plaid_data(:get_item_liabilities)
data[:liabilities] = fetched_liabilities || []
sync.update!(data: data)
if fetched_liabilities
Rails.logger.info "Processing Plaid liabilities (credit: #{fetched_liabilities.credit&.size || 0}, mortgage: #{fetched_liabilities.mortgage&.size || 0}, student: #{fetched_liabilities.student&.size || 0})"
transaction do
internal_plaid_accounts.each do |internal_plaid_account|
credit = fetched_liabilities.credit&.find { |l| l.account_id == internal_plaid_account.plaid_id }
mortgage = fetched_liabilities.mortgage&.find { |l| l.account_id == internal_plaid_account.plaid_id }
student = fetched_liabilities.student&.find { |l| l.account_id == internal_plaid_account.plaid_id }
internal_plaid_account.sync_credit_data!(credit) if credit
internal_plaid_account.sync_mortgage_data!(mortgage) if mortgage
internal_plaid_account.sync_student_loan_data!(student) if student
end
end
end
end
def safe_fetch_plaid_data(method)
begin
plaid_provider.send(method, self)
rescue Plaid::ApiError => e
Rails.logger.warn("Error fetching #{method} for item #{id}: #{e.message}")
nil
end
end
def remove_plaid_item
plaid_provider.remove_item(access_token)
rescue StandardError => e
Rails.logger.warn("Failed to remove Plaid item #{id}: #{e.message}")
end
def handle_plaid_error(error)
error_body = JSON.parse(error.response_body)
if error_body["error_code"] == "ITEM_LOGIN_REQUIRED"
update!(status: :requires_update)
end
end
class PlaidConnectionLostError < StandardError; end
end

View file

@ -1,30 +0,0 @@
module PlaidItem::Provided
extend ActiveSupport::Concern
class_methods do
def plaid_us_provider
Provider::Registry.get_provider(:plaid_us)
end
def plaid_eu_provider
Provider::Registry.get_provider(:plaid_eu)
end
def plaid_provider_for_region(region)
region.to_sym == :eu ? plaid_eu_provider : plaid_us_provider
end
end
def build_category_alias_matcher(user_categories)
Provider::Plaid::CategoryAliasMatcher.new(user_categories)
end
private
def eu?
raise "eu? is not implemented for #{self.class.name}"
end
def plaid_provider
eu? ? self.class.plaid_eu_provider : self.class.plaid_us_provider
end
end

View file

@ -0,0 +1,22 @@
class PlaidItem::SyncCompleteEvent
attr_reader :plaid_item
def initialize(plaid_item)
@plaid_item = plaid_item
end
def broadcast
plaid_item.accounts.each do |account|
account.broadcast_sync_complete
end
plaid_item.broadcast_replace_to(
plaid_item.family,
target: "plaid_item_#{plaid_item.id}",
partial: "plaid_items/plaid_item",
locals: { plaid_item: plaid_item }
)
plaid_item.family.broadcast_sync_complete
end
end

View file

@ -0,0 +1,149 @@
class PlaidItem::Syncer
attr_reader :plaid_item
def initialize(plaid_item)
@plaid_item = plaid_item
end
def perform_sync(sync)
begin
Rails.logger.info("Fetching and loading Plaid data")
fetch_and_load_plaid_data
plaid_item.update!(status: :good) if plaid_item.requires_update?
plaid_item.accounts.each do |account|
account.sync_later(parent_sync: sync, window_start_date: sync.window_start_date, window_end_date: sync.window_end_date)
end
Rails.logger.info("Plaid data fetched and loaded")
rescue Plaid::ApiError => e
handle_plaid_error(e)
raise e
end
end
def perform_post_sync
plaid_item.auto_match_categories!
end
private
def plaid
plaid_item.plaid_region == "eu" ? plaid_eu : plaid_us
end
def plaid_eu
@plaid_eu ||= Provider::Registry.get_provider(:plaid_eu)
end
def plaid_us
@plaid_us ||= Provider::Registry.get_provider(:plaid_us)
end
def safe_fetch_plaid_data(method)
begin
plaid.send(method, plaid_item)
rescue Plaid::ApiError => e
Rails.logger.warn("Error fetching #{method} for item #{plaid_item.id}: #{e.message}")
nil
end
end
def handle_plaid_error(error)
error_body = JSON.parse(error.response_body)
if error_body["error_code"] == "ITEM_LOGIN_REQUIRED"
plaid_item.update!(status: :requires_update)
end
end
def fetch_and_load_plaid_data
data = {}
# Log what we're about to fetch
Rails.logger.info "Starting Plaid data fetch (accounts, transactions, investments, liabilities)"
item = plaid.get_item(plaid_item.access_token).item
plaid_item.update!(available_products: item.available_products, billed_products: item.billed_products)
# Institution details
if item.institution_id.present?
begin
Rails.logger.info "Fetching Plaid institution details for #{item.institution_id}"
institution = plaid.get_institution(item.institution_id)
plaid_item.update!(
institution_id: item.institution_id,
institution_url: institution.institution.url,
institution_color: institution.institution.primary_color
)
rescue Plaid::ApiError => e
Rails.logger.warn "Failed to fetch Plaid institution details: #{e.message}"
end
end
# Accounts
fetched_accounts = plaid.get_item_accounts(plaid_item).accounts
data[:accounts] = fetched_accounts || []
Rails.logger.info "Processing Plaid accounts (count: #{fetched_accounts.size})"
internal_plaid_accounts = fetched_accounts.map do |account|
internal_plaid_account = plaid_item.plaid_accounts.find_or_create_from_plaid_data!(account, plaid_item.family)
internal_plaid_account.sync_account_data!(account)
internal_plaid_account
end
# Transactions
fetched_transactions = safe_fetch_plaid_data(:get_item_transactions)
data[:transactions] = fetched_transactions || []
if fetched_transactions
Rails.logger.info "Processing Plaid transactions (added: #{fetched_transactions.added.size}, modified: #{fetched_transactions.modified.size}, removed: #{fetched_transactions.removed.size})"
PlaidItem.transaction do
internal_plaid_accounts.each do |internal_plaid_account|
added = fetched_transactions.added.select { |t| t.account_id == internal_plaid_account.plaid_id }
modified = fetched_transactions.modified.select { |t| t.account_id == internal_plaid_account.plaid_id }
removed = fetched_transactions.removed.select { |t| t.account_id == internal_plaid_account.plaid_id }
internal_plaid_account.sync_transactions!(added:, modified:, removed:)
end
plaid_item.update!(next_cursor: fetched_transactions.cursor)
end
end
# Investments
fetched_investments = safe_fetch_plaid_data(:get_item_investments)
data[:investments] = fetched_investments || []
if fetched_investments
Rails.logger.info "Processing Plaid investments (transactions: #{fetched_investments.transactions.size}, holdings: #{fetched_investments.holdings.size}, securities: #{fetched_investments.securities.size})"
PlaidItem.transaction do
internal_plaid_accounts.each do |internal_plaid_account|
transactions = fetched_investments.transactions.select { |t| t.account_id == internal_plaid_account.plaid_id }
holdings = fetched_investments.holdings.select { |h| h.account_id == internal_plaid_account.plaid_id }
securities = fetched_investments.securities
internal_plaid_account.sync_investments!(transactions:, holdings:, securities:)
end
end
end
# Liabilities
fetched_liabilities = safe_fetch_plaid_data(:get_item_liabilities)
data[:liabilities] = fetched_liabilities || []
if fetched_liabilities
Rails.logger.info "Processing Plaid liabilities (credit: #{fetched_liabilities.credit&.size || 0}, mortgage: #{fetched_liabilities.mortgage&.size || 0}, student: #{fetched_liabilities.student&.size || 0})"
PlaidItem.transaction do
internal_plaid_accounts.each do |internal_plaid_account|
credit = fetched_liabilities.credit&.find { |l| l.account_id == internal_plaid_account.plaid_id }
mortgage = fetched_liabilities.mortgage&.find { |l| l.account_id == internal_plaid_account.plaid_id }
student = fetched_liabilities.student&.find { |l| l.account_id == internal_plaid_account.plaid_id }
internal_plaid_account.sync_credit_data!(credit) if credit
internal_plaid_account.sync_mortgage_data!(mortgage) if mortgage
internal_plaid_account.sync_student_loan_data!(student) if student
end
end
end
end
end

View file

@ -36,8 +36,6 @@ class Provider
default_error_transformer(error)
end
Sentry.capture_exception(transformed_error)
Response.new(
success?: false,
data: nil,

View file

@ -28,44 +28,6 @@ module Security::Provided
end
end
def sync_provider_prices(start_date:, end_date: Date.current)
unless has_prices?
Rails.logger.warn("Security id=#{id} ticker=#{ticker} is not known by provider, skipping price sync")
return 0
end
unless provider.present?
Rails.logger.warn("No security provider configured, cannot sync prices for id=#{id} ticker=#{ticker}")
return 0
end
response = provider.fetch_security_prices(self, start_date: start_date, end_date: end_date)
unless response.success?
Rails.logger.error("Provider error for sync_provider_prices with id=#{id} ticker=#{ticker}: #{response.error}")
return 0
end
fetched_prices = response.data.map do |price|
{
security_id: price.security.id,
date: price.date,
price: price.price,
currency: price.currency
}
end
valid_prices = fetched_prices.reject do |price|
is_invalid = price[:date].nil? || price[:price].nil? || price[:currency].nil?
if is_invalid
Rails.logger.warn("Invalid price data for security_id=#{id}: Missing required fields in price record: #{price.inspect}")
end
is_invalid
end
Security::Price.upsert_all(valid_prices, unique_by: %i[security_id date currency])
end
def find_or_fetch_price(date: Date.current, cache: true)
price = prices.find_by(date: date)

View file

@ -9,4 +9,14 @@ class Session < ApplicationRecord
self.user_agent = Current.user_agent
self.ip_address = Current.ip_address
end
def get_preferred_tab(tab_key)
data.dig("tab_preferences", tab_key)
end
def set_preferred_tab(tab_key, tab_value)
data["tab_preferences"] ||= {}
data["tab_preferences"][tab_key] = tab_value
save!
end
end

View file

@ -17,6 +17,7 @@ class Subscription < ApplicationRecord
validates :stripe_id, presence: true, if: :active?
validates :trial_ends_at, presence: true, if: :trialing?
validates :family_id, uniqueness: true
class << self
def new_trial_ends_at

View file

@ -1,4 +1,6 @@
class Sync < ApplicationRecord
include AASM
Error = Class.new(StandardError)
belongs_to :syncable, polymorphic: true
@ -6,12 +8,31 @@ class Sync < ApplicationRecord
belongs_to :parent, class_name: "Sync", optional: true
has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy
enum :status, { pending: "pending", syncing: "syncing", completed: "completed", failed: "failed" }
scope :ordered, -> { order(created_at: :desc) }
scope :incomplete, -> { where(status: [ :pending, :syncing ]) }
def child?
parent_id.present?
validate :window_valid
# Sync state machine
aasm column: :status, timestamps: true do
state :pending, initial: true
state :syncing
state :completed
state :failed
after_all_transitions :log_status_change
event :start, after_commit: :report_warnings do
transitions from: :pending, to: :syncing
end
event :complete do
transitions from: :syncing, to: :completed
end
event :fail do
transitions from: :syncing, to: :failed
end
end
def perform
@ -19,43 +40,83 @@ class Sync < ApplicationRecord
start!
begin
syncable.sync_data(self, start_date: start_date)
complete!
Rails.logger.info("Sync completed, starting post-sync")
syncable.post_sync(self)
Rails.logger.info("Post-sync completed")
rescue StandardError => error
fail! error, report_error: true
syncable.perform_sync(self)
rescue => e
fail!
update(error: e.message)
report_error(e)
ensure
finalize_if_all_children_finalized
end
end
end
private
def start!
Rails.logger.info("Starting sync")
update! status: :syncing
end
# Finalizes the current sync AND parent (if it exists)
def finalize_if_all_children_finalized
Sync.transaction do
lock!
def complete!
Rails.logger.info("Sync completed")
update! status: :completed, last_ran_at: Time.current
end
# If this is the "parent" and there are still children running, don't finalize.
return unless all_children_finalized?
def fail!(error, report_error: false)
Rails.logger.error("Sync failed: #{error.message}")
if report_error
Sentry.capture_exception(error) do |scope|
scope.set_context("sync", { id: id, syncable_type: syncable_type, syncable_id: syncable_id })
scope.set_tags(sync_id: id)
if syncing?
if has_failed_children?
fail!
else
complete!
end
end
update!(
status: :failed,
error: error.message,
last_ran_at: Time.current
)
# If we make it here, the sync is finalized. Run post-sync, regardless of failure/success.
perform_post_sync
end
# If this sync has a parent, try to finalize it so the child status propagates up the chain.
parent&.finalize_if_all_children_finalized
end
private
def log_status_change
Rails.logger.info("changing from #{aasm.from_state} to #{aasm.to_state} (event: #{aasm.current_event})")
end
def has_failed_children?
children.failed.any?
end
def all_children_finalized?
children.incomplete.empty?
end
def perform_post_sync
Rails.logger.info("Performing post-sync for #{syncable_type} (#{syncable.id})")
syncable.perform_post_sync
syncable.broadcast_sync_complete
rescue => e
Rails.logger.error("Error performing post-sync for #{syncable_type} (#{syncable.id}): #{e.message}")
report_error(e)
end
def report_error(error)
Sentry.capture_exception(error) do |scope|
scope.set_tags(sync_id: id)
end
end
def report_warnings
todays_sync_count = syncable.syncs.where(created_at: Date.current.all_day).count
if todays_sync_count > 10
Sentry.capture_exception(
Error.new("#{syncable_type} (#{syncable.id}) has exceeded 10 syncs today (count: #{todays_sync_count})"),
level: :warning
)
end
end
def window_valid
if window_start_date && window_end_date && window_start_date > window_end_date
errors.add(:window_end_date, "must be greater than window_start_date")
end
end
end