Skip to content

Commit

Permalink
Refactor transaction enrichment to support batch processing (#1803)
Browse files Browse the repository at this point in the history
* Refactor transaction enrichment to support batch processing

- Add method to enrich transactions in batches
- Implement job scheduling for unenriched transactions
- Improve logging and error handling for transaction enrichment

* Re-enable enrichment

* Fix transaction enrichment query to use correct table references

- Update queries to explicitly join and reference account_entries and account_transactions tables
- Remove unnecessary name presence check before enrichment
- Improve query precision for unenriched transaction selection

* Optimize transaction enrichment query joins

- Refactor database joins to use explicit table references
- Improve query performance for unenriched transaction selection
- Ensure correct table aliasing in enrichment methods

* Remove deprecated data enrichment job and method

- Delete EnrichDataJob as it's no longer used
- Remove `enrich_data_later` method from Account model
- Update Account::Syncer to directly call `enrich_data` instead of scheduling a job
  • Loading branch information
Shpigford authored Feb 5, 2025
1 parent abd932c commit b84a33c
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 45 deletions.
7 changes: 0 additions & 7 deletions app/jobs/enrich_data_job.rb

This file was deleted.

8 changes: 8 additions & 0 deletions app/jobs/enrich_transaction_batch_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class EnrichTransactionBatchJob < ApplicationJob
queue_as :latency_high

def perform(account, batch_size = 100, offset = 0)
enricher = Account::DataEnricher.new(account)
enricher.enrich_transaction_batch(batch_size, offset)
end
end
4 changes: 0 additions & 4 deletions app/models/account.rb
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,6 @@ def enrich_data
DataEnricher.new(self).run
end

def enrich_data_later
EnrichDataJob.perform_later(self)
end

def update_with_sync!(attributes)
should_update_balance = attributes[:balance] && attributes[:balance].to_d != balance

Expand Down
76 changes: 44 additions & 32 deletions app/models/account/data_enricher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,61 @@ def initialize(account)
end

def run
enrich_transactions
end

private
def enrich_transactions
candidates = account.entries.account_transactions.includes(entryable: [ :merchant, :category ])
total_unenriched = account.entries.account_transactions
.joins("JOIN account_transactions at ON at.id = account_entries.entryable_id AND account_entries.entryable_type = 'Account::Transaction'")
.where("account_entries.enriched_at IS NULL OR at.merchant_id IS NULL OR at.category_id IS NULL")
.count

Rails.logger.info("Enriching #{candidates.count} transactions for account #{account.id}")
if total_unenriched > 0
batch_size = 50
batches = (total_unenriched.to_f / batch_size).ceil

merchants = {}
batches.times do |batch|
EnrichTransactionBatchJob.perform_later(account, batch_size, batch * batch_size)
end
end
end

candidates.each do |entry|
if entry.enriched_at.nil? || entry.entryable.merchant_id.nil? || entry.entryable.category_id.nil?
begin
next unless entry.name.present?
def enrich_transaction_batch(batch_size = 50, offset = 0)
candidates = account.entries.account_transactions
.includes(entryable: [ :merchant, :category ])
.joins("JOIN account_transactions at ON at.id = account_entries.entryable_id AND account_entries.entryable_type = 'Account::Transaction'")
.where("account_entries.enriched_at IS NULL OR at.merchant_id IS NULL OR at.category_id IS NULL")
.offset(offset)
.limit(batch_size)

info = self.class.synth_provider.enrich_transaction(entry.name).info
Rails.logger.info("Enriching batch of #{candidates.count} transactions for account #{account.id} (offset: #{offset})")

next unless info.present?
merchants = {}

if info.name.present?
merchant = merchants[info.name] ||= account.family.merchants.find_or_create_by(name: info.name)
candidates.each do |entry|
begin
info = self.class.synth_provider.enrich_transaction(entry.name).info

if info.icon_url.present?
merchant.icon_url = info.icon_url
end
end
next unless info.present?

entryable_attributes = { id: entry.entryable_id }
entryable_attributes[:merchant_id] = merchant.id if merchant.present? && entry.entryable.merchant_id.nil?
if info.name.present?
merchant = merchants[info.name] ||= account.family.merchants.find_or_create_by(name: info.name)

Account.transaction do
merchant.save! if merchant.present?
entry.update!(
enriched_at: Time.current,
enriched_name: info.name,
entryable_attributes: entryable_attributes
)
end
rescue => e
Rails.logger.warn("Error enriching transaction #{entry.id}: #{e.message}")
if info.icon_url.present?
merchant.icon_url = info.icon_url
end
end

entryable_attributes = { id: entry.entryable_id }
entryable_attributes[:merchant_id] = merchant.id if merchant.present? && entry.entryable.merchant_id.nil?

Account.transaction do
merchant.save! if merchant.present?
entry.update!(
enriched_at: Time.current,
enriched_name: info.name,
entryable_attributes: entryable_attributes
)
end
rescue => e
Rails.logger.warn("Error enriching transaction #{entry.id}: #{e.message}")
end
end
end
end
3 changes: 1 addition & 2 deletions app/models/account/syncer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ def run

# Enrich if user opted in or if we're syncing transactions from a Plaid account on the hosted app
if account.family.data_enrichment_enabled? || (account.plaid_account_id.present? && Rails.application.config.app_mode.hosted?)
# Temporarily disable until optimizations complete
# account.enrich_data_later
account.enrich_data
else
Rails.logger.info("Data enrichment is disabled, skipping enrichment for account #{account.id}")
end
Expand Down

0 comments on commit b84a33c

Please sign in to comment.