diff --git a/Dockerfile b/Dockerfile index 12128fc..e779e85 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,8 @@ ENV PATH="/go/bin:${PATH}" ADD ./ /go/src/github.com/bmeg/grip-graphql WORKDIR /go/src/github.com/bmeg/grip-graphql -RUN go install github.com/bmeg/grip@v0.0.0-20241119230816-a7f6fdd48051 + +RUN go install github.com/bmeg/grip@v0.0.0-20241122195322-515a682d56f7 RUN go build --buildmode=plugin ./graphql_gen3 RUN go build --buildmode=plugin ./gen3_writer RUN go build --buildmode=plugin ./grip-graphql-endpoint diff --git a/gen3_writer/fixtures/compbio-examples-fhir/invalid_json.ndjson b/gen3_writer/fixtures/compbio-examples-fhir/invalid_json.ndjson new file mode 100644 index 0000000..2605a3f --- /dev/null +++ b/gen3_writer/fixtures/compbio-examples-fhir/invalid_json.ndjson @@ -0,0 +1,16 @@ +dsfdsf +{"reso +urceType":"Observation","id":"de328d36-2db2-43cd-b0fe-92f0e8302726","meta":{"versionId":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observation-lab"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"laboratory","display":"laboratory"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"2069-3","display":"Chloride"}],"text":"Chloride"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/fb61addc-09d0-4546-a934-b5d5ce16d5e7"},"effectiveDateTime":"2012-12-15T14:40:53-05:00","issued":"2012-12-15T14:40:53.188-05:00","valueQuantity":{"value":102.2,"unit":"mmol/L","system":"http://unitsofmeasure.org","code":"mmol/L"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]} +{"resourceType":"Observation","id":"e257467d-a850-4ed8-ab1a-eddd6c49f111","meta":{"versionId":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/StructureDefinition/vitalsigns","http://hl7.org/fhir/StructureDefinition/bp"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"vital-signs","display":"vital-signs"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"85354-9","display":"Blood Pressure"}],"text":"Blood Pressure"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/b9200600-e594-46cb-8a00-7fdb47792665"},"effectiveDateTime":"2013-12-21T14:40:53-05:00","issued":"2013-12-21T14:40:53.188-05:00","component":[{"code":{"coding":[{"system":"http://loinc.org","code":"8462-4","display":"Diastolic Blood Pressure"}],"text":"Diastolic Blood Pressure"},"valueQuantity":{"value":83,"unit":"mm[Hg]","system":"http://unitsofmeasure.org","code":"mm[Hg]"}},{"code":{"coding":[{"system":"http://loinc.org","code":"8480-6","display":"Systolic Blood Pressure"}],"text":"Systolic Blood Pressure"},"valueQuantity":{"value":153,"unit":"mm[Hg]","system":"http://unitsofmeasure.org","code":"mm[Hg]"}}],"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]} +{"resourceType":"Observation","id":"7bbaa006-c5bb-4ee4-b8ce-3cae329dfbd2","meta":{"versionI +d":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observation-lab"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"laboratory","display":"laboratory"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"2339-0","display":"Glucose"}],"text":"Glucose"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/2163ebd2-7cca-4de3-b2ec-ec8b22cc5d34"},"effectiveDateTime":"2016-01-02T14:40:53-05:00","issued":"2016-01-02T14:40:53.188-05:00","valueQuantity":{"value":67.69,"unit":"mg/dL","system":"http://unitsofmeasure.org","code":"mg/dL"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]} +{"resourceType":"Observation","id":"e124ee34-83ff-4b68-9d8a-5b3a52ffe496","meta":{"versionId":"1","lastUpdated":"2020:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observation-lab"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"laboratory","display":"laboratory"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"49765-1","display":"Calcium"}],"text":"Calcium"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/bd62656a-5e43-46a5-9584-d2402786f7f5"},"effectiveDateTime":"2019-01-19T14:40:53-05:00","issued":"2019-01-19T14:40:53.188-05:00","valueQuantity":{"value":8.59,"unit":"mg/dL","system":"http://unitsofmeasure.org","code":"mg/dL"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]} +["value1", "value2"] +{"resourceType":"Observation","id":"e28354e9-1126-4636-bba1-e2ed2a6ec05b","meta":{"versionId":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observation-lab"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"laboratory","display":"laboratory"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"20565-8","display":"Carbon Dioxide"}],"text":"Carbon Dioxide"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/fb61addc-09d0-4546-a934-b5d5ce16d5e7"},"effectiveDateTime":"2012-12-15T14:40:53-05:00","issued":"2012-12-15T14:40:53.188-05:00","valueQuantity":{"value":28.2,"unit":"mmol/L","system":"http://unitsofmeasure.org","code":"mmol/L"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]} +"just a string" +{"resourceType":"Observation","id":"47ad1be4-44fd-44e3-94f7-e0389838c70f","meta":{"versionId":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/StructureDefinition/vitalsigns","http://hl7.org/fhir/StructureDefinition/heartrate"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"vital-signs","display":"vital-signs"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"8867-4","display":"Heart rate"}],"text":"Heart rate"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/b9200600-e594-46cb-8a00-7fdb47792665"},"effectiveDateTime":"2013-12-21T14:40:53-05:00","issued":"2013-12-21T14:40:53.188-05:00","valueQuantity":{"value":78,"unit":"/min","system":"http://unitsofmeasure.org","code":"/min"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]} +234234 +#a mix of invalid json, invalid lines, and valid lines +{"resourceType":"Observation","id":"e28354e9-1126-4636-bba1-e2ed2a6ec05b","meta":{"versionId":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observation-lab"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"laboratory","display":"laboratory"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"20565-8","display":"Carbon Dioxide"}],"text":"Carbon Dioxide"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/fb61addc-09d0-4546-a934-b5d5ce16d5e7"},"effectiveDateTime":"2012-12-15T14:40:53-05:00","issued":"2012-12-15T14:40:53.188-05:00","valueQuantity":{"value":28.2,"unit":"mmol/L","system":"http://unitsofmeasure.org","code":"mmol/L"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]} +{ 123: "value1", true: "value2" } +{"key": { "key": { "key": { "key": { "key": { "key": "value" } } } } }} diff --git a/gen3_writer/gen3_test.go b/gen3_writer/gen3_test.go index 081f9cb..73dab4d 100644 --- a/gen3_writer/gen3_test.go +++ b/gen3_writer/gen3_test.go @@ -316,6 +316,7 @@ func Test_Json_Load_Validation_Errors(t *testing.T) { t.Error("Error: ", errors) return } + t.Log(data) if data["message"] == nil || len(data["message"].([]any)) != 2 { t.Error("Expected return message of length 2") @@ -326,6 +327,69 @@ func Test_Json_Load_Validation_Errors(t *testing.T) { } +func Test_Json_Load_Invalid_Json(t *testing.T) { + file, err := os.Open("fixtures/compbio-examples-fhir/invalid_json.ndjson") + if err != nil { + t.Error(err) + return + } + defer file.Close() + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + + part, _ := writer.CreateFormFile("file", filepath.Base("fixtures/compbio-examples-fhir/ValidationErrors.ndjson")) + io.Copy(part, file) + writer.Close() + + req := &Request{ + url: "http://localhost:8201/graphql/JSONTEST/bulk-load-raw/ohsu-test", + method: "POST", + headers: map[string]any{"Authorization": createToken(false, true, true)}, + } + + request, err := http.NewRequest(req.method, req.url, body) + if err != nil { + t.Error("Error creating request:", err) + return + } + for key, val := range req.headers { + request.Header.Set(key, val.(string)) + } + request.Header.Set("Content-Type", writer.FormDataContentType()) + + client := &http.Client{} + resp, err := client.Do(request) + if err != nil { + t.Error("Error sending request:", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Logf("server responded with status: %d", resp.StatusCode) + } + + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(resp.Body) + if err != nil { + t.Error("Error reading response:", err) + return + } + + var data map[string]interface{} + errors := json.Unmarshal([]byte(buf.String()), &data) + if errors != nil { + t.Error("Error: ", errors) + return + } + t.Log(data) + + if data["status"].(float64) != 206 { + t.Error() + } + +} + func Test_Load_Malformed_Token(t *testing.T) { /* Server returns a 400 given an unparsable token */ err, responses := bulkLoad("http://localhost:8201/graphql/TEST/bulk-load/ohsu-test", diff --git a/gen3_writer/handler.go b/gen3_writer/handler.go index c2ae1ff..09cbf3f 100644 --- a/gen3_writer/handler.go +++ b/gen3_writer/handler.go @@ -315,6 +315,7 @@ func StartMultipartForm(c *gin.Context, writer gin.ResponseWriter, request *http } return nil, conn, buf } + func (gh *Handler) AddJsonSchema(c *gin.Context) { /* This function assumes that Json schema of json format will be submitted and must be converted into grip scheam format */ writer, request, graph := getFields(c) @@ -719,6 +720,7 @@ func (gh *Handler) BulkStreamRaw(c *gin.Context) { host := "localhost:8202" var err error var res *gripql.BulkJsonEditResult + var warnings []string err = request.ParseMultipartForm(1024 * 1024 * 1024) // 10 GB limit if err != nil { @@ -736,11 +738,13 @@ func (gh *Handler) BulkStreamRaw(c *gin.Context) { conn, err := gripql.Connect(rpc.ConfigWithDefaults(host), true) wait := make(chan bool) - VertChan, err := streamJsonFromReader(reader, graph, project_id, 5) - if err != nil { - RegError(c, writer, graph, GetInternalServerErr(err)) - return - } + VertChan, warnChan := streamJsonFromReader(reader, graph, project_id, 5) + + go func() { + for warning := range warnChan { + warnings = append(warnings, warning) + } + }() go func() { err, res = conn.BulkAddRaw(VertChan) @@ -751,21 +755,15 @@ func (gh *Handler) BulkStreamRaw(c *gin.Context) { }() <-wait - if len(res.Errors) == 1 { - RegError(c, writer, graph, &middleware.ServerError{StatusCode: 500, Message: fmt.Sprintf("[500] bulk-load-raw %s", res.Errors[0])}) - return - } else if res.InsertCount == 0 && res.Errors == nil { - // This implies that no file was uploaded so EOF triggered immediately and exited - RegError(c, writer, graph, &middleware.ServerError{StatusCode: 400, Message: "[400] bulk-load-raw file of length 0 provided"}) - return - } else if len(res.Errors) > 1 { - log.WithFields(log.Fields{ - "graph": graph, - "status": 206, - }).Info(res.Errors) + nonLoadedEdges := append(res.Errors, warnings...) + if len(nonLoadedEdges) > 0 { + if res.InsertCount == 0 { + RegError(c, writer, graph, &middleware.ServerError{StatusCode: 500, Message: fmt.Sprintln("[500] bulk-load-raw", nonLoadedEdges)}) + return + } c.AbortWithStatusJSON(206, gin.H{ "status": 206, - "message": res.Errors, + "message": nonLoadedEdges, "data": nil, }) return diff --git a/gen3_writer/reader.go b/gen3_writer/reader.go index 9ff83e2..ce0ea7d 100644 --- a/gen3_writer/reader.go +++ b/gen3_writer/reader.go @@ -2,6 +2,8 @@ package main import ( "bufio" + "encoding/json" + "fmt" "io" "sync" @@ -71,10 +73,7 @@ func StreamEdgesFromReader(reader io.Reader, workers int) (chan *gripql.Edge, er if workers > 99 { workers = 99 } - lineChan, err := processReader(reader, workers) - if err != nil { - return nil, err - } + lineChan := processReader(reader, workers) edgeChan := make(chan *gripql.Edge, workers) var wg sync.WaitGroup @@ -112,10 +111,7 @@ func StreamVerticesFromReader(reader io.Reader, workers int) (chan *gripql.Verte if workers > 99 { workers = 99 } - lineChan, err := processReader(reader, workers) - if err != nil { - return nil, err - } + lineChan := processReader(reader, workers) vertChan := make(chan *gripql.Vertex, workers) var wg sync.WaitGroup @@ -145,12 +141,12 @@ func StreamVerticesFromReader(reader io.Reader, workers int) (chan *gripql.Verte return vertChan, nil } -func streamJsonFromReader(reader io.Reader, graph string, project_id string, workers int) (chan *gripql.RawJson, error) { - lineChan, err := processReader(reader, workers) - if err != nil { - return nil, err - } + +func streamJsonFromReader(reader io.Reader, graph string, project_id string, workers int) (chan *gripql.RawJson, chan string) { + lineChan := processReader(reader, workers) + vertChan := make(chan *gripql.RawJson, workers) + warnings := make(chan string, workers) var wg sync.WaitGroup jum := protojson.UnmarshalOptions{DiscardUnknown: true} @@ -159,6 +155,11 @@ func streamJsonFromReader(reader io.Reader, graph string, project_id string, wor go func() { defer wg.Done() for line := range lineChan { + if !json.Valid([]byte(line)) { + log.WithFields(log.Fields{"line": line}).Errorf("Skipping invalid JSON line") + warnings <- fmt.Sprintf("Invalid Json: %s", line) + continue + } rawData := &gripql.RawJson{ Data: &structpb.Struct{}, Graph: graph, @@ -166,7 +167,8 @@ func streamJsonFromReader(reader io.Reader, graph string, project_id string, wor } err := jum.Unmarshal([]byte(line), rawData.Data) if err != nil { - log.WithFields(log.Fields{"error": err}).Errorf("Unmarshaling vertex: %s", line) + log.WithFields(log.Fields{"error": err}).Errorf("Unmarshaling into rawData.Data: %s", line) + warnings <- fmt.Sprintf("Error: %v when unmarshaling: %s", err, line) continue } vertChan <- rawData @@ -177,12 +179,13 @@ func streamJsonFromReader(reader io.Reader, graph string, project_id string, wor go func() { wg.Wait() close(vertChan) + close(warnings) }() - return vertChan, nil + return vertChan, warnings } -func processReader(reader io.Reader, chansize int) (<-chan string, error) { +func processReader(reader io.Reader, chansize int) <-chan string { scanner := bufio.NewScanner(reader) buf := make([]byte, 0, 64*1024) maxCapacity := 16 * 1024 * 1024 @@ -200,5 +203,5 @@ func processReader(reader io.Reader, chansize int) (<-chan string, error) { close(lineChan) }() - return lineChan, nil + return lineChan } diff --git a/go.mod b/go.mod index 7e78a68..3509b51 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/bmeg/grip-graphql go 1.22.5 require ( - github.com/bmeg/grip v0.0.0-20241119230816-a7f6fdd48051 + github.com/bmeg/grip v0.0.0-20241122195322-515a682d56f7 github.com/dop251/goja v0.0.0-20240707163329-b1681fb2a2f5 github.com/gin-gonic/gin v1.8.1 github.com/golang-jwt/jwt/v5 v5.2.1 diff --git a/go.sum b/go.sum index 8d5365e..90d966e 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/bmeg/grip v0.0.0-20241119224722-821431f4728c h1:uRXx01IeIDav7jcMJaelO github.com/bmeg/grip v0.0.0-20241119224722-821431f4728c/go.mod h1:fic/942cjKIBWnZ9HKzoknVhQHWOHYkbfCoTTQLT+2w= github.com/bmeg/grip v0.0.0-20241119230816-a7f6fdd48051 h1:r5AE3WgJqdxA7DJ2U7d2KMB1kaePYz45XoBez9P+BiI= github.com/bmeg/grip v0.0.0-20241119230816-a7f6fdd48051/go.mod h1:fic/942cjKIBWnZ9HKzoknVhQHWOHYkbfCoTTQLT+2w= +github.com/bmeg/grip v0.0.0-20241122195322-515a682d56f7 h1:/y6m1fI9tZaaCy/DAgk+uZmQHdlaykx1G8rCJXQswiA= +github.com/bmeg/grip v0.0.0-20241122195322-515a682d56f7/go.mod h1:2YAjHdHHKDeUHtOui5g/xRFk6uosktkzLPFPY7n/4d8= github.com/bmeg/jsonpath v0.0.0-20210207014051-cca5355553ad h1:ICgBexeLB7iv/IQz4rsP+MimOXFZUwWSPojEypuOaQ8= github.com/bmeg/jsonpath v0.0.0-20210207014051-cca5355553ad/go.mod h1:ft96Irkp72C7ZrUWRenG7LrF0NKMxXdRvsypo5Njhm4= github.com/bmeg/jsonschema/v5 v5.3.4-0.20241111204732-55db82022a92 h1:Myx/j+WxfEg+P3nDaizR1hBpjKSLgvr4ydzgp1/1pAU=