Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Really really disable mempool #347

Merged
merged 5 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,14 @@ linters-settings:
- "github.com/coder/websocket" # Currently uses our fork with a bug fix.

# Enforces copyright header
goheader:
goheader: # TODO: Replace goheader, autofix is too buggy.
values:
const:
COMPANY: "Hemi Labs, Inc."
regexp:
YEAR_RANGE: "(\\d{4}-{{MOD-YEAR}})|({{MOD-YEAR}})"
template: |-
Copyright (c) {{ YEAR }} {{ COMPANY }}
Copyright (c) {{ YEAR_RANGE }} {{ COMPANY }}
Use of this source code is governed by the MIT License,
which can be found in the LICENSE file.

Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ build:
install: $(cmds)

lint:
$(shell go env GOPATH)/bin/golangci-lint run --fix ./...
# TODO: re-enable autofix with --fix, after removing buggy goheader linter
$(shell go env GOPATH)/bin/golangci-lint run ./...

lint-deps:
GOBIN=$(shell go env GOPATH)/bin go install github.com/golangci/golangci-lint/cmd/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ You may then connect your local `popmd` to your aforementioned local `bfgd` via

### 🏁 Prerequisites

- Connect to a live [bfgd](#-running-bfgd) instance.
- Connect to a live [bfgd](cmd/bfgd) instance.

## ▶️ Running the Hemi stack

Expand Down
22 changes: 11 additions & 11 deletions service/tbc/crawler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 Hemi Labs, Inc.
// Copyright (c) 2024-2025 Hemi Labs, Inc.
// Use of this source code is governed by the MIT License,
// which can be found in the LICENSE file.

Expand Down Expand Up @@ -616,11 +616,11 @@ func (s *Server) unindexUtxosInBlocks(ctx context.Context, endHash *chainhash.Ha
}

// Add tx's back to the mempool.
if s.cfg.MempoolEnabled {
// XXX this may not be the right spot.
txHashes, _ := b.MsgBlock().TxHashes()
_ = s.mempool.txsRemove(ctx, txHashes)
}
//if s.cfg.MempoolEnabled {
// // XXX this may not be the right spot.
// txHashes, _ := b.MsgBlock().TxHashes()
// _ = s.mempool.txsRemove(ctx, txHashes)
//}

blocksProcessed++

Expand Down Expand Up @@ -1001,11 +1001,11 @@ func (s *Server) unindexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash
// This is probably not needed here since we alreayd dealt with
// it via the utxo unindexer but since it will be mostly a
// no-op just go ahead.
if s.cfg.MempoolEnabled {
// XXX this may not be the right spot.
txHashes, _ := b.MsgBlock().TxHashes()
_ = s.mempool.txsRemove(ctx, txHashes)
}
//if s.cfg.MempoolEnabled {
// // XXX this may not be the right spot.
// txHashes, _ := b.MsgBlock().TxHashes()
// _ = s.mempool.txsRemove(ctx, txHashes)
//}

blocksProcessed++

Expand Down
268 changes: 136 additions & 132 deletions service/tbc/mempool.go
Original file line number Diff line number Diff line change
@@ -1,137 +1,141 @@
// Copyright (c) 2024 Hemi Labs, Inc.
// Copyright (c) 2024-2025 Hemi Labs, Inc.
// Use of this source code is governed by the MIT License,
// which can be found in the LICENSE file.

package tbc

import (
"context"
"errors"
"fmt"
"sync"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
)

type mempool struct {
mtx sync.RWMutex

txs map[chainhash.Hash][]byte // when nil, tx has not been downloaded
size int // total memory used by mempool
}

func (m *mempool) getDataConstruct(ctx context.Context) (*wire.MsgGetData, error) {
log.Tracef("getDataConstruct")
defer log.Tracef("getDataConstruct exit")

getData := wire.NewMsgGetData()

m.mtx.RLock()
defer m.mtx.RUnlock()

for k, v := range m.txs {
if v != nil {
continue
}
if err := getData.AddInvVect(&wire.InvVect{
Type: wire.InvTypeTx,
Hash: k,
}); err != nil {
// only happens when asking max inventory
return nil, fmt.Errorf("construct get data: %w", err)
}
}
return getData, nil
}

func (m *mempool) txsInsert(ctx context.Context, msg *wire.MsgTx, raw []byte) error {
log.Tracef("txsInsert")
defer log.Tracef("txsInsert exit")

m.mtx.Lock()
defer m.mtx.Unlock()

if tx := m.txs[msg.TxHash()]; tx == nil {
m.txs[msg.TxHash()] = raw
m.size += len(raw)
}

return nil
}

func (m *mempool) invTxsInsert(ctx context.Context, inv *wire.MsgInv) error {
log.Tracef("invTxsInsert")
defer log.Tracef("invTxsInsert exit")

if len(inv.InvList) == 0 {
return errors.New("empty inventory")
}

m.mtx.Lock()
defer m.mtx.Unlock()

l := len(m.txs)
for _, v := range inv.InvList {
switch v.Type {
case wire.InvTypeTx:
if _, ok := m.txs[v.Hash]; !ok {
m.txs[v.Hash] = nil
}
}
}

// if the map length does not change, nothing was inserted.
if len(m.txs) != l {
return errors.New("insert inventory tx: already exists")
}
return nil
}

func (m *mempool) txsRemove(ctx context.Context, txs []chainhash.Hash) error {
log.Tracef("txsRemove")
defer log.Tracef("txsRemove exit")

if len(txs) == 0 {
return errors.New("no transactions provided")
}

m.mtx.Lock()
defer m.mtx.Unlock()

l := len(m.txs)
for k := range txs {
if tx, ok := m.txs[txs[k]]; ok {
m.size -= len(tx)
delete(m.txs, txs[k])
}
}

// if the map length does not change, nothing was deleted.
if len(m.txs) != l {
return errors.New("remove txs: nothing removed")
}
return nil
}

func (m *mempool) stats(ctx context.Context) (int, int) {
m.mtx.RLock()
defer m.mtx.RUnlock()

// Approximate size of mempool; map and cap overhead is missing.
return len(m.txs), m.size + (len(m.txs) * chainhash.HashSize)
}

func (m *mempool) Dump(ctx context.Context) string {
m.mtx.RLock()
defer m.mtx.RUnlock()

return spew.Sdump(m.txs)
}

func mempoolNew() (*mempool, error) {
return &mempool{
txs: make(map[chainhash.Hash][]byte, wire.MaxInvPerMsg),
}, nil
}
//type mempool struct {
// mtx sync.RWMutex
//
// txs map[chainhash.Hash][]byte // when nil, tx has not been downloaded
// size int // total memory used by mempool
//}
//
//func (m *mempool) getDataConstruct(ctx context.Context) (*wire.MsgGetData, error) {
// log.Tracef("getDataConstruct")
// defer log.Tracef("getDataConstruct exit")
//
// getData := wire.NewMsgGetData()
//
// m.mtx.RLock()
// defer m.mtx.RUnlock()
//
// for k, v := range m.txs {
// if v != nil {
// continue
// }
// if err := getData.AddInvVect(&wire.InvVect{
// Type: wire.InvTypeTx,
// Hash: k,
// }); err != nil {
// // only happens when asking max inventory
// return nil, fmt.Errorf("construct get data: %w", err)
// }
// }
// return getData, nil
//}
//
//func (m *mempool) txsInsert(ctx context.Context, msg *wire.MsgTx, raw []byte) error {
// log.Tracef("txsInsert")
// defer log.Tracef("txsInsert exit")
//
// if true {
// return fmt.Errorf("txsInsert: disabled")
// }
//
// m.mtx.Lock()
// defer m.mtx.Unlock()
//
// if tx := m.txs[msg.TxHash()]; tx == nil {
// m.txs[msg.TxHash()] = raw
// m.size += len(raw)
// }
//
// return nil
//}
//
//func (m *mempool) invTxsInsert(ctx context.Context, inv *wire.MsgInv) error {
// log.Tracef("invTxsInsert")
// defer log.Tracef("invTxsInsert exit")
//
// if true {
// return fmt.Errorf("invTxsInsert: disabled")
// }
//
// if len(inv.InvList) == 0 {
// return errors.New("empty inventory")
// }
//
// m.mtx.Lock()
// defer m.mtx.Unlock()
//
// l := len(m.txs)
// for _, v := range inv.InvList {
// switch v.Type {
// case wire.InvTypeTx:
// if _, ok := m.txs[v.Hash]; !ok {
// m.txs[v.Hash] = nil
// }
// }
// }
//
// // if the map length does not change, nothing was inserted.
// if len(m.txs) != l {
// return errors.New("insert inventory tx: already exists")
// }
// return nil
//}
//
//func (m *mempool) txsRemove(ctx context.Context, txs []chainhash.Hash) error {
// log.Tracef("txsRemove")
// defer log.Tracef("txsRemove exit")
//
// if true {
// return fmt.Errorf("txsRemove: disabled")
// }
//
// if len(txs) == 0 {
// return errors.New("no transactions provided")
// }
//
// m.mtx.Lock()
// defer m.mtx.Unlock()
//
// l := len(m.txs)
// for k := range txs {
// if tx, ok := m.txs[txs[k]]; ok {
// m.size -= len(tx)
// delete(m.txs, txs[k])
// }
// }
//
// // if the map length does not change, nothing was deleted.
// if len(m.txs) != l {
// return errors.New("remove txs: nothing removed")
// }
// return nil
//}
//
//func (m *mempool) stats(ctx context.Context) (int, int) {
// m.mtx.RLock()
// defer m.mtx.RUnlock()
//
// // Approximate size of mempool; map and cap overhead is missing.
// return len(m.txs), m.size + (len(m.txs) * chainhash.HashSize)
//}
//
//func (m *mempool) Dump(ctx context.Context) string {
// m.mtx.RLock()
// defer m.mtx.RUnlock()
//
// return spew.Sdump(m.txs)
//}
//
//func mempoolNew() (*mempool, error) {
// if true {
// return nil, fmt.Errorf("mempoolNew: disabled")
// }
// return &mempool{
// txs: make(map[chainhash.Hash][]byte, wire.MaxInvPerMsg),
// }, nil
//}
Loading
Loading