diff --git a/impl/cmd/cli/identity.go b/impl/cmd/cli/identity.go index 79735413..bfb36d9a 100644 --- a/impl/cmd/cli/identity.go +++ b/impl/cmd/cli/identity.go @@ -103,7 +103,7 @@ var identityAddCmd = &cobra.Command{ Answer: rrds, } // generate put request - putReq, err := dht.CreatePKARRPublishRequest(privKey, msg) + putReq, err := dht.CreatePkarrPublishRequest(privKey, msg) if err != nil { logrus.WithError(err).Error("failed to create put request") return err @@ -170,7 +170,7 @@ var identityGetCmd = &cobra.Command{ return err } - msg, err := dht.ParsePKARRGetResponse(*gotResp) + msg, err := dht.ParsePkarrGetResponse(*gotResp) if err != nil { logrus.WithError(err).Error("failed to parse get response") return err diff --git a/impl/cmd/main.go b/impl/cmd/main.go index d15265ec..0a20fcb1 100644 --- a/impl/cmd/main.go +++ b/impl/cmd/main.go @@ -31,10 +31,7 @@ import ( // @license.name Apache 2.0 // @license.url http://www.apache.org/licenses/LICENSE-2.0.html func main() { - logrus.SetFormatter(&logrus.JSONFormatter{ - DisableTimestamp: false, - PrettyPrint: true, - }) + logrus.SetFormatter(&logrus.JSONFormatter{}) logrus.SetReportCaller(true) log := logrus.NewEntry(logrus.StandardLogger()).WithField("version", config.Version) @@ -59,10 +56,10 @@ func run() error { configPath = envConfigPath } - logrus.WithField("path", configPath).Info("loading config from file") + logrus.WithContext(ctx).WithField("path", configPath).Info("loading config from file") cfg, err := config.LoadConfig(configPath) if err != nil { - logrus.Fatalf("could not instantiate config: %s", err.Error()) + logrus.WithContext(ctx).Fatalf("could not instantiate config: %s", err.Error()) } // set up logger @@ -91,7 +88,7 @@ func run() error { serverErrors := make(chan error, 1) go func() { - logrus.WithField("listen_address", s.Addr).Info("starting listener") + logrus.WithContext(ctx).WithField("listen_address", s.Addr).Info("starting listener") serverErrors <- s.ListenAndServe() }() @@ -99,7 +96,7 @@ func run() error { case err = <-serverErrors: return errors.Wrap(err, "server error") case sig := <-shutdown: - logrus.WithField("signal", sig.String()).Info("shutdown signal received") + logrus.WithContext(ctx).WithField("signal", sig.String()).Info("shutdown signal received") ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() diff --git a/impl/concurrencytest/main.go b/impl/concurrencytest/main.go index 74933928..345b67e5 100644 --- a/impl/concurrencytest/main.go +++ b/impl/concurrencytest/main.go @@ -119,7 +119,7 @@ func generateDIDPutRequest() (string, []byte, error) { return "", nil, err } - bep44Put, err := dht.CreatePKARRPublishRequest(sk, *packet) + bep44Put, err := dht.CreatePkarrPublishRequest(sk, *packet) if err != nil { return "", nil, err } diff --git a/impl/integrationtest/main.go b/impl/integrationtest/main.go index 138c393f..bad1df58 100644 --- a/impl/integrationtest/main.go +++ b/impl/integrationtest/main.go @@ -103,7 +103,7 @@ func generateDIDPutRequest() (string, []byte, error) { return "", nil, err } - bep44Put, err := dht.CreatePKARRPublishRequest(sk, *packet) + bep44Put, err := dht.CreatePkarrPublishRequest(sk, *packet) if err != nil { return "", nil, err } diff --git a/impl/internal/did/client_test.go b/impl/internal/did/client_test.go index 741b4882..42ceecd3 100644 --- a/impl/internal/did/client_test.go +++ b/impl/internal/did/client_test.go @@ -27,7 +27,7 @@ func TestClient(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - bep44Put, err := dht.CreatePKARRPublishRequest(sk, *packet) + bep44Put, err := dht.CreatePkarrPublishRequest(sk, *packet) assert.NoError(t, err) assert.NotEmpty(t, bep44Put) diff --git a/impl/pkg/dht/dht.go b/impl/pkg/dht/dht.go index b3a68932..43d5074a 100644 --- a/impl/pkg/dht/dht.go +++ b/impl/pkg/dht/dht.go @@ -88,7 +88,7 @@ func (d *DHT) Put(ctx context.Context, request bep44.Put) (string, error) { // Check if there are any nodes in the DHT if len(d.Server.Nodes()) == 0 { - logrus.Warn("no nodes available in the DHT for publishing") + logrus.WithContext(ctx).Warn("no nodes available in the DHT for publishing") } key := util.Z32Encode(request.K[:]) @@ -101,7 +101,7 @@ func (d *DHT) Put(ctx context.Context, request bep44.Put) (string, error) { } return "", errutil.LoggingNewErrorf("failed to put key[%s] into dht, tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) } else { - logrus.WithField("key", key).Debug("successfully put key into dht") + logrus.WithContext(ctx).WithField("key", key).Debug("successfully put key into dht") } return util.Z32Encode(request.K[:]), nil } diff --git a/impl/pkg/dht/pkarr.go b/impl/pkg/dht/pkarr.go index fddbf6c8..aaba9d8a 100644 --- a/impl/pkg/dht/pkarr.go +++ b/impl/pkg/dht/pkarr.go @@ -11,7 +11,7 @@ import ( "github.com/miekg/dns" ) -// CreatePKARRPublishRequest creates a put request for the given records. Requires a public/private keypair and the records to put. +// CreatePkarrPublishRequest creates a put request for the given records. Requires a public/private keypair and the records to put. // The records are expected to be a DNS message packet, such as: // // dns.Msg{ @@ -33,7 +33,7 @@ import ( // }, // } // } -func CreatePKARRPublishRequest(privateKey ed25519.PrivateKey, msg dns.Msg) (*bep44.Put, error) { +func CreatePkarrPublishRequest(privateKey ed25519.PrivateKey, msg dns.Msg) (*bep44.Put, error) { packed, err := msg.Pack() if err != nil { return nil, util.LoggingErrorMsg(err, "failed to pack records") @@ -48,9 +48,9 @@ func CreatePKARRPublishRequest(privateKey ed25519.PrivateKey, msg dns.Msg) (*bep return put, nil } -// ParsePKARRGetResponse parses the response from a get request. +// ParsePkarrGetResponse parses the response from a get request. // The response is expected to be a slice of DNS resource records. -func ParsePKARRGetResponse(response getput.GetResult) (*dns.Msg, error) { +func ParsePkarrGetResponse(response getput.GetResult) (*dns.Msg, error) { var payload string if err := bencode.Unmarshal(response.V, &payload); err != nil { return nil, util.LoggingErrorMsg(err, "failed to unmarshal payload value") diff --git a/impl/pkg/dht/pkarr_test.go b/impl/pkg/dht/pkarr_test.go index d6392335..6b5a1643 100644 --- a/impl/pkg/dht/pkarr_test.go +++ b/impl/pkg/dht/pkarr_test.go @@ -41,7 +41,7 @@ func TestGetPutPKARRDHT(t *testing.T) { }, Answer: []dns.RR{&txtRecord}, } - put, err := CreatePKARRPublishRequest(privKey, msg) + put, err := CreatePkarrPublishRequest(privKey, msg) require.NoError(t, err) id, err := d.Put(context.Background(), *put) @@ -52,7 +52,7 @@ func TestGetPutPKARRDHT(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, got) - gotMsg, err := ParsePKARRGetResponse(*got) + gotMsg, err := ParsePkarrGetResponse(*got) require.NoError(t, err) require.NotEmpty(t, gotMsg.Answer) @@ -101,7 +101,7 @@ func TestGetPutDIDDHT(t *testing.T) { didDocPacket, err := didID.ToDNSPacket(*doc, nil, nil) require.NoError(t, err) - putReq, err := CreatePKARRPublishRequest(privKey, *didDocPacket) + putReq, err := CreatePkarrPublishRequest(privKey, *didDocPacket) require.NoError(t, err) gotID, err := dht.Put(context.Background(), *putReq) @@ -112,7 +112,7 @@ func TestGetPutDIDDHT(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, got) - gotMsg, err := ParsePKARRGetResponse(*got) + gotMsg, err := ParsePkarrGetResponse(*got) require.NoError(t, err) require.NotEmpty(t, gotMsg.Answer) diff --git a/impl/pkg/pkarr/record_test.go b/impl/pkg/pkarr/record_test.go index 4f53a737..614a292d 100644 --- a/impl/pkg/pkarr/record_test.go +++ b/impl/pkg/pkarr/record_test.go @@ -27,7 +27,7 @@ func TestNewRecord(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) diff --git a/impl/pkg/server/pkarr_test.go b/impl/pkg/server/pkarr_test.go index 264fdff1..c1d1ed16 100644 --- a/impl/pkg/server/pkarr_test.go +++ b/impl/pkg/server/pkarr_test.go @@ -172,7 +172,7 @@ func generateDIDPutRequest(t *testing.T) (string, []byte) { assert.NoError(t, err) assert.NotEmpty(t, packet) - bep44Put, err := dht.CreatePKARRPublishRequest(sk, *packet) + bep44Put, err := dht.CreatePkarrPublishRequest(sk, *packet) assert.NoError(t, err) assert.NotEmpty(t, bep44Put) diff --git a/impl/pkg/server/util.go b/impl/pkg/server/util.go index 2e6bcb4d..ee447f41 100644 --- a/impl/pkg/server/util.go +++ b/impl/pkg/server/util.go @@ -46,7 +46,7 @@ func RespondBytes(c *gin.Context, data []byte, statusCode int) { // LoggingRespondError sends an error response back to the client as a safe error func LoggingRespondError(c *gin.Context, err error, statusCode int) { - logrus.WithError(err).Error() + logrus.WithContext(c).WithError(err).Error() Respond(c, err, statusCode) } diff --git a/impl/pkg/service/pkarr.go b/impl/pkg/service/pkarr.go index 6be23bf1..ed1b68da 100644 --- a/impl/pkg/service/pkarr.go +++ b/impl/pkg/service/pkarr.go @@ -76,7 +76,7 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr if got, err := s.cache.Get(id); err == nil { var resp pkarr.Response if err = json.Unmarshal(got, &resp); err == nil && record.Response().Equals(resp) { - logrus.WithField("record_id", id).Debug("resolved pkarr record from cache with matching response") + logrus.WithContext(ctx).WithField("record_id", id).Debug("resolved pkarr record from cache with matching response") return nil } } @@ -101,7 +101,7 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr defer cancel() if _, err = s.dht.Put(putCtx, record.BEP44()); err != nil { - logrus.WithError(err).Errorf("error from dht.Put for record: %s", id) + logrus.WithContext(ctx).WithError(err).Errorf("error from dht.Put for record: %s", id) } }() @@ -117,17 +117,17 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response if got, err := s.cache.Get(id); err == nil { var resp pkarr.Response if err = json.Unmarshal(got, &resp); err == nil { - logrus.WithField("record_id", id).Debug("resolved pkarr record from cache") + logrus.WithContext(ctx).WithField("record_id", id).Debug("resolved pkarr record from cache") return &resp, nil } - logrus.WithError(err).WithField("record", id).Warn("failed to get pkarr record from cache, falling back to dht") + logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from cache, falling back to dht") } // next do a dht lookup got, err := s.dht.GetFull(ctx, id) if err != nil { // try to resolve from storage before returning and error - logrus.WithError(err).WithField("record", id).Warn("failed to get pkarr record from dht, attempting to resolve from storage") + logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from dht, attempting to resolve from storage") rawID, err := util.Z32Decode(id) if err != nil { @@ -136,11 +136,11 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response record, err := s.db.ReadRecord(ctx, rawID) if err != nil || record == nil { - logrus.WithError(err).WithField("record", id).Error("failed to resolve pkarr record from storage") + logrus.WithContext(ctx).WithError(err).WithField("record", id).Error("failed to resolve pkarr record from storage") return nil, err } - logrus.WithField("record", id).Debug("resolved pkarr record from storage") + logrus.WithContext(ctx).WithField("record", id).Debug("resolved pkarr record from storage") resp := record.Response() if err = s.addRecordToCache(id, record.Response()); err != nil { logrus.WithError(err).WithField("record", id).Error("failed to set pkarr record in cache") @@ -166,7 +166,7 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response // add the record to cache, do it here to avoid duplicate calculations if err = s.addRecordToCache(id, resp); err != nil { - logrus.WithError(err).Errorf("failed to set pkarr record[%s] in cache", id) + logrus.WithContext(ctx).WithError(err).Errorf("failed to set pkarr record[%s] in cache", id) } return &resp, nil @@ -197,8 +197,7 @@ func (s *PkarrService) republish() { var nextPageToken []byte var allRecords []pkarr.Record - errCnt := 0 - successCnt := 0 + errCnt, successCnt, batchCnt := 0, 0, 0 for { allRecords, nextPageToken, err = s.db.ListRecords(ctx, nextPageToken, 1000) if err != nil { @@ -207,11 +206,12 @@ func (s *PkarrService) republish() { } if len(allRecords) == 0 { - logrus.Info("No records to republish") + logrus.Info("no records to republish") return } - logrus.WithField("record_count", len(allRecords)).Info("republishing records in batch") + logrus.WithField("record_count", len(allRecords)).Infof("republishing records in batch: %d", batchCnt) + batchCnt++ for _, record := range allRecords { recordID := zbase32.EncodeToString(record.Key[:]) @@ -232,5 +232,5 @@ func (s *PkarrService) republish() { "success": len(allRecords) - errCnt, "errors": errCnt, "total": len(allRecords), - }).Info("Republishing complete") + }).Infof("republishing complete with [%d] batches", batchCnt) } diff --git a/impl/pkg/service/pkarr_test.go b/impl/pkg/service/pkarr_test.go index f034d52d..6433829c 100644 --- a/impl/pkg/service/pkarr_test.go +++ b/impl/pkg/service/pkarr_test.go @@ -50,7 +50,7 @@ func TestPkarrService(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) @@ -77,7 +77,7 @@ func TestPkarrService(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) @@ -105,7 +105,7 @@ func TestPkarrService(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) @@ -138,7 +138,7 @@ func TestDHT(t *testing.T) { packet, err := d.ToDNSPacket(*doc, nil, nil) require.NoError(t, err) require.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) suffix, err := d.Suffix() diff --git a/impl/pkg/storage/db/bolt/bolt.go b/impl/pkg/storage/db/bolt/bolt.go index 54d5ec10..19dc100c 100644 --- a/impl/pkg/storage/db/bolt/bolt.go +++ b/impl/pkg/storage/db/bolt/bolt.go @@ -141,7 +141,7 @@ func (b *Bolt) read(ctx context.Context, namespace, key string) ([]byte, error) err := b.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(namespace)) if bucket == nil { - logrus.WithField("namespace", namespace).Info("namespace does not exist") + logrus.WithContext(ctx).WithField("namespace", namespace).Info("namespace does not exist") return nil } result = bucket.Get([]byte(key)) @@ -175,7 +175,7 @@ func (b *Bolt) readSeveral(ctx context.Context, namespace string, after []byte, err := b.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(namespace)) if bucket == nil { - logrus.WithField("namespace", namespace).Warn("namespace does not exist") + logrus.WithContext(ctx).WithField("namespace", namespace).Warn("namespace does not exist") return nil } @@ -210,7 +210,7 @@ func (b *Bolt) RecordCount(ctx context.Context) (int, error) { err := b.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(pkarrNamespace)) if bucket == nil { - logrus.WithField("namespace", pkarrNamespace).Warn("namespace does not exist") + logrus.WithContext(ctx).WithField("namespace", pkarrNamespace).Warn("namespace does not exist") return nil } count = bucket.Stats().KeyN diff --git a/impl/pkg/storage/db/bolt/bolt_test.go b/impl/pkg/storage/db/bolt/bolt_test.go index b4fbd289..8a015869 100644 --- a/impl/pkg/storage/db/bolt/bolt_test.go +++ b/impl/pkg/storage/db/bolt/bolt_test.go @@ -123,7 +123,7 @@ func TestReadWrite(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) @@ -168,7 +168,7 @@ func TestDBPagination(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) @@ -189,7 +189,7 @@ func TestDBPagination(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) diff --git a/impl/pkg/storage/db/postgres/postgres.go b/impl/pkg/storage/db/postgres/postgres.go index 7d3872a5..bd41e684 100644 --- a/impl/pkg/storage/db/postgres/postgres.go +++ b/impl/pkg/storage/db/postgres/postgres.go @@ -135,7 +135,7 @@ func (p Postgres) ListRecords(ctx context.Context, nextPageToken []byte, limit i record, err := pkarr.NewRecord(row.Key, row.Value, row.Sig, row.Seq) if err != nil { // TODO: do something useful if this happens - logrus.WithError(err).WithField("record_id", row.ID).Warn("error loading record from database, skipping") + logrus.WithContext(ctx).WithError(err).WithField("record_id", row.ID).Warn("error loading record from database, skipping") continue } diff --git a/impl/pkg/storage/db/postgres/postgres_test.go b/impl/pkg/storage/db/postgres/postgres_test.go index 9fc88ba9..9bee06bb 100644 --- a/impl/pkg/storage/db/postgres/postgres_test.go +++ b/impl/pkg/storage/db/postgres/postgres_test.go @@ -50,7 +50,7 @@ func TestReadWrite(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) @@ -95,7 +95,7 @@ func TestDBPagination(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) @@ -116,7 +116,7 @@ func TestDBPagination(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePKARRPublishRequest(sk, *packet) + putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg)