Skip to content

Commit

Permalink
Merge pull request #226 from valory-xyz/fix/aealite
Browse files Browse the repository at this point in the history
aealite fixes: p2p acn support, tls support
  • Loading branch information
DavidMinarsch authored Jul 30, 2022
2 parents 04db06d + 2a8bca5 commit 60ad1ba
Show file tree
Hide file tree
Showing 11 changed files with 1,686 additions and 982 deletions.
422 changes: 422 additions & 0 deletions libs/go/aealite/connections/acn/acn.go

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions libs/go/aealite/connections/acn/pipe_iface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* -*- coding: utf-8 -*-
* ------------------------------------------------------------------------------
*
* Copyright 2018-2021 Fetch.AI Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ------------------------------------------------------------------------------
*/
package acn

type Pipe interface {
Connect() error
Read() ([]byte, error)
Write(data []byte) error
//Close() error
}
30 changes: 30 additions & 0 deletions libs/go/aealite/connections/acn/protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package acn

import acn_protocol "aealite/protocols/acn/v1_0_0"

type StatusBody = acn_protocol.AcnMessage_StatusBody
type AgentRecord = acn_protocol.AcnMessage_AgentRecord
type AcnMessage = acn_protocol.AcnMessage
type LookupRequest = acn_protocol.AcnMessage_LookupRequest
type LookupResponse = acn_protocol.AcnMessage_LookupResponse
type Status = acn_protocol.AcnMessage_Status
type LookupRequestPerformative = acn_protocol.AcnMessage_Lookup_Request_Performative
type LookupResponsePerformative = acn_protocol.AcnMessage_Lookup_Response_Performative
type StatusPerformative = acn_protocol.AcnMessage_Status_Performative
type RegisterPerformative = acn_protocol.AcnMessage_Register_Performative
type Register = acn_protocol.AcnMessage_Register
type AeaEnvelope = acn_protocol.AcnMessage_AeaEnvelope
type AeaEnvelopePerformative = acn_protocol.AcnMessage_Aea_Envelope_Performative

const ERROR_DECODE = acn_protocol.AcnMessage_StatusBody_ERROR_DECODE
const SUCCESS = acn_protocol.AcnMessage_StatusBody_SUCCESS
const ERROR_UNEXPECTED_PAYLOAD = acn_protocol.AcnMessage_StatusBody_ERROR_UNEXPECTED_PAYLOAD
const ERROR_AGENT_NOT_READY = acn_protocol.AcnMessage_StatusBody_ERROR_AGENT_NOT_READY
const ERROR_UNKNOWN_AGENT_ADDRESS = acn_protocol.AcnMessage_StatusBody_ERROR_UNKNOWN_AGENT_ADDRESS
const ERROR_GENERIC = acn_protocol.AcnMessage_StatusBody_ERROR_GENERIC
const ERROR_WRONG_AGENT_ADDRESS = acn_protocol.AcnMessage_StatusBody_ERROR_WRONG_AGENT_ADDRESS
const ERROR_UNSUPPORTED_LEDGER = acn_protocol.AcnMessage_StatusBody_ERROR_UNSUPPORTED_LEDGER
const ERROR_WRONG_PUBLIC_KEY = acn_protocol.AcnMessage_StatusBody_ERROR_WRONG_PUBLIC_KEY
const ERROR_INVALID_PROOF = acn_protocol.AcnMessage_StatusBody_ERROR_INVALID_PROOF

type Status_ErrCode = acn_protocol.AcnMessage_StatusBody_StatusCodeEnum
165 changes: 91 additions & 74 deletions libs/go/aealite/connections/p2pclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ import (
protocols "aealite/protocols"
wallet "aealite/wallet"

acn "aealite/connections/acn"

"github.com/joho/godotenv"
"github.com/rs/zerolog"
proto "google.golang.org/protobuf/proto"
)

const retryAttempts = 5
const acnStatusTimeout = 5 * time.Second

var logger zerolog.Logger = zerolog.New(zerolog.ConsoleWriter{
Out: os.Stdout,
Expand Down Expand Up @@ -70,14 +73,16 @@ type P2PClientConfig struct {

type P2PClientApi struct {
clientConfig *P2PClientConfig
agentRecord *protocols.AgentRecord
agentRecord *acn.AgentRecord

socket Socket
outQueue chan *protocols.Envelope

closing bool
connected bool
initialised bool

acn_status_chan chan *acn.StatusBody
}

func (client *P2PClientApi) InitFromEnv(envFile string) error {
Expand All @@ -96,7 +101,7 @@ func (client *P2PClientApi) InitFromEnv(envFile string) error {
}
address := os.Getenv("AEA_ADDRESS")
publicKey := os.Getenv("AEA_PUBLIC_KEY")
agentRecord := &protocols.AgentRecord{Address: address, PublicKey: publicKey}
agentRecord := &acn.AgentRecord{Address: address, PublicKey: publicKey}
agentRecord.ServiceId = os.Getenv("AEA_P2P_POR_SERVICE_ID")
agentRecord.LedgerId = os.Getenv("AEA_P2P_POR_LEDGER_ID")
agentRecord.PeerPublicKey = os.Getenv("AEA_P2P_POR_PEER_PUBKEY")
Expand All @@ -122,13 +127,20 @@ func (client *P2PClientApi) InitFromEnv(envFile string) error {
}
client.clientConfig = &P2PClientConfig{host: host, port: uint16(portConv)}

client.socket = NewSocket(client.clientConfig.host, client.clientConfig.port)
client.socket = NewSocket(client.clientConfig.host, client.clientConfig.port, client.agentRecord.PeerPublicKey)
client.initialised = true
return nil
}

func (client *P2PClientApi) Put(envelope *protocols.Envelope) error {
return writeEnvelope(client.socket, envelope)

envelopeBytes, err := proto.Marshal(envelope)
if err != nil {
logger.Error().Str("err", err.Error()).Msgf("while serializing envelope: %s", envelope)
return err
}
return acn.SendEnvelopeMessageAndWaitForStatus(client.socket, envelopeBytes, client.acn_status_chan, acnStatusTimeout)

}

func (client *P2PClientApi) Get() *protocols.Envelope {
Expand Down Expand Up @@ -156,6 +168,7 @@ func (client *P2PClientApi) Disconnect() error {
return err
}
close(client.outQueue)
close(client.acn_status_chan)
client.connected = false
return nil
}
Expand Down Expand Up @@ -183,6 +196,7 @@ func (client *P2PClientApi) Connect() error {

client.closing = false
client.outQueue = make(chan *protocols.Envelope, 10)
client.acn_status_chan = make(chan *acn.StatusBody, 10)
go client.listenForEnvelopes()
logger.Info().Msg("connected to p2p node")

Expand Down Expand Up @@ -242,60 +256,14 @@ func (client *P2PClientApi) registerWithRetry() error {
}

func (client *P2PClientApi) register() error {
registration := &protocols.Register{Record: client.agentRecord}
msg := &protocols.AcnMessage{
Version: protocols.ACNProtocolVersion,
Payload: &protocols.AcnMessage_Register{Register: registration},
}

buf, err := proto.Marshal(msg)
if err != nil {
logger.Error().Str("err", err.Error()).Msgf("while serializing registration msg: %s", msg)
return err
}
err = client.socket.Write(buf)
if err != nil {
logger.Error().Str("err", err.Error()).
Msg("while writing register envelope")
return err
}
data, err := client.socket.Read()
if err != nil {
logger.Error().Str("err", err.Error()).Msg("while receiving data")
return err
}
response := &protocols.AcnMessage{}
err = proto.Unmarshal(data, response)
if err != nil {
logger.Error().Str("err", err.Error()).Msgf("while deserializing response msg")
return err
}

// Get Status message
var status *protocols.Status
switch pl := response.Payload.(type) {
case *protocols.AcnMessage_Status:
status = pl.Status
default:
logger.Error().Str("err", err.Error()).Msgf("response not a status msg")
return err
}

if status.Code != protocols.Status_SUCCESS {
errMsg := fmt.Sprintf(
"registration to peer failed: %s %s",
status.Code.String(),
strings.Join(status.Msgs, ":"),
)
return errors.New(errMsg)
}
return nil
return acn.SendAgentRegisterMessage(client.socket, client.agentRecord)
}

func (client *P2PClientApi) listenForEnvelopes() {
for {
envel, err := readEnvelope(client.socket)
if err != nil {
envel, err := client.HandleAcnMessageFromPipe()

if err != nil && !client.closing {
logger.Error().Str("err", err.Error()).Msg("while receiving envelope")
logger.Info().Msg("disconnecting")
if !client.closing {
Expand All @@ -306,6 +274,10 @@ func (client *P2PClientApi) listenForEnvelopes() {
}
return
}
if envel == nil {
// got acn status, not an envelope
continue
}
if envel.To != client.agentRecord.Address {
logger.Error().
Str("err", "To ("+envel.To+") must match registered address").
Expand All @@ -324,26 +296,6 @@ func (client *P2PClientApi) stop() error {
return client.socket.Disconnect()
}

func writeEnvelope(socket Socket, envelope *protocols.Envelope) error {
data, err := proto.Marshal(envelope)
if err != nil {
logger.Error().Str("err", err.Error()).Msgf("while serializing envelope: %s", envelope)
return err
}
return socket.Write(data)
}

func readEnvelope(socket Socket) (*protocols.Envelope, error) {
envelope := &protocols.Envelope{}
data, err := socket.Read()
if err != nil {
logger.Error().Str("err", err.Error()).Msg("while receiving data")
return envelope, err
}
err = proto.Unmarshal(data, envelope)
return envelope, err
}

// Error type represents list of errors in retry
type Error []error

Expand Down Expand Up @@ -463,3 +415,68 @@ func BackOffDelay(n uint, _ error, config *Config) time.Duration {
func RandomDelay(_ uint, _ error, config *Config) time.Duration {
return time.Duration(rand.Int63n(int64(config.maxJitter)))
}

func (client *P2PClientApi) HandleAcnMessageFromPipe() (*protocols.Envelope, error) {
pipe := client.socket
envelope := &protocols.Envelope{}
var acn_err error

data, err := pipe.Read()

if err != nil {

return nil, err
}

msg_type, acn_envelope, status, acnErr := acn.DecodeAcnMessage(data)

if acnErr != nil {
logger.Error().Str("err", acnErr.Error()).Msg("while handling acn message")
acn_err = acn.SendAcnError(
pipe,
acnErr.Error(),
acnErr.ErrorCode,
)
if acn_err != nil {
logger.Error().Str("err", acn_err.Error()).Msg("on acn send error")
}
return envelope, acnErr
}

switch msg_type {
case "aea_envelope":
{
err = proto.Unmarshal(acn_envelope.Envelope, envelope)
if err != nil {
logger.Error().Str("err", err.Error()).Msg("while decoding envelope")
acn_err = acn.SendAcnError(
pipe,
"error on decoding envelope",
acn.ERROR_DECODE,
)
if acn_err != nil {
logger.Error().Str("err", acn_err.Error()).Msg("on acn send error")
}
return envelope, err
}
err = acn.SendAcnSuccess(pipe)
return envelope, err

}
case "status":
{
logger.Debug().Msgf("got acn status %d", status.Code)
client.acn_status_chan <- status
return nil, nil

}
default:
{
acn_err = acn.SendAcnError(pipe, "Unsupported ACN message")
if acn_err != nil {
logger.Error().Str("err", acn_err.Error()).Msg("on acn send error")
}
return nil, errors.New("unsupported ACN message")
}
}
}
52 changes: 45 additions & 7 deletions libs/go/aealite/connections/tcpsocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,63 @@
package connections

import (
wallet "aealite/wallet"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"net"
"errors"
"strconv"
)

type TCPSocketChannel struct {
address string
port uint16
conn net.Conn
address string
port uint16
conn *tls.Conn
peerPublicKey string
}

func (sock *TCPSocketChannel) Connect() error {
var err error
sock.conn, err = net.Dial("tcp", sock.address+":"+strconv.FormatInt(int64(sock.port), 10))
conf := &tls.Config{
InsecureSkipVerify: true,
}

sock.conn, err = tls.Dial("tcp", sock.address+":"+strconv.FormatInt(int64(sock.port), 10), conf)

if err != nil {
return err
}

state := sock.conn.ConnectionState()
var cert *x509.Certificate

for _, v := range state.PeerCertificates {
cert = v
}

pub := cert.PublicKey.(*ecdsa.PublicKey)
publicKeyBytes := elliptic.Marshal(pub.Curve, pub.X, pub.Y)

signature, err := sock.Read()
logger.Debug().Msgf("got signature %d bytes", len(signature))
if err != nil {
return err
}

pubkey, err := wallet.PubKeyFromFetchAIPublicKey(sock.peerPublicKey)
if err != nil {
return err
}
ok, err := pubkey.Verify(publicKeyBytes, signature)
if err != nil {
return err
}
if !ok {
return errors.New("tls signature check failed")

}
return nil
}

Expand Down Expand Up @@ -70,6 +108,6 @@ func (sock *TCPSocketChannel) Disconnect() error {
return sock.conn.Close()
}

func NewSocket(address string, port uint16) Socket {
return &TCPSocketChannel{address: address, port: port}
func NewSocket(address string, port uint16, peerPublicKey string) Socket {
return &TCPSocketChannel{address: address, port: port, peerPublicKey: peerPublicKey}
}
Loading

0 comments on commit 60ad1ba

Please sign in to comment.