Skip to content

Commit

Permalink
send response to manager on computation termination
Browse files Browse the repository at this point in the history
Signed-off-by: WashingtonKK <[email protected]>
  • Loading branch information
WashingtonKK committed Aug 21, 2024
1 parent 31391a3 commit 6a14ac4
Show file tree
Hide file tree
Showing 30 changed files with 748 additions and 732 deletions.
4 changes: 1 addition & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ build

build
cmd/manager/img
cmd/manager/iso
cmd/manager/tmp

.cov

*.pem

dist/
result.zip
results.zip
*.spec
59 changes: 0 additions & 59 deletions agent/algorithm/results.go

This file was deleted.

3 changes: 2 additions & 1 deletion agent/algorithm/results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/ultravioletrs/cocos/agent/algorithm"
"github.com/ultravioletrs/cocos/internal"
)

func TestZipDirectory(t *testing.T) {
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestZipDirectory(t *testing.T) {
}
}

if _, err := algorithm.ZipDirectory(); err != nil {
if _, err := internal.ZipDirectoryToMemory(algorithm.ResultsDir); err != nil {
t.Errorf("ZipDirectory() error = %v", err)
}
})
Expand Down
4 changes: 2 additions & 2 deletions agent/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ func (s *service) AuthenticateUser(ctx context.Context, role UserRole) (context.
}
}
case DataProviderRole:
for i, dp := range s.datasetProviders {
for _, dp := range s.datasetProviders {
if err := verifySignature(role, signature, dp); err == nil {
return agent.IndexToContext(ctx, i), nil
return ctx, nil
}
}
case AlgorithmProviderRole:
Expand Down
2 changes: 1 addition & 1 deletion agent/auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestAuthenticateUser(t *testing.T) {

if err == nil {
switch id, ok := agent.IndexFromContext(ctx); {
case tc.role == ConsumerRole, tc.role == DataProviderRole:
case tc.role == ConsumerRole:
assert.True(t, ok, "expected index in context")
assert.Equal(t, 0, id, "expected index 0 in context")
default:
Expand Down
17 changes: 17 additions & 0 deletions agent/computations.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"context"
"encoding/json"
"fmt"

"google.golang.org/grpc/metadata"
)

var _ fmt.Stringer = (*Datasets)(nil)
Expand Down Expand Up @@ -69,3 +71,18 @@ func IndexFromContext(ctx context.Context) (int, bool) {
index, ok := ctx.Value(ManifestIndexKey{}).(int)
return index, ok
}

const DecompressKey = "decompress"

// DecompressFromContext reports whether the incoming gRPC metadata asks the
// agent to decompress the uploaded dataset, i.e. whether the "decompress"
// metadata key is present with the literal value "true".
func DecompressFromContext(ctx context.Context) bool {
	values := metadata.ValueFromIncomingContext(ctx, DecompressKey)
	return len(values) > 0 && values[0] == "true"
}

// DecompressToContext records the decompress flag in the outgoing gRPC
// metadata so the agent can read it back via DecompressFromContext.
func DecompressToContext(ctx context.Context, decompress bool) context.Context {
	flag := fmt.Sprintf("%t", decompress)
	return metadata.AppendToOutgoingContext(ctx, DecompressKey, flag)
}
75 changes: 47 additions & 28 deletions agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ultravioletrs/cocos/agent/algorithm/python"
"github.com/ultravioletrs/cocos/agent/algorithm/wasm"
"github.com/ultravioletrs/cocos/agent/events"
"github.com/ultravioletrs/cocos/internal"
"golang.org/x/crypto/sha3"
)

Expand Down Expand Up @@ -50,6 +51,8 @@ var (
ErrHashMismatch = errors.New("malformed data, hash does not match manifest")
// ErrFileNameMismatch provided dataset filename does not match filename in manifest.
ErrFileNameMismatch = errors.New("malformed data, filename does not match manifest")
// ErrAllResultsConsumed indicates all results have been consumed.
ErrAllResultsConsumed = errors.New("all results have been consumed by declared consumers")
)

// Service specifies an API that must be fulfilled by the domain service
Expand All @@ -64,7 +67,7 @@ type Service interface {
}

type agentService struct {
computation Computation // Holds the current computation request details.
computation Computation // Holds the current computation manifest.
algorithm algorithm.Algorithm // Filepath to the algorithm received for the computation.
result []byte // Stores the result of the computation.
sm *StateMachine // Manages the state transitions of the agent service.
Expand All @@ -90,6 +93,7 @@ func New(ctx context.Context, logger *slog.Logger, eventSvc events.Service, cmp
svc.sm.StateFunctions[resultsReady] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[complete] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[running] = svc.runComputation
svc.sm.StateFunctions[failed] = svc.publishEvent("failed", json.RawMessage{})

svc.computation = cmp

Expand Down Expand Up @@ -179,31 +183,40 @@ func (as *agentService) Data(ctx context.Context, dataset Dataset) error {

hash := sha3.Sum256(dataset.Dataset)

index, ok := IndexFromContext(ctx)
if !ok {
return ErrUndeclaredDataset
}

if hash != as.computation.Datasets[index].Hash {
return ErrHashMismatch
}

if as.computation.Datasets[index].Filename != "" && as.computation.Datasets[index].Filename != dataset.Filename {
return ErrFileNameMismatch
}
matched := false
for i, d := range as.computation.Datasets {
if hash == d.Hash {
if d.Filename != "" && d.Filename != dataset.Filename {
return ErrFileNameMismatch
}

as.computation.Datasets = slices.Delete(as.computation.Datasets, index, index+1)
as.computation.Datasets = slices.Delete(as.computation.Datasets, i, i+1)

if DecompressFromContext(ctx) {
if err := internal.UnzipFromMemory(dataset.Dataset, algorithm.DatasetsDir); err != nil {
return fmt.Errorf("error decompressing dataset: %v", err)
}
} else {
f, err := os.Create(fmt.Sprintf("%s/%s", algorithm.DatasetsDir, dataset.Filename))
if err != nil {
return fmt.Errorf("error creating dataset file: %v", err)
}

if _, err := f.Write(dataset.Dataset); err != nil {
return fmt.Errorf("error writing dataset to file: %v", err)
}
if err := f.Close(); err != nil {
return fmt.Errorf("error closing file: %v", err)
}
}

f, err := os.Create(fmt.Sprintf("%s/%s", algorithm.DatasetsDir, dataset.Filename))
if err != nil {
return fmt.Errorf("error creating dataset file: %v", err)
matched = true
break
}
}

if _, err := f.Write(dataset.Dataset); err != nil {
return fmt.Errorf("error writing dataset to file: %v", err)
}
if err := f.Close(); err != nil {
return fmt.Errorf("error closing file: %v", err)
if !matched {
return ErrUndeclaredDataset
}

if len(as.computation.Datasets) == 0 {
Expand All @@ -214,22 +227,22 @@ func (as *agentService) Data(ctx context.Context, dataset Dataset) error {
}

func (as *agentService) Result(ctx context.Context) ([]byte, error) {
if as.sm.GetState() != resultsReady {
if as.sm.GetState() != resultsReady && as.sm.GetState() != failed {
return []byte{}, ErrResultsNotReady
}
if len(as.computation.ResultConsumers) == 0 {
return []byte{}, ErrAllManifestItemsReceived
return []byte{}, ErrAllResultsConsumed
}
index, ok := IndexFromContext(ctx)
if !ok {
return []byte{}, ErrUndeclaredConsumer
}
as.computation.ResultConsumers = slices.Delete(as.computation.ResultConsumers, index, index+1)

if len(as.computation.ResultConsumers) == 0 {
if len(as.computation.ResultConsumers) == 0 && as.sm.GetState() == resultsReady {
as.sm.SendEvent(resultsConsumed)
}
// Return the result file or an error

return as.result, as.runError
}

Expand All @@ -249,7 +262,13 @@ func (as *agentService) Attestation(ctx context.Context, reportData [ReportDataS
func (as *agentService) runComputation() {
as.publishEvent("starting", json.RawMessage{})()
as.sm.logger.Debug("computation run started")
defer as.sm.SendEvent(runComplete)
defer func() {
if as.runError != nil {
as.sm.SendEvent(runFailed)
} else {
as.sm.SendEvent(runComplete)
}
}()

if err := os.Mkdir(algorithm.ResultsDir, 0o755); err != nil {
as.runError = fmt.Errorf("error creating results directory: %s", err.Error())
Expand All @@ -275,7 +294,7 @@ func (as *agentService) runComputation() {
return
}

results, err := algorithm.ZipDirectory()
results, err := internal.ZipDirectoryToMemory(algorithm.ResultsDir)
if err != nil {
as.runError = err
as.sm.logger.Warn(fmt.Sprintf("failed to zip results: %s", err.Error()))
Expand Down
3 changes: 3 additions & 0 deletions agent/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
running
resultsReady
complete
failed
)

type event uint8
Expand All @@ -31,6 +32,7 @@ const (
dataReceived
runComplete
resultsConsumed
runFailed
)

// StateMachine represents the state machine.
Expand Down Expand Up @@ -74,6 +76,7 @@ func NewStateMachine(logger *slog.Logger, cmp Computation) *StateMachine {

sm.Transitions[running] = make(map[event]state)
sm.Transitions[running][runComplete] = resultsReady
sm.Transitions[running][runFailed] = failed

sm.Transitions[resultsReady] = make(map[event]state)
sm.Transitions[resultsReady][resultsConsumed] = complete
Expand Down
5 changes: 3 additions & 2 deletions agent/state_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 26 additions & 6 deletions cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ make cli
Retrieves attestation information from the SEV guest and saves it to a file.
To retrieve attestation from agent, use the following command:
```bash
./build/cocos-cli agent attestation get '<report_data>'
./build/cocos-cli attestation get '<report_data>'
```

#### Validate attestation
Validates the retrieved attestation information against a specified policy and checks its authenticity.
To validate and verify attestation from agent, use the following command:
```bash
./build/cocos-cli agent attestation validate '<attestation>' --report_data '<report_data>'
./build/cocos-cli attestation validate '<attestation>' --report_data '<report_data>'
```
##### Flags
- --config: Path to a JSON file containing the validation configuration. This can be used to override individual flags.
Expand Down Expand Up @@ -62,21 +62,41 @@ To validate and verify attestation from agent, use the following command:
To upload an algorithm, use the following command:

```bash
./build/cocos-cli agent algo /path/to/algorithm <private_key_file_path>
./build/cocos-cli algo /path/to/algorithm <private_key_file_path>
```

##### Flags
- -a, --algorithm string Algorithm type to run (default "bin")
- --python-runtime string Python runtime to use (default "python3")
- -r, --requirements string Python requirements file


#### Upload Dataset

To upload a dataset, use the following command:

```bash
./build/cocos-cli agent data /path/to/dataset.csv <private_key_file_path>
./build/cocos-cli data /path/to/dataset.csv <private_key_file_path>
```

Users can also upload directories, which will be compressed in transit. Once received by the agent, they will be stored as compressed files, or decompressed if the user passed the decompression argument.

##### Flags
- -d, --decompress Decompress the dataset on agent



#### Retrieve result

To retrieve the computation result, use the following command:

```bash
./build/cocos-cli agent result <private_key_file_path>
```
./build/cocos-cli result <private_key_file_path>
```

#### Checksum
When defining the manifest dataset and algorithm checksums are required. This can be done as below:

```bash
./build/cocos-cli checksum <path_to_dataset_or_algorithm>
```
2 changes: 1 addition & 1 deletion cli/algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (cli *CLI) NewAlgorithmCmd() *cobra.Command {
Run: func(cmd *cobra.Command, args []string) {
algorithmFile := args[0]

log.Println("Uploading algorithm binary:", algorithmFile)
log.Println("Uploading algorithm file:", algorithmFile)

algorithm, err := os.ReadFile(algorithmFile)
if err != nil {
Expand Down
Loading

0 comments on commit 6a14ac4

Please sign in to comment.