mirror of
https://github.com/maybe-finance/maybe.git
synced 2025-08-08 15:05:22 +02:00
Handle stale syncs
This commit is contained in:
parent
10f255a9a9
commit
894fda9659
8 changed files with 73 additions and 6 deletions
7
app/jobs/sync_cleaner_job.rb
Normal file
7
app/jobs/sync_cleaner_job.rb
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
class SyncCleanerJob < ApplicationJob
|
||||||
|
queue_as :scheduled
|
||||||
|
|
||||||
|
def perform
|
||||||
|
Sync.clean
|
||||||
|
end
|
||||||
|
end
|
|
@ -62,7 +62,7 @@ class Account < ApplicationRecord
|
||||||
end
|
end
|
||||||
|
|
||||||
def syncing?
|
def syncing?
|
||||||
self_syncing = syncs.incomplete.any?
|
self_syncing = syncs.visible.any?
|
||||||
|
|
||||||
# Since Plaid Items sync as a "group", if the item is syncing, even if the account
|
# Since Plaid Items sync as a "group", if the item is syncing, even if the account
|
||||||
# sync hasn't yet started (i.e. we're still fetching the Plaid data), show it as syncing in UI.
|
# sync hasn't yet started (i.e. we're still fetching the Plaid data), show it as syncing in UI.
|
||||||
|
|
|
@ -10,8 +10,19 @@ module Syncable
|
||||||
end
|
end
|
||||||
|
|
||||||
def sync_later(parent_sync: nil, window_start_date: nil, window_end_date: nil)
|
def sync_later(parent_sync: nil, window_start_date: nil, window_end_date: nil)
|
||||||
new_sync = syncs.create!(parent: parent_sync, window_start_date: window_start_date, window_end_date: window_end_date)
|
Sync.transaction do
|
||||||
SyncJob.perform_later(new_sync)
|
# Expire any previous in-flight syncs for this record that exceeded the
|
||||||
|
# global staleness window.
|
||||||
|
syncs.stale_candidates.find_each(&:mark_stale!)
|
||||||
|
|
||||||
|
new_sync = syncs.create!(
|
||||||
|
parent: parent_sync,
|
||||||
|
window_start_date: window_start_date,
|
||||||
|
window_end_date: window_end_date
|
||||||
|
)
|
||||||
|
|
||||||
|
SyncJob.perform_later(new_sync)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def perform_sync(sync)
|
def perform_sync(sync)
|
||||||
|
|
|
@ -40,7 +40,7 @@ class Family < ApplicationRecord
|
||||||
Sync.joins("LEFT JOIN plaid_items ON plaid_items.id = syncs.syncable_id AND syncs.syncable_type = 'PlaidItem'")
|
Sync.joins("LEFT JOIN plaid_items ON plaid_items.id = syncs.syncable_id AND syncs.syncable_type = 'PlaidItem'")
|
||||||
.joins("LEFT JOIN accounts ON accounts.id = syncs.syncable_id AND syncs.syncable_type = 'Account'")
|
.joins("LEFT JOIN accounts ON accounts.id = syncs.syncable_id AND syncs.syncable_type = 'Account'")
|
||||||
.where("syncs.syncable_id = ? OR accounts.family_id = ? OR plaid_items.family_id = ?", id, id, id)
|
.where("syncs.syncable_id = ? OR accounts.family_id = ? OR plaid_items.family_id = ?", id, id, id)
|
||||||
.incomplete
|
.visible
|
||||||
.exists?
|
.exists?
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,7 @@ class PlaidItem < ApplicationRecord
|
||||||
Sync.joins("LEFT JOIN accounts a ON a.id = syncs.syncable_id AND syncs.syncable_type = 'Account'")
|
Sync.joins("LEFT JOIN accounts a ON a.id = syncs.syncable_id AND syncs.syncable_type = 'Account'")
|
||||||
.joins("LEFT JOIN plaid_accounts pa ON pa.id = a.plaid_account_id")
|
.joins("LEFT JOIN plaid_accounts pa ON pa.id = a.plaid_account_id")
|
||||||
.where("syncs.syncable_id = ? OR pa.plaid_item_id = ?", id, id)
|
.where("syncs.syncable_id = ? OR pa.plaid_item_id = ?", id, id)
|
||||||
.incomplete
|
.visible
|
||||||
.exists?
|
.exists?
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,11 @@
|
||||||
class Sync < ApplicationRecord
|
class Sync < ApplicationRecord
|
||||||
|
# We run a cron that marks any syncs that have been running > 2 hours as "stale"
|
||||||
|
# Syncs often become stale when new code is deployed and the worker restarts
|
||||||
|
STALE_AFTER = 2.hours
|
||||||
|
|
||||||
|
# The max time that a sync will show in the UI (after 5 minutes)
|
||||||
|
VISIBLE_FOR = 5.minutes
|
||||||
|
|
||||||
include AASM
|
include AASM
|
||||||
|
|
||||||
Error = Class.new(StandardError)
|
Error = Class.new(StandardError)
|
||||||
|
@ -9,7 +16,10 @@ class Sync < ApplicationRecord
|
||||||
has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy
|
has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy
|
||||||
|
|
||||||
scope :ordered, -> { order(created_at: :desc) }
|
scope :ordered, -> { order(created_at: :desc) }
|
||||||
scope :incomplete, -> { where(status: [ :pending, :syncing ]) }
|
scope :incomplete, -> { where("syncs.status IN (?)", [ :pending, :syncing ]) }
|
||||||
|
scope :visible, -> { incomplete.where("syncs.created_at > ?", VISIBLE_FOR.ago) }
|
||||||
|
# In-flight records that have exceeded the allowed runtime
|
||||||
|
scope :stale_candidates, -> { incomplete.where("syncs.created_at < ?", STALE_AFTER.ago) }
|
||||||
|
|
||||||
validate :window_valid
|
validate :window_valid
|
||||||
|
|
||||||
|
@ -19,6 +29,7 @@ class Sync < ApplicationRecord
|
||||||
state :syncing
|
state :syncing
|
||||||
state :completed
|
state :completed
|
||||||
state :failed
|
state :failed
|
||||||
|
state :stale
|
||||||
|
|
||||||
after_all_transitions :log_status_change
|
after_all_transitions :log_status_change
|
||||||
|
|
||||||
|
@ -33,6 +44,17 @@ class Sync < ApplicationRecord
|
||||||
event :fail do
|
event :fail do
|
||||||
transitions from: :syncing, to: :failed
|
transitions from: :syncing, to: :failed
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Marks a sync that never completed within the expected time window
|
||||||
|
event :mark_stale do
|
||||||
|
transitions from: %i[pending syncing], to: :stale
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class << self
|
||||||
|
def clean
|
||||||
|
stale_candidates.find_each(&:mark_stale!)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def perform
|
def perform
|
||||||
|
|
|
@ -6,3 +6,9 @@ import_market_data:
|
||||||
args:
|
args:
|
||||||
mode: "full"
|
mode: "full"
|
||||||
clear_cache: false
|
clear_cache: false
|
||||||
|
|
||||||
|
clean_syncs:
|
||||||
|
cron: "0 * * * *" # every hour
|
||||||
|
class: "SyncCleanerJob"
|
||||||
|
queue: "scheduled"
|
||||||
|
description: "Cleans up stale syncs"
|
||||||
|
|
|
@ -167,4 +167,25 @@ class SyncTest < ActiveSupport::TestCase
|
||||||
assert_equal "failed", family_sync.reload.status
|
assert_equal "failed", family_sync.reload.status
|
||||||
assert_equal "completed", account_sync.reload.status
|
assert_equal "completed", account_sync.reload.status
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "clean marks stale incomplete rows" do
|
||||||
|
stale_pending = Sync.create!(
|
||||||
|
syncable: accounts(:depository),
|
||||||
|
status: :pending,
|
||||||
|
created_at: 3.hours.ago
|
||||||
|
)
|
||||||
|
|
||||||
|
stale_syncing = Sync.create!(
|
||||||
|
syncable: accounts(:depository),
|
||||||
|
status: :syncing,
|
||||||
|
created_at: 3.hours.ago,
|
||||||
|
pending_at: 3.hours.ago,
|
||||||
|
syncing_at: 2.hours.ago
|
||||||
|
)
|
||||||
|
|
||||||
|
Sync.clean
|
||||||
|
|
||||||
|
assert_equal "stale", stale_pending.reload.status
|
||||||
|
assert_equal "stale", stale_syncing.reload.status
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue