Skip to content

Commit

Permalink
add forward container
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmasi committed Jul 25, 2024
1 parent 396ff82 commit 72b12a8
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ x/wire/file/client/client
x/wire/file/server/server
x/wire/intf/client/client
x/wire/intf/server/server
x/wire/forward/forward
65 changes: 60 additions & 5 deletions topo/node/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package forward
import (
"context"
"fmt"
"net"

fpb "github.com/openconfig/kne/proto/forward"
tpb "github.com/openconfig/kne/proto/topo"
Expand All @@ -27,6 +28,10 @@ import (
"k8s.io/utils/pointer"
)

const (
fwdPort = "50058"
)

func New(nodeImpl *node.Impl) (node.Node, error) {
if nodeImpl == nil {
return nil, fmt.Errorf("nodeImpl cannot be nil")
Expand Down Expand Up @@ -59,6 +64,48 @@ func (n *Node) Create(ctx context.Context) error {
return nil
}

func interfaceFlag(intf string) string {
return fmt.Sprintf("--interfaces=%s", intf)
}

func endpointFlag(lintf, addr, rintf string) string {
return fmt.Sprintf("--endpoints=%s/%s/%s", lintf, addr, rintf)
}

func wireToArg(wire *fpb.Wire) (string, error) {
switch at := wire.A.Endpoint.(type) {
case *fpb.Endpoint_Interface:
// If A is an interface, then this node should serve as the fwd client for this wire.
// Additionally Z should not be an interface.
switch zt := wire.Z.Endpoint.(type) {
case *fpb.Endpoint_Interface:
return "", fmt.Errorf("endpoints A and Z cannot both be interfaces")
case *fpb.Endpoint_LocalNode:
ln := wire.GetZ().GetLocalNode()
//pod, err := n.KubeClient.CoreV1().Pods(n.Namespace).Get(ctx, ln.GetName(), metav1.GetOptions{})

Check failure on line 85 in topo/node/forward/forward.go

View workflow job for this annotation

GitHub Actions / go / lint

commentFormatting: put a space between `//` and comment text (gocritic)
//if err != nil {
// return "", err
//}
return endpointFlag(wire.GetA().GetInterface().GetName(), net.JoinHostPort(ln.GetName(), fwdPort), ln.GetInterface()), nil
default:
return "", fmt.Errorf("endpoint Z type not supported: %T", zt)
}
case *fpb.Endpoint_LocalNode:
// If A is not an interface, then this node should serve as the fwd server for this wire.
// Additionally Z should be an interface.
switch zt := wire.Z.Endpoint.(type) {
case *fpb.Endpoint_Interface:
return interfaceFlag(wire.GetZ().GetInterface().GetName()), nil
case *fpb.Endpoint_LocalNode:
return "", fmt.Errorf("one of endpoints A and Z must be an interface")
default:
return "", fmt.Errorf("endpoint Z type not supported: %T", zt)
}
default:
return "", fmt.Errorf("endpoint A type not supported: %T", at)
}
}

// CreatePod creates a Pod for the Node based on the underlying proto.
func (n *Node) CreatePod(ctx context.Context) error {
pb := n.Proto
Expand All @@ -68,14 +115,22 @@ func (n *Node) CreatePod(ctx context.Context) error {
initContainerImage = node.DefaultInitContainerImage
}

fwdArgs := pb.Config.Args
if vendorData := pb.Config.GetVendorData(); vendorData != nil {
fwdCfg := &fpb.ForwardConfig{}
if err := vendorData.UnmarshalTo(fwdCfg); err != nil {
return err
}
// TODO: Validate and use fwdCfg to pass arguments to the container.
log.Infof("Got fwdCfg: %v", prototext.Format(fwdCfg))
for _, wire := range fwdCfg.GetWires() {
arg, err := wireToArg(wire)
if err != nil {
return err
}
fwdArgs = append(fwdArgs, arg)
}
}
log.Infof("Using container args: %v", fwdArgs)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -99,7 +154,7 @@ func (n *Node) CreatePod(ctx context.Context) error {
Name: pb.Name,
Image: pb.Config.Image,
Command: pb.Config.Command,
Args: pb.Config.Args,
Args: fwdArgs,
Env: node.ToEnvVar(pb.Config.Env),
Resources: node.ToResourceRequirements(pb.Constraints),
ImagePullPolicy: "IfNotPresent",
Expand Down Expand Up @@ -161,9 +216,9 @@ func defaults(pb *tpb.Node) *tpb.Node {
if pb.Config == nil {
pb.Config = &tpb.Config{}
}
if len(pb.GetConfig().GetCommand()) == 0 {
pb.Config.Command = []string{"/bin/sh", "-c", "sleep 2000000000000"}
}
//if len(pb.GetConfig().GetCommand()) == 0 {

Check failure on line 219 in topo/node/forward/forward.go

View workflow job for this annotation

GitHub Actions / go / lint

commentFormatting: put a space between `//` and comment text (gocritic)
// pb.Config.Command = []string{"/bin/sh", "-c", "sleep 2000000000000"}
//}
if pb.Config.EntryCommand == "" {
pb.Config.EntryCommand = fmt.Sprintf("kubectl exec -it %s -- sh", pb.Name)
}
Expand Down
13 changes: 13 additions & 0 deletions x/wire/forward/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM golang:1.21

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY x/wire/ x/wire/
COPY proto/ proto/

RUN go build -o forward x/wire/forward/main.go

ENTRYPOINT ./forward
137 changes: 137 additions & 0 deletions x/wire/forward/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package main

import (
"context"
"fmt"
"net"
"strings"

wpb "github.com/openconfig/kne/proto/wire"
"github.com/openconfig/kne/x/wire"
flag "github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
log "k8s.io/klog/v2"
)

var (
port = flag.Int("port", 50058, "Wire server port")
interfaces = flag.StringSlice("interfaces", []string{}, "List of local interfaces to serve on the wire server")
endpoints = flag.StringSlice("endpoints", []string{}, "List of strings of the form local_interface/remote_address/remote_interface")
)

type server struct {
wpb.UnimplementedWireServer
endpoints map[wire.InterfaceEndpoint]*wire.Wire
}

func newServer(endpoints map[wire.InterfaceEndpoint]*wire.Wire) *server {
return &server{endpoints: endpoints}
}

func (s *server) Transmit(stream wpb.Wire_TransmitServer) error {
e, err := wire.ParseInterfaceEndpoint(stream.Context())
if err != nil {
return fmt.Errorf("unable to parse endpoint from incoming stream context: %v", err)
}
log.Infof("New Transmit stream started for endpoint %v", e)
w, ok := s.endpoints[*e]
if !ok {
return fmt.Errorf("no endpoint found on server for request: %v", e)
}
if err := w.Transmit(stream.Context(), stream); err != nil {
return fmt.Errorf("transmit failed: %v", err)
}
return nil
}

func localEndpoints(intfs []string) (map[wire.InterfaceEndpoint]*wire.Wire, error) {
m := map[wire.InterfaceEndpoint]*wire.Wire{}
for _, intf := range intfs {
rw, err := wire.NewInterfaceReadWriter(intf)
if err != nil {
return nil, err
}
m[*wire.NewInterfaceEndpoint(intf)] = wire.NewWire(rw)
}
return m, nil
}

func remoteEndpoints(eps []string) (map[string]map[wire.InterfaceEndpoint]*wire.Wire, error) {
m := map[string]map[wire.InterfaceEndpoint]*wire.Wire{}
for _, ep := range eps {
parts := strings.SplitN(ep, "/", 3)
if len(parts) != 3 {
return nil, fmt.Errorf("unable to parse %v into endpoint, got %v", ep, parts)
}
lintf := parts[0]
addr := parts[1]
rintf := parts[2]
rw, err := wire.NewInterfaceReadWriter(lintf)
if err != nil {
return nil, err
}
if _, ok := m[addr]; !ok {
m[addr] = map[wire.InterfaceEndpoint]*wire.Wire{}
}
m[addr][*wire.NewInterfaceEndpoint(rintf)] = wire.NewWire(rw)
}
return m, nil
}

func main() {
flag.Parse()
ctx := context.Background()
addr := fmt.Sprintf(":%d", *port)
lis, err := net.Listen("tcp6", addr)
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer()
le, err := localEndpoints(*interfaces)
if err != nil {
log.Fatalf("Failed to create local endpoints from interfaces: %v")
}
wpb.RegisterWireServer(s, newServer(le))
g := new(errgroup.Group)
g.Go(func() error {
log.Infof("Wire server listening at %v", lis.Addr())
return s.Serve(lis)
})
re, err := remoteEndpoints(*endpoints)
if err != nil {
log.Fatalf("Failed to create remote endpoints: %v", err)
}
for a, m := range re {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
conn, err := grpc.DialContext(ctx, a, opts...)
if err != nil {
log.Fatalf("Failed to dial %q: %v", a, err)
}
defer conn.Close()
c := wpb.NewWireClient(conn)
for e, w := range m {
c := c
e := e
w := w
g.Go(func() error {
octx := e.NewContext(ctx)
stream, err := c.Transmit(octx)
if err != nil {
return err
}
defer func() {
stream.CloseSend()
}()
log.Infof("Transmitting endpoint %v over wire...", e)
return w.Transmit(ctx, stream)
})
}
}
if err := g.Wait(); err != nil {
log.Fatalf("Failed to wait for wire transmits: %v", err)

Check failure on line 135 in x/wire/forward/main.go

View workflow job for this annotation

GitHub Actions / go / lint

exitAfterDefer: log.Fatalf will exit, and `defer conn.Close()` will not run (gocritic)
}
}
29 changes: 29 additions & 0 deletions x/wire/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,32 @@ func (p *PhysicalEndpoint) NewContext(ctx context.Context) context.Context {
})
return metadata.NewOutgoingContext(ctx, md)
}

type InterfaceEndpoint struct {
intf string
}

func NewInterfaceEndpoint(intf string) *InterfaceEndpoint {
return &InterfaceEndpoint{intf: intf}
}

func ParseInterfaceEndpoint(ctx context.Context) (*InterfaceEndpoint, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, fmt.Errorf("no metadata in incoming context")
}
i := &InterfaceEndpoint{}
vals := md.Get("interface")
if len(vals) != 1 || vals[0] == "" {
return nil, fmt.Errorf("interface key not found")
}
i.intf = vals[0]
return i, nil
}

func (i *InterfaceEndpoint) NewContext(ctx context.Context) context.Context {
md := metadata.New(map[string]string{
"interface": i.intf,
})
return metadata.NewOutgoingContext(ctx, md)
}

0 comments on commit 72b12a8

Please sign in to comment.