mirror of
https://github.com/maybe-finance/maybe.git
synced 2025-08-05 05:25:24 +02:00
Plaid sync domain improvements (#2267)
Breaks our Plaid sync process out into more manageable classes. Notably, this moves the sync process to a distinct, 2-step flow: 1. Import stage - we first make API calls and import Plaid data to "mirror" tables 2. Processing stage - read the raw data, apply business rules, build internal domain models and sync balances This provides several benefits: - Plaid syncs can now be "replayed" without fetching API data again - Mirror tables provide better audit and debugging capabilities - Eliminates the "all or nothing" sync behavior that is currently in place, which is brittle
This commit is contained in:
parent
5c82af0e8c
commit
03a146222d
72 changed files with 3763 additions and 706 deletions
79
app/models/plaid_item/accounts_snapshot.rb
Normal file
79
app/models/plaid_item/accounts_snapshot.rb
Normal file
|
@ -0,0 +1,79 @@
|
|||
# All Plaid data is fetched at the item-level. This class is a simple wrapper that
|
||||
# providers a convenience method, get_account_data which scopes the item-level payload
|
||||
# to each Plaid Account
|
||||
class PlaidItem::AccountsSnapshot
|
||||
def initialize(plaid_item, plaid_provider:)
|
||||
@plaid_item = plaid_item
|
||||
@plaid_provider = plaid_provider
|
||||
end
|
||||
|
||||
def accounts
|
||||
@accounts ||= plaid_provider.get_item_accounts(plaid_item.access_token).accounts
|
||||
end
|
||||
|
||||
def get_account_data(account_id)
|
||||
AccountData.new(
|
||||
account_data: accounts.find { |a| a.account_id == account_id },
|
||||
transactions_data: account_scoped_transactions_data(account_id),
|
||||
investments_data: account_scoped_investments_data(account_id),
|
||||
liabilities_data: account_scoped_liabilities_data(account_id)
|
||||
)
|
||||
end
|
||||
|
||||
private
|
||||
attr_reader :plaid_item, :plaid_provider
|
||||
|
||||
TransactionsData = Data.define(:added, :modified, :removed)
|
||||
LiabilitiesData = Data.define(:credit, :mortgage, :student)
|
||||
InvestmentsData = Data.define(:transactions, :holdings, :securities)
|
||||
AccountData = Data.define(:account_data, :transactions_data, :investments_data, :liabilities_data)
|
||||
|
||||
def account_scoped_transactions_data(account_id)
|
||||
return nil unless transactions_data
|
||||
|
||||
TransactionsData.new(
|
||||
added: transactions_data.added.select { |t| t.account_id == account_id },
|
||||
modified: transactions_data.modified.select { |t| t.account_id == account_id },
|
||||
removed: transactions_data.removed.select { |t| t.account_id == account_id }
|
||||
)
|
||||
end
|
||||
|
||||
def account_scoped_investments_data(account_id)
|
||||
return nil unless investments_data
|
||||
|
||||
transactions = investments_data.transactions.select { |t| t.account_id == account_id }
|
||||
holdings = investments_data.holdings.select { |h| h.account_id == account_id }
|
||||
securities = transactions.count > 0 && holdings.count > 0 ? investments_data.securities : []
|
||||
|
||||
InvestmentsData.new(
|
||||
transactions: transactions,
|
||||
holdings: holdings,
|
||||
securities: securities
|
||||
)
|
||||
end
|
||||
|
||||
def account_scoped_liabilities_data(account_id)
|
||||
return nil unless liabilities_data
|
||||
|
||||
LiabilitiesData.new(
|
||||
credit: liabilities_data.credit&.find { |c| c.account_id == account_id },
|
||||
mortgage: liabilities_data.mortgage&.find { |m| m.account_id == account_id },
|
||||
student: liabilities_data.student&.find { |s| s.account_id == account_id }
|
||||
)
|
||||
end
|
||||
|
||||
def transactions_data
|
||||
return nil unless plaid_item.supports_product?("transactions")
|
||||
@transactions_data ||= plaid_provider.get_transactions(plaid_item.access_token)
|
||||
end
|
||||
|
||||
def investments_data
|
||||
return nil unless plaid_item.supports_product?("investments")
|
||||
@investments_data ||= plaid_provider.get_item_investments(plaid_item.access_token)
|
||||
end
|
||||
|
||||
def liabilities_data
|
||||
return nil unless plaid_item.supports_product?("liabilities")
|
||||
@liabilities_data ||= plaid_provider.get_item_liabilities(plaid_item.access_token)
|
||||
end
|
||||
end
|
53
app/models/plaid_item/importer.rb
Normal file
53
app/models/plaid_item/importer.rb
Normal file
|
@ -0,0 +1,53 @@
|
|||
class PlaidItem::Importer
|
||||
def initialize(plaid_item, plaid_provider:)
|
||||
@plaid_item = plaid_item
|
||||
@plaid_provider = plaid_provider
|
||||
end
|
||||
|
||||
def import
|
||||
fetch_and_import_item_data
|
||||
fetch_and_import_accounts_data
|
||||
rescue Plaid::ApiError => e
|
||||
handle_plaid_error(e)
|
||||
end
|
||||
|
||||
private
|
||||
attr_reader :plaid_item, :plaid_provider
|
||||
|
||||
# All errors that should halt the import should be re-raised after handling
|
||||
# These errors will propagate up to the Sync record and mark it as failed.
|
||||
def handle_plaid_error(error)
|
||||
error_body = JSON.parse(error.response_body)
|
||||
|
||||
case error_body["error_code"]
|
||||
when "ITEM_LOGIN_REQUIRED"
|
||||
plaid_item.update!(status: :requires_update)
|
||||
raise error
|
||||
else
|
||||
raise error
|
||||
end
|
||||
end
|
||||
|
||||
def fetch_and_import_item_data
|
||||
item_data = plaid_provider.get_item(plaid_item.access_token).item
|
||||
institution_data = plaid_provider.get_institution(item_data.institution_id).institution
|
||||
|
||||
plaid_item.upsert_plaid_snapshot!(item_data)
|
||||
plaid_item.upsert_plaid_institution_snapshot!(institution_data)
|
||||
end
|
||||
|
||||
def fetch_and_import_accounts_data
|
||||
snapshot = PlaidItem::AccountsSnapshot.new(plaid_item, plaid_provider: plaid_provider)
|
||||
|
||||
snapshot.accounts.each do |raw_account|
|
||||
plaid_account = plaid_item.plaid_accounts.find_or_initialize_by(
|
||||
plaid_id: raw_account.account_id
|
||||
)
|
||||
|
||||
PlaidAccount::Importer.new(
|
||||
plaid_account,
|
||||
account_snapshot: snapshot.get_account_data(raw_account.account_id)
|
||||
).import
|
||||
end
|
||||
end
|
||||
end
|
7
app/models/plaid_item/provided.rb
Normal file
7
app/models/plaid_item/provided.rb
Normal file
|
@ -0,0 +1,7 @@
|
|||
module PlaidItem::Provided
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
def plaid_provider
|
||||
@plaid_provider ||= Provider::Registry.plaid_provider_for_region(self.plaid_region)
|
||||
end
|
||||
end
|
|
@ -6,144 +6,21 @@ class PlaidItem::Syncer
|
|||
end
|
||||
|
||||
def perform_sync(sync)
|
||||
begin
|
||||
Rails.logger.info("Fetching and loading Plaid data")
|
||||
fetch_and_load_plaid_data
|
||||
plaid_item.update!(status: :good) if plaid_item.requires_update?
|
||||
# Loads item metadata, accounts, transactions, and other data to our DB
|
||||
plaid_item.import_latest_plaid_data
|
||||
|
||||
plaid_item.accounts.each do |account|
|
||||
account.sync_later(parent_sync: sync, window_start_date: sync.window_start_date, window_end_date: sync.window_end_date)
|
||||
end
|
||||
# Processes the raw Plaid data and updates internal domain objects
|
||||
plaid_item.process_accounts
|
||||
|
||||
Rails.logger.info("Plaid data fetched and loaded")
|
||||
rescue Plaid::ApiError => e
|
||||
handle_plaid_error(e)
|
||||
raise e
|
||||
end
|
||||
# All data is synced, so we can now run an account sync to calculate historical balances and more
|
||||
plaid_item.schedule_account_syncs(
|
||||
parent_sync: sync,
|
||||
window_start_date: sync.window_start_date,
|
||||
window_end_date: sync.window_end_date
|
||||
)
|
||||
end
|
||||
|
||||
def perform_post_sync
|
||||
plaid_item.auto_match_categories!
|
||||
# no-op
|
||||
end
|
||||
|
||||
private
|
||||
def plaid
|
||||
plaid_item.plaid_region == "eu" ? plaid_eu : plaid_us
|
||||
end
|
||||
|
||||
def plaid_eu
|
||||
@plaid_eu ||= Provider::Registry.get_provider(:plaid_eu)
|
||||
end
|
||||
|
||||
def plaid_us
|
||||
@plaid_us ||= Provider::Registry.get_provider(:plaid_us)
|
||||
end
|
||||
|
||||
def safe_fetch_plaid_data(method)
|
||||
begin
|
||||
plaid.send(method, plaid_item)
|
||||
rescue Plaid::ApiError => e
|
||||
Rails.logger.warn("Error fetching #{method} for item #{plaid_item.id}: #{e.message}")
|
||||
nil
|
||||
end
|
||||
end
|
||||
|
||||
def handle_plaid_error(error)
|
||||
error_body = JSON.parse(error.response_body)
|
||||
|
||||
if error_body["error_code"] == "ITEM_LOGIN_REQUIRED"
|
||||
plaid_item.update!(status: :requires_update)
|
||||
end
|
||||
end
|
||||
|
||||
def fetch_and_load_plaid_data
|
||||
data = {}
|
||||
|
||||
# Log what we're about to fetch
|
||||
Rails.logger.info "Starting Plaid data fetch (accounts, transactions, investments, liabilities)"
|
||||
|
||||
item = plaid.get_item(plaid_item.access_token).item
|
||||
plaid_item.update!(available_products: item.available_products, billed_products: item.billed_products)
|
||||
|
||||
# Institution details
|
||||
if item.institution_id.present?
|
||||
begin
|
||||
Rails.logger.info "Fetching Plaid institution details for #{item.institution_id}"
|
||||
institution = plaid.get_institution(item.institution_id)
|
||||
plaid_item.update!(
|
||||
institution_id: item.institution_id,
|
||||
institution_url: institution.institution.url,
|
||||
institution_color: institution.institution.primary_color
|
||||
)
|
||||
rescue Plaid::ApiError => e
|
||||
Rails.logger.warn "Failed to fetch Plaid institution details: #{e.message}"
|
||||
end
|
||||
end
|
||||
|
||||
# Accounts
|
||||
fetched_accounts = plaid.get_item_accounts(plaid_item).accounts
|
||||
data[:accounts] = fetched_accounts || []
|
||||
Rails.logger.info "Processing Plaid accounts (count: #{fetched_accounts.size})"
|
||||
|
||||
internal_plaid_accounts = fetched_accounts.map do |account|
|
||||
internal_plaid_account = plaid_item.plaid_accounts.find_or_create_from_plaid_data!(account, plaid_item.family)
|
||||
internal_plaid_account.sync_account_data!(account)
|
||||
internal_plaid_account
|
||||
end
|
||||
|
||||
# Transactions
|
||||
fetched_transactions = safe_fetch_plaid_data(:get_item_transactions)
|
||||
data[:transactions] = fetched_transactions || []
|
||||
|
||||
if fetched_transactions
|
||||
Rails.logger.info "Processing Plaid transactions (added: #{fetched_transactions.added.size}, modified: #{fetched_transactions.modified.size}, removed: #{fetched_transactions.removed.size})"
|
||||
PlaidItem.transaction do
|
||||
internal_plaid_accounts.each do |internal_plaid_account|
|
||||
added = fetched_transactions.added.select { |t| t.account_id == internal_plaid_account.plaid_id }
|
||||
modified = fetched_transactions.modified.select { |t| t.account_id == internal_plaid_account.plaid_id }
|
||||
removed = fetched_transactions.removed.select { |t| t.account_id == internal_plaid_account.plaid_id }
|
||||
|
||||
internal_plaid_account.sync_transactions!(added:, modified:, removed:)
|
||||
end
|
||||
|
||||
plaid_item.update!(next_cursor: fetched_transactions.cursor)
|
||||
end
|
||||
end
|
||||
|
||||
# Investments
|
||||
fetched_investments = safe_fetch_plaid_data(:get_item_investments)
|
||||
data[:investments] = fetched_investments || []
|
||||
|
||||
if fetched_investments
|
||||
Rails.logger.info "Processing Plaid investments (transactions: #{fetched_investments.transactions.size}, holdings: #{fetched_investments.holdings.size}, securities: #{fetched_investments.securities.size})"
|
||||
PlaidItem.transaction do
|
||||
internal_plaid_accounts.each do |internal_plaid_account|
|
||||
transactions = fetched_investments.transactions.select { |t| t.account_id == internal_plaid_account.plaid_id }
|
||||
holdings = fetched_investments.holdings.select { |h| h.account_id == internal_plaid_account.plaid_id }
|
||||
securities = fetched_investments.securities
|
||||
|
||||
internal_plaid_account.sync_investments!(transactions:, holdings:, securities:)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Liabilities
|
||||
fetched_liabilities = safe_fetch_plaid_data(:get_item_liabilities)
|
||||
data[:liabilities] = fetched_liabilities || []
|
||||
|
||||
if fetched_liabilities
|
||||
Rails.logger.info "Processing Plaid liabilities (credit: #{fetched_liabilities.credit&.size || 0}, mortgage: #{fetched_liabilities.mortgage&.size || 0}, student: #{fetched_liabilities.student&.size || 0})"
|
||||
PlaidItem.transaction do
|
||||
internal_plaid_accounts.each do |internal_plaid_account|
|
||||
credit = fetched_liabilities.credit&.find { |l| l.account_id == internal_plaid_account.plaid_id }
|
||||
mortgage = fetched_liabilities.mortgage&.find { |l| l.account_id == internal_plaid_account.plaid_id }
|
||||
student = fetched_liabilities.student&.find { |l| l.account_id == internal_plaid_account.plaid_id }
|
||||
|
||||
internal_plaid_account.sync_credit_data!(credit) if credit
|
||||
internal_plaid_account.sync_mortgage_data!(mortgage) if mortgage
|
||||
internal_plaid_account.sync_student_loan_data!(student) if student
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue