Skip to content

Commit

Permalink
wrap more forwards in each port
Browse files Browse the repository at this point in the history
  • Loading branch information
tzneal committed Dec 9, 2021
1 parent bb6ca30 commit 5eea956
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
13 changes: 10 additions & 3 deletions cmd/exposeAll.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,25 @@ every exposed service and port.`,
if len(svc.Spec.Selector) == 0 {
continue
}
var pc []kube.PortConfig
for _, port := range svc.Spec.Ports {
portNumber := pl.LookupPort(svc, port.TargetPort)
// doesn't support UDP port forwarding yet see https://github.com/kubernetes/kubernetes/issues/47862
if port.Protocol != "TCP" {
continue
}
fw, err := kube.PortForward(f, svc.Namespace, svc.Name, portNumber, localIp, 0)
pc = append(pc, kube.PortConfig{LocalPort: 0, TargetPort: portNumber})
}

if len(pc) > 0 {
fw, err := kube.PortForward(f, svc.Namespace, svc.Name, localIp, pc)
if err != nil {
util.LogError("error forwarding port for %s: %s", svc.Name, err)
return
}
portForwards = append(portForwards, fw)
portForwardingAtLeastOne = true
}
portForwardingAtLeastOne = true
}
if !portForwardingAtLeastOne {
util.LogError("no services found for port forwarding, exiting...")
Expand Down Expand Up @@ -94,7 +99,9 @@ every exposed service and port.`,

util.LogInfoHeader("cleaning up....")
for _, fw := range portForwards {
util.LogInfoListItem("closing port forward %s:%d", fw.Name, fw.Port)
for _, p := range fw.Ports {
util.LogInfoListItem("closing port forward %s:%d", fw.Name, p.LocalPort)
}
fw.Forwarder.Close()
}
},
Expand Down
16 changes: 13 additions & 3 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,25 @@ inside the cluster as described by the configuration file.`,
if !externalSvc.Enabled {
continue
}
var pc []kube.PortConfig
for _, port := range externalSvc.Ports {
fw, err := kube.PortForward(f, externalSvc.Namespace, externalSvc.Name, port.TargetPort, localIp, port.LocalPort)
pc = append(pc, kube.PortConfig{
LocalPort: port.LocalPort,
TargetPort: port.TargetPort,
})
}

if len(pc) > 0 {
fw, err := kube.PortForward(f, externalSvc.Namespace, externalSvc.Name, localIp, pc)
if err != nil {
util.LogError("error forwarding port for %s: %s", externalSvc.Name, err)
return
}
// ensure we close it
defer closePortForward(fw)
portForwards = append(portForwards, fw)
portForwardingAtLeastOne = true
}
portForwardingAtLeastOne = true
}

if !supplantingAtLeastOne && !portForwardingAtLeastOne {
Expand Down Expand Up @@ -273,7 +281,9 @@ func restoreService(cs *kubernetes.Clientset, sb *v1.Service) {
}

func closePortForward(fw kube.PortForwarder) {
util.LogInfoListItem("closing port forward %s:%d", fw.Name, fw.Port)
for _, p := range fw.Ports {
util.LogInfoListItem("closing port forward %s:%d", fw.Name, p.LocalPort)
}
fw.Forwarder.Close()
}

Expand Down
24 changes: 17 additions & 7 deletions kube/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,28 @@ import (
type PortForwarder struct {
Namespace string
Name string
Port int32
LocalPort int32
Ports []PortConfig
Forwarder *portforward.PortForwarder
}

type PortConfig struct {
LocalPort int32
TargetPort int32
}

// PortForward opens up a socket for the given local IP address and port and forwards it to the specified service and target port.
func PortForward(f cmdutil.Factory, namespace string, svcName string, targetPort int32, localIP net.IP, localPort int32) (PortForwarder, error) {
func PortForward(f cmdutil.Factory, namespace string, svcName string, localIP net.IP, ports []PortConfig) (PortForwarder, error) {
builder := f.NewBuilder().WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
ContinueOnError().NamespaceParam(namespace)
builder.ResourceNames("pods", fmt.Sprintf("service/%s", svcName))
obj, err := builder.Do().Object()
if err != nil {
return PortForwarder{}, err
}
if len(ports) == 0 {
util.LogError("no ports specified for forwarding")
return PortForwarder{}, fmt.Errorf("no ports specified for forwarding")
}

getPodTimeout := 10 * time.Second
forwardablePod, err := polymorphichelpers.AttachablePodForObjectFn(f, obj, getPodTimeout)
Expand Down Expand Up @@ -64,10 +72,13 @@ func PortForward(f cmdutil.Factory, namespace string, svcName string, targetPort
}

var strm genericclioptions.IOStreams
ports := []string{fmt.Sprintf("%d:%d", localPort, targetPort)}
var portList []string
for _, port := range ports {
portList = append(portList, fmt.Sprintf("%d:%d", port.LocalPort, port.TargetPort))
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
address := []string{localIP.String()}
fw, err := portforward.NewOnAddresses(dialer, address, ports, stop, ready, strm.Out, strm.ErrOut)
fw, err := portforward.NewOnAddresses(dialer, address, portList, stop, ready, strm.Out, strm.ErrOut)
if err != nil {
return PortForwarder{}, fmt.Errorf("error creating targetPort forward: %w", err)
}
Expand All @@ -82,8 +93,7 @@ func PortForward(f cmdutil.Factory, namespace string, svcName string, targetPort
return PortForwarder{
Namespace: namespace,
Name: svcName,
Port: targetPort,
LocalPort: localPort,
Ports: ports,
Forwarder: fw,
}, nil
}

0 comments on commit 5eea956

Please sign in to comment.