Skip to content

Commit

Permalink
Merge pull request #2805 from gravitl/NET-908
Browse files Browse the repository at this point in the history
NET-908: EMQX cloud Apis support
  • Loading branch information
abhishek9686 authored Feb 6, 2024
2 parents 48940ac + 82de96d commit e685e3c
Show file tree
Hide file tree
Showing 11 changed files with 785 additions and 423 deletions.
10 changes: 2 additions & 8 deletions auth/host_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,13 @@ func SessionHandler(conn *websocket.Conn) {
select {
case result := <-answer: // a read from req.answerCh has occurred
// add the host, if not exists, handle like enrollment registration
hostPass := result.Host.HostPass
if !logic.HostExists(&result.Host) { // check if host already exists, add if not
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
if err := mq.CreateEmqxUser(result.Host.ID.String(), result.Host.HostPass, false); err != nil {
if err := mq.GetEmqxHandler().CreateEmqxUser(result.Host.ID.String(), result.Host.HostPass); err != nil {
logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
return
}
if err := mq.CreateHostACL(result.Host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
if err := mq.GetEmqxHandler().CreateHostACL(result.Host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
return
}
Expand Down Expand Up @@ -203,11 +202,6 @@ func SessionHandler(conn *websocket.Conn) {
}
server := servercfg.GetServerInfo()
server.TrafficKey = key
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
// set MQ username and password for EMQX clients
server.MQUserName = result.Host.ID.String()
server.MQPassword = hostPass
}
result.Host.HostPass = ""
response := models.RegisterResponse{
ServerConf: server,
Expand Down
10 changes: 2 additions & 8 deletions controllers/enrollmentkeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,18 +305,17 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
)
return
}
hostPass := newHost.HostPass
if !hostExists {
newHost.PersistentKeepalive = models.DefaultPersistentKeepAlive
// register host
logic.CheckHostPorts(&newHost)
// create EMQX credentials and ACLs for host
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
if err := mq.CreateEmqxUser(newHost.ID.String(), newHost.HostPass, false); err != nil {
if err := mq.GetEmqxHandler().CreateEmqxUser(newHost.ID.String(), newHost.HostPass); err != nil {
logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
return
}
if err := mq.CreateHostACL(newHost.ID.String(), servercfg.GetServerInfo().Server); err != nil {
if err := mq.GetEmqxHandler().CreateHostACL(newHost.ID.String(), servercfg.GetServerInfo().Server); err != nil {
logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
return
}
Expand Down Expand Up @@ -361,11 +360,6 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
// ready the response
server := servercfg.GetServerInfo()
server.TrafficKey = key
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
// set MQ username and password for EMQX clients
server.MQUserName = newHost.ID.String()
server.MQPassword = hostPass
}
response := models.RegisterResponse{
ServerConf: server,
RequestedHost: newHost,
Expand Down
11 changes: 4 additions & 7 deletions controllers/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ func pull(w http.ResponseWriter, r *http.Request) {
return
}
serverConf := servercfg.GetServerInfo()
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
serverConf.MQUserName = hostID
}
key, keyErr := logic.RetrievePublicTrafficKey()
if keyErr != nil {
logger.Log(0, "error retrieving key:", keyErr.Error())
Expand Down Expand Up @@ -298,7 +295,7 @@ func deleteHost(w http.ResponseWriter, r *http.Request) {
}
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
// delete EMQX credentials for host
if err := mq.DeleteEmqxUser(currHost.ID.String()); err != nil {
if err := mq.GetEmqxHandler().DeleteEmqxUser(currHost.ID.String()); err != nil {
slog.Error("failed to remove host credentials from EMQX", "id", currHost.ID, "error", err)
}
}
Expand Down Expand Up @@ -555,15 +552,15 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) {

// Create EMQX creds and ACLs if not found
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
if err := mq.CreateEmqxUser(host.ID.String(), authRequest.Password, false); err != nil {
if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
slog.Error("failed to create host credentials for EMQX: ", err.Error())
} else {
if err := mq.CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
}
for _, nodeID := range host.Nodes {
if node, err := logic.GetNodeByID(nodeID); err == nil {
if err = mq.AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
slog.Error("failed to add ACLs for EMQX node", "error", err)
}
} else {
Expand Down
3 changes: 0 additions & 3 deletions controllers/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ func migrate(w http.ResponseWriter, r *http.Request) {
return
}
server = servercfg.GetServerInfo()
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
server.MQUserName = host.ID.String()
}
key, keyErr := logic.RetrievePublicTrafficKey()
if keyErr != nil {
slog.Error("retrieving traffickey", "error", err)
Expand Down
4 changes: 0 additions & 4 deletions controllers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,6 @@ func getNode(w http.ResponseWriter, r *http.Request) {
return
}
server := servercfg.GetServerInfo()
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
// set MQ username for EMQX clients
server.MQUserName = host.ID.String()
}
response := models.NodeGet{
Node: node,
Host: *host,
Expand Down
Loading

0 comments on commit e685e3c

Please sign in to comment.