1
0
Fork 0
mirror of https://github.com/maybe-finance/maybe.git synced 2025-07-19 05:09:38 +02:00

Handle stale syncs (#2257)
Some checks are pending
Publish Docker image / ci (push) Waiting to run
Publish Docker image / Build docker image (push) Blocked by required conditions

* Handle stale syncs

* Use `visible` sync logic in sidebar groups
This commit is contained in:
Zach Gollwitzer 2025-05-17 18:28:21 -04:00 committed by GitHub
parent 10f255a9a9
commit 9f13b5bb83
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 79 additions and 7 deletions

View file

@ -0,0 +1,7 @@
class SyncCleanerJob < ApplicationJob
queue_as :scheduled
def perform
Sync.clean
end
end

View file

@ -62,7 +62,7 @@ class Account < ApplicationRecord
end end
def syncing? def syncing?
self_syncing = syncs.incomplete.any? self_syncing = syncs.visible.any?
# Since Plaid Items sync as a "group", if the item is syncing, even if the account # Since Plaid Items sync as a "group", if the item is syncing, even if the account
# sync hasn't yet started (i.e. we're still fetching the Plaid data), show it as syncing in UI. # sync hasn't yet started (i.e. we're still fetching the Plaid data), show it as syncing in UI.

View file

@ -107,7 +107,11 @@ class BalanceSheet
def totals_query def totals_query
@totals_query ||= active_accounts @totals_query ||= active_accounts
.joins(ActiveRecord::Base.sanitize_sql_array([ "LEFT JOIN exchange_rates ON exchange_rates.date = CURRENT_DATE AND accounts.currency = exchange_rates.from_currency AND exchange_rates.to_currency = ?", currency ])) .joins(ActiveRecord::Base.sanitize_sql_array([ "LEFT JOIN exchange_rates ON exchange_rates.date = CURRENT_DATE AND accounts.currency = exchange_rates.from_currency AND exchange_rates.to_currency = ?", currency ]))
.joins("LEFT JOIN syncs ON syncs.syncable_id = accounts.id AND syncs.syncable_type = 'Account' AND (syncs.status = 'pending' OR syncs.status = 'syncing')") .joins(ActiveRecord::Base.sanitize_sql_array([
"LEFT JOIN syncs ON syncs.syncable_id = accounts.id AND syncs.syncable_type = 'Account' AND syncs.status IN (?) AND syncs.created_at > ?",
%w[pending syncing],
Sync::VISIBLE_FOR.ago
]))
.select( .select(
"accounts.*", "accounts.*",
"SUM(accounts.balance * COALESCE(exchange_rates.rate, 1)) as converted_balance", "SUM(accounts.balance * COALESCE(exchange_rates.rate, 1)) as converted_balance",

View file

@ -10,8 +10,19 @@ module Syncable
end end
def sync_later(parent_sync: nil, window_start_date: nil, window_end_date: nil) def sync_later(parent_sync: nil, window_start_date: nil, window_end_date: nil)
new_sync = syncs.create!(parent: parent_sync, window_start_date: window_start_date, window_end_date: window_end_date) Sync.transaction do
SyncJob.perform_later(new_sync) # Expire any previous in-flight syncs for this record that exceeded the
# global staleness window.
syncs.stale_candidates.find_each(&:mark_stale!)
new_sync = syncs.create!(
parent: parent_sync,
window_start_date: window_start_date,
window_end_date: window_end_date
)
SyncJob.perform_later(new_sync)
end
end end
def perform_sync(sync) def perform_sync(sync)

View file

@ -40,7 +40,7 @@ class Family < ApplicationRecord
Sync.joins("LEFT JOIN plaid_items ON plaid_items.id = syncs.syncable_id AND syncs.syncable_type = 'PlaidItem'") Sync.joins("LEFT JOIN plaid_items ON plaid_items.id = syncs.syncable_id AND syncs.syncable_type = 'PlaidItem'")
.joins("LEFT JOIN accounts ON accounts.id = syncs.syncable_id AND syncs.syncable_type = 'Account'") .joins("LEFT JOIN accounts ON accounts.id = syncs.syncable_id AND syncs.syncable_type = 'Account'")
.where("syncs.syncable_id = ? OR accounts.family_id = ? OR plaid_items.family_id = ?", id, id, id) .where("syncs.syncable_id = ? OR accounts.family_id = ? OR plaid_items.family_id = ?", id, id, id)
.incomplete .visible
.exists? .exists?
end end

View file

@ -56,7 +56,7 @@ class PlaidItem < ApplicationRecord
Sync.joins("LEFT JOIN accounts a ON a.id = syncs.syncable_id AND syncs.syncable_type = 'Account'") Sync.joins("LEFT JOIN accounts a ON a.id = syncs.syncable_id AND syncs.syncable_type = 'Account'")
.joins("LEFT JOIN plaid_accounts pa ON pa.id = a.plaid_account_id") .joins("LEFT JOIN plaid_accounts pa ON pa.id = a.plaid_account_id")
.where("syncs.syncable_id = ? OR pa.plaid_item_id = ?", id, id) .where("syncs.syncable_id = ? OR pa.plaid_item_id = ?", id, id)
.incomplete .visible
.exists? .exists?
end end

View file

@ -1,4 +1,11 @@
class Sync < ApplicationRecord class Sync < ApplicationRecord
# We run a cron that marks any syncs that have been running > 2 hours as "stale"
# Syncs often become stale when new code is deployed and the worker restarts
STALE_AFTER = 2.hours
# The max time that a sync will show in the UI (after 5 minutes)
VISIBLE_FOR = 5.minutes
include AASM include AASM
Error = Class.new(StandardError) Error = Class.new(StandardError)
@ -9,7 +16,11 @@ class Sync < ApplicationRecord
has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy
scope :ordered, -> { order(created_at: :desc) } scope :ordered, -> { order(created_at: :desc) }
scope :incomplete, -> { where(status: [ :pending, :syncing ]) } scope :incomplete, -> { where("syncs.status IN (?)", %w[pending syncing]) }
scope :visible, -> { incomplete.where("syncs.created_at > ?", VISIBLE_FOR.ago) }
# In-flight records that have exceeded the allowed runtime
scope :stale_candidates, -> { incomplete.where("syncs.created_at < ?", STALE_AFTER.ago) }
validate :window_valid validate :window_valid
@ -19,6 +30,7 @@ class Sync < ApplicationRecord
state :syncing state :syncing
state :completed state :completed
state :failed state :failed
state :stale
after_all_transitions :log_status_change after_all_transitions :log_status_change
@ -33,6 +45,17 @@ class Sync < ApplicationRecord
event :fail do event :fail do
transitions from: :syncing, to: :failed transitions from: :syncing, to: :failed
end end
# Marks a sync that never completed within the expected time window
event :mark_stale do
transitions from: %i[pending syncing], to: :stale
end
end
class << self
def clean
stale_candidates.find_each(&:mark_stale!)
end
end end
def perform def perform

View file

@ -6,3 +6,9 @@ import_market_data:
args: args:
mode: "full" mode: "full"
clear_cache: false clear_cache: false
clean_syncs:
cron: "0 * * * *" # every hour
class: "SyncCleanerJob"
queue: "scheduled"
description: "Cleans up stale syncs"

View file

@ -167,4 +167,25 @@ class SyncTest < ActiveSupport::TestCase
assert_equal "failed", family_sync.reload.status assert_equal "failed", family_sync.reload.status
assert_equal "completed", account_sync.reload.status assert_equal "completed", account_sync.reload.status
end end
test "clean marks stale incomplete rows" do
stale_pending = Sync.create!(
syncable: accounts(:depository),
status: :pending,
created_at: 3.hours.ago
)
stale_syncing = Sync.create!(
syncable: accounts(:depository),
status: :syncing,
created_at: 3.hours.ago,
pending_at: 3.hours.ago,
syncing_at: 2.hours.ago
)
Sync.clean
assert_equal "stale", stale_pending.reload.status
assert_equal "stale", stale_syncing.reload.status
end
end end