diff --git a/cmd/beekeeper/cmd/cmd.go b/cmd/beekeeper/cmd/cmd.go index 203f6fc5..120e9559 100644 --- a/cmd/beekeeper/cmd/cmd.go +++ b/cmd/beekeeper/cmd/cmd.go @@ -12,6 +12,7 @@ import ( "github.com/ethersphere/beekeeper/pkg/config" "github.com/ethersphere/beekeeper/pkg/k8s" "github.com/ethersphere/beekeeper/pkg/logging" + "github.com/ethersphere/beekeeper/pkg/stamper" "github.com/ethersphere/beekeeper/pkg/swap" "github.com/go-git/go-billy/v5/memfs" "github.com/go-git/go-git/v5" @@ -54,6 +55,7 @@ type command struct { homeDir string config *config.Config // beekeeper clusters configuration (config dir) k8sClient *k8s.Client // kubernetes client + stamper stamper.Client swapClient swap.Client log logging.Logger } diff --git a/cmd/beekeeper/cmd/stamper.go b/cmd/beekeeper/cmd/stamper.go index fb4f329c..2b61d357 100644 --- a/cmd/beekeeper/cmd/stamper.go +++ b/cmd/beekeeper/cmd/stamper.go @@ -10,11 +10,6 @@ import ( func (c *command) initStamperCmd() (err error) { const ( optionNameNamespace = "namespace" - optionTTLTreshold = "ttl-treshold" - optionTopUpTo = "topup-to" - optionUsageThreshold = "usage-threshold" - optionDiutionDepth = "dilution-depth" - optionPeriodicCheck = "periodic-check" optionNameTimeout = "timeout" optionNameLabelSelector = "label-selector" ) @@ -27,30 +22,126 @@ func (c *command) initStamperCmd() (err error) { namespace := c.globalConfig.GetString(optionNameNamespace) // clusterName := c.globalConfig.GetString(optionNameClusterName) - stamper := stamper.NewClient(&stamper.ClientConfig{ + c.stamper = stamper.NewStamperClient(&stamper.ClientConfig{ Log: c.log, Namespace: namespace, K8sClient: c.k8sClient, LabelSelector: c.globalConfig.GetString(optionNameLabelSelector), + InCluster: c.globalConfig.GetBool(optionNameInCluster), }) - _ = stamper - return }, PreRunE: c.preRunE, } - cmd.Flags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace. Overrides cluster name if set.") - cmd.Flags().String(optionNameClusterName, "", "Name of the Beekeeper cluster to target. Ignored if a namespace is specified, in which case the namespace from the cluster configuration is used.") - cmd.Flags().Duration(optionTTLTreshold, 5*24*time.Hour, "Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value.") + cmd.PersistentFlags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace. Overrides cluster name if set.") + cmd.PersistentFlags().String(optionNameClusterName, "", "Name of the Beekeeper cluster to target. Ignored if a namespace is specified, in which case the namespace from the cluster configuration is used.") + cmd.PersistentFlags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. Use an empty string to select all resources.") + cmd.PersistentFlags().Duration(optionNameTimeout, 0*time.Minute, "Maximum duration to wait for the operation to complete. Default is no timeout.") + + cmd.AddCommand(c.initStamperTopup()) + cmd.AddCommand(c.initStamperDilute()) + cmd.AddCommand(c.initStamperCreate()) + cmd.AddCommand(c.initStamperSet()) + + c.root.AddCommand(cmd) + + return nil +} + +func (c *command) initStamperTopup() *cobra.Command { + const ( + optionTTLThreshold = "ttl-threshold" + optionTopUpTo = "topup-to" + ) + + cmd := &cobra.Command{ + Use: "topup", + Short: "Top up the TTL of postage batches", + Long: `Top up the TTL of postage batches.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return + }, + } + + cmd.Flags().Duration(optionTTLThreshold, 5*24*time.Hour, "Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value.") cmd.Flags().Duration(optionTopUpTo, 30*24*time.Hour, "Duration to top up the TTL of a stamp to.") + + c.root.AddCommand(cmd) + + return cmd +} + +func (c *command) initStamperDilute() *cobra.Command { + const ( + optionUsageThreshold = "usage-threshold" + optionDiutionDepth = "dilution-depth" + ) + + cmd := &cobra.Command{ + Use: "dilute", + Short: "Dilute postage batches", + Long: `Dilute postage batches.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return + }, + } + cmd.Flags().Float64(optionUsageThreshold, 90, "Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value.") cmd.Flags().Uint16(optionDiutionDepth, 1, "Number of levels by which to increase the depth of a stamp during dilution.") - cmd.Flags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. Use an empty string to select all resources.") - cmd.Flags().Duration(optionNameTimeout, 0*time.Minute, "Maximum duration to wait for the operation to complete. Default is no timeout.") c.root.AddCommand(cmd) - return nil + return cmd +} + +func (c *command) initStamperCreate() *cobra.Command { + const ( + optionNameAmount = "amount" + optionNameDepth = "depth" + ) + + cmd := &cobra.Command{ + Use: "create", + Short: "Create a new postage batch", + Long: `Create a new postage batch.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return + }, + } + + cmd.Flags().Uint64(optionNameAmount, 1000000000, "Amount of tokens to be staked in the postage batch.") + cmd.Flags().Uint8(optionNameDepth, 8, "Depth of the postage batch.") + + c.root.AddCommand(cmd) + + return cmd +} + +func (c *command) initStamperSet() *cobra.Command { + const ( + optionTTLThreshold = "ttl-threshold" + optionTopUpTo = "topup-to" + optionUsageThreshold = "usage-threshold" + optionDiutionDepth = "dilution-depth" + ) + + cmd := &cobra.Command{ + Use: "set", + Short: "Set stamper configuration", + Long: `Set stamper configuration.`, + RunE: func(cmd *cobra.Command, args []string) (err error) { + return + }, + } + + cmd.Flags().Duration(optionTTLThreshold, 5*24*time.Hour, "Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value.") + cmd.Flags().Duration(optionTopUpTo, 30*24*time.Hour, "Duration to top up the TTL of a stamp to.") + cmd.Flags().Float64(optionUsageThreshold, 90, "Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value.") + cmd.Flags().Uint16(optionDiutionDepth, 1, "Number of levels by which to increase the depth of a stamp during dilution.") + + c.root.AddCommand(cmd) + + return cmd } diff --git a/pkg/check/settlements/settlements.go b/pkg/check/settlements/settlements.go index 98fd55c4..cb87d661 100644 --- a/pkg/check/settlements/settlements.go +++ b/pkg/check/settlements/settlements.go @@ -27,7 +27,7 @@ type Options struct { PostageDepth uint64 PostageLabel string Seed int64 - Threshold int64 // balances treshold + Threshold int64 // balances threshold UploadNodeCount int WaitAfterUpload time.Duration // seconds to wait before downloading a file WaitBeforeDownload time.Duration // seconds to wait before downloading a file diff --git a/pkg/funder/operator/operator.go b/pkg/funder/operator/operator.go index 5ac845a8..d7058e1e 100644 --- a/pkg/funder/operator/operator.go +++ b/pkg/funder/operator/operator.go @@ -29,7 +29,7 @@ type ClientConfig struct { type Client struct { *ClientConfig - httpClient http.Client + httpClient *http.Client } func NewClient(cfg *ClientConfig) *Client { @@ -48,7 +48,7 @@ func NewClient(cfg *ClientConfig) *Client { } return &Client{ - httpClient: *httpClient, + httpClient: httpClient, ClientConfig: cfg, } } diff --git a/pkg/stamper/node.go b/pkg/stamper/node.go new file mode 100644 index 00000000..5febea1b --- /dev/null +++ b/pkg/stamper/node.go @@ -0,0 +1,66 @@ +package stamper + +import ( + "context" + "fmt" + "time" + + "github.com/ethersphere/beekeeper/pkg/bee/api" +) + +var _ Client = (*Node)(nil) + +type Node struct { + client *api.Client + Name string +} + +func NewNodeInfo(client *api.Client, name string) *Node { + return &Node{ + client: client, + Name: name, + } +} + +func (n *Node) Create(ctx context.Context, amount uint64, depth uint8) error { + _, err := n.client.Postage.CreatePostageBatch(ctx, int64(amount), uint64(depth), "beekeeper") + if err != nil { + return fmt.Errorf("node %s: create postage batch: %w", n.Name, err) + } + + return nil +} + +func (n *Node) Dilute(ctx context.Context, threshold float64, depthIncrement uint8) error { + batches, err := n.client.Postage.PostageBatches(ctx) + if err != nil { + return fmt.Errorf("node %s: get postage batches: %w", n.Name, err) + } + + for _, batch := range batches { + if !batch.Usable || batch.ImmutableFlag || batch.Utilization == 0 { + continue + } + + usageFactor := batch.Depth - batch.BucketDepth // depth - bucketDepth + divisor := float64(int(1) << usageFactor) // 2^(depth - bucketDepth) + stampsUsage := (float64(batch.Utilization) / divisor) * 100 // (utilization / 2^(depth - bucketDepth)) * 100 + + if stampsUsage >= threshold { + newDepth := batch.Depth + depthIncrement + if err := n.client.Postage.DilutePostageBatch(ctx, batch.BatchID, uint64(newDepth), ""); err != nil { + return fmt.Errorf("node %s: dilute batch %s: %w", n.Name, batch.BatchID, err) + } + } + } + + return nil +} + +func (n *Node) Set(ctx context.Context, ttlThreshold time.Duration, topupDuration time.Duration, threshold float64, depth uint16) error { + panic("unimplemented") +} + +func (n *Node) Topup(ctx context.Context, ttlThreshold time.Duration, topupDuration time.Duration) error { + panic("unimplemented") +} diff --git a/pkg/stamper/stamper.go b/pkg/stamper/stamper.go index 632dc2a2..8e550b40 100644 --- a/pkg/stamper/stamper.go +++ b/pkg/stamper/stamper.go @@ -1,27 +1,40 @@ package stamper import ( + "context" + "fmt" "io" "net/http" + "net/url" + "time" + "github.com/ethersphere/beekeeper/pkg/bee/api" "github.com/ethersphere/beekeeper/pkg/k8s" "github.com/ethersphere/beekeeper/pkg/logging" ) +type Client interface { + Create(ctx context.Context, amount uint64, depth uint8) error + Dilute(ctx context.Context, threshold float64, depth uint8) error + Set(ctx context.Context, ttlThreshold, topupDuration time.Duration, threshold float64, depth uint16) error + Topup(ctx context.Context, ttlThreshold, topupDuration time.Duration) error +} + type ClientConfig struct { Log logging.Logger Namespace string K8sClient *k8s.Client HTTPClient *http.Client // injected HTTP client LabelSelector string + InCluster bool } -type Client struct { +type StamperClient struct { *ClientConfig httpClient http.Client } -func NewClient(cfg *ClientConfig) *Client { +func NewStamperClient(cfg *ClientConfig) *StamperClient { if cfg == nil { return nil } @@ -36,8 +49,119 @@ func NewClient(cfg *ClientConfig) *Client { httpClient = &http.Client{} } - return &Client{ + return &StamperClient{ httpClient: *httpClient, ClientConfig: cfg, } } + +// Create implements Client. +func (s *StamperClient) Create(ctx context.Context, amount uint64, depth uint8) error { + panic("unimplemented") +} + +// Dilute implements Client. +func (s *StamperClient) Dilute(ctx context.Context, usageThreshold float64, dilutionDepth uint8) error { + nodes, err := s.getNamespaceNodes(ctx) + if err != nil { + return fmt.Errorf("get namespace nodes: %w", err) + } + + for _, node := range nodes { + if err := node.Dilute(ctx, usageThreshold, dilutionDepth); err != nil { + return fmt.Errorf("dilute node %s: %w", node.Name, err) + } + } + + return nil +} + +// Set implements Client. +func (s *StamperClient) Set(ctx context.Context, ttlThreshold time.Duration, topupTo time.Duration, usageThreshold float64, dilutionDepth uint16) error { + panic("unimplemented") +} + +// Topup implements Client. +func (s *StamperClient) Topup(ctx context.Context, ttlThreshold time.Duration, topupTo time.Duration) (err error) { + nodes, err := s.getNamespaceNodes(ctx) + if err != nil { + return fmt.Errorf("get namespace nodes: %w", err) + } + + for _, node := range nodes { + _ = node + // do something with node + } + + return nil +} + +func (sc *StamperClient) getNamespaceNodes(ctx context.Context) (nodes []Node, err error) { + if sc.Namespace == "" { + return nil, fmt.Errorf("namespace not provided") + } + + if sc.InCluster { + nodes, err = sc.getServiceNodes(ctx) + } else { + nodes, err = sc.getIngressNodes(ctx) + } + + if err != nil { + return nil, fmt.Errorf("get nodes: %w", err) + } + + return nodes, nil +} + +func (sc *StamperClient) getServiceNodes(ctx context.Context) ([]Node, error) { + svcNodes, err := sc.K8sClient.Service.GetNodes(ctx, sc.Namespace, sc.LabelSelector) + if err != nil { + return nil, fmt.Errorf("list api services: %w", err) + } + + nodes := make([]Node, len(svcNodes)) + for i, node := range svcNodes { + parsedURL, err := url.Parse(node.Endpoint) + if err != nil { + return nil, fmt.Errorf("extract base URL: %w", err) + } + + apiClient := api.NewClient(parsedURL, &api.ClientOptions{ + HTTPClient: &sc.httpClient, + }) + + nodes[i] = *NewNodeInfo(apiClient, node.Name) + } + + return nodes, nil +} + +func (sc *StamperClient) getIngressNodes(ctx context.Context) ([]Node, error) { + ingressNodes, err := sc.K8sClient.Ingress.GetNodes(ctx, sc.Namespace, sc.LabelSelector) + if err != nil { + return nil, fmt.Errorf("list ingress api nodes hosts: %w", err) + } + + ingressRouteNodes, err := sc.K8sClient.IngressRoute.GetNodes(ctx, sc.Namespace, sc.LabelSelector) + if err != nil { + return nil, fmt.Errorf("list ingress route api nodes hosts: %w", err) + } + + allNodes := append(ingressNodes, ingressRouteNodes...) + nodes := make([]Node, len(allNodes)) + for i, node := range allNodes { + parsedURL, err := url.Parse(node.Host) + if err != nil { + return nil, fmt.Errorf("extract base URL: %w", err) + } + + apiClient := api.NewClient(parsedURL, &api.ClientOptions{ + HTTPClient: &sc.httpClient, + }) + + nodes[i] = *NewNodeInfo(apiClient, node.Name) + } + + return nodes, nil +}