mirror of
https://github.com/maybe-finance/maybe.git
synced 2025-08-09 07:25:19 +02:00
Improve sync window column names, add timestamps
This commit is contained in:
parent
7988ab64cc
commit
6a1d625a4f
15 changed files with 126 additions and 61 deletions
|
@ -5,7 +5,7 @@ class Account::Syncer
|
|||
@account = account
|
||||
end
|
||||
|
||||
def perform_sync(sync:, start_date: nil)
|
||||
def perform_sync(sync)
|
||||
Rails.logger.info("Processing balances (#{account.linked? ? 'reverse' : 'forward'})")
|
||||
sync_balances
|
||||
end
|
||||
|
|
|
@ -9,13 +9,13 @@ module Syncable
|
|||
syncs.incomplete.any?
|
||||
end
|
||||
|
||||
def sync_later(start_date: nil, parent_sync: nil)
|
||||
new_sync = syncs.create!(start_date: start_date, parent: parent_sync)
|
||||
def sync_later(parent_sync: nil, window_start_date: nil, window_end_date: nil)
|
||||
new_sync = syncs.create!(parent: parent_sync, window_start_date: window_start_date, window_end_date: window_end_date)
|
||||
SyncJob.perform_later(new_sync)
|
||||
end
|
||||
|
||||
def perform_sync(sync:, start_date: nil)
|
||||
syncer.perform_sync(sync: sync, start_date: start_date)
|
||||
def perform_sync(sync)
|
||||
syncer.perform_sync(sync)
|
||||
end
|
||||
|
||||
def perform_post_sync
|
||||
|
@ -27,7 +27,7 @@ module Syncable
|
|||
end
|
||||
|
||||
def last_synced_at
|
||||
latest_sync&.last_ran_at
|
||||
latest_sync&.completed_at
|
||||
end
|
||||
|
||||
private
|
||||
|
|
|
@ -45,7 +45,7 @@ class Entry < ApplicationRecord
|
|||
|
||||
def sync_account_later
|
||||
sync_start_date = [ date_previously_was, date ].compact.min unless destroyed?
|
||||
account.sync_later(start_date: sync_start_date)
|
||||
account.sync_later(window_start_date: sync_start_date)
|
||||
end
|
||||
|
||||
def entryable_name_short
|
||||
|
|
|
@ -5,7 +5,7 @@ class Family::Syncer
|
|||
@family = family
|
||||
end
|
||||
|
||||
def perform_sync(sync:, start_date: nil)
|
||||
def perform_sync(sync)
|
||||
# We don't rely on this value to guard the app, but keep it eventually consistent
|
||||
family.sync_trial_status!
|
||||
|
||||
|
@ -16,7 +16,7 @@ class Family::Syncer
|
|||
|
||||
# Schedule child syncs
|
||||
child_syncables.each do |syncable|
|
||||
syncable.sync_later(start_date: start_date, parent_sync: sync)
|
||||
syncable.sync_later(parent_sync: sync, window_start_date: sync.window_start_date, window_end_date: sync.window_end_date)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -91,17 +91,25 @@ class MarketDataSyncer
|
|||
)
|
||||
|
||||
unless fetched_prices.success?
|
||||
message = "#{PRICE_PROVIDER_NAME} could not fetch security price for: #{security.ticker} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_prices.error.message}"
|
||||
Rails.logger.warn(message)
|
||||
Sentry.capture_exception(MissingSecurityPriceError.new(message))
|
||||
error = MissingSecurityPriceError.new(
|
||||
"#{PRICE_PROVIDER_NAME} could not fetch security price for: #{security.ticker} between: #{start_date} and: #{Date.current}. Provider error: #{fetched_prices.error.message}"
|
||||
)
|
||||
|
||||
Rails.logger.warn(error.message)
|
||||
Sentry.capture_exception(error, level: :warning)
|
||||
|
||||
return
|
||||
end
|
||||
|
||||
prices_for_upsert = fetched_prices.data.map do |price|
|
||||
if price.security.nil? || price.date.nil? || price.price.nil? || price.currency.nil?
|
||||
message = "#{PRICE_PROVIDER_NAME} returned invalid price data for security: #{security.ticker} on: #{price.date}. Price data: #{price.inspect}"
|
||||
Rails.logger.warn(message)
|
||||
Sentry.capture_exception(InvalidSecurityPriceDataError.new(message))
|
||||
error = InvalidSecurityPriceDataError.new(
|
||||
"#{PRICE_PROVIDER_NAME} returned invalid price data for security: #{security.ticker} on: #{price.date}. Price data: #{price.inspect}"
|
||||
)
|
||||
|
||||
Rails.logger.warn(error.message)
|
||||
Sentry.capture_exception(error, level: :warning)
|
||||
|
||||
next
|
||||
end
|
||||
|
||||
|
|
|
@ -5,14 +5,14 @@ class PlaidItem::Syncer
|
|||
@plaid_item = plaid_item
|
||||
end
|
||||
|
||||
def perform_sync(sync:, start_date: nil)
|
||||
def perform_sync(sync)
|
||||
begin
|
||||
Rails.logger.info("Fetching and loading Plaid data")
|
||||
fetch_and_load_plaid_data
|
||||
plaid_item.update!(status: :good) if plaid_item.requires_update?
|
||||
|
||||
plaid_item.accounts.each do |account|
|
||||
account.sync_later(start_date: start_date, parent_sync: sync)
|
||||
account.sync_later(parent_sync: sync, window_start_date: sync.window_start_date, window_end_date: sync.window_end_date)
|
||||
end
|
||||
|
||||
Rails.logger.info("Plaid data fetched and loaded")
|
||||
|
@ -42,9 +42,9 @@ class PlaidItem::Syncer
|
|||
|
||||
def safe_fetch_plaid_data(method)
|
||||
begin
|
||||
plaid.send(method, self)
|
||||
plaid.send(method, plaid_item)
|
||||
rescue Plaid::ApiError => e
|
||||
Rails.logger.warn("Error fetching #{method} for item #{id}: #{e.message}")
|
||||
Rails.logger.warn("Error fetching #{method} for item #{plaid_item.id}: #{e.message}")
|
||||
nil
|
||||
end
|
||||
end
|
||||
|
@ -53,7 +53,7 @@ class PlaidItem::Syncer
|
|||
error_body = JSON.parse(error.response_body)
|
||||
|
||||
if error_body["error_code"] == "ITEM_LOGIN_REQUIRED"
|
||||
update!(status: :requires_update)
|
||||
plaid_item.update!(status: :requires_update)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -64,14 +64,14 @@ class PlaidItem::Syncer
|
|||
Rails.logger.info "Starting Plaid data fetch (accounts, transactions, investments, liabilities)"
|
||||
|
||||
item = plaid.get_item(plaid_item.access_token).item
|
||||
update!(available_products: item.available_products, billed_products: item.billed_products)
|
||||
plaid_item.update!(available_products: item.available_products, billed_products: item.billed_products)
|
||||
|
||||
# Institution details
|
||||
if item.institution_id.present?
|
||||
begin
|
||||
Rails.logger.info "Fetching Plaid institution details for #{item.institution_id}"
|
||||
institution = plaid.get_institution(item.institution_id)
|
||||
update!(
|
||||
plaid_item.update!(
|
||||
institution_id: item.institution_id,
|
||||
institution_url: institution.institution.url,
|
||||
institution_color: institution.institution.primary_color
|
||||
|
@ -98,7 +98,7 @@ class PlaidItem::Syncer
|
|||
|
||||
if fetched_transactions
|
||||
Rails.logger.info "Processing Plaid transactions (added: #{fetched_transactions.added.size}, modified: #{fetched_transactions.modified.size}, removed: #{fetched_transactions.removed.size})"
|
||||
transaction do
|
||||
PlaidItem.transaction do
|
||||
internal_plaid_accounts.each do |internal_plaid_account|
|
||||
added = fetched_transactions.added.select { |t| t.account_id == internal_plaid_account.plaid_id }
|
||||
modified = fetched_transactions.modified.select { |t| t.account_id == internal_plaid_account.plaid_id }
|
||||
|
@ -107,7 +107,7 @@ class PlaidItem::Syncer
|
|||
internal_plaid_account.sync_transactions!(added:, modified:, removed:)
|
||||
end
|
||||
|
||||
update!(next_cursor: fetched_transactions.cursor)
|
||||
plaid_item.update!(next_cursor: fetched_transactions.cursor)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -117,7 +117,7 @@ class PlaidItem::Syncer
|
|||
|
||||
if fetched_investments
|
||||
Rails.logger.info "Processing Plaid investments (transactions: #{fetched_investments.transactions.size}, holdings: #{fetched_investments.holdings.size}, securities: #{fetched_investments.securities.size})"
|
||||
transaction do
|
||||
PlaidItem.transaction do
|
||||
internal_plaid_accounts.each do |internal_plaid_account|
|
||||
transactions = fetched_investments.transactions.select { |t| t.account_id == internal_plaid_account.plaid_id }
|
||||
holdings = fetched_investments.holdings.select { |h| h.account_id == internal_plaid_account.plaid_id }
|
||||
|
@ -134,7 +134,7 @@ class PlaidItem::Syncer
|
|||
|
||||
if fetched_liabilities
|
||||
Rails.logger.info "Processing Plaid liabilities (credit: #{fetched_liabilities.credit&.size || 0}, mortgage: #{fetched_liabilities.mortgage&.size || 0}, student: #{fetched_liabilities.student&.size || 0})"
|
||||
transaction do
|
||||
PlaidItem.transaction do
|
||||
internal_plaid_accounts.each do |internal_plaid_account|
|
||||
credit = fetched_liabilities.credit&.find { |l| l.account_id == internal_plaid_account.plaid_id }
|
||||
mortgage = fetched_liabilities.mortgage&.find { |l| l.account_id == internal_plaid_account.plaid_id }
|
||||
|
|
|
@ -9,14 +9,16 @@ class Sync < ApplicationRecord
|
|||
scope :ordered, -> { order(created_at: :desc) }
|
||||
scope :incomplete, -> { where(status: [ :pending, :syncing ]) }
|
||||
|
||||
validate :window_valid
|
||||
|
||||
# Sync state machine
|
||||
aasm column: :status do
|
||||
aasm column: :status, timestamps: true do
|
||||
state :pending, initial: true
|
||||
state :syncing
|
||||
state :completed
|
||||
state :failed
|
||||
|
||||
event :start, after_commit: :handle_start do
|
||||
event :start do
|
||||
transitions from: :pending, to: :syncing
|
||||
end
|
||||
|
||||
|
@ -29,11 +31,11 @@ class Sync < ApplicationRecord
|
|||
end
|
||||
end
|
||||
|
||||
def perform(start_date: nil)
|
||||
def perform(window_start_date: nil, window_end_date: nil)
|
||||
start!
|
||||
|
||||
begin
|
||||
syncable.perform_sync(sync: self, start_date: start_date)
|
||||
syncable.perform_sync(self)
|
||||
attempt_finalization
|
||||
rescue => e
|
||||
fail!
|
||||
|
@ -81,7 +83,9 @@ class Sync < ApplicationRecord
|
|||
end
|
||||
end
|
||||
|
||||
def handle_start
|
||||
update!(last_ran_at: Time.current)
|
||||
def window_valid
|
||||
if window_start_date && window_end_date && window_start_date > window_end_date
|
||||
errors.add(:window_end_date, "must be greater than window_start_date")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
51
db/migrate/20250512171654_update_sync_timestamps.rb
Normal file
51
db/migrate/20250512171654_update_sync_timestamps.rb
Normal file
|
@ -0,0 +1,51 @@
|
|||
class UpdateSyncTimestamps < ActiveRecord::Migration[7.2]
|
||||
def change
|
||||
# Timestamps, managed by aasm
|
||||
add_column :syncs, :pending_at, :datetime
|
||||
add_column :syncs, :syncing_at, :datetime
|
||||
add_column :syncs, :completed_at, :datetime
|
||||
add_column :syncs, :failed_at, :datetime
|
||||
|
||||
add_column :syncs, :window_start_date, :date
|
||||
add_column :syncs, :window_end_date, :date
|
||||
|
||||
reversible do |dir|
|
||||
dir.up do
|
||||
execute <<-SQL
|
||||
UPDATE syncs
|
||||
SET
|
||||
completed_at = CASE
|
||||
WHEN status = 'completed' THEN last_ran_at
|
||||
ELSE NULL
|
||||
END,
|
||||
failed_at = CASE
|
||||
WHEN status = 'failed' THEN last_ran_at
|
||||
ELSE NULL
|
||||
END
|
||||
SQL
|
||||
|
||||
execute <<-SQL
|
||||
UPDATE syncs
|
||||
SET window_start_date = start_date
|
||||
SQL
|
||||
end
|
||||
|
||||
dir.down do
|
||||
execute <<-SQL
|
||||
UPDATE syncs
|
||||
SET
|
||||
last_ran_at = completed_at
|
||||
SQL
|
||||
|
||||
execute <<-SQL
|
||||
UPDATE syncs
|
||||
SET start_date = window_start_date
|
||||
SQL
|
||||
end
|
||||
end
|
||||
|
||||
remove_column :syncs, :start_date, :date
|
||||
remove_column :syncs, :last_ran_at, :datetime
|
||||
remove_column :syncs, :error_backtrace, :text, array: true
|
||||
end
|
||||
end
|
11
db/schema.rb
generated
11
db/schema.rb
generated
|
@ -10,7 +10,7 @@
|
|||
#
|
||||
# It's strongly recommended that you check this file into your version control system.
|
||||
|
||||
ActiveRecord::Schema[7.2].define(version: 2025_05_09_182903) do
|
||||
ActiveRecord::Schema[7.2].define(version: 2025_05_12_171654) do
|
||||
# These are extensions that must be enabled in order to support this database
|
||||
enable_extension "pgcrypto"
|
||||
enable_extension "plpgsql"
|
||||
|
@ -586,15 +586,18 @@ ActiveRecord::Schema[7.2].define(version: 2025_05_09_182903) do
|
|||
create_table "syncs", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
|
||||
t.string "syncable_type", null: false
|
||||
t.uuid "syncable_id", null: false
|
||||
t.datetime "last_ran_at"
|
||||
t.date "start_date"
|
||||
t.string "status", default: "pending"
|
||||
t.string "error"
|
||||
t.jsonb "data"
|
||||
t.datetime "created_at", null: false
|
||||
t.datetime "updated_at", null: false
|
||||
t.text "error_backtrace", array: true
|
||||
t.uuid "parent_id"
|
||||
t.datetime "pending_at"
|
||||
t.datetime "syncing_at"
|
||||
t.datetime "completed_at"
|
||||
t.datetime "failed_at"
|
||||
t.date "window_start_date"
|
||||
t.date "window_end_date"
|
||||
t.index ["parent_id"], name: "index_syncs_on_parent_id"
|
||||
t.index ["syncable_type", "syncable_id"], name: "index_syncs_on_syncable"
|
||||
end
|
||||
|
|
6
test/fixtures/syncs.yml
vendored
6
test/fixtures/syncs.yml
vendored
|
@ -1,17 +1,17 @@
|
|||
account:
|
||||
syncable_type: Account
|
||||
syncable: depository
|
||||
last_ran_at: <%= Time.now %>
|
||||
status: completed
|
||||
completed_at: <%= Time.now %>
|
||||
|
||||
plaid_item:
|
||||
syncable_type: PlaidItem
|
||||
syncable: one
|
||||
last_ran_at: <%= Time.now %>
|
||||
status: completed
|
||||
completed_at: <%= Time.now %>
|
||||
|
||||
family:
|
||||
syncable_type: Family
|
||||
syncable: dylan_family
|
||||
last_ran_at: <%= Time.now %>
|
||||
status: completed
|
||||
completed_at: <%= Time.now %>
|
||||
|
|
|
@ -7,14 +7,14 @@ module SyncableInterfaceTest
|
|||
test "can sync later" do
|
||||
assert_difference "@syncable.syncs.count", 1 do
|
||||
assert_enqueued_with job: SyncJob do
|
||||
@syncable.sync_later(start_date: 2.days.ago.to_date)
|
||||
@syncable.sync_later(window_start_date: 2.days.ago.to_date)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
test "can perform sync" do
|
||||
mock_sync = mock
|
||||
@syncable.class.any_instance.expects(:perform_sync).with(sync: mock_sync, start_date: 2.days.ago.to_date).once
|
||||
@syncable.perform_sync(sync: mock_sync, start_date: 2.days.ago.to_date)
|
||||
@syncable.class.any_instance.expects(:perform_sync).with(mock_sync).once
|
||||
@syncable.perform_sync(mock_sync)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -4,7 +4,7 @@ class SyncJobTest < ActiveJob::TestCase
|
|||
test "sync is performed" do
|
||||
syncable = accounts(:depository)
|
||||
|
||||
sync = syncable.syncs.create!(start_date: 2.days.ago.to_date)
|
||||
sync = syncable.syncs.create!(window_start_date: 2.days.ago.to_date)
|
||||
|
||||
sync.expects(:perform).once
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ class EntryTest < ActiveSupport::TestCase
|
|||
prior_date = @entry.date - 1
|
||||
@entry.update! date: prior_date
|
||||
|
||||
@entry.account.expects(:sync_later).with(start_date: prior_date)
|
||||
@entry.account.expects(:sync_later).with(window_start_date: prior_date)
|
||||
@entry.sync_account_later
|
||||
end
|
||||
|
||||
|
@ -38,14 +38,14 @@ class EntryTest < ActiveSupport::TestCase
|
|||
prior_date = @entry.date
|
||||
@entry.update! date: @entry.date + 1
|
||||
|
||||
@entry.account.expects(:sync_later).with(start_date: prior_date)
|
||||
@entry.account.expects(:sync_later).with(window_start_date: prior_date)
|
||||
@entry.sync_account_later
|
||||
end
|
||||
|
||||
test "triggers sync with correct start date when transaction deleted" do
|
||||
@entry.destroy!
|
||||
|
||||
@entry.account.expects(:sync_later).with(start_date: nil)
|
||||
@entry.account.expects(:sync_later).with(window_start_date: nil)
|
||||
@entry.sync_account_later
|
||||
end
|
||||
|
||||
|
|
|
@ -15,14 +15,16 @@ class Family::SyncerTest < ActiveSupport::TestCase
|
|||
|
||||
Account.any_instance
|
||||
.expects(:sync_later)
|
||||
.with(start_date: family_sync.start_date, parent_sync: family_sync)
|
||||
.with(parent_sync: family_sync, window_start_date: nil, window_end_date: nil)
|
||||
.times(manual_accounts_count)
|
||||
|
||||
PlaidItem.any_instance
|
||||
.expects(:sync_later)
|
||||
.with(start_date: family_sync.start_date, parent_sync: family_sync)
|
||||
.with(parent_sync: family_sync, window_start_date: nil, window_end_date: nil)
|
||||
.times(items_count)
|
||||
|
||||
syncer.perform_sync(sync: family_sync, start_date: family_sync.start_date)
|
||||
syncer.perform_sync(family_sync)
|
||||
|
||||
assert_equal "completed", family_sync.reload.status
|
||||
end
|
||||
end
|
||||
|
|
|
@ -5,32 +5,29 @@ class SyncTest < ActiveSupport::TestCase
|
|||
|
||||
test "runs successful sync" do
|
||||
syncable = accounts(:depository)
|
||||
sync = Sync.create!(syncable: syncable, last_ran_at: 1.day.ago)
|
||||
sync = Sync.create!(syncable: syncable)
|
||||
|
||||
syncable.expects(:perform_sync).with(sync: sync, start_date: sync.start_date).once
|
||||
syncable.expects(:perform_sync).with(sync).once
|
||||
|
||||
assert_equal "pending", sync.status
|
||||
|
||||
previously_ran_at = sync.last_ran_at
|
||||
|
||||
sync.perform
|
||||
|
||||
assert sync.last_ran_at > previously_ran_at
|
||||
assert sync.completed_at < Time.now
|
||||
assert_equal "completed", sync.status
|
||||
end
|
||||
|
||||
test "handles sync errors" do
|
||||
syncable = accounts(:depository)
|
||||
sync = Sync.create!(syncable: syncable, last_ran_at: 1.day.ago)
|
||||
sync = Sync.create!(syncable: syncable)
|
||||
|
||||
syncable.expects(:perform_sync).with(sync: sync, start_date: sync.start_date).raises(StandardError.new("test sync error"))
|
||||
syncable.expects(:perform_sync).with(sync).raises(StandardError.new("test sync error"))
|
||||
|
||||
assert_equal "pending", sync.status
|
||||
previously_ran_at = sync.last_ran_at
|
||||
|
||||
sync.perform
|
||||
|
||||
assert sync.last_ran_at > previously_ran_at
|
||||
assert sync.failed_at < Time.now
|
||||
assert_equal "failed", sync.status
|
||||
assert_equal "test sync error", sync.error
|
||||
end
|
||||
|
@ -48,20 +45,20 @@ class SyncTest < ActiveSupport::TestCase
|
|||
assert_equal "pending", plaid_item_sync.status
|
||||
assert_equal "pending", account_sync.status
|
||||
|
||||
family.expects(:perform_sync).with(sync: family_sync, start_date: family_sync.start_date).once
|
||||
family.expects(:perform_sync).with(family_sync).once
|
||||
|
||||
family_sync.perform
|
||||
|
||||
assert_equal "syncing", family_sync.reload.status
|
||||
|
||||
plaid_item.expects(:perform_sync).with(sync: plaid_item_sync, start_date: plaid_item_sync.start_date).once
|
||||
plaid_item.expects(:perform_sync).with(plaid_item_sync).once
|
||||
|
||||
plaid_item_sync.perform
|
||||
|
||||
assert_equal "syncing", family_sync.reload.status
|
||||
assert_equal "syncing", plaid_item_sync.reload.status
|
||||
|
||||
account.expects(:perform_sync).with(sync: account_sync, start_date: account_sync.start_date).once
|
||||
account.expects(:perform_sync).with(account_sync).once
|
||||
|
||||
# Since these are accessed through `parent`, they won't necessarily be the same
|
||||
# instance we configured above
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue