Switch to new provenance schema
vkuznet committed Jan 4, 2024
1 parent feea265 commit 839e454
Showing 4 changed files with 158 additions and 205 deletions.
163 changes: 40 additions & 123 deletions web/filesdb.go
@@ -19,7 +19,6 @@ package main
import (
"database/sql"
"errors"
"fmt"
"log"
"strings"
"time"
@@ -129,8 +128,8 @@ func execute(tx *sql.Tx, stm string, args ...interface{}) ([]Record, error) {
return records, nil
}

// FindMetaID finds dataset attributes
func FindMetaID(stmt string, args ...interface{}) (string, error) {
// findDID finds dataset attributes
func findDID(stmt string, args ...interface{}) (string, error) {
var did string
err := FilesDB.QueryRow(stmt, args...).Scan(&did)
if err == nil {
@@ -139,37 +138,17 @@ func FindMetaID(stmt string, args ...interface{}) (string, error) {
return did, errors.New("Unable to find id")
}

// FindID finds dataset attributes
func FindID(stmt string, args ...interface{}) (int64, error) {
var did int64
err := FilesDB.QueryRow(stmt, args...).Scan(&did)
if err == nil {
return did, nil
}
return did, errors.New("Unable to find id")
}

// InsertFiles insert given files into FilesDB
func InsertFiles(did, dataset, path string) error {
// look-up files for given path
files := FindFiles(path)

// dataset is a /cycle/beamline/BTR/sample
arr := strings.Split(dataset, "/")
if len(arr) != 5 {
return errors.New(fmt.Sprintf("ERROR: unable to parse given dataset %s", dataset))
}
cycle := arr[1]
beamline := arr[2]
btr := arr[3]
sample := arr[4]
log.Printf("InsertFiles: parse dataset=%s to cycle=%s beamline=%s btr=%s sample=%s", dataset, cycle, beamline, btr, sample)
log.Printf("did=%s type=%T", did, did)
log.Printf("InsertFiles: dataset=%s did=%s", dataset, did)

// check if we have already our dataset in DB
dstmt := "SELECT meta_id FROM datasets JOIN cycles ON datasets.cycle_id=cycles.cycle_id JOIN btrs ON datasets.btr_id=btrs.btr_id JOIN beamlines ON datasets.beamline_id=beamlines.beamline_id JOIN samples ON dataset.sample_id=samples.sample_id WHERE beamlines.name=? and btrs.name=? and cycles.name=? and samaples.name=?"
metaID, e := FindMetaID(dstmt, cycle, beamline, btr, sample)
if e == nil && metaID == did {
dstmt := "SELECT did FROM metadata M JOIN datasets D ON M.meta_id=D.meta_id WHERE D.dataset=? AND M.did=?"
DID, e := findDID(dstmt, dataset, did)
if e == nil && DID == did {
return nil
}
log.Println("proceed with insert")
@@ -182,98 +161,57 @@ func InsertFiles(did, dataset, path string) error {
}
defer tx.Rollback()

var res []Record
// main attributes
var stmt string
// insert main attributes
stmt = "INSERT INTO cycles (name) VALUES (?)"
_, err = tx.Exec(stmt, cycle)
if err != nil && !strings.Contains(err.Error(), "UNIQUE") {
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, cycle, err)
return tx.Rollback()
}
stmt = "INSERT INTO beamlines (name) VALUES (?)"
_, err = tx.Exec(stmt, beamline)
if err != nil && !strings.Contains(err.Error(), "UNIQUE") {
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, beamline, err)
return tx.Rollback()
}
stmt = "INSERT INTO btrs (name) VALUES (?)"
_, err = tx.Exec(stmt, btr)
if err != nil && !strings.Contains(err.Error(), "UNIQUE") {
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, btr, err)
return tx.Rollback()
}
stmt = "INSERT INTO samples (name) VALUES (?)"
_, err = tx.Exec(stmt, sample)
if err != nil && !strings.Contains(err.Error(), "UNIQUE") {
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, sample, err)
return tx.Rollback()
}

// select main attributes ids
var rec Record
var res []Record
create_at := time.Now().Unix()
modify_at := time.Now().Unix()
create_by := "MetaData server"
modify_by := "MetaData server"

stmt = "SELECT cycle_id FROM cycles WHERE name=?"
res, err = execute(tx, stmt, cycle)
if err != nil {
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, cycle, err)
return tx.Rollback()
}
rec = res[0]
cycleId := rec["cycle_id"].(int64)

stmt = "SELECT beamline_id FROM beamlines WHERE name=?"
res, err = execute(tx, stmt, beamline)
if err != nil {
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, beamline, err)
return tx.Rollback()
}
rec = res[0]
beamlineId := rec["beamline_id"].(int64)

stmt = "SELECT btr_id FROM btrs WHERE name=?"
res, err = execute(tx, stmt, btr)
if err != nil {
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, btr, err)
// insert main attributes
stmt = "INSERT INTO metadata (did,create_at,create_by,modify_at,modify_by) VALUES (?,?,?,?,?)"
_, err = tx.Exec(stmt, did, create_at, create_by, modify_at, modify_by)
if err != nil && !strings.Contains(err.Error(), "UNIQUE") {
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, did, err)
return tx.Rollback()
}
rec = res[0]
btrId := rec["btr_id"].(int64)
log.Println("### inserted into metadata")

stmt = "SELECT sample_id FROM samples WHERE name=?"
res, err = execute(tx, stmt, sample)
stmt = "SELECT meta_id FROM metadata WHERE did=?"
res, err = execute(tx, stmt, did)
if err != nil {
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, sample, err)
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, did, err)
return tx.Rollback()
}
rec = res[0]
sampleId := rec["sample_id"].(int64)
metaId := rec["meta_id"].(int64)

// insert data into datasets table
tstamp := time.Now().UnixNano()
stmt = "INSERT INTO datasets (meta_id,cycle_id,beamline_id,btr_id,sample_id,tstamp) VALUES (?, ?, ?, ?, ?, ?)"
_, err = tx.Exec(stmt, did, cycleId, beamlineId, btrId, sampleId, tstamp)
// insert main attributes
stmt = "INSERT INTO datasets (dataset,meta_id,create_at,create_by,modify_at,modify_by) VALUES (?,?,?,?,?,?)"
_, err = tx.Exec(stmt, dataset, metaId, create_at, create_by, modify_at, modify_by)
if err != nil && !strings.Contains(err.Error(), "UNIQUE") {
log.Printf("ERROR: unable to execute %s, metaId=%v, cycleId=%v, beamlineId=%v, btrId=%v, sampleId=%v, tstamp=%v, error=%v", stmt, did, cycleId, beamlineId, btrId, sampleId, tstamp, err)
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, dataset, err)
return tx.Rollback()
}

// get back dataset id
stmt = "SELECT dataset_id FROM datasets WHERE meta_id=?"
res, err = execute(tx, stmt, did)
// select main attributes ids
stmt = "SELECT dataset_id FROM datasets WHERE dataset=?"
res, err = execute(tx, stmt, dataset)
if err != nil {
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, did, err)
log.Printf("ERROR: unable to execute %s with %v, error=%v", stmt, dataset, err)
return tx.Rollback()
}
rec = res[0]
datasetID := rec["dataset_id"].(int64)
datasetId := rec["dataset_id"].(int64)

// insert files info
for _, name := range files {
stmt = "INSERT INTO files (dataset_id,name) VALUES (?,?)"
_, err = tx.Exec(stmt, datasetID, name)
for _, fname := range files {
stmt = "INSERT INTO files (file,meta_id,dataset_id,create_at,create_by,modify_at,modify_by) VALUES (?,?,?,?,?,?,?)"
_, err = tx.Exec(stmt, fname, metaId, datasetId, create_at, create_by, modify_at, modify_by)
if err != nil && !strings.Contains(err.Error(), "UNIQUE") {
log.Printf("ERROR: unable to execute %s with did=%v name=%s error=%v", stmt, did, name, err)
log.Printf("ERROR: unable to execute %s with did=%v name=%s error=%v", stmt, did, fname, err)
return tx.Rollback()
}
}
Expand All @@ -298,7 +236,7 @@ func getFiles(did string) ([]string, error) {
defer tx.Rollback()
// look-up files info
// stmt := "SELECT name FROM files WHERE meta_id=?"
stmt := "SELECT F.name FROM DATASETS D JOIN files F ON D.dataset_id=F.dataset_id WHERE D.meta_id=?"
stmt := "SELECT F.file FROM files F JOIN metadata M ON M.meta_id=F.meta_id WHERE M.did=?"
res, err := tx.Query(stmt, did)
if err != nil {
log.Printf("ERROR: unable to execute %s, error=%v", stmt, err)
@@ -344,26 +282,6 @@ func getTableNames(tname string) ([]string, error) {
return out, nil
}

// helper function to get list of cycles
func getCycles() ([]string, error) {
return getTableNames("CYCLES")
}

// helper function to get list of beamlines
func getBeamlines() ([]string, error) {
return getTableNames("BEAMLINES")
}

// helper function to get list of btrs
func getBtrs() ([]string, error) {
return getTableNames("BTRS")
}

// helper function to get list of samples
func getSamples() ([]string, error) {
return getTableNames("SAMPLES")
}

// helper function to get list of datasets
func getDatasets() ([]string, error) {
var out []string
@@ -375,21 +293,20 @@ func getDatasets() ([]string, error) {
}
defer tx.Rollback()
// dataset is a /cycle/beamline/BTR/sample
stmt := "SELECT C.name, B.name, BT.name, S.name FROM DATASETS D JOIN CYCLES C ON C.cycle_id=D.cycle_id JOIN BEAMLINES B ON B.beamline_id=D.beamline_id JOIN BTRS BT ON BT.btr_id=D.btr_id JOIN SAMPLES S ON S.sample_id=D.sample_id"
stmt := "SELECT D.dataset FROM DATASETS D"
res, err := tx.Query(stmt)
if err != nil {
log.Printf("ERROR: unable to execute %s, error=%v", stmt, err)
return out, tx.Rollback()
}
for res.Next() {
var cname, bname, btname, sname string
err = res.Scan(&cname, &bname, &btname, &sname)
var dataset string
err = res.Scan(&dataset)
if err != nil {
log.Printf("ERROR: unable to scan error=%v", err)
return out, tx.Rollback()
}
d := fmt.Sprintf("/%s/%s/%s/%s", cname, bname, btname, sname)
out = append(out, d)
out = append(out, dataset)
}
return out, nil
}
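
The new InsertFiles flow above reduces to three chained inserts: one metadata row per DID, one datasets row per dataset path referencing it, and one files row per file referencing both. A condensed sketch of that flow, assuming the database/sql and time imports already used in filesdb.go; the helper name insertProvenance is hypothetical, and the UNIQUE-collision tolerance of the real code is omitted:

	// insertProvenance is a hypothetical condensed form of the InsertFiles
	// flow above: metadata (did) -> datasets (dataset) -> files (file).
	func insertProvenance(tx *sql.Tx, did, dataset string, files []string) error {
		now := time.Now().Unix()
		by := "MetaData server"
		// 1. metadata row keyed by DID
		if _, err := tx.Exec(
			"INSERT INTO metadata (did,create_at,create_by,modify_at,modify_by) VALUES (?,?,?,?,?)",
			did, now, by, now, by); err != nil {
			return err
		}
		var metaID int64
		if err := tx.QueryRow("SELECT meta_id FROM metadata WHERE did=?", did).Scan(&metaID); err != nil {
			return err
		}
		// 2. datasets row pointing at the metadata row
		if _, err := tx.Exec(
			"INSERT INTO datasets (dataset,meta_id,create_at,create_by,modify_at,modify_by) VALUES (?,?,?,?,?,?)",
			dataset, metaID, now, by, now, by); err != nil {
			return err
		}
		var datasetID int64
		if err := tx.QueryRow("SELECT dataset_id FROM datasets WHERE dataset=?", dataset).Scan(&datasetID); err != nil {
			return err
		}
		// 3. one files row per file, carrying both provenance keys
		for _, fname := range files {
			if _, err := tx.Exec(
				"INSERT INTO files (file,meta_id,dataset_id,create_at,create_by,modify_at,modify_by) VALUES (?,?,?,?,?,?,?)",
				fname, metaID, datasetID, now, by, now, by); err != nil {
				return err
			}
		}
		return nil
	}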
23 changes: 0 additions & 23 deletions web/filesdb_test.go
@@ -57,29 +57,6 @@ func TestFilesDB(t *testing.T) {
t.Fatal(err)
}

// check if btr/beamline/sample tables do not expand
samples, err := getSamples()
if err != nil {
t.Fatal(err)
}
if len(samples) != 1 {
t.Fatal("wrong number of samples")
}
btrs, err := getBtrs()
if err != nil {
t.Fatal(err)
}
if len(btrs) != 1 {
t.Fatal("wrong number of btrs")
}
beamlines, err := getBeamlines()
if err != nil {
t.Fatal(err)
}
if len(beamlines) != 1 {
t.Fatal("wrong number of beamlines")
}

// get list of datasets
dsets, err := getDatasets()
if err != nil {
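
The deleted assertions covered the old cycle/beamline/btr/sample lookup tables, which no longer exist. Under the new schema an equivalent sanity check would resolve files through metadata.did instead; a hedged sketch, where the DID literal is a stand-in for whatever the test inserted earlier:

	// hypothetical replacement check: files should resolve via metadata.did
	files, err := getFiles("test-did") // stand-in DID from the test setup
	if err != nil {
		t.Fatal(err)
	}
	if len(files) == 0 {
		t.Fatal("no files found for test did")
	}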
88 changes: 59 additions & 29 deletions web/schemas/mysql.sql
@@ -1,35 +1,65 @@
CREATE TABLE cycles (
cycle_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE
CREATE TABLE PROCESSING (
PROCESSING_ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
PROCESSING VARCHAR(700) NOT NULL UNIQUE,
CREATE_AT INTEGER,
CREATE_BY VARCHAR(500),
MODIFY_AT INTEGER,
MODIFY_BY VARCHAR(500)
);

CREATE TABLE beamlines (
beamline_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE
CREATE TABLE PARENTS (
PARENT_ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
PARENT VARCHAR(700) NOT NULL UNIQUE,
CREATE_AT INTEGER,
CREATE_BY VARCHAR(500),
MODIFY_AT INTEGER,
MODIFY_BY VARCHAR(500)
);

CREATE TABLE btrs (
btr_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE
CREATE TABLE SITES (
SITE_ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
SITE VARCHAR(700) NOT NULL UNIQUE,
CREATE_AT INTEGER,
CREATE_BY VARCHAR(500),
MODIFY_AT INTEGER,
MODIFY_BY VARCHAR(500)
);

CREATE TABLE samples (
sample_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE
CREATE TABLE BUCKETS (
BUCKET_ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
BUCKET VARCHAR(700) NOT NULL UNIQUE,
META_ID VARCHAR(700),
DATASET_ID BIGINT REFERENCES DATASETS(DATASET_ID) ON UPDATE CASCADE,
CREATE_AT INTEGER,
CREATE_BY VARCHAR(500),
MODIFY_AT INTEGER,
MODIFY_BY VARCHAR(500)
);

CREATE TABLE datasets (
dataset_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
meta_id VARCHAR(500) NOT NULL UNIQUE,
cycle_id INTEGER REFERENCES cycles(cycle_id) ON UPDATE CASCADE,
beamline_id INTEGER REFERENCES beamlines(beamline_id) ON UPDATE CASCADE,
btr_id INTEGER REFERENCES btrs(btr_id) ON UPDATE CASCADE,
sample_id INTEGER REFERENCES sample(sample_id) ON UPDATE CASCADE,
tstamp INTEGER NOT NULL UNIQUE
CREATE TABLE METADATA (
META_ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
DID VARCHAR(700) NOT NULL UNIQUE,
CREATE_AT INTEGER,
CREATE_BY VARCHAR(500),
MODIFY_AT INTEGER,
MODIFY_BY VARCHAR(500)
);

CREATE TABLE files (
file_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
dataset_id BIGINT REFERENCES datasets(dataset_id) ON UPDATE CASCADE,
name VARCHAR(100) NOT NULL
CREATE TABLE DATASETS (
DATASET_ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
DATASET VARCHAR(700) NOT NULL UNIQUE,
META_ID BIGINT REFERENCES METADATA(META_ID) ON UPDATE CASCADE,
SITE_ID BIGINT REFERENCES SITES(SITE_ID) ON UPDATE CASCADE,
PROCESSING_ID BIGINT REFERENCES PROCESSINGS(PROCESSING_ID) ON UPDATE CASCADE,
PARENT_ID BIGINT REFERENCES PARENTS(PARENT_ID) ON UPDATE CASCADE,
CREATE_AT INTEGER,
CREATE_BY VARCHAR(500),
MODIFY_AT INTEGER,
MODIFY_BY VARCHAR(500)
);
CREATE TABLE FILES (
FILE_ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
FILE VARCHAR(700) NOT NULL UNIQUE,
IS_FILE_VALID INTEGER DEFAULT 1,
META_ID BIGINT REFERENCES METADATA(META_ID) ON UPDATE CASCADE,
DATASET_ID BIGINT REFERENCES DATASETS(DATASET_ID) ON UPDATE CASCADE,
CREATE_AT INTEGER,
CREATE_BY VARCHAR(500),
MODIFY_AT INTEGER,
MODIFY_BY VARCHAR(500)
);
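
The DATASETS table also carries SITE_ID, PROCESSING_ID, and PARENT_ID provenance links that the Go code in this commit does not populate, so those columns may be NULL. A hedged Go lookup sketch over the new tables, assuming the PROCESSINGS reference in the DATASETS DDL resolves to the PROCESSING table defined above; the dataset path is a made-up example:

	// hypothetical provenance lookup: dataset path -> DID plus optional
	// site/processing/parent attributes (LEFT JOINs tolerate NULL links)
	stmt := `SELECT M.DID, S.SITE, P.PROCESSING, R.PARENT
	           FROM DATASETS D
	           JOIN METADATA M ON M.META_ID = D.META_ID
	           LEFT JOIN SITES S ON S.SITE_ID = D.SITE_ID
	           LEFT JOIN PROCESSING P ON P.PROCESSING_ID = D.PROCESSING_ID
	           LEFT JOIN PARENTS R ON R.PARENT_ID = D.PARENT_ID
	          WHERE D.DATASET = ?`
	var did string
	var site, processing, parent sql.NullString
	err := FilesDB.QueryRow(stmt, "/2024-1/P01/btr-1/sample-a").Scan(&did, &site, &processing, &parent)
	if err != nil {
		log.Printf("provenance lookup failed: %v", err)
	}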
