Skip to content

Commit

Permalink
Add test for a union all query
Browse files Browse the repository at this point in the history
  • Loading branch information
trueleo committed Dec 1, 2023
1 parent 1783fa2 commit a595970
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 20 deletions.
31 changes: 11 additions & 20 deletions quest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,10 @@ func TestSmokeCreateStream(t *testing.T) {
}

func TestSmokeIngestEventsToStream(t *testing.T) {
cmd := exec.Command("flog", "-f", "json", "-n", "50")
var out strings.Builder
cmd.Stdout = &out
err := cmd.Run()
require.NoErrorf(t, err, "Failed to run flog: %s", err)

for _, obj := range strings.SplitN(out.String(), "\n", 50) {
var payload strings.Builder
payload.WriteRune('[')
payload.WriteString(obj)
payload.WriteRune(']')

req, _ := NewGlob.Client.NewRequest("POST", "ingest", bytes.NewBufferString(payload.String()))
req.Header.Add("X-P-Stream", NewGlob.Stream)
response, err := NewGlob.Client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s resp %s", response.Status, readAsString(response.Body))
}

RunFlog(t, NewGlob.Stream)
QueryLogStreamCount(t, NewGlob.Client, NewGlob.Stream, 50)
AssertStreamSchema(t, NewGlob.Client, NewGlob.Stream, FlogJsonSchema)
DeleteStream(t, NewGlob.Client, NewGlob.Stream)
Sleep()
}

func TestSmokeLoadWithK6Stream(t *testing.T) {
Expand All @@ -97,6 +78,16 @@ func TestSmokeLoadWithK6Stream(t *testing.T) {
AssertStreamSchema(t, NewGlob.Client, NewGlob.Stream, SchemaBody)
}

func TestSmokeQueryTwoStreams(t *testing.T) {
stream1 := NewGlob.Stream + "1"
stream2 := NewGlob.Stream + "2"
RunFlog(t, stream1)
RunFlog(t, stream2)
QueryTwoLogStreamCount(t, NewGlob.Client, stream1, stream2, 100)
DeleteStream(t, NewGlob.Client, stream1)
DeleteStream(t, NewGlob.Client, stream2)
}

func TestSmokeSetAlert(t *testing.T) {
req, _ := NewGlob.Client.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/alert", strings.NewReader(AlertBody))
response, err := NewGlob.Client.Do(req)
Expand Down
42 changes: 42 additions & 0 deletions test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io"
"os/exec"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -61,6 +62,27 @@ func DeleteStream(t *testing.T, client HTTPClient, stream string) {
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s", response.Status)
}

func RunFlog(t *testing.T, stream string) {
cmd := exec.Command("flog", "-f", "json", "-n", "50")
var out strings.Builder
cmd.Stdout = &out
err := cmd.Run()
require.NoErrorf(t, err, "Failed to run flog: %s", err)

for _, obj := range strings.SplitN(out.String(), "\n", 50) {
var payload strings.Builder
payload.WriteRune('[')
payload.WriteString(obj)
payload.WriteRune(']')

req, _ := NewGlob.Client.NewRequest("POST", "ingest", bytes.NewBufferString(payload.String()))
req.Header.Add("X-P-Stream", stream)
response, err := NewGlob.Client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s resp %s", response.Status, readAsString(response.Body))
}
}

func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count uint64) {
// Query last 10 minutes of data only
endTime := time.Now().Format(time.RFC3339)
Expand All @@ -81,6 +103,26 @@ func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count u
require.Equalf(t, expected, body, "Query count incorrect; Expected %s, Actual %s", expected, body)
}

func QueryTwoLogStreamCount(t *testing.T, client HTTPClient, stream1 string, stream2 string, count uint64) {
// Query last 10 minutes of data only
endTime := time.Now().Format(time.RFC3339)
startTime := time.Now().Add(-10 * time.Minute).Format(time.RFC3339)

query := map[string]interface{}{
"query": fmt.Sprintf("select sum(c) as count from (select count(*) as c from %s union all select count(*) as c from %s)", stream1, stream2),
"startTime": startTime,
"endTime": endTime,
}
queryJSON, _ := json.Marshal(query)
req, _ := client.NewRequest("POST", "query", bytes.NewBuffer(queryJSON))
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
body := readAsString(response.Body)
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body)
expected := fmt.Sprintf(`[{"count":%d}]`, count)
require.Equalf(t, expected, body, "Query count incorrect; Expected %s, Actual %s", expected, body)
}

func AssertStreamSchema(t *testing.T, client HTTPClient, stream string, schema string) {
req, _ := client.NewRequest("GET", "logstream/"+stream+"/schema", nil)
response, err := client.Do(req)
Expand Down

0 comments on commit a595970

Please sign in to comment.