Skip to content

Commit

Permalink
minimal diff to be able to return the task response bytes instead of …
Browse files Browse the repository at this point in the history
…just the task response digest (#252)

* minimal diff to be able to return the task response bytes

* add taskReponse type

* only add to taskResponseMap if doesn't already exist

* add interface for hash function

* fix lint

* make local

* add types

* remove mapping
  • Loading branch information
afkbyte authored May 29, 2024
1 parent 54fce96 commit aebab47
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 71 deletions.
36 changes: 26 additions & 10 deletions services/bls_aggregation/blsagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
type BlsAggregationServiceResponse struct {
Err error // if Err is not nil, the other fields are not valid
TaskIndex types.TaskIndex // unique identifier of the task
TaskResponse types.TaskResponse // the task response that was signed
TaskResponseDigest types.TaskResponseDigest // digest of the task response that was signed
// The below 8 fields are the data needed to build the IBLSSignatureChecker.NonSignerStakesAndSignature struct
// users of this service will need to build the struct themselves by converting the bls points
Expand Down Expand Up @@ -97,7 +98,7 @@ type BlsAggregationService interface {
ProcessNewSignature(
ctx context.Context,
taskIndex types.TaskIndex,
taskResponseDigest types.TaskResponseDigest,
taskResponse types.TaskResponse,
blsSignature *bls.Signature,
operatorId types.OperatorId,
) error
Expand Down Expand Up @@ -134,17 +135,20 @@ type BlsAggregatorService struct {
taskChansMutex sync.RWMutex
avsRegistryService avsregistry.AvsRegistryService
logger logging.Logger

hashFunction types.TaskResponseHashFunction
}

var _ BlsAggregationService = (*BlsAggregatorService)(nil)

func NewBlsAggregatorService(avsRegistryService avsregistry.AvsRegistryService, logger logging.Logger) *BlsAggregatorService {
func NewBlsAggregatorService(avsRegistryService avsregistry.AvsRegistryService, hashFunction types.TaskResponseHashFunction, logger logging.Logger) *BlsAggregatorService {
return &BlsAggregatorService{
aggregatedResponsesC: make(chan BlsAggregationServiceResponse),
signedTaskRespsCs: make(map[types.TaskIndex]chan types.SignedTaskResponseDigest),
taskChansMutex: sync.RWMutex{},
avsRegistryService: avsRegistryService,
logger: logger,
hashFunction: hashFunction,
}
}

Expand Down Expand Up @@ -179,7 +183,7 @@ func (a *BlsAggregatorService) InitializeNewTask(
func (a *BlsAggregatorService) ProcessNewSignature(
ctx context.Context,
taskIndex types.TaskIndex,
taskResponseDigest types.TaskResponseDigest,
taskResponse types.TaskResponse,
blsSignature *bls.Signature,
operatorId types.OperatorId,
) error {
Expand All @@ -189,14 +193,16 @@ func (a *BlsAggregatorService) ProcessNewSignature(
if !taskInitialized {
return TaskNotFoundErrorFn(taskIndex)
}

signatureVerificationErrorC := make(chan error)
// send the task to the goroutine processing this task
// and return the error (if any) returned by the signature verification routine

select {
// we need to send this as part of select because if the goroutine is processing another SignedTaskResponseDigest
// and cannot receive this one, we want the context to be able to cancel the request
case taskC <- types.SignedTaskResponseDigest{
TaskResponseDigest: taskResponseDigest,
TaskResponse: taskResponse,
BlsSignature: blsSignature,
OperatorId: operatorId,
SignatureVerificationErrorC: signatureVerificationErrorC,
Expand Down Expand Up @@ -255,13 +261,16 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc(
select {
case signedTaskResponseDigest := <-signedTaskRespsC:
a.logger.Debug("Task goroutine received new signed task response digest", "taskIndex", taskIndex, "signedTaskResponseDigest", signedTaskResponseDigest)
// compute the taskResponseDigest using the hash function
taskResponseDigest := a.hashFunction(signedTaskResponseDigest.TaskResponse)

err := a.verifySignature(taskIndex, signedTaskResponseDigest, operatorsAvsStateDict)
signedTaskResponseDigest.SignatureVerificationErrorC <- err
if err != nil {
continue
}
// after verifying signature we aggregate its sig and pubkey, and update the signed stake amount
digestAggregatedOperators, ok := aggregatedOperatorsDict[signedTaskResponseDigest.TaskResponseDigest]
digestAggregatedOperators, ok := aggregatedOperatorsDict[taskResponseDigest]
if !ok {
// first operator to sign on this digest
digestAggregatedOperators = aggregatedOperators{
Expand All @@ -286,7 +295,7 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc(
}
// update the aggregatedOperatorsDict. Note that we need to assign the whole struct value at once,
// because of https://github.com/golang/go/issues/3117
aggregatedOperatorsDict[signedTaskResponseDigest.TaskResponseDigest] = digestAggregatedOperators
aggregatedOperatorsDict[taskResponseDigest] = digestAggregatedOperators

if checkIfStakeThresholdsMet(a.logger, digestAggregatedOperators.signersTotalStakePerQuorum, totalStakePerQuorum, quorumThresholdPercentagesMap) {
nonSignersOperatorIds := []types.OperatorId{}
Expand Down Expand Up @@ -316,10 +325,12 @@ func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc(
}
return
}

blsAggregationServiceResponse := BlsAggregationServiceResponse{
Err: nil,
TaskIndex: taskIndex,
TaskResponseDigest: signedTaskResponseDigest.TaskResponseDigest,
TaskResponse: signedTaskResponseDigest.TaskResponse,
TaskResponseDigest: taskResponseDigest,
NonSignersPubkeysG1: nonSignersG1Pubkeys,
QuorumApksG1: quorumApksG1,
SignersApkG2: digestAggregatedOperators.signersApkG2,
Expand Down Expand Up @@ -371,18 +382,23 @@ func (a *BlsAggregatorService) verifySignature(
return OperatorNotPartOfTaskQuorumErrorFn(signedTaskResponseDigest.OperatorId, taskIndex)
}

// 0. verify that the msg actually came from the correct operator
taskResponseDigest := a.hashFunction(signedTaskResponseDigest.TaskResponse)

// verify that the msg actually came from the correct operator
operatorG2Pubkey := operatorsAvsStateDict[signedTaskResponseDigest.OperatorId].OperatorInfo.Pubkeys.G2Pubkey
if operatorG2Pubkey == nil {
a.logger.Error("Operator G2 pubkey not found", "operatorId", signedTaskResponseDigest.OperatorId, "taskId", taskIndex)
return fmt.Errorf("taskId %d: Operator G2 pubkey not found (operatorId: %x)", taskIndex, signedTaskResponseDigest.OperatorId)
}
a.logger.Debug("Verifying signed task response digest signature",
"operatorG2Pubkey", operatorG2Pubkey,
"taskResponseDigest", signedTaskResponseDigest.TaskResponseDigest,
"taskResponseDigest", taskResponseDigest,
"blsSignature", signedTaskResponseDigest.BlsSignature,
)
signatureVerified, err := signedTaskResponseDigest.BlsSignature.Verify(operatorG2Pubkey, signedTaskResponseDigest.TaskResponseDigest)

// if the operator signs a digest that is not the digest of the TaskResponse submitted in ProcessNewTask
// then the signature will not be verified
signatureVerified, err := signedTaskResponseDigest.BlsSignature.Verify(operatorG2Pubkey, taskResponseDigest)
if err != nil {
return SignatureVerificationError(err)
}
Expand Down
Loading

0 comments on commit aebab47

Please sign in to comment.