Merge mapper from sda-pipeline

jbygdell committed Oct 31, 2023
1 parent 96d1909 commit d9a2aad
Showing 7 changed files with 547 additions and 0 deletions.
184 changes: 184 additions & 0 deletions sda/cmd/mapper/mapper.go
@@ -0,0 +1,184 @@
// The mapper service registers mappings of accessionIDs
// (IDs for files) to datasetIDs.
package main

import (
	"encoding/json"
	"fmt"

	"github.com/neicnordic/sensitive-data-archive/internal/broker"
	"github.com/neicnordic/sensitive-data-archive/internal/config"
	"github.com/neicnordic/sensitive-data-archive/internal/database"
	"github.com/neicnordic/sensitive-data-archive/internal/schema"
	"github.com/neicnordic/sensitive-data-archive/internal/storage"

	log "github.com/sirupsen/logrus"
)

func main() {
	forever := make(chan bool)
	conf, err := config.NewConfig("mapper")
	if err != nil {
		log.Fatal(err)
	}
	mq, err := broker.NewMQ(conf.Broker)
	if err != nil {
		log.Fatal(err)
	}
	db, err := database.NewSDAdb(conf.Database)
	if err != nil {
		log.Fatal(err)
	}
	inbox, err := storage.NewBackend(conf.Inbox)
	if err != nil {
		log.Fatal(err)
	}

	defer mq.Channel.Close()
	defer mq.Connection.Close()
	defer db.Close()

	go func() {
		connError := mq.ConnectionWatcher()
		log.Error(connError)
		forever <- false
	}()

	go func() {
		connError := mq.ChannelWatcher()
		log.Error(connError)
		forever <- false
	}()

	log.Info("Starting mapper service")
	var mappings schema.DatasetMapping

	go func() {
		messages, err := mq.GetMessages(conf.Broker.Queue)
		if err != nil {
			log.Fatalf("Failed to get message from mq (error: %v)", err)
		}

		for delivered := range messages {
			log.Debugf("received a message: %s", delivered.Body)
			schemaType, err := schemaFromDatasetOperation(delivered.Body)
			if err != nil {
				log.Errorf(err.Error())
				if err := delivered.Ack(false); err != nil {
					log.Errorf("failed to ack message: %v", err)
				}
				if err := mq.SendMessage(delivered.CorrelationId, mq.Conf.Exchange, "error", delivered.Body); err != nil {
					log.Errorf("failed to send error message: %v", err)
				}

				continue
			}

			err = schema.ValidateJSON(fmt.Sprintf("%s/%s.json", conf.Broker.SchemasPath, schemaType), delivered.Body)
			if err != nil {
				log.Errorf("validation of incoming message (%s) failed, reason: %v ", schemaType, err)
				if err := delivered.Ack(false); err != nil {
					log.Errorf("Failed acking canceled work, reason: %v", err)
				}

				continue
			}

			// we unmarshal the message in the validation step so this is safe to do
			_ = json.Unmarshal(delivered.Body, &mappings)

			switch mappings.Type {
			case "mapping":
				log.Debug("Mapping type operation, mapping files to dataset")
				if err := db.MapFilesToDataset(mappings.DatasetID, mappings.AccessionIDs); err != nil {
					log.Errorf("failed to map files to dataset, reason: %v", err)

					// Nack message so the server gets notified that something is wrong and requeue the message
					if err := delivered.Nack(false, true); err != nil {
						log.Errorf("failed to Nack message, reason: (%v)", err)
					}

					continue
				}

				for _, aID := range mappings.AccessionIDs {
					log.Debugf("Mapped file to dataset (corr-id: %s, datasetid: %s, accessionid: %s)", delivered.CorrelationId, mappings.DatasetID, aID)
					filePath, err := db.GetInboxPath(aID)
					if err != nil {
						log.Errorf("failed to get inbox path for file with stable ID: %s", aID)
					}
					err = inbox.RemoveFile(filePath)
					if err != nil {
						log.Errorf("Remove file from inbox failed, reason: %v", err)
					}
				}

				if err := db.UpdateDatasetEvent(mappings.DatasetID, "registered", string(delivered.Body)); err != nil {
					log.Errorf("failed to set dataset status for dataset: %s", mappings.DatasetID)
					if err = delivered.Nack(false, false); err != nil {
						log.Errorf("Failed to Nack message, reason: (%s)", err.Error())
					}

					continue
				}
			case "release":
				log.Debug("Release type operation, marking dataset as released")
				if err := db.UpdateDatasetEvent(mappings.DatasetID, "released", string(delivered.Body)); err != nil {
					log.Errorf("failed to set dataset status for dataset: %s", mappings.DatasetID)
					if err = delivered.Nack(false, false); err != nil {
						log.Errorf("Failed to Nack message, reason: (%s)", err.Error())
					}

					continue
				}
			case "deprecate":
				log.Debug("Deprecate type operation, marking dataset as deprecated")
				if err := db.UpdateDatasetEvent(mappings.DatasetID, "deprecated", string(delivered.Body)); err != nil {
					log.Errorf("failed to set dataset status for dataset: %s", mappings.DatasetID)
					if err = delivered.Nack(false, false); err != nil {
						log.Errorf("Failed to Nack message, reason: (%s)", err.Error())
					}

					continue
				}
			}

			if err := delivered.Ack(false); err != nil {
				log.Errorf("failed to Ack message, reason: (%v)", err)
			}
		}
	}()

	<-forever
}

// schemaFromDatasetOperation returns the schema name for the dataset
// operation carried in the body of the message.
func schemaFromDatasetOperation(body []byte) (string, error) {
	message := make(map[string]interface{})
	err := json.Unmarshal(body, &message)
	if err != nil {
		return "", err
	}

	datasetMessageType, ok := message["type"]
	if !ok {
		return "", fmt.Errorf("malformed message, dataset message type is missing")
	}

	datasetOpsType, ok := datasetMessageType.(string)
	if !ok {
		return "", fmt.Errorf("could not cast operation attribute to string")
	}

	switch datasetOpsType {
	case "mapping":
		return "dataset-mapping", nil
	case "release":
		return "dataset-release", nil
	case "deprecate":
		return "dataset-deprecate", nil
	default:
		return "", fmt.Errorf("could not recognize mapping operation")
	}
}
90 changes: 90 additions & 0 deletions sda/cmd/mapper/mapper.md
@@ -0,0 +1,90 @@
# sda-pipeline: mapper

The mapper service registers mappings of accessionIDs (stable IDs for files) to datasetIDs.

## Configuration

There are a number of options that can be set for the mapper service.
They can be set by mounting a YAML file at `/config.yaml`, for example:

```yaml
log:
level: "debug"
format: "json"
```

They may also be set using environment variables:
```bash
export LOG_LEVEL="debug"
export LOG_FORMAT="json"
```

### RabbitMQ broker settings

These settings control how mapper connects to the RabbitMQ message broker.

- `BROKER_HOST`: hostname of the RabbitMQ server
- `BROKER_PORT`: RabbitMQ broker port (commonly `5671` with TLS and `5672` without)
- `BROKER_QUEUE`: message queue to read messages from (commonly `mappings`)
- `BROKER_USER`: username to connect to RabbitMQ
- `BROKER_PASSWORD`: password to connect to RabbitMQ
- `BROKER_PREFETCHCOUNT`: number of messages to pull from the broker at a time (defaults to `2`)
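
As a sketch, these settings could be expressed in the mounted `/config.yaml` as below. All values are placeholders, and the exact key names should be verified against the `config` package:

```yaml
broker:
  host: "rabbitmq.example.com"
  port: 5671
  queue: "mappings"
  user: "mapper"
  password: "change-me"
  prefetchcount: 2
```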

### PostgreSQL Database settings

- `DB_HOST`: hostname for the PostgreSQL database
- `DB_PORT`: database port (commonly `5432`)
- `DB_USER`: username for the database
- `DB_PASSWORD`: password for the database
- `DB_DATABASE`: database name
- `DB_SSLMODE`: The TLS encryption policy to use for database connections. Valid options are:
- `disable`
- `allow`
- `prefer`
- `require`
- `verify-ca`
- `verify-full`

More information is available in the [postgresql documentation](https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION).
Note that if `DB_SSLMODE` is set to anything but `disable`, then `DB_CACERT` needs to be set, and if it is set to `verify-full`, then `DB_CLIENTCERT` and `DB_CLIENTKEY` must also be set:

- `DB_CLIENTKEY`: key-file for the database client certificate
- `DB_CLIENTCERT`: database client certificate file
- `DB_CACERT`: Certificate Authority (CA) certificate for the database to use
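
A matching YAML sketch for a TLS-verified connection (all values are placeholders; the certificate paths assume the files are mounted into the container):

```yaml
db:
  host: "postgres.example.com"
  port: 5432
  user: "mapper"
  password: "change-me"
  database: "sda"
  sslmode: "verify-full"
  clientcert: "/certs/client.crt"
  clientkey: "/certs/client.key"
  cacert: "/certs/ca.crt"
```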

### Logging settings

- `LOG_FORMAT` can be set to `json` to get logs in JSON format. All other values result in plain-text logging.
- `LOG_LEVEL` can be set to one of the following, in increasing order of severity:
- `trace`
- `debug`
- `info`
- `warn` (or `warning`)
- `error`
- `fatal`
- `panic`

## Service Description

The mapper service maps file accessionIDs to datasetIDs.

When running, mapper reads messages from the configured RabbitMQ queue (default: "mappings").
For each message, these steps are taken (if not otherwise noted, errors halt progress and the service moves on to the next message):

1. The message is validated as valid JSON that matches the "dataset-mapping" schema.
   If the message cannot be validated, it is discarded and an error is logged (a sketch of a valid message is shown after this list).
2. The accessionIDs from the message are mapped to a datasetID (also from the message) in the database.
   On error, the service sleeps for up to 5 minutes to allow the database to recover; after that, the message is Nacked and re-queued, and an error is written to the logs.
3. The uploaded file related to each accessionID is removed from the inbox.
   If this fails, an error is written to the logs.
4. The RabbitMQ message is Ack'ed.
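
For illustration, a minimal "mapping" message could look like the sketch below. The `type` field is what `schemaFromDatasetOperation` in `mapper.go` switches on; the `dataset_id` and `accession_ids` field names are assumptions here, the authoritative definition being the `dataset-mapping` JSON schema in the configured schema directory. "release" and "deprecate" messages carry the same `type` and dataset ID shape without the accession ID list.

```json
{
  "type": "mapping",
  "dataset_id": "EGAD00000000001",
  "accession_ids": [
    "EGAF00000000001",
    "EGAF00000000002"
  ]
}
```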

## Communication

- Mapper reads messages from one RabbitMQ queue (default `mappings`).
- Mapper maps files to datasets in the database using the `MapFilesToDataset` function.
- Mapper retrieves the inbox filepath for each file from the database using the `GetInboxPath` function.
- Mapper sets the status of a dataset in the database using the `UpdateDatasetEvent` function.
20 changes: 20 additions & 0 deletions sda/cmd/mapper/mapper_test.go
@@ -0,0 +1,20 @@
package main

import (
	"testing"

	"github.com/spf13/viper"
	"github.com/stretchr/testify/suite"
)

type TestSuite struct {
	suite.Suite
}

func TestConfigTestSuite(t *testing.T) {
	suite.Run(t, new(TestSuite))
}

func (suite *TestSuite) SetupTest() {
	viper.Set("log.level", "debug")
}
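
As a sketch (not part of this commit), the suite could be extended with a table-driven test of `schemaFromDatasetOperation`, whose expected behavior follows directly from the switch statement in `mapper.go`:

```go
func (suite *TestSuite) TestSchemaFromDatasetOperation() {
	// Each known operation should map to its schema name.
	for op, want := range map[string]string{
		"mapping":   "dataset-mapping",
		"release":   "dataset-release",
		"deprecate": "dataset-deprecate",
	} {
		schemaName, err := schemaFromDatasetOperation([]byte(`{"type": "` + op + `"}`))
		suite.NoError(err)
		suite.Equal(want, schemaName)
	}

	// Unknown operations and messages without a type should be rejected.
	_, err := schemaFromDatasetOperation([]byte(`{"type": "unknown"}`))
	suite.Error(err)
	_, err = schemaFromDatasetOperation([]byte(`{}`))
	suite.Error(err)
}
```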