diff --git a/.gitignore b/.gitignore index fd33bdea..9a61f93d 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ x/wire/file/client/client x/wire/file/server/server x/wire/intf/client/client x/wire/intf/server/server +x/wire/forward/forward diff --git a/topo/node/forward/forward.go b/topo/node/forward/forward.go index a572501c..93d0aa0f 100644 --- a/topo/node/forward/forward.go +++ b/topo/node/forward/forward.go @@ -16,6 +16,7 @@ package forward import ( "context" "fmt" + "net" fpb "github.com/openconfig/kne/proto/forward" tpb "github.com/openconfig/kne/proto/topo" @@ -27,6 +28,10 @@ import ( "k8s.io/utils/pointer" ) +const ( + fwdPort = "50058" +) + func New(nodeImpl *node.Impl) (node.Node, error) { if nodeImpl == nil { return nil, fmt.Errorf("nodeImpl cannot be nil") @@ -59,6 +64,48 @@ func (n *Node) Create(ctx context.Context) error { return nil } +func interfaceFlag(intf string) string { + return fmt.Sprintf("--interfaces=%s", intf) +} + +func endpointFlag(lintf, addr, rintf string) string { + return fmt.Sprintf("--endpoints=%s/%s/%s", lintf, addr, rintf) +} + +func wireToArg(wire *fpb.Wire) (string, error) { + switch at := wire.A.Endpoint.(type) { + case *fpb.Endpoint_Interface: + // If A is an interface, then this node should serve as the fwd client for this wire. + // Additionally Z should not be an interface. + switch zt := wire.Z.Endpoint.(type) { + case *fpb.Endpoint_Interface: + return "", fmt.Errorf("endpoints A and Z cannot both be interfaces") + case *fpb.Endpoint_LocalNode: + ln := wire.GetZ().GetLocalNode() + //pod, err := n.KubeClient.CoreV1().Pods(n.Namespace).Get(ctx, ln.GetName(), metav1.GetOptions{}) + //if err != nil { + // return "", err + //} + return endpointFlag(wire.GetA().GetInterface().GetName(), net.JoinHostPort(ln.GetName(), fwdPort), ln.GetInterface()), nil + default: + return "", fmt.Errorf("endpoint Z type not supported: %T", zt) + } + case *fpb.Endpoint_LocalNode: + // If A is not an interface, then this node should serve as the fwd server for this wire. + // Additionally Z should be an interface. + switch zt := wire.Z.Endpoint.(type) { + case *fpb.Endpoint_Interface: + return interfaceFlag(wire.GetZ().GetInterface().GetName()), nil + case *fpb.Endpoint_LocalNode: + return "", fmt.Errorf("one of endpoints A and Z must be an interface") + default: + return "", fmt.Errorf("endpoint Z type not supported: %T", zt) + } + default: + return "", fmt.Errorf("endpoint A type not supported: %T", at) + } +} + // CreatePod creates a Pod for the Node based on the underlying proto. func (n *Node) CreatePod(ctx context.Context) error { pb := n.Proto @@ -68,14 +115,22 @@ func (n *Node) CreatePod(ctx context.Context) error { initContainerImage = node.DefaultInitContainerImage } + fwdArgs := pb.Config.Args if vendorData := pb.Config.GetVendorData(); vendorData != nil { fwdCfg := &fpb.ForwardConfig{} if err := vendorData.UnmarshalTo(fwdCfg); err != nil { return err } - // TODO: Validate and use fwdCfg to pass arguments to the container. log.Infof("Got fwdCfg: %v", prototext.Format(fwdCfg)) + for _, wire := range fwdCfg.GetWires() { + arg, err := wireToArg(wire) + if err != nil { + return err + } + fwdArgs = append(fwdArgs, arg) + } } + log.Infof("Using container args: %v", fwdArgs) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -99,7 +154,7 @@ func (n *Node) CreatePod(ctx context.Context) error { Name: pb.Name, Image: pb.Config.Image, Command: pb.Config.Command, - Args: pb.Config.Args, + Args: fwdArgs, Env: node.ToEnvVar(pb.Config.Env), Resources: node.ToResourceRequirements(pb.Constraints), ImagePullPolicy: "IfNotPresent", @@ -161,9 +216,9 @@ func defaults(pb *tpb.Node) *tpb.Node { if pb.Config == nil { pb.Config = &tpb.Config{} } - if len(pb.GetConfig().GetCommand()) == 0 { - pb.Config.Command = []string{"/bin/sh", "-c", "sleep 2000000000000"} - } + //if len(pb.GetConfig().GetCommand()) == 0 { + // pb.Config.Command = []string{"/bin/sh", "-c", "sleep 2000000000000"} + //} if pb.Config.EntryCommand == "" { pb.Config.EntryCommand = fmt.Sprintf("kubectl exec -it %s -- sh", pb.Name) } diff --git a/x/wire/forward/Dockerfile b/x/wire/forward/Dockerfile new file mode 100644 index 00000000..fb6724ef --- /dev/null +++ b/x/wire/forward/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:1.21 + +WORKDIR /app + +COPY go.mod go.sum ./ +RUN go mod download + +COPY x/wire/ x/wire/ +COPY proto/ proto/ + +RUN go build -o forward x/wire/forward/main.go + +ENTRYPOINT ./forward diff --git a/x/wire/forward/main.go b/x/wire/forward/main.go new file mode 100644 index 00000000..1c1ea6be --- /dev/null +++ b/x/wire/forward/main.go @@ -0,0 +1,137 @@ +package main + +import ( + "context" + "fmt" + "net" + "strings" + + wpb "github.com/openconfig/kne/proto/wire" + "github.com/openconfig/kne/x/wire" + flag "github.com/spf13/pflag" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + log "k8s.io/klog/v2" +) + +var ( + port = flag.Int("port", 50058, "Wire server port") + interfaces = flag.StringSlice("interfaces", []string{}, "List of local interfaces to serve on the wire server") + endpoints = flag.StringSlice("endpoints", []string{}, "List of strings of the form local_interface/remote_address/remote_interface") +) + +type server struct { + wpb.UnimplementedWireServer + endpoints map[wire.InterfaceEndpoint]*wire.Wire +} + +func newServer(endpoints map[wire.InterfaceEndpoint]*wire.Wire) *server { + return &server{endpoints: endpoints} +} + +func (s *server) Transmit(stream wpb.Wire_TransmitServer) error { + e, err := wire.ParseInterfaceEndpoint(stream.Context()) + if err != nil { + return fmt.Errorf("unable to parse endpoint from incoming stream context: %v", err) + } + log.Infof("New Transmit stream started for endpoint %v", e) + w, ok := s.endpoints[*e] + if !ok { + return fmt.Errorf("no endpoint found on server for request: %v", e) + } + if err := w.Transmit(stream.Context(), stream); err != nil { + return fmt.Errorf("transmit failed: %v", err) + } + return nil +} + +func localEndpoints(intfs []string) (map[wire.InterfaceEndpoint]*wire.Wire, error) { + m := map[wire.InterfaceEndpoint]*wire.Wire{} + for _, intf := range intfs { + rw, err := wire.NewInterfaceReadWriter(intf) + if err != nil { + return nil, err + } + m[*wire.NewInterfaceEndpoint(intf)] = wire.NewWire(rw) + } + return m, nil +} + +func remoteEndpoints(eps []string) (map[string]map[wire.InterfaceEndpoint]*wire.Wire, error) { + m := map[string]map[wire.InterfaceEndpoint]*wire.Wire{} + for _, ep := range eps { + parts := strings.SplitN(ep, "/", 3) + if len(parts) != 3 { + return nil, fmt.Errorf("unable to parse %v into endpoint, got %v", ep, parts) + } + lintf := parts[0] + addr := parts[1] + rintf := parts[2] + rw, err := wire.NewInterfaceReadWriter(lintf) + if err != nil { + return nil, err + } + if _, ok := m[addr]; !ok { + m[addr] = map[wire.InterfaceEndpoint]*wire.Wire{} + } + m[addr][*wire.NewInterfaceEndpoint(rintf)] = wire.NewWire(rw) + } + return m, nil +} + +func main() { + flag.Parse() + ctx := context.Background() + addr := fmt.Sprintf(":%d", *port) + lis, err := net.Listen("tcp6", addr) + if err != nil { + log.Fatalf("Failed to listen: %v", err) + } + s := grpc.NewServer() + le, err := localEndpoints(*interfaces) + if err != nil { + log.Fatalf("Failed to create local endpoints from interfaces: %v") + } + wpb.RegisterWireServer(s, newServer(le)) + g := new(errgroup.Group) + g.Go(func() error { + log.Infof("Wire server listening at %v", lis.Addr()) + return s.Serve(lis) + }) + re, err := remoteEndpoints(*endpoints) + if err != nil { + log.Fatalf("Failed to create remote endpoints: %v", err) + } + for a, m := range re { + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + conn, err := grpc.DialContext(ctx, a, opts...) + if err != nil { + log.Fatalf("Failed to dial %q: %v", a, err) + } + defer conn.Close() + c := wpb.NewWireClient(conn) + for e, w := range m { + c := c + e := e + w := w + g.Go(func() error { + octx := e.NewContext(ctx) + stream, err := c.Transmit(octx) + if err != nil { + return err + } + defer func() { + stream.CloseSend() + }() + log.Infof("Transmitting endpoint %v over wire...", e) + return w.Transmit(ctx, stream) + }) + } + } + if err := g.Wait(); err != nil { + log.Fatalf("Failed to wait for wire transmits: %v", err) + } +} diff --git a/x/wire/wire.go b/x/wire/wire.go index 697a6ffa..219e6265 100644 --- a/x/wire/wire.go +++ b/x/wire/wire.go @@ -209,3 +209,32 @@ func (p *PhysicalEndpoint) NewContext(ctx context.Context) context.Context { }) return metadata.NewOutgoingContext(ctx, md) } + +type InterfaceEndpoint struct { + intf string +} + +func NewInterfaceEndpoint(intf string) *InterfaceEndpoint { + return &InterfaceEndpoint{intf: intf} +} + +func ParseInterfaceEndpoint(ctx context.Context) (*InterfaceEndpoint, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, fmt.Errorf("no metadata in incoming context") + } + i := &InterfaceEndpoint{} + vals := md.Get("interface") + if len(vals) != 1 || vals[0] == "" { + return nil, fmt.Errorf("interface key not found") + } + i.intf = vals[0] + return i, nil +} + +func (i *InterfaceEndpoint) NewContext(ctx context.Context) context.Context { + md := metadata.New(map[string]string{ + "interface": i.intf, + }) + return metadata.NewOutgoingContext(ctx, md) +}