Skip to content

Commit

Permalink
feat: pixiu registry
Browse files Browse the repository at this point in the history
  • Loading branch information
ZLBer committed Jul 30, 2023
1 parent 95b463b commit 0dfe27a
Show file tree
Hide file tree
Showing 19 changed files with 513 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,7 @@ require (
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
istio.io/api v0.0.0-20221004225839-607aeaab2827
)

replace istio.io/api => github.com/dubbo-go-pixiu/operator-api v0.0.0-20230521024122-de7669e54430
63 changes: 63 additions & 0 deletions go.sum

Large diffs are not rendered by default.

124 changes: 124 additions & 0 deletions metadata/report/pixiu/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package pixiu

import (
"context"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/metadata/identifier"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/registry"
"encoding/json"
gxset "github.com/dubbogo/gost/container/set"
"google.golang.org/grpc"
"istio.io/api/dubbo/v1alpha1"
)

type pixiuMetadataReportFactory struct {
}

// CreateMetadataReport create a new metadata report
func (mf *pixiuMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {

conn, err := grpc.Dial(url.Location)
if err != nil {
panic(err)
}

snpClient := v1alpha1.NewServiceNameMappingServiceClient(conn)

metaClient := v1alpha1.NewServiceMetadataServiceClient(conn)

return &pixiuMetadataReport{snpClient: snpClient, metaClient: metaClient}
}

type pixiuMetadataReport struct {
snpClient v1alpha1.ServiceNameMappingServiceClient
metaClient v1alpha1.ServiceMetadataServiceClient
}

func (p pixiuMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
panic("implement me")
}

func (p pixiuMetadataReport) StoreConsumerMetadata(metadataIdentifier *identifier.MetadataIdentifier, s string) error {
panic("implement me")
}

func (p pixiuMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url *common.URL) error {
panic("implement me")
}

func (p pixiuMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
panic("implement me")
}

func (p pixiuMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
panic("implement me")
}

func (p pixiuMetadataReport) SaveSubscribedData(metadataIdentifier *identifier.SubscriberMetadataIdentifier, s string) error {
panic("implement me")
}

func (p pixiuMetadataReport) GetSubscribedURLs(metadataIdentifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
panic("implement me")
}

func (p pixiuMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) (string, error) {
panic("implement me")
}

func (p pixiuMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
response, err := p.metaClient.Get(context.Background(), &v1alpha1.GetServiceMetadataRequest{
Namespace: metadataIdentifier.Group,
ApplicationName: "",
Revision: metadataIdentifier.Revision,
})
if err != nil {
return nil, err
}
data := response.GetMetadataInfo()
var metadataInfo common.MetadataInfo
err = json.Unmarshal([]byte(data), &metadataInfo)
if err != nil {
return nil, err
}
return &metadataInfo, nil
}

func (p pixiuMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error {

data, err := json.Marshal(info)
if err != nil {
return err
}
_, err = p.metaClient.Publish(context.Background(), &v1alpha1.PublishServiceMetadataRequest{
Namespace: metadataIdentifier.Group,
ApplicationName: metadataIdentifier.Application,
Revision: metadataIdentifier.Revision,
MetadataInfo: string(data),
})
if err != nil {
return err
}
return err
}

func (p pixiuMetadataReport) RegisterServiceAppMapping(serviceInterface string, group string, appName string) error {
_, err := p.snpClient.RegisterServiceAppMapping(context.Background(), &v1alpha1.ServiceMappingRequest{
Namespace: group,
ApplicationName: appName,
InterfaceNames: []string{serviceInterface},
})
if err != nil {
return err
}
return nil
}

func (p pixiuMetadataReport) GetServiceAppMapping(key string, group string, listener registry.MappingListener) (*gxset.HashSet, error) {
panic("implement me")
}

func (p pixiuMetadataReport) RemoveServiceAppMappingListener(key string, group string) error {
panic("implement me")
}
26 changes: 26 additions & 0 deletions remoting/xds/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package xds

import (
"context"
"errors"
"istio.io/api/dubbo/v1alpha1"
"sync"
"time"
)
Expand Down Expand Up @@ -134,6 +136,8 @@ type WrappedClientImpl struct {
xdsSniffingTimeout stores xds sniffing timeout duration
*/
xdsSniffingTimeout time.Duration

snpClient v1alpha1.ServiceNameMappingServiceClient
}

func GetXDSWrappedClient() *WrappedClientImpl {
Expand Down Expand Up @@ -564,6 +568,25 @@ func (w *WrappedClientImpl) MatchRoute(routerConfig resource.RouteConfigUpdate,
return nil, errors.New("not found route")
}

func (w *WrappedClientImpl) RegisterSNP(serviceInterface string, group string, appName string) error {
_, err := w.snpClient.RegisterServiceAppMapping(context.Background(), &v1alpha1.ServiceMappingRequest{
Namespace: group,
ApplicationName: appName,
InterfaceNames: []string{serviceInterface},
})
if err != nil {
return err
}
return nil
}

func (w *WrappedClientImpl) GetSNP(key string, group string) {
w.xdsClient.WatchDubboServiceNameMapping("", func(update resource.DubboServiceNameMappingUpdate, err error) {

})

}

type XDSWrapperClient interface {
Subscribe(svcUniqueName, interfaceName, hostAddr string, lst registry.NotifyListener) error
UnSubscribe(svcUniqueName string)
Expand All @@ -575,4 +598,7 @@ type XDSWrapperClient interface {
GetHostAddress() xdsCommon.HostAddr
GetIstioPodIP() string
MatchRoute(routerConfig resource.RouteConfigUpdate, invocation protocol.Invocation) (*resource.Route, error)

RegisterSNP(serviceInterface string, group string, appName string) error
GetSNP(key string, group string)
}
104 changes: 104 additions & 0 deletions remoting/xds/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package xds

import (
"context"
"fmt"
envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
_struct "github.com/golang/protobuf/ptypes/struct"
"time"

envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/structpb"
"istio.io/api/dubbo/v1alpha1"
"log"
"testing"
)

const (
xdsServerAddress = "localhost:15010" // 将此地址替换为实际的xDS服务器地址
)

func sendXdsRequest(stream envoy_service_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesClient, resourceType, resourceName string, node *envoy_config_core_v3.Node) error {
req := &envoy_service_discovery_v3.DiscoveryRequest{
TypeUrl: resourceType,
ResourceNames: []string{resourceName},
ResponseNonce: time.Now().String(),
Node: node,
}

return stream.Send(req)
}
func TestXDS(t *testing.T) {
conn, err := grpc.Dial(xdsServerAddress, grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to connect to xDS server: %v", err)
}
defer conn.Close()

adsClient := envoy_service_discovery_v3.NewAggregatedDiscoveryServiceClient(conn)
stream, err := adsClient.StreamAggregatedResources(context.Background())
if err != nil {
log.Fatalf("Failed to open ADS stream: %v", err)
}
//sidecar~ip~{POD_NAME}~{NAMESPACE_NAME}.svc.cluster.local
node := &envoy_config_core_v3.Node{
Id: "sidecar~127.0.0.1~xds_client~default.svc.cluster.local",
Cluster: "default",
Metadata: &_struct.Struct{
Fields: map[string]*structpb.Value{
"env": {
Kind: &structpb.Value_StringValue{
StringValue: "test",
},
},
},
},
}

//发送请求
err = sendXdsRequest(stream, "dubbo.networking.v1alpha1.v1.servicenamemapping", "a|default", node)
if err != nil {
log.Fatalf("Failed to send Listener request: %v", err)
}

//err = sendXdsRequest(stream, resource.ClusterType, "cluster", node)
//if err != nil {
// log.Fatalf("Failed to send Cluster request: %v", err)
//}

// 接收响应
for {
resp, err := stream.Recv()
if err != nil {
log.Fatalf("Failed to receive xDS response: %v", err)
}
log.Printf("Received xDS response: %v", resp)
}
}

func TestClient(t *testing.T) {

dial, err := grpc.Dial("127.0.0.1:15010", grpc.WithTransportCredentials(insecure.NewCredentials()))

fmt.Println(dial)
if err != nil {
fmt.Println(err)
return
}

client := v1alpha1.NewServiceNameMappingServiceClient(dial)

mapping, err := client.RegisterServiceAppMapping(context.Background(), &v1alpha1.ServiceMappingRequest{
Namespace: "default",
ApplicationName: "application-05",
InterfaceNames: []string{"a"},
})
if err != nil {
fmt.Println(err)
return
}
fmt.Println("res:", mapping)

}
2 changes: 2 additions & 0 deletions xds/client/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type XDSClient interface {
WatchRouteConfig(string, func(resource.RouteConfigUpdate, error)) func()
WatchCluster(string, func(resource.ClusterUpdate, error)) func()
WatchEndpoints(clusterName string, edsCb func(resource.EndpointsUpdate, error)) (cancel func())
WatchDubboServiceNameMapping(clusterName string, snpCb func(resource.DubboServiceNameMappingUpdate, error)) (cancel func())

ReportLoad(server string) (*load.Store, func())

DumpLDS() map[string]resource.UpdateWithMD
Expand Down
12 changes: 12 additions & 0 deletions xds/client/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@ func (a *authority) watchEndpoints(clusterName string, cb func(resource.Endpoint
}
}

func (a *authority) watchDubboServiceNameMapping(clusterName string, cb func(resource.DubboServiceNameMappingUpdate, error)) (cancel func()) {
first, cancelF := a.pubsub.WatchDubboServiceNameMapping(clusterName, cb)
if first {
a.controller.AddWatch(resource.DubboServiceNameMappingType, clusterName)
}
return func() {
if cancelF() {
a.controller.RemoveWatch(resource.DubboServiceNameMappingType, clusterName)
}
}
}

func (a *authority) reportLoad(server string) (*load.Store, func()) {
return a.controller.ReportLoad(server)
}
Expand Down
4 changes: 4 additions & 0 deletions xds/client/controller/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ func (t *Controller) handleResponse(resp proto.Message) (resource.ResourceType,
var update map[string]resource.EndpointsUpdateErrTuple
update, md, err = resource.UnmarshalEndpoints(opts)
t.updateHandler.NewEndpoints(update, md)
case resource.DubboServiceNameMappingType:
var update map[string]resource.DubboServiceNameMappingTypeErrTuple
update, md, err = resource.UnmarshalDubboServiceNameMapping(opts)
t.updateHandler.NewDubboServiceNameMapping(update, md)
default:
return rType, "", "", resourceversion.ErrResourceTypeUnsupported{
ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", rType),
Expand Down
2 changes: 2 additions & 0 deletions xds/client/controller/version/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ func (v3c *client) ParseResponse(r proto.Message) (resource.ResourceType, []*any
rType = resource.ClusterResource
case resource.IsEndpointsResource(url):
rType = resource.EndpointsResource
case resource.IsDubboServiceNameMappingResource(url):
rType = resource.DubboServiceNameMappingType
default:
return rType, nil, "", "", controllerversion.ErrResourceTypeUnsupported{
ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()),
Expand Down
3 changes: 3 additions & 0 deletions xds/client/pubsub/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ type UpdateHandler interface {
// NewConnectionError handles connection errors from the xDS stream. The
// error will be reported to all the resource watchers.
NewConnectionError(err error)

// NewDubboServiceNameMapping handles updates to dubbo service name mapping
NewDubboServiceNameMapping(map[string]resource.DubboServiceNameMappingTypeErrTuple, resource.UpdateMetadata)
}
21 changes: 21 additions & 0 deletions xds/client/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ type Pubsub struct {
edsWatchers map[string]map[*watchInfo]bool
edsCache map[string]resource.EndpointsUpdate
edsMD map[string]resource.UpdateMetadata

snpWatchers map[string]map[*watchInfo]bool
snpCache map[string]resource.DubboServiceNameMappingUpdate
snpMD map[string]resource.UpdateMetadata
}

// New creates a new Pubsub.
Expand All @@ -90,6 +94,9 @@ func New(watchExpiryTimeout time.Duration, logger dubbogoLogger.Logger) *Pubsub
edsWatchers: make(map[string]map[*watchInfo]bool),
edsCache: make(map[string]resource.EndpointsUpdate),
edsMD: make(map[string]resource.UpdateMetadata),
snpWatchers: make(map[string]map[*watchInfo]bool),
snpCache: make(map[string]resource.DubboServiceNameMappingUpdate),
snpMD: make(map[string]resource.UpdateMetadata),
}
go pb.run()
return pb
Expand Down Expand Up @@ -163,6 +170,20 @@ func (pb *Pubsub) WatchEndpoints(clusterName string, cb func(resource.EndpointsU
return pb.watch(wi)
}

func (pb *Pubsub) WatchDubboServiceNameMapping(clusterName string, cb func(resource.DubboServiceNameMappingUpdate, error)) (first bool, cancel func() bool) {
wi := &watchInfo{
c: pb,
rType: resource.DubboServiceNameMappingType,
target: clusterName,
snpCallback: cb,
}

wi.expiryTimer = time.AfterFunc(pb.watchExpiryTimeout, func() {
wi.timeout()
})
return pb.watch(wi)
}

// Close closes the pubsub.
func (pb *Pubsub) Close() {
if pb.done.HasFired() {
Expand Down
Loading

0 comments on commit 0dfe27a

Please sign in to comment.