Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into 5089-move-key-to-c…
Browse files Browse the repository at this point in the history
…onfig
  • Loading branch information
fasmat committed Feb 28, 2024
2 parents 23366a6 + 432019e commit f1ce011
Show file tree
Hide file tree
Showing 24 changed files with 985 additions and 142 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ and permanent ineligibility for rewards.

* [#5418](https://github.com/spacemeshos/go-spacemesh/pull/5418) Add `grpc-post-listener` to separate post service from
`grpc-private-listener` and not require mTLS for the post service.
* [#5465](https://github.com/spacemeshos/go-spacemesh/pull/5465)
Add an option to cache SQL query results. This is useful for nodes with high peer counts.

If you are not using a remote post service you do not need to adjust anything. If you are using a remote setup
make sure your post service now connects to `grpc-post-listener` instead of `grpc-private-listener`. If you are
Expand Down Expand Up @@ -274,6 +276,7 @@ and permanent ineligibility for rewards.

### Improvements

>>>>>>> origin/develop
* [#5467](https://github.com/spacemeshos/go-spacemesh/pull/5467)
Fix a bug that could cause ATX sync to stall because of exhausted limit of concurrent requests for dependencies.
Fetching dependencies of an ATX is not limited anymore.
Expand Down
1 change: 1 addition & 0 deletions activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1535,6 +1535,7 @@ func TestGetPositioningAtxDbFailed(t *testing.T) {
sig := maps.Values(tab.signers)[0]

db := datastoremocks.NewMockExecutor(tab.mctrl)
db.EXPECT().QueryCache().Return(sql.NullQueryCache)
tab.Builder.cdb = datastore.NewCachedDB(db, logtest.New(t))
expected := errors.New("db error")
db.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any()).Return(0, expected)
Expand Down
31 changes: 22 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ type BaseConfig struct {
OptFilterThreshold int `mapstructure:"optimistic-filtering-threshold"`
TickSize uint64 `mapstructure:"tick-size"`

DatabaseConnections int `mapstructure:"db-connections"`
DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"`
DatabaseSizeMeteringInterval time.Duration `mapstructure:"db-size-metering-interval"`
DatabasePruneInterval time.Duration `mapstructure:"db-prune-interval"`
DatabaseVacuumState int `mapstructure:"db-vacuum-state"`
DatabaseSkipMigrations []int `mapstructure:"db-skip-migrations"`
DatabaseConnections int `mapstructure:"db-connections"`
DatabaseLatencyMetering bool `mapstructure:"db-latency-metering"`
DatabaseSizeMeteringInterval time.Duration `mapstructure:"db-size-metering-interval"`
DatabasePruneInterval time.Duration `mapstructure:"db-prune-interval"`
DatabaseVacuumState int `mapstructure:"db-vacuum-state"`
DatabaseSkipMigrations []int `mapstructure:"db-skip-migrations"`
DatabaseQueryCache bool `mapstructure:"db-query-cache"`
DatabaseQueryCacheSizes DatabaseQueryCacheSizes `mapstructure:"db-query-cache-sizes"`

PruneActivesetsFrom types.EpochID `mapstructure:"prune-activesets-from"`

Expand All @@ -141,6 +143,12 @@ type BaseConfig struct {
NoMainOverride bool `mapstructure:"no-main-override"`
}

type DatabaseQueryCacheSizes struct {
EpochATXs int `mapstructure:"epoch-atxs"`
ATXBlob int `mapstructure:"atx-blob"`
ActiveSetBlob int `mapstructure:"active-set-blob"`
}

type DeprecatedPoETServers struct{}

// DeprecatedMsg implements Deprecated interface.
Expand Down Expand Up @@ -222,9 +230,14 @@ func defaultBaseConfig() BaseConfig {
DatabaseConnections: 16,
DatabaseSizeMeteringInterval: 10 * time.Minute,
DatabasePruneInterval: 30 * time.Minute,
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
PostValidDelay: 12 * time.Hour,
DatabaseQueryCacheSizes: DatabaseQueryCacheSizes{
EpochATXs: 20,
ATXBlob: 10000,
ActiveSetBlob: 200,
},
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
PostValidDelay: 12 * time.Hour,
}
}

Expand Down
38 changes: 38 additions & 0 deletions datastore/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ type VrfNonceKey struct {
type Executor interface {
sql.Executor
WithTx(context.Context, func(*sql.Tx) error) error
QueryCache() sql.QueryCache
}

// CachedDB is simply a database injected with cache.
type CachedDB struct {
Executor
sql.QueryCache
logger log.Log

// cache is optional
Expand Down Expand Up @@ -108,6 +110,7 @@ func NewCachedDB(db Executor, lg log.Log, opts ...Opt) *CachedDB {

return &CachedDB{
Executor: db,
QueryCache: db.QueryCache(),
atxsdata: o.atxsdata,
logger: lg,
atxHdrCache: atxHdrCache,
Expand Down
103 changes: 64 additions & 39 deletions fetch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/layers"
)

const (
fetchSubKey sql.QueryCacheSubKey = "epoch-info-req"
)

type handler struct {
logger log.Log
cdb *datastore.CachedDB
Expand All @@ -38,12 +42,13 @@ func newHandler(
func (h *handler) handleMaliciousIDsReq(ctx context.Context, _ []byte) ([]byte, error) {
nodes, err := identities.GetMalicious(h.cdb)
if err != nil {
h.logger.WithContext(ctx).With().Warning("serve: failed to get malicious IDs", log.Err(err))
h.logger.With().Warning("serve: failed to get malicious IDs",
log.Context(ctx), log.Err(err))
return nil, err
}
h.logger.WithContext(ctx).
With().
Debug("serve: responded to malicious IDs request", log.Int("num_malicious", len(nodes)))
h.logger.With().
Debug("serve: responded to malicious IDs request",
log.Context(ctx), log.Int("num_malicious", len(nodes)))
malicious := &MaliciousIDs{
NodeIDs: nodes,
}
Expand All @@ -60,23 +65,27 @@ func (h *handler) handleEpochInfoReq(ctx context.Context, msg []byte) ([]byte, e
if err := codec.Decode(msg, &epoch); err != nil {
return nil, err
}
atxids, err := atxs.GetIDsByEpoch(h.cdb, epoch)
if err != nil {
h.logger.WithContext(ctx).With().Warning("serve: failed to get epoch atx IDs", epoch, log.Err(err))
return nil, err
}
ed := EpochData{
AtxIDs: atxids,
}
h.logger.WithContext(ctx).With().Debug("serve: responded to epoch info request",
epoch,
log.Int("atx_count", len(ed.AtxIDs)),
)
bts, err := codec.Encode(&ed)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize epoch atx", epoch, log.Err(err))
}
return bts, nil

cacheKey := sql.QueryCacheKey(atxs.CacheKindEpochATXs, epoch.String())
return sql.WithCachedSubKey(h.cdb, cacheKey, fetchSubKey, func() ([]byte, error) {
atxids, err := atxs.GetIDsByEpoch(h.cdb, epoch)
if err != nil {
h.logger.With().Warning("serve: failed to get epoch atx IDs",
epoch, log.Err(err), log.Context(ctx))
return nil, err
}
ed := EpochData{
AtxIDs: atxids,
}
h.logger.With().Debug("serve: responded to epoch info request",
epoch, log.Context(ctx), log.Int("atx_count", len(ed.AtxIDs)))
bts, err := codec.Encode(&ed)
if err != nil {
h.logger.With().Fatal("serve: failed to serialize epoch atx",
epoch, log.Context(ctx), log.Err(err))
}
return bts, nil
})
}

// handleLayerDataReq returns all data in a layer, described in LayerData.
Expand All @@ -91,13 +100,16 @@ func (h *handler) handleLayerDataReq(ctx context.Context, req []byte) ([]byte, e
}
ld.Ballots, err = ballots.IDsInLayer(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Warning("serve: failed to get layer ballots", lid, log.Err(err))
h.logger.With().Warning("serve: failed to get layer ballots",
lid, log.Err(err), log.Context(ctx))
return nil, err
}

out, err := codec.Encode(&ld)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize layer data response", log.Err(err))
h.logger.With().Fatal(
"serve: failed to serialize layer data response",
log.Context(ctx), log.Err(err))
}
return out, nil
}
Expand All @@ -121,20 +133,22 @@ func (h *handler) handleLayerOpinionsReq2(ctx context.Context, data []byte) ([]b
opnReqV2.Inc()
lo.PrevAggHash, err = layers.GetAggregatedHash(h.cdb, lid.Sub(1))
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Error("serve: failed to get prev agg hash", lid, log.Err(err))
h.logger.With().Error("serve: failed to get prev agg hash", log.Context(ctx), lid, log.Err(err))
return nil, err
}
bid, err := certificates.CertifiedBlock(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Error("serve: failed to get layer certified block", lid, log.Err(err))
h.logger.With().Error("serve: failed to get layer certified block",
log.Context(ctx), lid, log.Err(err))
return nil, err
}
if err == nil {
lo.Certified = &bid
}
out, err = codec.Encode(&lo)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to serialize layer opinions response", log.Err(err))
h.logger.With().Fatal("serve: failed to serialize layer opinions response",
log.Context(ctx), log.Err(err))
}
return out, nil
}
Expand All @@ -143,15 +157,16 @@ func (h *handler) handleCertReq(ctx context.Context, lid types.LayerID, bid type
certReq.Inc()
certs, err := certificates.Get(h.cdb, lid)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
h.logger.WithContext(ctx).With().Error("serve: failed to get certificate", lid, log.Err(err))
h.logger.With().Error("serve: failed to get certificate", log.Context(ctx), lid, log.Err(err))
return nil, err
}
if err == nil {
for _, cert := range certs {
if cert.Block == bid {
out, err := codec.Encode(cert.Cert)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode cert", log.Err(err))
h.logger.With().Fatal("serve: failed to encode cert",
log.Context(ctx), log.Err(err))
}
return out, nil
}
Expand All @@ -163,7 +178,7 @@ func (h *handler) handleCertReq(ctx context.Context, lid types.LayerID, bid type
func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error) {
var requestBatch RequestBatch
if err := codec.Decode(data, &requestBatch); err != nil {
h.logger.WithContext(ctx).With().Warning("serve: failed to parse request", log.Err(err))
h.logger.With().Warning("serve: failed to parse request", log.Context(ctx), log.Err(err))
return nil, errBadRequest
}
resBatch := ResponseBatch{
Expand All @@ -176,20 +191,23 @@ func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error
totalHashReqs.WithLabelValues(string(r.Hint)).Add(1)
res, err := h.bs.Get(r.Hint, r.Hash.Bytes())
if err != nil {
h.logger.WithContext(ctx).With().Debug("serve: remote peer requested nonexistent hash",
h.logger.With().Debug("serve: remote peer requested nonexistent hash",
log.Context(ctx),
log.String("hash", r.Hash.ShortString()),
log.String("hint", string(r.Hint)),
log.Err(err))
hashMissing.WithLabelValues(string(r.Hint)).Add(1)
continue
} else if res == nil {
h.logger.WithContext(ctx).With().Debug("serve: remote peer requested golden",
h.logger.With().Debug("serve: remote peer requested golden",
log.Context(ctx),
log.String("hash", r.Hash.ShortString()),
log.Int("dataSize", len(res)))
hashEmptyData.WithLabelValues(string(r.Hint)).Add(1)
continue
} else {
h.logger.WithContext(ctx).With().Debug("serve: responded to hash request",
h.logger.With().Debug("serve: responded to hash request",
log.Context(ctx),
log.String("hash", r.Hash.ShortString()),
log.Int("dataSize", len(res)))
}
Expand All @@ -203,11 +221,13 @@ func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error

bts, err := codec.Encode(&resBatch)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode batch id",
h.logger.With().Fatal("serve: failed to encode batch id",
log.Context(ctx),
log.Err(err),
log.String("batch_hash", resBatch.ID.ShortString()))
}
h.logger.WithContext(ctx).With().Debug("serve: returning response for batch",
h.logger.With().Debug("serve: returning response for batch",
log.Context(ctx),
log.String("batch_hash", resBatch.ID.ShortString()),
log.Int("count_responses", len(resBatch.Responses)),
log.Int("data_size", len(bts)))
Expand All @@ -222,23 +242,28 @@ func (h *handler) handleMeshHashReq(ctx context.Context, reqData []byte) ([]byte
err error
)
if err = codec.Decode(reqData, &req); err != nil {
h.logger.WithContext(ctx).With().Warning("serve: failed to parse mesh hash request", log.Err(err))
h.logger.With().Warning("serve: failed to parse mesh hash request",
log.Context(ctx), log.Err(err))
return nil, errBadRequest
}
if err := req.Validate(); err != nil {
h.logger.WithContext(ctx).With().Debug("failed to validate mesh hash request", log.Err(err))
h.logger.With().Debug("failed to validate mesh hash request",
log.Context(ctx), log.Err(err))
return nil, err
}
hashes, err = layers.GetAggHashes(h.cdb, req.From, req.To, req.Step)
if err != nil {
h.logger.WithContext(ctx).With().Warning("serve: failed to get mesh hashes", log.Err(err))
h.logger.With().Warning("serve: failed to get mesh hashes",
log.Context(ctx), log.Err(err))
return nil, err
}
data, err = codec.EncodeSlice(hashes)
if err != nil {
h.logger.WithContext(ctx).With().Fatal("serve: failed to encode hashes", log.Err(err))
h.logger.With().Fatal("serve: failed to encode hashes",
log.Context(ctx), log.Err(err))
}
h.logger.WithContext(ctx).With().Debug("serve: returning response for mesh hashes",
h.logger.With().Debug("serve: returning response for mesh hashes",
log.Context(ctx),
log.Stringer("layer_from", req.From),
log.Stringer("layer_to", req.To),
log.Uint32("by", req.Step),
Expand Down
Loading

0 comments on commit f1ce011

Please sign in to comment.