Skip to content

Commit

Permalink
Merge branch 'feature/upstream_service'
Browse files Browse the repository at this point in the history
  • Loading branch information
Dot-Liu committed Jun 22, 2022
2 parents 276cfe7 + 59d0958 commit d5aeea4
Show file tree
Hide file tree
Showing 14 changed files with 296 additions and 140 deletions.
6 changes: 3 additions & 3 deletions app/apinto/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
response_rewrite "github.com/eolinker/apinto/drivers/plugins/response-rewrite"
http_router "github.com/eolinker/apinto/drivers/router/http-router"
service_http "github.com/eolinker/apinto/drivers/service/service-http"
upstream_http "github.com/eolinker/apinto/drivers/upstream/upstream-http"
//upstream_http "github.com/eolinker/apinto/drivers/upstream/upstream-http"
plugin_manager "github.com/eolinker/apinto/plugin-manager"
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/extends"
Expand All @@ -48,8 +48,8 @@ func Register(extenderRegister eosc.IExtenderDriverRegister) {
// service
service_http.Register(extenderRegister)

// upstream
upstream_http.Register(extenderRegister)
////// upstream
//upstream_http.Register(extenderRegister)

// discovery
static.Register(extenderRegister)
Expand Down
31 changes: 12 additions & 19 deletions drivers/service/service-http/config.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,29 @@
package service_http

import (
"encoding/json"
"strings"

"github.com/eolinker/apinto/plugin"

"github.com/eolinker/eosc"
)

type AnonymousConfig struct {
Type string `json:"type" enum:"round-robin" label:"负载算法"`
Config string `json:"config" label:"配置" description:"{ip}:{port} {weight}"`
}

//Config service_http驱动配置
type Config struct {
Timeout int64 `json:"timeout" label:"请求超时时间" default:"2000" minimum:"1" description:"单位:ms,最小值:1"`
Retry int `json:"retry" label:"失败重试次数"`
Scheme string `json:"scheme" label:"请求协议" enum:"HTTP,HTTPS"`
Upstream eosc.RequireId `json:"upstream" label:"上游" skill:"github.com/eolinker/apinto/upstream.upstream.IUpstream" required:"false" empty_label:"使用匿名上游"`
UpstreamAnonymous *AnonymousConfig `json:"anonymous" label:"匿名上游" switch:"upstream===''" `
PluginConfig map[string]*plugin.Config `json:"plugins" label:"插件"`
Timeout int64 `json:"timeout" label:"请求超时时间" default:"2000" minimum:"1" description:"单位:ms,最小值:1"`
Retry int `json:"retry" label:"失败重试次数"`
Scheme string `json:"scheme" label:"请求协议" enum:"HTTP,HTTPS"`
Discovery eosc.RequireId `json:"discovery" required:"false" empty_label:"使用匿名上游" label:"服务发现" skill:"github.com/eolinker/apinto/discovery.discovery.IDiscovery"`
Service string `json:"service" required:"false" label:"服务名 or 配置" switch:"discovery !==''"`
Nodes []string `json:"nodes" label:"静态配置" switch:"discovery===''"`
Balance string `json:"balance" enum:"round-robin" label:"负载均衡算法"`
PluginConfig map[string]*plugin.Config `json:"plugins" label:"插件"`
}

var validMethods = []string{
"GET",
"POST",
"PUT",
"DELETE",
"PATCH",
"HEAD",
"OPTIONS",
func (c *Config) String() string {
data, _ := json.Marshal(c)
return string(data)
}

var validScheme = []string{
Expand Down
9 changes: 8 additions & 1 deletion drivers/service/service-http/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package service_http

import (
round_robin "github.com/eolinker/apinto/upstream/round-robin"
"github.com/eolinker/eosc/utils/schema"
"reflect"

Expand All @@ -14,11 +15,16 @@ import (

var DriverName = "service_http"
var (
defaultDiscovery = static.CreateAnonymous(&static.Config{
defaultHttpDiscovery = static.CreateAnonymous(&static.Config{
Scheme: "http",
Health: nil,
HealthOn: false,
})
defaultHttpsDiscovery = static.CreateAnonymous(&static.Config{
Scheme: "https",
Health: nil,
HealthOn: false,
})
pluginManger plugin.IPluginManager
)

Expand All @@ -40,6 +46,7 @@ func (f *factory) Render() interface{} {

//NewFactory 创建service_http驱动工厂
func NewFactory() eosc.IExtenderDriverFactory {
round_robin.Register()
return &factory{}
}

Expand Down
8 changes: 3 additions & 5 deletions drivers/service/service-http/send_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package service_http

import (
"testing"

upstream_http "github.com/eolinker/apinto/drivers/upstream/upstream-http"
"github.com/eolinker/apinto/upstream/balance"
"testing"
)

func TestSend(t *testing.T) {
_ = upstream_http.NewFactory()

balanceFactory, err := balance.GetFactory("")
if err != nil {
t.Error(err)
return
}

anonymous, err := defaultDiscovery.GetApp("www.baidu.com")
anonymous, err := defaultHttpsDiscovery.GetApp("www.baidu.com")
if err != nil {
t.Error(err)
return
Expand Down
20 changes: 11 additions & 9 deletions drivers/service/service-http/service.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,40 @@
package service_http

import (
"github.com/eolinker/apinto/discovery"
"github.com/eolinker/apinto/upstream/balance"
"time"

"github.com/eolinker/eosc/log"

"github.com/eolinker/apinto/plugin"
"github.com/eolinker/apinto/service"
"github.com/eolinker/apinto/upstream"
)

type Service struct {
upstream upstream.IUpstream
upstream *Upstream
configs map[string]*plugin.Config
handlers *Handlers
retry int
timeout time.Duration

scheme string
}

func (s *Service) reset(upstream upstream.IUpstream, config map[string]*plugin.Config) {
func (s *Service) reset(scheme string, app discovery.IApp, handler balance.IBalanceHandler, config map[string]*plugin.Config) {
s.configs = config
s.upstream = upstream
if s.upstream == nil {
s.upstream = NewUpstream(scheme, app, handler)
} else {
s.upstream.Reset(scheme, app, handler)
}

log.Debug("reset upstream handler...handler size is ", len(s.handlers.List()))
for _, h := range s.handlers.List() {
h.rebuild()
}
}
func (s *Service) Merge(config map[string]*plugin.Config) map[string]*plugin.Config {
configs := plugin.MergeConfig(config, s.configs)
if mg, ok := s.upstream.(plugin.IPluginConfigMerge); ok {
configs = mg.Merge(configs)
}

return configs
}
func (s *Service) Create(id string, configs map[string]*plugin.Config) service.IService {
Expand Down
76 changes: 76 additions & 0 deletions drivers/service/service-http/upstream-handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package service_http

import (
"fmt"
"time"

"github.com/eolinker/eosc/log"

http_service "github.com/eolinker/eosc/http-service"
)

var _ http_service.IChain = (*UpstreamHandler)(nil)

type UpstreamHandler struct {
id string
upstream *Upstream
retry int
timeout time.Duration
}

func (u *UpstreamHandler) Destroy() {

upstream := u.upstream
if upstream != nil {
u.upstream = nil
upstream.handlers.Del(u.id)
}

}

func NewUpstreamHandler(id string, upstream *Upstream, retry int, timeout time.Duration) *UpstreamHandler {
uh := &UpstreamHandler{
id: id,
upstream: upstream,
retry: retry,
timeout: timeout,
}
return uh
}

//DoChain 请求发送
func (u *UpstreamHandler) DoChain(ctx http_service.IHttpContext) error {

var lastErr error

//设置响应开始时间
proxyTime := time.Now()

defer func() {
//设置原始响应状态码
ctx.Response().SetProxyStatus(ctx.Response().StatusCode(), "")
//设置上游响应时间, 单位为毫秒
ctx.WithValue("response_time", time.Now().Sub(proxyTime).Milliseconds())
}()

for doTrice := u.retry + 1; doTrice > 0; doTrice-- {

node, err := u.upstream.handler.Next()
if err != nil {
return err
}
scheme := node.Scheme()
if scheme != "http" && scheme != "https" {
scheme = u.upstream.scheme
}
log.Debug("node: ", node.Addr())
addr := fmt.Sprintf("%s://%s", scheme, node.Addr())
lastErr = ctx.SendTo(addr, u.timeout)
if lastErr == nil {
return nil
}
log.Error("http upstream send error: ", lastErr)
}

return lastErr
}
50 changes: 50 additions & 0 deletions drivers/service/service-http/upstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package service_http

import (
"time"

"github.com/eolinker/apinto/upstream"

"github.com/eolinker/eosc"

"github.com/eolinker/apinto/discovery"
"github.com/eolinker/apinto/upstream/balance"
)

type Upstream struct {
scheme string
app discovery.IApp
handler balance.IBalanceHandler

handlers eosc.IUntyped
}

func (up *Upstream) Create(id string, retry int, timeout time.Duration) (upstream.IUpstreamHandler, error) {
return up.create(id, retry, timeout), nil
}
func (up *Upstream) create(id string, retry int, timeout time.Duration) *UpstreamHandler {
nh := NewUpstreamHandler(id, up, retry, timeout)
up.handlers.Set(id, nh)
return nh
}

func NewUpstream(scheme string, app discovery.IApp, handler balance.IBalanceHandler) *Upstream {
return &Upstream{scheme: scheme, app: app, handler: handler, handlers: eosc.NewUntyped()}
}

//Reset reset
func (up *Upstream) Reset(scheme string, app discovery.IApp, handler balance.IBalanceHandler) {
up.scheme = scheme
up.app = app
up.handler = handler
}

func (up *Upstream) destroy() {
handlers := up.handlers.List()
up.handlers = eosc.NewUntyped()
for _, h := range handlers {
hd := h.(*UpstreamHandler)
hd.Destroy()
}

}
Loading

0 comments on commit d5aeea4

Please sign in to comment.