Skip to content

Commit

Permalink
NOISSUE - Switch from mProxy to mGate (#2500)
Browse files Browse the repository at this point in the history
Signed-off-by: Dusan Borovcanin <[email protected]>
  • Loading branch information
dborovcanin authored Nov 4, 2024
1 parent e271d0f commit ef8631b
Show file tree
Hide file tree
Showing 15 changed files with 54 additions and 61 deletions.
10 changes: 5 additions & 5 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/mproxy"
mproxyhttp "github.com/absmach/mproxy/pkg/http"
"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mgate"
mgatehttp "github.com/absmach/mgate/pkg/http"
"github.com/absmach/mgate/pkg/session"
"github.com/caarlos0/env/v11"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -163,7 +163,7 @@ func newService(pub messaging.Publisher, tc magistrala.ThingsServiceClient, logg
}

func proxyHTTP(ctx context.Context, cfg server.Config, logger *slog.Logger, sessionHandler session.Handler) error {
config := mproxy.Config{
config := mgate.Config{
Address: fmt.Sprintf("%s:%s", "", cfg.Port),
Target: fmt.Sprintf("%s:%s", targetHTTPHost, targetHTTPPort),
PathPrefix: "/",
Expand All @@ -177,7 +177,7 @@ func proxyHTTP(ctx context.Context, cfg server.Config, logger *slog.Logger, sess
Certificates: []tls.Certificate{tlsCert},
}
}
mp, err := mproxyhttp.NewProxy(config, sessionHandler, logger)
mp, err := mgatehttp.NewProxy(config, sessionHandler, logger)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import (
mqttpub "github.com/absmach/magistrala/pkg/messaging/mqtt"
"github.com/absmach/magistrala/pkg/server"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/mproxy"
mproxymqtt "github.com/absmach/mproxy/pkg/mqtt"
"github.com/absmach/mproxy/pkg/mqtt/websocket"
"github.com/absmach/mproxy/pkg/session"
mgate "github.com/absmach/mgate"
mgatemqtt "github.com/absmach/mgate/pkg/mqtt"
"github.com/absmach/mgate/pkg/mqtt/websocket"
"github.com/absmach/mgate/pkg/session"
"github.com/caarlos0/env/v11"
"github.com/cenkalti/backoff/v4"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -211,11 +211,11 @@ func main() {
}

func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
config := mproxy.Config{
config := mgate.Config{
Address: fmt.Sprintf(":%s", cfg.MQTTPort),
Target: fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort),
}
mproxy := mproxymqtt.New(config, sessionHandler, interceptor, logger)
mproxy := mgatemqtt.New(config, sessionHandler, interceptor, logger)

errCh := make(chan error)
go func() {
Expand All @@ -232,7 +232,7 @@ func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHand
}

func proxyWS(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
config := mproxy.Config{
config := mgate.Config{
Address: fmt.Sprintf("%s:%s", "", cfg.HTTPPort),
Target: fmt.Sprintf("ws://%s:%s%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort, wsPathPrefix),
PathPrefix: wsPathPrefix,
Expand Down
4 changes: 2 additions & 2 deletions cmd/ws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/absmach/magistrala/ws"
"github.com/absmach/magistrala/ws/api"
"github.com/absmach/magistrala/ws/tracing"
"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mproxy/pkg/websockets"
"github.com/absmach/mgate/pkg/session"
"github.com/absmach/mgate/pkg/websockets"
"github.com/caarlos0/env/v11"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.14.0
github.com/absmach/certs v0.0.0-20241014135535-3f118b801054
github.com/absmach/mproxy v0.4.3-0.20240712131952-28f88581126a
github.com/absmach/mgate v0.4.5
github.com/absmach/senml v1.0.5
github.com/authzed/authzed-go v1.1.0
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ github.com/absmach/callhome v0.14.0 h1:zB4tIZJ1YUmZ1VGHFPfMA/Lo6/Mv19y2dvoOiXj2B
github.com/absmach/callhome v0.14.0/go.mod h1:l12UJOfibK4Muvg/AbupHuquNV9qSz/ROdTEPg7f2Vk=
github.com/absmach/certs v0.0.0-20241014135535-3f118b801054 h1:NsIwp+ueKxDx8XftruA4hz8WUgyWq7eBE344nJt0LJg=
github.com/absmach/certs v0.0.0-20241014135535-3f118b801054/go.mod h1:bEAb/HjPztlrMmz8dLeJTke4Tzu9yW3+hY5eldEUtSY=
github.com/absmach/mproxy v0.4.3-0.20240712131952-28f88581126a h1:3JtJSekVHb02U3NmIJa5f3a1I15aczKrBBCczGGCbxM=
github.com/absmach/mproxy v0.4.3-0.20240712131952-28f88581126a/go.mod h1:Nevip6o8u5Zx7l3LTtN8BwlCI5h5KpsnI9YnAxF5RT8=
github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
github.com/absmach/senml v1.0.5 h1:zNPRYpGr2Wsb8brAusz8DIfFqemy1a2dNbmMnegY3GE=
github.com/absmach/senml v1.0.5/go.mod h1:NDEjk3O4V4YYu9Bs2/+t/AZ/F+0wu05ikgecp+/FsSU=
github.com/authzed/authzed-go v1.1.0 h1:aFy5mIwe9HzaRss0KmDXBhwAAN2LWIEoRNcPXTaLv8Y=
Expand Down
14 changes: 7 additions & 7 deletions http/api/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"github.com/absmach/magistrala/pkg/apiutil"
pubsub "github.com/absmach/magistrala/pkg/messaging/mocks"
thmocks "github.com/absmach/magistrala/things/mocks"
"github.com/absmach/mproxy"
mproxyhttp "github.com/absmach/mproxy/pkg/http"
"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mgate"
proxy "github.com/absmach/mgate/pkg/http"
"github.com/absmach/mgate/pkg/session"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand All @@ -41,11 +41,11 @@ func newTargetHTTPServer() *httptest.Server {
}

func newProxyHTPPServer(svc session.Handler, targetServer *httptest.Server) (*httptest.Server, error) {
config := mproxy.Config{
config := mgate.Config{
Address: "",
Target: targetServer.URL,
}
mp, err := mproxyhttp.NewProxy(config, svc, mglog.NewMock())
mp, err := proxy.NewProxy(config, svc, mglog.NewMock())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -151,15 +151,15 @@ func TestPublish(t *testing.T) {
msg: msg,
contentType: ctSenmlJSON,
key: invalidKey,
status: http.StatusBadRequest,
status: http.StatusUnauthorized,
},
"publish message with invalid basic auth": {
chanID: chanID,
msg: msg,
contentType: ctSenmlJSON,
key: invalidKey,
basicAuth: true,
status: http.StatusBadRequest,
status: http.StatusUnauthorized,
},
"publish message without content type": {
chanID: chanID,
Expand Down
43 changes: 18 additions & 25 deletions http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"log/slog"
"net/http"
"net/url"
"regexp"
"strings"
Expand All @@ -18,7 +19,8 @@ import (
svcerr "github.com/absmach/magistrala/pkg/errors/service"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/absmach/magistrala/pkg/policies"
"github.com/absmach/mproxy/pkg/session"
mgate "github.com/absmach/mgate/pkg/http"
"github.com/absmach/mgate/pkg/session"
)

var _ session.Handler = (*handler)(nil)
Expand All @@ -33,13 +35,13 @@ const (

// Error wrappers for MQTT errors.
var (
errMalformedSubtopic = errors.New("malformed subtopic")
errClientNotInitialized = errors.New("client is not initialized")
errMalformedTopic = errors.New("malformed topic")
errMissingTopicPub = errors.New("failed to publish due to missing topic")
errFailedPublish = errors.New("failed to publish")
errFailedParseSubtopic = errors.New("failed to parse subtopic")
errFailedPublishToMsgBroker = errors.New("failed to publish to magistrala message broker")
errMalformedSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed subtopic"))
errMalformedTopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed topic"))
errMissingTopicPub = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to publish due to missing topic"))
errFailedParseSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to parse subtopic"))
)

var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
Expand Down Expand Up @@ -71,9 +73,9 @@ func (h *handler) AuthConnect(ctx context.Context) error {
var tok string
switch {
case string(s.Password) == "":
return errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerKey)
case strings.HasPrefix(string(s.Password), "Thing"):
tok = extractThingKey(string(s.Password))
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerKey))
case strings.HasPrefix(string(s.Password), apiutil.ThingPrefix):
tok = strings.TrimPrefix(string(s.Password), apiutil.ThingPrefix)
default:
tok = string(s.Password)
}
Expand Down Expand Up @@ -113,15 +115,15 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e

channelParts := channelRegExp.FindStringSubmatch(*topic)
if len(channelParts) < 2 {
return errors.Wrap(errFailedPublish, errMalformedTopic)
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedPublish, errMalformedTopic))
}

chanID := channelParts[1]
subtopic := channelParts[2]

subtopic, err := parseSubtopic(subtopic)
if err != nil {
return errors.Wrap(errFailedParseSubtopic, err)
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedParseSubtopic, err))
}

msg := messaging.Message{
Expand All @@ -135,8 +137,8 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
switch {
case string(s.Password) == "":
return errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerKey)
case strings.HasPrefix(string(s.Password), "Thing"):
tok = extractThingKey(string(s.Password))
case strings.HasPrefix(string(s.Password), apiutil.ThingPrefix):
tok = strings.TrimPrefix(string(s.Password), apiutil.ThingPrefix)
default:
tok = string(s.Password)
}
Expand All @@ -147,10 +149,10 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
}
res, err := h.things.Authorize(ctx, ar)
if err != nil {
return err
return mgate.NewHTTPProxyError(http.StatusBadRequest, err)
}
if !res.GetAuthorized() {
return svcerr.ErrAuthorization
return mgate.NewHTTPProxyError(http.StatusUnauthorized, svcerr.ErrAuthorization)
}
msg.Publisher = res.GetId()

Expand Down Expand Up @@ -183,7 +185,7 @@ func parseSubtopic(subtopic string) (string, error) {

subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", errMalformedSubtopic
return "", mgate.NewHTTPProxyError(http.StatusBadRequest, errMalformedSubtopic)
}
subtopic = strings.ReplaceAll(subtopic, "/", ".")

Expand All @@ -195,7 +197,7 @@ func parseSubtopic(subtopic string) (string, error) {
}

if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", errMalformedSubtopic
return "", mgate.NewHTTPProxyError(http.StatusBadRequest, errMalformedSubtopic)
}

filteredElems = append(filteredElems, elem)
Expand All @@ -204,12 +206,3 @@ func parseSubtopic(subtopic string) (string, error) {
subtopic = strings.Join(filteredElems, ".")
return subtopic, nil
}

// extractThingKey returns value of the thing key. If there is no thing key - an empty value is returned.
func extractThingKey(topic string) string {
if !strings.HasPrefix(topic, apiutil.ThingPrefix) {
return ""
}

return strings.TrimPrefix(topic, apiutil.ThingPrefix)
}
2 changes: 1 addition & 1 deletion mqtt/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
svcerr "github.com/absmach/magistrala/pkg/errors/service"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/absmach/magistrala/pkg/policies"
"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mgate/pkg/session"
)

var _ session.Handler = (*handler)(nil)
Expand Down
2 changes: 1 addition & 1 deletion mqtt/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/absmach/magistrala/pkg/errors"
svcerr "github.com/absmach/magistrala/pkg/errors/service"
thmocks "github.com/absmach/magistrala/things/mocks"
"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mgate/pkg/session"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/messaging/handler/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"log/slog"
"time"

"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mgate/pkg/session"
)

var _ session.Handler = (*loggingMiddleware)(nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/messaging/handler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"context"
"time"

"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mgate/pkg/session"
"github.com/go-kit/kit/metrics"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/messaging/handler/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package handler
import (
"context"

"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mgate/pkg/session"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
Expand Down
8 changes: 4 additions & 4 deletions pkg/sdk/go/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
readersapi "github.com/absmach/magistrala/readers/api"
readersmocks "github.com/absmach/magistrala/readers/mocks"
thmocks "github.com/absmach/magistrala/things/mocks"
"github.com/absmach/mproxy"
mproxyhttp "github.com/absmach/mproxy/pkg/http"
"github.com/absmach/mgate"
proxy "github.com/absmach/mgate/pkg/http"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand All @@ -38,11 +38,11 @@ func setupMessages() (*httptest.Server, *thmocks.ThingsServiceClient, *pubsub.Pu
mux := api.MakeHandler(mglog.NewMock(), "")
target := httptest.NewServer(mux)

config := mproxy.Config{
config := mgate.Config{
Address: "",
Target: target.URL,
}
mp, err := mproxyhttp.NewProxy(config, handler, mglog.NewMock())
mp, err := proxy.NewProxy(config, handler, mglog.NewMock())
if err != nil {
return nil, nil, nil
}
Expand Down
4 changes: 2 additions & 2 deletions ws/api/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
thmocks "github.com/absmach/magistrala/things/mocks"
"github.com/absmach/magistrala/ws"
"github.com/absmach/magistrala/ws/api"
"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mproxy/pkg/websockets"
"github.com/absmach/mgate/pkg/session"
"github.com/absmach/mgate/pkg/websockets"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down
2 changes: 1 addition & 1 deletion ws/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
svcerr "github.com/absmach/magistrala/pkg/errors/service"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/absmach/magistrala/pkg/policies"
"github.com/absmach/mproxy/pkg/session"
"github.com/absmach/mgate/pkg/session"
)

var _ session.Handler = (*handler)(nil)
Expand Down

0 comments on commit ef8631b

Please sign in to comment.