Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dash] add zmq_dpu_proxy_address_base parameter to telemetry.go #24

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gnmi_server/client_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
if origin == "openconfig" {
dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{})
} else if IsNativeOrigin(origin) {
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, gnmipb.Encoding_JSON_IETF, "", "")
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, gnmipb.Encoding_JSON_IETF, "", "", "")
} else if len(origin) != 0 {
return grpc.Errorf(codes.Unimplemented, "Unsupported origin: %s", origin)
} else if target == "" {
Expand Down
7 changes: 4 additions & 3 deletions gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Config struct {
EnableTranslibWrite bool
EnableNativeWrite bool
ZmqPort string
DpuProxyBaseAddr string
IdleConnDuration int
ConfigTableName string
Vrf string
Expand Down Expand Up @@ -410,7 +411,7 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe
}
}
if check := IsNativeOrigin(origin); check {
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf)
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf, s.config.DpuProxyBaseAddr)
} else {
dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions)
}
Expand Down Expand Up @@ -508,7 +509,7 @@ func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetRe
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
return nil, grpc.Errorf(codes.Unimplemented, "GNMI native write is disabled")
}
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf)
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, encoding, s.config.ZmqPort, s.config.Vrf, s.config.DpuProxyBaseAddr)
} else {
if s.config.EnableTranslibWrite == false {
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
Expand Down Expand Up @@ -585,7 +586,7 @@ func (s *Server) Capabilities(ctx context.Context, req *gnmipb.CapabilityRequest
var supportedModels []gnmipb.ModelData
dc, _ := sdc.NewTranslClient(nil, nil, ctx, extensions)
supportedModels = append(supportedModels, dc.Capabilities()...)
dc, _ = sdc.NewMixedDbClient(nil, nil, "", gnmipb.Encoding_JSON_IETF, s.config.ZmqPort, s.config.Vrf)
dc, _ = sdc.NewMixedDbClient(nil, nil, "", gnmipb.Encoding_JSON_IETF, s.config.ZmqPort, s.config.Vrf, s.config.DpuProxyBaseAddr)
supportedModels = append(supportedModels, dc.Capabilities()...)

suppModels := make([]*gnmipb.ModelData, len(supportedModels))
Expand Down
6 changes: 3 additions & 3 deletions sonic_data_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,17 +793,17 @@ func TestGetZmqClient(t *testing.T) {
dpusTable.Hset("dpu0", "midplane_interface", "dpu0")
dhcpPortTable.Hset("bridge-midplane|dpu0", "ips@", "127.0.0.2,127.0.0.1")

client, err := getZmqClient("dpu0", "", "")
client, err := getZmqClient("dpu0", "", "", "")
if client != nil || err != nil {
t.Errorf("empty ZMQ port should not get ZMQ client")
}

client, err = getZmqClient("dpu0", "1234", "")
client, err = getZmqClient("dpu0", "1234", "", "")
if client == nil {
t.Errorf("get ZMQ client failed")
}

client, err = getZmqClient("", "1234", "")
client, err = getZmqClient("", "1234", "", "")
if client == nil {
t.Errorf("get ZMQ client failed")
}
Expand Down
46 changes: 40 additions & 6 deletions sonic_data_client/mixed_db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,47 @@ func getDpuAddress(dpuId string) (string, error) {
return dpuAddressArray[0], nil
}

func getZmqAddress(container string, zmqPort string) (string, error) {
func getDpuProxyAddress(dpuId string, dpuProxyBaseAddr string) (string, error) {
dpuIndexStr := strings.TrimPrefix(dpuId, "dpu")
dpuIndex, err := strconv.Atoi(dpuIndexStr)
if err != nil {
return "", fmt.Errorf("Failed to parse DPU index from %s: %v", dpuId, err)
}

baseIp := net.ParseIP(dpuProxyBaseAddr)
if baseIp == nil {
return "", fmt.Errorf("Invalid DPU proxy base address: %s", dpuProxyBaseAddr)
}

baseIp = baseIp.To4()
if baseIp == nil {
return "", fmt.Errorf("Expecting an IPv4 address for DPU proxy: %s", dpuProxyBaseAddr)
}

lastOctet := int(baseIp[3]) + dpuIndex
if lastOctet > 255 {
return "", fmt.Errorf("DPU index is out of range")
}

baseIp[3] = byte(lastOctet)
return baseIp.String(), nil
}

func getZmqAddress(container string, zmqPort string, dpuProxyBaseAddr string) (string, error) {
// when zmqPort empty, ZMQ feature disabled
if zmqPort == "" {
return "", fmt.Errorf("ZMQ port is empty.")
}

var dpuAddress, err = getDpuAddress(container)
var dpuAddress string
var err error

if dpuProxyBaseAddr != "" {
dpuAddress, err = getDpuProxyAddress(container, dpuProxyBaseAddr)
} else {
dpuAddress, err = getDpuAddress(container)
}

if err != nil {
return "", fmt.Errorf("Get DPU address failed: %v", err)
}
Expand Down Expand Up @@ -181,7 +215,7 @@ func removeZmqClient(zmqClient swsscommon.ZmqClient) (error) {
return fmt.Errorf("Can't find ZMQ client in zmqClientMap: %v", zmqClient)
}

func getZmqClient(dpuId string, zmqPort string, vrf string) (swsscommon.ZmqClient, error) {
func getZmqClient(dpuId string, zmqPort string, vrf string, dpuProxyBaseAddr string) (swsscommon.ZmqClient, error) {
if zmqPort == "" {
// ZMQ feature disabled when zmqPort flag not set
return nil, nil
Expand All @@ -192,7 +226,7 @@ func getZmqClient(dpuId string, zmqPort string, vrf string) (swsscommon.ZmqClien
return getZmqClientByAddress("tcp://" + LOCAL_ADDRESS + ":" + zmqPort, vrf)
}

zmqAddress, err := getZmqAddress(dpuId, zmqPort)
zmqAddress, err := getZmqAddress(dpuId, zmqPort, dpuProxyBaseAddr)
if err != nil {
return nil, fmt.Errorf("Get ZMQ address failed: %v", err)
}
Expand Down Expand Up @@ -493,7 +527,7 @@ func init() {
initRedisDbMap()
}

func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string, encoding gnmipb.Encoding, zmqPort string, vrf string) (Client, error) {
func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string, encoding gnmipb.Encoding, zmqPort string, vrf string, dpuProxyBaseAddr string) (Client, error) {
var err error

// Initialize RedisDbMap for test
Expand Down Expand Up @@ -556,7 +590,7 @@ func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string,
client.workPath = common_utils.GNMI_WORK_PATH

// continer is DPU ID
client.zmqClient, err = getZmqClient(container, zmqPort, vrf)
client.zmqClient, err = getZmqClient(container, zmqPort, vrf, dpuProxyBaseAddr)
if err != nil {
return nil, fmt.Errorf("Get ZMQ client failed: %v", err)
}
Expand Down
3 changes: 3 additions & 0 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type TelemetryConfig struct {
ConfigTableName *string
ZmqAddress *string
ZmqPort *string
DashProxyAddr *string
Insecure *bool
NoTLS *bool
AllowNoClientCert *bool
Expand Down Expand Up @@ -155,6 +156,7 @@ func setupFlags(fs *flag.FlagSet) (*TelemetryConfig, *gnmi.Config, error) {
ConfigTableName: fs.String("config_table_name", "", "Config table name"),
ZmqAddress: fs.String("zmq_address", "", "Orchagent ZMQ address, deprecated, please use zmq_port."),
ZmqPort: fs.String("zmq_port", "", "Orchagent ZMQ port, when not set or empty string telemetry server will switch to Redis based communication channel."),
DashProxyAddr: fs.String("zmq_dpu_proxy_address_base", "", "Dash offload manager ZMQ base address, when set, the DPU configuration will be send to the proxy address instead of directly to the DPU."),
Insecure: fs.Bool("insecure", false, "Skip providing TLS cert and key, for testing only!"),
NoTLS: fs.Bool("noTLS", false, "disable TLS, for testing only!"),
AllowNoClientCert: fs.Bool("allow_no_client_auth", false, "When set, telemetry server will request but not require a client certificate."),
Expand Down Expand Up @@ -242,6 +244,7 @@ func setupFlags(fs *flag.FlagSet) (*TelemetryConfig, *gnmi.Config, error) {
}

cfg.ZmqPort = zmqPort
cfg.DpuProxyBaseAddr = *telemetryCfg.DashProxyAddr

return telemetryCfg, cfg, nil
}
Expand Down
Loading