Skip to content

Commit

Permalink
feat: sff pixiu registry
Browse files Browse the repository at this point in the history
  • Loading branch information
ZLBer committed Jul 27, 2023
1 parent 95b463b commit e3a9ed0
Show file tree
Hide file tree
Showing 19 changed files with 444 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,7 @@ require (
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
istio.io/api v0.0.0-20221004225839-607aeaab2827
)

replace istio.io/api => github.com/dubbo-go-pixiu/operator-api v0.0.0-20230521024122-de7669e54430
63 changes: 63 additions & 0 deletions go.sum

Large diffs are not rendered by default.

124 changes: 124 additions & 0 deletions metadata/report/pixiu/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package pixiu

import (
"context"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/metadata/identifier"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/registry"
"encoding/json"
gxset "github.com/dubbogo/gost/container/set"
"google.golang.org/grpc"
"istio.io/api/dubbo/v1alpha1"
)

type pixiuMetadataReportFactory struct {
}

// CreateMetadataReport create a new metadata report
func (mf *pixiuMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {

conn, err := grpc.Dial(url.Location)
if err != nil {
panic(err)
}

snpClient := v1alpha1.NewServiceNameMappingServiceClient(conn)

metaClient := v1alpha1.NewServiceMetadataServiceClient(conn)

return &pixiuMetadataReport{snpClient: snpClient, metaClient: metaClient}
}

type pixiuMetadataReport struct {
snpClient v1alpha1.ServiceNameMappingServiceClient
metaClient v1alpha1.ServiceMetadataServiceClient
}

func (p pixiuMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
panic("implement me")
}

func (p pixiuMetadataReport) StoreConsumerMetadata(metadataIdentifier *identifier.MetadataIdentifier, s string) error {
panic("implement me")
}

func (p pixiuMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url *common.URL) error {
panic("implement me")
}

func (p pixiuMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
panic("implement me")
}

func (p pixiuMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
panic("implement me")
}

func (p pixiuMetadataReport) SaveSubscribedData(metadataIdentifier *identifier.SubscriberMetadataIdentifier, s string) error {
panic("implement me")
}

func (p pixiuMetadataReport) GetSubscribedURLs(metadataIdentifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
panic("implement me")
}

func (p pixiuMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) (string, error) {
panic("implement me")
}

func (p pixiuMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
response, err := p.metaClient.Get(context.Background(), &v1alpha1.GetServiceMetadataRequest{
Namespace: metadataIdentifier.Group,
ApplicationName: "",
Revision: metadataIdentifier.Revision,
})
if err != nil {
return nil, err
}
data := response.GetMetadataInfo()
var metadataInfo common.MetadataInfo
err = json.Unmarshal([]byte(data), &metadataInfo)
if err != nil {
return nil, err
}
return &metadataInfo, nil
}

func (p pixiuMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error {

data, err := json.Marshal(info)
if err != nil {
return err
}
_, err = p.metaClient.Publish(context.Background(), &v1alpha1.PublishServiceMetadataRequest{
Namespace: metadataIdentifier.Group,
ApplicationName: metadataIdentifier.Application,
Revision: metadataIdentifier.Revision,
MetadataInfo: string(data),
})
if err != nil {
return err
}
return err
}

func (p pixiuMetadataReport) RegisterServiceAppMapping(serviceInterface string, group string, appName string) error {
_, err := p.snpClient.RegisterServiceAppMapping(context.Background(), &v1alpha1.ServiceMappingRequest{
Namespace: group,
ApplicationName: appName,
InterfaceNames: []string{serviceInterface},
})
if err != nil {
return err
}
return nil
}

func (p pixiuMetadataReport) GetServiceAppMapping(key string, group string, listener registry.MappingListener) (*gxset.HashSet, error) {
panic("implement me")
}

func (p pixiuMetadataReport) RemoveServiceAppMappingListener(key string, group string) error {
panic("implement me")
}
26 changes: 26 additions & 0 deletions remoting/xds/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package xds

import (
"context"
"errors"
"istio.io/api/dubbo/v1alpha1"
"sync"
"time"
)
Expand Down Expand Up @@ -134,6 +136,8 @@ type WrappedClientImpl struct {
xdsSniffingTimeout stores xds sniffing timeout duration
*/
xdsSniffingTimeout time.Duration

snpClient v1alpha1.ServiceNameMappingServiceClient
}

func GetXDSWrappedClient() *WrappedClientImpl {
Expand Down Expand Up @@ -564,6 +568,25 @@ func (w *WrappedClientImpl) MatchRoute(routerConfig resource.RouteConfigUpdate,
return nil, errors.New("not found route")
}

func (w *WrappedClientImpl) RegisterSNP(serviceInterface string, group string, appName string) error {
_, err := w.snpClient.RegisterServiceAppMapping(context.Background(), &v1alpha1.ServiceMappingRequest{
Namespace: group,
ApplicationName: appName,
InterfaceNames: []string{serviceInterface},
})
if err != nil {
return err
}
return nil
}

func (w *WrappedClientImpl) GetSNP(key string, group string) {
w.xdsClient.WatchDubboServiceNameMapping("", func(update resource.DubboServiceNameMappingUpdate, err error) {

})

}

type XDSWrapperClient interface {
Subscribe(svcUniqueName, interfaceName, hostAddr string, lst registry.NotifyListener) error
UnSubscribe(svcUniqueName string)
Expand All @@ -575,4 +598,7 @@ type XDSWrapperClient interface {
GetHostAddress() xdsCommon.HostAddr
GetIstioPodIP() string
MatchRoute(routerConfig resource.RouteConfigUpdate, invocation protocol.Invocation) (*resource.Route, error)

RegisterSNP(serviceInterface string, group string, appName string) error
GetSNP(key string, group string)
}
35 changes: 35 additions & 0 deletions remoting/xds/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package xds

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"istio.io/api/dubbo/v1alpha1"
"testing"
)

func TestClient(t *testing.T) {

dial, err := grpc.Dial("127.0.0.1:15010", grpc.WithTransportCredentials(insecure.NewCredentials()))

fmt.Println(dial)
if err != nil {
fmt.Println(err)
return
}

client := v1alpha1.NewServiceNameMappingServiceClient(dial)

mapping, err := client.RegisterServiceAppMapping(context.Background(), &v1alpha1.ServiceMappingRequest{
Namespace: "defaultxx",
ApplicationName: "application",
InterfaceNames: []string{"a", "b", "c"},
})
if err != nil {
fmt.Println(err)
return
}
fmt.Println("res:", mapping)

}
2 changes: 2 additions & 0 deletions xds/client/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type XDSClient interface {
WatchRouteConfig(string, func(resource.RouteConfigUpdate, error)) func()
WatchCluster(string, func(resource.ClusterUpdate, error)) func()
WatchEndpoints(clusterName string, edsCb func(resource.EndpointsUpdate, error)) (cancel func())
WatchDubboServiceNameMapping(clusterName string, snpCb func(resource.DubboServiceNameMappingUpdate, error)) (cancel func())

ReportLoad(server string) (*load.Store, func())

DumpLDS() map[string]resource.UpdateWithMD
Expand Down
12 changes: 12 additions & 0 deletions xds/client/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@ func (a *authority) watchEndpoints(clusterName string, cb func(resource.Endpoint
}
}

func (a *authority) watchDubboServiceNameMapping(clusterName string, cb func(resource.DubboServiceNameMappingUpdate, error)) (cancel func()) {
first, cancelF := a.pubsub.WatchDubboServiceNameMapping(clusterName, cb)
if first {
a.controller.AddWatch(resource.DubboServiceNameMappingType, clusterName)
}
return func() {
if cancelF() {
a.controller.RemoveWatch(resource.DubboServiceNameMappingType, clusterName)
}
}
}

func (a *authority) reportLoad(server string) (*load.Store, func()) {
return a.controller.ReportLoad(server)
}
Expand Down
4 changes: 4 additions & 0 deletions xds/client/controller/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ func (t *Controller) handleResponse(resp proto.Message) (resource.ResourceType,
var update map[string]resource.EndpointsUpdateErrTuple
update, md, err = resource.UnmarshalEndpoints(opts)
t.updateHandler.NewEndpoints(update, md)
case resource.DubboServiceNameMappingType:
var update map[string]resource.DubboServiceNameMappingTypeErrTuple
update, md, err = resource.UnmarshalDubboServiceNameMapping(opts)
t.updateHandler.NewDubboServiceNameMapping(update, md)
default:
return rType, "", "", resourceversion.ErrResourceTypeUnsupported{
ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", rType),
Expand Down
2 changes: 2 additions & 0 deletions xds/client/controller/version/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ func (v3c *client) ParseResponse(r proto.Message) (resource.ResourceType, []*any
rType = resource.ClusterResource
case resource.IsEndpointsResource(url):
rType = resource.EndpointsResource
case resource.IsDubboServiceNameMappingResource(url):
rType = resource.DubboServiceNameMappingType
default:
return rType, nil, "", "", controllerversion.ErrResourceTypeUnsupported{
ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", resp.GetTypeUrl()),
Expand Down
3 changes: 3 additions & 0 deletions xds/client/pubsub/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ type UpdateHandler interface {
// NewConnectionError handles connection errors from the xDS stream. The
// error will be reported to all the resource watchers.
NewConnectionError(err error)

// NewDubboServiceNameMapping handles updates to dubbo service name mapping
NewDubboServiceNameMapping(map[string]resource.DubboServiceNameMappingTypeErrTuple, resource.UpdateMetadata)
}
21 changes: 21 additions & 0 deletions xds/client/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ type Pubsub struct {
edsWatchers map[string]map[*watchInfo]bool
edsCache map[string]resource.EndpointsUpdate
edsMD map[string]resource.UpdateMetadata

snpWatchers map[string]map[*watchInfo]bool
snpCache map[string]resource.DubboServiceNameMappingUpdate
snpMD map[string]resource.UpdateMetadata
}

// New creates a new Pubsub.
Expand All @@ -90,6 +94,9 @@ func New(watchExpiryTimeout time.Duration, logger dubbogoLogger.Logger) *Pubsub
edsWatchers: make(map[string]map[*watchInfo]bool),
edsCache: make(map[string]resource.EndpointsUpdate),
edsMD: make(map[string]resource.UpdateMetadata),
snpWatchers: make(map[string]map[*watchInfo]bool),
snpCache: make(map[string]resource.DubboServiceNameMappingUpdate),
snpMD: make(map[string]resource.UpdateMetadata),
}
go pb.run()
return pb
Expand Down Expand Up @@ -163,6 +170,20 @@ func (pb *Pubsub) WatchEndpoints(clusterName string, cb func(resource.EndpointsU
return pb.watch(wi)
}

func (pb *Pubsub) WatchDubboServiceNameMapping(clusterName string, cb func(resource.DubboServiceNameMappingUpdate, error)) (first bool, cancel func() bool) {
wi := &watchInfo{
c: pb,
rType: resource.DubboServiceNameMappingType,
target: clusterName,
snpCallback: cb,
}

wi.expiryTimer = time.AfterFunc(pb.watchExpiryTimeout, func() {
wi.timeout()
})
return pb.watch(wi)
}

// Close closes the pubsub.
func (pb *Pubsub) Close() {
if pb.done.HasFired() {
Expand Down
45 changes: 45 additions & 0 deletions xds/client/pubsub/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,48 @@ func (pb *Pubsub) NewConnectionError(err error) {
}
}
}

func (pb *Pubsub) NewDubboServiceNameMapping(updates map[string]resource.DubboServiceNameMappingTypeErrTuple, metadata resource.UpdateMetadata) {
pb.mu.Lock()
defer pb.mu.Unlock()

for name, uErr := range updates {
if s, ok := pb.snpWatchers[name]; ok {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := pb.snpMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
pb.snpMD[name] = mdCopy
for wi := range s {
// Send the watcher the individual error, instead of the
// overall combined error from the metadata.ErrState.
wi.newError(uErr.Err)
}
continue
}
// If we get here, it means that the update is a valid one. Notify
// watchers only if this is a first time update or it is different
// from the one currently cached.
if cur, ok := pb.snpCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
for wi := range s {
wi.newUpdate(uErr.Update)
}
}
// Sync cache.
pb.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
pb.snpCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = resource.ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
pb.snpMD[name] = mdCopy
}
}
}
Loading

0 comments on commit e3a9ed0

Please sign in to comment.