Skip to content

Commit

Permalink
feat: support tunnel conn (mosn#1640)
Browse files Browse the repository at this point in the history
* feat: add tunnel filter (https://mosn.io/blog/posts/mosn-tunnel/)
  • Loading branch information
CodingSinger authored Jun 8, 2022
1 parent 448da0e commit d12ff6b
Show file tree
Hide file tree
Showing 36 changed files with 2,775 additions and 543 deletions.
1 change: 1 addition & 0 deletions cmd/mosn/main/mosn.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
_ "mosn.io/mosn/pkg/filter/network/grpc"
_ "mosn.io/mosn/pkg/filter/network/proxy"
_ "mosn.io/mosn/pkg/filter/network/streamproxy"
_ "mosn.io/mosn/pkg/filter/network/tunnel"
_ "mosn.io/mosn/pkg/filter/stream/dsl"
_ "mosn.io/mosn/pkg/filter/stream/dubbo"
_ "mosn.io/mosn/pkg/filter/stream/faultinject"
Expand Down
80 changes: 80 additions & 0 deletions examples/codes/tunnel-sample/agent_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{
"close_graceful": true,
"servers": [
{
"default_log_path": "stdout",
"default_log_level": "DEBUG",
"routers":[
{
"router_config_name":"server_router",
"virtual_hosts":[{
"name":"serverHost",
"domains": ["*"],
"routers": [
{
"match":{"headers":[{"name":"service","value":".*"}]},
"route":{"cluster_name":"serverCluster"}
}
]
}]
}
],
"listeners":[
{
"name":"serverListener",
"address": "127.0.0.1:2046",
"bind_port": true,
"filter_chains": [{
"filters": [
{
"type": "proxy",
"config": {
"downstream_protocol": "X",
"upstream_protocol": "X",
"extend_config": {
"sub_protocol": "bolt"
},
"router_config_name":"server_router"
}
}
]
}]
}
]
}
],
"extends": [
{
"type": "tunnel_agent",
"config": {
"hosting_listener": "serverListener",
"server_list": ["127.0.0.1:9999"],
"cluster": "clientCluster",
"enable": true,
"connection_num": 1,
"reconnect_base_duration": "8s",
"tls_context": {
"status": true,
"ca_cert": "../certs/ca.pem",
"server_name": "127.0.0.1"
}
}
}
],
"cluster_manager": {
"clusters": [
{
"Name": "serverCluster",
"type": "SIMPLE",
"lb_type": "LB_RANDOM",
"max_request_per_conn": 1024,
"conn_buffer_limit_bytes": 32768,
"hosts": [
{
"address": "127.0.0.1:8080"
}
]
}
]
}
}
12 changes: 12 additions & 0 deletions examples/codes/tunnel-sample/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
function make_build {
mkdir ./build
cp ../../../cmd/mosn/main/* ./build/
cp credential.go ./build/
cd ./build
go build -o mosn
mv mosn ../
cd ../
rm -rf ./build
}

make_build
107 changes: 107 additions & 0 deletions examples/codes/tunnel-sample/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package main

import (
"context"
"flag"
"fmt"
"net"
"sync"
"time"

"mosn.io/api"
"mosn.io/mosn/pkg/log"
"mosn.io/mosn/pkg/network"
"mosn.io/mosn/pkg/protocol"
"mosn.io/mosn/pkg/protocol/xprotocol"
"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
"mosn.io/mosn/pkg/stream"
_ "mosn.io/mosn/pkg/stream/xprotocol"
xstream "mosn.io/mosn/pkg/stream/xprotocol"
"mosn.io/mosn/pkg/trace"
xtrace "mosn.io/mosn/pkg/trace/sofa/xprotocol"
"mosn.io/mosn/pkg/types"
)

type Client struct {
proto types.ProtocolName
Client stream.Client
conn types.ClientConnection
Id uint64
respWaiter sync.WaitGroup
t bool
}

func NewClient(addr string, proto types.ProtocolName, t bool) *Client {
c := &Client{}
stopChan := make(chan struct{})
remoteAddr, _ := net.ResolveTCPAddr("tcp", addr)
conn := network.NewClientConnection(0, nil, remoteAddr, stopChan)
if err := conn.Connect(); err != nil {
fmt.Println(err)
return nil
}
// pass sub protocol to stream client
c.Client = stream.NewStreamClient(context.Background(), proto, conn, nil)
c.conn = conn
c.proto = proto
c.t = t
return c
}

func (c *Client) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) {
fmt.Printf("[Xprotocol RPC Client] Receive Data:")
if cmd, ok := headers.(api.XFrame); ok {
streamID := protocol.StreamIDConv(cmd.GetRequestId())

if resp, ok := cmd.(api.XRespFrame); ok {
fmt.Println("stream:", streamID, " status:", resp.GetStatusCode())
if !c.t {
c.respWaiter.Done()
}
}
}
}

func (c *Client) OnDecodeError(context context.Context, err error, headers types.HeaderMap) {}

func (c *Client) Request() {
c.Id++
requestEncoder := c.Client.NewStream(context.Background(), c)

var request api.XFrame
switch c.proto {
case bolt.ProtocolName:
request = bolt.NewRpcRequest(uint32(c.Id), protocol.CommonHeader(map[string]string{"service": "testSofa"}), nil)
default:
panic("unknown protocol, please complete the protocol-switch in Client.Request method")
}

requestEncoder.AppendHeaders(context.Background(), request.GetHeader(), true)
}

func main() {
log.InitDefaultLogger("", log.DEBUG)
t := flag.Bool("t", false, "-t")
flag.Parse()
// xprotocol action register
xprotocol.ResgisterXProtocolAction(xstream.NewConnPool, xstream.NewStreamFactory, func(codec api.XProtocolCodec) {
name := codec.ProtocolName()
trace.RegisterTracerBuilder("SOFATracer", name, xtrace.NewTracer)
})
// xprotocol register
_ = xprotocol.RegisterXProtocolCodec(&bolt.XCodec{})
// use bolt as example
if client := NewClient("127.0.0.1:2045", bolt.ProtocolName, *t); client != nil {
for {
if !*t {
client.respWaiter.Add(1)
}
client.Request()
time.Sleep(200 * time.Millisecond)
if !*t {
client.respWaiter.Wait()
return
}
}
}
}
81 changes: 81 additions & 0 deletions examples/codes/tunnel-sample/client_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
{
"close_graceful": true,
"servers": [
{
"default_log_path": "stdout",
"default_log_level": "DEBUG",
"routers":[
{
"router_config_name":"client_router",
"virtual_hosts":[{
"name":"clientHost",
"domains": ["*"],
"routers": [
{
"match":{"headers":[{"name":"service","value":".*"}]},
"route":{"cluster_name":"clientCluster"}
}
]
}]
}
],
"listeners": [
{
"name":"clientListener",
"address": "127.0.0.1:2045",
"bind_port": true,
"filter_chains": [{
"filters": [
{
"type": "proxy",
"config": {
"downstream_protocol": "X",
"upstream_protocol": "X",
"extend_config": {
"sub_protocol": "bolt"
},
"router_config_name":"client_router"
}
}
]
}]
},
{
"name": "tunnel_server_listener",
"address": "127.0.0.1:9999",
"bind_port": true,
"connection_idle_timeout":0,
"log_path": "stdout",
"filter_chains": [
{
"tls_context": {
"status": true,
"ca_cert": "../certs/ca.pem",
"cert_chain": "../certs/cert.pem",
"private_key": "../certs/key.pem",
"verify_client": false,
"require_client_cert": false
},
"filters": [
{
"type": "tunnel"
}
]
}
]
}
]
}
],
"cluster_manager": {
"clusters": [
{
"Name": "clientCluster",
"type": "SIMPLE",
"lb_type": "LB_RANDOM",
"max_request_per_conn": 1024,
"conn_buffer_limit_bytes": 32768
}
]
}
}
22 changes: 22 additions & 0 deletions examples/codes/tunnel-sample/credential.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package main

import (
"fmt"
"strings"

"mosn.io/mosn/pkg/filter/network/tunnel/ext"
)

func init() {
ext.RegisterConnectionValidator("test", &CustomValidator{})
ext.RegisterConnectionCredentialGetter("test", func(cluster string) string {
return fmt.Sprintf("%s_%s", "prefix_", cluster)
})
}

type CustomValidator struct {
}

func (c *CustomValidator) Validate(credential string, host string, cluster string) bool {
return strings.HasPrefix(credential, "prefix_")
}
Loading

0 comments on commit d12ff6b

Please sign in to comment.