diff --git a/quest_test.go b/quest_test.go index 64ac600..d4725db 100644 --- a/quest_test.go +++ b/quest_test.go @@ -87,6 +87,32 @@ func TestTimePartition_IncorrectDateTimeFormatTimePartitionInLog(t *testing.T) { DeleteStream(t, NewGlob.Client, historicalStream) } +func TestLoadStreamBatchWithK6_StaticSchema(t *testing.T) { + if NewGlob.Mode == "load" { + staticSchemaStream := NewGlob.Stream + "static_schema" + staticSchemaFlagHeader := map[string]string{"X-P-Static-Schema-Flag": "true"} + CreateStreamWithSchemaBody(t, NewGlob.Client, staticSchemaStream, staticSchemaFlagHeader) + cmd := exec.Command("k6", + "run", + "-e", fmt.Sprintf("P_URL=%s", NewGlob.Url.String()), + "-e", fmt.Sprintf("P_USERNAME=%s", NewGlob.Username), + "-e", fmt.Sprintf("P_PASSWORD=%s", NewGlob.Password), + "-e", fmt.Sprintf("P_STREAM=%s", staticSchemaStream), + "-e", fmt.Sprintf("P_SCHEMA_COUNT=%s", schema_count), + "-e", fmt.Sprintf("P_EVENTS_COUNT=%s", schema_count), + "./scripts/load_batch_events.js", + "--vus=", vus, + "--duration=", duration) + + cmd.Run() + op, err := cmd.Output() + if err != nil { + t.Log(err) + } + t.Log(string(op)) + DeleteStream(t, NewGlob.Client, staticSchemaStream) + } +} func TestSmokeQueryTwoStreams(t *testing.T) { stream1 := NewGlob.Stream + "1" stream2 := NewGlob.Stream + "2" diff --git a/test_utils.go b/test_utils.go index 58802d5..08248e7 100644 --- a/test_utils.go +++ b/test_utils.go @@ -82,6 +82,96 @@ func CreateStreamWithHeader(t *testing.T, client HTTPClient, stream string, head require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s", response.Status) } +func CreateStreamWithSchemaBody(t *testing.T, client HTTPClient, stream string, header map[string]string) { + var schema_payload string = `{ + "fields":[ + { + "name": "source_time", + "data_type": "string" + }, + { + "name": "level", + "data_type": "string" + }, + { + "name": "message", + "data_type": "string" + }, + { + "name": "version", + "data_type": "string" + }, + { + "name": "user_id", + "data_type": "int" + }, + { + "name": "device_id", + "data_type": "int" + }, + { + "name": "session_id", + "data_type": "string" + }, + { + "name": "os", + "data_type": "string" + }, + { + "name": "host", + "data_type": "string" + }, + { + "name": "uuid", + "data_type": "string" + }, + { + "name": "location", + "data_type": "string" + }, + { + "name": "timezone", + "data_type": "string" + }, + { + "name": "user_agent", + "data_type": "string" + }, + { + "name": "runtime", + "data_type": "string" + }, + { + "name": "request_body", + "data_type": "string" + }, + { + "name": "status_code", + "data_type": "int" + }, + { + "name": "response_time", + "data_type": "int" + }, + { + "name": "process_id", + "data_type": "int" + }, + { + "name": "app_meta", + "data_type": "string" + } + ] + }` + req, _ := client.NewRequest("PUT", "logstream/"+stream, bytes.NewBufferString(schema_payload)) + for k, v := range header { + req.Header.Add(k, v) + } + response, err := client.Do(req) + require.NoErrorf(t, err, "Request failed: %s", err) + require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s", response.Status) +} + func DeleteStream(t *testing.T, client HTTPClient, stream string) { req, _ := client.NewRequest("DELETE", "logstream/"+stream, nil) response, err := client.Do(req)