diff --git a/.github/workflows/dependabot-auto.yml b/.github/workflows/dependabot-auto.yml new file mode 100644 index 00000000..0f657b9f --- /dev/null +++ b/.github/workflows/dependabot-auto.yml @@ -0,0 +1,28 @@ +name: Dependabot auto-approve and merge +on: pull_request + +permissions: + contents: write + pull-requests: write + +jobs: + approve: + runs-on: ubuntu-latest + if: github.actor == 'dependabot[bot]' + steps: + - name: Dependabot metadata + id: metadata + uses: dependabot/fetch-metadata@v2 + with: + github-token: "${{ secrets.GITHUB_TOKEN }}" + - name: Approve a PR + run: gh pr review --approve "$PR_URL" + env: + PR_URL: ${{github.event.pull_request.html_url}} + GH_TOKEN: ${{secrets.GITHUB_TOKEN}} + - name: Enable auto-merge for Dependabot PRs + if: contains(steps.metadata.outputs.dependency-names, 'my-dependency') && steps.metadata.outputs.update-type == 'version-update:semver-patch' + run: gh pr merge --auto --merge "$PR_URL" + env: + PR_URL: ${{github.event.pull_request.html_url}} + GH_TOKEN: ${{secrets.GITHUB_TOKEN}} \ No newline at end of file diff --git a/impl/cmd/cli/identity.go b/impl/cmd/cli/identity.go index e1516645..ec4a099c 100644 --- a/impl/cmd/cli/identity.go +++ b/impl/cmd/cli/identity.go @@ -103,7 +103,7 @@ var identityAddCmd = &cobra.Command{ Answer: rrds, } // generate put request - putReq, err := dht.CreatePkarrPublishRequest(privKey, msg) + putReq, err := dht.CreateDNSPublishRequest(privKey, msg) if err != nil { logrus.WithError(err).Error("failed to create put request") return err @@ -170,7 +170,7 @@ var identityGetCmd = &cobra.Command{ return err } - msg, err := dht.ParsePkarrGetResponse(*gotResp) + msg, err := dht.ParseDNSGetResponse(*gotResp) if err != nil { logrus.WithError(err).Error("failed to parse get response") return err diff --git a/impl/concurrencytest/main.go b/impl/concurrencytest/main.go index 345b67e5..9f3fbc27 100644 --- a/impl/concurrencytest/main.go +++ b/impl/concurrencytest/main.go @@ -119,7 +119,7 @@ func generateDIDPutRequest() (string, []byte, error) { return "", nil, err } - bep44Put, err := dht.CreatePkarrPublishRequest(sk, *packet) + bep44Put, err := dht.CreateDNSPublishRequest(sk, *packet) if err != nil { return "", nil, err } diff --git a/impl/config/config.go b/impl/config/config.go index 2075d04e..b2f9d5a2 100644 --- a/impl/config/config.go +++ b/impl/config/config.go @@ -41,10 +41,9 @@ func (e EnvironmentVariable) String() string { } type Config struct { - Log LogConfig `toml:"log"` - ServerConfig ServerConfig `toml:"server"` - DHTConfig DHTServiceConfig `toml:"dht"` - PkarrConfig PkarrServiceConfig `toml:"pkarr"` + Log LogConfig `toml:"log"` + ServerConfig ServerConfig `toml:"server"` + DHTConfig DHTServiceConfig `toml:"dht"` } type ServerConfig struct { @@ -57,13 +56,10 @@ type ServerConfig struct { } type DHTServiceConfig struct { - BootstrapPeers []string `toml:"bootstrap_peers"` -} - -type PkarrServiceConfig struct { - RepublishCRON string `toml:"republish_cron"` - CacheTTLSeconds int `toml:"cache_ttl_seconds"` - CacheSizeLimitMB int `toml:"cache_size_limit_mb"` + BootstrapPeers []string `toml:"bootstrap_peers"` + RepublishCRON string `toml:"republish_cron"` + CacheTTLSeconds int `toml:"cache_ttl_seconds"` + CacheSizeLimitMB int `toml:"cache_size_limit_mb"` } type LogConfig struct { @@ -81,9 +77,7 @@ func GetDefaultConfig() Config { Telemetry: false, }, DHTConfig: DHTServiceConfig{ - BootstrapPeers: GetDefaultBootstrapPeers(), - }, - PkarrConfig: PkarrServiceConfig{ + BootstrapPeers: GetDefaultBootstrapPeers(), RepublishCRON: "0 */3 * * *", CacheTTLSeconds: 600, CacheSizeLimitMB: 1000, diff --git a/impl/config/config.toml b/impl/config/config.toml index 65cf88ac..b48239a6 100644 --- a/impl/config/config.toml +++ b/impl/config/config.toml @@ -9,8 +9,6 @@ telemetry = false [dht] bootstrap_peers = ["router.magnets.im:6881", "router.bittorrent.com:6881", "dht.transmissionbt.com:6881", "router.utorrent.com:6881", "router.nuh.dev:6881"] - -[pkarr] republish_cron = "0 */3 * * *" # every 3 hours cache_ttl_seconds = 600 # 10 minutes cache_size_limit_mb = 1000 # 1000 MB \ No newline at end of file diff --git a/impl/docs/swagger.yaml b/impl/docs/swagger.yaml index b8389710..95ee37d7 100644 --- a/impl/docs/swagger.yaml +++ b/impl/docs/swagger.yaml @@ -20,7 +20,7 @@ paths: get: consumes: - application/octet-stream - description: GetRecord a Pkarr record from the DHT + description: GetRecord a BEP44 DNS record from the DHT parameters: - description: ID to get in: path @@ -48,13 +48,13 @@ paths: description: Internal server error schema: type: string - summary: GetRecord a Pkarr record from the DHT + summary: GetRecord a BEP44 DNS record from the DHT tags: - - Pkarr + - DHT put: consumes: - application/octet-stream - description: PutRecord a Pkarr record into the DHT + description: PutRecord a BEP44 DNS record into the DHT parameters: - description: ID of the record to put in: path @@ -80,9 +80,9 @@ paths: description: Internal server error schema: type: string - summary: PutRecord a Pkarr record into the DHT + summary: PutRecord a BEP44 DNS record into the DHT tags: - - Pkarr + - DHT /health: get: consumes: diff --git a/impl/integrationtest/main.go b/impl/integrationtest/main.go index 94949f5d..18ed29c7 100644 --- a/impl/integrationtest/main.go +++ b/impl/integrationtest/main.go @@ -103,7 +103,7 @@ func generateDIDPutRequest() (string, []byte, error) { return "", nil, err } - bep44Put, err := dht.CreatePkarrPublishRequest(sk, *packet) + bep44Put, err := dht.CreateDNSPublishRequest(sk, *packet) if err != nil { return "", nil, err } diff --git a/impl/internal/did/client_test.go b/impl/internal/did/client_test.go index 42ceecd3..cea9c9e7 100644 --- a/impl/internal/did/client_test.go +++ b/impl/internal/did/client_test.go @@ -27,7 +27,7 @@ func TestClient(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - bep44Put, err := dht.CreatePkarrPublishRequest(sk, *packet) + bep44Put, err := dht.CreateDNSPublishRequest(sk, *packet) assert.NoError(t, err) assert.NotEmpty(t, bep44Put) diff --git a/impl/internal/did/did.go b/impl/internal/did/did.go index c8d4709b..5750cd2b 100644 --- a/impl/internal/did/did.go +++ b/impl/internal/did/did.go @@ -314,7 +314,7 @@ func (d DHT) ToDNSPacket(doc did.Document, types []TypeIndex, gateways []Authori keyType := keyTypeForJWK(*vm.PublicKeyJWK) if keyType < 0 { - return nil, fmt.Errorf("+unsupported key type given alg: %s", vm.PublicKeyJWK.ALG) + return nil, fmt.Errorf("unsupported key type given alg: %s", vm.PublicKeyJWK.ALG) } // convert the public key to a base64url encoded string diff --git a/impl/pkg/dht/pkarr.go b/impl/pkg/dht/dns.go similarity index 72% rename from impl/pkg/dht/pkarr.go rename to impl/pkg/dht/dns.go index 2265c5c3..eb374213 100644 --- a/impl/pkg/dht/pkarr.go +++ b/impl/pkg/dht/dns.go @@ -12,13 +12,13 @@ import ( "github.com/TBD54566975/did-dht-method/internal/dht" ) -// CreatePkarrPublishRequest creates a put request for the given records. Requires a public/private keypair and the records to put. -// The records are expected to be a DNS message packet, such as: +// CreateDNSPublishRequest creates a put request for the given records. Requires a public/private keypair and +// the records to put. The records are expected to be a DNS message packet, such as: // // dns.Msg{ // MsgHdr: dns.MsgHdr{ // Id: 0, -// Response: true, +// BEP44Response: true, // Authoritative: true, // }, // Answer: dns.RR{ @@ -30,11 +30,11 @@ import ( // Ttl: 7200, // }, // Txt: []string{ -// "hello pkarr", +// "hello mainline", // }, // } // } -func CreatePkarrPublishRequest(privateKey ed25519.PrivateKey, msg dns.Msg) (*bep44.Put, error) { +func CreateDNSPublishRequest(privateKey ed25519.PrivateKey, msg dns.Msg) (*bep44.Put, error) { packed, err := msg.Pack() if err != nil { return nil, util.LoggingErrorMsg(err, "failed to pack records") @@ -49,9 +49,9 @@ func CreatePkarrPublishRequest(privateKey ed25519.PrivateKey, msg dns.Msg) (*bep return put, nil } -// ParsePkarrGetResponse parses the response from a get request. +// ParseDNSGetResponse parses the response from a get request. // The response is expected to be a slice of DNS resource records. -func ParsePkarrGetResponse(response dht.FullGetResult) (*dns.Msg, error) { +func ParseDNSGetResponse(response dht.FullGetResult) (*dns.Msg, error) { var payload string if err := bencode.Unmarshal(response.V, &payload); err != nil { return nil, util.LoggingErrorMsg(err, "failed to unmarshal payload value") diff --git a/impl/pkg/dht/pkarr_test.go b/impl/pkg/dht/dns_test.go similarity index 91% rename from impl/pkg/dht/pkarr_test.go rename to impl/pkg/dht/dns_test.go index f83992b1..431e4a0c 100644 --- a/impl/pkg/dht/pkarr_test.go +++ b/impl/pkg/dht/dns_test.go @@ -16,7 +16,7 @@ import ( "github.com/TBD54566975/did-dht-method/internal/util" ) -func TestGetPutPkarrDHT(t *testing.T) { +func TestGetPutDNSDHT(t *testing.T) { dht := NewTestDHT(t) defer dht.Close() @@ -31,7 +31,7 @@ func TestGetPutPkarrDHT(t *testing.T) { Ttl: 7200, }, Txt: []string{ - "hello pkarr", + "hello mainline", }, } msg := dns.Msg{ @@ -42,7 +42,7 @@ func TestGetPutPkarrDHT(t *testing.T) { }, Answer: []dns.RR{&txtRecord}, } - put, err := CreatePkarrPublishRequest(privKey, msg) + put, err := CreateDNSPublishRequest(privKey, msg) require.NoError(t, err) id, err := dht.Put(context.Background(), *put) @@ -53,7 +53,7 @@ func TestGetPutPkarrDHT(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, got) - gotMsg, err := ParsePkarrGetResponse(*got) + gotMsg, err := ParseDNSGetResponse(*got) require.NoError(t, err) require.NotEmpty(t, gotMsg.Answer) @@ -103,7 +103,7 @@ func TestGetPutDIDDHT(t *testing.T) { didDocPacket, err := didID.ToDNSPacket(*doc, nil, nil) require.NoError(t, err) - putReq, err := CreatePkarrPublishRequest(privKey, *didDocPacket) + putReq, err := CreateDNSPublishRequest(privKey, *didDocPacket) require.NoError(t, err) gotID, err := dht.Put(context.Background(), *putReq) @@ -114,7 +114,7 @@ func TestGetPutDIDDHT(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, got) - gotMsg, err := ParsePkarrGetResponse(*got) + gotMsg, err := ParseDNSGetResponse(*got) require.NoError(t, err) require.NotEmpty(t, gotMsg.Answer) diff --git a/impl/pkg/pkarr/record.go b/impl/pkg/dht/record.go similarity index 57% rename from impl/pkg/pkarr/record.go rename to impl/pkg/dht/record.go index 862624c4..ea87e9be 100644 --- a/impl/pkg/pkarr/record.go +++ b/impl/pkg/dht/record.go @@ -1,4 +1,4 @@ -package pkarr +package dht import ( "bytes" @@ -14,40 +14,47 @@ import ( "github.com/tv42/zbase32" ) -type Response struct { +type BEP44Response struct { V []byte `validate:"required"` Seq int64 `validate:"required"` Sig [64]byte `validate:"required"` } // Equals returns true if the response is equal to the other response -func (r Response) Equals(other Response) bool { +func (r BEP44Response) Equals(other BEP44Response) bool { return r.Seq == other.Seq && bytes.Equal(r.V, other.V) && r.Sig == other.Sig } -type Record struct { +// BEP44Record represents a record in the DHT +type BEP44Record struct { Value []byte `json:"v" validate:"required"` Key [32]byte `json:"k" validate:"required"` Signature [64]byte `json:"sig" validate:"required"` SequenceNumber int64 `json:"seq" validate:"required"` } -// NewRecord returns a new Record with the given key, value, signature, and sequence number -func NewRecord(k []byte, v []byte, sig []byte, seq int64) (*Record, error) { - record := Record{SequenceNumber: seq} +// FailedRecord represents a record that failed to be written to the DHT +type FailedRecord struct { + ID string `json:"id"` + Count int `json:"count"` +} + +// NewBEP44Record returns a new BEP44Record with the given key, value, signature, and sequence number +func NewBEP44Record(k []byte, v []byte, sig []byte, seq int64) (*BEP44Record, error) { + record := BEP44Record{SequenceNumber: seq} if len(k) != 32 { - return nil, errors.New("incorrect key length for pkarr record") + return nil, errors.New("incorrect key length for bep44 record") } record.Key = [32]byte(k) if len(v) > 1000 { - return nil, errors.New("pkarr record value too long") + return nil, errors.New("bep44 record value too long") } record.Value = v if len(sig) != 64 { - return nil, errors.New("incorrect sig length for pkarr record") + return nil, errors.New("incorrect sig length for bep44 record") } record.Signature = [64]byte(sig) @@ -59,7 +66,7 @@ func NewRecord(k []byte, v []byte, sig []byte, seq int64) (*Record, error) { } // IsValid returns an error if the request is invalid; also validates the signature -func (r Record) IsValid() error { +func (r BEP44Record) IsValid() error { if err := util.IsValidStruct(r); err != nil { return err } @@ -67,7 +74,7 @@ func (r Record) IsValid() error { // validate the signature bv, err := bencode.Marshal(r.Value) if err != nil { - return fmt.Errorf("error bencoding pkarr record: %v", err) + return fmt.Errorf("error bencoding bep44 record: %v", err) } if !bep44.Verify(r.Key[:], nil, r.SequenceNumber, bv, r.Signature[:]) { @@ -76,17 +83,17 @@ func (r Record) IsValid() error { return nil } -// Response returns the record as a Response -func (r Record) Response() Response { - return Response{ +// Response returns the record as a BEP44Response +func (r BEP44Record) Response() BEP44Response { + return BEP44Response{ V: r.Value, Seq: r.SequenceNumber, Sig: r.Signature, } } -// BEP44 returns the record as a BEP44 Put message -func (r Record) BEP44() bep44.Put { +// Put returns the record as a bep44.Put message +func (r BEP44Record) Put() bep44.Put { return bep44.Put{ V: r.Value, K: &r.Key, @@ -96,18 +103,18 @@ func (r Record) BEP44() bep44.Put { } // String returns a string representation of the record -func (r Record) String() string { +func (r BEP44Record) String() string { e := base64.RawURLEncoding - return fmt.Sprintf("pkarr.Record{K=%s V=%s Sig=%s Seq=%d}", zbase32.EncodeToString(r.Key[:]), e.EncodeToString(r.Value), e.EncodeToString(r.Signature[:]), r.SequenceNumber) + return fmt.Sprintf("dht.BEP44Record{K=%s V=%s Sig=%s Seq=%d}", zbase32.EncodeToString(r.Key[:]), e.EncodeToString(r.Value), e.EncodeToString(r.Signature[:]), r.SequenceNumber) } // ID returns the base32 encoded key as a string -func (r Record) ID() string { +func (r BEP44Record) ID() string { return zbase32.EncodeToString(r.Key[:]) } // Hash returns the SHA256 hash of the record as a string -func (r Record) Hash() (string, error) { +func (r BEP44Record) Hash() (string, error) { recordBytes, err := json.Marshal(r) if err != nil { return "", err @@ -115,9 +122,9 @@ func (r Record) Hash() (string, error) { return string(sha256.New().Sum(recordBytes)), nil } -// RecordFromBEP44 returns a Record from a BEP44 Put message -func RecordFromBEP44(putMsg *bep44.Put) Record { - return Record{ +// RecordFromBEP44 returns a BEP44Record from a bep44.Put message +func RecordFromBEP44(putMsg *bep44.Put) BEP44Record { + return BEP44Record{ Key: *putMsg.K, Value: putMsg.V.([]byte), Signature: putMsg.Sig, diff --git a/impl/pkg/pkarr/record_test.go b/impl/pkg/dht/record_test.go similarity index 55% rename from impl/pkg/pkarr/record_test.go rename to impl/pkg/dht/record_test.go index 614a292d..4be14fce 100644 --- a/impl/pkg/pkarr/record_test.go +++ b/impl/pkg/dht/record_test.go @@ -1,4 +1,4 @@ -package pkarr_test +package dht_test import ( "strings" @@ -9,13 +9,12 @@ import ( "github.com/TBD54566975/did-dht-method/internal/did" "github.com/TBD54566975/did-dht-method/pkg/dht" - "github.com/TBD54566975/did-dht-method/pkg/pkarr" ) func TestNewRecord(t *testing.T) { // validate incorrect key length is rejected - r, err := pkarr.NewRecord([]byte("aaaaaaaaaaa"), nil, nil, 0) - assert.EqualError(t, err, "incorrect key length for pkarr record") + r, err := dht.NewBEP44Record([]byte("aaaaaaaaaaa"), nil, nil, 0) + assert.EqualError(t, err, "incorrect key length for bep44 record") assert.Nil(t, r) // create a did doc as a packet to store @@ -27,30 +26,30 @@ func TestNewRecord(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) - r, err = pkarr.NewRecord(putMsg.K[:], []byte(strings.Repeat("a", 1001)), putMsg.Sig[:], putMsg.Seq) - assert.EqualError(t, err, "pkarr record value too long") + r, err = dht.NewBEP44Record(putMsg.K[:], []byte(strings.Repeat("a", 1001)), putMsg.Sig[:], putMsg.Seq) + assert.EqualError(t, err, "bep44 record value too long") assert.Nil(t, r) - r, err = pkarr.NewRecord(putMsg.K[:], putMsg.V.([]byte), []byte(strings.Repeat("a", 65)), putMsg.Seq) - assert.EqualError(t, err, "incorrect sig length for pkarr record") + r, err = dht.NewBEP44Record(putMsg.K[:], putMsg.V.([]byte), []byte(strings.Repeat("a", 65)), putMsg.Seq) + assert.EqualError(t, err, "incorrect sig length for bep44 record") assert.Nil(t, r) - r, err = pkarr.NewRecord(putMsg.K[:], putMsg.V.([]byte), putMsg.Sig[:], 0) - assert.EqualError(t, err, "Key: 'Record.SequenceNumber' Error:Field validation for 'SequenceNumber' failed on the 'required' tag") + r, err = dht.NewBEP44Record(putMsg.K[:], putMsg.V.([]byte), putMsg.Sig[:], 0) + assert.EqualError(t, err, "Key: 'BEP44Record.SequenceNumber' Error:Field validation for 'SequenceNumber' failed on the 'required' tag") assert.Nil(t, r) - r, err = pkarr.NewRecord(putMsg.K[:], putMsg.V.([]byte), putMsg.Sig[:], 1) + r, err = dht.NewBEP44Record(putMsg.K[:], putMsg.V.([]byte), putMsg.Sig[:], 1) assert.EqualError(t, err, "signature is invalid") assert.Nil(t, r) - r, err = pkarr.NewRecord(putMsg.K[:], putMsg.V.([]byte), putMsg.Sig[:], putMsg.Seq) + r, err = dht.NewBEP44Record(putMsg.K[:], putMsg.V.([]byte), putMsg.Sig[:], putMsg.Seq) assert.NoError(t, err) - bep := r.BEP44() + bep := r.Put() assert.Equal(t, putMsg.K, bep.K) assert.Equal(t, putMsg.V, bep.V) assert.Equal(t, putMsg.Sig, bep.Sig) @@ -61,7 +60,7 @@ func TestNewRecord(t *testing.T) { assert.Equal(t, r.SequenceNumber, resp.Seq) assert.Equal(t, r.Signature, resp.Sig) - r2 := pkarr.RecordFromBEP44(putMsg) + r2 := dht.RecordFromBEP44(putMsg) assert.Equal(t, r.Key, r2.Key) assert.Equal(t, r.Value, r2.Value) assert.Equal(t, r.Signature, r2.Signature) diff --git a/impl/pkg/server/pkarr.go b/impl/pkg/server/dht.go similarity index 69% rename from impl/pkg/server/pkarr.go rename to impl/pkg/server/dht.go index d5c88903..966909c0 100644 --- a/impl/pkg/server/pkarr.go +++ b/impl/pkg/server/dht.go @@ -11,26 +11,26 @@ import ( "github.com/gin-gonic/gin" "github.com/TBD54566975/did-dht-method/internal/util" - "github.com/TBD54566975/did-dht-method/pkg/pkarr" + "github.com/TBD54566975/did-dht-method/pkg/dht" "github.com/TBD54566975/did-dht-method/pkg/service" "github.com/TBD54566975/did-dht-method/pkg/telemetry" ) -// PkarrRouter is the router for the Pkarr API -type PkarrRouter struct { - service *service.PkarrService +// DHTRouter is the router for the DHT API +type DHTRouter struct { + service *service.DHTService } -// NewPkarrRouter returns a new instance of the Relay router -func NewPkarrRouter(service *service.PkarrService) (*PkarrRouter, error) { - return &PkarrRouter{service: service}, nil +// NewDHTRouter returns a new instance of the DHT router +func NewDHTRouter(service *service.DHTService) (*DHTRouter, error) { + return &DHTRouter{service: service}, nil } // GetRecord godoc // -// @Summary GetRecord a Pkarr record from the DHT -// @Description GetRecord a Pkarr record from the DHT -// @Tags Pkarr +// @Summary GetRecord a BEP44 DNS record from the DHT +// @Description GetRecord a BEP44 DNS record from the DHT +// @Tags DHT // @Accept octet-stream // @Produce octet-stream // @Param id path string true "ID to get" @@ -39,8 +39,8 @@ func NewPkarrRouter(service *service.PkarrService) (*PkarrRouter, error) { // @Failure 404 {string} string "Not found" // @Failure 500 {string} string "Internal server error" // @Router /{id} [get] -func (r *PkarrRouter) GetRecord(c *gin.Context) { - ctx, span := telemetry.GetTracer().Start(c, "PkarrHTTP.GetRecord") +func (r *DHTRouter) GetRecord(c *gin.Context) { + ctx, span := telemetry.GetTracer().Start(c, "DHTHTTP.GetRecord") defer span.End() id := GetParam(c, IDParam) @@ -60,23 +60,22 @@ func (r *PkarrRouter) GetRecord(c *gin.Context) { return } - resp, err := r.service.GetPkarr(ctx, *id) + resp, err := r.service.GetDHT(ctx, *id) if err != nil { // TODO(gabe): provide a more maintainable way to handle custom errors if strings.Contains(err.Error(), "spam") { LoggingRespondErrMsg(c, fmt.Sprintf("too many requests for bad key %s", *id), http.StatusTooManyRequests) return } - LoggingRespondErrWithMsg(c, err, "failed to get pkarr record", http.StatusInternalServerError) + LoggingRespondErrWithMsg(c, err, "failed to get dht record", http.StatusInternalServerError) return } if resp == nil { - LoggingRespondErrMsg(c, "pkarr record not found", http.StatusNotFound) + LoggingRespondErrMsg(c, "dht record not found", http.StatusNotFound) return } // Convert int64 to uint64 since binary.PutUint64 expects a uint64 value - // according to https://github.com/Nuhvi/pkarr/blob/main/design/relays.md#get var seqBuf [8]byte binary.BigEndian.PutUint64(seqBuf[:], uint64(resp.Seq)) // sig:seq:v @@ -86,9 +85,9 @@ func (r *PkarrRouter) GetRecord(c *gin.Context) { // PutRecord godoc // -// @Summary PutRecord a Pkarr record into the DHT -// @Description PutRecord a Pkarr record into the DHT -// @Tags Pkarr +// @Summary PutRecord a BEP44 DNS record into the DHT +// @Description PutRecord a BEP44 DNS record into the DHT +// @Tags DHT // @Accept octet-stream // @Param id path string true "ID of the record to put" // @Param request body []byte true "64 bytes sig, 8 bytes u64 big-endian seq, 0-1000 bytes of v." @@ -96,8 +95,8 @@ func (r *PkarrRouter) GetRecord(c *gin.Context) { // @Failure 400 {string} string "Bad request" // @Failure 500 {string} string "Internal server error" // @Router /{id} [put] -func (r *PkarrRouter) PutRecord(c *gin.Context) { - ctx, span := telemetry.GetTracer().Start(c, "PkarrHTTP.PutRecord") +func (r *DHTRouter) PutRecord(c *gin.Context) { + ctx, span := telemetry.GetTracer().Start(c, "DHTHTTP.PutRecord") defer span.End() id := GetParam(c, IDParam) @@ -129,18 +128,17 @@ func (r *PkarrRouter) PutRecord(c *gin.Context) { } // transform the request into a service request by extracting the fields - // according to https://github.com/Nuhvi/pkarr/blob/main/design/relays.md#put value := body[72:] sig := body[:64] seq := int64(binary.BigEndian.Uint64(body[64:72])) - request, err := pkarr.NewRecord(key, value, sig, seq) + request, err := dht.NewBEP44Record(key, value, sig, seq) if err != nil { LoggingRespondErrWithMsg(c, err, "error parsing request", http.StatusBadRequest) return } - if err = r.service.PublishPkarr(ctx, *id, *request); err != nil { - LoggingRespondErrWithMsg(c, err, "failed to publish pkarr record", http.StatusInternalServerError) + if err = r.service.PublishDHT(ctx, *id, *request); err != nil { + LoggingRespondErrWithMsg(c, err, "failed to publish dht record", http.StatusInternalServerError) return } diff --git a/impl/pkg/server/pkarr_test.go b/impl/pkg/server/dht_test.go similarity index 89% rename from impl/pkg/server/pkarr_test.go rename to impl/pkg/server/dht_test.go index fe43655c..64e068d3 100644 --- a/impl/pkg/server/pkarr_test.go +++ b/impl/pkg/server/dht_test.go @@ -19,13 +19,13 @@ import ( "github.com/TBD54566975/did-dht-method/pkg/storage" ) -func TestPkarrRouter(t *testing.T) { - pkarrSvc := testPkarrService(t) - pkarrRouter, err := NewPkarrRouter(&pkarrSvc) +func TestDHTRouter(t *testing.T) { + dhtSvc := testDHTService(t) + dhtRouter, err := NewDHTRouter(&dhtSvc) require.NoError(t, err) - require.NotEmpty(t, pkarrRouter) + require.NotEmpty(t, dhtRouter) - defer pkarrSvc.Close() + defer dhtSvc.Close() t.Run("test put record", func(t *testing.T) { didID, reqData := generateDIDPutRequest(t) @@ -36,7 +36,7 @@ func TestPkarrRouter(t *testing.T) { req := httptest.NewRequest(http.MethodPut, fmt.Sprintf("%s/%s", testServerURL, suffix), bytes.NewReader(reqData)) c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix}) - pkarrRouter.PutRecord(c) + dhtRouter.PutRecord(c) assert.True(t, is2xxResponse(w.Code), "unexpected %s", w.Result().Status) }) @@ -49,14 +49,14 @@ func TestPkarrRouter(t *testing.T) { req := httptest.NewRequest(http.MethodPut, fmt.Sprintf("%s/%s", testServerURL, suffix), bytes.NewReader(reqData)) c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix}) - pkarrRouter.PutRecord(c) + dhtRouter.PutRecord(c) assert.True(t, is2xxResponse(w.Code), "unexpected %s", w.Result().Status) w = httptest.NewRecorder() req = httptest.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", testServerURL, suffix), nil) c = newRequestContextWithParams(w, req, map[string]string{IDParam: suffix}) - pkarrRouter.GetRecord(c) + dhtRouter.GetRecord(c) assert.True(t, is2xxResponse(w.Code), "unexpected %s", w.Result().Status) resp, err := io.ReadAll(w.Body) @@ -74,14 +74,14 @@ func TestPkarrRouter(t *testing.T) { req := httptest.NewRequest(http.MethodPut, fmt.Sprintf("%s/%s", testServerURL, suffix), bytes.NewReader(reqData)) c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix}) - pkarrRouter.PutRecord(c) + dhtRouter.PutRecord(c) assert.True(t, is2xxResponse(w.Code), "unexpected %s", w.Result().Status) w = httptest.NewRecorder() req = httptest.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", testServerURL, suffix), nil) c = newRequestContextWithParams(w, req, map[string]string{}) - pkarrRouter.GetRecord(c) + dhtRouter.GetRecord(c) assert.Equal(t, http.StatusBadRequest, w.Result().StatusCode, "unexpected %s", w.Result().Status) }) @@ -93,7 +93,7 @@ func TestPkarrRouter(t *testing.T) { req := httptest.NewRequest(http.MethodPut, fmt.Sprintf("%s/", testServerURL), bytes.NewReader(reqData)) c := newRequestContextWithParams(w, req, map[string]string{}) - pkarrRouter.PutRecord(c) + dhtRouter.PutRecord(c) assert.Equal(t, http.StatusBadRequest, w.Result().StatusCode, "unexpected %s", w.Result().Status) }) @@ -106,7 +106,7 @@ func TestPkarrRouter(t *testing.T) { req := httptest.NewRequest(http.MethodPut, fmt.Sprintf("%s/%s", testServerURL, suffix), bytes.NewReader(reqData)) c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix}) - pkarrRouter.PutRecord(c) + dhtRouter.PutRecord(c) assert.Equal(t, http.StatusInternalServerError, w.Result().StatusCode, "unexpected %s", w.Result().Status) }) @@ -121,7 +121,7 @@ func TestPkarrRouter(t *testing.T) { req := httptest.NewRequest(http.MethodPut, fmt.Sprintf("%s/%s", testServerURL, suffix), bytes.NewReader(reqData)) c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix}) - pkarrRouter.PutRecord(c) + dhtRouter.PutRecord(c) assert.Equal(t, http.StatusBadRequest, w.Result().StatusCode, "unexpected %s", w.Result().Status) }) @@ -134,7 +134,7 @@ func TestPkarrRouter(t *testing.T) { req := httptest.NewRequest(http.MethodPut, fmt.Sprintf("%s/%s", testServerURL, suffix), bytes.NewReader(reqData)) c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix}) - pkarrRouter.PutRecord(c) + dhtRouter.PutRecord(c) assert.Equal(t, http.StatusBadRequest, w.Result().StatusCode, "unexpected %s", w.Result().Status) }) @@ -143,7 +143,7 @@ func TestPkarrRouter(t *testing.T) { suffix := "uqaj3fcr9db6jg6o9pjs53iuftyj45r46aubogfaceqjbo6pp9sy" req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", testServerURL, suffix), nil) c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix}) - pkarrRouter.GetRecord(c) + dhtRouter.GetRecord(c) assert.Equal(t, http.StatusNotFound, w.Result().StatusCode, "unexpected %s", w.Result().Status) }) @@ -152,18 +152,18 @@ func TestPkarrRouter(t *testing.T) { suffix := "cz13drbfxy3ih6xun4mw3cyiexrtfcs9gyp46o4469e93y36zhsy" req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", testServerURL, suffix), nil) c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix}) - pkarrRouter.GetRecord(c) + dhtRouter.GetRecord(c) assert.Equal(t, http.StatusNotFound, w.Result().StatusCode, "unexpected %s", w.Result().Status) w = httptest.NewRecorder() req = httptest.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", testServerURL, suffix), nil) c = newRequestContextWithParams(w, req, map[string]string{IDParam: suffix}) - pkarrRouter.GetRecord(c) + dhtRouter.GetRecord(c) assert.Equal(t, http.StatusTooManyRequests, w.Result().StatusCode, "unexpected %s", w.Result().Status) }) } -func testPkarrService(t *testing.T) service.PkarrService { +func testDHTService(t *testing.T) service.DHTService { defaultConfig := config.GetDefaultConfig() db, err := storage.NewStorage(defaultConfig.ServerConfig.StorageURI) @@ -171,11 +171,11 @@ func testPkarrService(t *testing.T) service.PkarrService { require.NotEmpty(t, db) dht := dht.NewTestDHT(t) - pkarrService, err := service.NewPkarrService(&defaultConfig, db, dht) + dhtService, err := service.NewDHTService(&defaultConfig, db, dht) require.NoError(t, err) - require.NotEmpty(t, pkarrService) + require.NotEmpty(t, dhtService) - return *pkarrService + return *dhtService } func generateDIDPutRequest(t *testing.T) (string, []byte) { @@ -188,7 +188,7 @@ func generateDIDPutRequest(t *testing.T) (string, []byte) { assert.NoError(t, err) assert.NotEmpty(t, packet) - bep44Put, err := dht.CreatePkarrPublishRequest(sk, *packet) + bep44Put, err := dht.CreateDNSPublishRequest(sk, *packet) assert.NoError(t, err) assert.NotEmpty(t, bep44Put) diff --git a/impl/pkg/server/server.go b/impl/pkg/server/server.go index fd6e406e..e6093408 100644 --- a/impl/pkg/server/server.go +++ b/impl/pkg/server/server.go @@ -31,7 +31,7 @@ type Server struct { shutdown chan os.Signal cfg *config.Config - svc *service.PkarrService + svc *service.DHTService } // NewServer returns a new instance of Server with the given db and host. @@ -51,9 +51,9 @@ func NewServer(cfg *config.Config, shutdown chan os.Signal, d *dht.DHT) (*Server logrus.WithField("record_count", recordCnt).Info("storage instantiated with record count") } - pkarrService, err := service.NewPkarrService(cfg, db, d) + dhtService, err := service.NewDHTService(cfg, db, d) if err != nil { - return nil, util.LoggingErrorMsg(err, "could not instantiate pkarr service") + return nil, util.LoggingErrorMsg(err, "could not instantiate the dht service") } handler.GET("/health", Health) @@ -63,8 +63,8 @@ func NewServer(cfg *config.Config, shutdown chan os.Signal, d *dht.DHT) (*Server handler.GET("/swagger/*any", ginswagger.WrapHandler(swaggerfiles.Handler, ginswagger.URL("/swagger.yaml"))) // root relay API - if err = PkarrAPI(&handler.RouterGroup, pkarrService); err != nil { - return nil, util.LoggingErrorMsg(err, "could not setup pkarr API") + if err = DHTAPI(&handler.RouterGroup, dhtService); err != nil { + return nil, util.LoggingErrorMsg(err, "could not setup the dht API") } return &Server{ Server: &http.Server{ @@ -76,7 +76,7 @@ func NewServer(cfg *config.Config, shutdown chan os.Signal, d *dht.DHT) (*Server MaxHeaderBytes: 1 << 20, }, cfg: cfg, - svc: pkarrService, + svc: dhtService, handler: handler, shutdown: shutdown, }, nil @@ -105,14 +105,14 @@ func setupHandler(env config.Environment) *gin.Engine { return handler } -// PkarrAPI sets up the relay API routes according to https://github.com/Nuhvi/pkarr/blob/main/design/relays.md -func PkarrAPI(rg *gin.RouterGroup, service *service.PkarrService) error { - relayRouter, err := NewPkarrRouter(service) +// DHTAPI sets up the relay API routes according to the spec https://did-dht.com/#gateway-api +func DHTAPI(rg *gin.RouterGroup, service *service.DHTService) error { + dhtRouter, err := NewDHTRouter(service) if err != nil { - return util.LoggingErrorMsg(err, "could not instantiate relay router") + return util.LoggingErrorMsg(err, "could not instantiate dht router") } - rg.PUT("/:id", relayRouter.PutRecord) - rg.GET("/:id", relayRouter.GetRecord) + rg.PUT("/:id", dhtRouter.PutRecord) + rg.GET("/:id", dhtRouter.GetRecord) return nil } diff --git a/impl/pkg/service/pkarr.go b/impl/pkg/service/dht.go similarity index 54% rename from impl/pkg/service/pkarr.go rename to impl/pkg/service/dht.go index 1b17ac68..b3c6d64d 100644 --- a/impl/pkg/service/pkarr.go +++ b/impl/pkg/service/dht.go @@ -3,7 +3,6 @@ package service import ( "context" "sync" - "sync/atomic" "time" ssiutil "github.com/TBD54566975/ssi-sdk/util" @@ -12,22 +11,20 @@ import ( "github.com/goccy/go-json" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "github.com/tv42/zbase32" "github.com/TBD54566975/did-dht-method/internal/util" "github.com/TBD54566975/did-dht-method/config" dhtint "github.com/TBD54566975/did-dht-method/internal/dht" "github.com/TBD54566975/did-dht-method/pkg/dht" - "github.com/TBD54566975/did-dht-method/pkg/pkarr" "github.com/TBD54566975/did-dht-method/pkg/storage" "github.com/TBD54566975/did-dht-method/pkg/telemetry" ) -const recordSizeLimit = 1000 +const recordSizeLimitBytes = 1000 -// PkarrService is the Pkarr service responsible for managing the Pkarr DHT and reading/writing records -type PkarrService struct { +// DHTService is the service responsible for managing BEP44 DNS records in the DHT and reading/writing records +type DHTService struct { cfg *config.Config db storage.Storage dht *dht.DHT @@ -36,17 +33,17 @@ type PkarrService struct { scheduler *dhtint.Scheduler } -// NewPkarrService returns a new instance of the Pkarr service -func NewPkarrService(cfg *config.Config, db storage.Storage, d *dht.DHT) (*PkarrService, error) { +// NewDHTService returns a new instance of the DHT service +func NewDHTService(cfg *config.Config, db storage.Storage, d *dht.DHT) (*DHTService, error) { if cfg == nil { return nil, ssiutil.LoggingNewError("config is required") } // create and start get cache - cacheTTL := time.Duration(cfg.PkarrConfig.CacheTTLSeconds) * time.Second + cacheTTL := time.Duration(cfg.DHTConfig.CacheTTLSeconds) * time.Second cacheConfig := bigcache.DefaultConfig(cacheTTL) - cacheConfig.MaxEntrySize = recordSizeLimit - cacheConfig.HardMaxCacheSize = cfg.PkarrConfig.CacheSizeLimitMB + cacheConfig.MaxEntrySize = recordSizeLimitBytes + cacheConfig.HardMaxCacheSize = cfg.DHTConfig.CacheSizeLimitMB cacheConfig.CleanWindow = cacheTTL / 2 cache, err := bigcache.New(context.Background(), cacheConfig) if err != nil { @@ -63,7 +60,7 @@ func NewPkarrService(cfg *config.Config, db storage.Storage, d *dht.DHT) (*Pkarr // start scheduler for republishing scheduler := dhtint.NewScheduler() - svc := PkarrService{ + svc := DHTService{ cfg: cfg, db: db, dht: d, @@ -71,15 +68,15 @@ func NewPkarrService(cfg *config.Config, db storage.Storage, d *dht.DHT) (*Pkarr badGetCache: badGetCache, scheduler: &scheduler, } - if err = scheduler.Schedule(cfg.PkarrConfig.RepublishCRON, svc.republish); err != nil { + if err = scheduler.Schedule(cfg.DHTConfig.RepublishCRON, svc.republish); err != nil { return nil, ssiutil.LoggingErrorMsg(err, "failed to start republisher") } return &svc, nil } -// PublishPkarr stores the record in the db, publishes the given Pkarr record to the DHT, and returns the z-base-32 encoded ID -func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr.Record) error { - ctx, span := telemetry.GetTracer().Start(ctx, "PkarrService.PublishPkarr") +// PublishDHT stores the record in the db, publishes the given DNS record to the DHT, and returns the z-base-32 encoded ID +func (s *DHTService) PublishDHT(ctx context.Context, id string, record dht.BEP44Record) error { + ctx, span := telemetry.GetTracer().Start(ctx, "DHTService.PublishDHT") defer span.End() // make sure the key is valid @@ -93,9 +90,9 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr // check if the message is already in the cache if got, err := s.cache.Get(id); err == nil { - var resp pkarr.Response + var resp dht.BEP44Response if err = json.Unmarshal(got, &resp); err == nil && record.Response().Equals(resp) { - logrus.WithContext(ctx).WithField("record_id", id).Debug("resolved pkarr record from cache with matching response") + logrus.WithContext(ctx).WithField("record_id", id).Debug("resolved dht record from cache with matching response") return nil } } @@ -111,7 +108,7 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr if err = s.cache.Set(id, recordBytes); err != nil { return err } - logrus.WithContext(ctx).WithField("record", id).Debug("added pkarr record to cache and db") + logrus.WithContext(ctx).WithField("record", id).Debug("added dht record to cache and db") // return here and put it in the DHT asynchronously go func() { @@ -119,19 +116,19 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr putCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if _, err = s.dht.Put(putCtx, record.BEP44()); err != nil { + if _, err = s.dht.Put(putCtx, record.Put()); err != nil { logrus.WithContext(ctx).WithError(err).Errorf("error from dht.Put for record: %s", id) } else { - logrus.WithContext(ctx).WithField("record", id).Debug("put pkarr record to DHT") + logrus.WithContext(ctx).WithField("record", id).Debug("put record to DHT") } }() return nil } -// GetPkarr returns the full Pkarr record (including sig data) for the given z-base-32 encoded ID -func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response, error) { - ctx, span := telemetry.GetTracer().Start(ctx, "PkarrService.GetPkarr") +// GetDHT returns the full DNS record (including sig data) for the given z-base-32 encoded ID +func (s *DHTService) GetDHT(ctx context.Context, id string) (*dht.BEP44Response, error) { + ctx, span := telemetry.GetTracer().Start(ctx, "DHTService.GetDHT") defer span.End() // make sure the key is valid @@ -146,12 +143,12 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response // first do a cache lookup if got, err := s.cache.Get(id); err == nil { - var resp pkarr.Response + var resp dht.BEP44Response if err = json.Unmarshal(got, &resp); err == nil { - logrus.WithContext(ctx).WithField("record_id", id).Info("resolved pkarr record from cache") + logrus.WithContext(ctx).WithField("record_id", id).Info("resolved record from cache") return &resp, nil } - logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from cache, falling back to dht") + logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get record from cache, falling back to dht") } // next do a dht lookup with a timeout of 10 seconds @@ -163,17 +160,12 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response if errors.Is(err, context.DeadlineExceeded) { logrus.WithContext(ctx).WithField("record", id).Warn("dht lookup timed out, attempting to resolve from storage") } else { - logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from dht, attempting to resolve from storage") + logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get record from dht, attempting to resolve from storage") } - rawID, err := util.Z32Decode(id) - if err != nil { - return nil, err - } - - record, err := s.db.ReadRecord(ctx, rawID) + record, err := s.db.ReadRecord(ctx, id) if err != nil || record == nil { - logrus.WithContext(ctx).WithError(err).WithField("record", id).Error("failed to resolve pkarr record from storage; adding to badGetCache") + logrus.WithContext(ctx).WithError(err).WithField("record", id).Error("failed to resolve record from storage; adding to badGetCache") // add the key to the badGetCache to prevent spamming the DHT if err = s.badGetCache.Set(id, []byte{0}); err != nil { @@ -183,11 +175,11 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response return nil, err } - logrus.WithContext(ctx).WithField("record", id).Info("resolved pkarr record from storage") + logrus.WithContext(ctx).WithField("record", id).Info("resolved record from storage") resp := record.Response() // add the record back to the cache for future lookups if err = s.addRecordToCache(id, record.Response()); err != nil { - logrus.WithError(err).WithField("record", id).Error("failed to set pkarr record in cache") + logrus.WithError(err).WithField("record", id).Error("failed to set record in cache") } return &resp, err @@ -202,7 +194,7 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response if err = bencode.Unmarshal(bBytes, &payload); err != nil { return nil, ssiutil.LoggingCtxErrorMsg(ctx, err, "failed to unmarshal bencoded payload") } - resp := pkarr.Response{ + resp := dht.BEP44Response{ V: []byte(payload), Seq: got.Seq, Sig: got.Sig, @@ -210,15 +202,15 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response // add the record to cache, do it here to avoid duplicate calculations if err = s.addRecordToCache(id, resp); err != nil { - logrus.WithContext(ctx).WithError(err).Errorf("failed to set pkarr record[%s] in cache", id) + logrus.WithContext(ctx).WithField("record", id).WithError(err).Error("failed to set record in cache") } else { - logrus.WithContext(ctx).WithField("record", id).Info("added pkarr record back to cache") + logrus.WithContext(ctx).WithField("record", id).Info("added record back to cache") } return &resp, nil } -func (s *PkarrService) addRecordToCache(id string, resp pkarr.Response) error { +func (s *DHTService) addRecordToCache(id string, resp dht.BEP44Response) error { recordBytes, err := json.Marshal(resp) if err != nil { return err @@ -229,34 +221,47 @@ func (s *PkarrService) addRecordToCache(id string, resp pkarr.Response) error { return nil } +// failedRecord is a struct to keep track of records that failed to be republished +type failedRecord struct { + record dht.BEP44Record + failureCnt int +} + // TODO(gabe) make this more efficient. create a publish schedule based on each individual record, not all records -func (s *PkarrService) republish() { - ctx, span := telemetry.GetTracer().Start(context.Background(), "PkarrService.republish") +func (s *DHTService) republish() { + ctx, span := telemetry.GetTracer().Start(context.Background(), "DHTService.republish") defer span.End() recordCnt, err := s.db.RecordCount(ctx) if err != nil { logrus.WithContext(ctx).WithError(err).Error("failed to get record count before republishing") return - } else { - logrus.WithContext(ctx).WithField("record_count", recordCnt).Info("republishing records") } + logrus.WithContext(ctx).WithField("record_count", recordCnt).Info("republishing records") + + // republish all records in the db and handle failed records up to 3 times + failedRecords := s.republishRecords(ctx) + s.handleFailedRecords(ctx, failedRecords) +} +// republishRecords republishes all records in the db to the DHT and returns a list of failed records +func (s *DHTService) republishRecords(ctx context.Context) []failedRecord { var nextPageToken []byte - var recordsBatch []pkarr.Record - var seenRecords, batchCnt, successCnt, errCnt int32 = 0, 1, 0, 0 + var seenRecords, batchCnt int32 + var failedRecords []failedRecord for { - recordsBatch, nextPageToken, err = s.db.ListRecords(ctx, nextPageToken, 1000) + recordsBatch, nextPageToken, err := s.db.ListRecords(ctx, nextPageToken, 1000) if err != nil { logrus.WithContext(ctx).WithError(err).Error("failed to list record(s) for republishing") - return + return failedRecords } + batchSize := len(recordsBatch) seenRecords += int32(batchSize) if batchSize == 0 { logrus.WithContext(ctx).Info("no records to republish") - return + return failedRecords } logrus.WithContext(ctx).WithFields(logrus.Fields{ @@ -266,68 +271,98 @@ func (s *PkarrService) republish() { }).Infof("republishing batch [%d] of [%d] records", batchCnt, batchSize) batchCnt++ - var wg sync.WaitGroup - wg.Add(batchSize) + failedRecords = append(failedRecords, s.republishBatch(ctx, recordsBatch)...) - var batchErrCnt, batchSuccessCnt int32 = 0, 0 - for _, record := range recordsBatch { - go func(ctx context.Context, record pkarr.Record) { - defer wg.Done() + if nextPageToken == nil { + break + } + } - recordID := zbase32.EncodeToString(record.Key[:]) - logrus.WithContext(ctx).Debugf("republishing record: %s", recordID) + successRate := float64(seenRecords-int32(len(failedRecords))) / float64(seenRecords) * 100 + logrus.WithContext(ctx).WithFields(logrus.Fields{ + "success": seenRecords - int32(len(failedRecords)), + "errors": len(failedRecords), + "total": seenRecords, + }).Infof("republishing complete with [%d] batches of [%d] total records with a [%.2f] percent success rate", batchCnt, seenRecords, successRate) - putCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() + return failedRecords +} - if _, putErr := s.dht.Put(putCtx, record.BEP44()); putErr != nil { - logrus.WithContext(putCtx).WithError(putErr).Debugf("failed to republish record: %s", recordID) - atomic.AddInt32(&batchErrCnt, 1) - } else { - atomic.AddInt32(&batchSuccessCnt, 1) - } - }(ctx, record) - } +// republishBatch republishes a batch of records to the DHT and returns a list of failed records +func (s *DHTService) republishBatch(ctx context.Context, recordsBatch []dht.BEP44Record) []failedRecord { + var wg sync.WaitGroup + var failedRecords []failedRecord - // Wait for all goroutines in this batch to finish before moving on to the next batch - wg.Wait() + for _, record := range recordsBatch { + wg.Add(1) + go func(ctx context.Context, record dht.BEP44Record) { + defer wg.Done() - // Update the success and error counts - atomic.AddInt32(&successCnt, batchSuccessCnt) - atomic.AddInt32(&errCnt, batchErrCnt) + id := record.ID() + logrus.WithContext(ctx).WithField("record_id", id).Debug("republishing record") - successRate := float64(batchSuccessCnt) / float64(batchSize) + putCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() - logrus.WithContext(ctx).WithFields(logrus.Fields{ - "batch_number": batchCnt, - "success": successCnt, - "errors": errCnt, - }).Infof("batch [%d] completed with a [%.2f] percent success rate", batchCnt, successRate*100) - - if successRate < 0.8 { - logrus.WithContext(ctx).WithFields(logrus.Fields{ - "batch_number": batchCnt, - "success": successCnt, - "errors": errCnt, - }).Errorf("batch [%d] failed to meet success rate threshold; exiting republishing early", batchCnt) - break - } + if _, putErr := s.dht.Put(putCtx, record.Put()); putErr != nil { + logrus.WithContext(putCtx).WithField("record_id", id).WithError(putErr).Debug("failed to republish record") + failedRecords = append(failedRecords, failedRecord{ + record: record, + failureCnt: 1, + }) + } + }(ctx, record) + } - if nextPageToken == nil { + wg.Wait() + return failedRecords +} + +// handleFailedRecords attempts to republish failed records up to 3 times +func (s *DHTService) handleFailedRecords(ctx context.Context, failedRecords []failedRecord) { + for i := 0; i < 3; i++ { + var remainingFailedRecords []failedRecord + for _, fr := range failedRecords { + id := fr.record.ID() + putCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + if _, putErr := s.dht.Put(putCtx, fr.record.Put()); putErr != nil { + logrus.WithContext(putCtx).WithField("record_id", id).WithError(putErr).Debugf("failed to re-republish [%s], attempt: %d", id, i+1) + fr.failureCnt++ + if fr.failureCnt <= 3 { + remainingFailedRecords = append(remainingFailedRecords, fr) + } else { + logrus.WithContext(ctx).WithField("record_id", id).Errorf("record failed to republish after 3 attempts") + } + } + } + failedRecords = remainingFailedRecords + if len(failedRecords) == 0 { + logrus.WithContext(ctx).Info("all failed records successfully republished") break } + if i == 2 { + logrus.WithContext(ctx).WithField("failed_records", failedRecords).Error("failed to republish all records after 3 attempts") + for _, fr := range failedRecords { + id := fr.record.ID() + if err := s.db.WriteFailedRecord(ctx, id); err != nil { + logrus.WithContext(ctx).WithField("record_id", id).WithError(err).Warn("failed to write failed record to db") + } + } + } } - successRate := float64(successCnt) / float64(seenRecords) - logrus.WithContext(ctx).WithFields(logrus.Fields{ - "success": seenRecords - errCnt, - "errors": errCnt, - "total": seenRecords, - }).Infof("republishing complete with [%d] batches of [%d] total records with an [%.2f] percent success rate", batchCnt, seenRecords, successRate*100) + failedRecordCnt, err := s.db.FailedRecordCount(ctx) + if err != nil { + logrus.WithContext(ctx).WithError(err).Error("failed to get failed record count") + return + } + logrus.WithContext(ctx).WithField("failed_record_count", failedRecordCnt).Warn("total failed records") } -// Close closes the Pkarr service gracefully -func (s *PkarrService) Close() { +// Close closes the Mainline service gracefully +func (s *DHTService) Close() { if s == nil { return } diff --git a/impl/pkg/service/pkarr_test.go b/impl/pkg/service/dht_test.go similarity index 71% rename from impl/pkg/service/pkarr_test.go rename to impl/pkg/service/dht_test.go index 8d83d90f..f2affb02 100644 --- a/impl/pkg/service/pkarr_test.go +++ b/impl/pkg/service/dht_test.go @@ -13,27 +13,26 @@ import ( "github.com/TBD54566975/did-dht-method/config" "github.com/TBD54566975/did-dht-method/internal/did" "github.com/TBD54566975/did-dht-method/pkg/dht" - "github.com/TBD54566975/did-dht-method/pkg/pkarr" "github.com/TBD54566975/did-dht-method/pkg/storage" ) -func TestPkarrService(t *testing.T) { - svc := newPkarrService(t, "a") +func TestDHTService(t *testing.T) { + svc := newDHTService(t, "a") t.Run("test put bad record", func(t *testing.T) { - err := svc.PublishPkarr(context.Background(), "", pkarr.Record{}) + err := svc.PublishDHT(context.Background(), "", dht.BEP44Record{}) assert.Error(t, err) assert.Contains(t, err.Error(), "validation for 'Value' failed on the 'required' tag") }) t.Run("test get non existent record", func(t *testing.T) { - got, err := svc.GetPkarr(context.Background(), "test") + got, err := svc.GetDHT(context.Background(), "test") assert.NoError(t, err) assert.Nil(t, got) }) t.Run("test get record with invalid ID", func(t *testing.T) { - got, err := svc.GetPkarr(context.Background(), "---") + got, err := svc.GetDHT(context.Background(), "---") assert.ErrorContains(t, err, "illegal z-base-32 data at input byte 0") assert.Nil(t, got) }) @@ -49,18 +48,18 @@ func TestPkarrService(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) suffix, err := d.Suffix() require.NoError(t, err) - err = svc.PublishPkarr(context.Background(), suffix, pkarr.RecordFromBEP44(putMsg)) + err = svc.PublishDHT(context.Background(), suffix, dht.RecordFromBEP44(putMsg)) assert.NoError(t, err) // invalidate the signature putMsg.Sig[0] = 0 - err = svc.PublishPkarr(context.Background(), suffix, pkarr.RecordFromBEP44(putMsg)) + err = svc.PublishDHT(context.Background(), suffix, dht.RecordFromBEP44(putMsg)) assert.Error(t, err) assert.Contains(t, err.Error(), "signature is invalid") }) @@ -76,16 +75,16 @@ func TestPkarrService(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) suffix, err := d.Suffix() require.NoError(t, err) - err = svc.PublishPkarr(context.Background(), suffix, pkarr.RecordFromBEP44(putMsg)) + err = svc.PublishDHT(context.Background(), suffix, dht.RecordFromBEP44(putMsg)) assert.NoError(t, err) - got, err := svc.GetPkarr(context.Background(), suffix) + got, err := svc.GetDHT(context.Background(), suffix) assert.NoError(t, err) assert.NotEmpty(t, got) assert.Equal(t, putMsg.V, got.V) @@ -104,20 +103,20 @@ func TestPkarrService(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) suffix, err := d.Suffix() require.NoError(t, err) - err = svc.PublishPkarr(context.Background(), suffix, pkarr.RecordFromBEP44(putMsg)) + err = svc.PublishDHT(context.Background(), suffix, dht.RecordFromBEP44(putMsg)) require.NoError(t, err) // remove it from the cache so the get tests the uncached lookup path err = svc.cache.Delete(suffix) require.NoError(t, err) - got, err := svc.GetPkarr(context.Background(), suffix) + got, err := svc.GetDHT(context.Background(), suffix) assert.NoError(t, err) assert.NotEmpty(t, got) assert.Equal(t, putMsg.V, got.V) @@ -126,12 +125,12 @@ func TestPkarrService(t *testing.T) { }) t.Run("test get record with invalid ID", func(t *testing.T) { - got, err := svc.GetPkarr(context.Background(), "uqaj3fcr9db6jg6o9pjs53iuftyj45r46aubogfaceqjbo6pp9sy") + got, err := svc.GetDHT(context.Background(), "uqaj3fcr9db6jg6o9pjs53iuftyj45r46aubogfaceqjbo6pp9sy") assert.NoError(t, err) assert.Empty(t, got) // try it again to make sure the cache is working - got, err = svc.GetPkarr(context.Background(), "uqaj3fcr9db6jg6o9pjs53iuftyj45r46aubogfaceqjbo6pp9sy") + got, err = svc.GetDHT(context.Background(), "uqaj3fcr9db6jg6o9pjs53iuftyj45r46aubogfaceqjbo6pp9sy") assert.ErrorContains(t, err, "rate limited to prevent spam") assert.Empty(t, got) }) @@ -140,7 +139,7 @@ func TestPkarrService(t *testing.T) { } func TestDHT(t *testing.T) { - svc1 := newPkarrService(t, "b") + svc1 := newDHTService(t, "b") // create and publish a record to service1 sk, doc, err := did.GenerateDIDDHT(did.CreateDIDDHTOpts{}) @@ -150,16 +149,16 @@ func TestDHT(t *testing.T) { packet, err := d.ToDNSPacket(*doc, nil, nil) require.NoError(t, err) require.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) suffix, err := d.Suffix() require.NoError(t, err) - err = svc1.PublishPkarr(context.Background(), suffix, pkarr.RecordFromBEP44(putMsg)) + err = svc1.PublishDHT(context.Background(), suffix, dht.RecordFromBEP44(putMsg)) require.NoError(t, err) // make sure we can get it back - got, err := svc1.GetPkarr(context.Background(), suffix) + got, err := svc1.GetDHT(context.Background(), suffix) require.NoError(t, err) require.NotEmpty(t, got) assert.Equal(t, putMsg.V, got.V) @@ -167,10 +166,10 @@ func TestDHT(t *testing.T) { assert.Equal(t, putMsg.Seq, got.Seq) // create service2 with service1 as a bootstrap peer - svc2 := newPkarrService(t, "c", anacrolixdht.NewAddr(svc1.dht.Addr())) + svc2 := newDHTService(t, "c", anacrolixdht.NewAddr(svc1.dht.Addr())) // get the record via service2 - gotFrom2, err := svc2.GetPkarr(context.Background(), suffix) + gotFrom2, err := svc2.GetDHT(context.Background(), suffix) require.NoError(t, err) require.NotEmpty(t, gotFrom2) assert.Equal(t, putMsg.V, gotFrom2.V) @@ -184,20 +183,20 @@ func TestDHT(t *testing.T) { } func TestNoConfig(t *testing.T) { - svc, err := NewPkarrService(nil, nil, nil) + svc, err := NewDHTService(nil, nil, nil) assert.EqualError(t, err, "config is required") assert.Empty(t, svc) - svc, err = NewPkarrService(&config.Config{ - PkarrConfig: config.PkarrServiceConfig{ + svc, err = NewDHTService(&config.Config{ + DHTConfig: config.DHTServiceConfig{ CacheSizeLimitMB: -1, }, }, nil, nil) assert.EqualError(t, err, "failed to instantiate cache: HardMaxCacheSize must be >= 0") assert.Nil(t, svc) - svc, err = NewPkarrService(&config.Config{ - PkarrConfig: config.PkarrServiceConfig{ + svc, err = NewDHTService(&config.Config{ + DHTConfig: config.DHTServiceConfig{ RepublishCRON: "not a real cron expression", }, }, nil, nil) @@ -207,7 +206,7 @@ func TestNoConfig(t *testing.T) { t.Cleanup(func() { svc.Close() }) } -func newPkarrService(t *testing.T, id string, bootstrapPeers ...anacrolixdht.Addr) PkarrService { +func newDHTService(t *testing.T, id string, bootstrapPeers ...anacrolixdht.Addr) DHTService { defaultConfig := config.GetDefaultConfig() db, err := storage.NewStorage(fmt.Sprintf("bolt://diddht-test-%s.db", id)) @@ -217,9 +216,9 @@ func newPkarrService(t *testing.T, id string, bootstrapPeers ...anacrolixdht.Add t.Cleanup(func() { os.Remove(fmt.Sprintf("diddht-test-%s.db", id)) }) d := dht.NewTestDHT(t, bootstrapPeers...) - pkarrService, err := NewPkarrService(&defaultConfig, db, d) + dhtService, err := NewDHTService(&defaultConfig, db, d) require.NoError(t, err) - require.NotEmpty(t, pkarrService) + require.NotEmpty(t, dhtService) - return *pkarrService + return *dhtService } diff --git a/impl/pkg/storage/db/bolt/bolt.go b/impl/pkg/storage/db/bolt/bolt.go index 7141d409..ef4272df 100644 --- a/impl/pkg/storage/db/bolt/bolt.go +++ b/impl/pkg/storage/db/bolt/bolt.go @@ -1,7 +1,10 @@ package bolt import ( + "bytes" "context" + "encoding/binary" + "fmt" "time" "github.com/goccy/go-json" @@ -10,12 +13,14 @@ import ( "github.com/sirupsen/logrus" bolt "go.etcd.io/bbolt" - "github.com/TBD54566975/did-dht-method/pkg/pkarr" + "github.com/TBD54566975/did-dht-method/pkg/dht" "github.com/TBD54566975/did-dht-method/pkg/telemetry" ) const ( - pkarrNamespace = "pkarr" + dhtNamespace = "dht" + oldDHTNamespace = "pkarr" + failedNamespace = "failed" ) type Bolt struct { @@ -36,12 +41,54 @@ func NewBolt(path string) (*Bolt, error) { return nil, err } + // Perform the migration + go migrate(db) + return &Bolt{db: db}, nil } +func migrate(db *bolt.DB) { + // Perform the migration within a write transaction + err := db.Update(func(tx *bolt.Tx) error { + // Create the new namespace bucket + newBucket, err := tx.CreateBucketIfNotExists([]byte(dhtNamespace)) + if err != nil { + return fmt.Errorf("failed to create new namespace bucket: %v", err) + } + + // Get the old namespace bucket + oldBucket := tx.Bucket([]byte(oldDHTNamespace)) + if oldBucket == nil { + // If the old namespace bucket doesn't exist, there's nothing to migrate + return nil + } + + // Iterate over the key-value pairs in the old namespace bucket + err = oldBucket.ForEach(func(k, v []byte) error { + // Copy each key-value pair to the new namespace bucket + err = newBucket.Put(k, v) + if err != nil { + return fmt.Errorf("failed to copy key-value pair to new namespace: %v", err) + } + return nil + }) + if err != nil { + return err + } + + return nil + }) + + if err != nil { + logrus.WithError(err).Error("failed to migrate records") + } else { + logrus.Info("migration completed successfully") + } +} + // WriteRecord writes the given record to the storage // TODO: don't overwrite existing records, store unique seq numbers -func (b *Bolt) WriteRecord(ctx context.Context, record pkarr.Record) error { +func (b *Bolt) WriteRecord(ctx context.Context, record dht.BEP44Record) error { ctx, span := telemetry.GetTracer().Start(ctx, "bolt.WriteRecord") defer span.End() @@ -51,15 +98,27 @@ func (b *Bolt) WriteRecord(ctx context.Context, record pkarr.Record) error { return err } - return b.write(ctx, pkarrNamespace, encoded.K, recordBytes) + // write to both the old and new namespaces for now + errOld := b.write(ctx, oldDHTNamespace, record.ID(), recordBytes) + errNew := b.write(ctx, dhtNamespace, record.ID(), recordBytes) + if errOld == nil && errNew == nil { + return nil + } + if errOld != nil && errNew != nil { + return errors.New(fmt.Sprintf("old: %v, new: %v", errOld, errNew)) + } + if errOld != nil { + return errOld + } + return errNew } // ReadRecord reads the record with the given id from the storage -func (b *Bolt) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, error) { +func (b *Bolt) ReadRecord(ctx context.Context, id string) (*dht.BEP44Record, error) { ctx, span := telemetry.GetTracer().Start(ctx, "bolt.ReadRecord") defer span.End() - recordBytes, err := b.read(ctx, pkarrNamespace, encoding.EncodeToString(id)) + recordBytes, err := b.read(ctx, oldDHTNamespace, id) if err != nil { return nil, err } @@ -67,7 +126,7 @@ func (b *Bolt) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, error) return nil, nil } - var b64record base64PkarrRecord + var b64record base64BEP44Record if err = json.Unmarshal(recordBytes, &b64record); err != nil { return nil, err } @@ -81,18 +140,18 @@ func (b *Bolt) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, error) } // ListRecords lists all records in the storage -func (b *Bolt) ListRecords(ctx context.Context, nextPageToken []byte, pagesize int) ([]pkarr.Record, []byte, error) { +func (b *Bolt) ListRecords(ctx context.Context, nextPageToken []byte, pageSize int) ([]dht.BEP44Record, []byte, error) { ctx, span := telemetry.GetTracer().Start(ctx, "bolt.ListRecords") defer span.End() - boltRecords, err := b.readSeveral(ctx, pkarrNamespace, nextPageToken, pagesize) + boltRecords, err := b.readSeveral(ctx, oldDHTNamespace, nextPageToken, pageSize) if err != nil { return nil, nil, err } - var records []pkarr.Record + var records []dht.BEP44Record for _, recordBytes := range boltRecords { - var encodedRecord base64PkarrRecord + var encodedRecord base64BEP44Record if err = json.Unmarshal(recordBytes.value, &encodedRecord); err != nil { return nil, nil, err } @@ -105,7 +164,7 @@ func (b *Bolt) ListRecords(ctx context.Context, nextPageToken []byte, pagesize i records = append(records, *record) } - if len(boltRecords) == pagesize { + if len(boltRecords) == pageSize { nextPageToken = boltRecords[len(boltRecords)-1].key } else { nextPageToken = nil @@ -202,16 +261,85 @@ func (b *Bolt) readSeveral(ctx context.Context, namespace string, after []byte, return result, err } -// RecordCount returns the number of records in the storage for the pkarr namespace +// RecordCount returns the number of records in the storage for the mainline namespace func (b *Bolt) RecordCount(ctx context.Context) (int, error) { _, span := telemetry.GetTracer().Start(ctx, "bolt.RecordCount") defer span.End() var count int err := b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(pkarrNamespace)) + bucket := tx.Bucket([]byte(oldDHTNamespace)) + if bucket == nil { + logrus.WithContext(ctx).WithField("namespace", oldDHTNamespace).Warn("namespace does not exist") + return nil + } + count = bucket.Stats().KeyN + return nil + }) + return count, err +} + +func (b *Bolt) WriteFailedRecord(ctx context.Context, id string) error { + _, span := telemetry.GetTracer().Start(ctx, "bolt.WriteFailedRecord") + defer span.End() + + return b.db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists([]byte(failedNamespace)) + if err != nil { + return err + } + + var count int32 = 1 + v := bucket.Get([]byte(id)) + if v != nil { + if err = json.Unmarshal(v, &count); err != nil { + return err + } + count++ + } + + buf := new(bytes.Buffer) + if err = binary.Write(buf, binary.LittleEndian, count); err != nil { + return err + } + return bucket.Put([]byte(id), buf.Bytes()) + }) +} + +func (b *Bolt) ListFailedRecords(ctx context.Context) ([]dht.FailedRecord, error) { + _, span := telemetry.GetTracer().Start(ctx, "bolt.ListFailedRecords") + defer span.End() + + var result []dht.FailedRecord + err := b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(failedNamespace)) + if bucket == nil { + logrus.WithField("namespace", failedNamespace).Warn("namespace does not exist") + return nil + } + + cursor := bucket.Cursor() + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + var count int + if err := binary.Read(bytes.NewReader(v), binary.LittleEndian, &count); err != nil { + return err + } + result = append(result, dht.FailedRecord{ID: string(k), Count: count}) + } + return nil + }) + return result, err +} + +func (b *Bolt) FailedRecordCount(ctx context.Context) (int, error) { + _, span := telemetry.GetTracer().Start(ctx, "bolt.FailedRecordCount") + defer span.End() + + var count int + err := b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(failedNamespace)) if bucket == nil { - logrus.WithContext(ctx).WithField("namespace", pkarrNamespace).Warn("namespace does not exist") + logrus.WithField("namespace", failedNamespace).Warn("namespace does not exist") return nil } count = bucket.Stats().KeyN diff --git a/impl/pkg/storage/db/bolt/bolt_test.go b/impl/pkg/storage/db/bolt/bolt_test.go index 8a015869..7d9ee485 100644 --- a/impl/pkg/storage/db/bolt/bolt_test.go +++ b/impl/pkg/storage/db/bolt/bolt_test.go @@ -7,12 +7,11 @@ import ( "github.com/goccy/go-json" - "github.com/TBD54566975/did-dht-method/internal/did" - "github.com/TBD54566975/did-dht-method/pkg/dht" - "github.com/TBD54566975/did-dht-method/pkg/pkarr" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/TBD54566975/did-dht-method/internal/did" + "github.com/TBD54566975/did-dht-method/pkg/dht" ) func TestBoltDB_ReadWrite(t *testing.T) { @@ -40,17 +39,17 @@ func TestBoltDB_ReadWrite(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, players1, players1Result) - // get a value from a dhtNamespace that doesn't exist + // get a value from a oldDHTNamespace that doesn't exist res, err := db.read(ctx, "bad", "worse") assert.NoError(t, err) assert.Empty(t, res) - // get a value that doesn't exist in the dhtNamespace + // get a value that doesn't exist in the oldDHTNamespace noValue, err := db.read(ctx, namespace, "Porsche") assert.NoError(t, err) assert.Empty(t, noValue) - // create a second value in the dhtNamespace + // create a second value in the oldDHTNamespace team2 := "McLaren" players2 := []string{"Lando Norris", "Daniel Ricciardo"} p2Bytes, err := json.Marshal(players2) @@ -59,7 +58,7 @@ func TestBoltDB_ReadWrite(t *testing.T) { err = db.write(ctx, namespace, team2, p2Bytes) assert.NoError(t, err) - // get all values from the dhtNamespace + // get all values from the oldDHTNamespace gotAll, err := db.readAll(namespace) assert.NoError(t, err) assert.True(t, len(gotAll) == 2) @@ -123,16 +122,16 @@ func TestReadWrite(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) - r := pkarr.RecordFromBEP44(putMsg) + r := dht.RecordFromBEP44(putMsg) err = db.WriteRecord(ctx, r) require.NoError(t, err) - r2, err := db.ReadRecord(ctx, r.Key[:]) + r2, err := db.ReadRecord(ctx, r.ID()) require.NoError(t, err) assert.Equal(t, r.Key, r2.Key) @@ -168,12 +167,12 @@ func TestDBPagination(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) // create record - record := pkarr.RecordFromBEP44(putMsg) + record := dht.RecordFromBEP44(putMsg) err = db.WriteRecord(ctx, record) assert.NoError(t, err) @@ -189,12 +188,12 @@ func TestDBPagination(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) // create eleventhRecord - eleventhRecord := pkarr.RecordFromBEP44(putMsg) + eleventhRecord := dht.RecordFromBEP44(putMsg) err = db.WriteRecord(ctx, eleventhRecord) assert.NoError(t, err) diff --git a/impl/pkg/storage/db/bolt/pkarr.go b/impl/pkg/storage/db/bolt/dht.go similarity index 67% rename from impl/pkg/storage/db/bolt/pkarr.go rename to impl/pkg/storage/db/bolt/dht.go index 97f30040..917094d1 100644 --- a/impl/pkg/storage/db/bolt/pkarr.go +++ b/impl/pkg/storage/db/bolt/dht.go @@ -6,14 +6,14 @@ import ( "github.com/TBD54566975/ssi-sdk/util" - "github.com/TBD54566975/did-dht-method/pkg/pkarr" + "github.com/TBD54566975/did-dht-method/pkg/dht" ) var ( encoding = base64.RawURLEncoding ) -type base64PkarrRecord struct { +type base64BEP44Record struct { // Up to an 1000 byte base64URL encoded string V string `json:"v" validate:"required"` // 32 byte base64URL encoded string @@ -23,8 +23,8 @@ type base64PkarrRecord struct { Seq int64 `json:"seq" validate:"required"` } -func encodeRecord(r pkarr.Record) base64PkarrRecord { - return base64PkarrRecord{ +func encodeRecord(r dht.BEP44Record) base64BEP44Record { + return base64BEP44Record{ V: encoding.EncodeToString(r.Value[:]), K: encoding.EncodeToString(r.Key[:]), Sig: encoding.EncodeToString(r.Signature[:]), @@ -32,23 +32,23 @@ func encodeRecord(r pkarr.Record) base64PkarrRecord { } } -func (b base64PkarrRecord) Decode() (*pkarr.Record, error) { +func (b base64BEP44Record) Decode() (*dht.BEP44Record, error) { v, err := encoding.DecodeString(b.V) if err != nil { - return nil, fmt.Errorf("error parsing pkarr value field: %v", err) + return nil, fmt.Errorf("error parsing bep44 value field: %v", err) } k, err := encoding.DecodeString(b.K) if err != nil { - return nil, fmt.Errorf("error parsing pkarr key field: %v", err) + return nil, fmt.Errorf("error parsing bep44 key field: %v", err) } sig, err := encoding.DecodeString(b.Sig) if err != nil { - return nil, fmt.Errorf("error parsing pkarr sig field: %v", err) + return nil, fmt.Errorf("error parsing bep44 sig field: %v", err) } - record, err := pkarr.NewRecord(k, v, sig, b.Seq) + record, err := dht.NewBEP44Record(k, v, sig, b.Seq) if err != nil { // TODO: do something useful if this happens return nil, util.LoggingErrorMsg(err, "error loading record from database, skipping") diff --git a/impl/pkg/storage/db/postgres/migrations/00001_create_dht_records_table.sql b/impl/pkg/storage/db/postgres/migrations/00001_create_dht_records_table.sql new file mode 100644 index 00000000..f2ee580e --- /dev/null +++ b/impl/pkg/storage/db/postgres/migrations/00001_create_dht_records_table.sql @@ -0,0 +1,17 @@ +-- +goose Up +CREATE TABLE dht_records ( + id SERIAL PRIMARY KEY, + key BYTEA UNIQUE NOT NULL, + value BYTEA NOT NULL, + sig BYTEA NOT NULL, + seq BIGINT NOT NULL +); + +CREATE TABLE failed_records ( + id BYTEA PRIMARY KEY, + failure_count INTEGER NOT NULL +); + +-- +goose Down +DROP TABLE failed_records; +DROP TABLE dht_records; \ No newline at end of file diff --git a/impl/pkg/storage/db/postgres/migrations/00001_create_pkarr_records_table.sql b/impl/pkg/storage/db/postgres/migrations/00001_create_pkarr_records_table.sql deleted file mode 100644 index 05273773..00000000 --- a/impl/pkg/storage/db/postgres/migrations/00001_create_pkarr_records_table.sql +++ /dev/null @@ -1,11 +0,0 @@ --- +goose Up -CREATE TABLE pkarr_records ( - id SERIAL PRIMARY KEY, - key BYTEA UNIQUE NOT NULL, - value BYTEA NOT NULL, - sig BYTEA NOT NULL, - seq BIGINT NOT NULL -); - --- +goose Down -DROP TABLE pkarr_records; \ No newline at end of file diff --git a/impl/pkg/storage/db/postgres/models.go b/impl/pkg/storage/db/postgres/models.go index 03958671..a4d02233 100644 --- a/impl/pkg/storage/db/postgres/models.go +++ b/impl/pkg/storage/db/postgres/models.go @@ -4,10 +4,15 @@ package postgres -type PkarrRecord struct { +type DhtRecord struct { ID int32 Key []byte Value []byte Sig []byte Seq int64 } + +type FailedRecord struct { + ID []byte + FailureCount int32 +} diff --git a/impl/pkg/storage/db/postgres/postgres.go b/impl/pkg/storage/db/postgres/postgres.go index bd41e684..365622b4 100644 --- a/impl/pkg/storage/db/postgres/postgres.go +++ b/impl/pkg/storage/db/postgres/postgres.go @@ -10,8 +10,9 @@ import ( _ "github.com/jackc/pgx/v5/stdlib" "github.com/pressly/goose/v3" "github.com/sirupsen/logrus" + "github.com/tv42/zbase32" - "github.com/TBD54566975/did-dht-method/pkg/pkarr" + "github.com/TBD54566975/did-dht-method/pkg/dht" "github.com/TBD54566975/did-dht-method/pkg/telemetry" ) @@ -61,7 +62,7 @@ func (p Postgres) connect(ctx context.Context) (*Queries, *pgx.Conn, error) { return New(conn), conn, nil } -func (p Postgres) WriteRecord(ctx context.Context, record pkarr.Record) error { +func (p Postgres) WriteRecord(ctx context.Context, record dht.BEP44Record) error { ctx, span := telemetry.GetTracer().Start(ctx, "postgres.WriteRecord") defer span.End() @@ -84,7 +85,7 @@ func (p Postgres) WriteRecord(ctx context.Context, record pkarr.Record) error { return nil } -func (p Postgres) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, error) { +func (p Postgres) ReadRecord(ctx context.Context, id string) (*dht.BEP44Record, error) { ctx, span := telemetry.GetTracer().Start(ctx, "postgres.ReadRecord") defer span.End() @@ -94,7 +95,11 @@ func (p Postgres) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, err } defer db.Close(ctx) - row, err := queries.ReadRecord(ctx, id) + decodedID, err := zbase32.DecodeString(id) + if err != nil { + return nil, err + } + row, err := queries.ReadRecord(ctx, decodedID) if err != nil { return nil, err } @@ -107,7 +112,7 @@ func (p Postgres) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, err return record, nil } -func (p Postgres) ListRecords(ctx context.Context, nextPageToken []byte, limit int) ([]pkarr.Record, []byte, error) { +func (p Postgres) ListRecords(ctx context.Context, nextPageToken []byte, limit int) ([]dht.BEP44Record, []byte, error) { ctx, span := telemetry.GetTracer().Start(ctx, "postgres.ListRecords") defer span.End() @@ -117,7 +122,7 @@ func (p Postgres) ListRecords(ctx context.Context, nextPageToken []byte, limit i } defer db.Close(ctx) - var rows []PkarrRecord + var rows []DhtRecord if nextPageToken == nil { rows, err = queries.ListRecordsFirstPage(ctx, int32(limit)) } else { @@ -130,9 +135,9 @@ func (p Postgres) ListRecords(ctx context.Context, nextPageToken []byte, limit i return nil, nil, err } - var records []pkarr.Record + var records []dht.BEP44Record for _, row := range rows { - record, err := pkarr.NewRecord(row.Key, row.Value, row.Sig, row.Seq) + record, err := dht.NewBEP44Record(row.Key, row.Value, row.Sig, row.Seq) if err != nil { // TODO: do something useful if this happens logrus.WithContext(ctx).WithError(err).WithField("record_id", row.ID).Warn("error loading record from database, skipping") @@ -151,13 +156,8 @@ func (p Postgres) ListRecords(ctx context.Context, nextPageToken []byte, limit i return records, nextPageToken, nil } -func (p Postgres) Close() error { - // no-op, postgres connection is closed after each request - return nil -} - -func (row PkarrRecord) Record() (*pkarr.Record, error) { - return pkarr.NewRecord(row.Key, row.Value, row.Sig, row.Seq) +func (row DhtRecord) Record() (*dht.BEP44Record, error) { + return dht.NewBEP44Record(row.Key, row.Value, row.Sig, row.Seq) } func (p Postgres) RecordCount(ctx context.Context) (int, error) { @@ -177,3 +177,73 @@ func (p Postgres) RecordCount(ctx context.Context) (int, error) { return int(count), nil } + +func (p Postgres) WriteFailedRecord(ctx context.Context, id string) error { + ctx, span := telemetry.GetTracer().Start(ctx, "postgres.WriteFailedRecord") + defer span.End() + + queries, db, err := p.connect(ctx) + if err != nil { + return err + } + defer db.Close(ctx) + + err = queries.WriteFailedRecord(ctx, WriteFailedRecordParams{ + ID: []byte(id), + FailureCount: 1, + }) + if err != nil { + return err + } + + return nil +} + +func (p Postgres) ListFailedRecords(ctx context.Context) ([]dht.FailedRecord, error) { + ctx, span := telemetry.GetTracer().Start(ctx, "postgres.ListFailedRecords") + defer span.End() + + queries, db, err := p.connect(ctx) + if err != nil { + return nil, err + } + defer db.Close(ctx) + + rows, err := queries.ListFailedRecords(ctx) + if err != nil { + return nil, err + } + + var failedRecords []dht.FailedRecord + for _, row := range rows { + failedRecords = append(failedRecords, dht.FailedRecord{ + ID: string(row.ID), + Count: int(row.FailureCount), + }) + } + + return failedRecords, nil +} + +func (p Postgres) FailedRecordCount(ctx context.Context) (int, error) { + ctx, span := telemetry.GetTracer().Start(ctx, "postgres.FailedRecordCount") + defer span.End() + + queries, db, err := p.connect(ctx) + if err != nil { + return 0, err + } + defer db.Close(ctx) + + count, err := queries.FailedRecordCount(ctx) + if err != nil { + return 0, err + } + + return int(count), nil +} + +func (p Postgres) Close() error { + // no-op, postgres connection is closed after each request + return nil +} diff --git a/impl/pkg/storage/db/postgres/postgres_test.go b/impl/pkg/storage/db/postgres/postgres_test.go index 9bee06bb..028146d0 100644 --- a/impl/pkg/storage/db/postgres/postgres_test.go +++ b/impl/pkg/storage/db/postgres/postgres_test.go @@ -3,7 +3,6 @@ package postgres_test import ( "context" "net/url" - "os" "testing" "github.com/stretchr/testify/assert" @@ -11,13 +10,12 @@ import ( "github.com/TBD54566975/did-dht-method/internal/did" "github.com/TBD54566975/did-dht-method/pkg/dht" - "github.com/TBD54566975/did-dht-method/pkg/pkarr" "github.com/TBD54566975/did-dht-method/pkg/storage" "github.com/TBD54566975/did-dht-method/pkg/storage/db/postgres" ) func getTestDB(t *testing.T) storage.Storage { - uri := os.Getenv("TEST_DB") + uri := "postgres://postgres:a@127.0.0.1:5432/postgres" // os.Getenv("TEST_DB") if uri == "" { t.SkipNow() } @@ -50,16 +48,16 @@ func TestReadWrite(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) - r := pkarr.RecordFromBEP44(putMsg) + r := dht.RecordFromBEP44(putMsg) err = db.WriteRecord(ctx, r) require.NoError(t, err) - r2, err := db.ReadRecord(ctx, r.Key[:]) + r2, err := db.ReadRecord(ctx, r.ID()) require.NoError(t, err) assert.Equal(t, r.Key, r2.Key) @@ -95,12 +93,12 @@ func TestDBPagination(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) // create record - record := pkarr.RecordFromBEP44(putMsg) + record := dht.RecordFromBEP44(putMsg) err = db.WriteRecord(ctx, record) assert.NoError(t, err) @@ -116,12 +114,12 @@ func TestDBPagination(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, packet) - putMsg, err := dht.CreatePkarrPublishRequest(sk, *packet) + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) require.NoError(t, err) require.NotEmpty(t, putMsg) // create eleventhRecord - eleventhRecord := pkarr.RecordFromBEP44(putMsg) + eleventhRecord := dht.RecordFromBEP44(putMsg) err = db.WriteRecord(ctx, eleventhRecord) assert.NoError(t, err) diff --git a/impl/pkg/storage/db/postgres/queries.sql.go b/impl/pkg/storage/db/postgres/queries.sql.go index 3687e39a..9fdfde12 100644 --- a/impl/pkg/storage/db/postgres/queries.sql.go +++ b/impl/pkg/storage/db/postgres/queries.sql.go @@ -9,8 +9,43 @@ import ( "context" ) +const failedRecordCount = `-- name: FailedRecordCount :one +SELECT count(*) AS exact_count FROM failed_records +` + +func (q *Queries) FailedRecordCount(ctx context.Context) (int64, error) { + row := q.db.QueryRow(ctx, failedRecordCount) + var exact_count int64 + err := row.Scan(&exact_count) + return exact_count, err +} + +const listFailedRecords = `-- name: ListFailedRecords :many +SELECT id, failure_count FROM failed_records +` + +func (q *Queries) ListFailedRecords(ctx context.Context) ([]FailedRecord, error) { + rows, err := q.db.Query(ctx, listFailedRecords) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FailedRecord + for rows.Next() { + var i FailedRecord + if err := rows.Scan(&i.ID, &i.FailureCount); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listRecords = `-- name: ListRecords :many -SELECT id, key, value, sig, seq FROM pkarr_records WHERE id > (SELECT id FROM pkarr_records WHERE pkarr_records.key = $1) ORDER BY id ASC LIMIT $2 +SELECT id, key, value, sig, seq FROM dht_records WHERE id > (SELECT id FROM dht_records WHERE dht_records.key = $1) ORDER BY id ASC LIMIT $2 ` type ListRecordsParams struct { @@ -18,15 +53,15 @@ type ListRecordsParams struct { Limit int32 } -func (q *Queries) ListRecords(ctx context.Context, arg ListRecordsParams) ([]PkarrRecord, error) { +func (q *Queries) ListRecords(ctx context.Context, arg ListRecordsParams) ([]DhtRecord, error) { rows, err := q.db.Query(ctx, listRecords, arg.Key, arg.Limit) if err != nil { return nil, err } defer rows.Close() - var items []PkarrRecord + var items []DhtRecord for rows.Next() { - var i PkarrRecord + var i DhtRecord if err := rows.Scan( &i.ID, &i.Key, @@ -45,18 +80,18 @@ func (q *Queries) ListRecords(ctx context.Context, arg ListRecordsParams) ([]Pka } const listRecordsFirstPage = `-- name: ListRecordsFirstPage :many -SELECT id, key, value, sig, seq FROM pkarr_records ORDER BY id ASC LIMIT $1 +SELECT id, key, value, sig, seq FROM dht_records ORDER BY id ASC LIMIT $1 ` -func (q *Queries) ListRecordsFirstPage(ctx context.Context, limit int32) ([]PkarrRecord, error) { +func (q *Queries) ListRecordsFirstPage(ctx context.Context, limit int32) ([]DhtRecord, error) { rows, err := q.db.Query(ctx, listRecordsFirstPage, limit) if err != nil { return nil, err } defer rows.Close() - var items []PkarrRecord + var items []DhtRecord for rows.Next() { - var i PkarrRecord + var i DhtRecord if err := rows.Scan( &i.ID, &i.Key, @@ -75,12 +110,12 @@ func (q *Queries) ListRecordsFirstPage(ctx context.Context, limit int32) ([]Pkar } const readRecord = `-- name: ReadRecord :one -SELECT id, key, value, sig, seq FROM pkarr_records WHERE key = $1 LIMIT 1 +SELECT id, key, value, sig, seq FROM dht_records WHERE key = $1 LIMIT 1 ` -func (q *Queries) ReadRecord(ctx context.Context, key []byte) (PkarrRecord, error) { +func (q *Queries) ReadRecord(ctx context.Context, key []byte) (DhtRecord, error) { row := q.db.QueryRow(ctx, readRecord, key) - var i PkarrRecord + var i DhtRecord err := row.Scan( &i.ID, &i.Key, @@ -92,7 +127,7 @@ func (q *Queries) ReadRecord(ctx context.Context, key []byte) (PkarrRecord, erro } const recordCount = `-- name: RecordCount :one -SELECT count(*) AS exact_count FROM pkarr_records +SELECT count(*) AS exact_count FROM dht_records ` func (q *Queries) RecordCount(ctx context.Context) (int64, error) { @@ -102,8 +137,24 @@ func (q *Queries) RecordCount(ctx context.Context) (int64, error) { return exact_count, err } +const writeFailedRecord = `-- name: WriteFailedRecord :exec +INSERT INTO failed_records(id, failure_count) +VALUES($1, $2) +ON CONFLICT (id) DO UPDATE SET failure_count = failed_records.failure_count + 1 +` + +type WriteFailedRecordParams struct { + ID []byte + FailureCount int32 +} + +func (q *Queries) WriteFailedRecord(ctx context.Context, arg WriteFailedRecordParams) error { + _, err := q.db.Exec(ctx, writeFailedRecord, arg.ID, arg.FailureCount) + return err +} + const writeRecord = `-- name: WriteRecord :exec -INSERT INTO pkarr_records(key, value, sig, seq) VALUES($1, $2, $3, $4) +INSERT INTO dht_records(key, value, sig, seq) VALUES($1, $2, $3, $4) ` type WriteRecordParams struct { diff --git a/impl/pkg/storage/db/postgres/queries/queries.sql b/impl/pkg/storage/db/postgres/queries/queries.sql index 8ae96ea3..74153618 100644 --- a/impl/pkg/storage/db/postgres/queries/queries.sql +++ b/impl/pkg/storage/db/postgres/queries/queries.sql @@ -1,14 +1,25 @@ -- name: WriteRecord :exec -INSERT INTO pkarr_records(key, value, sig, seq) VALUES($1, $2, $3, $4); +INSERT INTO dht_records(key, value, sig, seq) VALUES($1, $2, $3, $4); -- name: ReadRecord :one -SELECT * FROM pkarr_records WHERE key = $1 LIMIT 1; +SELECT * FROM dht_records WHERE key = $1 LIMIT 1; -- name: ListRecords :many -SELECT * FROM pkarr_records WHERE id > (SELECT id FROM pkarr_records WHERE pkarr_records.key = $1) ORDER BY id ASC LIMIT $2; +SELECT * FROM dht_records WHERE id > (SELECT id FROM dht_records WHERE dht_records.key = $1) ORDER BY id ASC LIMIT $2; -- name: ListRecordsFirstPage :many -SELECT * FROM pkarr_records ORDER BY id ASC LIMIT $1; +SELECT * FROM dht_records ORDER BY id ASC LIMIT $1; -- name: RecordCount :one -SELECT count(*) AS exact_count FROM pkarr_records; +SELECT count(*) AS exact_count FROM dht_records; + +-- name: WriteFailedRecord :exec +INSERT INTO failed_records(id, failure_count) +VALUES($1, $2) +ON CONFLICT (id) DO UPDATE SET failure_count = failed_records.failure_count + 1; + +-- name: ListFailedRecords :many +SELECT * FROM failed_records; + +-- name: FailedRecordCount :one +SELECT count(*) AS exact_count FROM failed_records; \ No newline at end of file diff --git a/impl/pkg/storage/storage.go b/impl/pkg/storage/storage.go index c8e460f0..daf4b2d2 100644 --- a/impl/pkg/storage/storage.go +++ b/impl/pkg/storage/storage.go @@ -8,16 +8,21 @@ import ( "github.com/sirupsen/logrus" - "github.com/TBD54566975/did-dht-method/pkg/pkarr" + "github.com/TBD54566975/did-dht-method/pkg/dht" "github.com/TBD54566975/did-dht-method/pkg/storage/db/bolt" "github.com/TBD54566975/did-dht-method/pkg/storage/db/postgres" ) type Storage interface { - WriteRecord(ctx context.Context, record pkarr.Record) error - ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, error) - ListRecords(ctx context.Context, nextPageToken []byte, pageSize int) (records []pkarr.Record, nextPage []byte, err error) + WriteRecord(ctx context.Context, record dht.BEP44Record) error + ReadRecord(ctx context.Context, id string) (*dht.BEP44Record, error) + ListRecords(ctx context.Context, nextPageToken []byte, pageSize int) (records []dht.BEP44Record, nextPage []byte, err error) RecordCount(ctx context.Context) (int, error) + + WriteFailedRecord(ctx context.Context, id string) error + ListFailedRecords(ctx context.Context) ([]dht.FailedRecord, error) + FailedRecordCount(ctx context.Context) (int, error) + Close() error } diff --git a/impl/pkg/telemetry/telemetry.go b/impl/pkg/telemetry/telemetry.go index ec1d4247..a4417746 100644 --- a/impl/pkg/telemetry/telemetry.go +++ b/impl/pkg/telemetry/telemetry.go @@ -57,7 +57,7 @@ func SetupTelemetry(ctx context.Context) error { otel.SetMeterProvider(meterProvider) // setup memory metrics - err = runtime.Start(runtime.WithMeterProvider(meterProvider), runtime.WithMinimumReadMemStatsInterval(time.Second*15)) + err = runtime.Start(runtime.WithMeterProvider(meterProvider), runtime.WithMinimumReadMemStatsInterval(15*time.Second)) if err != nil { return err } diff --git a/impl/sqlc.yaml b/impl/sqlc.yaml new file mode 100644 index 00000000..b36ecbd9 --- /dev/null +++ b/impl/sqlc.yaml @@ -0,0 +1,10 @@ +version: "2" +sql: + - engine: "postgresql" + queries: "pkg/storage/db/postgres/queries" + schema: "pkg/storage/db/postgres/migrations" + gen: + go: + package: "postgres" + out: "pkg/storage/db/postgres" + sql_package: "pgx/v5" \ No newline at end of file diff --git a/sqlc.yaml b/sqlc.yaml deleted file mode 100644 index 763be2bd..00000000 --- a/sqlc.yaml +++ /dev/null @@ -1,10 +0,0 @@ -version: "2" -sql: - - engine: "postgresql" - queries: "impl/pkg/storage/db/postgres/queries" - schema: "impl/pkg/storage/db/postgres/migrations" - gen: - go: - package: "postgres" - out: "impl/pkg/storage/db/postgres" - sql_package: "pgx/v5" \ No newline at end of file