From c122200d52e36898d346a20be022a113047eeb8c Mon Sep 17 00:00:00 2001 From: Crimson Thompson Date: Fri, 27 Sep 2024 10:55:10 +0200 Subject: [PATCH] feat(payments): import stripe data in asc order --- .../plugins/public/stripe/accounts.go | 25 +++--- .../plugins/public/stripe/accounts_test.go | 3 +- .../plugins/public/stripe/balances.go | 2 +- .../plugins/public/stripe/balances_test.go | 2 +- .../plugins/public/stripe/client/accounts.go | 39 +++++++--- .../plugins/public/stripe/client/balances.go | 7 +- .../plugins/public/stripe/client/client.go | 13 +++- .../public/stripe/client/client_generated.go | 47 +++++------ .../public/stripe/client/external_accounts.go | 50 ++++++++---- .../plugins/public/stripe/client/payments.go | 75 +++++++++++++----- .../plugins/public/stripe/client/timeline.go | 77 +++++++++++++++++++ .../public/stripe/external_accounts.go | 20 +++-- .../public/stripe/external_accounts_test.go | 5 +- .../plugins/public/stripe/payments.go | 25 ++++-- .../plugins/public/stripe/payments_test.go | 31 ++------ 15 files changed, 292 insertions(+), 129 deletions(-) create mode 100644 components/payments/internal/connectors/plugins/public/stripe/client/timeline.go diff --git a/components/payments/internal/connectors/plugins/public/stripe/accounts.go b/components/payments/internal/connectors/plugins/public/stripe/accounts.go index adf7acfe7f..ca7af593be 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/accounts.go +++ b/components/payments/internal/connectors/plugins/public/stripe/accounts.go @@ -6,6 +6,7 @@ import ( "time" "github.com/formancehq/payments/internal/connectors/plugins/currency" + "github.com/formancehq/payments/internal/connectors/plugins/public/stripe/client" "github.com/formancehq/payments/internal/models" ) @@ -14,16 +15,16 @@ var ( ) // root account reference is internal so we don't pass it to Stripe API clients -func resolveAccount(ref *string) *string { - if *ref == rootAccountReference { - return nil +func resolveAccount(ref string) string { + if ref == rootAccountReference { + return "" } return ref } type AccountsState struct { - InitFinished bool `json:"init_finished"` - LastID string `json:"lastID,omitempty"` + RootCreated bool `json:"root_created"` + Timeline client.Timeline `json:"timeline"` } func (p *Plugin) fetchNextAccounts(ctx context.Context, req models.FetchNextAccountsRequest) (models.FetchNextAccountsResponse, error) { @@ -35,7 +36,7 @@ func (p *Plugin) fetchNextAccounts(ctx context.Context, req models.FetchNextAcco } accounts := make([]models.PSPAccount, 0, req.PageSize) - if !oldState.InitFinished { + if !oldState.RootCreated { // create a root account if this is the first time this is being run accounts = append(accounts, models.PSPAccount{ Name: &rootAccountReference, @@ -44,21 +45,19 @@ func (p *Plugin) fetchNextAccounts(ctx context.Context, req models.FetchNextAcco Raw: json.RawMessage("{}"), Metadata: map[string]string{}, }) - oldState.InitFinished = true + oldState.RootCreated = true } needed := req.PageSize - len(accounts) - newState := AccountsState{ - InitFinished: oldState.InitFinished, - } - rawAccounts, hasMore, err := p.client.GetAccounts(ctx, &oldState.LastID, int64(needed)) + newState := oldState + rawAccounts, timeline, hasMore, err := p.client.GetAccounts(ctx, oldState.Timeline, int64(needed)) if err != nil { return models.FetchNextAccountsResponse{}, err } - for _, acc := range rawAccounts { - newState.LastID = acc.ID + newState.Timeline = timeline + for _, acc := range rawAccounts { raw, err := json.Marshal(acc) if err != nil { return models.FetchNextAccountsResponse{}, err diff --git a/components/payments/internal/connectors/plugins/public/stripe/accounts_test.go b/components/payments/internal/connectors/plugins/public/stripe/accounts_test.go index 6689ab4955..a8e06bede8 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/accounts_test.go +++ b/components/payments/internal/connectors/plugins/public/stripe/accounts_test.go @@ -52,6 +52,7 @@ var _ = Describe("Stripe Plugin Accounts", func() { // pageSize passed to client is less when we generate a root account m.EXPECT().GetAccounts(ctx, gomock.Any(), int64(pageSize-1)).Return( sampleAccounts, + client.Timeline{LatestID: sampleAccounts[len(sampleAccounts)-1].ID}, true, nil, ) @@ -65,7 +66,7 @@ var _ = Describe("Stripe Plugin Accounts", func() { err = json.Unmarshal(res.NewState, &state) Expect(err).To(BeNil()) - Expect(state.LastID).To(Equal(res.Accounts[len(res.Accounts)-1].Reference)) + Expect(state.Timeline.LatestID).To(Equal(res.Accounts[len(res.Accounts)-1].Reference)) }) }) }) diff --git a/components/payments/internal/connectors/plugins/public/stripe/balances.go b/components/payments/internal/connectors/plugins/public/stripe/balances.go index 69d5c9c461..e7d88ce2cb 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/balances.go +++ b/components/payments/internal/connectors/plugins/public/stripe/balances.go @@ -20,7 +20,7 @@ func (p *Plugin) fetchNextBalances(ctx context.Context, req models.FetchNextBala return models.FetchNextBalancesResponse{}, err } - balance, err := p.client.GetAccountBalances(ctx, resolveAccount(&from.Reference)) + balance, err := p.client.GetAccountBalances(ctx, resolveAccount(from.Reference)) if err != nil { return models.FetchNextBalancesResponse{}, err } diff --git a/components/payments/internal/connectors/plugins/public/stripe/balances_test.go b/components/payments/internal/connectors/plugins/public/stripe/balances_test.go index 55ddba1384..dbe414301e 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/balances_test.go +++ b/components/payments/internal/connectors/plugins/public/stripe/balances_test.go @@ -52,7 +52,7 @@ var _ = Describe("Stripe Plugin Balances", func() { FromPayload: json.RawMessage(fmt.Sprintf(`{"reference": "%s"}`, accRef)), State: json.RawMessage(`{}`), } - m.EXPECT().GetAccountBalances(ctx, &accRef).Return( + m.EXPECT().GetAccountBalances(ctx, accRef).Return( sampleBalance, nil, ) diff --git a/components/payments/internal/connectors/plugins/public/stripe/client/accounts.go b/components/payments/internal/connectors/plugins/public/stripe/client/accounts.go index ac8354d5ac..75ca3bc2c9 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/client/accounts.go +++ b/components/payments/internal/connectors/plugins/public/stripe/client/accounts.go @@ -6,21 +6,42 @@ import ( "github.com/stripe/stripe-go/v79" ) -func (c *client) GetAccounts(ctx context.Context, lastID *string, pageSize int64) ([]*stripe.Account, bool, error) { +func (c *client) GetAccounts( + ctx context.Context, + timeline Timeline, + pageSize int64, +) (results []*stripe.Account, _ Timeline, hasMore bool, err error) { // TODO // f := connectors.ClientMetrics(ctx, "stripe", "list_accounts") // now := time.Now() // defer f(ctx, now) - filters := stripe.ListParams{ - Limit: &pageSize, + + results = make([]*stripe.Account, 0, int(pageSize)) + + if !timeline.IsCaughtUp() { + var oldest interface{} + oldest, timeline, hasMore, err = scanForOldest(timeline, pageSize, func(params stripe.ListParams) (stripe.ListContainer, error) { + itr := c.accountClient.List(&stripe.AccountListParams{ListParams: params}) + return itr.AccountList(), itr.Err() + }) + if err != nil { + return results, timeline, false, err + } + // either there are no records or we haven't found the start yet + if !timeline.IsCaughtUp() { + return results, timeline, hasMore, nil + } + results = append(results, oldest.(*stripe.Account)) } - if lastID == nil { - filters.StartingAfter = lastID + + filters := stripe.ListParams{ + Limit: limit(pageSize, len(results)), + EndingBefore: &timeline.LatestID, + Single: true, // turn off autopagination } itr := c.accountClient.List(&stripe.AccountListParams{ListParams: filters}) - if err := itr.Err(); err != nil { - return nil, false, handleError(itr.Iter) - } - return itr.AccountList().Data, itr.AccountList().ListMeta.HasMore, nil + results = append(results, itr.AccountList().Data...) + timeline.LatestID = results[len(results)-1].ID + return results, timeline, itr.AccountList().ListMeta.HasMore, itr.Err() } diff --git a/components/payments/internal/connectors/plugins/public/stripe/client/balances.go b/components/payments/internal/connectors/plugins/public/stripe/client/balances.go index 76f637fbda..47e7f90d8f 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/client/balances.go +++ b/components/payments/internal/connectors/plugins/public/stripe/client/balances.go @@ -7,13 +7,14 @@ import ( "github.com/stripe/stripe-go/v79" ) -func (c *client) GetAccountBalances(ctx context.Context, accountID *string) (*stripe.Balance, error) { +func (c *client) GetAccountBalances(ctx context.Context, accountID string) (*stripe.Balance, error) { // TODO // f := connectors.ClientMetrics(ctx, "stripe", "get_balances") // now := time.Now() // defer f(ctx, now) - filters := stripe.Params{ - StripeAccount: accountID, + var filters stripe.Params + if accountID != "" { + filters.StripeAccount = &accountID } balance, err := c.balanceClient.Get(&stripe.BalanceParams{Params: filters}) diff --git a/components/payments/internal/connectors/plugins/public/stripe/client/client.go b/components/payments/internal/connectors/plugins/public/stripe/client/client.go index 975671471c..0e3a6c685c 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/client/client.go +++ b/components/payments/internal/connectors/plugins/public/stripe/client/client.go @@ -12,10 +12,10 @@ import ( //go:generate mockgen -source client.go -destination client_generated.go -package client . Client type Client interface { - GetAccounts(ctx context.Context, lastID *string, pageSize int64) ([]*stripe.Account, bool, error) - GetAccountBalances(ctx context.Context, accountID *string) (*stripe.Balance, error) - GetExternalAccounts(ctx context.Context, accountID *string, lastID *string, pageSize int64) ([]*stripe.BankAccount, bool, error) - GetPayments(ctx context.Context, accountID *string, lastID *string, pageSize int64) ([]*stripe.BalanceTransaction, bool, error) + GetAccounts(ctx context.Context, timeline Timeline, pageSize int64) ([]*stripe.Account, Timeline, bool, error) + GetAccountBalances(ctx context.Context, accountID string) (*stripe.Balance, error) + GetExternalAccounts(ctx context.Context, accountID string, timeline Timeline, pageSize int64) ([]*stripe.BankAccount, Timeline, bool, error) + GetPayments(ctx context.Context, accountID string, timeline Timeline, pageSize int64) ([]*stripe.BalanceTransaction, Timeline, bool, error) } type client struct { @@ -37,3 +37,8 @@ func New(backend stripe.Backend, apiKey string) Client { balanceTransactionClient: balancetransaction.Client{B: backend, Key: apiKey}, } } + +func limit(wanted int64, have int) *int64 { + needed := wanted - int64(have) + return &needed +} diff --git a/components/payments/internal/connectors/plugins/public/stripe/client/client_generated.go b/components/payments/internal/connectors/plugins/public/stripe/client/client_generated.go index e35d4d1680..f57082a602 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/client/client_generated.go +++ b/components/payments/internal/connectors/plugins/public/stripe/client/client_generated.go @@ -41,7 +41,7 @@ func (m *MockClient) EXPECT() *MockClientMockRecorder { } // GetAccountBalances mocks base method. -func (m *MockClient) GetAccountBalances(ctx context.Context, accountID *string) (*stripe.Balance, error) { +func (m *MockClient) GetAccountBalances(ctx context.Context, accountID string) (*stripe.Balance, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetAccountBalances", ctx, accountID) ret0, _ := ret[0].(*stripe.Balance) @@ -56,49 +56,52 @@ func (mr *MockClientMockRecorder) GetAccountBalances(ctx, accountID any) *gomock } // GetAccounts mocks base method. -func (m *MockClient) GetAccounts(ctx context.Context, lastID *string, pageSize int64) ([]*stripe.Account, bool, error) { +func (m *MockClient) GetAccounts(ctx context.Context, timeline Timeline, pageSize int64) ([]*stripe.Account, Timeline, bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetAccounts", ctx, lastID, pageSize) + ret := m.ctrl.Call(m, "GetAccounts", ctx, timeline, pageSize) ret0, _ := ret[0].([]*stripe.Account) - ret1, _ := ret[1].(bool) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret1, _ := ret[1].(Timeline) + ret2, _ := ret[2].(bool) + ret3, _ := ret[3].(error) + return ret0, ret1, ret2, ret3 } // GetAccounts indicates an expected call of GetAccounts. -func (mr *MockClientMockRecorder) GetAccounts(ctx, lastID, pageSize any) *gomock.Call { +func (mr *MockClientMockRecorder) GetAccounts(ctx, timeline, pageSize any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAccounts", reflect.TypeOf((*MockClient)(nil).GetAccounts), ctx, lastID, pageSize) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAccounts", reflect.TypeOf((*MockClient)(nil).GetAccounts), ctx, timeline, pageSize) } // GetExternalAccounts mocks base method. -func (m *MockClient) GetExternalAccounts(ctx context.Context, accountID, lastID *string, pageSize int64) ([]*stripe.BankAccount, bool, error) { +func (m *MockClient) GetExternalAccounts(ctx context.Context, accountID string, timeline Timeline, pageSize int64) ([]*stripe.BankAccount, Timeline, bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetExternalAccounts", ctx, accountID, lastID, pageSize) + ret := m.ctrl.Call(m, "GetExternalAccounts", ctx, accountID, timeline, pageSize) ret0, _ := ret[0].([]*stripe.BankAccount) - ret1, _ := ret[1].(bool) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret1, _ := ret[1].(Timeline) + ret2, _ := ret[2].(bool) + ret3, _ := ret[3].(error) + return ret0, ret1, ret2, ret3 } // GetExternalAccounts indicates an expected call of GetExternalAccounts. -func (mr *MockClientMockRecorder) GetExternalAccounts(ctx, accountID, lastID, pageSize any) *gomock.Call { +func (mr *MockClientMockRecorder) GetExternalAccounts(ctx, accountID, timeline, pageSize any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetExternalAccounts", reflect.TypeOf((*MockClient)(nil).GetExternalAccounts), ctx, accountID, lastID, pageSize) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetExternalAccounts", reflect.TypeOf((*MockClient)(nil).GetExternalAccounts), ctx, accountID, timeline, pageSize) } // GetPayments mocks base method. -func (m *MockClient) GetPayments(ctx context.Context, accountID, lastID *string, pageSize int64) ([]*stripe.BalanceTransaction, bool, error) { +func (m *MockClient) GetPayments(ctx context.Context, accountID string, timeline Timeline, pageSize int64) ([]*stripe.BalanceTransaction, Timeline, bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPayments", ctx, accountID, lastID, pageSize) + ret := m.ctrl.Call(m, "GetPayments", ctx, accountID, timeline, pageSize) ret0, _ := ret[0].([]*stripe.BalanceTransaction) - ret1, _ := ret[1].(bool) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret1, _ := ret[1].(Timeline) + ret2, _ := ret[2].(bool) + ret3, _ := ret[3].(error) + return ret0, ret1, ret2, ret3 } // GetPayments indicates an expected call of GetPayments. -func (mr *MockClientMockRecorder) GetPayments(ctx, accountID, lastID, pageSize any) *gomock.Call { +func (mr *MockClientMockRecorder) GetPayments(ctx, accountID, timeline, pageSize any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPayments", reflect.TypeOf((*MockClient)(nil).GetPayments), ctx, accountID, lastID, pageSize) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPayments", reflect.TypeOf((*MockClient)(nil).GetPayments), ctx, accountID, timeline, pageSize) } diff --git a/components/payments/internal/connectors/plugins/public/stripe/client/external_accounts.go b/components/payments/internal/connectors/plugins/public/stripe/client/external_accounts.go index 01af9bd524..ebd9595e06 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/client/external_accounts.go +++ b/components/payments/internal/connectors/plugins/public/stripe/client/external_accounts.go @@ -8,32 +8,52 @@ import ( func (c *client) GetExternalAccounts( ctx context.Context, - accountID *string, - lastID *string, + accountID string, + timeline Timeline, pageSize int64, -) ([]*stripe.BankAccount, bool, error) { +) (results []*stripe.BankAccount, _ Timeline, hasMore bool, err error) { // TODO // f := connectors.ClientMetrics(ctx, "stripe", "list_accounts") // now := time.Now() // defer f(ctx, now) - filters := stripe.ListParams{ - Limit: &pageSize, - } - if lastID == nil { - filters.StartingAfter = lastID - } + + results = make([]*stripe.BankAccount, 0, int(pageSize)) // return 0 results because this endpoint cannot be used for root account - if accountID == nil { - return []*stripe.BankAccount{}, false, nil + if accountID == "" { + return results, timeline, false, nil + } + + if !timeline.IsCaughtUp() { + var oldest interface{} + oldest, timeline, hasMore, err = scanForOldest(timeline, pageSize, func(params stripe.ListParams) (stripe.ListContainer, error) { + itr := c.bankAccountClient.List(&stripe.BankAccountListParams{ + Account: &accountID, + ListParams: params, + }) + return itr.BankAccountList(), itr.Err() + }) + if err != nil { + return results, timeline, false, err + } + // either there are no records or we haven't found the start yet + if !timeline.IsCaughtUp() { + return results, timeline, hasMore, nil + } + results = append(results, oldest.(*stripe.BankAccount)) } itr := c.bankAccountClient.List(&stripe.BankAccountListParams{ - Account: accountID, - ListParams: filters, + Account: &accountID, + ListParams: stripe.ListParams{ + Limit: &pageSize, + EndingBefore: &timeline.LatestID, + }, }) if err := itr.Err(); err != nil { - return nil, false, handleError(itr.Iter) + return nil, timeline, false, handleError(itr.Iter) } - return itr.BankAccountList().Data, itr.BankAccountList().ListMeta.HasMore, nil + results = append(results, itr.BankAccountList().Data...) + timeline.LatestID = results[len(results)-1].ID + return results, timeline, itr.BankAccountList().ListMeta.HasMore, nil } diff --git a/components/payments/internal/connectors/plugins/public/stripe/client/payments.go b/components/payments/internal/connectors/plugins/public/stripe/client/payments.go index e777e2af60..dc31d6043b 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/client/payments.go +++ b/components/payments/internal/connectors/plugins/public/stripe/client/payments.go @@ -6,7 +6,7 @@ import ( "github.com/stripe/stripe-go/v79" ) -var ( +const ( expandSource = "data.source" expandSourceCharge = "data.source.charge" expandSourceDispute = "data.source.dispute" @@ -17,35 +17,68 @@ var ( expandSourceRefundPaymentIntent = "data.source.refund.payment_intent" ) -func (c *client) GetPayments(ctx context.Context, accountID *string, lastID *string, pageSize int64) ([]*stripe.BalanceTransaction, bool, error) { +func (c *client) GetPayments( + ctx context.Context, + accountID string, + timeline Timeline, + pageSize int64, +) (results []*stripe.BalanceTransaction, _ Timeline, hasMore bool, err error) { // TODO // f := connectors.ClientMetrics(ctx, "stripe", "list_accounts") // now := time.Now() // defer f(ctx, now) - filters := stripe.ListParams{ - Limit: &pageSize, + + results = make([]*stripe.BalanceTransaction, 0, int(pageSize)) + + if !timeline.IsCaughtUp() { + var oldest interface{} + oldest, timeline, hasMore, err = scanForOldest(timeline, pageSize, func(params stripe.ListParams) (stripe.ListContainer, error) { + if accountID != "" { + params.StripeAccount = &accountID + } + transactionParams := &stripe.BalanceTransactionListParams{ListParams: params} + expandBalanceTransactionParams(transactionParams) + itr := c.balanceTransactionClient.List(transactionParams) + return itr.BalanceTransactionList(), itr.Err() + }) + if err != nil { + return results, timeline, false, err + } + // either there are no records or we haven't found the start yet + if !timeline.IsCaughtUp() { + return results, timeline, hasMore, nil + } + results = append(results, oldest.(*stripe.BalanceTransaction)) } - if lastID == nil { - filters.StartingAfter = lastID + + filters := stripe.ListParams{ + Limit: limit(pageSize, len(results)), + EndingBefore: &timeline.LatestID, + Single: true, // turn off autopagination } - expand := []*string{ - &expandSource, - &expandSourceCharge, - &expandSourceDispute, - &expandSourcePayout, - &expandSourceRefund, - &expandSourceTransfer, - &expandSourcePaymentIntent, - &expandSourceRefundPaymentIntent, + if accountID != "" { + filters.StripeAccount = &accountID } - itr := c.balanceTransactionClient.List(&stripe.BalanceTransactionListParams{ + params := &stripe.BalanceTransactionListParams{ ListParams: filters, - Expand: expand, - }) - if err := itr.Err(); err != nil { - return nil, false, handleError(itr.Iter) } - return itr.BalanceTransactionList().Data, itr.BalanceTransactionList().ListMeta.HasMore, nil + expandBalanceTransactionParams(params) + + itr := c.balanceTransactionClient.List(params) + results = append(results, itr.BalanceTransactionList().Data...) + timeline.LatestID = results[len(results)-1].ID + return results, timeline, itr.BalanceTransactionList().ListMeta.HasMore, itr.Err() +} + +func expandBalanceTransactionParams(params *stripe.BalanceTransactionListParams) { + params.AddExpand(expandSource) + params.AddExpand(expandSourceCharge) + params.AddExpand(expandSourceDispute) + params.AddExpand(expandSourcePayout) + params.AddExpand(expandSourceRefund) + params.AddExpand(expandSourceTransfer) + params.AddExpand(expandSourcePaymentIntent) + params.AddExpand(expandSourceRefundPaymentIntent) } diff --git a/components/payments/internal/connectors/plugins/public/stripe/client/timeline.go b/components/payments/internal/connectors/plugins/public/stripe/client/timeline.go new file mode 100644 index 0000000000..c9ee5dd9fc --- /dev/null +++ b/components/payments/internal/connectors/plugins/public/stripe/client/timeline.go @@ -0,0 +1,77 @@ +package client + +import ( + "fmt" + + "github.com/stripe/stripe-go/v79" +) + +// Timeline allows the client to navigate the backlog and decide whether to fetch +// historical or recently added data +type Timeline struct { + LatestID string `json:"latest_id"` + BacklogCursor string `json:"backlog_cursor"` +} + +func (t Timeline) IsCaughtUp() bool { + return t.LatestID != "" +} + +func scanForOldest( + timeline Timeline, + pageSize int64, + listFn func(stripe.ListParams) (stripe.ListContainer, error), +) (interface{}, Timeline, bool, error) { + filters := stripe.ListParams{ + Limit: limit(pageSize, 0), + Single: true, // turn off autopagination + } + if timeline.BacklogCursor != "" { + filters.StartingAfter = &timeline.BacklogCursor + } + + var oldest interface{} + var oldestID string + + list, err := listFn(filters) + if err != nil { + return oldest, timeline, false, err + } + hasMore := list.GetListMeta().HasMore + + switch v := list.(type) { + case *stripe.AccountList: + if len(v.Data) == 0 { + return oldest, timeline, hasMore, nil + } + account := v.Data[len(v.Data)-1] + oldest = account + oldestID = account.ID + + case *stripe.BankAccountList: + if len(v.Data) == 0 { + return oldest, timeline, hasMore, nil + } + account := v.Data[len(v.Data)-1] + oldest = account + oldestID = account.ID + + case *stripe.BalanceTransactionList: + if len(v.Data) == 0 { + return oldest, timeline, hasMore, nil + } + trx := v.Data[len(v.Data)-1] + oldest = trx + oldestID = trx.ID + default: + return nil, timeline, hasMore, fmt.Errorf("failed to fetch backlog for type %T", list) + } + + // we haven't found the oldest yet + if hasMore { + timeline.BacklogCursor = oldestID + return nil, timeline, hasMore, nil + } + timeline.LatestID = oldestID + return oldest, timeline, hasMore, nil +} diff --git a/components/payments/internal/connectors/plugins/public/stripe/external_accounts.go b/components/payments/internal/connectors/plugins/public/stripe/external_accounts.go index 4c4d8f3ddc..b4551ef625 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/external_accounts.go +++ b/components/payments/internal/connectors/plugins/public/stripe/external_accounts.go @@ -7,13 +7,18 @@ import ( "time" "github.com/formancehq/payments/internal/connectors/plugins/currency" + "github.com/formancehq/payments/internal/connectors/plugins/public/stripe/client" "github.com/formancehq/payments/internal/models" "github.com/pkg/errors" ) +type ExternalAccountsState struct { + Timeline client.Timeline `json:"timeline"` +} + func (p *Plugin) fetchNextExternalAccounts(ctx context.Context, req models.FetchNextExternalAccountsRequest) (models.FetchNextExternalAccountsResponse, error) { var ( - oldState AccountsState + oldState ExternalAccountsState from models.PSPAccount ) if req.State != nil { @@ -28,16 +33,21 @@ func (p *Plugin) fetchNextExternalAccounts(ctx context.Context, req models.Fetch return models.FetchNextExternalAccountsResponse{}, err } - newState := AccountsState{} + newState := oldState var accounts []models.PSPAccount - rawAccounts, hasMore, err := p.client.GetExternalAccounts(ctx, resolveAccount(&from.Reference), &oldState.LastID, int64(req.PageSize)) + rawAccounts, timeline, hasMore, err := p.client.GetExternalAccounts( + ctx, + resolveAccount(from.Reference), + oldState.Timeline, + int64(req.PageSize), + ) if err != nil { return models.FetchNextExternalAccountsResponse{}, err } - for _, acc := range rawAccounts { - newState.LastID = acc.ID + newState.Timeline = timeline + for _, acc := range rawAccounts { raw, err := json.Marshal(acc) if err != nil { return models.FetchNextExternalAccountsResponse{}, err diff --git a/components/payments/internal/connectors/plugins/public/stripe/external_accounts_test.go b/components/payments/internal/connectors/plugins/public/stripe/external_accounts_test.go index 23cbc71308..61bcd2844d 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/external_accounts_test.go +++ b/components/payments/internal/connectors/plugins/public/stripe/external_accounts_test.go @@ -55,8 +55,9 @@ var _ = Describe("Stripe Plugin ExternalAccounts", func() { State: json.RawMessage(`{}`), PageSize: pageSize, } - m.EXPECT().GetExternalAccounts(ctx, &accRef, gomock.Any(), int64(pageSize)).Return( + m.EXPECT().GetExternalAccounts(ctx, accRef, gomock.Any(), int64(pageSize)).Return( sampleExternalAccounts, + client.Timeline{LatestID: sampleExternalAccounts[len(sampleExternalAccounts)-1].ID}, true, nil, ) @@ -69,7 +70,7 @@ var _ = Describe("Stripe Plugin ExternalAccounts", func() { err = json.Unmarshal(res.NewState, &state) Expect(err).To(BeNil()) - Expect(state.LastID).To(Equal(res.ExternalAccounts[len(res.ExternalAccounts)-1].Reference)) + Expect(state.Timeline.LatestID).To(Equal(res.ExternalAccounts[len(res.ExternalAccounts)-1].Reference)) }) }) }) diff --git a/components/payments/internal/connectors/plugins/public/stripe/payments.go b/components/payments/internal/connectors/plugins/public/stripe/payments.go index 0aa8f01472..bd9dc60be2 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/payments.go +++ b/components/payments/internal/connectors/plugins/public/stripe/payments.go @@ -10,6 +10,7 @@ import ( "time" "github.com/formancehq/payments/internal/connectors/plugins/currency" + "github.com/formancehq/payments/internal/connectors/plugins/public/stripe/client" "github.com/formancehq/payments/internal/models" "github.com/pkg/errors" stripesdk "github.com/stripe/stripe-go/v79" @@ -22,12 +23,12 @@ var ( ErrUnsupportedCurrency = errors.New("unsupported currency") ) -type PaymentState struct { - LastID string `json:"lastID,omitempty"` +type PaymentsState struct { + Timeline client.Timeline `json:"timeline"` } func (p *Plugin) fetchNextPayments(ctx context.Context, req models.FetchNextPaymentsRequest) (models.FetchNextPaymentsResponse, error) { - var oldState PaymentState + var oldState PaymentsState if req.State != nil { if err := json.Unmarshal(req.State, &oldState); err != nil { return models.FetchNextPaymentsResponse{}, err @@ -43,14 +44,19 @@ func (p *Plugin) fetchNextPayments(ctx context.Context, req models.FetchNextPaym } var payments []models.PSPPayment - newState := PaymentState{} - rawPayments, hasMore, err := p.client.GetPayments(ctx, resolveAccount(&from.Reference), &oldState.LastID, int64(req.PageSize)) + newState := oldState + rawPayments, timeline, hasMore, err := p.client.GetPayments( + ctx, + resolveAccount(from.Reference), + oldState.Timeline, + int64(req.PageSize), + ) if err != nil { return models.FetchNextPaymentsResponse{}, err } + newState.Timeline = timeline for _, rawPayment := range rawPayments { - newState.LastID = rawPayment.ID payment, err := p.translatePayment(&from.Reference, rawPayment) if err != nil { return models.FetchNextPaymentsResponse{}, fmt.Errorf("failed to translate payment: %w", err) @@ -76,7 +82,8 @@ func (p *Plugin) fetchNextPayments(ctx context.Context, req models.FetchNextPaym func (p *Plugin) translatePayment(accountRef *string, balanceTransaction *stripesdk.BalanceTransaction) (payment *models.PSPPayment, err error) { if balanceTransaction.Source == nil { - return nil, fmt.Errorf("payment source is invalid") + log.Printf("skipping balance transaction of type %q with nil source element: %q", balanceTransaction.Type, balanceTransaction.ID) + return nil, nil } rawData, err := json.Marshal(balanceTransaction) @@ -87,6 +94,10 @@ func (p *Plugin) translatePayment(accountRef *string, balanceTransaction *stripe switch balanceTransaction.Type { case stripesdk.BalanceTransactionTypeCharge: + if balanceTransaction.Source.Charge == nil { + log.Printf("skipping balance transaction of type %q with nil charge element: %q", balanceTransaction.Type, balanceTransaction.ID) + return nil, nil + } transactionCurrency := strings.ToUpper(string(balanceTransaction.Source.Charge.Currency)) _, ok := supportedCurrenciesWithDecimal[transactionCurrency] if !ok { diff --git a/components/payments/internal/connectors/plugins/public/stripe/payments_test.go b/components/payments/internal/connectors/plugins/public/stripe/payments_test.go index cf130741da..943b870c47 100644 --- a/components/payments/internal/connectors/plugins/public/stripe/payments_test.go +++ b/components/payments/internal/connectors/plugins/public/stripe/payments_test.go @@ -187,27 +187,6 @@ var _ = Describe("Stripe Plugin Payments", func() { } }) - It("fails when payments missing source are received", func(ctx SpecContext) { - req := models.FetchNextPaymentsRequest{ - FromPayload: json.RawMessage(fmt.Sprintf(`{"reference": "%s"}`, accRef)), - State: json.RawMessage(`{}`), - PageSize: pageSize, - } - p := []*stripesdk.BalanceTransaction{ - { - ID: "someid", - Type: stripesdk.BalanceTransactionTypeAdjustment, - }, - } - m.EXPECT().GetPayments(ctx, &accRef, gomock.Any(), int64(pageSize)).Return( - p, - true, - nil, - ) - res, err := plg.FetchNextPayments(ctx, req) - Expect(err).To(MatchError(ContainSubstring(stripe.ErrInvalidPaymentSource.Error()))) - Expect(res.HasMore).To(BeFalse()) - }) It("fails when payments contain unsupported currencies", func(ctx SpecContext) { req := models.FetchNextPaymentsRequest{ @@ -226,8 +205,9 @@ var _ = Describe("Stripe Plugin Payments", func() { }, }, } - m.EXPECT().GetPayments(ctx, &accRef, gomock.Any(), int64(pageSize)).Return( + m.EXPECT().GetPayments(ctx, accRef, gomock.Any(), int64(pageSize)).Return( p, + client.Timeline{}, true, nil, ) @@ -242,8 +222,9 @@ var _ = Describe("Stripe Plugin Payments", func() { State: json.RawMessage(`{}`), PageSize: pageSize, } - m.EXPECT().GetPayments(ctx, &accRef, gomock.Any(), int64(pageSize)).Return( + m.EXPECT().GetPayments(ctx, accRef, gomock.Any(), int64(pageSize)).Return( samplePayments, + client.Timeline{LatestID: samplePayments[len(samplePayments)-1].ID}, true, nil, ) @@ -302,11 +283,11 @@ var _ = Describe("Stripe Plugin Payments", func() { Expect(res.Payments[11].Type).To(Equal(models.PAYMENT_TYPE_PAYIN)) Expect(res.Payments[11].Status).To(Equal(models.PAYMENT_STATUS_DISPUTE)) - var state stripe.PaymentState + var state stripe.PaymentsState err = json.Unmarshal(res.NewState, &state) Expect(err).To(BeNil()) - Expect(state.LastID).To(Equal(samplePayments[len(samplePayments)-1].ID)) + Expect(state.Timeline.LatestID).To(Equal(samplePayments[len(samplePayments)-1].ID)) }) }) })