Skip to content

Commit

Permalink
feat(payments): import stripe data in asc order
Browse files Browse the repository at this point in the history
  • Loading branch information
laouji committed Sep 30, 2024
1 parent a31ee88 commit c122200
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/formancehq/payments/internal/connectors/plugins/currency"
"github.com/formancehq/payments/internal/connectors/plugins/public/stripe/client"
"github.com/formancehq/payments/internal/models"
)

Expand All @@ -14,16 +15,16 @@ var (
)

// root account reference is internal so we don't pass it to Stripe API clients
func resolveAccount(ref *string) *string {
if *ref == rootAccountReference {
return nil
func resolveAccount(ref string) string {
if ref == rootAccountReference {
return ""
}
return ref
}

type AccountsState struct {
InitFinished bool `json:"init_finished"`
LastID string `json:"lastID,omitempty"`
RootCreated bool `json:"root_created"`
Timeline client.Timeline `json:"timeline"`
}

func (p *Plugin) fetchNextAccounts(ctx context.Context, req models.FetchNextAccountsRequest) (models.FetchNextAccountsResponse, error) {
Expand All @@ -35,7 +36,7 @@ func (p *Plugin) fetchNextAccounts(ctx context.Context, req models.FetchNextAcco
}

accounts := make([]models.PSPAccount, 0, req.PageSize)
if !oldState.InitFinished {
if !oldState.RootCreated {
// create a root account if this is the first time this is being run
accounts = append(accounts, models.PSPAccount{
Name: &rootAccountReference,
Expand All @@ -44,21 +45,19 @@ func (p *Plugin) fetchNextAccounts(ctx context.Context, req models.FetchNextAcco
Raw: json.RawMessage("{}"),
Metadata: map[string]string{},
})
oldState.InitFinished = true
oldState.RootCreated = true
}

needed := req.PageSize - len(accounts)

newState := AccountsState{
InitFinished: oldState.InitFinished,
}
rawAccounts, hasMore, err := p.client.GetAccounts(ctx, &oldState.LastID, int64(needed))
newState := oldState
rawAccounts, timeline, hasMore, err := p.client.GetAccounts(ctx, oldState.Timeline, int64(needed))
if err != nil {
return models.FetchNextAccountsResponse{}, err
}
for _, acc := range rawAccounts {
newState.LastID = acc.ID
newState.Timeline = timeline

for _, acc := range rawAccounts {
raw, err := json.Marshal(acc)
if err != nil {
return models.FetchNextAccountsResponse{}, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var _ = Describe("Stripe Plugin Accounts", func() {
// pageSize passed to client is less when we generate a root account
m.EXPECT().GetAccounts(ctx, gomock.Any(), int64(pageSize-1)).Return(
sampleAccounts,
client.Timeline{LatestID: sampleAccounts[len(sampleAccounts)-1].ID},
true,
nil,
)
Expand All @@ -65,7 +66,7 @@ var _ = Describe("Stripe Plugin Accounts", func() {

err = json.Unmarshal(res.NewState, &state)
Expect(err).To(BeNil())
Expect(state.LastID).To(Equal(res.Accounts[len(res.Accounts)-1].Reference))
Expect(state.Timeline.LatestID).To(Equal(res.Accounts[len(res.Accounts)-1].Reference))
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (p *Plugin) fetchNextBalances(ctx context.Context, req models.FetchNextBala
return models.FetchNextBalancesResponse{}, err
}

balance, err := p.client.GetAccountBalances(ctx, resolveAccount(&from.Reference))
balance, err := p.client.GetAccountBalances(ctx, resolveAccount(from.Reference))
if err != nil {
return models.FetchNextBalancesResponse{}, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var _ = Describe("Stripe Plugin Balances", func() {
FromPayload: json.RawMessage(fmt.Sprintf(`{"reference": "%s"}`, accRef)),
State: json.RawMessage(`{}`),
}
m.EXPECT().GetAccountBalances(ctx, &accRef).Return(
m.EXPECT().GetAccountBalances(ctx, accRef).Return(
sampleBalance,
nil,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,42 @@ import (
"github.com/stripe/stripe-go/v79"
)

func (c *client) GetAccounts(ctx context.Context, lastID *string, pageSize int64) ([]*stripe.Account, bool, error) {
func (c *client) GetAccounts(
ctx context.Context,
timeline Timeline,
pageSize int64,
) (results []*stripe.Account, _ Timeline, hasMore bool, err error) {
// TODO
// f := connectors.ClientMetrics(ctx, "stripe", "list_accounts")
// now := time.Now()
// defer f(ctx, now)
filters := stripe.ListParams{
Limit: &pageSize,

results = make([]*stripe.Account, 0, int(pageSize))

if !timeline.IsCaughtUp() {
var oldest interface{}
oldest, timeline, hasMore, err = scanForOldest(timeline, pageSize, func(params stripe.ListParams) (stripe.ListContainer, error) {
itr := c.accountClient.List(&stripe.AccountListParams{ListParams: params})
return itr.AccountList(), itr.Err()
})
if err != nil {
return results, timeline, false, err
}
// either there are no records or we haven't found the start yet
if !timeline.IsCaughtUp() {
return results, timeline, hasMore, nil
}
results = append(results, oldest.(*stripe.Account))
}
if lastID == nil {
filters.StartingAfter = lastID

filters := stripe.ListParams{
Limit: limit(pageSize, len(results)),
EndingBefore: &timeline.LatestID,
Single: true, // turn off autopagination
}

itr := c.accountClient.List(&stripe.AccountListParams{ListParams: filters})
if err := itr.Err(); err != nil {
return nil, false, handleError(itr.Iter)
}
return itr.AccountList().Data, itr.AccountList().ListMeta.HasMore, nil
results = append(results, itr.AccountList().Data...)
timeline.LatestID = results[len(results)-1].ID
return results, timeline, itr.AccountList().ListMeta.HasMore, itr.Err()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"github.com/stripe/stripe-go/v79"
)

func (c *client) GetAccountBalances(ctx context.Context, accountID *string) (*stripe.Balance, error) {
func (c *client) GetAccountBalances(ctx context.Context, accountID string) (*stripe.Balance, error) {
// TODO
// f := connectors.ClientMetrics(ctx, "stripe", "get_balances")
// now := time.Now()
// defer f(ctx, now)
filters := stripe.Params{
StripeAccount: accountID,
var filters stripe.Params
if accountID != "" {
filters.StripeAccount = &accountID
}

balance, err := c.balanceClient.Get(&stripe.BalanceParams{Params: filters})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

//go:generate mockgen -source client.go -destination client_generated.go -package client . Client
type Client interface {
GetAccounts(ctx context.Context, lastID *string, pageSize int64) ([]*stripe.Account, bool, error)
GetAccountBalances(ctx context.Context, accountID *string) (*stripe.Balance, error)
GetExternalAccounts(ctx context.Context, accountID *string, lastID *string, pageSize int64) ([]*stripe.BankAccount, bool, error)
GetPayments(ctx context.Context, accountID *string, lastID *string, pageSize int64) ([]*stripe.BalanceTransaction, bool, error)
GetAccounts(ctx context.Context, timeline Timeline, pageSize int64) ([]*stripe.Account, Timeline, bool, error)
GetAccountBalances(ctx context.Context, accountID string) (*stripe.Balance, error)
GetExternalAccounts(ctx context.Context, accountID string, timeline Timeline, pageSize int64) ([]*stripe.BankAccount, Timeline, bool, error)
GetPayments(ctx context.Context, accountID string, timeline Timeline, pageSize int64) ([]*stripe.BalanceTransaction, Timeline, bool, error)
}

type client struct {
Expand All @@ -37,3 +37,8 @@ func New(backend stripe.Backend, apiKey string) Client {
balanceTransactionClient: balancetransaction.Client{B: backend, Key: apiKey},
}
}

func limit(wanted int64, have int) *int64 {
needed := wanted - int64(have)
return &needed
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,52 @@ import (

func (c *client) GetExternalAccounts(
ctx context.Context,
accountID *string,
lastID *string,
accountID string,
timeline Timeline,
pageSize int64,
) ([]*stripe.BankAccount, bool, error) {
) (results []*stripe.BankAccount, _ Timeline, hasMore bool, err error) {
// TODO
// f := connectors.ClientMetrics(ctx, "stripe", "list_accounts")
// now := time.Now()
// defer f(ctx, now)
filters := stripe.ListParams{
Limit: &pageSize,
}
if lastID == nil {
filters.StartingAfter = lastID
}

results = make([]*stripe.BankAccount, 0, int(pageSize))

// return 0 results because this endpoint cannot be used for root account
if accountID == nil {
return []*stripe.BankAccount{}, false, nil
if accountID == "" {
return results, timeline, false, nil
}

if !timeline.IsCaughtUp() {
var oldest interface{}
oldest, timeline, hasMore, err = scanForOldest(timeline, pageSize, func(params stripe.ListParams) (stripe.ListContainer, error) {
itr := c.bankAccountClient.List(&stripe.BankAccountListParams{
Account: &accountID,
ListParams: params,
})
return itr.BankAccountList(), itr.Err()
})
if err != nil {
return results, timeline, false, err
}
// either there are no records or we haven't found the start yet
if !timeline.IsCaughtUp() {
return results, timeline, hasMore, nil
}
results = append(results, oldest.(*stripe.BankAccount))
}

itr := c.bankAccountClient.List(&stripe.BankAccountListParams{
Account: accountID,
ListParams: filters,
Account: &accountID,
ListParams: stripe.ListParams{
Limit: &pageSize,
EndingBefore: &timeline.LatestID,
},
})
if err := itr.Err(); err != nil {
return nil, false, handleError(itr.Iter)
return nil, timeline, false, handleError(itr.Iter)
}
return itr.BankAccountList().Data, itr.BankAccountList().ListMeta.HasMore, nil
results = append(results, itr.BankAccountList().Data...)
timeline.LatestID = results[len(results)-1].ID
return results, timeline, itr.BankAccountList().ListMeta.HasMore, nil
}
Loading

0 comments on commit c122200

Please sign in to comment.