Skip to content

Commit

Permalink
Merge pull request #84 from juliangracin/amp/backend-only-headers
Browse files Browse the repository at this point in the history
AMP - add backend exclusive headers
  • Loading branch information
mprcela authored Mar 28, 2024
2 parents a016f87 + c8d3328 commit c6f1d30
Show file tree
Hide file tree
Showing 13 changed files with 419 additions and 62 deletions.
97 changes: 63 additions & 34 deletions amp/amp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,23 @@ import (
"bytes"
"compress/flate"
"encoding/json"
"github.com/minus5/svckit/pkg/compress"
"io"
"net/url"
"strings"
"sync"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/minus5/svckit/pkg/compress"

"github.com/minus5/svckit/log"
)

var (
jsonSerializer = jsoniter.Config{TagKey: "json"}.Froze()
backendSerializer = jsoniter.Config{TagKey: "backend"}.Froze()
)

// Message types
const (
Publish uint8 = iota // stream updated message, view update types below
Expand Down Expand Up @@ -64,7 +71,7 @@ const (

var (
compressionLenLimit = 8 * 1024 // do not compress messages smaller than
separtor = []byte{10}
separator = []byte{10}
)

// Subscriber is the interface for subscribing to the topics
Expand All @@ -86,17 +93,18 @@ type BodyMarshaler interface {

// Msg basic application message structure
type Msg struct {
Type uint8 `json:"t,omitempty"` // message type
ReplyTo string `json:"r,omitempty"` // topic to send replay to
CorrelationID uint64 `json:"i,omitempty"` // correlationID between request and response
Error *Error `json:"e,omitempty"` // error description in response message
URI string `json:"u,omitempty"` // has structure: topic/path
Ts int64 `json:"s,omitempty"` // timestamp unix milli
UpdateType uint8 `json:"p,omitempty"` // explains how to handle publish message
Replay uint8 `json:"l,omitempty"` // is this a re-play message (repeated)
Subscriptions map[string]int64 `json:"b,omitempty"` // topics to subscribe to
CacheDepth int `json:"d,omitempty"` // cache depth for append update type messages
Meta map[string]string `json:"m,omitempty"` // client session metadata
Type uint8 `json:"t,omitempty" backend:"t,omitempty"` // message type
ReplyTo string `json:"r,omitempty" backend:"r,omitempty"` // topic to send replay to
CorrelationID uint64 `json:"i,omitempty" backend:"i,omitempty"` // correlationID between request and response
Error *Error `json:"e,omitempty" backend:"e,omitempty"` // error description in response message
URI string `json:"u,omitempty" backend:"u,omitempty"` // has structure: topic/path
Ts int64 `json:"s,omitempty" backend:"s,omitempty"` // timestamp unix milli
UpdateType uint8 `json:"p,omitempty" backend:"p,omitempty"` // explains how to handle publish message
Replay uint8 `json:"l,omitempty" backend:"l,omitempty"` // is this a re-play message (repeated)
Subscriptions map[string]int64 `json:"b,omitempty" backend:"b,omitempty"` // topics to subscribe to
CacheDepth int `json:"d,omitempty" backend:"d,omitempty"` // cache depth for append update type messages
Meta map[string]string `json:"m,omitempty" backend:"m,omitempty"` // client session metadata
BackendHeaders map[string]string `json:"-" backend:"h,omitempty"` // exclusive for communication between backend services

body []byte
noCompression bool
Expand All @@ -115,17 +123,28 @@ type Error struct {
Code int `json:"c,omitempty"`
}

// Parse decodes Msg from []byte
// Parse decodes Msg received from client.
func Parse(buf []byte) *Msg {
return parse(buf, jsonSerializer)
}

// ParseFromBackend parses the message received from another backend service.
func ParseFromBackend(buf []byte) *Msg {
return parse(buf, backendSerializer)
}

func parse(buf []byte, serializer jsoniter.API) *Msg {
if buf == nil {
return nil
}
parts := bytes.SplitN(buf, separtor, 2)
parts := bytes.SplitN(buf, separator, 2)

m := &Msg{}
if err := json.Unmarshal(parts[0], m); err != nil {
if err := serializer.Unmarshal(parts[0], m); err != nil {
log.S("header", string(parts[0])).Error(err)
return nil
}

if len(parts) > 1 {
m.body = parts[1]
body, err := compress.GunzipIf(m.body)
Expand All @@ -136,6 +155,7 @@ func Parse(buf []byte) *Msg {
m.body = body
}
}

return m
}

Expand Down Expand Up @@ -167,19 +187,25 @@ func Undeflate(data []byte) []byte {
return out.Bytes()
}

// Marshal packs message for sending on the wire
// Marshal the message that will be sent to the client.
func (m *Msg) Marshal() []byte {
buf, _ := m.marshal(CompressionNone, CompatibilityVersionDefault)
buf, _ := m.marshal(CompressionNone, CompatibilityVersionDefault, jsonSerializer)
return buf
}

// MarshalForBackend is used when marshaling Msg for backend to backend communication.
func (m *Msg) MarshalForBackend() []byte {
buf, _ := m.marshal(CompressionNone, CompatibilityVersionDefault, backendSerializer)
return buf
}

// MarshalDeflate packs and compress message
// MarshalDeflate packs and compress message that will be sent to the client.
func (m *Msg) MarshalDeflate() ([]byte, bool) {
return m.marshal(CompressionDeflate, CompatibilityVersionDefault)
return m.marshal(CompressionDeflate, CompatibilityVersionDefault, jsonSerializer)
}

// marshal encodes message into []byte
func (m *Msg) marshal(supportedCompression, version uint8) ([]byte, bool) {
func (m *Msg) marshal(supportedCompression, version uint8, serializer jsoniter.API) ([]byte, bool) {
if version == CompatibilityVersion1 {
if m.UpdateType == BurstStart || m.UpdateType == BurstEnd {
// unsuported mesage types in this version
Expand All @@ -188,6 +214,7 @@ func (m *Msg) marshal(supportedCompression, version uint8) ([]byte, bool) {
}
m.Lock()
defer m.Unlock()

compression := supportedCompression
if m.noCompression {
compression = CompressionNone
Expand All @@ -198,7 +225,7 @@ func (m *Msg) marshal(supportedCompression, version uint8) ([]byte, bool) {
return payload, compression != CompressionNone
}

payload := m.payload(version)
payload := m.payload(version, serializer)
// decide wather we need compression
if len(payload) < compressionLenLimit {
m.noCompression = true
Expand All @@ -217,15 +244,15 @@ func (m *Msg) marshal(supportedCompression, version uint8) ([]byte, bool) {
return payload, compression != CompressionNone
}

func (m *Msg) payload(version uint8) []byte {
func (m *Msg) payload(version uint8, serializer jsoniter.API) []byte {
var header []byte
if version == CompatibilityVersion1 {
header = m.marshalV1header()
} else {
header, _ = json.Marshal(m)
header, _ = serializer.Marshal(m)
}
buf := bytes.NewBuffer(header)
buf.Write(separtor)
buf.Write(separator)
if m.body != nil {
buf.Write(m.body)
}
Expand Down Expand Up @@ -265,9 +292,10 @@ func (m *Msg) Unmarshal(v interface{}) error {
// Response creates response message from original request
func (m *Msg) Response(o interface{}) *Msg {
return &Msg{
Type: Response,
CorrelationID: m.CorrelationID,
src: toBodyMarshaler(o),
Type: Response,
CorrelationID: m.CorrelationID,
src: toBodyMarshaler(o),
BackendHeaders: m.BackendHeaders,
}
}

Expand Down Expand Up @@ -317,12 +345,13 @@ func (m *Msg) ResponseError(err error) *Msg {
// Request creates request type message from original message
func (m *Msg) Request() *Msg {
return &Msg{
Type: Request,
CorrelationID: m.CorrelationID,
URI: m.URI,
Meta: m.Meta,
src: m.src,
body: m.body,
Type: Request,
CorrelationID: m.CorrelationID,
URI: m.URI,
Meta: m.Meta,
BackendHeaders: m.BackendHeaders,
src: m.src,
body: m.body,
}
}

Expand Down
Loading

0 comments on commit c6f1d30

Please sign in to comment.