Skip to content

Commit

Permalink
fix: cheque cashout bug (#356)
Browse files Browse the repository at this point in the history
* feat: accesss key

* feat: access-key module & access-key cmds

* fix: remove redunt error condition

* opt: handle wrapper error and remove unused bucket type define in access-key module

* fix: return an explicit error instead of panic and optimize the error log (#339) (#341)

* feat: add daemon check before execute accesskey commands

* optmize: access-key store prefix

* optmize: not found error

* feat: add s3 signature

* chore:

* chore:

* chore:

* chore:

* chore:

* feat: add store

* chore:

* feat: s3 access-key, server, handlers, statestore, filestore

* chore:

* optmize: code structure

* style: s3 code structure

* style: code structure

* optmize: code structure

* feat: add multiple context lock

* feat: check auth

* chore:

* chore:

* feat: add bucket service

* chore:

* mod: update bucket lock

* chore:

* chore:

* chore:

* del s3d

* chore: s3 req & rsp structure

* chore:

* feat: add multibase commands (#342)

* feat: add pubBucket api

* feat: add more bucket api

* chore:

* chore:

* feat: add request and response

* chore:

* feat: server build

* chore: check acl

* chore

* chore: adjust bucket url

* �

* feat: add auth middleware

* feat: adjust code structure

* optmize: code structure & auth bug

* feat: put object

* chore:

* mod: mod bucket parse req

* optmize: adjust place of response error

* chore: mig sig 01

* chore: clear sig

* chore: mig sig 02

* chore: mig sig 03

* chore: mig sig 04

* optmize: refractor codes

* optmize: rename auth to sign

* optmize: code structure & h.name

* fix: h.name

* feat: put-object

* feat: multipart

* fix: nslock key

* chore: rename s3 constructor file name

* fix: tidy example go-ipfs-as-a-library go mod

* chore: change default s3 server address to local

* mod: add object api

* feat: s3-compatible-api - 1. add start option and configure; 2. optmize providers interfaces and implements; 3. rewrite the server construct function

* merge: object

* chore: add object lock

* chore: of delete objs

* chore:

* fix: list objects bug

* chore: rename ListObjetV1Handler to ListObjectHandler, rename BTFS-Hash to CID

* refractor: bucket service

* refactor: object service

* refractor: refract object service

* refractor: handlers

* refractor: bucket handler

* refractor: bucket handler

* refractor: response

* refractor: response func

* refractor: response

* refractor: object

* feat: add backup and recovery command (#348)

* feat: add backup and recovery command

* feat: beautify the log

* feat: init add recovery option

* fix: format error

* refractor: object

* refractor: objects

* refractor: btf api add timeout  & add cid refs to enable referred cid can not be deleted

* ref: fix delete object remove body

* ref: format code

* fix: routers

* fix: add cors header

* fix: router option

* feat: add delete objects handler

* refractor: multipart

* ref: multipart

* fix: multipart etag calculation

* chore: add min part size todo

* chore: upgrade 'github.com/anacrolix/torrent' from v1.47.0 to v1.52.5

* opt: comment and amz header

* opt: code

* opt: preflight cache max age

* feat: bucket response add acl header

* opt: change cid-list header to cid

* fix: required check exlude unknow location

* fix: get object acl

* ref: requests

* ref: complete refractor

* fix: args parse

* fix: get object unlock

* fix: object acl writer

* fix: delete objects error

* fix: Sign handler name

* fix: object name escape

* fix: copy source validate

* chore: fix

* opt: s3 log

* opt: s3 api log

* fix: allow Cache-Control header in PutObject and CopyObject Action

* feat: log details (#349)

* feat: add backup and recovery command

* feat: beautify the log

* feat: init add recovery option

* fix: format error

* feat: log details

* opt: add access-key command taglines

* opt: add accesskey command description

* chore: add out of FixChequeCashOutCmd

* chore:

* chore

* chore:

* chore:

* chore:

* chore

* chore: add accesskey commands test path

* chore:

* chore: add accesskey test path

* chore:

* chore:

* chore:

* chore:

---------

Co-authored-by: Steve <[email protected]>
Co-authored-by: Shawn-Huang-Tron <[email protected]>
Co-authored-by: fish <[email protected]>
  • Loading branch information
4 people authored Sep 22, 2023
1 parent 9921801 commit 45abbb3
Show file tree
Hide file tree
Showing 11 changed files with 387 additions and 16 deletions.
1 change: 1 addition & 0 deletions cmd/btfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ If the user need to start multiple nodes on the same machine, the configuration
spin.Analytics(api, cctx.ConfigRoot, node, version.CurrentVersionNumber, hValue)
spin.Hosts(node, env)
spin.Contracts(node, req, env, nodepb.ContractStat_HOST.String())
spin.RestartFixChequeCashOut()
}

// Give the user some immediate feedback when they hit C-c
Expand Down
25 changes: 20 additions & 5 deletions core/commands/cheque/cheque.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ type ListChequeRet struct {
Len int
}

type fixCheque struct {
PeerID string
Token string
Beneficiary string
Vault string
TotalCashedAmount *big.Int
FixCashedAmount *big.Int
}

type ListFixChequeRet struct {
FixCheques []fixCheque
Len int
}

type ReceiveCheque struct {
PeerID string
Token common.Address
Expand Down Expand Up @@ -65,11 +79,12 @@ var ChequeCmd = &cmds.Command{
Vault services include issue cheque to peer, receive cheque and store operations.`,
},
Subcommands: map[string]*cmds.Command{
"cash": CashChequeCmd,
"cashstatus": ChequeCashStatusCmd,
"cashlist": ChequeCashListCmd,
"price": StorePriceCmd,
"price-all": StorePriceAllCmd,
"cash": CashChequeCmd,
"cashstatus": ChequeCashStatusCmd,
"cashlist": ChequeCashListCmd,
"price": StorePriceCmd,
"price-all": StorePriceAllCmd,
"fix_cheque_cashout": FixChequeCashOutCmd,

"send": SendChequeCmd,
"sendlist": ListSendChequesCmd,
Expand Down
75 changes: 75 additions & 0 deletions core/commands/cheque/fix_cheque_cashout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package cheque

import (
"fmt"
cmds "github.com/bittorrent/go-btfs-cmds"
"github.com/bittorrent/go-btfs/chain"
"github.com/bittorrent/go-btfs/chain/tokencfg"
"github.com/bittorrent/go-btfs/utils"
"github.com/google/martian/log"
"golang.org/x/net/context"
"io"
)

var FixChequeCashOutCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List cheque(s) received from peers.",
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
err := utils.CheckSimpleMode(env)
if err != nil {
return err
}

listRet := ListFixChequeRet{}
listRet.FixCheques = make([]fixCheque, 0)

for _, tokenAddr := range tokencfg.MpTokenAddr {
cheques, err := chain.SettleObject.SwapService.LastReceivedCheques(tokenAddr)
if err != nil {
return err
}

for k, v := range cheques {
totalCashOutAmount, newCashOutAmount, err := chain.SettleObject.CashoutService.AdjustCashCheque(
context.Background(), v.Vault, v.Beneficiary, tokenAddr, false)
if err != nil {
return err
}
if newCashOutAmount != nil && newCashOutAmount.Uint64() > 0 {
var record fixCheque
record.PeerID = k
record.Token = v.Token.String()
record.Beneficiary = v.Beneficiary.String()
record.Vault = v.Vault.String()
record.TotalCashedAmount = totalCashOutAmount
record.FixCashedAmount = newCashOutAmount

listRet.FixCheques = append(listRet.FixCheques, record)
}
}
}
listRet.Len = len(listRet.FixCheques)

log.Infof("FixChequeCashOutCmd, listRet = %+v", listRet)

return cmds.EmitOnce(res, &listRet)
},
Type: ListFixChequeRet{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *ListFixChequeRet) error {
fmt.Fprintf(w, "fix: \n\t%-55s\t%-46s\t%-46s\t%-46s\tfix_cash_amount: \n", "peerID:", "vault:", "beneficiary:", "total_cash_amount:")
for iter := 0; iter < out.Len; iter++ {
fmt.Fprintf(w, "\t%-55s\t%-46s\t%-46s\t%d\t%d \n",
out.FixCheques[iter].PeerID,
out.FixCheques[iter].Vault,
out.FixCheques[iter].Beneficiary,
out.FixCheques[iter].TotalCashedAmount.Uint64(),
out.FixCheques[iter].FixCashedAmount.Uint64(),
)
}

return nil
}),
},
}
15 changes: 8 additions & 7 deletions core/commands/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,13 @@ func TestCommands(t *testing.T) {
"/bittorrent/scrape",
"/bittorrent/metainfo",
"/bittorrent/bencode",
"/multibase",
"/multibase/encode",
"/multibase/decode",
"/multibase/transcode",
"/multibase/list",
"/backup",
"/recovery",
"/accesskey",
"/accesskey/generate",
"/accesskey/enable",
Expand All @@ -350,13 +357,7 @@ func TestCommands(t *testing.T) {
"/accesskey/delete",
"/accesskey/get",
"/accesskey/list",
"/multibase",
"/multibase/encode",
"/multibase/decode",
"/multibase/transcode",
"/multibase/list",
"/backup",
"/recovery",
"/cheque/fix_cheque_cashout",
}

cmdSet := make(map[string]struct{})
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/go-bindata/go-bindata/v3 v3.1.3
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/google/martian v2.1.0+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.7.3
github.com/hashicorp/go-multierror v1.1.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
Expand Down
16 changes: 12 additions & 4 deletions settlement/swap/swap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ func (m *addressbookMock) PutVault(peer string, vault common.Address) error {
}

type cashoutMock struct {
cashCheque func(ctx context.Context, vault, recipient common.Address, token common.Address) (common.Hash, error)
cashoutStatus func(ctx context.Context, vaultAddress common.Address, token common.Address) (*vault.CashoutStatus, error)
cashoutResults func() ([]vault.CashOutResult, error)
hasCashoutAction func(ctx context.Context, peer common.Address, token common.Address) (bool, error)
cashCheque func(ctx context.Context, vault, recipient common.Address, token common.Address) (common.Hash, error)
cashoutStatus func(ctx context.Context, vaultAddress common.Address, token common.Address) (*vault.CashoutStatus, error)
cashoutResults func() ([]vault.CashOutResult, error)
hasCashoutAction func(ctx context.Context, peer common.Address, token common.Address) (bool, error)
adjustCashCheque func(ctx context.Context, vaultAddress, recipient common.Address, token common.Address, restartPassFlag bool) (totalCashOutAmount, newCashOutAmount *big.Int, err error)
restartFixChequeCashOut func()
}

func (m *cashoutMock) CashCheque(ctx context.Context, vault, recipient common.Address, token common.Address) (common.Hash, error) {
Expand All @@ -142,6 +144,12 @@ func (m *cashoutMock) CashoutResults() ([]vault.CashOutResult, error) {
func (m *cashoutMock) HasCashoutAction(ctx context.Context, peer common.Address, token common.Address) (bool, error) {
return m.hasCashoutAction(ctx, peer, token)
}
func (m *cashoutMock) AdjustCashCheque(ctx context.Context, vaultAddress, recipient common.Address, token common.Address, restartPassFlag bool) (totalCashOutAmount, newCashOutAmount *big.Int, err error) {
return m.adjustCashCheque(ctx, vaultAddress, recipient, token, restartPassFlag)
}
func (m *cashoutMock) RestartFixChequeCashOut() {
m.restartFixChequeCashOut()
}
func TestReceiveCheque(t *testing.T) {
store := mockstore.NewStateStore()
vaultService := mockvault.NewVault(
Expand Down
157 changes: 157 additions & 0 deletions settlement/swap/vault/cashout.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ type CashoutService interface {
CashCheque(ctx context.Context, vault, recipient common.Address, token common.Address) (common.Hash, error)
// CashoutStatus gets the status of the latest cashout transaction for the vault
CashoutStatus(ctx context.Context, vaultAddress common.Address, token common.Address) (*CashoutStatus, error)
AdjustCashCheque(ctx context.Context, vaultAddress, recipient common.Address, token common.Address, restartPassFlag bool) (totalCashOutAmount, newCashOutAmount *big.Int, err error)
HasCashoutAction(ctx context.Context, peer common.Address, token common.Address) (bool, error)
CashoutResults() ([]CashOutResult, error)
RestartFixChequeCashOut()
}

type cashoutService struct {
Expand Down Expand Up @@ -178,6 +180,10 @@ func (s *cashoutService) CashoutResults() ([]CashOutResult, error) {

// CashCheque sends a cashout transaction for the last cheque of the vault
func (s *cashoutService) CashCheque(ctx context.Context, vault, recipient common.Address, token common.Address) (common.Hash, error) {
if RestartFixCashOutStatusLock {
return common.Hash{}, errors.New("Just started, it can not cash cheque, you will wait for about 40s to do it. ")
}

cheque, err := s.chequeStore.LastReceivedCheque(vault, token)
if err != nil {
return common.Hash{}, err
Expand Down Expand Up @@ -215,6 +221,19 @@ func (s *cashoutService) CashCheque(ctx context.Context, vault, recipient common
return common.Hash{}, err
}

// 1.add cash out status
cashOutStateInfo := CashOutStatusStoreInfo{
Token: token,
Vault: cheque.Vault,
Beneficiary: cheque.Beneficiary,
CumulativePayout: cheque.CumulativePayout,
TxHash: txHash.String(),
}
err = s.AddCashOutStatusStore(cashOutStateInfo)
if err != nil {
return common.Hash{}, err
}

// WaitForReceipt takes long time
go func() {
defer func() {
Expand All @@ -223,6 +242,13 @@ func (s *cashoutService) CashCheque(ctx context.Context, vault, recipient common
}
}()
s.storeCashResult(context.Background(), vault, txHash, cheque, token)

// 2.delete cash out status
err = s.DeleteCashOutStatusStore(cashOutStateInfo)
if err != nil {
log.Errorf("delete cashout status, err = %v", err)
return
}
}()
return txHash, nil
}
Expand Down Expand Up @@ -304,6 +330,137 @@ func (s *cashoutService) storeCashResult(ctx context.Context, vault common.Addre
return nil
}

// AdjustCashCheque .
func (s *cashoutService) AdjustCashCheque(ctx context.Context, vaultAddress, recipient common.Address, token common.Address, restartPassFlag bool) (totalCashOutAmount, newCashOutAmount *big.Int, err error) {
if RestartFixCashOutStatusLock {
if !restartPassFlag {
return nil, nil, errors.New("Just started, it can not fix cash out status, you will wait for about 40s to do it. ")
}
}

// 1.totalReceivedCashed
totalReceivedCashed := big.NewInt(0)
err = s.store.Get(tokencfg.AddToken(statestore.TotalReceivedCashedKey, token), &totalReceivedCashed)
if err != nil && err != storage.ErrNotFound {
return nil, nil, err
}

// 2.alreadyPaidOut in renter contract
// blockchain calls below
contract := newVaultContractMuti(vaultAddress, s.transactionService)
alreadyPaidOutOnline, err := contract.PaidOut(ctx, recipient, token)
if err != nil {
return nil, nil, err
}

// 3.compare it to fix.
diff := big.NewInt(0).Sub(alreadyPaidOutOnline, totalReceivedCashed)
log.Infof("AdjustCashCheque: diff > 0, vault=%s, recipient=%s, online=%s, local=%s, diff=%s",
vaultAddress.String(), recipient.String(),
alreadyPaidOutOnline.String(), totalReceivedCashed.String(), diff.String(),
)

if diff.Cmp(big.NewInt(0)) > 0 {
cashResult, err := s.fixStoreCashResult(vaultAddress, diff, token)
if err != nil {
return nil, nil, err
}
newCashOutAmount = cashResult.Amount
}

return alreadyPaidOutOnline, newCashOutAmount, nil
}

func (s *cashoutService) RestartFixChequeCashOut() {
if RestartFixCashOutStatusLock {
list, err := s.GetAllCashOutStatusStore()
if err != nil {
log.Infof("RestartFixChequeCashOut: GetAllCashOutStatusStore err = %v", err)
return
}

if len(list) > 0 {
log.Infof("wait 30s, for fixing cash out status")

// wait 30s, for online cashing out ok.
time.Sleep(time.Second * RestartWaitCashOutOnlineTime)

for _, v := range list {
_, _, err := s.AdjustCashCheque(context.Background(), v.Vault, v.Beneficiary, v.Token, true)
if err != nil {
log.Infof("RestartFixChequeCashOut: AdjustCashCheque err = %v, info = %+v", err, v)
continue
}

err = s.DeleteCashOutStatusStore(v)
if err != nil {
log.Infof("RestartFixChequeCashOut: DeleteCashOutStatusStore err = %v, info = %+v", err, v)
continue
}
}
}
RestartFixCashOutStatusLock = false
}
return
}

func (s *cashoutService) fixStoreCashResult(vault common.Address, shouldPaidOut *big.Int, token common.Address) (cashResult *CashOutResult, err error) {
txHash := common.Hash{} //fix txHash: 0x0000...
cashResult = &CashOutResult{
TxHash: txHash,
Vault: vault,
Token: token,
Amount: shouldPaidOut,
CashTime: time.Now().Unix(),
Status: "success",
}

totalReceivedCashed := big.NewInt(0)
if err = s.store.Get(tokencfg.AddToken(statestore.TotalReceivedCashedKey, token), &totalReceivedCashed); err == nil || err == storage.ErrNotFound {
totalReceivedCashed = totalReceivedCashed.Add(totalReceivedCashed, shouldPaidOut)
err := s.store.Put(tokencfg.AddToken(statestore.TotalReceivedCashedKey, token), totalReceivedCashed)
if err != nil {
log.Infof("fixStoreCashResult:put totalReceivedCashdKey err:%+v", err)
}
}

totalDailyReceivedCashed := big.NewInt(0)
if err = s.store.Get(statestore.GetTodayTotalDailyReceivedCashedKey(token), &totalDailyReceivedCashed); err == nil || err == storage.ErrNotFound {
totalDailyReceivedCashed = totalDailyReceivedCashed.Add(totalDailyReceivedCashed, shouldPaidOut)
err := s.store.Put(statestore.GetTodayTotalDailyReceivedCashedKey(token), totalDailyReceivedCashed)
if err != nil {
log.Infof("fixStoreCashResult:put totalReceivedDailyCashdKey err:%+v", err)
}
}

// update TotalReceivedCountCashed
uncashed := 0
err = s.store.Get(statestore.PeerReceivedUncashRecordsCountKey(vault, token), &uncashed)
if err != nil {
log.Infof("fixStoreCashResult:put totalReceivedCountCashed err:%+v", err)
} else {
cashedCount := 0
err := s.store.Get(tokencfg.AddToken(statestore.TotalReceivedCashedCountKey, token), &cashedCount)
if err == nil || err == storage.ErrNotFound {
err := s.store.Put(tokencfg.AddToken(statestore.TotalReceivedCashedCountKey, token), cashedCount+uncashed)
if err != nil {
log.Infof("fixStoreCashResult:put totalReceivedCashedConuntKey err:%+v", err)
} else {
err := s.store.Put(statestore.PeerReceivedUncashRecordsCountKey(vault, token), 0)
if err != nil {
log.Infof("fixStoreCashResult:put totalReceivedCashedConuntKey err:%+v", err)
}
}
}
}

err = s.store.Put(statestore.CashoutResultKey(vault), &cashResult)
if err != nil {
log.Infof("fixStoreCashResult:put cashoutResultKey err:%+v", err)
}
return
}

// CashoutStatus gets the status of the latest cashout transaction for the vault
func (s *cashoutService) CashoutStatus(ctx context.Context, vaultAddress common.Address, token common.Address) (*CashoutStatus, error) {
cheque, err := s.chequeStore.LastReceivedCheque(vaultAddress, token)
Expand Down
Loading

0 comments on commit 45abbb3

Please sign in to comment.