From 0451bf2ff699a4c1cac321e472abeaeb8b0c0b64 Mon Sep 17 00:00:00 2001 From: kuiki Date: Fri, 26 Apr 2019 16:51:14 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=B0=86config=E7=9A=84=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E5=9C=A8CreateAPP=E6=97=B6=E6=89=A7=E8=A1=8C=EF=BC=8C?= =?UTF-8?q?=E5=B9=B6=E4=B8=94=E5=8F=AF=E4=BB=A5=E4=BC=A0=E5=85=A5=E5=88=9D?= =?UTF-8?q?=E5=A7=8B=E5=8C=96=E5=A5=BD=E7=9A=84conf=E6=9D=A5=E8=87=AA?= =?UTF-8?q?=E7=94=B1=E8=BD=BD=E5=85=A5conf?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/app.go | 134 ++++++++++++++++++++++++++++++++++------------- module/module.go | 2 +- mqant.go | 5 +- 3 files changed, 103 insertions(+), 38 deletions(-) diff --git a/app/app.go b/app/app.go index e3eb95f..b56be11 100644 --- a/app/app.go +++ b/app/app.go @@ -45,11 +45,13 @@ type protocolMarshalImp struct { data []byte } +// GetData 获取app数据 func (this *protocolMarshalImp) GetData() []byte { return this.data } -func NewApp(version string) module.App { +// NewApp 创建app +func NewApp(version string, debug bool, options ...interface{}) module.App { app := new(DefaultApp) app.routes = map[string]func(app module.App, Type string, hash string) module.ServerSession{} app.serverList = map[string]module.ServerSession{} @@ -64,27 +66,20 @@ func NewApp(version string) module.App { } app.rpcserializes = map[string]module.RPCSerialize{} app.version = version - return app -} - -type DefaultApp struct { - //module.App - version string - serverList map[string]module.ServerSession - settings conf.Config - processId string - routes map[string]func(app module.App, Type string, hash string) module.ServerSession - defaultRoutes func(app module.App, Type string, hash string) module.ServerSession - //将一个RPC调用路由到新的路由上 - mapRoute func(app module.App, route string) string - rpcserializes map[string]module.RPCSerialize - configurationLoaded func(app module.App) - startup func(app module.App) - moduleInited func(app module.App, module module.Module) - protocolMarshal func(Trace string, Result interface{}, Error string) (module.ProtocolMarshal, string) -} - -func (app *DefaultApp) Run(debug bool, mods ...module.Module) error { + // --------------------------- 执行app初始化配置 --------------------------- + // 从options中解析初始化选项 + var ( + optConf *conf.Config + ) + for _, option := range options { + switch o := option.(type) { + case conf.Config: + optConf = &o + default: + panic(option) + } + } + // 从flag中准备参数 wdPath := flag.String("wd", "", "Server work directory") confPath := flag.String("conf", "", "Server configuration file path") ProcessID := flag.String("pid", "development", "Server ProcessID?") @@ -93,6 +88,7 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error { flag.Parse() //解析输入的参数 app.processId = *ProcessID ApplicationDir := "" + // 根据策略选择working Path工作目录 if *wdPath != "" { _, err := os.Open(*wdPath) if err != nil { @@ -115,24 +111,39 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error { defaultLogPath := fmt.Sprintf("%s/bin/logs", ApplicationDir) defaultBIPath := fmt.Sprintf("%s/bin/bi", ApplicationDir) + // 根据策略选择配置路径 if *confPath == "" { - *confPath = defaultConfPath + // 如果未给出conf配置,则使用传入的conf(终端conf参数优先级高于传入conf,最低优先级才是默认confPath + if optConf == nil { + // 也未传入conf,则尝试使用默认的confPath来loadConf + conf.LoadConfig(defaultConfPath) + optConf = &conf.Conf + } + } else { + // flag参数有conf,则最优先使用此conf + conf.LoadConfig(*confPath) + optConf = &conf.Conf } + // 根据策略选择日志目录 if *Logdir == "" { *Logdir = defaultLogPath } + // 根据策略选择二进制文件目录 if *BIdir == "" { *BIdir = defaultBIPath } - f, err := os.Open(*confPath) - if err != nil { - panic(err) - } + // 在conf.LoadConfig时,如果读取错误,会直接panic,所以无需再此多检查一次是否可以打开 + // 验证conf文件是否可以打开 + // f, err := os.Open(*confPath) + // if err != nil { + // panic(err) + // } - _, err = os.Open(*Logdir) + // 检查日志目录是否存在 + _, err := os.Open(*Logdir) if err != nil { //文件不存在 err := os.Mkdir(*Logdir, os.ModePerm) // @@ -141,6 +152,7 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error { } } + // 检查二进制目录是否存在 _, err = os.Open(*BIdir) if err != nil { //文件不存在 @@ -149,11 +161,37 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error { fmt.Println(err) } } + + // 加载配置 fmt.Println("Server configuration file path :", *confPath) - conf.LoadConfig(f.Name()) //加载配置文件 - app.Configure(conf.Conf) //配置信息 - log.InitLog(debug, *ProcessID, *Logdir, conf.Conf.Log) - log.InitBI(debug, *ProcessID, *BIdir, conf.Conf.BI) + app.Configure(*optConf) //配置信息 + // 配置文件加载完毕 + log.InitLog(debug, *ProcessID, *Logdir, optConf.Log) + log.InitBI(debug, *ProcessID, *BIdir, optConf.BI) + // app初始化完成 + return app +} + +// DefaultApp 默认app +type DefaultApp struct { + //module.App + version string + serverList map[string]module.ServerSession + settings conf.Config + processId string + routes map[string]func(app module.App, Type string, hash string) module.ServerSession + defaultRoutes func(app module.App, Type string, hash string) module.ServerSession + //将一个RPC调用路由到新的路由上 + mapRoute func(app module.App, route string) string + rpcserializes map[string]module.RPCSerialize + configurationLoaded func(app module.App) + startup func(app module.App) + moduleInited func(app module.App, module module.Module) + protocolMarshal func(Trace string, Result interface{}, Error string) (module.ProtocolMarshal, string) +} + +// Run 运行app +func (app *DefaultApp) Run(mods ...module.Module) error { log.Info("mqant %v starting up", app.version) @@ -169,7 +207,7 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error { manager.Register(mods[i]) } app.OnInit(app.settings) - manager.Init(app, *ProcessID) + manager.Init(app, app.processId) if app.startup != nil { app.startup(app) } @@ -194,16 +232,20 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error { } return nil } + +// Route 按模块设置路由方法 func (app *DefaultApp) Route(moduleType string, fn func(app module.App, Type string, hash string) module.ServerSession) error { app.routes[moduleType] = fn return nil } +// SetMapRoute 设置Map路由 func (app *DefaultApp) SetMapRoute(fn func(app module.App, route string) string) error { app.mapRoute = fn return nil } +// getRoute 按模块获取路由方法 func (app *DefaultApp) getRoute(moduleType string) func(app module.App, Type string, hash string) module.ServerSession { fn := app.routes[moduleType] if fn == nil { @@ -213,6 +255,7 @@ func (app *DefaultApp) getRoute(moduleType string) func(app module.App, Type str return fn } +// AddRPCSerialize 添加rpc序列化器 func (app *DefaultApp) AddRPCSerialize(name string, Interface module.RPCSerialize) error { if _, ok := app.rpcserializes[name]; ok { return fmt.Errorf("The name(%s) has been occupied", name) @@ -221,17 +264,18 @@ func (app *DefaultApp) AddRPCSerialize(name string, Interface module.RPCSerializ return nil } +// GetRPCSerialize 获取rpc序列化器 func (app *DefaultApp) GetRPCSerialize() map[string]module.RPCSerialize { return app.rpcserializes } +// Configure 设置配置 func (app *DefaultApp) Configure(settings conf.Config) error { app.settings = settings return nil } -/** - */ +// OnInit app初始化方法 func (app *DefaultApp) OnInit(settings conf.Config) error { app.serverList = make(map[string]module.ServerSession) for Type, ModuleInfos := range settings.Module { @@ -265,6 +309,7 @@ func (app *DefaultApp) OnInit(settings conf.Config) error { return nil } +// OnDestroy app析构函数 func (app *DefaultApp) OnDestroy() error { for id, session := range app.serverList { log.Info("RPCClient closeing type(%s) id(%s)", session.GetType(), id) @@ -278,6 +323,7 @@ func (app *DefaultApp) OnDestroy() error { return nil } +// RegisterLocalClient 注册本地rpc client func (app *DefaultApp) RegisterLocalClient(serverId string, server mqrpc.RPCServer) error { if session, ok := app.serverList[serverId]; ok { return session.GetRpc().NewLocalClient(server) @@ -287,6 +333,7 @@ func (app *DefaultApp) RegisterLocalClient(serverId string, server mqrpc.RPCServ return nil } +// GetServerById 根据id获取服务 func (app *DefaultApp) GetServerById(serverId string) (module.ServerSession, error) { if session, ok := app.serverList[serverId]; ok { return session, nil @@ -295,6 +342,7 @@ func (app *DefaultApp) GetServerById(serverId string) (module.ServerSession, err } } +// GetServersByType 根据类型获取服务列表 func (app *DefaultApp) GetServersByType(Type string) []module.ServerSession { sessions := make([]module.ServerSession, 0) for _, session := range app.serverList { @@ -305,6 +353,7 @@ func (app *DefaultApp) GetServersByType(Type string) []module.ServerSession { return sessions } +// GetRouteServer func (app *DefaultApp) GetRouteServer(filter string, hash string) (s module.ServerSession, err error) { if app.mapRoute != nil { //进行一次路由转换 @@ -326,12 +375,17 @@ func (app *DefaultApp) GetRouteServer(filter string, hash string) (s module.Serv return } +// GetSettings 获取设置 func (app *DefaultApp) GetSettings() conf.Config { return app.settings } + +// GetProcessID 获取进程ID func (app *DefaultApp) GetProcessID() string { return app.processId } + +// RpcInvoke rpc调用 func (app *DefaultApp) RpcInvoke(module module.RPCModule, moduleType string, _func string, params ...interface{}) (result interface{}, err string) { server, e := app.GetRouteServer(moduleType, module.GetServerId()) if e != nil { @@ -341,6 +395,7 @@ func (app *DefaultApp) RpcInvoke(module module.RPCModule, moduleType string, _fu return server.Call(_func, params...) } +// RpcInvokeNR rpcNR调用 func (app *DefaultApp) RpcInvokeNR(module module.RPCModule, moduleType string, _func string, params ...interface{}) (err error) { server, err := app.GetRouteServer(moduleType, module.GetServerId()) if err != nil { @@ -349,6 +404,7 @@ func (app *DefaultApp) RpcInvokeNR(module module.RPCModule, moduleType string, _ return server.CallNR(_func, params...) } +// RpcInvokeArgs rpc args调用 func (app *DefaultApp) RpcInvokeArgs(module module.RPCModule, moduleType string, _func string, ArgsType []string, args [][]byte) (result interface{}, err string) { server, e := app.GetRouteServer(moduleType, module.GetServerId()) if e != nil { @@ -358,6 +414,7 @@ func (app *DefaultApp) RpcInvokeArgs(module module.RPCModule, moduleType string, return server.CallArgs(_func, ArgsType, args) } +// RpcInvokeNRArgs rpc NR args调用 func (app *DefaultApp) RpcInvokeNRArgs(module module.RPCModule, moduleType string, _func string, ArgsType []string, args [][]byte) (err error) { server, err := app.GetRouteServer(moduleType, module.GetServerId()) if err != nil { @@ -366,30 +423,36 @@ func (app *DefaultApp) RpcInvokeNRArgs(module module.RPCModule, moduleType strin return server.CallNRArgs(_func, ArgsType, args) } +// GetModuleInited 获取inited模块 func (app *DefaultApp) GetModuleInited() func(app module.App, module module.Module) { return app.moduleInited } +// OnConfigurationLoaded func (app *DefaultApp) OnConfigurationLoaded(_func func(app module.App)) error { app.configurationLoaded = _func return nil } +// OnModuleInited func (app *DefaultApp) OnModuleInited(_func func(app module.App, module module.Module)) error { app.moduleInited = _func return nil } +// OnStartup func (app *DefaultApp) OnStartup(_func func(app module.App)) error { app.startup = _func return nil } +// SetProtocolMarshal func (app *DefaultApp) SetProtocolMarshal(protocolMarshal func(Trace string, Result interface{}, Error string) (module.ProtocolMarshal, string)) error { app.protocolMarshal = protocolMarshal return nil } +// ProtocolMarshal func (app *DefaultApp) ProtocolMarshal(Trace string, Result interface{}, Error string) (module.ProtocolMarshal, string) { if app.protocolMarshal != nil { return app.protocolMarshal(Trace, Result, Error) @@ -407,6 +470,7 @@ func (app *DefaultApp) ProtocolMarshal(Trace string, Result interface{}, Error s } } +// ProtocolMarshal func (app *DefaultApp) NewProtocolMarshal(data []byte) module.ProtocolMarshal { return &protocolMarshalImp{ data: data, diff --git a/module/module.go b/module/module.go index 5fb807c..bb1264c 100644 --- a/module/module.go +++ b/module/module.go @@ -32,7 +32,7 @@ type ServerSession interface { CallNRArgs(_func string, ArgsType []string, args [][]byte) (err error) } type App interface { - Run(debug bool, mods ...Module) error + Run(mods ...Module) error /** 当同一个类型的Module存在多个服务时,需要根据情况选择最终路由到哪一个服务去 fn: func(moduleType string,serverId string,[]*ServerSession)(*ServerSession) diff --git a/mqant.go b/mqant.go index cd9164f..d97acee 100644 --- a/mqant.go +++ b/mqant.go @@ -16,6 +16,7 @@ package mqant import "github.com/liangdas/mqant/module" import "github.com/liangdas/mqant/app" -func CreateApp() module.App { - return defaultApp.NewApp(Version) +// CreateApp 构建mqant应用实例 +func CreateApp(debug bool, options ...interface{}) module.App { + return defaultApp.NewApp(Version, debug, options...) } From 7dabfa32d064320712edd7bb891616ecc121bcfe Mon Sep 17 00:00:00 2001 From: kuiki <911yinhui911@163.com> Date: Sat, 27 Apr 2019 22:29:30 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=B7=BB=E5=8A=A0go=20mod=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 18 ++++++++++++++++++ go.sum | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 go.mod create mode 100644 go.sum diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..faaf8aa --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module github.com/liangdas/mqant + +go 1.12 + +require ( + github.com/Jeffail/tunny v0.0.0-20181108205650-4921fff29480 + github.com/belogik/goes v0.0.0-20151229125003-e54d722c3aff + github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 + github.com/gogo/protobuf v1.2.1 + github.com/golang/protobuf v1.3.1 + github.com/gomodule/redigo v2.0.0+incompatible + github.com/gorilla/websocket v1.4.0 + github.com/kr/pretty v0.1.0 // indirect + github.com/pkg/errors v0.8.1 + github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 + google.golang.org/appengine v1.5.0 + gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..4544f75 --- /dev/null +++ b/go.sum @@ -0,0 +1,34 @@ +github.com/Jeffail/tunny v0.0.0-20181108205650-4921fff29480 h1:pTK5nW0J4B2IlU9RjSoJMC+tgu+CQI6obTlgsdiznpQ= +github.com/Jeffail/tunny v0.0.0-20181108205650-4921fff29480/go.mod h1:BX3q3G70XX0UmIkDWfDHoDRquDS1xFJA5VTbMf+14wM= +github.com/belogik/goes v0.0.0-20151229125003-e54d722c3aff h1:/kO0p2RTGLB8R5gub7ps0GmYpB2O8LXEoPq8tzFDCUI= +github.com/belogik/goes v0.0.0-20151229125003-e54d722c3aff/go.mod h1:PhH1ZhyCzHKt4uAasyx+ljRCgoezetRNf59CUtwUkqY= +github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= +github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= +github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= +github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= +github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k= +github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225 h1:kNX+jCowfMYzvlSvJu5pQWEmyWFrBXJ3PBy10xKMXK8= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4c= +google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=