From 2653189e713ee4b1cc62965144e2d3e2db954c2d Mon Sep 17 00:00:00 2001 From: Philipp-Florens Lehwalder Date: Fri, 19 Jan 2024 13:29:22 +0100 Subject: [PATCH] feat(funder): Introduce egoistic funding One of the participants can be egoistic and only deposits only if the other(s) have deposited their amount Signed-off-by: Philipp-Florens Lehwalder --- channel/funder.go | 109 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 105 insertions(+), 4 deletions(-) diff --git a/channel/funder.go b/channel/funder.go index 73d0d4f..2c85b37 100644 --- a/channel/funder.go +++ b/channel/funder.go @@ -57,6 +57,8 @@ type assetHolder struct { type Funder struct { mtx sync.RWMutex + EgoisticPart []bool + ContractBackend // accounts associates an Account to every AssetIndex. accounts map[AssetMapKey]accounts.Account @@ -80,6 +82,14 @@ func NewFunder(backend ContractBackend) *Funder { } } +func (f *Funder) SetEgoisticPart(idx channel.Index, numParts int) { + f.mtx.Lock() + defer f.mtx.Unlock() + + f.EgoisticPart = make([]bool, numParts) + f.EgoisticPart[idx] = true +} + // RegisterAsset registers the depositor and account for the specified asset in // the funder. // @@ -210,6 +220,12 @@ func (f *Funder) fundAssets(ctx context.Context, assets []channel.Asset, channel errg := perror.NewGatherer() fundingIDs := FundingIDs(channelID, req.Params.Parts...) + // TODO: We need a check whether the length matches and whether only one idx is set to true. + egoistic := false + if len(f.EgoisticPart) != 0 { + egoistic = f.EgoisticPart[req.Idx] + } + for i, asset := range assets { // Bind contract. assetIdx, ok := assetIdx(req.State.Assets, asset) @@ -218,10 +234,20 @@ func (f *Funder) fundAssets(ctx context.Context, assets []channel.Asset, channel continue } contract := bindAssetHolder(f.ContractBackend, asset, assetIdx) - // Wait for the funding event. - errg.Go(func() error { - return f.waitForFundingConfirmation(ctx, req, contract, fundingIDs) - }) + + if egoistic { + err := f.EgoisticWaitForFundingConfirmation(ctx, req, contract, fundingIDs) + if err != nil { + f.log.WithField("asset", asset).WithError(err).Error("Could not fund asset") + errg.Add(errors.WithMessage(err, "funding asset")) + continue + } + } else { + // Wait for the funding event in a goroutine. + errg.Go(func() error { + return f.waitForFundingConfirmation(ctx, req, contract, fundingIDs) + }) + } // Send the funding TX. tx, err := f.sendFundingTx(ctx, asset, req, contract, fundingIDs[req.Idx]) @@ -401,6 +427,81 @@ loop: return nil } +// EgoisticWaitForFundingConfirmation waits for the confirmation events on the blockchain that +// all peers except oneself (request.Idx) successfully funded the channel for the specified asset +// according to the funding agreement. +func (f *Funder) EgoisticWaitForFundingConfirmation(ctx context.Context, request channel.FundingReq, asset assetHolder, fundingIDs [][32]byte) error { + // If asset on different ledger, return. + a := request.State.Assets[asset.assetIndex] + ethAsset, ok := a.(*Asset) + if !ok { + return fmt.Errorf("wrong type: expected *Asset, got %T", a) + } + if ethAsset.ChainID.MapKey() != f.chainID.MapKey() { + return nil + } + + // Subscribe to events. + deposited, sub, subErr, err := f.subscribeDeposited(ctx, asset.contract, fundingIDs...) + if err != nil { + return errors.WithMessage(err, "subscribing to deposited event") + } + defer sub.Close() + + remainingAll := request.Agreement.Clone()[asset.assetIndex] + // Create a new remainingOthers slice excluding the balance of the current participant. + remainingOthers := make([]*big.Int, 0, len(remainingAll)-1) + for i, bal := range remainingAll { + if channel.Index(i) != request.Idx { + remainingOthers = append(remainingOthers, bal) + } + } + + // Calculate the total of the remainingOthers balances. + remainingTotal := channel.Balances([][]*big.Int{remainingOthers}).Sum()[0] + if remainingTotal.Cmp(big.NewInt(0)) <= 0 { + return nil + } +loop: + for { + select { + case rawEvent := <-deposited: + event, ok := rawEvent.Data.(*assetholder.AssetholderDeposited) + if !ok { + log.Panic("wrong event type") + } + log := f.log.WithField("fundingID", event.FundingID) + + idx := partIdx(event.FundingID, fundingIDs) + // Ignore if the current participant should have deposited. + if channel.Index(idx) != request.Idx { + continue + } + + remainingForPart := remainingOthers[idx] + remainingForPart.Sub(remainingForPart, event.Amount) + log.Debugf("peer[%d]: got: %v, remainingOthers for [%d, %d] = %v", request.Idx, event.Amount, asset.assetIndex, idx, remainingForPart) + + // Exit loop if fully funded. + remainingTotal := channel.Balances([][]*big.Int{remainingOthers}).Sum()[0] + if remainingTotal.Cmp(big.NewInt(0)) <= 0 { + break loop + } + case <-ctx.Done(): + return fundingTimeoutError(remainingOthers, asset) + case err := <-subErr: + // Resolve race between ctx and subErr, as ctx fires both events. + select { + case <-ctx.Done(): + return fundingTimeoutError(remainingOthers, asset) + default: + } + return err + } + } + return nil +} + func fundingTimeoutError(remaining []channel.Bal, asset assetHolder) error { var indices []channel.Index for k, bals := range remaining {