Skip to content

Commit

Permalink
feat: support running emulator on the same host with Omni
Browse files Browse the repository at this point in the history
Change etcd ports to not to clash with Omni ones.
Fix more places to bind the traffic to the right interface and address.

Signed-off-by: Artem Chernyshev <[email protected]>
  • Loading branch information
Unix4ever committed Jul 23, 2024
1 parent f76f7ae commit c223eaa
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 9 deletions.
1 change: 1 addition & 0 deletions hack/compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:
- state:/_out/state
container_name: talemu
restart: on-failure
network_mode: host
cap_add:
- NET_ADMIN
build:
Expand Down
11 changes: 11 additions & 0 deletions internal/pkg/kubefactory/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ func NewEmbeddedEtcd(ctx context.Context, path string, logger *zap.Logger) (*Etc
cfg.ExperimentalCompactHashCheckEnabled = true
cfg.ExperimentalInitialCorruptCheck = true

// run the emulator etcd on a different port to avoid clashing with Omni etcd
caddr, _ := url.Parse("http://localhost:2400") //nolint:errcheck
paddr, _ := url.Parse("http://localhost:2401") //nolint:errcheck

cfg.InitialCluster = fmt.Sprintf("default=%s", paddr.String())
cfg.ListenClientUrls = []url.URL{*caddr}
cfg.AdvertiseClientUrls = []url.URL{*caddr}

cfg.ListenPeerUrls = []url.URL{*paddr}
cfg.AdvertisePeerUrls = []url.URL{*paddr}

peerURL, err := url.Parse("http://localhost:0")
if err != nil {
return nil, fmt.Errorf("failed to parse URL: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/machine/controllers/grpc_tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
stdlibx509 "crypto/x509"
"fmt"
"net"
"net/netip"
"strings"
"time"

Expand Down Expand Up @@ -335,6 +336,7 @@ func (ctrl *GRPCTLSController) generateWorker(ctx context.Context, r controller.
remoteGen, err := gen.NewRemoteGenerator(func(ctx context.Context, addr string) (net.Conn, error) {
var dialer net.Dialer

dialer.LocalAddr = net.TCPAddrFromAddrPort(netip.AddrPortFrom(address.TypedSpec().Address.Addr(), 0))
dialer.Control = emunet.BindToInterface(address.TypedSpec().LinkName)

return dialer.DialContext(ctx, "tcp", addr)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/machine/controllers/log_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (ctrl *LogSinkController) Run(ctx context.Context, r controller.Runtime, _

if err = addresses.ForEachErr(func(r *network.AddressStatus) error {
if strings.HasPrefix(r.TypedSpec().LinkName, constants.SideroLinkName) {
return ctrl.LogSink.ConfigureInterface(ctx, r.TypedSpec().LinkName)
return ctrl.LogSink.ConfigureInterface(ctx, r)
}

return nil
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/machine/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/siderolabs/go-circular"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -56,8 +57,8 @@ func NewZapCore(endpoint string) (*ZapCore, error) {
}

// ConfigureInterface inits the sender.
func (core *ZapCore) ConfigureInterface(ctx context.Context, iface string) error {
core.sender.configure(iface)
func (core *ZapCore) ConfigureInterface(ctx context.Context, res *network.AddressStatus) error {
core.sender.configure(res.TypedSpec().LinkName, res.TypedSpec().Address)

ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
Expand Down
17 changes: 11 additions & 6 deletions internal/pkg/machine/logging/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/json"
"fmt"
"net"
"net/netip"
"net/url"
"sync"
"time"
Expand All @@ -18,11 +19,12 @@ import (

// LogSender writes zap logs to the remote destination.
type LogSender struct {
endpoint *url.URL
conn net.Conn
sema chan struct{}
iface string
mu sync.Mutex
endpoint *url.URL
conn net.Conn
sema chan struct{}
iface string
localAddr netip.Prefix
mu sync.Mutex
}

// NewLogSender returns log sender that sends logs in JSON over TCP (newline-delimited)
Expand All @@ -45,11 +47,12 @@ func (j *LogSender) ready() bool {
return j.iface != ""
}

func (j *LogSender) configure(iface string) {
func (j *LogSender) configure(iface string, localAddr netip.Prefix) {
j.mu.Lock()
defer j.mu.Unlock()

j.iface = iface
j.localAddr = localAddr
}

func (j *LogSender) tryLock(ctx context.Context) (unlock func()) {
Expand Down Expand Up @@ -80,6 +83,7 @@ func (j *LogSender) marshalJSON(e *LogEvent) ([]byte, error) {
func (j *LogSender) Send(ctx context.Context, e *LogEvent) error {
j.mu.Lock()
iface := j.iface
localAddr := j.localAddr
j.mu.Unlock()

if iface == "" {
Expand All @@ -104,6 +108,7 @@ func (j *LogSender) Send(ctx context.Context, e *LogEvent) error {

var dialer net.Dialer

dialer.LocalAddr = net.TCPAddrFromAddrPort(netip.AddrPortFrom(localAddr.Addr(), 0))
dialer.Control = network.BindToInterface(iface)

// Connect (or "connect" for UDP) if no connection is established already.
Expand Down

0 comments on commit c223eaa

Please sign in to comment.