chore: clean queen #5
Changes from all commits
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"os"
 	"strconv"
-	"strings"
 	"time"

 	"github.com/dennis-tra/nebula-crawler/config"
@@ -44,8 +43,6 @@ type Queen struct {
 	ants     []*Ant
 	antsLogs chan antslog.RequestLog

-	seen map[peer.ID]struct{}
-
 	upnp bool
 	// portsOccupancy is a slice of bools that represent the occupancy of the ports
 	// false corresponds to an available port, true to an occupied port
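The portsOccupancy slice backs the takeAvailablePort/freePort pair whose signatures appear further down in this diff. Their bodies are not shown here; what follows is only a minimal sketch of how an allocator over a bool slice could look, assuming calls come from a single goroutine (any locking is omitted):

// Sketch only: the actual implementations are not visible in this diff.
func (q *Queen) takeAvailablePort() (uint16, error) {
	if q.upnp {
		return 0, nil // UPnP mode: let the router/OS assign ports
	}
	for i, occupied := range q.portsOccupancy {
		if !occupied {
			q.portsOccupancy[i] = true
			return q.firstPort + uint16(i), nil
		}
	}
	return 0, fmt.Errorf("no available port")
}

func (q *Queen) freePort(port uint16) {
	if !q.upnp {
		q.portsOccupancy[port-q.firstPort] = false
	}
}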
@@ -59,8 +56,6 @@ type Queen struct {

 	resolveBatchSize int
 	resolveBatchTime int // in sec
-
-	Client *db.DBClient
 }

 func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPorts, firstPort uint16) (*Queen, error) {
@@ -71,9 +66,41 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPorts, firstPort uint16) (*Queen, error) {
 		return nil, err
 	}

+	mmc, err := maxmind.NewClient(os.Getenv("MAXMIND_ASN_DB"), os.Getenv("MAXMIND_COUNTRY_DB"))
+	if err != nil {
+		logger.Errorf("Failed to initialize Maxmind client: %v\n", err)
+	}
+
+	queen := &Queen{
+		nebulaDB:         nebulaDB,
+		keysDB:           keysDB,
+		peerstore:        peerstore,
+		datastore:        dssync.MutexWrap(ds.NewMapDatastore()),
+		ants:             []*Ant{},
+		antsLogs:         make(chan antslog.RequestLog, 1024),
+		upnp:             true,
+		dbc:              getDbClient(ctx),
+		mmc:              mmc,
+		uclient:          getUdgerClient(),
+		resolveBatchSize: getBatchSize(),
+		resolveBatchTime: getBatchTime(),
+	}
+
+	if nPorts != 0 {
+		queen.upnp = false
+		queen.firstPort = firstPort
+		queen.portsOccupancy = make([]bool, nPorts)
+	}
+
+	logger.Info("queen created")
+
+	return queen, nil
+}
+
+func getDbClient(ctx context.Context) *db.DBClient {
 	dbPort, err := strconv.Atoi(os.Getenv("DB_PORT"))
 	if err != nil {
-		logger.Errorf("Port must be an integer", err)
+		logger.Errorf("Port must be an integer: %v", err)
 	}

 	mP, _ := tele.NewMeterProvider()
@@ -95,21 +122,24 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPorts, firstPort uint16) (*Queen, error) {
 	if err != nil {
 		logger.Errorf("Failed to initialize DB client: %v\n", err)
 	}
+	return dbc
+}

-	mmc, err := maxmind.NewClient(os.Getenv("MAXMIND_ASN_DB"), os.Getenv("MAXMIND_COUNTRY_DB"))
-	if err != nil {
-		logger.Errorf("Failed to initialized Maxmind client: %v\n", err)
-	}
-
+func getUdgerClient() *udger.Client {
 	filePathUdger := os.Getenv("UDGER_FILEPATH")
-	var uclient *udger.Client
 	if filePathUdger != "" {
-		uclient, err = udger.NewClient(filePathUdger)
+		uclient, err := udger.NewClient(filePathUdger)
 		if err != nil {
 			logger.Errorf("Failed to initialize Udger client with %s: %v\n", filePathUdger, err)
+			return nil
 		}
+		return uclient
 	}
+	logger.Warn("Missing UDGER_FILEPATH: skipping udger")
+	return nil
+}

+func getBatchSize() int {
 	batchSizeEnvVal := os.Getenv("BATCH_SIZE")
 	if len(batchSizeEnvVal) == 0 {
 		batchSizeEnvVal = "1000"
@@ -118,43 +148,19 @@ func NewQueen(ctx context.Context, dbConnString string, keysDbPath string, nPorts, firstPort uint16) (*Queen, error) {
 	if err != nil {
 		logger.Errorln("BATCH_SIZE should be an integer")
 	}
+	return batchSize
+}

-	batchTimeEnvVal := os.Getenv("BATCH_SIZE")
+func getBatchTime() int {
+	batchTimeEnvVal := os.Getenv("BATCH_TIME")
 	if len(batchTimeEnvVal) == 0 {
 		batchTimeEnvVal = "30"
 	}
 	batchTime, err := strconv.Atoi(batchTimeEnvVal)
 	if err != nil {
 		logger.Errorln("BATCH_TIME should be an integer")
 	}

-	queen := &Queen{
-		nebulaDB:         nebulaDB,
-		keysDB:           keysDB,
-		peerstore:        peerstore,
-		datastore:        dssync.MutexWrap(ds.NewMapDatastore()),
-		ants:             []*Ant{},
-		antsLogs:         make(chan antslog.RequestLog, 1024),
-		seen:             make(map[peer.ID]struct{}),
-		dbc:              dbc,
-		mmc:              mmc,
-		uclient:          uclient,
-		resolveBatchSize: batchSize,
-		resolveBatchTime: batchTime,
-		Client:           dbc,
-	}
-
-	if nPorts == 0 {
-		queen.upnp = true
-	} else {
-		queen.upnp = false
-		queen.firstPort = firstPort
-		queen.portsOccupancy = make([]bool, nPorts)
-	}
-
-	logger.Debug("queen created")
-
-	return queen, nil
+	return batchTime
 }

 func (q *Queen) takeAvailablePort() (uint16, error) {
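getBatchSize and getBatchTime differ only in the variable they read and their default. If more env-driven integers appear, they could share a single helper; a minimal sketch, where getEnvInt is a hypothetical name that does not exist in this PR:

// Hypothetical helper (not in this PR): read an integer env var with a fallback.
func getEnvInt(key string, fallback int) int {
	raw := os.Getenv(key)
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		logger.Errorf("%s should be an integer, falling back to %d", key, fallback)
		return fallback
	}
	return n
}

With it, getBatchSize would reduce to getEnvInt("BATCH_SIZE", 1000) and getBatchTime to getEnvInt("BATCH_TIME", 30).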
@@ -178,6 +184,8 @@ func (q *Queen) freePort(port uint16) {

 func (q *Queen) Run(ctx context.Context) {
 	go q.consumeAntsLogs(ctx)
+	go q.normalizeRequests(ctx)
+
 	t := time.NewTicker(CRAWL_INTERVAL)
 	q.routine(ctx)
@@ -193,13 +201,6 @@ func (q *Queen) Run(ctx context.Context) {
 }

 func (q *Queen) consumeAntsLogs(ctx context.Context) {
-	lnCount := 0
-	f, err := os.OpenFile("log.txt", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
-	if err != nil {
-		logger.Panicln(err)
-	}
-	defer f.Close()
-
 	requests := make([]models.RequestsDenormalized, 0, q.resolveBatchSize)
 	// bulk insert for every batch size or N seconds, whichever comes first
 	ticker := time.NewTicker(time.Duration(q.resolveBatchTime) * time.Second)
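The comment in this hunk states the flush policy: bulk-insert on whichever comes first, a full batch or the resolveBatchTime tick. Isolated from the surrounding log handling, the pattern looks like this generic sketch (illustrative names, not the PR's code):

// Generic batch-or-timeout flush loop; names are illustrative only.
func batchLoop[T any](ctx context.Context, in <-chan T, size int, interval time.Duration, flush func([]T)) {
	batch := make([]T, 0, size)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case item := <-in:
			batch = append(batch, item)
			if len(batch) >= size { // batch full: flush immediately
				flush(batch)
				batch = batch[:0]
			}
		case <-ticker.C: // interval elapsed: flush whatever accumulated
			if len(batch) > 0 {
				flush(batch)
				batch = batch[:0]
			}
		case <-ctx.Done(): // shutdown: flush the remainder and exit
			if len(batch) > 0 {
				flush(batch)
			}
			return
		}
	}
}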
@@ -217,20 +218,7 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) {
 			} else {
 				agent = peerstoreAgent.(string)
 			}
-			fmt.Printf(
-				"%s \tself: %s \ttype: %s \trequester: %s \ttarget: %s \tagent: %s \tmaddrs: %s\n",
-				log.Timestamp.Format(time.RFC3339),
-				log.Self,
-				reqType,
-				log.Requester,
-				log.Target.B58String(),
-				agent,
-				maddrs,
-			)
-
-			// Keep this protocols slice empty for now,
-			// because we don't need it yet and I don't know how to get it
-			// protocols := make([]string, 0)
+			// protocols, _ := q.peerstore.GetProtocols(log.Requester)

 			request := models.RequestsDenormalized{
 				RequestStartedAt: log.Timestamp,

Review comment on the GetProtocols line: @kasteph this is the string to get the list of protocols supported by the remote peer. However there is currently no field in the […]

Reply: ok!
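On that review note: q.peerstore.GetProtocols does return the protocols a remote peer supports, but models.RequestsDenormalized has no column for them in this PR. If one were added, the call site might look like this sketch (the Protocols field is hypothetical):

// Hypothetical: requires a new Protocols field on models.RequestsDenormalized.
protocols, err := q.peerstore.GetProtocols(log.Requester)
if err != nil {
	logger.Debugf("no protocols known for %s: %v", log.Requester, err)
}
protoStrs := make([]string, 0, len(protocols))
for _, p := range protocols {
	protoStrs = append(protoStrs, string(p)) // protocol.ID is a string type
}
// request.Protocols = protoStrs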
@@ -249,18 +237,10 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) {
 				}
 				requests = requests[:0]
 			}
-			if _, ok := q.seen[log.Requester]; !ok {
-				q.seen[log.Requester] = struct{}{}
-				if strings.Contains(agent, "light") {
-					lnCount++
-				}
-				fmt.Fprintf(f, "\r%s %s\n", log.Requester, agent)
-				logger.Debugf("total: %d \tlight: %d", len(q.seen), lnCount)
-			}

 		case <-ticker.C:
 			if len(requests) > 0 {
-				err = db.BulkInsertRequests(ctx, q.dbc.Handler, requests)
+				err := db.BulkInsertRequests(ctx, q.dbc.Handler, requests)
 				if err != nil {
 					logger.Fatalf("Error inserting requests: %v", err)
 				}
@@ -269,7 +249,7 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) {

 		case <-ctx.Done():
 			if len(requests) > 0 {
-				err = db.BulkInsertRequests(ctx, q.dbc.Handler, requests)
+				err := db.BulkInsertRequests(ctx, q.dbc.Handler, requests)
 				if err != nil {
 					logger.Fatalf("Error inserting remaining requests: %v", err)
 				}
@@ -279,6 +259,28 @@ func (q *Queen) consumeAntsLogs(ctx context.Context) {
 	}
 }

+func (q *Queen) normalizeRequests(ctx context.Context) {
+	nctx, ncancel := context.WithCancel(ctx)
+	defer ncancel()
+
+	logger.Info("Starting continuous normalization...")
+
+	for {
+		select {
+		case <-nctx.Done():
+			logger.Info("Normalization context canceled, stopping normalization loop.")
+			return
+		default:
+			err := db.NormalizeRequests(nctx, q.dbc.Handler, q.dbc)
+			if err != nil {
+				logger.Errorf("Error during normalization: %v", err)
+			} else {
+				logger.Info("Normalization completed for current batch.")
+			}
+		}
+	}
+}
+
 func (q *Queen) persistLiveAntsKeys() {
 	antsKeys := make([]crypto.PrivKey, 0, len(q.ants))
 	for _, ant := range q.ants {

Review comment on normalizeRequests: FYI: I moved requests normalization to the queen. Let me know if this seems wrong.

Reply: I think this looks good, much cleaner!
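One caveat with the loop above: the default branch re-invokes db.NormalizeRequests immediately after the previous call returns, so when there is nothing to normalize it spins hot. A hedged alternative that paces iterations with a ticker (the 10-second interval is an arbitrary choice, not from this PR):

// Sketch: pace normalization instead of spinning; the interval is arbitrary.
func (q *Queen) normalizeRequests(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := db.NormalizeRequests(ctx, q.dbc.Handler, q.dbc); err != nil {
				logger.Errorf("Error during normalization: %v", err)
			}
		}
	}
}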
@@ -358,7 +360,7 @@ func (q *Queen) routine(ctx context.Context) {

 	for _, ant := range q.ants {
 		logger.Debugf("Upserting ant: %v\n", ant.Host.ID().String())
-		antID, err := q.Client.UpsertPeer(ctx, ant.Host.ID().String(), null.StringFrom(ant.UserAgent), nil, time.Now())
+		antID, err := q.dbc.UpsertPeer(ctx, ant.Host.ID().String(), null.StringFrom(ant.UserAgent), nil, time.Now())
 		if err != nil {
 			logger.Errorf("antID: %d could not be inserted because of %v", antID, err)
 		}
Review comment: @kasteph are mmc and uclient used anywhere at the moment?

Reply: Not at the moment, these are for the multiaddress resolutions.
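Since mmc and uclient are earmarked for multiaddress resolution, the eventual flow would presumably extract an IP from each multiaddr and look it up. manet.ToIP is a real helper from go-multiaddr; the lookup calls below are hypothetical placeholders, since this PR never exercises either client:

import (
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"
)

// Sketch only: asnLookup/countryLookup stand in for whatever API the
// maxmind/udger clients expose; this PR does not call them yet.
func resolveMaddr(maddr ma.Multiaddr) {
	ip, err := manet.ToIP(maddr) // pull the IP component out of the multiaddr
	if err != nil {
		return // relay-only or otherwise non-IP multiaddr
	}
	_ = ip
	// asn := asnLookup(ip)         // hypothetical
	// country := countryLookup(ip) // hypothetical
}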