Skip to content

Commit

Permalink
Multiple improvements to the reporting service
Browse files Browse the repository at this point in the history
  • Loading branch information
fridrik01 committed Nov 25, 2024
1 parent 64a27ae commit 7259475
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 34 deletions.
4 changes: 3 additions & 1 deletion beacond/cmd/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func DefaultComponents() []any {
*BeaconBlockHeader, *BeaconState, *BeaconStateMarshallable,
*ExecutionPayload, *ExecutionPayloadHeader, *KVStore, *Logger,
],
components.ProvideReportingService[*ExecutionPayload, *PayloadAttributes, *Logger],
components.ProvideReportingService[
*ExecutionPayload, *PayloadAttributes, *Logger,
],
components.ProvideCometBFTService[*Logger],
components.ProvideServiceRegistry[
*AvailabilityStore,
Expand Down
5 changes: 4 additions & 1 deletion beacond/cmd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ type (
NodeAPIServer = server.Server[NodeAPIContext]

// ReportingService is a type alias for the reporting service.
ReportingService = version.ReportingService[*ExecutionPayload, *PayloadAttributes]
ReportingService = version.ReportingService[
*ExecutionPayload,
*PayloadAttributes,
]

// SidecarFactory is a type alias for the sidecar factory.
SidecarFactory = dablob.SidecarFactory[
Expand Down
21 changes: 18 additions & 3 deletions mod/execution/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"math/big"
"strings"
"sync"
"time"

"github.com/berachain/beacon-kit/mod/errors"
Expand All @@ -49,8 +50,12 @@ type EngineClient[
eth1ChainID *big.Int
// clientMetrics is the metrics for the engine client.
metrics *clientMetrics
// capabilities is a map of capabilities that the execution client has.
capabilities map[string]struct{}
// Capabilities is a map of capabilities that the execution client has.
Capabilities map[string]struct{}
// connected will be set to true when we have successfully connected
// to the execution client.
mutex sync.RWMutex
connected bool
}

// New creates a new engine client EngineClient.
Expand Down Expand Up @@ -79,9 +84,10 @@ func New[
cfg.RPCJWTRefreshInterval,
),
)),
capabilities: make(map[string]struct{}),
Capabilities: make(map[string]struct{}),
eth1ChainID: eth1ChainID,
metrics: newClientMetrics(telemetrySink, logger),
connected: false,
}
}

Expand Down Expand Up @@ -130,11 +136,20 @@ func (s *EngineClient[
}
continue
}
s.mutex.Lock()
s.connected = true
s.mutex.Unlock()
return nil
}
}
}

func (s *EngineClient[_, _]) IsConnected() bool {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.connected
}

/* -------------------------------------------------------------------------- */
/* Helpers */
/* -------------------------------------------------------------------------- */
Expand Down
4 changes: 2 additions & 2 deletions mod/execution/pkg/client/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ func (s *EngineClient[
// Capture and log the capabilities that the execution client has.
for _, capability := range result {
s.logger.Info("Exchanged capability", "capability", capability)
s.capabilities[capability] = struct{}{}
s.Capabilities[capability] = struct{}{}
}

// Log the capabilities that the execution client does not have.
for _, capability := range ethclient.BeaconKitSupportedCapabilities() {
if _, exists := s.capabilities[capability]; !exists {
if _, exists := s.Capabilities[capability]; !exists {
s.logger.Warn(
"Your execution client may require an update 🚸",
"unsupported_capability", capability,
Expand Down
4 changes: 2 additions & 2 deletions mod/execution/pkg/client/ethclient/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ func (s *Client[ExecutionPayloadT]) GetClientVersionV1(
result := make([]engineprimitives.ClientVersionV1, 0)

// NOTE: although the ethereum spec does not require us passing a
// clientversion as param, it seems some clients require it and even enfore
// a valid Code.
// clientversion as param, it seems some clients require it and even
// enforce a valid Code.
if err := s.Call(
ctx, &result, GetClientVersionV1, engine.ClientVersionV1{Code: "GE"},
); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion mod/node-core/pkg/components/service_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ type ServiceRegistryInput[
]
Logger LoggerT
NodeAPIServer *server.Server[NodeAPIContextT]
ReportingService *version.ReportingService[ExecutionPayloadT, *engineprimitives.PayloadAttributes[WithdrawalT]]
ReportingService *version.ReportingService[
ExecutionPayloadT,
*engineprimitives.PayloadAttributes[WithdrawalT],
]
TelemetrySink *metrics.TelemetrySink
TelemetryService *telemetry.Service
ValidatorService *validator.Service[
Expand Down
118 changes: 94 additions & 24 deletions mod/node-core/pkg/services/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import (
"runtime"
"time"

engineprimitives "github.com/berachain/beacon-kit/mod/engine-primitives/pkg/engine-primitives"
"github.com/berachain/beacon-kit/mod/errors"
"github.com/berachain/beacon-kit/mod/execution/pkg/client"
"github.com/berachain/beacon-kit/mod/execution/pkg/client/ethclient"
"github.com/berachain/beacon-kit/mod/log"
"github.com/berachain/beacon-kit/mod/primitives/pkg/constraints"
)
Expand Down Expand Up @@ -83,68 +86,135 @@ func (*ReportingService[_, _]) Name() string {

// Start begins the periodic logging of the chain version.
func (rs *ReportingService[_, _]) Start(ctx context.Context) error {
ticker := time.NewTicker(rs.reportingInterval)
rs.handleReport(ctx)
// we print to console always at the beginning
rs.printToConsole(engineprimitives.ClientVersionV1{
Version: "unknown",
Name: "unknown"},
)

connectedTicker := time.NewTicker(time.Second)
go func() {
// wait until the client is connected
connected := false
for !connected {
select {
case <-connectedTicker.C:
connected = rs.client.IsConnected()
case <-ctx.Done():
connectedTicker.Stop()
return
}
}
connectedTicker.Stop()

rs.logger.Info("Connected to execution client")

// log telemetry immediately after we are connected
ethVersion, err := rs.GetEthVersion(ctx)
if err != nil {
rs.logger.Warn("Failed to get eth version", "err", err)
}
rs.logTelemetry(ethVersion)

// then we start reporting at the reportingInterval interval
ticker := time.NewTicker(rs.reportingInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
rs.handleReport(ctx)
// since the eth client can be updated separately for beacon node
// we need to fetch the version every time
ethVersion, err = rs.GetEthVersion(ctx)
if err != nil {
rs.logger.Warn("Failed to get eth version", "err", err)
}

// print to console and log telemetry
rs.printToConsole(ethVersion)
rs.logTelemetry(ethVersion)

continue
case <-ctx.Done():
return
}
}
}()

return nil
}

func (rs *ReportingService[_, _]) handleReport(ctx context.Context) {
systemInfo := runtime.GOOS + "/" + runtime.GOARCH

func (rs *ReportingService[_, _]) printToConsole(
ethClient engineprimitives.ClientVersionV1) {
rs.logger.Info(fmt.Sprintf(`
+==========================================================================+
+ ⭐️ Star BeaconKit on GitHub @ https://github.com/berachain/beacon-kit +
+ 🧩 Your node is running version: %-40s+
+ ♦ Eth client: %-59s+
+ 💾 Your system: %-57s+
+ 🦺 Please report issues @ https://github.com/berachain/beacon-kit/issues +
+==========================================================================+
`,
rs.version,
systemInfo,
fmt.Sprintf("%s (version: %s)", ethClient.Name, ethClient.Version),
runtime.GOOS+"/"+runtime.GOARCH,
))
}

// TODO: Delete this counter as it should be included in the new beacon_kit.runtime.version metric.
func (rs *ReportingService[_, _]) GetEthVersion(
ctx context.Context) (engineprimitives.ClientVersionV1, error) {
ethVersion := engineprimitives.ClientVersionV1{
Version: "unknown",
Name: "unknown",
}

if _, ok := rs.client.Capabilities[ethclient.GetClientVersionV1]; ok {
// Get the client version from the execution layer.
info, err := rs.client.GetClientVersionV1(ctx)
if err != nil {
return ethVersion, fmt.Errorf("failed to get client version: %w", err)
}

// the spec says we should have at least one client version
if len(info) == 0 {
return ethVersion, errors.New("no client version returned")
}

ethVersion.Version = info[0].Version
ethVersion.Name = info[0].Name
} else {
rs.logger.Warn("Client does not have capability to get client version")
}

return ethVersion, nil
}

func (rs *ReportingService[_, _]) logTelemetry(
ethVersion engineprimitives.ClientVersionV1) {
systemInfo := runtime.GOOS + "/" + runtime.GOARCH

// TODO: Delete this counter as it should be included in the new
// beacon_kit.runtime.version metric.
rs.sink.IncrementCounter(
"beacon_kit.runtime.version.reported",
"version", rs.version, "system", systemInfo,
)

// Get the client version from the execution layer.
info, err := rs.client.GetClientVersionV1(ctx)
if err != nil {
rs.logger.Error("Failed to get client version", "err", err)
return
}
rs.logger.Info("GetClientVersionV1", "info", info)

// the spec says we should have at least one client version
if len(info) == 0 {
rs.logger.Warn("No client version returned")
return
}
rs.logger.Info("Reporting version", "version", rs.version,
"system", systemInfo,
"eth_version", ethVersion.Version,
"eth_name", ethVersion.Name)

// Report the version to the telemetry sink and include labels for beacon node version and eth name and version
// Report the version to the telemetry sink and include labels
// for beacon node version and eth name and version
var args = [8]string{
"version", rs.version,
"system", systemInfo,
"eth_version", info[0].Version,
"eth_name", info[0].Name,
"eth_version", ethVersion.Version,
"eth_name", ethVersion.Name,
}
rs.sink.SetGauge("beacon_kit.runtime.version", 1, args[:]...)
}

0 comments on commit 7259475

Please sign in to comment.