Skip to content

Commit

Permalink
支持每条端口转发的同时,再分发给多个端口,满足某些测试场景
Browse files Browse the repository at this point in the history
  • Loading branch information
tavenli committed Jun 2, 2017
1 parent 6bf3e86 commit ccc8972
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 250 deletions.
2 changes: 2 additions & 0 deletions conf/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ viewspath = "views"
httpaddr = "0.0.0.0"
httpport = 8000
recoverpanic = false
app.store = "mysql"
mysql.url = "root:grant@tcp(127.0.0.1:3306)/PortForwardDb?charset=utf8&loc=Asia%2FShanghai"
[prod]
httpaddr = "0.0.0.0"
httpport = 8000
Expand Down
2 changes: 0 additions & 2 deletions conf/data.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@

mysql.url = "root:grant@tcp(127.0.0.1:3306)/PortForwardDb?charset=utf8&loc=Asia%2FShanghai"

api.auth = "26CCD056107481F45D1AC805A24A9E59"


2 changes: 1 addition & 1 deletion controllers/RestApiCtrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *RestApiCtrl) OpenForward() {
//测试
//http://127.0.0.1:8000/api/v1/OpenForward?auth=26CCD056107481F45D1AC805A24A9E59&fromAddr=:8010&toAddr=127.0.0.1:3306
resultChan := make(chan models.ResultData)
go services.ForwardS.StartPortForward(fromAddr, toAddr, resultChan)
go services.ForwardS.StartPortForward(entity, resultChan)

c.Data["json"] = <-resultChan

Expand Down
12 changes: 8 additions & 4 deletions controllers/TcpForwardCtrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (c *ForwardCtrl) SaveForward() {
//protocol := c.GetString("protocol", "TCP")
targetAddr := c.GetString("targetAddr", "")
targetPort, _ := c.GetInt("targetPort")
others := c.GetString("others", "")

if utils.IsEmpty(name) {
//
Expand Down Expand Up @@ -142,6 +143,11 @@ func (c *ForwardCtrl) SaveForward() {
return
}

// if utils.IsNotEmpty(others) {
// //如果有others信息,则检查

// }

if id > 0 {
entity := services.SysDataS.GetPortForwardById(id)
key := services.ForwardS.GetKeyByEntity(entity)
Expand All @@ -164,6 +170,7 @@ func (c *ForwardCtrl) SaveForward() {
entity.Protocol = "TCP"
entity.TargetAddr = targetAddr
entity.TargetPort = targetPort
entity.Others = others

err := services.SysDataS.SavePortForward(entity)
if err == nil {
Expand All @@ -181,11 +188,8 @@ func (c *ForwardCtrl) OpenForward() {
id, _ := c.GetInt("id")
entity := services.SysDataS.GetPortForwardById(id)

fromAddr := fmt.Sprint(entity.Addr, ":", entity.Port)
toAddr := fmt.Sprint(entity.TargetAddr, ":", entity.TargetPort)

resultChan := make(chan models.ResultData)
go services.ForwardS.StartPortForward(fromAddr, toAddr, resultChan)
go services.ForwardS.StartPortForward(entity, resultChan)

c.Data["json"] = <-resultChan

Expand Down
Binary file modified data/data.db
Binary file not shown.
2 changes: 2 additions & 0 deletions models/PortForward.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type PortForward struct {
// 端口号
TargetPort int `orm:"column(targetPort);null"`
CreateTime time.Time `orm:"column(createTime);type(datetime)"`
//暂时用来存放端口分发配置,后续版本再调整
Others string `orm:"column(others);size(500);null"`
}

func (t *PortForward) TableName() string {
Expand Down
60 changes: 52 additions & 8 deletions services/ForwardService.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"net"
"port-forward/models"
"port-forward/utils"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -91,7 +92,11 @@ func (_self *ForwardService) GetKey(sourcePort, targetPort string) string {
//
// sourcePort 源地址和端口,例如:0.0.0.0:8700,本程序会新建立监听
// targetPort 数据转发给哪个端口,例如:192.168.1.100:3306
func (_self *ForwardService) StartPortForward(sourcePort string, targetPort string, result chan models.ResultData) {
func (_self *ForwardService) StartPortForward(portForward *models.PortForward, result chan models.ResultData) {

sourcePort := fmt.Sprint(portForward.Addr, ":", portForward.Port)
targetPort := fmt.Sprint(portForward.TargetAddr, ":", portForward.TargetPort)

resultData := &models.ResultData{Code: 0, Msg: ""}
logs.Debug("StartTcpPortForward sourcePort: ", sourcePort, " targetPort:", targetPort)

Expand Down Expand Up @@ -135,18 +140,43 @@ func (_self *ForwardService) StartPortForward(sourcePort string, targetPort stri
//targetPort := "172.16.128.83:22"
targetConn, err := net.DialTimeout("tcp", targetPort, 30*time.Second)

go func() {
_, err = _self.Copy(targetConn, sourceConn)
if err != nil {
logs.Error("1网络连接异常:", err)
_self.UnRegistryClient(fmt.Sprint(sourcePort, "_", sourceConn.RemoteAddr().String()))
if utils.IsNotEmpty(portForward.Others) {
var dispatchConns []io.Writer
dispatchConns = append(dispatchConns, targetConn)
//分发方式
dispatchTargets := utils.Split(portForward.Others, ";")

for _, dispatchTarget := range dispatchTargets {
logs.Debug("分发到:", dispatchTarget)
dispatchTargetConn, err := net.DialTimeout("tcp", dispatchTarget, 30*time.Second)
if err == nil {
dispatchConns = append(dispatchConns, dispatchTargetConn)
}

}
}()

go func() {
mWriter := io.MultiWriter(dispatchConns...)
_, err = _self.Copy(mWriter, sourceConn)
if err != nil {
logs.Error("Dispatch网络连接异常:", err)
}
}()

} else {
go func() {
_, err = _self.Copy(targetConn, sourceConn)
if err != nil {
logs.Error("客户端来源数据转发到目标端口异常:", err)
_self.UnRegistryClient(fmt.Sprint(sourcePort, "_", sourceConn.RemoteAddr().String()))
}
}()
}

go func() {
_, err = _self.Copy(sourceConn, targetConn)
if err != nil {
logs.Error("2网络连接异常:", err)
logs.Error("目标端口返回响应数据异常:", err)
_self.UnRegistryPort(sourcePort)
}
}()
Expand All @@ -157,6 +187,20 @@ func (_self *ForwardService) StartPortForward(sourcePort string, targetPort stri

}

func (_self *ForwardService) DataDispatch(src io.Reader, targetPorts []string) {
for _, target := range targetPorts {
logs.Debug("分发到:", target)
go func() {
targetConn, err := net.DialTimeout("tcp", target, 30*time.Second)
_, err = _self.Copy(targetConn, src)
if err != nil {
logs.Error("Dispatch网络连接异常:", err)
}
}()
}

}

func (_self *ForwardService) ClosePortForward(sourcePort string, targetPort string, result chan models.ResultData) {
resultData := &models.ResultData{Code: 0, Msg: ""}

Expand Down
24 changes: 15 additions & 9 deletions services/InitServices.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package services

import (
"github.com/astaxie/beego"
"github.com/astaxie/beego/orm"
_ "github.com/go-sql-driver/mysql"
_ "github.com/mattn/go-sqlite3"
)

Expand All @@ -12,16 +14,20 @@ var (
)

func init() {
//数据库连接
//_ "github.com/mattn/go-sqlite3"
orm.RegisterDriver("sqlite3", orm.DRSqlite)
orm.RegisterDataBase("default", "sqlite3", "file:data/data.db?cache=shared&loc=auto")
//orm.RegisterDataBase("default", "sqlite3", "file::memory:?mode=memory&cache=shared&loc=auto")
appStore := beego.AppConfig.DefaultString("app.store", "sqlite3")

//_ "github.com/go-sql-driver/mysql"
//dataSource := beego.AppConfig.String("mysql.url")
//orm.RegisterDriver("mysql", orm.DRMySQL)
//orm.RegisterDataBase("default", "mysql", dataSource)
//数据库连接
if appStore == "mysql" {
//_ "github.com/go-sql-driver/mysql"
dataSource := beego.AppConfig.String("mysql.url")
orm.RegisterDriver("mysql", orm.DRMySQL)
orm.RegisterDataBase("default", "mysql", dataSource)
} else {
//_ "github.com/mattn/go-sqlite3"
orm.RegisterDriver("sqlite3", orm.DRSqlite)
orm.RegisterDataBase("default", "sqlite3", "file:data/data.db?cache=shared&loc=auto")
//orm.RegisterDataBase("default", "sqlite3", "file::memory:?mode=memory&cache=shared&loc=auto")
}

//开启DEBUG模式,输出SQL信息
orm.Debug = true
Expand Down
5 changes: 3 additions & 2 deletions services/SysDataService.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,15 @@ func (_self *SysDataService) SavePortForward(entity *models.PortForward) error {
update.Protocol = entity.Protocol
update.TargetAddr = entity.TargetAddr
update.TargetPort = entity.TargetPort
update.Others = entity.Others

_, err1 := OrmerS.Update(update)
return err1
} else {
entity.CreateTime = time.Now()
//_, err := OrmerS.Insert(entity)
res, err := OrmerS.Raw("INSERT INTO t_port_forward(name, status, addr, port, protocol, targetAddr, targetPort, createTime) values(?,?,?,?,?,?,?,?)",
entity.Name, entity.Status, entity.Addr, entity.Port, entity.Protocol, entity.TargetAddr, entity.TargetPort, entity.CreateTime).Exec()
res, err := OrmerS.Raw("INSERT INTO t_port_forward(name, status, addr, port, protocol, targetAddr, targetPort, createTime, others) values(?,?,?,?,?,?,?,?,?)",
entity.Name, entity.Status, entity.Addr, entity.Port, entity.Protocol, entity.TargetAddr, entity.TargetPort, entity.CreateTime, entity.Others).Exec()
if err == nil {
num, _ := res.RowsAffected()
logs.Debug("AddPortForward", num)
Expand Down
15 changes: 14 additions & 1 deletion views/ucenter/forwardForm.html
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,20 @@
<input type="text" name="targetPort" value="{{if gt .entity.TargetPort 0}}{{.entity.TargetPort}}{{end}}" lay-verify="required|number" placeholder="目标端口号,例如3306" autocomplete="off" class="layui-input">
</div>
</div>

<div class="layui-form-item">
<label class="layui-form-label">多分发</label>
<div class="layui-input-block">
<input type="text" name="others" value="{{.entity.Others}}" placeholder="如 192.1.1.100:8081;192.1.1.100:8082" title="多个用 ; 号隔开" autocomplete="off" class="layui-input">
</div>
</div>
<div class="layui-form-item">
<pre class="layui-code" style="text-align: left;">
*多分发说明:
如果不需要“多分发”,请留空;
如果需要分发到多个端口,填写格式为 “IP:端口”,多个用 ; 号隔开
如 192.1.1.100:8081;192.1.1.100:8082
</pre>
</div>
</form>

</div>
Expand Down
Loading

0 comments on commit ccc8972

Please sign in to comment.