diff --git a/app/jobs/sync_cleaner_job.rb b/app/jobs/sync_cleaner_job.rb new file mode 100644 index 00000000..f5e42551 --- /dev/null +++ b/app/jobs/sync_cleaner_job.rb @@ -0,0 +1,7 @@ +class SyncCleanerJob < ApplicationJob + queue_as :scheduled + + def perform + Sync.clean + end +end diff --git a/app/models/account.rb b/app/models/account.rb index 13734071..4984fb89 100644 --- a/app/models/account.rb +++ b/app/models/account.rb @@ -62,7 +62,7 @@ class Account < ApplicationRecord end def syncing? - self_syncing = syncs.incomplete.any? + self_syncing = syncs.visible.any? # Since Plaid Items sync as a "group", if the item is syncing, even if the account # sync hasn't yet started (i.e. we're still fetching the Plaid data), show it as syncing in UI. diff --git a/app/models/balance_sheet.rb b/app/models/balance_sheet.rb index cc50e3da..a89a9859 100644 --- a/app/models/balance_sheet.rb +++ b/app/models/balance_sheet.rb @@ -107,7 +107,11 @@ class BalanceSheet def totals_query @totals_query ||= active_accounts .joins(ActiveRecord::Base.sanitize_sql_array([ "LEFT JOIN exchange_rates ON exchange_rates.date = CURRENT_DATE AND accounts.currency = exchange_rates.from_currency AND exchange_rates.to_currency = ?", currency ])) - .joins("LEFT JOIN syncs ON syncs.syncable_id = accounts.id AND syncs.syncable_type = 'Account' AND (syncs.status = 'pending' OR syncs.status = 'syncing')") + .joins(ActiveRecord::Base.sanitize_sql_array([ + "LEFT JOIN syncs ON syncs.syncable_id = accounts.id AND syncs.syncable_type = 'Account' AND syncs.status IN (?) AND syncs.created_at > ?", + %w[pending syncing], + Sync::VISIBLE_FOR.ago + ])) .select( "accounts.*", "SUM(accounts.balance * COALESCE(exchange_rates.rate, 1)) as converted_balance", diff --git a/app/models/concerns/syncable.rb b/app/models/concerns/syncable.rb index 8dfa8e41..9b5e09e4 100644 --- a/app/models/concerns/syncable.rb +++ b/app/models/concerns/syncable.rb @@ -10,8 +10,19 @@ module Syncable end def sync_later(parent_sync: nil, window_start_date: nil, window_end_date: nil) - new_sync = syncs.create!(parent: parent_sync, window_start_date: window_start_date, window_end_date: window_end_date) - SyncJob.perform_later(new_sync) + Sync.transaction do + # Expire any previous in-flight syncs for this record that exceeded the + # global staleness window. + syncs.stale_candidates.find_each(&:mark_stale!) + + new_sync = syncs.create!( + parent: parent_sync, + window_start_date: window_start_date, + window_end_date: window_end_date + ) + + SyncJob.perform_later(new_sync) + end end def perform_sync(sync) diff --git a/app/models/family.rb b/app/models/family.rb index 0cfb8b89..cd068cae 100644 --- a/app/models/family.rb +++ b/app/models/family.rb @@ -40,7 +40,7 @@ class Family < ApplicationRecord Sync.joins("LEFT JOIN plaid_items ON plaid_items.id = syncs.syncable_id AND syncs.syncable_type = 'PlaidItem'") .joins("LEFT JOIN accounts ON accounts.id = syncs.syncable_id AND syncs.syncable_type = 'Account'") .where("syncs.syncable_id = ? OR accounts.family_id = ? OR plaid_items.family_id = ?", id, id, id) - .incomplete + .visible .exists? end diff --git a/app/models/plaid_item.rb b/app/models/plaid_item.rb index 4aae91ca..2ba10599 100644 --- a/app/models/plaid_item.rb +++ b/app/models/plaid_item.rb @@ -56,7 +56,7 @@ class PlaidItem < ApplicationRecord Sync.joins("LEFT JOIN accounts a ON a.id = syncs.syncable_id AND syncs.syncable_type = 'Account'") .joins("LEFT JOIN plaid_accounts pa ON pa.id = a.plaid_account_id") .where("syncs.syncable_id = ? OR pa.plaid_item_id = ?", id, id) - .incomplete + .visible .exists? end diff --git a/app/models/sync.rb b/app/models/sync.rb index f783bbc2..456a4b01 100644 --- a/app/models/sync.rb +++ b/app/models/sync.rb @@ -1,4 +1,11 @@ class Sync < ApplicationRecord + # We run a cron that marks any syncs that have been running > 2 hours as "stale" + # Syncs often become stale when new code is deployed and the worker restarts + STALE_AFTER = 2.hours + + # The max time that a sync will show in the UI (after 5 minutes) + VISIBLE_FOR = 5.minutes + include AASM Error = Class.new(StandardError) @@ -9,7 +16,11 @@ class Sync < ApplicationRecord has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy scope :ordered, -> { order(created_at: :desc) } - scope :incomplete, -> { where(status: [ :pending, :syncing ]) } + scope :incomplete, -> { where("syncs.status IN (?)", %w[pending syncing]) } + scope :visible, -> { incomplete.where("syncs.created_at > ?", VISIBLE_FOR.ago) } + + # In-flight records that have exceeded the allowed runtime + scope :stale_candidates, -> { incomplete.where("syncs.created_at < ?", STALE_AFTER.ago) } validate :window_valid @@ -19,6 +30,7 @@ class Sync < ApplicationRecord state :syncing state :completed state :failed + state :stale after_all_transitions :log_status_change @@ -33,6 +45,17 @@ class Sync < ApplicationRecord event :fail do transitions from: :syncing, to: :failed end + + # Marks a sync that never completed within the expected time window + event :mark_stale do + transitions from: %i[pending syncing], to: :stale + end + end + + class << self + def clean + stale_candidates.find_each(&:mark_stale!) + end end def perform diff --git a/config/schedule.yml b/config/schedule.yml index 28078e4d..75b709a6 100644 --- a/config/schedule.yml +++ b/config/schedule.yml @@ -6,3 +6,9 @@ import_market_data: args: mode: "full" clear_cache: false + +clean_syncs: + cron: "0 * * * *" # every hour + class: "SyncCleanerJob" + queue: "scheduled" + description: "Cleans up stale syncs" diff --git a/test/models/sync_test.rb b/test/models/sync_test.rb index cbab9ed3..12e410c6 100644 --- a/test/models/sync_test.rb +++ b/test/models/sync_test.rb @@ -167,4 +167,25 @@ class SyncTest < ActiveSupport::TestCase assert_equal "failed", family_sync.reload.status assert_equal "completed", account_sync.reload.status end + + test "clean marks stale incomplete rows" do + stale_pending = Sync.create!( + syncable: accounts(:depository), + status: :pending, + created_at: 3.hours.ago + ) + + stale_syncing = Sync.create!( + syncable: accounts(:depository), + status: :syncing, + created_at: 3.hours.ago, + pending_at: 3.hours.ago, + syncing_at: 2.hours.ago + ) + + Sync.clean + + assert_equal "stale", stale_pending.reload.status + assert_equal "stale", stale_syncing.reload.status + end end