Skip to content

Commit

Permalink
import other commands
Browse files Browse the repository at this point in the history
  • Loading branch information
kvaps committed May 7, 2024
1 parent d2664b5 commit da01bab
Show file tree
Hide file tree
Showing 747 changed files with 118,600 additions and 376 deletions.
57 changes: 52 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,60 @@
# Version string derived from the most recent git tag; embedded into the binary.
VERSION=$(shell git describe --tags)
# Talos version taken from go.mod: module version with the leading "v" stripped.
TALOS_VERSION=$(shell go list -m github.com/siderolabs/talos | awk '{sub(/^v/, "", $$NF); print $$NF}')

# NOTE(review): this span is a flattened diff — both the pre-change
# "generate: update-dashboard" line and the post-change "generate:" line
# appear; in the committed Makefile only the latter remains.
generate: update-dashboard
generate:
go generate

# Build the binary, embedding the git-derived version via -ldflags.
build:
go build -ldflags="-X 'main.Version=$(VERSION)'"

# NOTE(review): removed by this commit — superseded by import-internal below,
# which vendors a larger set of Talos internal packages.
update-dashboard:
rm -rf internal/pkg/dashboard internal/pkg/meta internal/app/machined/pkg/runtime
wget -O- https://github.com/siderolabs/talos/archive/refs/tags/v$(TALOS_VERSION).tar.gz | tar --strip=1 -xzf- talos-$(TALOS_VERSION)/internal/pkg/dashboard talos-$(TALOS_VERSION)/internal/pkg/meta talos-$(TALOS_VERSION)/internal/app/machined/pkg/runtime
sed -i 's|github.com/siderolabs/talos/internal|github.com/aenix-io/talm/internal|g' `grep -rl 'github.com/siderolabs/talos/internal' internal/pkg/dashboard internal/pkg/meta internal/app/machined/pkg/runtime`
# Full import: vendor Talos internal packages, then regenerate CLI command wrappers.
import: import-internal import-commands

# Generate local wrappers for the listed talosctl commands, pinned to the
# Talos version resolved from go.mod.
import-commands:
go run tools/import_commands.go --talos-version v$(TALOS_VERSION) \
bootstrap \
containers \
dashboard \
disks \
dmesg \
events \
get \
health \
image \
kubeconfig \
list \
logs \
memory \
mounts \
netstat \
pcap \
processes \
read \
reboot \
reset \
restart \
rollback \
service \
shutdown \
stats \
time \
version

# Re-vendor selected Talos internal packages: wipe internal/, extract the
# pinned release tarball, then rewrite import paths to this module's path.
import-internal:
rm -rf internal
mkdir internal
wget -O- https://github.com/siderolabs/talos/archive/refs/tags/v$(TALOS_VERSION).tar.gz | tar --strip=1 -xzf- \
talos-$(TALOS_VERSION)/internal/pkg/meta \
talos-$(TALOS_VERSION)/internal/app/machined/ \
talos-$(TALOS_VERSION)/internal/pkg/cri \
talos-$(TALOS_VERSION)/internal/pkg/cgroup \
talos-$(TALOS_VERSION)/internal/pkg/dashboard \
talos-$(TALOS_VERSION)/internal/pkg/environment \
talos-$(TALOS_VERSION)/internal/pkg/etcd \
talos-$(TALOS_VERSION)/internal/pkg/install \
talos-$(TALOS_VERSION)/internal/pkg/logind \
talos-$(TALOS_VERSION)/internal/pkg/mount \
talos-$(TALOS_VERSION)/internal/pkg/partition \
talos-$(TALOS_VERSION)/internal/pkg/secureboot \
talos-$(TALOS_VERSION)/internal/pkg/smbios
sed -i 's|github.com/siderolabs/talos/internal|github.com/aenix-io/talm/internal|g' `grep -rl 'github.com/siderolabs/talos/internal' internal`

235 changes: 173 additions & 62 deletions go.mod

Large diffs are not rendered by default.

365 changes: 311 additions & 54 deletions go.sum

Large diffs are not rendered by default.

256 changes: 256 additions & 0 deletions internal/app/machined/internal/server/v1alpha1/v1alpha1_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package runtime provides the runtime implementation.
package runtime

import (
"context"
"fmt"
"log"
"net/netip"
"slices"
"strings"

"github.com/cosi-project/runtime/pkg/safe"
"github.com/siderolabs/gen/xslices"
"google.golang.org/grpc/metadata"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aenix-io/talm/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/pkg/cluster"
"github.com/siderolabs/talos/pkg/cluster/check"
"github.com/siderolabs/talos/pkg/conditions"
"github.com/siderolabs/talos/pkg/grpc/middleware/authz"
clusterapi "github.com/siderolabs/talos/pkg/machinery/api/cluster"
"github.com/siderolabs/talos/pkg/machinery/config/machine"
"github.com/siderolabs/talos/pkg/machinery/constants"
clusterres "github.com/siderolabs/talos/pkg/machinery/resources/cluster"
)

// HealthCheck implements the cluster.ClusterServer interface.
func (s *Server) HealthCheck(in *clusterapi.HealthCheckRequest, srv clusterapi.ClusterService_HealthCheckServer) error {
clientProvider := &cluster.LocalClientProvider{}
defer clientProvider.Close() //nolint:errcheck

k8sProvider := &cluster.KubernetesClient{
ClientProvider: clientProvider,
ForceEndpoint: in.GetClusterInfo().GetForceEndpoint(),
}
defer k8sProvider.K8sClose() //nolint:errcheck

checkCtx, checkCtxCancel := context.WithTimeout(srv.Context(), in.WaitTimeout.AsDuration())
defer checkCtxCancel()

md := metadata.New(nil)
authz.SetMetadata(md, authz.GetRoles(srv.Context()))
checkCtx = metadata.NewOutgoingContext(checkCtx, md)

r := s.Controller.Runtime()

clusterInfo, err := buildClusterInfo(checkCtx, in, r, *k8sProvider)
if err != nil {
return err
}

state := struct {
cluster.ClientProvider
cluster.K8sProvider
cluster.Info
}{
ClientProvider: clientProvider,
K8sProvider: k8sProvider,
Info: clusterInfo,
}

nodeInternalIPs := xslices.Map(clusterInfo.Nodes(), func(info cluster.NodeInfo) string {
return info.InternalIP.String()
})

if err := srv.Send(&clusterapi.HealthCheckProgress{
Message: fmt.Sprintf("discovered nodes: %q", nodeInternalIPs),
}); err != nil {
return err
}

return check.Wait(checkCtx, &state, append(check.DefaultClusterChecks(), check.ExtraClusterChecks()...), &healthReporter{srv: srv})
}

// healthReporter adapts the health-check gRPC stream to the reporter
// interface used by the check package, forwarding each new status line
// to the client.
type healthReporter struct {
	srv      clusterapi.ClusterService_HealthCheckServer
	lastLine string
}

// Update streams the condition's description, suppressing consecutive
// duplicate lines. Send errors are intentionally ignored: reporting is
// best-effort progress output.
func (hr *healthReporter) Update(condition conditions.Condition) {
	msg := fmt.Sprintf("waiting for %s", condition)

	if msg == hr.lastLine {
		return
	}

	hr.srv.Send(&clusterapi.HealthCheckProgress{ //nolint:errcheck
		Message: strings.TrimSpace(msg),
	})

	hr.lastLine = msg
}

// clusterState is a static cluster.Info implementation backed by a fixed,
// pre-computed node list.
type clusterState struct {
	nodeInfos       []cluster.NodeInfo
	nodeInfosByType map[machine.Type][]cluster.NodeInfo
}

// Nodes returns every known node.
func (cl *clusterState) Nodes() []cluster.NodeInfo {
	return cl.nodeInfos
}

// NodesByType returns the nodes recorded under the given machine type.
func (cl *clusterState) NodesByType(t machine.Type) []cluster.NodeInfo {
	return cl.nodeInfosByType[t]
}

// String renders the control-plane and worker internal IPs for logging.
func (cl *clusterState) String() string {
	internalIPs := func(infos []cluster.NodeInfo) []string {
		return xslices.Map(infos, func(info cluster.NodeInfo) string {
			return info.InternalIP.String()
		})
	}

	return fmt.Sprintf("control plane: %q, worker: %q",
		internalIPs(cl.nodeInfosByType[machine.TypeControlPlane]),
		internalIPs(cl.nodeInfosByType[machine.TypeWorker]))
}

// buildClusterInfo assembles the cluster topology used by the health checks.
// Sources are tried in order: node lists given explicitly in the request,
// members known to the discovery service, and finally the node list reported
// by the Kubernetes API.
//
//nolint:gocyclo
func buildClusterInfo(ctx context.Context,
	req *clusterapi.HealthCheckRequest,
	r runtime.Runtime,
	cli cluster.KubernetesClient,
) (cluster.Info, error) {
	controlPlaneNodes := req.GetClusterInfo().GetControlPlaneNodes()
	workerNodes := req.GetClusterInfo().GetWorkerNodes()

	// If the node list is explicitly provided, use it.
	if len(controlPlaneNodes) != 0 || len(workerNodes) != 0 {
		controlPlaneNodeInfos, err := cluster.IPsToNodeInfos(controlPlaneNodes)
		if err != nil {
			return nil, err
		}

		workerNodeInfos, err := cluster.IPsToNodeInfos(workerNodes)
		if err != nil {
			return nil, err
		}

		return &clusterState{
			nodeInfos: append(slices.Clone(controlPlaneNodeInfos), workerNodeInfos...),
			nodeInfosByType: map[machine.Type][]cluster.NodeInfo{
				machine.TypeControlPlane: controlPlaneNodeInfos,
				machine.TypeWorker:       workerNodeInfos,
			},
		}, nil
	}

	// Try to discover nodes using the discovery service; failure here is
	// best-effort — log and fall through to the Kubernetes API.
	discoveryMemberList, err := getDiscoveryMemberList(ctx, r)
	if err != nil {
		log.Printf("discovery service returned error: %v\n", err)
	}

	// Discovery service returned some nodes, use them.
	if len(discoveryMemberList) > 0 {
		return check.NewDiscoveredClusterInfo(discoveryMemberList)
	}

	// As the last resort, get the nodes from the cluster itself.
	k8sCli, err := cli.K8sClient(ctx)
	if err != nil {
		return nil, err
	}

	nodeList, err := k8sCli.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}

	nodeInfos := make([]cluster.NodeInfo, len(nodeList.Items))
	nodeInfosByType := map[machine.Type][]cluster.NodeInfo{}

	for i, node := range nodeList.Items {
		nodeInfo, err2 := k8sNodeToNodeInfo(&node)
		if err2 != nil {
			// Fix: previously returned the outer `err` (possibly nil after the
			// discovery fallback), which could mask the conversion failure.
			return nil, err2
		}

		if isControlPlaneNode(&node) {
			nodeInfosByType[machine.TypeControlPlane] = append(nodeInfosByType[machine.TypeControlPlane], *nodeInfo)
		} else {
			nodeInfosByType[machine.TypeWorker] = append(nodeInfosByType[machine.TypeWorker], *nodeInfo)
		}

		nodeInfos[i] = *nodeInfo
	}

	return &clusterState{
		nodeInfos:       nodeInfos,
		nodeInfosByType: nodeInfosByType,
	}, nil
}

// k8sNodeToNodeInfo converts a Kubernetes Node object into a cluster.NodeInfo,
// collecting its internal and external IP addresses. The last internal IP
// seen becomes InternalIP. A nil node yields (nil, nil).
func k8sNodeToNodeInfo(node *corev1.Node) (*cluster.NodeInfo, error) {
	if node == nil {
		return nil, nil
	}

	var (
		internalIP netip.Addr
		ips        = make([]netip.Addr, 0, len(node.Status.Addresses))
	)

	for _, addr := range node.Status.Addresses {
		switch addr.Type {
		case corev1.NodeInternalIP:
			ip, err := netip.ParseAddr(addr.Address)
			if err != nil {
				return nil, err
			}

			internalIP = ip
			ips = append(ips, ip)
		case corev1.NodeExternalIP:
			ip, err := netip.ParseAddr(addr.Address)
			if err != nil {
				return nil, err
			}

			ips = append(ips, ip)
		}
	}

	return &cluster.NodeInfo{
		InternalIP: internalIP,
		IPs:        ips,
	}, nil
}

// getDiscoveryMemberList reads the cluster members known to the discovery
// service from the COSI resource state.
func getDiscoveryMemberList(ctx context.Context, rt runtime.Runtime) ([]*clusterres.Member, error) {
	resources := rt.State().V1Alpha2().Resources()

	members, err := safe.StateListAll[*clusterres.Member](ctx, resources)
	if err != nil {
		return nil, err
	}

	out := make([]*clusterres.Member, 0, members.Len())

	for it := members.Iterator(); it.Next(); {
		out = append(out, it.Value())
	}

	return out, nil
}

// isControlPlaneNode reports whether the Kubernetes node carries the
// control-plane role label.
func isControlPlaneNode(node *corev1.Node) bool {
	// Direct comma-ok lookup replaces the original O(n) scan over all label
	// keys; indexing a nil map is safe in Go and returns the zero value.
	_, ok := node.Labels[constants.LabelNodeRoleControlPlane]

	return ok
}
Loading

0 comments on commit da01bab

Please sign in to comment.