From 5c8bca31ec9ef8b1a9cecc1ab8a17fa88c68ab8c Mon Sep 17 00:00:00 2001 From: Zach Gollwitzer Date: Sun, 11 May 2025 15:02:46 -0400 Subject: [PATCH] Clean up sync class, add state machine --- Gemfile | 4 + Gemfile.lock | 7 ++ app/models/account/syncer.rb | 12 +- app/models/concerns/accountable.rb | 2 +- app/models/concerns/syncable.rb | 16 ++- app/models/family/syncer.rb | 16 ++- app/models/import.rb | 2 +- app/models/plaid_item/syncer.rb | 10 +- app/models/sync.rb | 134 +++++++++------------ test/interfaces/syncable_interface_test.rb | 10 +- test/models/family/syncer_test.rb | 13 +- test/models/sync_test.rb | 72 +++++------ test/test_helper.rb | 1 + 13 files changed, 147 insertions(+), 152 deletions(-) diff --git a/Gemfile b/Gemfile index 26b29fc9..2a32b38b 100644 --- a/Gemfile +++ b/Gemfile @@ -63,6 +63,10 @@ gem "rotp", "~> 6.3" gem "rqrcode", "~> 3.0" gem "activerecord-import" +# State machines +gem "aasm" +gem "after_commit_everywhere", "~> 1.0" + # AI gem "ruby-openai" diff --git a/Gemfile.lock b/Gemfile.lock index 224f8f9c..81c6ccb0 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -8,6 +8,8 @@ GIT GEM remote: https://rubygems.org/ specs: + aasm (5.5.0) + concurrent-ruby (~> 1.0) actioncable (7.2.2.1) actionpack (= 7.2.2.1) activesupport (= 7.2.2.1) @@ -83,6 +85,9 @@ GEM tzinfo (~> 2.0, >= 2.0.5) addressable (2.8.7) public_suffix (>= 2.0.2, < 7.0) + after_commit_everywhere (1.6.0) + activerecord (>= 4.2) + activesupport ast (2.4.3) aws-eventstream (1.3.2) aws-partitions (1.1093.0) @@ -561,7 +566,9 @@ PLATFORMS x86_64-linux-musl DEPENDENCIES + aasm activerecord-import + after_commit_everywhere (~> 1.0) aws-sdk-s3 (~> 1.177.0) bcrypt (~> 3.1) benchmark-ips diff --git a/app/models/account/syncer.rb b/app/models/account/syncer.rb index a1576aea..6b17b531 100644 --- a/app/models/account/syncer.rb +++ b/app/models/account/syncer.rb @@ -5,23 +5,15 @@ class Account::Syncer @account = account end - def child_syncables - [] - end - - def perform_sync(start_date: nil) + def perform_sync(sync:, start_date: nil) Rails.logger.info("Processing balances (#{account.linked? ? 'reverse' : 'forward'})") sync_balances end def perform_post_sync account.family.remove_syncing_notice! - - # account.accountable.post_sync(sync) - - # unless sync.child? + account.accountable.post_sync account.family.auto_match_transfers! - # end end private diff --git a/app/models/concerns/accountable.rb b/app/models/concerns/accountable.rb index 9d0ecb34..af9b04ab 100644 --- a/app/models/concerns/accountable.rb +++ b/app/models/concerns/accountable.rb @@ -68,7 +68,7 @@ module Accountable end end - def post_sync(sync) + def post_sync broadcast_replace_to( account, target: "chart_account_#{account.id}", diff --git a/app/models/concerns/syncable.rb b/app/models/concerns/syncable.rb index 43afed5f..2d21a601 100644 --- a/app/models/concerns/syncable.rb +++ b/app/models/concerns/syncable.rb @@ -6,7 +6,7 @@ module Syncable end def syncing? - syncs.where(status: [ :syncing, :pending ]).any? + syncs.incomplete.any? end def sync_later(start_date: nil, parent_sync: nil) @@ -14,8 +14,12 @@ module Syncable SyncJob.perform_later(new_sync) end - def sync(start_date: nil) - syncs.create!(start_date: start_date).perform + def perform_sync(sync:, start_date: nil) + syncer.perform_sync(sync: sync, start_date: start_date) + end + + def perform_post_sync + syncer.perform_post_sync end def sync_error @@ -28,6 +32,10 @@ module Syncable private def latest_sync - syncs.order(created_at: :desc).first + syncs.ordered.first + end + + def syncer + self.class::Syncer.new(self) end end diff --git a/app/models/family/syncer.rb b/app/models/family/syncer.rb index 3134a65c..58836341 100644 --- a/app/models/family/syncer.rb +++ b/app/models/family/syncer.rb @@ -5,11 +5,7 @@ class Family::Syncer @family = family end - def child_syncables - family.plaid_items + family.accounts.manual - end - - def perform_sync(start_date: nil) + def perform_sync(sync:, start_date: nil) # We don't rely on this value to guard the app, but keep it eventually consistent family.sync_trial_status! @@ -17,10 +13,20 @@ class Family::Syncer family.rules.each do |rule| rule.apply_later end + + # Schedule child syncs + child_syncables.each do |syncable| + syncable.sync_later(start_date: start_date, parent_sync: sync) + end end def perform_post_sync family.auto_match_transfers! family.broadcast_refresh end + + private + def child_syncables + family.plaid_items + family.accounts.manual + end end diff --git a/app/models/import.rb b/app/models/import.rb index 3ea68015..e96d1fc1 100644 --- a/app/models/import.rb +++ b/app/models/import.rb @@ -62,7 +62,7 @@ class Import < ApplicationRecord def publish import! - family.sync + family.sync_later update! status: :complete rescue => error diff --git a/app/models/plaid_item/syncer.rb b/app/models/plaid_item/syncer.rb index 031b586c..88cddeab 100644 --- a/app/models/plaid_item/syncer.rb +++ b/app/models/plaid_item/syncer.rb @@ -5,16 +5,16 @@ class PlaidItem::Syncer @plaid_item = plaid_item end - def child_syncables - plaid_item.accounts - end - - def perform_sync(start_date: nil) + def perform_sync(sync:, start_date: nil) begin Rails.logger.info("Fetching and loading Plaid data") fetch_and_load_plaid_data plaid_item.update!(status: :good) if plaid_item.requires_update? + plaid_item.accounts.each do |account| + account.sync_later(start_date: start_date, parent_sync: sync) + end + Rails.logger.info("Plaid data fetched and loaded") rescue Plaid::ApiError => e handle_plaid_error(e) diff --git a/app/models/sync.rb b/app/models/sync.rb index d3d9f225..bce26f85 100644 --- a/app/models/sync.rb +++ b/app/models/sync.rb @@ -1,111 +1,87 @@ class Sync < ApplicationRecord - Error = Class.new(StandardError) + include AASM belongs_to :syncable, polymorphic: true belongs_to :parent, class_name: "Sync", optional: true has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy - enum :status, { pending: "pending", syncing: "syncing", completed: "completed", failed: "failed" } - scope :ordered, -> { order(created_at: :desc) } + scope :incomplete, -> { where(status: [ :pending, :syncing ]) } - def child? - parent_id.present? - end + # Sync state machine + aasm column: :status do + state :pending, initial: true + state :syncing + state :completed + state :failed - def perform - Rails.logger.tagged("Sync", id, syncable_type, syncable_id) do - start! + event :start, after_commit: :handle_start do + transitions from: :pending, to: :syncing + end - begin - syncer.perform_sync(start_date: start_date) + event :complete, after_commit: :handle_finalization do + transitions from: :syncing, to: :completed + end - # Schedule child syncables to sync later - syncer.child_syncables.each do |child_syncable| - child_syncable.sync_later(start_date: start_date, parent_sync: self) - end - - unless has_pending_child_syncs? - complete! - Rails.logger.info("Sync completed, starting post-sync") - syncer.perform_post_sync - Rails.logger.info("Post-sync completed") - end - rescue StandardError => error - fail! error, report_error: true - ensure - notify_parent_of_completion! if has_parent? - end + event :fail, after_commit: :handle_finalization do + transitions from: :syncing, to: :failed end end - def handle_child_completion_event + def perform(start_date: nil) + start! + + begin + syncable.perform_sync(sync: self, start_date: start_date) + attempt_finalization + rescue => e + fail! + handle_error(e) + end + end + + # If the sync doesn't have any in-progress children, finalize it. + def attempt_finalization Sync.transaction do - # We need this to ensure 2 child syncs don't update the parent at the exact same time with different results - # and cause the sync to hang in "syncing" status indefinitely - self.lock! + lock! - unless has_pending_child_syncs? - if has_failed_child_syncs? - fail!(Error.new("One or more child syncs failed")) - else - complete! - end + return unless all_children_finalized? - # If this sync is both a child and a parent, we need to notify the parent of completion - notify_parent_of_completion! if has_parent? - - syncer.perform_post_sync + if has_failed_children? + fail! + else + complete! end end end private - def syncer - "#{syncable_type}::Syncer".constantize.new(syncable) + def has_failed_children? + children.failed.any? end - def has_pending_child_syncs? - children.where(status: [ :pending, :syncing ]).any? + def all_children_finalized? + children.incomplete.empty? end - def has_failed_child_syncs? - children.where(status: :failed).any? - end + # Once sync finalizes, notify its parent and run its post-sync logic. + def handle_finalization + syncable.perform_post_sync - def has_parent? - parent_id.present? - end - - def notify_parent_of_completion! - parent.handle_child_completion_event - end - - def start! - Rails.logger.info("Starting sync") - update! status: :syncing - end - - def complete! - Rails.logger.info("Sync completed") - update! status: :completed, last_ran_at: Time.current - end - - def fail!(error, report_error: false) - Rails.logger.error("Sync failed: #{error.message}") - - if report_error - Sentry.capture_exception(error) do |scope| - scope.set_context("sync", { id: id, syncable_type: syncable_type, syncable_id: syncable_id }) - scope.set_tags(sync_id: id) - end + if parent + parent.attempt_finalization end + end - update!( - status: :failed, - error: error.message, - last_ran_at: Time.current - ) + def handle_error(error) + update!(error: error.message) + Sentry.capture_exception(error) do |scope| + scope.set_tags(sync_id: id) + end + end + + def handle_start + update!(last_ran_at: Time.current) end end diff --git a/test/interfaces/syncable_interface_test.rb b/test/interfaces/syncable_interface_test.rb index 41c34b4b..bd741675 100644 --- a/test/interfaces/syncable_interface_test.rb +++ b/test/interfaces/syncable_interface_test.rb @@ -7,14 +7,14 @@ module SyncableInterfaceTest test "can sync later" do assert_difference "@syncable.syncs.count", 1 do assert_enqueued_with job: SyncJob do - @syncable.sync_later + @syncable.sync_later(start_date: 2.days.ago.to_date) end end end - test "can sync" do - assert_difference "@syncable.syncs.count", 1 do - @syncable.sync(start_date: 2.days.ago.to_date) - end + test "can perform sync" do + mock_sync = mock + @syncable.class.any_instance.expects(:perform_sync).with(sync: mock_sync, start_date: 2.days.ago.to_date).once + @syncable.perform_sync(sync: mock_sync, start_date: 2.days.ago.to_date) end end diff --git a/test/models/family/syncer_test.rb b/test/models/family/syncer_test.rb index bc35e678..4ea82b00 100644 --- a/test/models/family/syncer_test.rb +++ b/test/models/family/syncer_test.rb @@ -12,8 +12,17 @@ class Family::SyncerTest < ActiveSupport::TestCase items_count = @family.plaid_items.count syncer = Family::Syncer.new(@family) - syncer.perform_sync(start_date: family_sync.start_date) - assert_equal manual_accounts_count + items_count, syncer.child_syncables.count + Account.any_instance + .expects(:sync_later) + .with(start_date: family_sync.start_date, parent_sync: family_sync) + .times(manual_accounts_count) + + PlaidItem.any_instance + .expects(:sync_later) + .with(start_date: family_sync.start_date, parent_sync: family_sync) + .times(items_count) + + syncer.perform_sync(sync: family_sync, start_date: family_sync.start_date) end end diff --git a/test/models/sync_test.rb b/test/models/sync_test.rb index 2919c2f2..5c689684 100644 --- a/test/models/sync_test.rb +++ b/test/models/sync_test.rb @@ -4,9 +4,10 @@ class SyncTest < ActiveSupport::TestCase include ActiveJob::TestHelper test "runs successful sync" do - sync = Sync.create!(syncable: accounts(:depository), last_ran_at: 1.day.ago) + syncable = accounts(:depository) + sync = Sync.create!(syncable: syncable, last_ran_at: 1.day.ago) - Account::Syncer.any_instance.expects(:perform_sync).with(start_date: sync.start_date).once + syncable.expects(:perform_sync).with(sync: sync, start_date: sync.start_date).once assert_equal "pending", sync.status @@ -19,8 +20,10 @@ class SyncTest < ActiveSupport::TestCase end test "handles sync errors" do - sync = Sync.create!(syncable: accounts(:depository), last_ran_at: 1.day.ago) - Account::Syncer.any_instance.expects(:perform_sync).with(start_date: sync.start_date).raises(StandardError.new("test sync error")) + syncable = accounts(:depository) + sync = Sync.create!(syncable: syncable, last_ran_at: 1.day.ago) + + syncable.expects(:perform_sync).with(sync: sync, start_date: sync.start_date).raises(StandardError.new("test sync error")) assert_equal "pending", sync.status previously_ran_at = sync.last_ran_at @@ -33,54 +36,43 @@ class SyncTest < ActiveSupport::TestCase end test "can run nested syncs that alert the parent when complete" do - # Clear out fixture syncs - Sync.destroy_all - - # These fixtures represent a Parent -> Child -> Grandchild sync hierarchy - # Family -> PlaidItem -> Account family = families(:dylan_family) plaid_item = plaid_items(:one) account = accounts(:connected) - sync = Sync.create!(syncable: family) + family_sync = Sync.create!(syncable: family) + plaid_item_sync = Sync.create!(syncable: plaid_item, parent: family_sync) + account_sync = Sync.create!(syncable: account, parent: plaid_item_sync) - Family::Syncer.any_instance.expects(:perform_sync).with(start_date: sync.start_date).once - Family::Syncer.any_instance.expects(:perform_post_sync).once - Family::Syncer.any_instance.expects(:child_syncables).returns([ plaid_item ]) + assert_equal "pending", family_sync.status + assert_equal "pending", plaid_item_sync.status + assert_equal "pending", account_sync.status - PlaidItem::Syncer.any_instance.expects(:perform_sync).with(start_date: sync.start_date).once - PlaidItem::Syncer.any_instance.expects(:perform_post_sync).once - PlaidItem::Syncer.any_instance.expects(:child_syncables).returns([ account ]) + family.expects(:perform_sync).with(sync: family_sync, start_date: family_sync.start_date).once - Account::Syncer.any_instance.expects(:perform_sync).with(start_date: sync.start_date).once - Account::Syncer.any_instance.expects(:perform_post_sync).once - Account::Syncer.any_instance.expects(:child_syncables).returns([]) + family_sync.perform - sync.perform + assert_equal "syncing", family_sync.reload.status - assert_equal 1, family.syncs.count - assert_equal "syncing", family.syncs.first.status - assert_equal 1, plaid_item.syncs.count - assert_equal "pending", plaid_item.syncs.first.status + plaid_item.expects(:perform_sync).with(sync: plaid_item_sync, start_date: plaid_item_sync.start_date).once - # We have to perform jobs 2x because the child sync will schedule the grandchild sync, - # which then needs to be run. - perform_enqueued_jobs + plaid_item_sync.perform - assert_equal 1, family.syncs.count - assert_equal "syncing", family.syncs.first.status - assert_equal 1, plaid_item.syncs.count - assert_equal "syncing", plaid_item.syncs.first.status - assert_equal 1, account.syncs.count - assert_equal "pending", account.syncs.first.status + assert_equal "syncing", family_sync.reload.status + assert_equal "syncing", plaid_item_sync.reload.status - perform_enqueued_jobs + account.expects(:perform_sync).with(sync: account_sync, start_date: account_sync.start_date).once - assert_equal 1, family.syncs.count - assert_equal "completed", family.syncs.first.status - assert_equal 1, plaid_item.syncs.count - assert_equal "completed", plaid_item.syncs.first.status - assert_equal 1, account.syncs.count - assert_equal "completed", account.syncs.first.status + # Since these are accessed through `parent`, they won't necessarily be the same + # instance we configured above + Account.any_instance.expects(:perform_post_sync).once + PlaidItem.any_instance.expects(:perform_post_sync).once + Family.any_instance.expects(:perform_post_sync).once + + account_sync.perform + + assert_equal "completed", family_sync.reload.status + assert_equal "completed", plaid_item_sync.reload.status + assert_equal "completed", account_sync.reload.status end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 23f98faa..7eac9dde 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -17,6 +17,7 @@ require "rails/test_help" require "minitest/mock" require "minitest/autorun" require "mocha/minitest" +require "aasm/minitest" VCR.configure do |config| config.cassette_library_dir = "test/vcr_cassettes"