1
0
Fork 0
mirror of https://github.com/maybe-finance/maybe.git synced 2025-08-09 07:25:19 +02:00

Clean up sync class, add state machine

This commit is contained in:
Zach Gollwitzer 2025-05-11 15:02:46 -04:00
parent 5432b3903d
commit 5c8bca31ec
13 changed files with 147 additions and 152 deletions

View file

@ -63,6 +63,10 @@ gem "rotp", "~> 6.3"
gem "rqrcode", "~> 3.0" gem "rqrcode", "~> 3.0"
gem "activerecord-import" gem "activerecord-import"
# State machines
gem "aasm"
gem "after_commit_everywhere", "~> 1.0"
# AI # AI
gem "ruby-openai" gem "ruby-openai"

View file

@ -8,6 +8,8 @@ GIT
GEM GEM
remote: https://rubygems.org/ remote: https://rubygems.org/
specs: specs:
aasm (5.5.0)
concurrent-ruby (~> 1.0)
actioncable (7.2.2.1) actioncable (7.2.2.1)
actionpack (= 7.2.2.1) actionpack (= 7.2.2.1)
activesupport (= 7.2.2.1) activesupport (= 7.2.2.1)
@ -83,6 +85,9 @@ GEM
tzinfo (~> 2.0, >= 2.0.5) tzinfo (~> 2.0, >= 2.0.5)
addressable (2.8.7) addressable (2.8.7)
public_suffix (>= 2.0.2, < 7.0) public_suffix (>= 2.0.2, < 7.0)
after_commit_everywhere (1.6.0)
activerecord (>= 4.2)
activesupport
ast (2.4.3) ast (2.4.3)
aws-eventstream (1.3.2) aws-eventstream (1.3.2)
aws-partitions (1.1093.0) aws-partitions (1.1093.0)
@ -561,7 +566,9 @@ PLATFORMS
x86_64-linux-musl x86_64-linux-musl
DEPENDENCIES DEPENDENCIES
aasm
activerecord-import activerecord-import
after_commit_everywhere (~> 1.0)
aws-sdk-s3 (~> 1.177.0) aws-sdk-s3 (~> 1.177.0)
bcrypt (~> 3.1) bcrypt (~> 3.1)
benchmark-ips benchmark-ips

View file

@ -5,23 +5,15 @@ class Account::Syncer
@account = account @account = account
end end
def child_syncables def perform_sync(sync:, start_date: nil)
[]
end
def perform_sync(start_date: nil)
Rails.logger.info("Processing balances (#{account.linked? ? 'reverse' : 'forward'})") Rails.logger.info("Processing balances (#{account.linked? ? 'reverse' : 'forward'})")
sync_balances sync_balances
end end
def perform_post_sync def perform_post_sync
account.family.remove_syncing_notice! account.family.remove_syncing_notice!
account.accountable.post_sync
# account.accountable.post_sync(sync)
# unless sync.child?
account.family.auto_match_transfers! account.family.auto_match_transfers!
# end
end end
private private

View file

@ -68,7 +68,7 @@ module Accountable
end end
end end
def post_sync(sync) def post_sync
broadcast_replace_to( broadcast_replace_to(
account, account,
target: "chart_account_#{account.id}", target: "chart_account_#{account.id}",

View file

@ -6,7 +6,7 @@ module Syncable
end end
def syncing? def syncing?
syncs.where(status: [ :syncing, :pending ]).any? syncs.incomplete.any?
end end
def sync_later(start_date: nil, parent_sync: nil) def sync_later(start_date: nil, parent_sync: nil)
@ -14,8 +14,12 @@ module Syncable
SyncJob.perform_later(new_sync) SyncJob.perform_later(new_sync)
end end
def sync(start_date: nil) def perform_sync(sync:, start_date: nil)
syncs.create!(start_date: start_date).perform syncer.perform_sync(sync: sync, start_date: start_date)
end
def perform_post_sync
syncer.perform_post_sync
end end
def sync_error def sync_error
@ -28,6 +32,10 @@ module Syncable
private private
def latest_sync def latest_sync
syncs.order(created_at: :desc).first syncs.ordered.first
end
def syncer
self.class::Syncer.new(self)
end end
end end

View file

@ -5,11 +5,7 @@ class Family::Syncer
@family = family @family = family
end end
def child_syncables def perform_sync(sync:, start_date: nil)
family.plaid_items + family.accounts.manual
end
def perform_sync(start_date: nil)
# We don't rely on this value to guard the app, but keep it eventually consistent # We don't rely on this value to guard the app, but keep it eventually consistent
family.sync_trial_status! family.sync_trial_status!
@ -17,10 +13,20 @@ class Family::Syncer
family.rules.each do |rule| family.rules.each do |rule|
rule.apply_later rule.apply_later
end end
# Schedule child syncs
child_syncables.each do |syncable|
syncable.sync_later(start_date: start_date, parent_sync: sync)
end
end end
def perform_post_sync def perform_post_sync
family.auto_match_transfers! family.auto_match_transfers!
family.broadcast_refresh family.broadcast_refresh
end end
private
def child_syncables
family.plaid_items + family.accounts.manual
end
end end

View file

@ -62,7 +62,7 @@ class Import < ApplicationRecord
def publish def publish
import! import!
family.sync family.sync_later
update! status: :complete update! status: :complete
rescue => error rescue => error

View file

@ -5,16 +5,16 @@ class PlaidItem::Syncer
@plaid_item = plaid_item @plaid_item = plaid_item
end end
def child_syncables def perform_sync(sync:, start_date: nil)
plaid_item.accounts
end
def perform_sync(start_date: nil)
begin begin
Rails.logger.info("Fetching and loading Plaid data") Rails.logger.info("Fetching and loading Plaid data")
fetch_and_load_plaid_data fetch_and_load_plaid_data
plaid_item.update!(status: :good) if plaid_item.requires_update? plaid_item.update!(status: :good) if plaid_item.requires_update?
plaid_item.accounts.each do |account|
account.sync_later(start_date: start_date, parent_sync: sync)
end
Rails.logger.info("Plaid data fetched and loaded") Rails.logger.info("Plaid data fetched and loaded")
rescue Plaid::ApiError => e rescue Plaid::ApiError => e
handle_plaid_error(e) handle_plaid_error(e)

View file

@ -1,111 +1,87 @@
class Sync < ApplicationRecord class Sync < ApplicationRecord
Error = Class.new(StandardError) include AASM
belongs_to :syncable, polymorphic: true belongs_to :syncable, polymorphic: true
belongs_to :parent, class_name: "Sync", optional: true belongs_to :parent, class_name: "Sync", optional: true
has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy
enum :status, { pending: "pending", syncing: "syncing", completed: "completed", failed: "failed" }
scope :ordered, -> { order(created_at: :desc) } scope :ordered, -> { order(created_at: :desc) }
scope :incomplete, -> { where(status: [ :pending, :syncing ]) }
def child? # Sync state machine
parent_id.present? aasm column: :status do
end state :pending, initial: true
state :syncing
state :completed
state :failed
def perform event :start, after_commit: :handle_start do
Rails.logger.tagged("Sync", id, syncable_type, syncable_id) do transitions from: :pending, to: :syncing
start! end
begin event :complete, after_commit: :handle_finalization do
syncer.perform_sync(start_date: start_date) transitions from: :syncing, to: :completed
end
# Schedule child syncables to sync later event :fail, after_commit: :handle_finalization do
syncer.child_syncables.each do |child_syncable| transitions from: :syncing, to: :failed
child_syncable.sync_later(start_date: start_date, parent_sync: self)
end
unless has_pending_child_syncs?
complete!
Rails.logger.info("Sync completed, starting post-sync")
syncer.perform_post_sync
Rails.logger.info("Post-sync completed")
end
rescue StandardError => error
fail! error, report_error: true
ensure
notify_parent_of_completion! if has_parent?
end
end end
end end
def handle_child_completion_event def perform(start_date: nil)
start!
begin
syncable.perform_sync(sync: self, start_date: start_date)
attempt_finalization
rescue => e
fail!
handle_error(e)
end
end
# If the sync doesn't have any in-progress children, finalize it.
def attempt_finalization
Sync.transaction do Sync.transaction do
# We need this to ensure 2 child syncs don't update the parent at the exact same time with different results lock!
# and cause the sync to hang in "syncing" status indefinitely
self.lock!
unless has_pending_child_syncs? return unless all_children_finalized?
if has_failed_child_syncs?
fail!(Error.new("One or more child syncs failed"))
else
complete!
end
# If this sync is both a child and a parent, we need to notify the parent of completion if has_failed_children?
notify_parent_of_completion! if has_parent? fail!
else
syncer.perform_post_sync complete!
end end
end end
end end
private private
def syncer def has_failed_children?
"#{syncable_type}::Syncer".constantize.new(syncable) children.failed.any?
end end
def has_pending_child_syncs? def all_children_finalized?
children.where(status: [ :pending, :syncing ]).any? children.incomplete.empty?
end end
def has_failed_child_syncs? # Once sync finalizes, notify its parent and run its post-sync logic.
children.where(status: :failed).any? def handle_finalization
end syncable.perform_post_sync
def has_parent? if parent
parent_id.present? parent.attempt_finalization
end
def notify_parent_of_completion!
parent.handle_child_completion_event
end
def start!
Rails.logger.info("Starting sync")
update! status: :syncing
end
def complete!
Rails.logger.info("Sync completed")
update! status: :completed, last_ran_at: Time.current
end
def fail!(error, report_error: false)
Rails.logger.error("Sync failed: #{error.message}")
if report_error
Sentry.capture_exception(error) do |scope|
scope.set_context("sync", { id: id, syncable_type: syncable_type, syncable_id: syncable_id })
scope.set_tags(sync_id: id)
end
end end
end
update!( def handle_error(error)
status: :failed, update!(error: error.message)
error: error.message, Sentry.capture_exception(error) do |scope|
last_ran_at: Time.current scope.set_tags(sync_id: id)
) end
end
def handle_start
update!(last_ran_at: Time.current)
end end
end end

View file

@ -7,14 +7,14 @@ module SyncableInterfaceTest
test "can sync later" do test "can sync later" do
assert_difference "@syncable.syncs.count", 1 do assert_difference "@syncable.syncs.count", 1 do
assert_enqueued_with job: SyncJob do assert_enqueued_with job: SyncJob do
@syncable.sync_later @syncable.sync_later(start_date: 2.days.ago.to_date)
end end
end end
end end
test "can sync" do test "can perform sync" do
assert_difference "@syncable.syncs.count", 1 do mock_sync = mock
@syncable.sync(start_date: 2.days.ago.to_date) @syncable.class.any_instance.expects(:perform_sync).with(sync: mock_sync, start_date: 2.days.ago.to_date).once
end @syncable.perform_sync(sync: mock_sync, start_date: 2.days.ago.to_date)
end end
end end

View file

@ -12,8 +12,17 @@ class Family::SyncerTest < ActiveSupport::TestCase
items_count = @family.plaid_items.count items_count = @family.plaid_items.count
syncer = Family::Syncer.new(@family) syncer = Family::Syncer.new(@family)
syncer.perform_sync(start_date: family_sync.start_date)
assert_equal manual_accounts_count + items_count, syncer.child_syncables.count Account.any_instance
.expects(:sync_later)
.with(start_date: family_sync.start_date, parent_sync: family_sync)
.times(manual_accounts_count)
PlaidItem.any_instance
.expects(:sync_later)
.with(start_date: family_sync.start_date, parent_sync: family_sync)
.times(items_count)
syncer.perform_sync(sync: family_sync, start_date: family_sync.start_date)
end end
end end

View file

@ -4,9 +4,10 @@ class SyncTest < ActiveSupport::TestCase
include ActiveJob::TestHelper include ActiveJob::TestHelper
test "runs successful sync" do test "runs successful sync" do
sync = Sync.create!(syncable: accounts(:depository), last_ran_at: 1.day.ago) syncable = accounts(:depository)
sync = Sync.create!(syncable: syncable, last_ran_at: 1.day.ago)
Account::Syncer.any_instance.expects(:perform_sync).with(start_date: sync.start_date).once syncable.expects(:perform_sync).with(sync: sync, start_date: sync.start_date).once
assert_equal "pending", sync.status assert_equal "pending", sync.status
@ -19,8 +20,10 @@ class SyncTest < ActiveSupport::TestCase
end end
test "handles sync errors" do test "handles sync errors" do
sync = Sync.create!(syncable: accounts(:depository), last_ran_at: 1.day.ago) syncable = accounts(:depository)
Account::Syncer.any_instance.expects(:perform_sync).with(start_date: sync.start_date).raises(StandardError.new("test sync error")) sync = Sync.create!(syncable: syncable, last_ran_at: 1.day.ago)
syncable.expects(:perform_sync).with(sync: sync, start_date: sync.start_date).raises(StandardError.new("test sync error"))
assert_equal "pending", sync.status assert_equal "pending", sync.status
previously_ran_at = sync.last_ran_at previously_ran_at = sync.last_ran_at
@ -33,54 +36,43 @@ class SyncTest < ActiveSupport::TestCase
end end
test "can run nested syncs that alert the parent when complete" do test "can run nested syncs that alert the parent when complete" do
# Clear out fixture syncs
Sync.destroy_all
# These fixtures represent a Parent -> Child -> Grandchild sync hierarchy
# Family -> PlaidItem -> Account
family = families(:dylan_family) family = families(:dylan_family)
plaid_item = plaid_items(:one) plaid_item = plaid_items(:one)
account = accounts(:connected) account = accounts(:connected)
sync = Sync.create!(syncable: family) family_sync = Sync.create!(syncable: family)
plaid_item_sync = Sync.create!(syncable: plaid_item, parent: family_sync)
account_sync = Sync.create!(syncable: account, parent: plaid_item_sync)
Family::Syncer.any_instance.expects(:perform_sync).with(start_date: sync.start_date).once assert_equal "pending", family_sync.status
Family::Syncer.any_instance.expects(:perform_post_sync).once assert_equal "pending", plaid_item_sync.status
Family::Syncer.any_instance.expects(:child_syncables).returns([ plaid_item ]) assert_equal "pending", account_sync.status
PlaidItem::Syncer.any_instance.expects(:perform_sync).with(start_date: sync.start_date).once family.expects(:perform_sync).with(sync: family_sync, start_date: family_sync.start_date).once
PlaidItem::Syncer.any_instance.expects(:perform_post_sync).once
PlaidItem::Syncer.any_instance.expects(:child_syncables).returns([ account ])
Account::Syncer.any_instance.expects(:perform_sync).with(start_date: sync.start_date).once family_sync.perform
Account::Syncer.any_instance.expects(:perform_post_sync).once
Account::Syncer.any_instance.expects(:child_syncables).returns([])
sync.perform assert_equal "syncing", family_sync.reload.status
assert_equal 1, family.syncs.count plaid_item.expects(:perform_sync).with(sync: plaid_item_sync, start_date: plaid_item_sync.start_date).once
assert_equal "syncing", family.syncs.first.status
assert_equal 1, plaid_item.syncs.count
assert_equal "pending", plaid_item.syncs.first.status
# We have to perform jobs 2x because the child sync will schedule the grandchild sync, plaid_item_sync.perform
# which then needs to be run.
perform_enqueued_jobs
assert_equal 1, family.syncs.count assert_equal "syncing", family_sync.reload.status
assert_equal "syncing", family.syncs.first.status assert_equal "syncing", plaid_item_sync.reload.status
assert_equal 1, plaid_item.syncs.count
assert_equal "syncing", plaid_item.syncs.first.status
assert_equal 1, account.syncs.count
assert_equal "pending", account.syncs.first.status
perform_enqueued_jobs account.expects(:perform_sync).with(sync: account_sync, start_date: account_sync.start_date).once
assert_equal 1, family.syncs.count # Since these are accessed through `parent`, they won't necessarily be the same
assert_equal "completed", family.syncs.first.status # instance we configured above
assert_equal 1, plaid_item.syncs.count Account.any_instance.expects(:perform_post_sync).once
assert_equal "completed", plaid_item.syncs.first.status PlaidItem.any_instance.expects(:perform_post_sync).once
assert_equal 1, account.syncs.count Family.any_instance.expects(:perform_post_sync).once
assert_equal "completed", account.syncs.first.status
account_sync.perform
assert_equal "completed", family_sync.reload.status
assert_equal "completed", plaid_item_sync.reload.status
assert_equal "completed", account_sync.reload.status
end end
end end

View file

@ -17,6 +17,7 @@ require "rails/test_help"
require "minitest/mock" require "minitest/mock"
require "minitest/autorun" require "minitest/autorun"
require "mocha/minitest" require "mocha/minitest"
require "aasm/minitest"
VCR.configure do |config| VCR.configure do |config|
config.cassette_library_dir = "test/vcr_cassettes" config.cassette_library_dir = "test/vcr_cassettes"