Skip to content

Commit

Permalink
Feat/event processor (#16)
Browse files Browse the repository at this point in the history
* fix(kookin): 🚑 websocket write failed

* docs: 🔥 ignore
  • Loading branch information
Aimerny authored Jul 29, 2024
1 parent 6e64b6b commit 3cce531
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 126 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
conf.json
session_id
dist/
main
main
.idea/
12 changes: 12 additions & 0 deletions .idea/dataSources.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

132 changes: 18 additions & 114 deletions app/internal/client/helper.go
Original file line number Diff line number Diff line change
@@ -1,116 +1,20 @@
package client

//import (
//
// "bytes"
// "encoding/json"
// "fmt"
// "github.com/tencent-connect/botgo/log"
// "io"
// "net/http"
// "time"
//)
//
////const QQApiUrl = "https://api.sgroup.qq.com"
//
//type QQClient struct {
// http.Client
// meta *BotMeta
// accessToken string
//}
//
//type BotMeta struct {
// AppId string `json:"appId"`
// AppSecret string `json:"clientSecret"`
//}
//
//type KookClient struct {
// http.Client
// AccessToken string
//}
//
//func NewQQClient(appId string, accessToken string) *QQClient {
// client := &QQClient{
// http.Client{},
// &BotMeta{
// appId,
// accessToken,
// },
// "",
// }
// go client.flushToken()
//
// return client
//}
//
//func (c *QQClient) Get(url string) ([]byte, error) {
//
// request, err := http.NewRequest("GET", url, nil)
// if err != nil {
// log.Errorf("Http request construct failed! err: %e", err)
// return nil, err
// }
// request.Header.Set("Authorization", fmt.Sprintf("QQBot %s", c.accessToken))
// request.Header.Set("X-Union-Appid", c.meta.AppId)
// return c.doRequestAndReadBody(request)
//}
//
//func (c *QQClient) Post(url string, body any) ([]byte, error) {
// bodyBytes, _ := json.Marshal(body)
// buffer := bytes.NewBuffer(bodyBytes)
// request, err := http.NewRequest("POST", url, buffer)
// if err != nil {
// log.Errorf("Http request construct failed! err: %e", err)
// return nil, err
// }
// request.Header.Set("Authorization", fmt.Sprintf("QQBot %s", c.accessToken))
// request.Header.Set("X-Union-Appid", c.meta.AppId)
// return c.doRequestAndReadBody(request)
//}
//
//func (c *QQClient) doRequestAndReadBody(r *http.Request) ([]byte, error) {
// resp, err := c.Do(r)
// if err != nil {
// log.Errorf("http request failed! err:%e", err)
// return nil, err
// }
// defer resp.Body.Close()
// data, err := io.ReadAll(resp.Body)
// if err != nil {
// log.Errorf("read response body failed! err:%e", err)
// return nil, err
// }
// return data, nil
//}
//
//func (c *QQClient) flushToken() {
// // https://bots.qq.com/app/getAppAccessToken
// log.Info("start flush token")
// body, _ := json.Marshal(c.meta)
// buffer := bytes.NewBuffer(body)
// request, _ := http.NewRequest("POST", "https://bots.qq.com/app/getAppAccessToken", buffer)
// request.Header.Add("Content-Type", "application/json")
// for {
// // 1min 尝试刷新一次
// resp, err := c.Do(request)
// if err != nil {
// log.Errorf("[QQClient] get access token failed")
// continue
// }
// defer resp.Body.Close()
// authInfoBytes, _ := io.ReadAll(resp.Body)
// authInfo := make(map[string]string)
// err = json.Unmarshal(authInfoBytes, &authInfo)
// if err != nil {
// log.Errorf("[QQClient] unmarshal resp body failed. resp: [%s]", string(authInfoBytes))
// continue
// }
// if value := authInfo["access_token"]; value != "" {
// if c.accessToken != value {
// log.Info("[QQClient] accessToken has flushed, new token is:", value)
// c.accessToken = authInfo["access_token"]
// }
// }
// <-time.After(1 * time.Minute)
// }
//}
import (
"github.com/gorilla/websocket"
"sync"
)

type WsClient struct {
*websocket.Conn
Status bool
mutex sync.Mutex
}

func (client *WsClient) Lock() bool {
return client.mutex.TryLock()
}

func (client *WsClient) Unlock() {
client.mutex.Unlock()
}
10 changes: 6 additions & 4 deletions app/internal/server/ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"github/aimerny/elix/app/internal/client"
"github/aimerny/elix/app/internal/service"
"net/http"
)
Expand All @@ -14,8 +15,6 @@ var upgrader = websocket.Upgrader{
CheckOrigin: checkOrigin,
}

var Broadcast = make(chan []byte)

func StartWsProxyServer(port int) {

http.HandleFunc("/ws", EchoMessage)
Expand All @@ -32,13 +31,16 @@ func EchoMessage(w http.ResponseWriter, r *http.Request) {
logrus.Warning("upgrade failed!")
}
//add client to cache
service.Clients[conn] = true
service.Clients = append(service.Clients, &client.WsClient{
Conn: conn,
Status: true,
})
defer conn.Close()
for {
msgType, msg, err := conn.ReadMessage()
if err != nil {
logrus.Warning("ws read message error, remove the conn from clients:", err)
delete(service.Clients, conn)

break
}

Expand Down
14 changes: 8 additions & 6 deletions app/internal/service/kookin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"github.com/gorilla/websocket"
jsoniter "github.com/json-iterator/go"
log "github.com/sirupsen/logrus"
"github/aimerny/elix/app/internal/client"
"github/aimerny/elix/app/internal/dto"
"strings"
)

var Clients = make(map[*websocket.Conn]bool)
var Clients = make([]*client.WsClient, 0)

func FindChannels(searchKey string) *dto.GetChannelResp {
res := &dto.GetChannelResp{
Expand Down Expand Up @@ -58,16 +59,17 @@ func FindChannels(searchKey string) *dto.GetChannelResp {
func ForwardEventToAllClients(evt *model.Event) {
// send to ws clients
clients := Clients
for conn, status := range clients {
if conn == nil || !status {
log.WithField("client", conn).Warn("client status is not active or client is nil.has been removed")
delete(clients, conn)
for _, wsClient := range clients {
if !wsClient.Status {
log.WithField("wsClient", wsClient).Warn("wsClient status is not active or wsClient is nil.has been removed")
continue
}
wsClient.Lock()
bytes, _ := jsoniter.Marshal(evt)
err := conn.WriteMessage(websocket.TextMessage, bytes)
err := wsClient.WriteMessage(websocket.TextMessage, bytes)
if err != nil {
log.WithError(err).WithField("data", string(bytes)).Error("send ws text message failed")
}
wsClient.Unlock()
}
}
2 changes: 1 addition & 1 deletion app/internal/service/onge.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var chuniChartTypeList = []string{"Basic", "Advance", "Export", "Master", "Ultra

// InitOngeService Init music game about service
func InitOngeService(dbConf *common.DatasourceConf) {
if OngeServiceDS == nil || dbConf != nil {
if OngeServiceDS != nil || dbConf != nil {
OngeServiceDS = dbConf.DbConfig.ConnectDB()
}
log.Infof("init onge service finished")
Expand Down

0 comments on commit 3cce531

Please sign in to comment.