Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Room pool + custom E2EE + load test report #4

Open
wants to merge 5 commits into
base: c4t
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type SupportedRequestTypesFlag []string
type AppConfig struct {
DeveloperMode bool `mapstructure:"developer_mode"`
SupportedRequestTypes SupportedRequestTypesFlag `mapstructure:"supported_request_types"`
BotMode uint `mapstructure:"bot_mode"` // 0 both, 1 request, 2 response
PrivateRSAFileKey string `mapstructure:"private_rsa_file_key"`
}
type MatrixConfig struct {
Key string `mapstructure:"matrix_key"` // TODO @evlekht I'd suggest to add some parsed config, so we'll see on config read if some fields are invalid
Expand Down
2 changes: 2 additions & 0 deletions config/flag_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ const (
PartnerPluginCAFileKey = "partner_plugin_ca_file"
MessengerTimeoutKey = "messenger_timeout"
SupportedRequestTypesKey = "supported_request_types"
BotModeKey = "bot_mode"
PrivateRSAKey = "private_rsa_key"
)
2 changes: 2 additions & 0 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import "flag"
func readAppConfig(cfg AppConfig, fs *flag.FlagSet) {
fs.BoolVar(&cfg.DeveloperMode, DeveloperMode, false, "Sets developer mode")
fs.Var(&cfg.SupportedRequestTypes, SupportedRequestTypesKey, "The list of supported request types")
fs.UintVar(&cfg.BotMode, BotModeKey, 0, "The bot mode")
fs.StringVar(&cfg.PrivateRSAFileKey, PrivateRSAKey, "", "The private RSA key file")
flag.Parse()

}
Expand Down
139 changes: 121 additions & 18 deletions examples/rpc/client.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package main
import (
typesv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha1"
"context"
"encoding/csv"
"flag"
"fmt"
"log"
"os"
"sort"
"strconv"
"sync"
"time"

"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc"
Expand All @@ -20,27 +24,52 @@ import (
)

func main() {
var mu sync.Mutex
var wg sync.WaitGroup
var logger *zap.Logger
cfg := zap.NewDevelopmentConfig()
cfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
logger, _ = cfg.Build()
sLogger := logger.Sugar()
logger.Sync()

argsWithoutProg := os.Args[1:]
unencrypted := len(argsWithoutProg) == 0
times := flag.Int("requests", 1, "Repeat the request n times")
host := flag.String("host", "127.0.0.1", "Distributor bot host")
port := flag.Int("port", 9092, "Distributor bot port")
recipient := flag.String("recipient", "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network", "Recipient address (format: @t-kopernikus[...]:matrix.camino.network")
caCertFile := flag.String("ca-cert-file", "", "CA certificate file (optional)")
flag.Parse()

ppConfig := config.PartnerPluginConfig{
Host: "localhost",
Port: 9092,
Unencrypted: unencrypted,
Host: *host,
Port: *port,
Unencrypted: *caCertFile == "",
}
ppConfig.CACertFile = *caCertFile

loadTestData := make([][]string, *times)
for i := 0; i < *times; i++ {
loadTestData[i] = make([]string, 6)
wg.Add(1)
go func(counter int) {
defer wg.Done()
createClientAndRunRequest(counter, ppConfig, sLogger, *recipient, loadTestData, mu)
}(i)
}
if !unencrypted {
ppConfig.CACertFile = argsWithoutProg[0]

wg.Wait()

if len(loadTestData) > 1 || len(loadTestData) == 1 && loadTestData[0][0] != "" { // otherwise no data have been recorded
persistToCSV(loadTestData)
}
}

func createClientAndRunRequest(i int, ppConfig config.PartnerPluginConfig, sLogger *zap.SugaredLogger, recipient string, loadTestData [][]string, mu sync.Mutex) {
c := client.NewClient(&ppConfig, sLogger)
err := c.Start()
if err != nil {
panic(err)
fmt.Errorf("error starting client: %v", err)
return
}
request := &accommodationv1alpha1.AccommodationSearchRequest{
Header: nil,
Expand All @@ -59,28 +88,102 @@ func main() {
},
}

err = c.Start()
if err != nil {
panic(err)
}
md := metadata.New(map[string]string{
"recipient": "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network",
"recipient": recipient,
})
ctx := metadata.NewOutgoingContext(context.Background(), md)

ass := accommodationv1alpha1grpc.NewAccommodationSearchServiceClient(c.ClientConn)
begin := time.Now()
var header metadata.MD
begin := time.Now()
resp, err := ass.AccommodationSearch(ctx, request, grpc.Header(&header))
if err != nil {
log.Fatal(err)
sLogger.Errorf("error when performing search: %v", err)
return
}
totalTime := time.Since(begin)
fmt.Println(totalTime.Milliseconds())
//fmt.Printf("Total time(ms)|%s|%d\n", resp.Metadata.SearchId.GetValue(), totalTime.Milliseconds())
metadata := &internalmetadata.Metadata{}
err = metadata.FromGrpcMD(header)
if err != nil {
fmt.Print("error extracting metadata")
sLogger.Errorf("error extracting metadata: %v", err)
}
fmt.Printf("Received response after %s => ID: %s\n", time.Since(begin), resp.Metadata.SearchId)

addToDataset(int64(i), totalTime.Milliseconds(), resp, metadata, loadTestData, mu)

c.Shutdown()
}

func addToDataset(counter int64, totalTime int64, resp *accommodationv1alpha1.AccommodationSearchResponse, metadata *internalmetadata.Metadata, loadTestData [][]string, mu sync.Mutex) {
var data []string
var entries []struct {
Key string
Value int64
}
// Populate the slice with map entries
for key, value := range metadata.Timestamps {
entries = append(entries, struct {
Key string
Value int64
}{Key: key, Value: value})
}

// Sort the slice based on values
sort.Slice(entries, func(i, j int) bool {
return entries[i].Value < entries[j].Value
})
lastValue := int64(0)
data = append(data, strconv.FormatInt(counter+1, 10))
data = append(data, strconv.FormatInt(totalTime, 10))
for _, entry := range entries {

if entry.Key == "request-gateway-request" {
lastValue = entry.Value
continue //skip
}
if entry.Key == "processor-request" {

//lastValue = entry.Value
continue //skip
}
fmt.Printf("%d|%s|%s|%d|%.2f\n", entry.Value, entry.Key, resp.Metadata.SearchId.GetValue(), entry.Value-lastValue, float32(entry.Value-lastValue)/float32(totalTime))

data = append(data, strconv.FormatInt(entry.Value-lastValue, 10))
lastValue = entry.Value
}

mu.Lock()
loadTestData[counter] = data
mu.Unlock()
}
func persistToCSV(dataset [][]string) {
// Open a new CSV file
file, err := os.Create("load_test_data.csv")
if err != nil {
fmt.Println("Error creating CSV file:", err)
return
}
defer file.Close()

// Create a CSV writer
writer := csv.NewWriter(file)
defer writer.Flush()

// Write the header row
header := []string{"Request ID", "Total Time", "distributor -> matrix", "matrix -> provider", "provider -> matrix", "matrix -> distributor", "process-response"}
if err := writer.Write(header); err != nil {
fmt.Println("Error writing header:", err)
return
}

// Write the load test data rows
for _, dataRow := range dataset {
if err := writer.Write(dataRow); err != nil {
fmt.Println("Error writing data row:", err)
return
}
}

fmt.Println("CSV file created successfully.")
}
11 changes: 0 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231211091155-5467620e05ed.2 h1:Yy0x91aZhzQOikR33x5eEIFEWS1TZzuzRc+LP8NuCgQ=
buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231211091155-5467620e05ed.2/go.mod h1:xDIPwKMomacOmFbzRICgdUP/gpjEoetNVYVTVr29H0k=
buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231214132539-21b35d953f3d.2 h1:ykl0rTU4nNvPtJRm2lqOCRpgOd93RSt3ev8hNkbozDE=
buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231214132539-21b35d953f3d.2/go.mod h1:tKtDR8xG+DIFkSv8PiW1YM64GxJ/44n3UfZAN+5jfJ8=
buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231220001345-2dbff1450b98.2 h1:E1OG6V0s//gFBHK/aAniN4Cb2l/QFYsKdTT11Ymgh6g=
buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go v1.3.0-20231220001345-2dbff1450b98.2/go.mod h1:6OlE1AqRT7EzKZ6ukFLo6Qmf7iv4I59YUlYkiJFxly8=
buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.28.1-20231211091155-5467620e05ed.4/go.mod h1:2viX8eSuMFjoDrr8x3FYytCp81PVYkdgfB68aIcGW6c=
buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.28.1-20231214132539-21b35d953f3d.4/go.mod h1:2viX8eSuMFjoDrr8x3FYytCp81PVYkdgfB68aIcGW6c=
buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.28.1-20231220001345-2dbff1450b98.4/go.mod h1:2viX8eSuMFjoDrr8x3FYytCp81PVYkdgfB68aIcGW6c=
buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231211091155-5467620e05ed.2 h1:8HbCQyMVfu/+Spx4yOPwWThwJpr0JELRxJgt8Kdoso4=
buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231211091155-5467620e05ed.2/go.mod h1:h8QtMQVd5+WnHrXJrqA/eCt8mGw9efCAmxoHzeORKdw=
buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231214132539-21b35d953f3d.2 h1:HTcdQrjEKtCEizgMVc1kmNtsGSQQ04WTh7fUNEuqCFE=
buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231214132539-21b35d953f3d.2/go.mod h1:h8QtMQVd5+WnHrXJrqA/eCt8mGw9efCAmxoHzeORKdw=
buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231220001345-2dbff1450b98.2 h1:Wne/F/pUbrMAIQ874Akd5nXxoXM2tjzM14PdimMB3X8=
buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go v1.31.0-20231220001345-2dbff1450b98.2/go.mod h1:h8QtMQVd5+WnHrXJrqA/eCt8mGw9efCAmxoHzeORKdw=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
Expand Down Expand Up @@ -668,7 +658,6 @@ go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
Expand Down
16 changes: 13 additions & 3 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
rsa_util "github.com/chain4travel/camino-messenger-bot/utils/rsa"

"github.com/chain4travel/camino-messenger-bot/config"
"github.com/chain4travel/camino-messenger-bot/internal/matrix"
Expand Down Expand Up @@ -56,11 +57,20 @@ func (a *App) Run(ctx context.Context) error {
return nil
})

messenger := matrix.NewMessenger(&a.cfg.MatrixConfig, a.logger)
if a.cfg.BotMode > 2 {
a.logger.Error("Invalid bot mode")
return nil
}
privateRSAKey, err := rsa_util.ParseRSAPrivateKeyFromFile(a.cfg.PrivateRSAFileKey)
if err != nil {
a.logger.Error("Error while parsing private RSA key")
return nil
}
messenger := matrix.NewMessenger(&a.cfg.MatrixConfig, a.logger, privateRSAKey)
userIDUpdated := make(chan string) // Channel to pass the userID
g.Go(func() error {
a.logger.Info("Starting message receiver...")
userID, err := messenger.StartReceiver()
a.logger.Infof("Starting message receiver with botmode %d ...", a.cfg.BotMode)
userID, err := messenger.StartReceiver(a.cfg.BotMode)
if err != nil {
panic(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package matrix

import (
"fmt"

"github.com/chain4travel/camino-messenger-bot/internal/compression"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/internal/metadata"
Expand All @@ -21,6 +20,9 @@ func compressAndSplitCaminoMatrixMsg(msg messaging.Message) ([]CaminoMatrixMessa
bytes []byte
err error
)
if err != nil {
return nil, err
}
switch msg.Type.Category() {
case messaging.Request,
messaging.Response:
Expand Down Expand Up @@ -53,7 +55,7 @@ func compressAndSplitCaminoMatrixMsg(msg messaging.Message) ([]CaminoMatrixMessa
for i, chunk := range splitCompressedContent[1:] {
messages = append(messages, CaminoMatrixMessage{
MessageEventContent: event.MessageEventContent{MsgType: event.MessageType(msg.Type)},
Metadata: metadata.Metadata{RequestID: msg.Metadata.RequestID, NumberOfChunks: uint(len(splitCompressedContent)), ChunkIndex: uint(i + 1)},
Metadata: metadata.Metadata{RequestID: msg.Metadata.RequestID, Recipient: msg.Metadata.Recipient, NumberOfChunks: uint(len(splitCompressedContent)), ChunkIndex: uint(i + 1)},
CompressedContent: chunk,
})
}
Expand Down
File renamed without changes.
85 changes: 85 additions & 0 deletions internal/matrix/encryption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved.
* See the file LICENSE for licensing terms.
*/

package matrix

import (
"crypto/rsa"
"encoding/base64"
"fmt"
aes_util "github.com/chain4travel/camino-messenger-bot/utils/aes"
rsa_util "github.com/chain4travel/camino-messenger-bot/utils/rsa"
"sync"
)

type EncryptionKeyRepository struct {
pubKeyCache map[string]*rsa.PublicKey
symmetricKeyCache map[string][]byte
mu sync.Mutex
}

func NewEncryptionKeyRepository() *EncryptionKeyRepository {
return &EncryptionKeyRepository{pubKeyCache: make(map[string]*rsa.PublicKey), symmetricKeyCache: make(map[string][]byte)}
}
func (p *EncryptionKeyRepository) getPublicKeyForRecipient(recipient string) (*rsa.PublicKey, error) {
pkey := p.fetchKeyFromCache(recipient)
if pkey != nil {
return pkey, nil
}
var encodedPubKey string
//TODO for now it's all keys are hardcoded. Later we need to get the keys from the key server
switch recipient {
case "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network":
encodedPubKey = "LS0tLS1CRUdJTiBSU0EgUFVCTElDIEtFWS0tLS0tCk1JSUJpZ0tDQVlFQWpwR2R4NlloMkJnR3V2QUdQQTZiMnAzd1dBWnZYalQvWTBQR1ViRlR0U1lVY1hydEc1eEIKcy9DYVo4NGtmTEpHRnhEV3d3d3E4bzRoNHBzOGd0aHJkaG5QMUFIOEtGRFFCTzJNbDY5ZmFZYWd4ajdtVUxnSQpqTWIzUEorVjQzZUQrRktHZis2R0E5aHpXd09RZDhvVWdZVnhSQ2xMU0tMYi82WXFqaU81LzFxK3plTWowZWF2CmJQY2VsK2V6UVlpQmE3UzNJcHFGUFdhL0N0TTd3Qi90UWI2MnAzWFRkU0pnenR1SlJpTU5MeFI2NFU3WWlHcGsKR0VYSjFyd2lPZFhPMjJaRyt4UkxmOEl3ZFF2dEUxR1VnL1llTEtSOWd5blI5WTNiZzA5UWErRkpYQ1FrTjVHUApJY2E0S2R4UGpjQ0xHTklGVlVSTnNkTjFrZnJzcXpLTXNQOVgwQkFUMHNWYTk3WTd5RnAxUTFKTmU4Uy96T1FWCm9XOHJpSVFvWGRqSDNES2Q3cERQekN2TEpQRm50dzF5YWRUZ1pLbGs5Y21tT0dDbXh5SUZwMW5mTXk1R1FDM20KS1AxZ2NIV3J5UmFBcG4reG9BSFdIcHErcVNicmpka0h2MEt1MDRaMTRYcWhaK2Ezc3FtM3oreWpNYTF2OExDUApvK2I3OFI4OGpqVDFBZ01CQUFFPQotLS0tLUVORCBSU0EgUFVCTElDIEtFWS0tLS0tCg=="
case "@t-kopernikus15ss2mvy86h0hcwdhfukdx7y3cvuwdqxm6a0aqm:matrix.camino.network":
encodedPubKey = "LS0tLS1CRUdJTiBSU0EgUFVCTElDIEtFWS0tLS0tCk1JSUJpZ0tDQVlFQXI5a1RrWHkyNWlIaTNhai9ib2VER3VFTmNJZ1dqVmlYRHVmUFJUQ1FSV0I0TEt1eCtaWXoKaElWckdPb2l6eDNoR29NTnMwOUlvODFzb2wyd3crQ0tENUtTYTVHVHNJelh6ZytGcEErTmsrcDFOOGlsWFVINQp4d2NFTlRBclVCK2Y0SmU4Vkl0dEc5ZVhHQW9aQ1RYc2FTRWNmVG1Lc24vVUdsSHVQdGs5WHVpTlNTb3k0ZTMvCnpGcGFjZngyaVR6TVJOMjc1Ky9aZjllZ3RtSnVXS0JKcnNOcC9iQ245Q2ErcURheDNMTmJpdG55TUF2eG5rUmgKTWZrQ1lHTHZmSkFoRVVlSVJUUkRLT0xCSEtRVFJpQ1F4SHlXSVVEQkswbkZtbkt5Ti80RUI1RWkzTkg4RkpFKwpaK1NobmExdmlkdWV0R2NtMjhKRFRweXhGRStyZXZQWWs3aXVJZGF3VEZtTUlabkRrTnpRRkxlSStHaXFPN2JNCkRlT0NSa2FBRDhnSkEzT29OeXBmUlRuaEMvVHFvMWk1VjZ1RlV5RU9LT3dvMHk4cEFCSmNTRzBoUVRxQUh3blAKZkZFLzI2REtsMzQzZ1oxV3lBa29QcUUyVk1ESklSVFVUcHhBR09IMk9qZDRnWjBJWk1QTks0RDYyMWk4V2NrZApNTlI5ZEZRQW1mOS9BZ01CQUFFPQotLS0tLUVORCBSU0EgUFVCTElDIEtFWS0tLS0tCg=="
default:
return nil, fmt.Errorf("no public key found for recipient: %s", recipient)
}
pubKeyBytes, err := base64.StdEncoding.DecodeString(encodedPubKey)
if err != nil {
return nil, err
}
pKey, err := rsa_util.ParseRSAPublicKey(pubKeyBytes)
if err != nil {
return nil, err
}
p.cachePublicKey(recipient, pKey)
return pKey, nil
}

func (p *EncryptionKeyRepository) cachePublicKey(recipient string, key *rsa.PublicKey) {
p.mu.Lock()
defer p.mu.Unlock()
p.pubKeyCache[recipient] = key
}

func (p *EncryptionKeyRepository) fetchKeyFromCache(recipient string) *rsa.PublicKey {
p.mu.Lock()
defer p.mu.Unlock()
return p.pubKeyCache[recipient]
}

func (p *EncryptionKeyRepository) getSymmetricKeyForRecipient(recipient string) []byte {
key := p.fetchSymmetricKeyFromCache(recipient)
if key != nil {
return key
}
key = aes_util.GenerateAESKey()
p.cacheSymmetricKey(recipient, key)
return key
}

func (p *EncryptionKeyRepository) cacheSymmetricKey(recipient string, key []byte) {
p.mu.Lock()
defer p.mu.Unlock()
p.symmetricKeyCache[recipient] = key
}

func (p *EncryptionKeyRepository) fetchSymmetricKeyFromCache(recipient string) []byte {
p.mu.Lock()
defer p.mu.Unlock()
return p.symmetricKeyCache[recipient]
}
Loading