Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: choose cluster dynamically from Go #65

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ envoy_cc_binary(
deps = [
"//src/envoy/bootstrap/dso:config",
"//src/envoy/http/golang:config",
"//src/envoy/http/cluster:config",
"@envoy//source/exe:envoy_main_entry_lib",
],
)
10 changes: 10 additions & 0 deletions api/http/cluster/v3/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = [
"@com_github_cncf_udpa//udpa/annotations:pkg",
"@com_github_cncf_udpa//xds/annotations/v3:pkg",
],
)
28 changes: 28 additions & 0 deletions api/http/cluster/v3/cluster.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
syntax = "proto3";

package envoy.extensions.clusters.golang.v3;

import "google/protobuf/any.proto";
import "google/protobuf/struct.proto";

import "xds/annotations/v3/status.proto";

import "udpa/annotations/status.proto";
import "validate/validate.proto";

option java_package = "io.envoyproxy.envoy.extensions.clusters.golang.v3";
option java_outer_classname = "GolangProto";
option java_multiple_files = true;
option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/golang/v3;golangv3";
option (udpa.annotations.file_status).package_version_status = ACTIVE;
option (xds.annotations.v3.file_status).work_in_progress = true;

message Config {
// A unique ID for a set of go plugin library.
string so_id = 1 [(validate.rules).string = {min_bytes: 1}];

string default_cluster = 2 [(validate.rules).string = {min_bytes: 1}];

// this configuration is only parsed in the go side.
google.protobuf.Any config = 3;
}
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main
import (
"os"

"mosn.io/envoy-go-extension/samples/clusters"
"mosn.io/mosn/pkg/streamfilter"

_ "mosn.io/envoy-go-extension/pkg/filter/stream/echo"
Expand All @@ -31,6 +32,7 @@ var DefaultMosnConfigPath string = "/home/admin/mosn/config/mosn.json"

func init() {
http.RegisterHttpFilterConfigFactory(mosn.ConfigFactory)
http.RegisterClusterSpecifierFactory(clusters.ClusterSpecifierFactory)

// load mosn config
mosnConfigPath := DefaultMosnConfigPath
Expand Down
14 changes: 14 additions & 0 deletions pkg/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,17 @@ type FilterCallbacks interface {
type FilterCallbackHandler interface {
FilterCallbacks
}

/* cluster specifier plugin */

type ClusterConfigParser interface {
Parse(any *anypb.Any) interface{}
}

type ClusterSpecifier interface {
// TODO: support header
// Choose(RequestHeaderMap) string
Choose() string
}

type ClusterSpecifierFactory func(config interface{}) ClusterSpecifier
28 changes: 28 additions & 0 deletions pkg/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,31 @@ func moeMergeHttpPluginConfig(parentId uint64, childId uint64) uint64 {
return childId
}
}

/* cluster specifier plugin */

var (
clusterConfigNumGenerator uint64
clusterConfigCache = &sync.Map{} // uint64 -> any
)

//export moeNewClusterConfig
func moeNewClusterConfig(configPtr uint64, configLen uint64) uint64 {
buf := utils.BytesToSlice(configPtr, configLen)
var any anypb.Any
proto.Unmarshal(buf, &any)

configNum := atomic.AddUint64(&clusterConfigNumGenerator, 1)
if clusterConfigParser != nil {
clusterConfigCache.Store(configNum, clusterConfigParser.Parse(&any))
} else {
clusterConfigCache.Store(configNum, &any)
}

return configNum
}

//export moeDestroyClusterConfig
func moeDestroyClusterConfig(id uint64) {
clusterConfigCache.Delete(id)
}
22 changes: 22 additions & 0 deletions pkg/http/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,25 @@ func getOrCreateHttpFilterFactory(configId uint64) api.HttpFilterFactory {
func RegisterStreamingHttpFilterConfigFactory(f api.HttpFilterConfigFactory) {
httpFilterConfigFactory = f
}

/* cluster specifier plugin */

// no cluster config parser by default
var clusterConfigParser api.ClusterConfigParser = nil
var clusterSpecifierFactory api.ClusterSpecifierFactory

func RegisterClusterConfigParser(parser api.ClusterConfigParser) {
clusterConfigParser = parser
}

func RegisterClusterSpecifierFactory(f api.ClusterSpecifierFactory) {
clusterSpecifierFactory = f
}

func getOrCreateClusterSpecifier(configId uint64) api.ClusterSpecifier {
config, ok := configCache.Load(configId)
if !ok {
// TODO: panic
}
return clusterSpecifierFactory(config)
}
19 changes: 19 additions & 0 deletions pkg/http/moe.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"sync"

"mosn.io/envoy-go-extension/pkg/api"
"mosn.io/envoy-go-extension/pkg/utils"
)

var ErrDupRequestKey = errors.New("dup request key")
Expand Down Expand Up @@ -225,3 +226,21 @@ func moeOnHttpSemaCallback(r *C.httpRequest) {
defer req.RecoverPanic()
req.sema.Done()
}

//export moeOnClusterSpecify
func moeOnClusterSpecify(headerPtr uint64, configId uint64, bufferPtr uint64, bufferLen uint64) int64 {
specifier := getOrCreateClusterSpecifier(configId)
cluster := specifier.Choose()
l := uint64(len(cluster))
if l == 0 {
// means use the default cluster
return 0
}
if l > bufferLen {
// buffer length is not large enough.
return int64(l)
}
buffer := utils.BufferToSlice(bufferPtr, bufferLen)
copy(buffer, cluster)
return int64(l)
}
10 changes: 10 additions & 0 deletions pkg/utils/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ func BytesToSlice(ptr uint64, len uint64) []byte {
sHdr.Cap = int(len)
return s
}

// BufferToSlice convert the memory buffer from C to a empty slice with reserved len.
func BufferToSlice(ptr uint64, len uint64) []byte {
var s []byte
var sHdr = (*reflect.SliceHeader)(unsafe.Pointer(&s))
sHdr.Data = uintptr(ptr)
sHdr.Len = 0
sHdr.Cap = int(len)
return s
}
16 changes: 16 additions & 0 deletions samples/clusters/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package clusters

import "mosn.io/envoy-go-extension/pkg/api"

type specifier struct {
config interface{}
}

func ClusterSpecifierFactory(config interface{}) api.ClusterSpecifier {
return &specifier{config: config}
}

func (s *specifier) Choose() string {
// use the default cluster
return ""
}
31 changes: 31 additions & 0 deletions src/envoy/common/dso/dso.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ DsoInstance::DsoInstance(const std::string dsoName) : dsoName_(dsoName) {
ENVOY_LOG_MISC(error, "lib: {}, cannot find symbol: moeOnHttpDecodeDestroy, err: {}", dsoName,
dlerror());
}

func = dlsym(handler_, "moeNewClusterConfig");
if (func) {
moeNewClusterConfig_ = reinterpret_cast<GoUint64 (*)(GoUint64 p0, GoUint64 p1)>(func);
} else {
loaded_ = false;
ENVOY_LOG_MISC(error, "lib: {}, cannot find symbol: moeNewClusterConfig, err: {}", dsoName,
dlerror());
}

func = dlsym(handler_, "moeOnClusterSpecify");
if (func) {
moeOnClusterSpecify_ =
reinterpret_cast<GoInt64 (*)(GoUint64 p0, GoUint64 p1, GoUint64 p2, GoUint64 p3)>(func);
} else {
loaded_ = false;
ENVOY_LOG_MISC(error, "lib: {}, cannot find symbol: moeOnClusterSpecify, err: {}", dsoName,
dlerror());
}
}

DsoInstance::~DsoInstance() {
Expand All @@ -127,6 +146,8 @@ DsoInstance::~DsoInstance() {
moeOnHttpData_ = nullptr;
moeOnHttpSemaCallback_ = nullptr;
moeOnHttpDestroy_ = nullptr;
moeNewClusterConfig_ = nullptr;
moeOnClusterSpecify_ = nullptr;

if (handler_ != nullptr) {
dlclose(handler_);
Expand Down Expand Up @@ -166,5 +187,15 @@ void DsoInstance::moeOnHttpDestroy(httpRequest* p0, int p1) {
moeOnHttpDestroy_(p0, GoUint64(p1));
}

GoUint64 DsoInstance::moeNewClusterConfig(GoUint64 p0, GoUint64 p1) {
assert(moeNewClusterConfig_ != nullptr);
return moeNewClusterConfig_(p0, p1);
}

GoInt64 DsoInstance::moeOnClusterSpecify(GoUint64 p0, GoUint64 p1, GoUint64 p2, GoUint64 p3) {
assert(moeOnClusterSpecify_ != nullptr);
return moeOnClusterSpecify_(p0, p1, p2, p3);
}

} // namespace Dso
} // namespace Envoy
6 changes: 6 additions & 0 deletions src/envoy/common/dso/dso.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class DsoInstance {

void moeOnHttpDestroy(httpRequest* p0, int p1);

GoUint64 moeNewClusterConfig(GoUint64 p0, GoUint64 p1);
GoInt64 moeOnClusterSpecify(GoUint64 p0, GoUint64 p1, GoUint64 p2, GoUint64 p3);

bool loaded() { return loaded_; }

private:
Expand All @@ -41,6 +44,9 @@ class DsoInstance {

void (*moeOnHttpSemaCallback_)(httpRequest* p0) = {nullptr};

GoUint64 (*moeNewClusterConfig_)(GoUint64 p0, GoUint64 p1) = {nullptr};
GoInt64 (*moeOnClusterSpecify_)(GoUint64 p0, GoUint64 p1, GoUint64 p2, GoUint64 p3) = {nullptr};

void (*moeOnHttpDestroy_)(httpRequest* p0, GoUint64 p1) = {nullptr};
};

Expand Down
3 changes: 3 additions & 0 deletions src/envoy/common/dso/libgolang.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,12 @@ extern GoUint64 moeOnHttpHeader(httpRequest* r, GoUint64 endStream, GoUint64 hea
extern GoUint64 moeOnHttpData(httpRequest* r, GoUint64 endStream, GoUint64 buffer, GoUint64 length);
extern void moeOnHttpDestroy(httpRequest* r, GoUint64 reason);
extern void moeOnHttpSemaCallback(httpRequest* r);
extern GoInt64 moeOnClusterSpecify(GoUint64 headerPtr, GoUint64 configId, GoUint64 bufferPtr, GoUint64 bufferLen);
extern GoUint64 moeNewHttpPluginConfig(GoUint64 configPtr, GoUint64 configLen);
extern void moeDestroyHttpPluginConfig(GoUint64 id);
extern GoUint64 moeMergeHttpPluginConfig(GoUint64 parentId, GoUint64 childId);
extern GoUint64 moeNewClusterConfig(GoUint64 configPtr, GoUint64 configLen);
extern void moeDestroyClusterConfig(GoUint64 id);

#ifdef __cplusplus
}
Expand Down
46 changes: 46 additions & 0 deletions src/envoy/http/cluster/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package(default_visibility = ["//visibility:public"])

load(
"@envoy//bazel:envoy_build_system.bzl",
"envoy_cc_library",
)

licenses(["notice"]) # Apache 2

envoy_cc_library(
name = "golang_cluster_lib",
srcs = [
"golang_cluster.cc",
],
hdrs = [
"golang_cluster.h",
],
repository = "@envoy",
deps = [
"@envoy//source/common/common:enum_to_int",
"@envoy//source/common/common:utility_lib",
"@envoy//source/common/grpc:context_lib",
"@envoy//source/common/http:headers_lib",
"@envoy//source/common/http:utility_lib",
"@envoy//source/common/common:linked_object",
"@envoy//source/common/router:config_lib",
"//src/envoy/common/dso:dso_lib",
"//api/http/cluster/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "config",
srcs = ["config.cc"],
hdrs = ["config.h"],
repository = "@envoy",
deps = [
":golang_cluster_lib",
"@envoy//envoy/registry",
"@envoy//envoy/server:filter_config_interface",
"@envoy//envoy/router:cluster_specifier_plugin_interface",
"@envoy//source/extensions/filters/http/common:factory_base_lib",
"//src/envoy/common/dso:dso_lib",
"//api/http/cluster/v3:pkg_cc_proto",
],
)
24 changes: 24 additions & 0 deletions src/envoy/http/cluster/config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#include <chrono>

#include "envoy/router/cluster_specifier_plugin.h"

#include "src/envoy/http/cluster/config.h"

namespace Envoy {
namespace Router {
namespace Golang {

ClusterSpecifierPluginSharedPtr
GolangClusterSpecifierPluginFactoryConfig::createClusterSpecifierPlugin(
const Protobuf::Message& config, Server::Configuration::CommonFactoryContext&) {
const auto& typed_config = dynamic_cast<const GolangClusterProto&>(config);
auto cluster_config = std::make_shared<ClusterConfig>(typed_config);
return std::make_shared<GolangClusterSpecifierPlugin>(cluster_config);
}

REGISTER_FACTORY(GolangClusterSpecifierPluginFactoryConfig,
ClusterSpecifierPluginFactoryConfig){"envoy.golang"};

} // namespace Golang
} // namespace Router
} // namespace Envoy
27 changes: 27 additions & 0 deletions src/envoy/http/cluster/config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "source/extensions/filters/http/common/factory_base.h"

#include "src/envoy/http/cluster/golang_cluster.h"

namespace Envoy {
namespace Router {
namespace Golang {

class GolangClusterSpecifierPluginFactoryConfig : public ClusterSpecifierPluginFactoryConfig {
public:
GolangClusterSpecifierPluginFactoryConfig() = default;
ClusterSpecifierPluginSharedPtr
createClusterSpecifierPlugin(const Protobuf::Message& config,
Server::Configuration::CommonFactoryContext&) override;

ProtobufTypes::MessagePtr createEmptyConfigProto() override {
return std::make_unique<GolangClusterProto>();
}

std::string name() const override { return "envoy.router.cluster_specifier_plugin.golang"; }
};

} // namespace Golang
} // namespace Router
} // namespace Envoy
Loading