chore: clean queen #5

Merged
merged 4 commits on Oct 16, 2024
Changes from 2 commits
33 changes: 3 additions & 30 deletions cmd/queentest/main.go → cmd/honeypot/main.go
@@ -6,11 +6,9 @@ import (
"os"
"os/signal"
"syscall"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/probe-lab/ants-watch"
"github.com/probe-lab/ants-watch/db"
)

var logger = logging.Logger("ants-queen")
@@ -20,7 +18,7 @@ func main() {
logging.SetLogLevel("dht", "error")
logging.SetLogLevel("basichost", "info")

postgresStr := flag.String("postgres", "", "Postgres connection string, postgres://user:password@host:port/dbname")
nebulaPostgresStr := flag.String("postgres", "", "Postgres connection string, postgres://user:password@host:port/dbname")
nPorts := flag.Int("nPorts", 128, "Number of ports ants can listen on")
firstPort := flag.Int("firstPort", 6000, "First port ants can listen on")
upnp := flag.Bool("upnp", false, "Enable UPnP")
@@ -34,41 +32,16 @@ func main() {
 	var queen *ants.Queen
 	var err error
 	if *upnp {
-		queen, err = ants.NewQueen(ctx, *postgresStr, "keys.db", 0, 0)
+		queen, err = ants.NewQueen(ctx, *nebulaPostgresStr, "keys.db", 0, 0)
 	} else {
-		queen, err = ants.NewQueen(ctx, *postgresStr, "keys.db", uint16(*nPorts), uint16(*firstPort))
+		queen, err = ants.NewQueen(ctx, *nebulaPostgresStr, "keys.db", uint16(*nPorts), uint16(*firstPort))
 	}
 	if err != nil {
 		panic(err)
 	}

 	go queen.Run(ctx)

-	go func() {
-		nctx, ncancel := context.WithCancel(ctx)
-		defer ncancel()
-
-		logger.Info("Starting continuous normalization...")
-
-		for {
-			select {
-			case <-nctx.Done():
-				logger.Info("Normalization context canceled, stopping normalization loop.")
-				return
-			default:
-				err := db.NormalizeRequests(nctx, queen.Client.Handler, queen.Client)
-				if err != nil {
-					logger.Errorf("Error during normalization: %w", err)
-				} else {
-					logger.Info("Normalization completed for current batch.")
-				}
-
-				// TODO: remove the hardcoded time
-				time.Sleep(10 * time.Second)
-			}
-		}
-	}()
-
 	go func() {
 		sig := <-sigChan
 		logger.Infof("Received signal: %s, shutting down...", sig)
52 changes: 0 additions & 52 deletions cmd/queentest/analysis.py

This file was deleted.

150 changes: 76 additions & 74 deletions queen.go
@@ -5,7 +5,6 @@ import (
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/dennis-tra/nebula-crawler/config"
@@ -43,8 +42,6 @@ type Queen struct {
 	ants     []*Ant
 	antsLogs chan antslog.RequestLog

-	seen map[peer.ID]struct{}
-
 	upnp bool
 	// portsOccupancy is a slice of bools that represent the occupancy of the ports
 	// false corresponds to an available port, true to an occupied port
@@ -58,8 +55,6 @@ type Queen struct {

 	resolveBatchSize int
 	resolveBatchTime int // in sec
-
-	Client *db.DBClient
 }

 func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPorts, firstPort uint16) (*Queen, error) {
@@ -70,9 +65,41 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPorts, firstPort uint16) (*Queen, error) {
 		return nil, err
 	}

+	mmc, err := maxmind.NewClient(os.Getenv("MAXMIND_ASN_DB"), os.Getenv("MAXMIND_COUNTRY_DB"))
+	if err != nil {
+		logger.Errorf("Failed to initialize Maxmind client: %v\n", err)
+	}
+
+	queen := &Queen{
+		nebulaDB:         nebulaDB,
+		keysDB:           keysDB,
+		peerstore:        peerstore,
+		datastore:        dssync.MutexWrap(ds.NewMapDatastore()),
+		ants:             []*Ant{},
+		antsLogs:         make(chan antslog.RequestLog, 1024),
+		upnp:             true,
+		dbc:              getDbClient(ctx),
+		mmc:              mmc,
+		uclient:          getUdgerClient(),

[Review thread on lines +83 to +84]
Contributor Author: @kasteph are mmc and uclient used anywhere at the moment?
Contributor: Not at the moment, these are for the multiaddress resolutions.

+		resolveBatchSize: getBatchSize(),
+		resolveBatchTime: getBatchTime(),
+	}
+
+	if nPorts != 0 {
+		queen.upnp = false
+		queen.firstPort = firstPort
+		queen.portsOccupancy = make([]bool, nPorts)
+	}
+
+	logger.Info("queen created")
+
+	return queen, nil
+}
+
+func getDbClient(ctx context.Context) *db.DBClient {
 	dbPort, err := strconv.Atoi(os.Getenv("DB_PORT"))
 	if err != nil {
-		fmt.Errorf("Port must be an integer", err)
+		logger.Errorf("Port must be an integer: %v", err)
 	}

 	mP, _ := tele.NewMeterProvider()
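Since mmc and uclient are kept for the upcoming multiaddress resolution, a rough in-package sketch of how they might eventually be used is shown below. LookupASN, LookupCountry, and DatacenterID are hypothetical placeholder method names, not the real nebula-crawler or udger APIs:

    // Sketch only: the three client calls below are placeholders.
    func (q *Queen) resolveMaddr(maddr multiaddr.Multiaddr) {
    	ip, err := maddr.ValueForProtocol(multiaddr.P_IP4)
    	if err != nil {
    		return // not an IPv4 multiaddress
    	}
    	asn, _ := q.mmc.LookupASN(ip)         // placeholder call
    	country, _ := q.mmc.LookupCountry(ip) // placeholder call
    	dc, _ := q.uclient.DatacenterID(ip)   // placeholder call
    	logger.Debugf("resolved %s: asn=%v country=%v datacenter=%v", ip, asn, country, dc)
    }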
@@ -94,21 +121,24 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPorts, firstPort uint16) (*Queen, error) {
 	if err != nil {
 		logger.Errorf("Failed to initialize DB client: %v\n", err)
 	}
+	return dbc
+}

-	mmc, err := maxmind.NewClient(os.Getenv("MAXMIND_ASN_DB"), os.Getenv("MAXMIND_COUNTRY_DB"))
-	if err != nil {
-		logger.Errorf("Failed to initialized Maxmind client: %v\n", err)
-	}
-
+func getUdgerClient() *udger.Client {
 	filePathUdger := os.Getenv("UDGER_FILEPATH")
-	var uclient *udger.Client
 	if filePathUdger != "" {
-		uclient, err = udger.NewClient(filePathUdger)
+		uclient, err := udger.NewClient(filePathUdger)
 		if err != nil {
 			logger.Errorf("Failed to initialize Udger client with %s: %v\n", filePathUdger, err)
+			return nil
 		}
+		return uclient
 	}
+	logger.Warn("Missing UDGER_FILEPATH: skipping udger")
+	return nil
+}

+func getBatchSize() int {
 	batchSizeEnvVal := os.Getenv("BATCH_SIZE")
 	if len(batchSizeEnvVal) == 0 {
 		batchSizeEnvVal = "1000"
@@ -117,43 +147,19 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPorts, firstPort uint16) (*Queen, error) {
 	if err != nil {
 		logger.Errorln("BATCH_SIZE should be an integer")
 	}
+	return batchSize
+}

-	batchTimeEnvVal := os.Getenv("BATCH_SIZE")
+func getBatchTime() int {
+	batchTimeEnvVal := os.Getenv("BATCH_TIME")
 	if len(batchTimeEnvVal) == 0 {
 		batchTimeEnvVal = "30"
 	}
 	batchTime, err := strconv.Atoi(batchTimeEnvVal)
 	if err != nil {
 		logger.Errorln("BATCH_TIME should be an integer")
 	}
-
-	queen := &Queen{
-		nebulaDB:         nebulaDB,
-		keysDB:           keysDB,
-		peerstore:        peerstore,
-		datastore:        dssync.MutexWrap(ds.NewMapDatastore()),
-		ants:             []*Ant{},
-		antsLogs:         make(chan antslog.RequestLog, 1024),
-		seen:             make(map[peer.ID]struct{}),
-		dbc:              dbc,
-		mmc:              mmc,
-		uclient:          uclient,
-		resolveBatchSize: batchSize,
-		resolveBatchTime: batchTime,
-		Client:           dbc,
-	}
-
-	if nPorts == 0 {
-		queen.upnp = true
-	} else {
-		queen.upnp = false
-		queen.firstPort = firstPort
-		queen.portsOccupancy = make([]bool, nPorts)
-	}
-
-	logger.Debug("queen created")
-
-	return queen, nil
+	return batchTime
 }

 func (q *Queen) takeAvailablePort() (uint16, error) {
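The two helpers fall back to a batch size of 1000 and a batch time of 30 seconds when the environment variables are unset. A minimal in-package testable example (assuming it lives in a _test.go file next to queen.go):

    func ExampleBatchConfig() {
    	os.Setenv("BATCH_SIZE", "500") // overrides the "1000" default
    	os.Unsetenv("BATCH_TIME")      // falls back to the "30" default
    	fmt.Println(getBatchSize(), getBatchTime())
    	// Output: 500 30
    }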
@@ -177,6 +183,8 @@ func (q *Queen) freePort(port uint16) {

 func (q *Queen) Run(ctx context.Context) {
 	go q.consumeAntsLogs(ctx)
+	go q.normalizeRequests(ctx)
+
 	t := time.NewTicker(CRAWL_INTERVAL)
 	q.routine(ctx)

@@ -191,13 +199,6 @@ func (q *Queen) Run(ctx context.Context) {
 }

 func (q *Queen) consumeAntsLogs(ctx context.Context) {
-	lnCount := 0
-	f, err := os.OpenFile("log.txt", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
-	if err != nil {
-		logger.Panicln(err)
-	}
-	defer f.Close()
-
 	requests := make([]models.RequestsDenormalized, 0, q.resolveBatchSize)
 	// bulk insert for every batch size or N seconds, whichever comes first
 	ticker := time.NewTicker(time.Duration(q.resolveBatchTime) * time.Second)
@@ -215,20 +216,7 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) {
 			} else {
 				agent = peerstoreAgent.(string)
 			}
-			fmt.Printf(
-				"%s \tself: %s \ttype: %s \trequester: %s \ttarget: %s \tagent: %s \tmaddrs: %s\n",
-				log.Timestamp.Format(time.RFC3339),
-				log.Self,
-				reqType,
-				log.Requester,
-				log.Target.B58String(),
-				agent,
-				maddrs,
-			)
-
-			// Keep this protocols slice empty for now,
-			// because we don't need it yet and I don't know how to get it
-			// protocols := make([]string, 0)
+			// protocols, _ := q.peerstore.GetProtocols(log.Requester)

[Review thread on the protocols line]
Contributor Author: @kasteph this is the string to get the list of protocols supported by the remote peer. However there is currently no field in the models.RequestsDenormalized struct to add the protocols. Can I leave that to you?
Contributor: ok!
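If such a field is added later, populating it could look roughly like the sketch below. The Protocols field on models.RequestsDenormalized is hypothetical (per the thread above it does not exist yet), and GetProtocols returns a slice of protocol.ID in recent go-libp2p releases, so the string conversion may need adjusting:

    // Hypothetical: assumes a future Protocols []string field on the struct.
    protoIDs, _ := q.peerstore.GetProtocols(log.Requester)
    protocols := make([]string, 0, len(protoIDs))
    for _, p := range protoIDs {
    	protocols = append(protocols, string(p))
    }
    request := models.RequestsDenormalized{
    	RequestStartedAt: log.Timestamp,
    	Protocols:        protocols, // hypothetical field
    	// ... remaining fields as in the diff below ...
    }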


 			request := models.RequestsDenormalized{
 				RequestStartedAt: log.Timestamp,
@@ -247,18 +235,10 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) {
 				}
 				requests = requests[:0]
 			}
-			if _, ok := q.seen[log.Requester]; !ok {
-				q.seen[log.Requester] = struct{}{}
-				if strings.Contains(agent, "light") {
-					lnCount++
-				}
-				fmt.Fprintf(f, "\r%s %s\n", log.Requester, agent)
-				logger.Debugf("total: %d \tlight: %d", len(q.seen), lnCount)
-			}

 		case <-ticker.C:
 			if len(requests) > 0 {
-				err = db.BulkInsertRequests(ctx, q.dbc.Handler, requests)
+				err := db.BulkInsertRequests(ctx, q.dbc.Handler, requests)
 				if err != nil {
 					logger.Fatalf("Error inserting requests: %v", err)
 				}
@@ -267,7 +247,7 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) {

 		case <-ctx.Done():
 			if len(requests) > 0 {
-				err = db.BulkInsertRequests(ctx, q.dbc.Handler, requests)
+				err := db.BulkInsertRequests(ctx, q.dbc.Handler, requests)
 				if err != nil {
 					logger.Fatalf("Error inserting remaining requests: %v", err)
 				}
@@ -277,6 +257,30 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) {
 	}
 }

+func (q *Queen) normalizeRequests(ctx context.Context) {
[Review thread on normalizeRequests]
Contributor Author: FYI: I moved requests normalization to the queen. Let me know if this seems wrong
Contributor: I think this looks good, much cleaner!

+	nctx, ncancel := context.WithCancel(ctx)
+	defer ncancel()
+
+	logger.Info("Starting continuous normalization...")
+
+	for {
+		select {
+		case <-nctx.Done():
+			logger.Info("Normalization context canceled, stopping normalization loop.")
+			return
+		default:
+			err := db.NormalizeRequests(nctx, q.dbc.Handler, q.dbc)
+			if err != nil {
+				logger.Errorf("Error during normalization: %v", err)
+			} else {
+				logger.Info("Normalization completed for current batch.")
+			}
+			// TODO: remove the hardcoded time
+			time.Sleep(10 * time.Second)
+		}
+	}
+}

 func (q *Queen) routine(ctx context.Context) {
 	networkPeers, err := q.nebulaDB.GetLatestPeerIds(ctx)
 	if err != nil {
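The default branch above re-runs NormalizeRequests with only a hardcoded sleep between iterations; a ticker-driven variant of the same loop, using only calls already present in this file, would make the interval explicit and easier to lift into configuration later:

    func (q *Queen) normalizeRequests(ctx context.Context) {
    	logger.Info("Starting continuous normalization...")
    	ticker := time.NewTicker(10 * time.Second) // interval kept from the TODO above
    	defer ticker.Stop()
    	for {
    		select {
    		case <-ctx.Done():
    			logger.Info("Normalization context canceled, stopping normalization loop.")
    			return
    		case <-ticker.C:
    			if err := db.NormalizeRequests(ctx, q.dbc.Handler, q.dbc); err != nil {
    				logger.Errorf("Error during normalization: %v", err)
    			} else {
    				logger.Info("Normalization completed for current batch.")
    			}
    		}
    	}
    }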
@@ -346,7 +348,7 @@ func (q *Queen) routine(ctx context.Context) {

 	for _, ant := range q.ants {
 		logger.Debugf("Upserting ant: %v\n", ant.Host.ID().String())
-		antID, err := q.Client.UpsertPeer(ctx, ant.Host.ID().String(), null.StringFrom(ant.UserAgent), nil, time.Now())
+		antID, err := q.dbc.UpsertPeer(ctx, ant.Host.ID().String(), null.StringFrom(ant.UserAgent), nil, time.Now())
 		if err != nil {
 			logger.Errorf("antID: %d could not be inserted because of %v", antID, err)
 		}
}
Expand Down