From 86e7cdc795702da96a828d18b8eb1d77f3dbd779 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 24 Nov 2022 07:38:20 +0100 Subject: [PATCH 01/23] SyncAPI --- cmd/syncapi/syncapi.go | 188 ++++++++++++++++++++++++++ cmd/syncapi/syncapi_test.go | 211 ++++++++++++++++++++++++++++++ schemas/bigpicture/file-sync.json | 119 +++++++++++++++++ 3 files changed, 518 insertions(+) create mode 100644 cmd/syncapi/syncapi.go create mode 100644 cmd/syncapi/syncapi_test.go create mode 100644 schemas/bigpicture/file-sync.json diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go new file mode 100644 index 00000000..d7c9a0e0 --- /dev/null +++ b/cmd/syncapi/syncapi.go @@ -0,0 +1,188 @@ +package main + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "sda-pipeline/internal/broker" + "sda-pipeline/internal/common" + "sda-pipeline/internal/config" + "sda-pipeline/internal/database" + + "github.com/gorilla/mux" + + log "github.com/sirupsen/logrus" +) + +var Conf *config.Config +var err error + +func main() { + Conf, err = config.NewConfig("api") + if err != nil { + log.Fatal(err) + } + Conf.API.MQ, err = broker.NewMQ(Conf.Broker) + if err != nil { + log.Fatal(err) + } + Conf.API.DB, err = database.NewDB(Conf.Database) + if err != nil { + log.Fatal(err) + } + + sigc := make(chan os.Signal, 5) + signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + go func() { + <-sigc + shutdown() + os.Exit(0) + }() + + srv := setup(Conf) + + if Conf.API.ServerCert != "" && Conf.API.ServerKey != "" { + log.Infof("Web server is ready to receive connections at https://%s:%d", Conf.API.Host, Conf.API.Port) + if err := srv.ListenAndServeTLS(Conf.API.ServerCert, Conf.API.ServerKey); err != nil { + shutdown() + log.Fatalln(err) + } + } else { + log.Infof("Web server is ready to receive connections at http://%s:%d", Conf.API.Host, Conf.API.Port) + if err := srv.ListenAndServe(); err != nil { + shutdown() + log.Fatalln(err) + } + } +} + +func setup(config *config.Config) *http.Server { + r := mux.NewRouter().SkipClean(true) + + r.HandleFunc("/ready", readinessResponse).Methods("GET") + r.HandleFunc("/dataset", dataset).Methods("POST") + + cfg := &tls.Config{ + MinVersion: tls.VersionTLS12, + CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256}, + PreferServerCipherSuites: true, + CipherSuites: []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + }, + } + + srv := &http.Server{ + Addr: config.API.Host + ":" + fmt.Sprint(config.API.Port), + Handler: r, + TLSConfig: cfg, + TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + IdleTimeout: 30 * time.Second, + ReadHeaderTimeout: 3 * time.Second, + } + + return srv +} + +func shutdown() { + defer Conf.API.MQ.Channel.Close() + defer Conf.API.MQ.Connection.Close() + defer Conf.API.DB.Close() +} + +func readinessResponse(w http.ResponseWriter, r *http.Request) { + statusCocde := http.StatusOK + + if Conf.API.MQ.Connection.IsClosed() { + statusCocde = http.StatusServiceUnavailable + newConn, err := broker.NewMQ(Conf.Broker) + if err != nil { + log.Errorf("failed to reconnect to MQ, reason: %v", err) + } else { + Conf.API.MQ = newConn + } + } + + if Conf.API.MQ.Channel.IsClosed() { + statusCocde = http.StatusServiceUnavailable + Conf.API.MQ.Connection.Close() + newConn, err := broker.NewMQ(Conf.Broker) + if err != nil { + log.Errorf("failed to reconnect to MQ, reason: %v", err) + } else { + Conf.API.MQ = newConn + } + } + + if DBRes := checkDB(Conf.API.DB, 5*time.Millisecond); DBRes != nil { + log.Debugf("DB connection error :%v", DBRes) + Conf.API.DB.Reconnect() + statusCocde = http.StatusServiceUnavailable + } + + w.WriteHeader(statusCocde) +} + +func checkDB(database *database.SQLdb, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if database.DB == nil { + return fmt.Errorf("database is nil") + } + + return database.DB.PingContext(ctx) +} + +func dataset(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + respondWithError(w, http.StatusBadRequest, "failed to read request body") + + return + } + defer r.Body.Close() + + // the filepath looks funkt for now, it will sort itself out when we switch to sda-common + res, err := common.ValidateJSON(Conf.Broker.SchemasPath+"../bigpicture/file-sync.json", b) + if err != nil { + respondWithError(w, http.StatusBadRequest, "eror on JSON validation: "+err.Error()) + + return + } + if !res.Valid() { + errorString := "" + for _, validErr := range res.Errors() { + errorString += validErr.String() + "\n\n" + } + respondWithError(w, http.StatusBadRequest, "JSON validation failed, reason: "+errorString) + + return + } + + w.WriteHeader(http.StatusOK) +} + +func respondWithError(w http.ResponseWriter, code int, message string) { + respondWithJSON(w, code, map[string]string{"error": message}) +} + +func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) { + log.Infoln(payload) + response, _ := json.Marshal(payload) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _, err = w.Write(response) + if err != nil { + log.Errorf("failed to write HTTP response, reason: %v", err) + } +} diff --git a/cmd/syncapi/syncapi_test.go b/cmd/syncapi/syncapi_test.go new file mode 100644 index 00000000..914db210 --- /dev/null +++ b/cmd/syncapi/syncapi_test.go @@ -0,0 +1,211 @@ +package main + +import ( + "bytes" + "net/http" + "net/http/httptest" + "testing" + "time" + + "sda-pipeline/internal/broker" + "sda-pipeline/internal/config" + "sda-pipeline/internal/database" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gorilla/mux" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +type TestSuite struct { + suite.Suite +} + +func TestApiTestSuite(t *testing.T) { + suite.Run(t, new(TestSuite)) +} + +func TestSetup(t *testing.T) { + viper.Set("log.level", "debug") + viper.Set("log.format", "json") + + viper.Set("broker.host", "test") + viper.Set("broker.port", 123) + viper.Set("broker.user", "test") + viper.Set("broker.password", "test") + viper.Set("broker.queue", "test") + viper.Set("broker.routingkey", "test") + + viper.Set("db.host", "test") + viper.Set("db.port", 123) + viper.Set("db.user", "test") + viper.Set("db.password", "test") + viper.Set("db.database", "test") + + viper.Set("schema.type", "isolated") + + conf := config.Config{} + conf.API.Host = "localhost" + conf.API.Port = 8080 + server := setup(&conf) + + assert.Equal(t, "localhost:8080", server.Addr) +} + +func (suite *TestSuite) SetupTest() { + viper.Set("log.level", "debug") +} + +func TestShutdown(t *testing.T) { + Conf = &config.Config{} + Conf.Broker = broker.MQConf{ + Host: "localhost", + Port: 5672, + User: "test", + Password: "test", + RoutingKey: "test", + Exchange: "sda", + Ssl: false, + Vhost: "/test", + } + Conf.API.MQ, err = broker.NewMQ(Conf.Broker) + if err != nil { + t.Skip("skip TestShutdown since broker not present") + } + assert.NoError(t, err) + + Conf.Database = database.DBConf{ + Host: "localhost", + Port: 5432, + User: "lega_in", + Password: "lega_in", + Database: "lega", + SslMode: "disable", + } + Conf.API.DB, err = database.NewDB(Conf.Database) + if err != nil { + t.Skip("skip TestShutdown since broker not present") + } + assert.NoError(t, err) + + // make sure all conections are alive + assert.Equal(t, false, Conf.API.MQ.Channel.IsClosed()) + assert.Equal(t, false, Conf.API.MQ.Connection.IsClosed()) + assert.Equal(t, nil, Conf.API.DB.DB.Ping()) + + shutdown() + assert.Equal(t, true, Conf.API.MQ.Channel.IsClosed()) + assert.Equal(t, true, Conf.API.MQ.Connection.IsClosed()) + assert.Equal(t, "sql: database is closed", Conf.API.DB.DB.Ping().Error()) +} + +func TestReadinessResponse(t *testing.T) { + r := mux.NewRouter() + r.HandleFunc("/ready", readinessResponse) + ts := httptest.NewServer(r) + defer ts.Close() + + Conf = &config.Config{} + Conf.Broker = broker.MQConf{ + Host: "localhost", + Port: 5672, + User: "test", + Password: "test", + RoutingKey: "test", + Exchange: "sda", + Ssl: false, + Vhost: "/test", + } + Conf.API.MQ, err = broker.NewMQ(Conf.Broker) + if err != nil { + t.Skip("skip TestShutdown since broker not present") + } + assert.NoError(t, err) + + Conf.Database = database.DBConf{ + Host: "localhost", + Port: 5432, + User: "lega_in", + Password: "lega_in", + Database: "lega", + SslMode: "disable", + } + Conf.API.DB, err = database.NewDB(Conf.Database) + assert.NoError(t, err) + + res, err := http.Get(ts.URL + "/ready") + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, res.StatusCode) + defer res.Body.Close() + + // close the connection to force a reconneciton + Conf.API.MQ.Connection.Close() + res, err = http.Get(ts.URL + "/ready") + assert.NoError(t, err) + assert.Equal(t, http.StatusServiceUnavailable, res.StatusCode) + defer res.Body.Close() + + // reconnect should be fast so now this should pass + res, err = http.Get(ts.URL + "/ready") + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, res.StatusCode) + defer res.Body.Close() + + // close the channel to force a reconneciton + Conf.API.MQ.Channel.Close() + res, err = http.Get(ts.URL + "/ready") + assert.NoError(t, err) + assert.Equal(t, http.StatusServiceUnavailable, res.StatusCode) + defer res.Body.Close() + + // reconnect should be fast so now this should pass + res, err = http.Get(ts.URL + "/ready") + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, res.StatusCode) + defer res.Body.Close() + + // close DB connection to force a reconnection + Conf.API.DB.Close() + res, err = http.Get(ts.URL + "/ready") + assert.NoError(t, err) + assert.Equal(t, http.StatusServiceUnavailable, res.StatusCode) + defer res.Body.Close() + + // reconnect should be fast so now this should pass + res, err = http.Get(ts.URL + "/ready") + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, res.StatusCode) + defer res.Body.Close() +} + +func TestDatabasePingCheck(t *testing.T) { + database := database.SQLdb{} + assert.Error(t, checkDB(&database, 1*time.Second), "nil DB should fail") + + database.DB, _, err = sqlmock.New() + assert.NoError(t, err) + assert.NoError(t, checkDB(&database, 1*time.Second), "ping should succeed") +} + +func TestDatasetRoute(t *testing.T) { + Conf = &config.Config{} + Conf.Broker.SchemasPath = "file://../../schemas/isolated/" + + r := mux.NewRouter() + r.HandleFunc("/dataset", dataset) + ts := httptest.NewServer(r) + defer ts.Close() + + goodJSON := []byte(`{"user":"test.user@example.com", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "dataset_files": [{"filepath": "inbox/user/file1.c4gh","file_id": "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "sha256": "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6"}, {"filepath": "inbox/user/file2.c4gh","file_id": "ed6af454-d910-49e3-8cda-488a6f246e76", "sha256": "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b"}]}`) + good, err := http.Post(ts.URL+"/dataset", "application/json", bytes.NewBuffer(goodJSON)) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, good.StatusCode) + defer good.Body.Close() + + badJSON := []byte(`{"dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "dataset_files": []}`) + bad, err := http.Post(ts.URL+"/dataset", "application/json", bytes.NewBuffer(badJSON)) + assert.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, bad.StatusCode) + defer bad.Body.Close() +} diff --git a/schemas/bigpicture/file-sync.json b/schemas/bigpicture/file-sync.json new file mode 100644 index 00000000..ff554759 --- /dev/null +++ b/schemas/bigpicture/file-sync.json @@ -0,0 +1,119 @@ +{ + "title": "JSON schema for file syncing message interface.", + "$id": "https://github.com/neicnordic/sda-common/tree/master/schemas/file-sync.json", + "$schema": "http://json-schema.org/draft-07/schema", + "type": "object", + "required": [ + "dataset_id", + "dataset_files", + "user" + ], + "additionalProperties": false, + "definitions": { + "dataset_files": { + "$id": "#/definitions/dataset_files", + "type": "object", + "minProperties": 3, + "maxProperties": 3, + "title": "File information schema", + "description": "Informations about a file", + "examples": [ + { + "filepath": "path/to/file", + "file_id": "16f3edd1-3c40-4284-9f82-1055361e655b", + "sha256": "82e4e60e7beb3db2e06a00a079788f7d71f75b61a4b75f28c4c942703dabb6d6" + } + ], + "required": [ + "filepath", + "file_id", + "sha256" + ], + "additionalProperties": false, + "properties": { + "filepath": { + "$id": "#/definitions/dataset_files/properties/filepath", + "type": "string", + "title": "The inbox filepath", + "description": "The inbox filepath", + "minLength": 5 + }, + "file_id": { + "$id": "#/definitions/dataset_files/properties/file_id", + "type": "string", + "title": "The checksum value in hex format", + "description": "The checksum value in (case-insensitive) hex format", + "minLength": 11, + "pattern": "^\\S+$", + "examples": [ + "16f3edd1-3c40-4284-9f82-1055361e655b" + ] + }, + "sha256": { + "$id": "#/definitions/checksum-sha256/properties/sha256", + "type": "string", + "title": "The decrypred checksum value in hex format", + "description": "The checksum value in (case-insensitive) hex format", + "pattern": "^[a-fA-F0-9]{64}$", + "examples": [ + "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6" + ] + } + } + } + }, + "properties": { + "dataset_id": { + "$id": "#/properties/dataset_id", + "type": "string", + "title": "The Accession identifier for the dataset", + "description": "The Accession identifier for the dataset", + "minLength": 11, + "pattern": "^\\S+$", + "examples": [ + "anyidentifier" + ] + }, + "dataset_files": { + "$id": "#/properties/dataset_files", + "type": "array", + "title": "The files in that dataset", + "description": "The files in that dataset", + "minItems": 1, + "examples": [ + [ + { + "filepath": "path/to/file1.c4gh", + "file_id": "16f3edd1-3c40-4284-9f82-1055361e655b" + }, + { + "filepath": "path/to/file2.c4gh", + "file_id": "ba824437-ffc0-4431-b6a0-73968c1bb1ed" + } + ] + ], + "additionalItems": false, + "items": { + "$ref": "#/definitions/dataset_files", + "properties": { + "filepath": { + "$ref": "#/definitions/dataset_files/properties/filepath" + }, + "file_id": { + "$ref": "#/definitions/dataset_files/properties/file_id" + } + } + } + }, + "user": { + "$id": "#/properties/user", + "type": "string", + "title": "The username", + "description": "The username", + "minLength": 5, + "examples": [ + "user.name@example.com" + ] + } + } +} From 1839bf4c1da466d2ac916a71643361e24b1acc3e Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 24 Nov 2022 09:24:16 +0100 Subject: [PATCH 02/23] Add Mappings struct to common --- internal/common/common.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/common/common.go b/internal/common/common.go index c22c48da..80c9eb1c 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -46,3 +46,9 @@ type InfoError struct { Reason string `json:"reason"` OriginalMessage interface{} `json:"original-message"` } + +type Mappings struct { + Type string `json:"type"` + DatasetID string `json:"dataset_id"` + AccessionIDs []string `json:"accession_ids"` +} From 7de7608fccefbd5ce2058f257658d1f7c6415c7d Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 24 Nov 2022 10:01:13 +0100 Subject: [PATCH 03/23] Parse JSON blob and send messages --- cmd/syncapi/syncapi.go | 55 +++++++++++++++++++++++++++++++++++++ cmd/syncapi/syncapi_test.go | 17 +++++++++++- 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index d7c9a0e0..51338eca 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -25,6 +25,18 @@ import ( var Conf *config.Config var err error +type syncDataset struct { + DatasetID string `json:"dataset_id"` + DatasetFiles []datasetFiles `json:"dataset_files"` + User string `json:"user"` +} + +type datasetFiles struct { + FilePath string `json:"filepath"` + FileID string `json:"file_id"` + ShaSum string `json:"sha256"` +} + func main() { Conf, err = config.NewConfig("api") if err != nil { @@ -168,9 +180,52 @@ func dataset(w http.ResponseWriter, r *http.Request) { return } + parseMessage(b) + w.WriteHeader(http.StatusOK) } +// parsemessage parses the JSON blob and sends the relevant messages +func parseMessage(msg []byte) { + blob := syncDataset{} + _ = json.Unmarshal(msg, &blob) + + var accessionIDs []string + for _, files := range blob.DatasetFiles { + accessionIDs = append(accessionIDs, files.FileID) + finalize := common.Finalize{ + Type: "accession", + User: blob.User, + Filepath: files.FilePath, + AccessionID: files.FileID, + DecryptedChecksums: []common.Checksums{{Type: "sha256", Value: files.ShaSum}}, + } + finalizeMsg, err := json.Marshal(finalize) + if err != nil { + log.Errorf("Failed to marshal json messge: Reason %v", err) + } + err = Conf.API.MQ.SendMessage(fmt.Sprintf("%v", time.Now().Unix()), Conf.Broker.Exchange, "accessionIDs", true, finalizeMsg) + if err != nil { + log.Errorf("Failed to send mapping messge: Reason %v", err) + } + } + + mappings := common.Mappings{ + Type: "mappings", + DatasetID: blob.DatasetID, + AccessionIDs: accessionIDs, + } + mappingMsg, err := json.Marshal(mappings) + if err != nil { + log.Errorf("Failed to marshal json messge: Reason %v", err) + } + + err = Conf.API.MQ.SendMessage(fmt.Sprintf("%v", time.Now().Unix()), Conf.Broker.Exchange, "mappings", true, mappingMsg) + if err != nil { + log.Errorf("Failed to send mapping messge: Reason %v", err) + } +} + func respondWithError(w http.ResponseWriter, code int, message string) { respondWithJSON(w, code, map[string]string{"error": message}) } diff --git a/cmd/syncapi/syncapi_test.go b/cmd/syncapi/syncapi_test.go index 914db210..541c6885 100644 --- a/cmd/syncapi/syncapi_test.go +++ b/cmd/syncapi/syncapi_test.go @@ -190,7 +190,22 @@ func TestDatabasePingCheck(t *testing.T) { func TestDatasetRoute(t *testing.T) { Conf = &config.Config{} - Conf.Broker.SchemasPath = "file://../../schemas/isolated/" + Conf.Broker = broker.MQConf{ + Host: "localhost", + Port: 5672, + User: "test", + Password: "test", + RoutingKey: "test", + Exchange: "sda", + Ssl: false, + Vhost: "/test", + SchemasPath: "file://../../schemas/isolated/", + } + Conf.API.MQ, err = broker.NewMQ(Conf.Broker) + if err != nil { + t.Skip("skip TestShutdown since broker not present") + } + assert.NoError(t, err) r := mux.NewRouter() r.HandleFunc("/dataset", dataset) From b8285f9d18ae2fba94ef98dcf63425b3d24088f1 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 24 Nov 2022 10:02:14 +0100 Subject: [PATCH 04/23] Use newer messages in the tests --- dev_utils/compose-no-tls.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev_utils/compose-no-tls.yml b/dev_utils/compose-no-tls.yml index 5a0695cc..33000ff1 100644 --- a/dev_utils/compose-no-tls.yml +++ b/dev_utils/compose-no-tls.yml @@ -12,13 +12,13 @@ services: interval: 5s timeout: 20s retries: 3 - image: neicnordic/sda-db:v1.4.0 + image: neicnordic/sda-db:v1.4.15 ports: - "5432:5432" volumes: - /tmp/data:/data mq: - image: neicnordic/sda-mq:v1.4.0 + image: neicnordic/sda-mq:v1.4.15 container_name: mq environment: - MQ_USER=test From 8f083d34b69917ac8b22502e1c7ea4b23aaca2e7 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 24 Nov 2022 11:03:53 +0100 Subject: [PATCH 05/23] Add metadata route --- cmd/syncapi/syncapi.go | 29 ++++++++++++++++++++++++ cmd/syncapi/syncapi_test.go | 22 ++++++++++++++++++ schemas/bigpicture/metadata-sync.json | 32 +++++++++++++++++++++++++++ 3 files changed, 83 insertions(+) create mode 100644 schemas/bigpicture/metadata-sync.json diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index 51338eca..826c8e69 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -81,6 +81,7 @@ func setup(config *config.Config) *http.Server { r.HandleFunc("/ready", readinessResponse).Methods("GET") r.HandleFunc("/dataset", dataset).Methods("POST") + r.HandleFunc("/metadata", metadata).Methods("POST") cfg := &tls.Config{ MinVersion: tls.VersionTLS12, @@ -241,3 +242,31 @@ func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) { log.Errorf("failed to write HTTP response, reason: %v", err) } } + +func metadata(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + respondWithError(w, http.StatusBadRequest, "failed to read request body") + + return + } + defer r.Body.Close() + // the filepath looks funkt for now, it will sort itself out when we switch to sda-common + res, err := common.ValidateJSON(Conf.Broker.SchemasPath+"bigpicture/metadata-sync.json", b) + if err != nil { + respondWithError(w, http.StatusBadRequest, "eror on JSON validation: "+err.Error()) + + return + } + if !res.Valid() { + errorString := "" + for _, validErr := range res.Errors() { + errorString += validErr.String() + "\n\n" + } + respondWithError(w, http.StatusBadRequest, "JSON validation failed, reason: "+errorString) + + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/cmd/syncapi/syncapi_test.go b/cmd/syncapi/syncapi_test.go index 541c6885..bc4cba04 100644 --- a/cmd/syncapi/syncapi_test.go +++ b/cmd/syncapi/syncapi_test.go @@ -224,3 +224,25 @@ func TestDatasetRoute(t *testing.T) { assert.Equal(t, http.StatusBadRequest, bad.StatusCode) defer bad.Body.Close() } + +func TestMetadataRoute(t *testing.T) { + Conf = &config.Config{} + Conf.Broker.SchemasPath = "file://../../schemas/" + + r := mux.NewRouter() + r.HandleFunc("/metadata", metadata) + ts := httptest.NewServer(r) + defer ts.Close() + + goodJSON := []byte(`{"dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "metadata": {"dummy":"data"}}`) + good, err := http.Post(ts.URL+"/metadata", "application/json", bytes.NewBuffer(goodJSON)) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, good.StatusCode) + defer good.Body.Close() + + badJSON := []byte(`{"dataset_id": "phail", "metadata": {}}`) + bad, err := http.Post(ts.URL+"/metadata", "application/json", bytes.NewBuffer(badJSON)) + assert.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, bad.StatusCode) + defer bad.Body.Close() +} diff --git a/schemas/bigpicture/metadata-sync.json b/schemas/bigpicture/metadata-sync.json new file mode 100644 index 00000000..5f3b34b7 --- /dev/null +++ b/schemas/bigpicture/metadata-sync.json @@ -0,0 +1,32 @@ +{ + "title": "JSON schema for file syncing message interface.", + "$id": "https://github.com/neicnordic/sda-common/tree/master/schemas/metadata-sync.json", + "$schema": "http://json-schema.org/draft-07/schema", + "type": "object", + "required": [ + "dataset_id", + "metadata" + ], + "additionalProperties": false, + "properties": { + "dataset_id": { + "$id": "#/properties/dataset_id", + "type": "string", + "title": "The Accession identifier for the dataset", + "description": "The Accession identifier for the dataset", + "minLength": 11, + "pattern": "^\\S+$", + "examples": [ + "anyidentifier" + ] + }, + "metadata": { + "$id": "#/properties/metadata", + "type": "object", + "title": "Metadata for the dataset", + "description": "Metadata for the dataset", + "minProperties": 1, + "pattern": "^\\S+$" + } + } +} From d72595c76423f284dc93dc6ece5c3b5fafb08168 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Fri, 25 Nov 2022 07:45:56 +0100 Subject: [PATCH 06/23] Build JSON blob for dataset syncing from mapper message --- cmd/syncapi/syncapi.go | 29 ++++++++++++++++++++++++ cmd/syncapi/syncapi_test.go | 44 +++++++++++++++++++++++++++++++++++++ internal/database/db.go | 38 ++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index 826c8e69..2dd63163 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -270,3 +270,32 @@ func metadata(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } + +func buildSyncDatasetJSON(b []byte) (syncDataset, error) { + var msg common.Mappings + _ = json.Unmarshal(b, &msg) + + var dataset = syncDataset{ + DatasetID: msg.DatasetID, + } + + for _, ID := range msg.AccessionIDs { + if DBRes := checkDB(Conf.API.DB, 20*time.Millisecond); DBRes != nil { + log.Infof("DB connection error :%v", DBRes) + Conf.API.DB.Reconnect() + } + data, err := Conf.API.DB.GetSyncData(ID) + if err != nil { + return syncDataset{}, err + } + datasetFile := datasetFiles{ + FilePath: data.FilePath, + FileID: ID, + ShaSum: data.Checksum, + } + dataset.DatasetFiles = append(dataset.DatasetFiles, datasetFile) + dataset.User = data.User + } + + return dataset, nil +} diff --git a/cmd/syncapi/syncapi_test.go b/cmd/syncapi/syncapi_test.go index bc4cba04..07051570 100644 --- a/cmd/syncapi/syncapi_test.go +++ b/cmd/syncapi/syncapi_test.go @@ -246,3 +246,47 @@ func TestMetadataRoute(t *testing.T) { assert.Equal(t, http.StatusBadRequest, bad.StatusCode) defer bad.Body.Close() } + +func TestBuildJSON(t *testing.T) { + Conf = &config.Config{} + Conf.Database = database.DBConf{ + Host: "localhost", + Port: 5432, + User: "postgres", + Password: "postgres", + Database: "lega", + SslMode: "disable", + } + Conf.API.DB, err = database.NewDB(Conf.Database) + if err != nil { + t.Skip("skip TestShutdown since broker not present") + } + assert.NoError(t, err) + + db := Conf.API.DB.DB + + var fileID int64 + const insert = "INSERT INTO local_ega.main(submission_file_path, submission_user, decrypted_file_checksum, status, submission_file_extension) VALUES($1, $2, $3, 'READY', 'c4gh') RETURNING id;" + const accession = "UPDATE local_ega.files SET stable_id = $1 WHERE inbox_path = $2;" + const mapping = "INSERT INTO local_ega_ebi.filedataset(file_id, dataset_stable_id) VALUES ($1, 'cd532362-e06e-4460-8490-b9ce64b8d9e7');" + + err := db.QueryRow(insert, "dummy.user/test/file1.c4gh", "dummy.user", "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b").Scan(&fileID) + assert.NoError(t, err) + err = db.QueryRow(accession, "ed6af454-d910-49e3-8cda-488a6f246e76", "dummy.user/test/file1.c4gh").Err() + assert.NoError(t, err) + err = db.QueryRow(mapping, fileID).Err() + assert.NoError(t, err) + + err = db.QueryRow(insert, "dummy.user/test/file2.c4gh", "dummy.user", "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6").Scan(&fileID) + assert.NoError(t, err) + err = db.QueryRow(accession, "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "dummy.user/test/file2.c4gh").Err() + assert.NoError(t, err) + err = db.QueryRow(mapping, fileID).Err() + assert.NoError(t, err) + + m := []byte(`{"type":"mapping", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "accession_ids": ["5fe7b660-afea-4c3a-88a9-3daabf055ebb", "ed6af454-d910-49e3-8cda-488a6f246e76"]}`) + ds, err := buildSyncDatasetJSON(m) + assert.NoError(t, err) + assert.Equal(t, "dummy.user", ds.User) + +} diff --git a/internal/database/db.go b/internal/database/db.go index a471fcbc..26e432d8 100644 --- a/internal/database/db.go +++ b/internal/database/db.go @@ -25,6 +25,7 @@ type Database interface { MarkCompleted(checksum string, fileID int) error MarkReady(accessionID, user, filepath, checksum string) error GetArchived(user, filepath, checksum string) (string, int, error) + GetSyncData(accessionID string) (SyncData, error) Close() } @@ -465,3 +466,40 @@ func (dbs *SQLdb) Close() { db := dbs.DB db.Close() } + +type SyncData struct { + User string + FilePath string + Checksum string +} + +// GetSyncData retrieves the file information needed to sync a dataset +func (dbs *SQLdb) GetSyncData(accessionID string) (SyncData, error) { + var ( + s SyncData + err error + ) + + for count := 1; count <= dbRetryTimes; count++ { + s, err = dbs.getSyncData(accessionID) + if err == nil { + break + } + time.Sleep(time.Duration(math.Pow(3, float64(count))) * time.Second) + } + + return s, err +} + +// getSyncData is the actual function performing work for GetSyncData +func (dbs *SQLdb) getSyncData(accessionID string) (SyncData, error) { + dbs.checkAndReconnectIfNeeded() + + const query = "SELECT elixir_id, inbox_path, decrypted_file_checksum from local_ega.files WHERE stable_id = $1 AND status = 'READY';" + var data SyncData + if err := dbs.DB.QueryRow(query, accessionID).Scan(&data.User, &data.FilePath, &data.Checksum); err != nil { + return SyncData{}, err + } + + return data, nil +} From 8ab3f8859bd9c0760293cbec087bf62e2c285200 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Fri, 25 Nov 2022 07:47:02 +0100 Subject: [PATCH 07/23] Add postgres root user to docker compose --- dev_utils/compose-no-tls.yml | 10 ++++++---- dev_utils/compose-sda.yml | 1 + 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/dev_utils/compose-no-tls.yml b/dev_utils/compose-no-tls.yml index 33000ff1..d899fba8 100644 --- a/dev_utils/compose-no-tls.yml +++ b/dev_utils/compose-no-tls.yml @@ -4,6 +4,7 @@ services: command: server /data container_name: db environment: + - POSTGRES_PASSWORD=postgres - DB_LEGA_IN_PASSWORD=lega_in - DB_LEGA_OUT_PASSWORD=lega_out - NOTLS=true @@ -16,7 +17,7 @@ services: ports: - "5432:5432" volumes: - - /tmp/data:/data + - data:/data mq: image: neicnordic/sda-mq:v1.4.15 container_name: mq @@ -205,6 +206,7 @@ services: volumes: - ./config-notls.yaml:/config.yaml volumes: - archive: - backup: - inbox: + archive: + backup: + inbox: + data: diff --git a/dev_utils/compose-sda.yml b/dev_utils/compose-sda.yml index bf2ab570..f960a5d3 100644 --- a/dev_utils/compose-sda.yml +++ b/dev_utils/compose-sda.yml @@ -23,6 +23,7 @@ services: depends_on: - certfixer environment: + - POSTGRES_PASSWORD=postgres - DB_LEGA_IN_PASSWORD=lega_in - DB_LEGA_OUT_PASSWORD=lega_out - PKI_VOLUME_PATH=/certs/ From 28b7f40adccc11d8b6586ba099888eec781e70a3 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Fri, 25 Nov 2022 08:05:42 +0100 Subject: [PATCH 08/23] Get messages from MQ Needs expanding to send outgoing REST POST call --- cmd/syncapi/syncapi.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index 2dd63163..ae63bbec 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -59,6 +59,44 @@ func main() { os.Exit(0) }() + forever := make(chan bool) + go func() { + messages, err := Conf.API.MQ.GetMessages(Conf.Broker.Queue) + if err != nil { + log.Fatal(err) + } + for m := range messages { + log.Debugf("Received a message (corr-id: %s, message: %s)", m.CorrelationId, m.Body) + res, err := common.ValidateJSON(Conf.Broker.SchemasPath+"dataset-mapping.json", m.Body) + if err != nil { + if err := m.Nack(false, false); err != nil { + log.Errorf("Failed to nack message, reason: %v", err) + } + + continue + } + if !res.Valid() { + errorString := "" + for _, validErr := range res.Errors() { + errorString += validErr.String() + "\n\n" + } + if err := m.Nack(false, false); err != nil { + log.Errorf("Failed to nack message, reason: %v", err) + } + + continue + } + + _, err = buildSyncDatasetJSON(m.Body) + if err != nil { + log.Errorf("failed to build SyncDatasetJSON, Reason: %v", err) + } + // http.Client send POST to reciever + + } + }() + <-forever + srv := setup(Conf) if Conf.API.ServerCert != "" && Conf.API.ServerKey != "" { From a6e7f1acdcccbb57d4fd38f3f620bf9cfa9d0422 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Fri, 25 Nov 2022 08:45:23 +0100 Subject: [PATCH 09/23] buildSyncDatasetJSOhould return JSON --- cmd/syncapi/syncapi.go | 11 ++++++++--- cmd/syncapi/syncapi_test.go | 4 +--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index ae63bbec..6e62498c 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -309,7 +309,7 @@ func metadata(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -func buildSyncDatasetJSON(b []byte) (syncDataset, error) { +func buildSyncDatasetJSON(b []byte) ([]byte, error) { var msg common.Mappings _ = json.Unmarshal(b, &msg) @@ -324,7 +324,7 @@ func buildSyncDatasetJSON(b []byte) (syncDataset, error) { } data, err := Conf.API.DB.GetSyncData(ID) if err != nil { - return syncDataset{}, err + return nil, err } datasetFile := datasetFiles{ FilePath: data.FilePath, @@ -335,5 +335,10 @@ func buildSyncDatasetJSON(b []byte) (syncDataset, error) { dataset.User = data.User } - return dataset, nil + json, err := json.Marshal(dataset) + if err != nil { + return nil, err + } + + return json, nil } diff --git a/cmd/syncapi/syncapi_test.go b/cmd/syncapi/syncapi_test.go index 07051570..6d7a6f4f 100644 --- a/cmd/syncapi/syncapi_test.go +++ b/cmd/syncapi/syncapi_test.go @@ -285,8 +285,6 @@ func TestBuildJSON(t *testing.T) { assert.NoError(t, err) m := []byte(`{"type":"mapping", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "accession_ids": ["5fe7b660-afea-4c3a-88a9-3daabf055ebb", "ed6af454-d910-49e3-8cda-488a6f246e76"]}`) - ds, err := buildSyncDatasetJSON(m) + _, err = buildSyncDatasetJSON(m) assert.NoError(t, err) - assert.Equal(t, "dummy.user", ds.User) - } From e57cea58a3cba6000d2778fd32239f6ca638db80 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Fri, 25 Nov 2022 10:10:05 +0100 Subject: [PATCH 10/23] Send POST for dataset sync --- cmd/syncapi/syncapi.go | 23 +++++++++++++++++++++-- cmd/syncapi/syncapi_test.go | 21 +++++++++++++++++++++ internal/config/config.go | 6 ++++++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index 6e62498c..f2942071 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "context" "crypto/tls" "encoding/json" @@ -87,11 +88,13 @@ func main() { continue } - _, err = buildSyncDatasetJSON(m.Body) + blob, err := buildSyncDatasetJSON(m.Body) if err != nil { log.Errorf("failed to build SyncDatasetJSON, Reason: %v", err) } - // http.Client send POST to reciever + if err := sendPOST(blob); err != nil { + log.Errorf("failed to send POST, Reason: %v", err) + } } }() @@ -342,3 +345,19 @@ func buildSyncDatasetJSON(b []byte) ([]byte, error) { return json, nil } + +func sendPOST(payload []byte) error { + client := &http.Client{} + URL := Conf.Sync.Host + "/dataset" + req, err := http.NewRequest("POST", URL, bytes.NewBuffer(payload)) + if err != nil { + return err + } + resp, err := client.Do(req) + if err != nil || resp.StatusCode != http.StatusOK { + return err + } + defer resp.Body.Close() + + return nil +} diff --git a/cmd/syncapi/syncapi_test.go b/cmd/syncapi/syncapi_test.go index 6d7a6f4f..21f31189 100644 --- a/cmd/syncapi/syncapi_test.go +++ b/cmd/syncapi/syncapi_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "fmt" "net/http" "net/http/httptest" "testing" @@ -288,3 +289,23 @@ func TestBuildJSON(t *testing.T) { _, err = buildSyncDatasetJSON(m) assert.NoError(t, err) } + +func TestSendPOST(t *testing.T) { + r := http.NewServeMux() + r.HandleFunc("/dataset", func(w http.ResponseWriter, r *http.Request) { + _, err = w.Write([]byte(fmt.Sprint(http.StatusOK))) + assert.NoError(t, err) + }) + ts := httptest.NewServer(r) + defer ts.Close() + + Conf = &config.Config{} + Conf.Sync = config.SyncConf{ + Host: ts.URL, + User: "test", + Password: "test", + } + syncJSON := []byte(`{"user":"test.user@example.com", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "dataset_files": [{"filepath": "inbox/user/file1.c4gh","file_id": "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "sha256": "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6"}, {"filepath": "inbox/user/file2.c4gh","file_id": "ed6af454-d910-49e3-8cda-488a6f246e76", "sha256": "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b"}]}`) + err := sendPOST(syncJSON) + assert.NoError(t, err) +} diff --git a/internal/config/config.go b/internal/config/config.go index b745e3fb..1855a4ec 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -33,8 +33,14 @@ type Config struct { Database database.DBConf API APIConf Notify SMTPConf + Sync SyncConf } +type SyncConf struct { + Host string + Password string + User string +} type APIConf struct { CACert string ServerCert string From 8640b76723459927b06b525b1947e341d794cad9 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Tue, 29 Nov 2022 10:51:16 +0100 Subject: [PATCH 11/23] Send messages to trigger ingestion --- cmd/syncapi/syncapi.go | 16 +++++++++++++++- internal/common/common.go | 6 ++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index f2942071..78f41892 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -234,6 +234,20 @@ func parseMessage(msg []byte) { var accessionIDs []string for _, files := range blob.DatasetFiles { + ingest := common.Ingest{ + Type: "ingest", + User: blob.User, + FilePath: files.FilePath, + } + ingestMsg, err := json.Marshal(ingest) + if err != nil { + log.Errorf("Failed to marshal json messge: Reason %v", err) + } + err = Conf.API.MQ.SendMessage(fmt.Sprintf("%v", time.Now().Unix()), Conf.Broker.Exchange, "ingest", true, ingestMsg) + if err != nil { + log.Errorf("Failed to send ingest messge: Reason %v", err) + } + accessionIDs = append(accessionIDs, files.FileID) finalize := common.Finalize{ Type: "accession", @@ -253,7 +267,7 @@ func parseMessage(msg []byte) { } mappings := common.Mappings{ - Type: "mappings", + Type: "mapping", DatasetID: blob.DatasetID, AccessionIDs: accessionIDs, } diff --git a/internal/common/common.go b/internal/common/common.go index 80c9eb1c..d9c9b9a7 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -41,6 +41,12 @@ type Finalize struct { DecryptedChecksums []Checksums `json:"decrypted_checksums"` } +type Ingest struct { + Type string `json:"type"` + User string `json:"user"` + FilePath string `json:"filepath"` +} + type InfoError struct { Error string `json:"error"` Reason string `json:"reason"` From 661f1850924d41190c98bb891b9cd7818ccf67a4 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Tue, 29 Nov 2022 10:56:39 +0100 Subject: [PATCH 12/23] Propagate parseMessage error so we can return a 500 on failure --- cmd/syncapi/syncapi.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index 78f41892..e613521b 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -222,13 +222,15 @@ func dataset(w http.ResponseWriter, r *http.Request) { return } - parseMessage(b) + if err := parseMessage(b); err != nil { + w.WriteHeader(http.StatusInternalServerError) + } w.WriteHeader(http.StatusOK) } // parsemessage parses the JSON blob and sends the relevant messages -func parseMessage(msg []byte) { +func parseMessage(msg []byte) error { blob := syncDataset{} _ = json.Unmarshal(msg, &blob) @@ -241,11 +243,11 @@ func parseMessage(msg []byte) { } ingestMsg, err := json.Marshal(ingest) if err != nil { - log.Errorf("Failed to marshal json messge: Reason %v", err) + return fmt.Errorf("Failed to marshal json messge: Reason %v", err) } err = Conf.API.MQ.SendMessage(fmt.Sprintf("%v", time.Now().Unix()), Conf.Broker.Exchange, "ingest", true, ingestMsg) if err != nil { - log.Errorf("Failed to send ingest messge: Reason %v", err) + return fmt.Errorf("Failed to send ingest messge: Reason %v", err) } accessionIDs = append(accessionIDs, files.FileID) @@ -258,11 +260,11 @@ func parseMessage(msg []byte) { } finalizeMsg, err := json.Marshal(finalize) if err != nil { - log.Errorf("Failed to marshal json messge: Reason %v", err) + return fmt.Errorf("Failed to marshal json messge: Reason %v", err) } err = Conf.API.MQ.SendMessage(fmt.Sprintf("%v", time.Now().Unix()), Conf.Broker.Exchange, "accessionIDs", true, finalizeMsg) if err != nil { - log.Errorf("Failed to send mapping messge: Reason %v", err) + return fmt.Errorf("Failed to send mapping messge: Reason %v", err) } } @@ -273,13 +275,15 @@ func parseMessage(msg []byte) { } mappingMsg, err := json.Marshal(mappings) if err != nil { - log.Errorf("Failed to marshal json messge: Reason %v", err) + return fmt.Errorf("Failed to marshal json messge: Reason %v", err) } err = Conf.API.MQ.SendMessage(fmt.Sprintf("%v", time.Now().Unix()), Conf.Broker.Exchange, "mappings", true, mappingMsg) if err != nil { - log.Errorf("Failed to send mapping messge: Reason %v", err) + return fmt.Errorf("Failed to send mapping messge: Reason %v", err) } + + return nil } func respondWithError(w http.ResponseWriter, code int, message string) { From 74f81d0bf8c1368d410275b9b61ee3288333a7a9 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Tue, 29 Nov 2022 10:57:30 +0100 Subject: [PATCH 13/23] Rename parseMessage to parseDatasetMessage --- cmd/syncapi/syncapi.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index e613521b..9417111e 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -222,7 +222,7 @@ func dataset(w http.ResponseWriter, r *http.Request) { return } - if err := parseMessage(b); err != nil { + if err := parseDatasetMessage(b); err != nil { w.WriteHeader(http.StatusInternalServerError) } @@ -230,7 +230,7 @@ func dataset(w http.ResponseWriter, r *http.Request) { } // parsemessage parses the JSON blob and sends the relevant messages -func parseMessage(msg []byte) error { +func parseDatasetMessage(msg []byte) error { blob := syncDataset{} _ = json.Unmarshal(msg, &blob) From d522144313f9d57d0ecf2bcb69b85ee7f025789e Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Tue, 29 Nov 2022 13:06:15 +0100 Subject: [PATCH 14/23] Dedicated config for Sync service --- cmd/syncapi/syncapi.go | 2 +- internal/config/config.go | 30 ++++++++++++++++++++++++++++++ internal/config/config_test.go | 25 +++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index 9417111e..347b2e4c 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -39,7 +39,7 @@ type datasetFiles struct { } func main() { - Conf, err = config.NewConfig("api") + Conf, err = config.NewConfig("sync") if err != nil { log.Fatal(err) } diff --git a/internal/config/config.go b/internal/config/config.go index 1855a4ec..cf3f9bd7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -39,6 +39,7 @@ type Config struct { type SyncConf struct { Host string Password string + Port int User string } type APIConf struct { @@ -112,6 +113,10 @@ func NewConfig(app string) (*Config, error) { requiredConfVars = []string{ "broker.host", "broker.port", "broker.user", "broker.password", "broker.queue", "smtp.host", "smtp.port", "smtp.password", "smtp.from", } + case "sync": + requiredConfVars = []string{ + "broker.host", "broker.port", "broker.user", "broker.password", "broker.routingkey", "db.host", "db.port", "db.user", "db.password", "db.database", "sync.host", "sync.password", "sync.user", + } default: requiredConfVars = []string{ "broker.host", "broker.port", "broker.user", "broker.password", "broker.queue", "broker.routingkey", "db.host", "db.port", "db.user", "db.password", "db.database", @@ -225,6 +230,20 @@ func NewConfig(app string) (*Config, error) { case "notify": c.configSMTP() + return c, nil + case "sync": + err = c.configDatabase() + if err != nil { + return nil, err + } + + err = c.configAPI() + if err != nil { + return nil, err + } + + c.configSync() + return c, nil } @@ -444,6 +463,17 @@ func (c *Config) configSMTP() { c.Notify.FromAddr = viper.GetString("smtp.from") } +// configSync provides configuration for the outgoing sync settings +func (c *Config) configSync() { + c.Sync = SyncConf{} + c.Sync.Host = viper.GetString("sync.host") + if viper.IsSet("sync.port") { + c.Sync.Port = viper.GetInt("sync.port") + } + c.Sync.Password = viper.GetString("sync.password") + c.Sync.User = viper.GetString("sync.user") +} + // GetC4GHKey reads and decrypts and returns the c4gh key func GetC4GHKey() (*[32]byte, error) { keyPath := viper.GetString("c4gh.filepath") diff --git a/internal/config/config_test.go b/internal/config/config_test.go index a399909b..cca51056 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -740,3 +740,28 @@ func (suite *TestSuite) TestNotifyConfiguration() { assert.NotNil(suite.T(), config) } + +func (suite *TestSuite) TestSyncConfiguration() { + // At this point we should fail because we lack configuration + config, err := NewConfig("sync") + assert.Error(suite.T(), err) + assert.Nil(suite.T(), config) + + suite.SetupTest() + viper.Set("broker.host", "test") + viper.Set("broker.port", 123) + viper.Set("broker.user", "test") + viper.Set("broker.password", "test") + viper.Set("broker.queue", "test") + viper.Set("broker.routingkey", "test") + viper.Set("broker.exchange", "test") + + viper.Set("sync.host", "test") + viper.Set("sync.port", 456) + viper.Set("sync.password", "test") + viper.Set("sync.user", "dummy") + + config, err = NewConfig("sync") + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), config) +} From 41656d129d708ff0d63e46b02d9e46c42e737087 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Tue, 29 Nov 2022 13:07:37 +0100 Subject: [PATCH 15/23] Port declaration in host URL overrides configured port --- cmd/syncapi/syncapi.go | 20 +++++++++++++++++++- cmd/syncapi/syncapi_test.go | 12 ++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index 347b2e4c..9ec005eb 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net/http" + "net/url" "os" "os/signal" "syscall" @@ -366,7 +367,11 @@ func buildSyncDatasetJSON(b []byte) ([]byte, error) { func sendPOST(payload []byte) error { client := &http.Client{} - URL := Conf.Sync.Host + "/dataset" + URL, err := createHostURL(Conf.Sync.Host, Conf.Sync.Port) + if err != nil { + return err + } + req, err := http.NewRequest("POST", URL, bytes.NewBuffer(payload)) if err != nil { return err @@ -379,3 +384,16 @@ func sendPOST(payload []byte) error { return nil } + +func createHostURL(host string, port int) (string, error) { + url, err := url.ParseRequestURI(host) + if err != nil { + return "", err + } + if url.Port() == "" && port != 0 { + url.Host += fmt.Sprintf(":%d", port) + } + url.Path = "/dataset" + + return url.String(), nil +} diff --git a/cmd/syncapi/syncapi_test.go b/cmd/syncapi/syncapi_test.go index 21f31189..304b58f6 100644 --- a/cmd/syncapi/syncapi_test.go +++ b/cmd/syncapi/syncapi_test.go @@ -309,3 +309,15 @@ func TestSendPOST(t *testing.T) { err := sendPOST(syncJSON) assert.NoError(t, err) } + +func TestCreateHostURL(t *testing.T) { + Conf = &config.Config{} + Conf.Sync = config.SyncConf{ + Host: "http://localhost", + Port: 443, + } + + s, err := createHostURL(Conf.Sync.Host, Conf.Sync.Port) + assert.NoError(t, err) + assert.Equal(t, "http://localhost:443/dataset", s) +} From 51edc59eec0ea099d0e5fe4435ebc1bfa0a05c2b Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Tue, 29 Nov 2022 13:08:44 +0100 Subject: [PATCH 16/23] Create channel inside separated function --- cmd/syncapi/syncapi.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index 9ec005eb..86d16c89 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -61,8 +61,8 @@ func main() { os.Exit(0) }() - forever := make(chan bool) go func() { + forever := make(chan bool) messages, err := Conf.API.MQ.GetMessages(Conf.Broker.Queue) if err != nil { log.Fatal(err) @@ -88,7 +88,7 @@ func main() { continue } - + log.Infoln("buildSyncDatasetJSON") blob, err := buildSyncDatasetJSON(m.Body) if err != nil { log.Errorf("failed to build SyncDatasetJSON, Reason: %v", err) @@ -96,10 +96,13 @@ func main() { if err := sendPOST(blob); err != nil { log.Errorf("failed to send POST, Reason: %v", err) } + if err := m.Ack(false); err != nil { + log.Errorf("Failed to ack message: reason %v", err) + } } + <-forever }() - <-forever srv := setup(Conf) From 77eaec24e7e5c8f30c929379b8cc2711828a9cad Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Tue, 29 Nov 2022 13:26:09 +0100 Subject: [PATCH 17/23] Add sync to config.yaml --- dev_utils/config.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dev_utils/config.yaml b/dev_utils/config.yaml index f86cd948..aaf5f383 100644 --- a/dev_utils/config.yaml +++ b/dev_utils/config.yaml @@ -81,3 +81,8 @@ inbox: log: level: "debug" format: "json" + +sync: + host: "http://localhost:8080" + password: "pass" + user: "dummy" From 3a918510cb1cfa7f9f53516f7d761e5e5aa5aebe Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Tue, 29 Nov 2022 15:46:33 +0100 Subject: [PATCH 18/23] Add basic auth to `dataset` and `metadata` routes --- cmd/syncapi/syncapi.go | 31 +++++++++++++++++++++++++++++-- cmd/syncapi/syncapi_test.go | 29 +++++++++++++++++++++++++++++ internal/config/config.go | 2 ++ 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index 86d16c89..0642e2a2 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -3,6 +3,8 @@ package main import ( "bytes" "context" + "crypto/sha256" + "crypto/subtle" "crypto/tls" "encoding/json" "fmt" @@ -125,8 +127,8 @@ func setup(config *config.Config) *http.Server { r := mux.NewRouter().SkipClean(true) r.HandleFunc("/ready", readinessResponse).Methods("GET") - r.HandleFunc("/dataset", dataset).Methods("POST") - r.HandleFunc("/metadata", metadata).Methods("POST") + r.HandleFunc("/dataset", basicAuth(http.HandlerFunc(dataset))).Methods("POST") + r.HandleFunc("/metadata", basicAuth(http.HandlerFunc(metadata))).Methods("POST") cfg := &tls.Config{ MinVersion: tls.VersionTLS12, @@ -379,6 +381,7 @@ func sendPOST(payload []byte) error { if err != nil { return err } + req.SetBasicAuth(Conf.Sync.User, Conf.Sync.Password) resp, err := client.Do(req) if err != nil || resp.StatusCode != http.StatusOK { return err @@ -400,3 +403,27 @@ func createHostURL(host string, port int) (string, error) { return url.String(), nil } + +func basicAuth(auth http.HandlerFunc) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + username, password, ok := r.BasicAuth() + if ok { + usernameHash := sha256.Sum256([]byte(username)) + passwordHash := sha256.Sum256([]byte(password)) + expectedUsernameHash := sha256.Sum256([]byte(Conf.API.User)) + expectedPasswordHash := sha256.Sum256([]byte(Conf.API.Password)) + + usernameMatch := (subtle.ConstantTimeCompare(usernameHash[:], expectedUsernameHash[:]) == 1) + passwordMatch := (subtle.ConstantTimeCompare(passwordHash[:], expectedPasswordHash[:]) == 1) + + if usernameMatch && passwordMatch { + auth.ServeHTTP(w, r) + + return + } + } + + w.Header().Set("WWW-Authenticate", `Basic realm="restricted", charset="UTF-8"`) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + }) +} diff --git a/cmd/syncapi/syncapi_test.go b/cmd/syncapi/syncapi_test.go index 304b58f6..87dcadb6 100644 --- a/cmd/syncapi/syncapi_test.go +++ b/cmd/syncapi/syncapi_test.go @@ -321,3 +321,32 @@ func TestCreateHostURL(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "http://localhost:443/dataset", s) } + +func TestBasicAuth(t *testing.T) { + Conf = &config.Config{} + Conf.Broker.SchemasPath = "file://../../schemas/" + Conf.API = config.APIConf{ + User: "dummy", + Password: "test", + } + + r := mux.NewRouter() + r.HandleFunc("/metadata", basicAuth(metadata)) + ts := httptest.NewServer(r) + defer ts.Close() + + goodJSON := []byte(`{"dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "metadata": {"dummy":"data"}}`) + req, err := http.NewRequest("POST", ts.URL+"/metadata", bytes.NewBuffer(goodJSON)) + assert.NoError(t, err) + req.SetBasicAuth(Conf.API.User, Conf.API.Password) + good, err := ts.Client().Do(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, good.StatusCode) + defer good.Body.Close() + + req.SetBasicAuth(Conf.API.User, "wrongpass") + bad, err := ts.Client().Do(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusUnauthorized, bad.StatusCode) + defer bad.Body.Close() +} diff --git a/internal/config/config.go b/internal/config/config.go index cf3f9bd7..8b061fb4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -51,6 +51,8 @@ type APIConf struct { Session SessionConfig DB *database.SQLdb MQ *broker.AMQPBroker + User string + Password string } type SessionConfig struct { From 0cb12924e476116ab7fc567c2d9ddddf8fb555e7 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Tue, 29 Nov 2022 16:30:24 +0100 Subject: [PATCH 19/23] Add DB test for GetSyncData --- internal/database/db_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/internal/database/db_test.go b/internal/database/db_test.go index c68d6336..aea19a21 100644 --- a/internal/database/db_test.go +++ b/internal/database/db_test.go @@ -533,3 +533,17 @@ func TestClose(t *testing.T) { assert.Nil(t, r, "Close failed unexpectedly") } + +func TestGetSyncData(t *testing.T) { + r := sqlTesterHelper(t, func(mock sqlmock.Sqlmock, testDb *SQLdb) error { + mock.ExpectQuery("SELECT elixir_id, inbox_path, decrypted_file_checksum from local_ega.files WHERE stable_id = \\$1 AND status = 'READY'"). + WithArgs("accessionId").WillReturnRows(sqlmock.NewRows([]string{"elixir_id", "inbox_path", "decrypted_file_checksum"}).AddRow("dummy", "/file/paht", "abc123")) + + s, err := testDb.GetSyncData("accessionId") + assert.Equal(t, "dummy", s.User) + + return err + }) + + assert.Nil(t, r, "GetSyncData failed unexpectedly") +} From e45294b984d48b55b9bc9082cf9d0eec3943cf9d Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Tue, 29 Nov 2022 17:02:50 +0100 Subject: [PATCH 20/23] For some reason this fixes the test --- .../tests/s3notls/40_ingest_test_notls.sh | 75 +------------------ 1 file changed, 4 insertions(+), 71 deletions(-) diff --git a/.github/integration/tests/s3notls/40_ingest_test_notls.sh b/.github/integration/tests/s3notls/40_ingest_test_notls.sh index c16eb6f5..2cd39f93 100644 --- a/.github/integration/tests/s3notls/40_ingest_test_notls.sh +++ b/.github/integration/tests/s3notls/40_ingest_test_notls.sh @@ -46,33 +46,8 @@ for file in dummy_data.c4gh largefile.c4gh; do curl -vvv -u test:test 'http://localhost:15672/api/exchanges/test/sda/publish' \ -H 'Content-Type: application/json;charset=UTF-8' \ - --data-binary "$(echo '{ - "vhost":"test", - "name":"sda", - "properties":{ - "delivery_mode":2, - "correlation_id":"CORRID", - "content_encoding":"UTF-8", - "content_type":"application/json" - }, - "routing_key":"files", - "payload_encoding":"string", - "payload":"{ - \"type\":\"ingest\", - \"user\":\"test\", - \"filepath\":\"FILENAME\", - \"encrypted_checksums\":[ - { - \"type\":\"sha256\", - \"value\":\"SHA256SUM\" - }, - { - \"type\":\"md5\", - \"value\":\"MD5SUM\" - } - ] - }" - }' | sed -e "s/FILENAME/$file/" -e "s/MD5SUM/${md5sum}/" -e "s/SHA256SUM/${sha256sum}/" -e "s/CORRID/$count/")" + --data-binary "$(echo '{"vhost":"test","name":"sda","properties":{"delivery_mode":2,"correlation_id":"CORRID","content_encoding":"UTF-8","content_type":"application/json"},"routing_key":"files","payload_encoding":"string","payload":"{\"type\":\"ingest\",\"user\":\"test\",\"filepath\":\"FILENAME\",\"encrypted_checksums\":[{\"type\":\"sha256\",\"value\":\"SHA256SUM\"},{\"type\":\"md5\",\"value\":\"MD5SUM\"}]}"}' | sed -e "s/FILENAME/$file/" -e "s/MD5SUM/${md5sum}/" -e "s/SHA256SUM/${sha256sum}/" -e "s/CORRID/$count/")" + RETRY_TIMES=0 until docker logs ingest --since="$now" 2>&1 | grep "File marked as archived"; do @@ -145,34 +120,7 @@ for file in dummy_data.c4gh largefile.c4gh; do # Publish accession id curl -vvv -u test:test 'http://localhost:15672/api/exchanges/test/sda/publish' \ -H 'Content-Type: application/json;charset=UTF-8' \ - --data-binary "$(echo '{ - "vhost":"test", - "name":"sda", - "properties":{ - "delivery_mode":2, - "correlation_id":"CORRID", - "content_encoding":"UTF-8", - "content_type":"application/json" - }, - "routing_key":"files", - "payload_encoding":"string", - "payload":"{ - \"type\":\"accession\", - \"user\":\"test\", - \"filepath\":\"FILENAME\", - \"accession_id\":\"ACCESSIONID\", - \"decrypted_checksums\":[ - { - \"type\":\"sha256\", - \"value\":\"DECSHA256SUM\" - }, - { - \"type\":\"md5\", - \"value\":\"DECMD5SUM\" - } - ] - }" - }' | sed -e "s/FILENAME/$filepath/" -e "s/DECMD5SUM/${decmd5sum}/" -e "s/DECSHA256SUM/${decsha256sum}/" -e "s/ACCESSIONID/${access}/" -e "s/CORRID/$count/")" + --data-binary "$(echo '{"vhost":"test","name":"sda","properties":{"delivery_mode":2,"correlation_id":"CORRID","content_encoding":"UTF-8","content_type":"application/json"},"routing_key":"files","payload_encoding":"string","payload":"{\"type\":\"accession\",\"user\":\"test\",\"filepath\":\"FILENAME\",\"accession_id\":\"ACCESSIONID\",\"decrypted_checksums\":[{\"type\":\"sha256\",\"value\":\"DECSHA256SUM\"},{\"type\":\"md5\",\"value\":\"DECMD5SUM\"}]}"}' | sed -e "s/FILENAME/$filepath/" -e "s/DECMD5SUM/${decmd5sum}/" -e "s/DECSHA256SUM/${decsha256sum}/" -e "s/ACCESSIONID/${access}/" -e "s/CORRID/$count/")" echo "Waiting for finalize/backup to complete" @@ -249,22 +197,7 @@ for file in dummy_data.c4gh largefile.c4gh; do # Map dataset ids curl -vvv -u test:test 'http://localhost:15672/api/exchanges/test/sda/publish' \ -H 'Content-Type: application/json;charset=UTF-8' \ - --data-binary "$(echo '{ - "vhost":"test", - "name":"sda", - "properties":{ - "delivery_mode":2, - "correlation_id":"CORRID", - "content_encoding":"UTF-8", - "content_type":"application/json" - }, - "routing_key":"files", - "payload_encoding":"string", - "payload":"{ - \"type\":\"mapping\", - \"dataset_id\":\"DATASET\", - \"accession_ids\":[\"ACCESSIONID\"]}" - }' | sed -e "s/DATASET/$dataset/" -e "s/ACCESSIONID/$access/" -e "s/CORRID/$count/")" + --data-binary "$(echo '{"vhost":"test","name":"sda","properties":{"delivery_mode":2,"correlation_id":"CORRID","content_encoding":"UTF-8","content_type":"application/json"},"routing_key":"files","payload_encoding":"string","payload":"{\"type\":\"mapping\",\"dataset_id\":\"DATASET\",\"accession_ids\":[\"ACCESSIONID\"]}"}' | sed -e "s/DATASET/$dataset/" -e "s/ACCESSIONID/$access/" -e "s/CORRID/$count/")" RETRY_TIMES=0 dbcheck='' From db2d7db99e6f2350ffdcadb1778281773a65fcc2 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Wed, 30 Nov 2022 10:02:05 +0100 Subject: [PATCH 21/23] Check if dataset already is registered --- cmd/syncapi/syncapi.go | 14 +++++++++++++- cmd/syncapi/syncapi_test.go | 16 +++++++++++++--- internal/database/db.go | 32 ++++++++++++++++++++++++++++++++ internal/database/db_test.go | 18 ++++++++++++++++-- 4 files changed, 74 insertions(+), 6 deletions(-) diff --git a/cmd/syncapi/syncapi.go b/cmd/syncapi/syncapi.go index 0642e2a2..ee16df22 100644 --- a/cmd/syncapi/syncapi.go +++ b/cmd/syncapi/syncapi.go @@ -229,7 +229,11 @@ func dataset(w http.ResponseWriter, r *http.Request) { } if err := parseDatasetMessage(b); err != nil { - w.WriteHeader(http.StatusInternalServerError) + if err.Error() == "Dataset exists" { + w.WriteHeader(http.StatusAlreadyReported) + } else { + w.WriteHeader(http.StatusInternalServerError) + } } w.WriteHeader(http.StatusOK) @@ -240,6 +244,14 @@ func parseDatasetMessage(msg []byte) error { blob := syncDataset{} _ = json.Unmarshal(msg, &blob) + ds, err := Conf.API.DB.CheckIfDatasetExists(blob.DatasetID) + if err != nil { + return fmt.Errorf("Failed to check dataset existance: Reason %v", err) + } + if ds { + return fmt.Errorf("Dataset exists") + } + var accessionIDs []string for _, files := range blob.DatasetFiles { ingest := common.Ingest{ diff --git a/cmd/syncapi/syncapi_test.go b/cmd/syncapi/syncapi_test.go index 87dcadb6..dbb531ec 100644 --- a/cmd/syncapi/syncapi_test.go +++ b/cmd/syncapi/syncapi_test.go @@ -206,14 +206,25 @@ func TestDatasetRoute(t *testing.T) { if err != nil { t.Skip("skip TestShutdown since broker not present") } - assert.NoError(t, err) + Conf.Database = database.DBConf{ + Host: "localhost", + Port: 5432, + User: "postgres", + Password: "postgres", + Database: "lega", + SslMode: "disable", + } + Conf.API.DB, err = database.NewDB(Conf.Database) + if err != nil { + t.Skip("skip TestShutdown since broker not present") + } r := mux.NewRouter() r.HandleFunc("/dataset", dataset) ts := httptest.NewServer(r) defer ts.Close() - goodJSON := []byte(`{"user":"test.user@example.com", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e7", "dataset_files": [{"filepath": "inbox/user/file1.c4gh","file_id": "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "sha256": "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6"}, {"filepath": "inbox/user/file2.c4gh","file_id": "ed6af454-d910-49e3-8cda-488a6f246e76", "sha256": "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b"}]}`) + goodJSON := []byte(`{"user":"test.user@example.com", "dataset_id": "cd532362-e06e-4460-8490-b9ce64b8d9e6", "dataset_files": [{"filepath": "inbox/user/file1.c4gh","file_id": "5fe7b660-afea-4c3a-88a9-3daabf055ebb", "sha256": "82E4e60e7beb3db2e06A00a079788F7d71f75b61a4b75f28c4c942703dabb6d6"}, {"filepath": "inbox/user/file2.c4gh","file_id": "ed6af454-d910-49e3-8cda-488a6f246e76", "sha256": "c967d96e56dec0f0cfee8f661846238b7f15771796ee1c345cae73cd812acc2b"}]}`) good, err := http.Post(ts.URL+"/dataset", "application/json", bytes.NewBuffer(goodJSON)) assert.NoError(t, err) assert.Equal(t, http.StatusOK, good.StatusCode) @@ -262,7 +273,6 @@ func TestBuildJSON(t *testing.T) { if err != nil { t.Skip("skip TestShutdown since broker not present") } - assert.NoError(t, err) db := Conf.API.DB.DB diff --git a/internal/database/db.go b/internal/database/db.go index 26e432d8..3fcac10c 100644 --- a/internal/database/db.go +++ b/internal/database/db.go @@ -26,6 +26,7 @@ type Database interface { MarkReady(accessionID, user, filepath, checksum string) error GetArchived(user, filepath, checksum string) (string, int, error) GetSyncData(accessionID string) (SyncData, error) + CheckIfDatasetExists(datasetID string) (bool, error) Close() } @@ -503,3 +504,34 @@ func (dbs *SQLdb) getSyncData(accessionID string) (SyncData, error) { return data, nil } + +// CheckIfDatasetExists checks if a dataset already is registered +func (dbs *SQLdb) CheckIfDatasetExists(datasetID string) (bool, error) { + var ( + ds bool + err error + ) + + for count := 1; count <= dbRetryTimes; count++ { + ds, err = dbs.checkIfDatasetExists(datasetID) + if err == nil { + break + } + time.Sleep(time.Duration(math.Pow(3, float64(count))) * time.Second) + } + + return ds, err +} + +// getSyncData is the actual function performing work for GetSyncData +func (dbs *SQLdb) checkIfDatasetExists(datasetID string) (bool, error) { + dbs.checkAndReconnectIfNeeded() + + const query = "SELECT EXISTS(SELECT id from local_ega_ebi.filedataset WHERE dataset_stable_id = $1);" + var yesNo bool + if err := dbs.DB.QueryRow(query, datasetID).Scan(&yesNo); err != nil { + return yesNo, err + } + + return yesNo, nil +} diff --git a/internal/database/db_test.go b/internal/database/db_test.go index aea19a21..f0d987a5 100644 --- a/internal/database/db_test.go +++ b/internal/database/db_test.go @@ -536,8 +536,8 @@ func TestClose(t *testing.T) { func TestGetSyncData(t *testing.T) { r := sqlTesterHelper(t, func(mock sqlmock.Sqlmock, testDb *SQLdb) error { - mock.ExpectQuery("SELECT elixir_id, inbox_path, decrypted_file_checksum from local_ega.files WHERE stable_id = \\$1 AND status = 'READY'"). - WithArgs("accessionId").WillReturnRows(sqlmock.NewRows([]string{"elixir_id", "inbox_path", "decrypted_file_checksum"}).AddRow("dummy", "/file/paht", "abc123")) + mock.ExpectQuery("SELECT elixir_id, inbox_path, decrypted_file_checksum from local_ega.files WHERE stable_id = \\$1 AND status = 'READY';"). + WithArgs("accessionId").WillReturnRows(sqlmock.NewRows([]string{"elixir_id", "inbox_path", "decrypted_file_checksum"}).AddRow("dummy", "/file/path", "abc123")) s, err := testDb.GetSyncData("accessionId") assert.Equal(t, "dummy", s.User) @@ -547,3 +547,17 @@ func TestGetSyncData(t *testing.T) { assert.Nil(t, r, "GetSyncData failed unexpectedly") } + +func TestCheckIfDatasetExists(t *testing.T) { + r := sqlTesterHelper(t, func(mock sqlmock.Sqlmock, testDb *SQLdb) error { + mock.ExpectQuery("SELECT EXISTS\\(SELECT id from local_ega_ebi.filedataset WHERE dataset_stable_id = \\$1\\);"). + WithArgs("datasetID").WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow("true")) + + s, err := testDb.checkIfDatasetExists("datasetID") + assert.True(t, s) + + return err + }) + + assert.Nil(t, r, "GetSyncData failed unexpectedly") +} From a65aee2d8819c1af2fabd8161754e2adfdec053c Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Wed, 30 Nov 2022 10:02:15 +0100 Subject: [PATCH 22/23] Documentation --- cmd/syncapi/sync.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 cmd/syncapi/sync.md diff --git a/cmd/syncapi/sync.md b/cmd/syncapi/sync.md new file mode 100644 index 00000000..b4cd43cb --- /dev/null +++ b/cmd/syncapi/sync.md @@ -0,0 +1,18 @@ +# sda-pipeline: sync + +The sync service is used exclusively in the [Bigpicture](https://bigpicture.eu/) project. + +## Service Description + +The sync service facilitates replication of data and metadata between the nodes in the consortium. + +When enabled the service will perform the following tasks: + +1. Read messages from the configured queue (sent by the mapper service upon succesful completion of a dataset maping). + 1. Generate a JSON blob with the required file and dataset information required to start and complete ingestion of a dataset on the recieving node. + 2. Send the JSON blob as POST request to the recieving partner. +2. Upon recieving a POST request with JSON data to the `/dataset` route. + 1. Parse the JSON blob and check if dataset is already registered, exit if true. + 2. Build and send messages to start ingestion of files. + 3. Build and send messages to assign stableIDs to files. + 4. Build and send messages to map files to a dataset. From f5c649cea452fadff1423a329918160f3e6492c9 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Fri, 2 Dec 2022 08:53:46 +0100 Subject: [PATCH 23/23] Enable separate external/internal location --- internal/config/config.go | 14 +++++++++----- internal/config/config_test.go | 1 + 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 8b061fb4..910f18d4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -117,7 +117,7 @@ func NewConfig(app string) (*Config, error) { } case "sync": requiredConfVars = []string{ - "broker.host", "broker.port", "broker.user", "broker.password", "broker.routingkey", "db.host", "db.port", "db.user", "db.password", "db.database", "sync.host", "sync.password", "sync.user", + "broker.host", "broker.port", "broker.user", "broker.password", "broker.routingkey", } default: requiredConfVars = []string{ @@ -234,9 +234,11 @@ func NewConfig(app string) (*Config, error) { return c, nil case "sync": - err = c.configDatabase() - if err != nil { - return nil, err + if viper.IsSet("db.host") { + err = c.configDatabase() + if err != nil { + return nil, err + } } err = c.configAPI() @@ -244,7 +246,9 @@ func NewConfig(app string) (*Config, error) { return nil, err } - c.configSync() + if viper.IsSet("sync.host") { + c.configSync() + } return c, nil } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index cca51056..bed3715e 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -743,6 +743,7 @@ func (suite *TestSuite) TestNotifyConfiguration() { func (suite *TestSuite) TestSyncConfiguration() { // At this point we should fail because we lack configuration + viper.Reset() config, err := NewConfig("sync") assert.Error(suite.T(), err) assert.Nil(suite.T(), config)