Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

将config的初始化在CreateAPP时执行,并且可以传入初始化好的conf来自由载入conf #50

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 99 additions & 35 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ type protocolMarshalImp struct {
data []byte
}

// GetData 获取app数据
func (this *protocolMarshalImp) GetData() []byte {
return this.data
}

func NewApp(version string) module.App {
// NewApp 创建app
func NewApp(version string, debug bool, options ...interface{}) module.App {
app := new(DefaultApp)
app.routes = map[string]func(app module.App, Type string, hash string) module.ServerSession{}
app.serverList = map[string]module.ServerSession{}
Expand All @@ -64,27 +66,20 @@ func NewApp(version string) module.App {
}
app.rpcserializes = map[string]module.RPCSerialize{}
app.version = version
return app
}

type DefaultApp struct {
//module.App
version string
serverList map[string]module.ServerSession
settings conf.Config
processId string
routes map[string]func(app module.App, Type string, hash string) module.ServerSession
defaultRoutes func(app module.App, Type string, hash string) module.ServerSession
//将一个RPC调用路由到新的路由上
mapRoute func(app module.App, route string) string
rpcserializes map[string]module.RPCSerialize
configurationLoaded func(app module.App)
startup func(app module.App)
moduleInited func(app module.App, module module.Module)
protocolMarshal func(Trace string, Result interface{}, Error string) (module.ProtocolMarshal, string)
}

func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
// --------------------------- 执行app初始化配置 ---------------------------
// 从options中解析初始化选项
var (
optConf *conf.Config
)
for _, option := range options {
switch o := option.(type) {
case conf.Config:
optConf = &o
default:
panic(option)
}
}
// 从flag中准备参数
wdPath := flag.String("wd", "", "Server work directory")
confPath := flag.String("conf", "", "Server configuration file path")
ProcessID := flag.String("pid", "development", "Server ProcessID?")
Expand All @@ -93,6 +88,7 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
flag.Parse() //解析输入的参数
app.processId = *ProcessID
ApplicationDir := ""
// 根据策略选择working Path工作目录
if *wdPath != "" {
_, err := os.Open(*wdPath)
if err != nil {
Expand All @@ -115,24 +111,39 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
defaultLogPath := fmt.Sprintf("%s/bin/logs", ApplicationDir)
defaultBIPath := fmt.Sprintf("%s/bin/bi", ApplicationDir)

// 根据策略选择配置路径
if *confPath == "" {
*confPath = defaultConfPath
// 如果未给出conf配置,则使用传入的conf(终端conf参数优先级高于传入conf,最低优先级才是默认confPath
if optConf == nil {
// 也未传入conf,则尝试使用默认的confPath来loadConf
conf.LoadConfig(defaultConfPath)
optConf = &conf.Conf
}
} else {
// flag参数有conf,则最优先使用此conf
conf.LoadConfig(*confPath)
optConf = &conf.Conf
}

// 根据策略选择日志目录
if *Logdir == "" {
*Logdir = defaultLogPath
}

// 根据策略选择二进制文件目录
if *BIdir == "" {
*BIdir = defaultBIPath
}

f, err := os.Open(*confPath)
if err != nil {
panic(err)
}
// 在conf.LoadConfig时,如果读取错误,会直接panic,所以无需再此多检查一次是否可以打开
// 验证conf文件是否可以打开
// f, err := os.Open(*confPath)
// if err != nil {
// panic(err)
// }

_, err = os.Open(*Logdir)
// 检查日志目录是否存在
_, err := os.Open(*Logdir)
if err != nil {
//文件不存在
err := os.Mkdir(*Logdir, os.ModePerm) //
Expand All @@ -141,6 +152,7 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
}
}

// 检查二进制目录是否存在
_, err = os.Open(*BIdir)
if err != nil {
//文件不存在
Expand All @@ -149,11 +161,37 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
fmt.Println(err)
}
}

// 加载配置
fmt.Println("Server configuration file path :", *confPath)
conf.LoadConfig(f.Name()) //加载配置文件
app.Configure(conf.Conf) //配置信息
log.InitLog(debug, *ProcessID, *Logdir, conf.Conf.Log)
log.InitBI(debug, *ProcessID, *BIdir, conf.Conf.BI)
app.Configure(*optConf) //配置信息
// 配置文件加载完毕
log.InitLog(debug, *ProcessID, *Logdir, optConf.Log)
log.InitBI(debug, *ProcessID, *BIdir, optConf.BI)
// app初始化完成
return app
}

// DefaultApp 默认app
type DefaultApp struct {
//module.App
version string
serverList map[string]module.ServerSession
settings conf.Config
processId string
routes map[string]func(app module.App, Type string, hash string) module.ServerSession
defaultRoutes func(app module.App, Type string, hash string) module.ServerSession
//将一个RPC调用路由到新的路由上
mapRoute func(app module.App, route string) string
rpcserializes map[string]module.RPCSerialize
configurationLoaded func(app module.App)
startup func(app module.App)
moduleInited func(app module.App, module module.Module)
protocolMarshal func(Trace string, Result interface{}, Error string) (module.ProtocolMarshal, string)
}

// Run 运行app
func (app *DefaultApp) Run(mods ...module.Module) error {

log.Info("mqant %v starting up", app.version)

Expand All @@ -169,7 +207,7 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
manager.Register(mods[i])
}
app.OnInit(app.settings)
manager.Init(app, *ProcessID)
manager.Init(app, app.processId)
if app.startup != nil {
app.startup(app)
}
Expand All @@ -194,16 +232,20 @@ func (app *DefaultApp) Run(debug bool, mods ...module.Module) error {
}
return nil
}

// Route 按模块设置路由方法
func (app *DefaultApp) Route(moduleType string, fn func(app module.App, Type string, hash string) module.ServerSession) error {
app.routes[moduleType] = fn
return nil
}

// SetMapRoute 设置Map路由
func (app *DefaultApp) SetMapRoute(fn func(app module.App, route string) string) error {
app.mapRoute = fn
return nil
}

// getRoute 按模块获取路由方法
func (app *DefaultApp) getRoute(moduleType string) func(app module.App, Type string, hash string) module.ServerSession {
fn := app.routes[moduleType]
if fn == nil {
Expand All @@ -213,6 +255,7 @@ func (app *DefaultApp) getRoute(moduleType string) func(app module.App, Type str
return fn
}

// AddRPCSerialize 添加rpc序列化器
func (app *DefaultApp) AddRPCSerialize(name string, Interface module.RPCSerialize) error {
if _, ok := app.rpcserializes[name]; ok {
return fmt.Errorf("The name(%s) has been occupied", name)
Expand All @@ -221,17 +264,18 @@ func (app *DefaultApp) AddRPCSerialize(name string, Interface module.RPCSerializ
return nil
}

// GetRPCSerialize 获取rpc序列化器
func (app *DefaultApp) GetRPCSerialize() map[string]module.RPCSerialize {
return app.rpcserializes
}

// Configure 设置配置
func (app *DefaultApp) Configure(settings conf.Config) error {
app.settings = settings
return nil
}

/**
*/
// OnInit app初始化方法
func (app *DefaultApp) OnInit(settings conf.Config) error {
app.serverList = make(map[string]module.ServerSession)
for Type, ModuleInfos := range settings.Module {
Expand Down Expand Up @@ -265,6 +309,7 @@ func (app *DefaultApp) OnInit(settings conf.Config) error {
return nil
}

// OnDestroy app析构函数
func (app *DefaultApp) OnDestroy() error {
for id, session := range app.serverList {
log.Info("RPCClient closeing type(%s) id(%s)", session.GetType(), id)
Expand All @@ -278,6 +323,7 @@ func (app *DefaultApp) OnDestroy() error {
return nil
}

// RegisterLocalClient 注册本地rpc client
func (app *DefaultApp) RegisterLocalClient(serverId string, server mqrpc.RPCServer) error {
if session, ok := app.serverList[serverId]; ok {
return session.GetRpc().NewLocalClient(server)
Expand All @@ -287,6 +333,7 @@ func (app *DefaultApp) RegisterLocalClient(serverId string, server mqrpc.RPCServ
return nil
}

// GetServerById 根据id获取服务
func (app *DefaultApp) GetServerById(serverId string) (module.ServerSession, error) {
if session, ok := app.serverList[serverId]; ok {
return session, nil
Expand All @@ -295,6 +342,7 @@ func (app *DefaultApp) GetServerById(serverId string) (module.ServerSession, err
}
}

// GetServersByType 根据类型获取服务列表
func (app *DefaultApp) GetServersByType(Type string) []module.ServerSession {
sessions := make([]module.ServerSession, 0)
for _, session := range app.serverList {
Expand All @@ -305,6 +353,7 @@ func (app *DefaultApp) GetServersByType(Type string) []module.ServerSession {
return sessions
}

// GetRouteServer
func (app *DefaultApp) GetRouteServer(filter string, hash string) (s module.ServerSession, err error) {
if app.mapRoute != nil {
//进行一次路由转换
Expand All @@ -326,12 +375,17 @@ func (app *DefaultApp) GetRouteServer(filter string, hash string) (s module.Serv
return
}

// GetSettings 获取设置
func (app *DefaultApp) GetSettings() conf.Config {
return app.settings
}

// GetProcessID 获取进程ID
func (app *DefaultApp) GetProcessID() string {
return app.processId
}

// RpcInvoke rpc调用
func (app *DefaultApp) RpcInvoke(module module.RPCModule, moduleType string, _func string, params ...interface{}) (result interface{}, err string) {
server, e := app.GetRouteServer(moduleType, module.GetServerId())
if e != nil {
Expand All @@ -341,6 +395,7 @@ func (app *DefaultApp) RpcInvoke(module module.RPCModule, moduleType string, _fu
return server.Call(_func, params...)
}

// RpcInvokeNR rpcNR调用
func (app *DefaultApp) RpcInvokeNR(module module.RPCModule, moduleType string, _func string, params ...interface{}) (err error) {
server, err := app.GetRouteServer(moduleType, module.GetServerId())
if err != nil {
Expand All @@ -349,6 +404,7 @@ func (app *DefaultApp) RpcInvokeNR(module module.RPCModule, moduleType string, _
return server.CallNR(_func, params...)
}

// RpcInvokeArgs rpc args调用
func (app *DefaultApp) RpcInvokeArgs(module module.RPCModule, moduleType string, _func string, ArgsType []string, args [][]byte) (result interface{}, err string) {
server, e := app.GetRouteServer(moduleType, module.GetServerId())
if e != nil {
Expand All @@ -358,6 +414,7 @@ func (app *DefaultApp) RpcInvokeArgs(module module.RPCModule, moduleType string,
return server.CallArgs(_func, ArgsType, args)
}

// RpcInvokeNRArgs rpc NR args调用
func (app *DefaultApp) RpcInvokeNRArgs(module module.RPCModule, moduleType string, _func string, ArgsType []string, args [][]byte) (err error) {
server, err := app.GetRouteServer(moduleType, module.GetServerId())
if err != nil {
Expand All @@ -366,30 +423,36 @@ func (app *DefaultApp) RpcInvokeNRArgs(module module.RPCModule, moduleType strin
return server.CallNRArgs(_func, ArgsType, args)
}

// GetModuleInited 获取inited模块
func (app *DefaultApp) GetModuleInited() func(app module.App, module module.Module) {
return app.moduleInited
}

// OnConfigurationLoaded
func (app *DefaultApp) OnConfigurationLoaded(_func func(app module.App)) error {
app.configurationLoaded = _func
return nil
}

// OnModuleInited
func (app *DefaultApp) OnModuleInited(_func func(app module.App, module module.Module)) error {
app.moduleInited = _func
return nil
}

// OnStartup
func (app *DefaultApp) OnStartup(_func func(app module.App)) error {
app.startup = _func
return nil
}

// SetProtocolMarshal
func (app *DefaultApp) SetProtocolMarshal(protocolMarshal func(Trace string, Result interface{}, Error string) (module.ProtocolMarshal, string)) error {
app.protocolMarshal = protocolMarshal
return nil
}

// ProtocolMarshal
func (app *DefaultApp) ProtocolMarshal(Trace string, Result interface{}, Error string) (module.ProtocolMarshal, string) {
if app.protocolMarshal != nil {
return app.protocolMarshal(Trace, Result, Error)
Expand All @@ -407,6 +470,7 @@ func (app *DefaultApp) ProtocolMarshal(Trace string, Result interface{}, Error s
}
}

// ProtocolMarshal
func (app *DefaultApp) NewProtocolMarshal(data []byte) module.ProtocolMarshal {
return &protocolMarshalImp{
data: data,
Expand Down
18 changes: 18 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module github.com/liangdas/mqant

go 1.12

require (
github.com/Jeffail/tunny v0.0.0-20181108205650-4921fff29480
github.com/belogik/goes v0.0.0-20151229125003-e54d722c3aff
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.1
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gorilla/websocket v1.4.0
github.com/kr/pretty v0.1.0 // indirect
github.com/pkg/errors v0.8.1
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
google.golang.org/appengine v1.5.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127
)
Loading