mirror of
https://github.com/maybe-finance/maybe.git
synced 2025-07-22 06:39:39 +02:00
Sync hierarchy updates (#2087)
* Add parent sync orchestration * Pass sync object to dependents
This commit is contained in:
parent
9fa3698823
commit
8648f11413
11 changed files with 111 additions and 30 deletions
|
@ -2,7 +2,6 @@ class SyncJob < ApplicationJob
|
||||||
queue_as :high_priority
|
queue_as :high_priority
|
||||||
|
|
||||||
def perform(sync)
|
def perform(sync)
|
||||||
sleep 1 # simulate work for faster jobs
|
|
||||||
sync.perform
|
sync.perform
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -70,24 +70,26 @@ class Account < ApplicationRecord
|
||||||
DestroyJob.perform_later(self)
|
DestroyJob.perform_later(self)
|
||||||
end
|
end
|
||||||
|
|
||||||
def sync_data(start_date: nil)
|
def sync_data(sync, start_date: nil)
|
||||||
update!(last_synced_at: Time.current)
|
update!(last_synced_at: Time.current)
|
||||||
|
|
||||||
Rails.logger.info("Auto-matching transfers")
|
|
||||||
family.auto_match_transfers!
|
|
||||||
|
|
||||||
Rails.logger.info("Processing balances (#{linked? ? 'reverse' : 'forward'})")
|
Rails.logger.info("Processing balances (#{linked? ? 'reverse' : 'forward'})")
|
||||||
sync_balances
|
sync_balances
|
||||||
|
end
|
||||||
|
|
||||||
|
def post_sync(sync)
|
||||||
|
family.remove_syncing_notice!
|
||||||
|
|
||||||
|
accountable.post_sync(sync)
|
||||||
|
|
||||||
if enrichable?
|
if enrichable?
|
||||||
Rails.logger.info("Enriching transaction data")
|
Rails.logger.info("Enriching transaction data")
|
||||||
enrich_data
|
enrich_data
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
|
||||||
def post_sync
|
unless sync.child?
|
||||||
broadcast_remove_to(family, target: "syncing-notice")
|
family.auto_match_transfers!
|
||||||
accountable.post_sync
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def original_balance
|
def original_balance
|
||||||
|
|
|
@ -45,7 +45,7 @@ module Accountable
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def post_sync
|
def post_sync(sync)
|
||||||
broadcast_replace_to(
|
broadcast_replace_to(
|
||||||
account,
|
account,
|
||||||
target: "chart_account_#{account.id}",
|
target: "chart_account_#{account.id}",
|
||||||
|
|
|
@ -9,8 +9,8 @@ module Syncable
|
||||||
syncs.where(status: [ :syncing, :pending ]).any?
|
syncs.where(status: [ :syncing, :pending ]).any?
|
||||||
end
|
end
|
||||||
|
|
||||||
def sync_later(start_date: nil)
|
def sync_later(start_date: nil, parent_sync: nil)
|
||||||
new_sync = syncs.create!(start_date: start_date)
|
new_sync = syncs.create!(start_date: start_date, parent: parent_sync)
|
||||||
SyncJob.perform_later(new_sync)
|
SyncJob.perform_later(new_sync)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -18,11 +18,11 @@ module Syncable
|
||||||
syncs.create!(start_date: start_date).perform
|
syncs.create!(start_date: start_date).perform
|
||||||
end
|
end
|
||||||
|
|
||||||
def sync_data(start_date: nil)
|
def sync_data(sync, start_date: nil)
|
||||||
raise NotImplementedError, "Subclasses must implement the `sync_data` method"
|
raise NotImplementedError, "Subclasses must implement the `sync_data` method"
|
||||||
end
|
end
|
||||||
|
|
||||||
def post_sync
|
def post_sync(sync)
|
||||||
# no-op, syncable can optionally provide implementation
|
# no-op, syncable can optionally provide implementation
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -43,29 +43,36 @@ class Family < ApplicationRecord
|
||||||
@income_statement ||= IncomeStatement.new(self)
|
@income_statement ||= IncomeStatement.new(self)
|
||||||
end
|
end
|
||||||
|
|
||||||
def sync_data(start_date: nil)
|
def sync_data(sync, start_date: nil)
|
||||||
update!(last_synced_at: Time.current)
|
update!(last_synced_at: Time.current)
|
||||||
|
|
||||||
accounts.manual.each do |account|
|
accounts.manual.each do |account|
|
||||||
account.sync_later(start_date: start_date)
|
account.sync_later(start_date: start_date, parent_sync: sync)
|
||||||
end
|
end
|
||||||
|
|
||||||
plaid_items.each do |plaid_item|
|
plaid_items.each do |plaid_item|
|
||||||
plaid_item.sync_later(start_date: start_date)
|
plaid_item.sync_later(start_date: start_date, parent_sync: sync)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def post_sync
|
def remove_syncing_notice!
|
||||||
|
broadcast_remove target: "syncing-notice"
|
||||||
|
end
|
||||||
|
|
||||||
|
def post_sync(sync)
|
||||||
|
auto_match_transfers!
|
||||||
broadcast_refresh
|
broadcast_refresh
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# If family has any syncs pending/syncing within the last hour, we show a persistent "syncing" notice.
|
||||||
|
# Ignore syncs older than 1 hour as they are considered "stale"
|
||||||
def syncing?
|
def syncing?
|
||||||
Sync.where(
|
Sync.where(
|
||||||
"(syncable_type = 'Family' AND syncable_id = ?) OR
|
"(syncable_type = 'Family' AND syncable_id = ?) OR
|
||||||
(syncable_type = 'Account' AND syncable_id IN (SELECT id FROM accounts WHERE family_id = ? AND plaid_account_id IS NULL)) OR
|
(syncable_type = 'Account' AND syncable_id IN (SELECT id FROM accounts WHERE family_id = ? AND plaid_account_id IS NULL)) OR
|
||||||
(syncable_type = 'PlaidItem' AND syncable_id IN (SELECT id FROM plaid_items WHERE family_id = ?))",
|
(syncable_type = 'PlaidItem' AND syncable_id IN (SELECT id FROM plaid_items WHERE family_id = ?))",
|
||||||
id, id, id
|
id, id, id
|
||||||
).where(status: [ "pending", "syncing" ]).exists?
|
).where(status: [ "pending", "syncing" ], created_at: 1.hour.ago..).exists?
|
||||||
end
|
end
|
||||||
|
|
||||||
def eu?
|
def eu?
|
||||||
|
|
|
@ -37,7 +37,7 @@ class PlaidItem < ApplicationRecord
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def sync_data(start_date: nil)
|
def sync_data(sync, start_date: nil)
|
||||||
update!(last_synced_at: Time.current)
|
update!(last_synced_at: Time.current)
|
||||||
|
|
||||||
begin
|
begin
|
||||||
|
@ -79,7 +79,7 @@ class PlaidItem < ApplicationRecord
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def post_sync
|
def post_sync(sync)
|
||||||
family.broadcast_refresh
|
family.broadcast_refresh
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -1,32 +1,71 @@
|
||||||
class Sync < ApplicationRecord
|
class Sync < ApplicationRecord
|
||||||
belongs_to :syncable, polymorphic: true
|
belongs_to :syncable, polymorphic: true
|
||||||
|
|
||||||
|
belongs_to :parent, class_name: "Sync", optional: true
|
||||||
|
has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy
|
||||||
|
|
||||||
enum :status, { pending: "pending", syncing: "syncing", completed: "completed", failed: "failed" }
|
enum :status, { pending: "pending", syncing: "syncing", completed: "completed", failed: "failed" }
|
||||||
|
|
||||||
scope :ordered, -> { order(created_at: :desc) }
|
scope :ordered, -> { order(created_at: :desc) }
|
||||||
|
|
||||||
|
def child?
|
||||||
|
parent_id.present?
|
||||||
|
end
|
||||||
|
|
||||||
def perform
|
def perform
|
||||||
Rails.logger.tagged("Sync", id, syncable_type, syncable_id) do
|
Rails.logger.tagged("Sync", id, syncable_type, syncable_id) do
|
||||||
start!
|
start!
|
||||||
|
|
||||||
begin
|
begin
|
||||||
data = syncable.sync_data(start_date: start_date)
|
data = syncable.sync_data(self, start_date: start_date)
|
||||||
update!(data: data) if data
|
update!(data: data) if data
|
||||||
complete!
|
|
||||||
|
complete! unless has_pending_child_syncs?
|
||||||
rescue StandardError => error
|
rescue StandardError => error
|
||||||
fail! error
|
fail! error
|
||||||
raise error if Rails.env.development?
|
raise error if Rails.env.development?
|
||||||
ensure
|
ensure
|
||||||
Rails.logger.info("Sync completed, starting post-sync")
|
Rails.logger.info("Sync completed, starting post-sync")
|
||||||
|
|
||||||
syncable.post_sync
|
if has_parent?
|
||||||
|
notify_parent_of_completion!
|
||||||
|
else
|
||||||
|
syncable.post_sync(self)
|
||||||
|
end
|
||||||
|
|
||||||
Rails.logger.info("Post-sync completed")
|
Rails.logger.info("Post-sync completed")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def handle_child_completion_event
|
||||||
|
unless has_pending_child_syncs?
|
||||||
|
if has_failed_child_syncs?
|
||||||
|
fail!("One or more child syncs failed")
|
||||||
|
else
|
||||||
|
complete!
|
||||||
|
syncable.post_sync(self)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
def has_pending_child_syncs?
|
||||||
|
children.where(status: [ :pending, :syncing ]).any?
|
||||||
|
end
|
||||||
|
|
||||||
|
def has_failed_child_syncs?
|
||||||
|
children.where(status: :failed).any?
|
||||||
|
end
|
||||||
|
|
||||||
|
def has_parent?
|
||||||
|
parent_id.present?
|
||||||
|
end
|
||||||
|
|
||||||
|
def notify_parent_of_completion!
|
||||||
|
parent.handle_child_completion_event
|
||||||
|
end
|
||||||
|
|
||||||
def start!
|
def start!
|
||||||
Rails.logger.info("Starting sync")
|
Rails.logger.info("Starting sync")
|
||||||
update! status: :syncing
|
update! status: :syncing
|
||||||
|
|
5
db/migrate/20250411140604_add_parent_syncs.rb
Normal file
5
db/migrate/20250411140604_add_parent_syncs.rb
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
class AddParentSyncs < ActiveRecord::Migration[7.2]
|
||||||
|
def change
|
||||||
|
add_reference :syncs, :parent, foreign_key: { to_table: :syncs }, type: :uuid
|
||||||
|
end
|
||||||
|
end
|
5
db/schema.rb
generated
5
db/schema.rb
generated
|
@ -10,7 +10,7 @@
|
||||||
#
|
#
|
||||||
# It's strongly recommended that you check this file into your version control system.
|
# It's strongly recommended that you check this file into your version control system.
|
||||||
|
|
||||||
ActiveRecord::Schema[7.2].define(version: 2025_04_10_144939) do
|
ActiveRecord::Schema[7.2].define(version: 2025_04_11_140604) do
|
||||||
# These are extensions that must be enabled in order to support this database
|
# These are extensions that must be enabled in order to support this database
|
||||||
enable_extension "pgcrypto"
|
enable_extension "pgcrypto"
|
||||||
enable_extension "plpgsql"
|
enable_extension "plpgsql"
|
||||||
|
@ -547,6 +547,8 @@ ActiveRecord::Schema[7.2].define(version: 2025_04_10_144939) do
|
||||||
t.datetime "created_at", null: false
|
t.datetime "created_at", null: false
|
||||||
t.datetime "updated_at", null: false
|
t.datetime "updated_at", null: false
|
||||||
t.text "error_backtrace", array: true
|
t.text "error_backtrace", array: true
|
||||||
|
t.uuid "parent_id"
|
||||||
|
t.index ["parent_id"], name: "index_syncs_on_parent_id"
|
||||||
t.index ["syncable_type", "syncable_id"], name: "index_syncs_on_syncable"
|
t.index ["syncable_type", "syncable_id"], name: "index_syncs_on_syncable"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -665,6 +667,7 @@ ActiveRecord::Schema[7.2].define(version: 2025_04_10_144939) do
|
||||||
add_foreign_key "security_prices", "securities"
|
add_foreign_key "security_prices", "securities"
|
||||||
add_foreign_key "sessions", "impersonation_sessions", column: "active_impersonator_session_id"
|
add_foreign_key "sessions", "impersonation_sessions", column: "active_impersonator_session_id"
|
||||||
add_foreign_key "sessions", "users"
|
add_foreign_key "sessions", "users"
|
||||||
|
add_foreign_key "syncs", "syncs", column: "parent_id"
|
||||||
add_foreign_key "taggings", "tags"
|
add_foreign_key "taggings", "tags"
|
||||||
add_foreign_key "tags", "families"
|
add_foreign_key "tags", "families"
|
||||||
add_foreign_key "tool_calls", "messages"
|
add_foreign_key "tool_calls", "messages"
|
||||||
|
|
|
@ -17,13 +17,13 @@ class FamilyTest < ActiveSupport::TestCase
|
||||||
items_count = @syncable.plaid_items.count
|
items_count = @syncable.plaid_items.count
|
||||||
|
|
||||||
Account.any_instance.expects(:sync_later)
|
Account.any_instance.expects(:sync_later)
|
||||||
.with(start_date: nil)
|
.with(start_date: nil, parent_sync: family_sync)
|
||||||
.times(manual_accounts_count)
|
.times(manual_accounts_count)
|
||||||
|
|
||||||
PlaidItem.any_instance.expects(:sync_later)
|
PlaidItem.any_instance.expects(:sync_later)
|
||||||
.with(start_date: nil)
|
.with(start_date: nil, parent_sync: family_sync)
|
||||||
.times(items_count)
|
.times(items_count)
|
||||||
|
|
||||||
@syncable.sync_data(start_date: family_sync.start_date)
|
@syncable.sync_data(family_sync, start_date: family_sync.start_date)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -7,7 +7,7 @@ class SyncTest < ActiveSupport::TestCase
|
||||||
end
|
end
|
||||||
|
|
||||||
test "runs successful sync" do
|
test "runs successful sync" do
|
||||||
@sync.syncable.expects(:sync_data).with(start_date: @sync.start_date).once
|
@sync.syncable.expects(:sync_data).with(@sync, start_date: @sync.start_date).once
|
||||||
|
|
||||||
assert_equal "pending", @sync.status
|
assert_equal "pending", @sync.status
|
||||||
|
|
||||||
|
@ -20,7 +20,7 @@ class SyncTest < ActiveSupport::TestCase
|
||||||
end
|
end
|
||||||
|
|
||||||
test "handles sync errors" do
|
test "handles sync errors" do
|
||||||
@sync.syncable.expects(:sync_data).with(start_date: @sync.start_date).raises(StandardError.new("test sync error"))
|
@sync.syncable.expects(:sync_data).with(@sync, start_date: @sync.start_date).raises(StandardError.new("test sync error"))
|
||||||
|
|
||||||
assert_equal "pending", @sync.status
|
assert_equal "pending", @sync.status
|
||||||
previously_ran_at = @sync.last_ran_at
|
previously_ran_at = @sync.last_ran_at
|
||||||
|
@ -31,4 +31,30 @@ class SyncTest < ActiveSupport::TestCase
|
||||||
assert_equal "failed", @sync.status
|
assert_equal "failed", @sync.status
|
||||||
assert_equal "test sync error", @sync.error
|
assert_equal "test sync error", @sync.error
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "runs sync with child syncs" do
|
||||||
|
family = families(:dylan_family)
|
||||||
|
|
||||||
|
parent = Sync.create!(syncable: family)
|
||||||
|
child1 = Sync.create!(syncable: family.accounts.first, parent: parent)
|
||||||
|
child2 = Sync.create!(syncable: family.accounts.last, parent: parent)
|
||||||
|
|
||||||
|
parent.syncable.expects(:sync_data).returns([]).once
|
||||||
|
child1.syncable.expects(:sync_data).returns([]).once
|
||||||
|
child2.syncable.expects(:sync_data).returns([]).once
|
||||||
|
|
||||||
|
parent.perform # no-op
|
||||||
|
|
||||||
|
assert_equal "syncing", parent.status
|
||||||
|
assert_equal "pending", child1.status
|
||||||
|
assert_equal "pending", child2.status
|
||||||
|
|
||||||
|
child1.perform
|
||||||
|
assert_equal "completed", child1.status
|
||||||
|
assert_equal "syncing", parent.status
|
||||||
|
|
||||||
|
child2.perform
|
||||||
|
assert_equal "completed", child2.status
|
||||||
|
assert_equal "completed", parent.status
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue