feat: add test coverage for hot tier #71

Draft
wants to merge 10 commits into main
34 changes: 20 additions & 14 deletions integrity_test.go
@@ -84,6 +84,25 @@ func (flog *ParquetFlog) Deref() Flog {
// - Download parquet files from the store created by Parseable for the minute
// - Compare the sent logs with the ones loaded from the downloaded parquet
func TestIntegrity(t *testing.T) {

flogs := createAndIngest(t)
parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig)
actualFlogs := loadFlogsFromParquetFiles(parquetFiles)

rowCount := len(actualFlogs)

for i, expectedFlog := range flogs {
// The rows in parquet written by Parseable will be latest first, so we
// compare the first of ours with the last of what we got from Parseable's
// store.
actualFlog := actualFlogs[rowCount-i-1].Deref()
require.Equal(t, actualFlog, expectedFlog)
}

DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

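// createAndIngest creates NewGlob.Stream, ingests the generated flogs into it,
// and returns the flogs that were sent so callers can compare them against what
// Parseable stores.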
func createAndIngest(t *testing.T) []Flog {
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
iterations := 2
flogsPerIteration := 100
@@ -127,20 +146,7 @@ func TestIntegrity(t *testing.T) {
// XXX: We don't need to sleep for the entire minute, just until the next minute boundary.
}

parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig)
actualFlogs := loadFlogsFromParquetFiles(parquetFiles)

rowCount := len(actualFlogs)

for i, expectedFlog := range flogs {
// The rows in parquet written by Parseable will be latest first, so we
// compare the first of ours with the last of what we got from Parseable's
// store.
actualFlog := actualFlogs[rowCount-i-1].Deref()
require.Equal(t, actualFlog, expectedFlog)
}

DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
return flogs
}

func ingestFlogs(flogs []Flog, stream string) error {
169 changes: 169 additions & 0 deletions quest_test.go
@@ -18,6 +18,7 @@ package main

import (
"bytes"
"encoding/json"
"fmt"
"os/exec"
"strings"
@@ -402,6 +403,174 @@ func TestSmokeGetRetention(t *testing.T) {
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func TestActivateHotTier(t *testing.T) {
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
activateHotTier(t, "", true)
disableHotTier(t, false)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func TestActivateNonExistentHotTier(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

status, _ := activateHotTier(t, "", false)
require.NotEqualf(t, 200, status, "Hot tier was activated for a non-existent stream")
}

func TestHotTierWithTimePartition(t *testing.T) {
timePartitionStream := NewGlob.Stream + "timepartition"
timeHeader := map[string]string{"X-P-Time-Partition": "source_time", "X-P-Time-Partition-Limit": "365d"}
CreateStreamWithHeader(t, NewGlob.QueryClient, timePartitionStream, timeHeader)

payload := StreamHotTier{
Size: "20 GiB",
}
jsonPayload, _ := json.Marshal(payload)

req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+timePartitionStream+"/hottier", bytes.NewBuffer(jsonPayload))
req.Header.Set("Content-Type", "application/json")
response, err := NewGlob.QueryClient.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
body := readAsString(response.Body)

require.NotEqualf(t, 200, response.StatusCode, "Hot tier activation succeeded for a time-partitioned stream with message: %s, but was expected to fail", body)
}

func TestHotTierHugeDiskSize(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
status, err := activateHotTier(t, "1000GiB", false) // activate hot tier with huge disk size
require.NotEqualf(t, status, 200, "Hot tier was activated for a huge disk size with message: %s", err)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func TestHotTierIncreaseSize(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
activateHotTier(t, "", false)
status, err := activateHotTier(t, "30 GiB", false) // increase disk size
require.Equalf(t, 200, status, "Increasing disk size of hot tier failed with error: %s", err)
disableHotTier(t, false)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func TestHotTierDecreaseSize(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
activateHotTier(t, "", false)
status, message := activateHotTier(t, "10 GiB", false) // decrease disk size
require.NotEqualf(t, 200, status, "Decreasing disk size of hot tier should fail but succeeded with message: %s", message)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func TestGetNonExistentHotTier(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
getHotTierStatus(t, true)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func TestDisableNonExistentHotTier(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
disableHotTier(t, true)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

// create stream, ingest data, activate hot tier, wait 2-3 minutes, and verify that all ingested logs are available in the hot tier
func TestHotTierGetsLogs(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

// DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
createAndIngest(t)
activateHotTier(t, "", true)
time.Sleep(2 * 60 * time.Second) // wait 2 minutes for hot tier to sync

htCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200)
disableHotTier(t, false)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)

require.Equalf(t, `[{"count":200}]`, htCount, "Ingested 200 logs, but hot tier contains %s", htCount)
}

// ingest into a stream without hot tier and record the count, then recreate the stream with hot tier enabled and verify both counts match
func TestHotTierGetsLogsAfter(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

// create a stream without hot tier
createAndIngest(t)
time.Sleep(2 * 60 * time.Second)
prevCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)

// create a second stream with hot tier
createAndIngest(t)
activateHotTier(t, "", true)
time.Sleep(2 * 60 * time.Second) // wait 2 minutes for hot tier to sync

htCount := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200)
disableHotTier(t, false)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)

require.Equalf(t, prevCount, htCount, "Without hot tier the count was %s, but with hot tier the count is %s", prevCount, htCount)
}

// create stream, ingest data, query the count, activate hot tier, wait 2-3 minutes, query the count again; both counts should match
func TestHotTierLogCount(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

createAndIngest(t)
countBefore := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200)

activateHotTier(t, "", true)
time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync

countAfter := QueryLogStreamCount(t, NewGlob.QueryClient, NewGlob.Stream, 200)

DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
require.Equalf(t, countBefore, countAfter, "Ingested %s, but hot tier contains only %s", countBefore, countAfter)
}

// create stream, ingest data for a duration, call GET /logstream/{logstream}/info to get the first_event_at field, then set hot tier, wait 2-3 minutes, call GET /hottier to get the oldest entry in hot tier, and verify both match
func TestOldestHotTierEntry(t *testing.T) {
if NewGlob.IngestorUrl.String() == "" {
t.Skip("Skipping in standalone mode")
}

createAndIngest(t)
streamInfo := getStreamInfo(t)

activateHotTier(t, "", true)
time.Sleep(60 * 2 * time.Second)

hottier := getHotTierStatus(t, false)

DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
require.NotNil(t, streamInfo.FirstEventAt, "stream info is missing first-event-at")
require.NotNil(t, hottier.OldestDateTimeEntry, "hot tier status is missing oldest_date_time_entry")
require.Equalf(t, *streamInfo.FirstEventAt, *hottier.OldestDateTimeEntry, "first-event-at in the stream info is %s but the oldest entry in hot tier is %s", *streamInfo.FirstEventAt, *hottier.OldestDateTimeEntry)
}

// This test calls all the User API endpoints
// in a sequence to check if they work as expected.
func TestSmoke_AllUsersAPI(t *testing.T) {
111 changes: 109 additions & 2 deletions test_utils.go
@@ -33,6 +33,23 @@ const (
sleepDuration = 2 * time.Second
)

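// StreamHotTier mirrors the hot tier API body: Size is sent in the PUT payload,
// while the remaining fields are expected only in the GET response. A sketch of
// the assumed response shape (values are illustrative, not actual server output):
//
//   {
//     "size": "20 GiB",
//     "used_size": "1 GiB",
//     "available_size": "19 GiB",
//     "oldest_date_time_entry": "2024-01-01T00:00:00Z"
//   }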
type StreamHotTier struct {
Size string `json:"size"`
UsedSize *string `json:"used_size,omitempty"`
AvailableSize *string `json:"available_size,omitempty"`
OldestDateTimeEntry *string `json:"oldest_date_time_entry,omitempty"`
}

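// StreamInfo maps the subset of GET /logstream/{stream}/info fields these tests
// read; only FirstEventAt is asserted on, the rest are decoded for completeness.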
type StreamInfo struct {
CreatedAt string `json:"created-at"`
FirstEventAt *string `json:"first-event-at"`
CacheEnabled *bool `json:"cache_enabled"`
TimePartition *string `json:"time_partition"`
TimePartitionLimit *string `json:"time_partition_limit"`
CustomPartition *string `json:"custom_partition"`
StaticSchemaFlag *string `json:"static_schema_flag"`
}

func flogStreamFields() []string {
return []string{
"p_timestamp",
@@ -68,8 +85,9 @@ func Sleep() {
func CreateStream(t *testing.T, client HTTPClient, stream string) {
req, _ := client.NewRequest("PUT", "logstream/"+stream, nil)
response, err := client.Do(req)
require.NoErrorf(t, err, "Request failed: %s", err)
body := readAsString(response.Body)
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s", response.Status)
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s with response: %s", response.Status, body)
}

func CreateStreamWithHeader(t *testing.T, client HTTPClient, stream string, header map[string]string) {
@@ -245,7 +263,7 @@ func IngestOneEventForStaticSchemaStream_SameFieldsInLog(t *testing.T, client HT
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s resp %s", response.Status, readAsString(response.Body))
}

func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count uint64) {
func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count uint64) string {
// Query last 30 minutes of data only
endTime := time.Now().Add(time.Second).Format(time.RFC3339Nano)
startTime := time.Now().Add(-30 * time.Minute).Format(time.RFC3339Nano)
@@ -263,6 +281,7 @@ func QueryLogStreamCount(t *testing.T, client HTTPClient, stream string, count u
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body)
expected := fmt.Sprintf(`[{"count":%d}]`, count)
require.Equalf(t, expected, body, "Query count incorrect; Expected %s, Actual %s", expected, body)
return body
}

func QueryLogStreamCount_Historical(t *testing.T, client HTTPClient, stream string, count uint64) {
@@ -561,3 +580,91 @@ func checkAPIAccess(t *testing.T, client HTTPClient, stream string, role string)
require.Equalf(t, 403, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, readAsString(response.Body))
}
}

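// activateHotTier enables the hot tier for NewGlob.Stream via PUT /logstream/{stream}/hottier,
// defaulting to a 20 GiB size when none is given. It returns the response status code and body;
// when verify is true it asserts success in distributed mode and a non-200 in standalone mode.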
func activateHotTier(t *testing.T, size string, verify bool) (int, string) {
if size == "" {
size = "20 GiB" // default hot tier size
}

payload := StreamHotTier{
Size: size,
}
jsonPayload, _ := json.Marshal(payload)

req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", bytes.NewBuffer(jsonPayload))
req.Header.Set("Content-Type", "application/json")
response, err := NewGlob.QueryClient.Do(req)
require.NoErrorf(t, err, "Activating hot tier failed: %s", err)
body := readAsString(response.Body)

if verify {
if NewGlob.IngestorUrl.String() != "" {
require.Equalf(t, 200, response.StatusCode, "Server returned unexpected http code: %s and response: %s", response.Status, body)
} else {
// right now, hot tier is unavailable in standalone so anything other than 200 is fine
require.NotEqualf(t, 200, response.StatusCode, "Hot tier was activated in standalone mode: %s and response: %s", response.Status, body)
}
}

return response.StatusCode, body
}

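// getHotTierStatus fetches GET /logstream/{stream}/hottier and decodes it into StreamHotTier.
// When shouldFail is true it only asserts a non-200 status and returns a zero-size placeholder.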
func getHotTierStatus(t *testing.T, shouldFail bool) *StreamHotTier {
req, err := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/hottier", nil)
require.NoError(t, err, "Failed to create request")

req.Header.Set("Content-Type", "application/json")

response, err := NewGlob.QueryClient.Do(req)
require.NoError(t, err, "Failed to execute GET /hottier")
defer response.Body.Close()

body := readAsString(response.Body)

if shouldFail {
require.NotEqualf(t, 200, response.StatusCode, "Hot tier was expected to fail but succeeded with body: %s", body)
return &StreamHotTier{Size: "0"}
} else {
require.Equalf(t, 200, response.StatusCode, "GET hot tier failed with status code: %d & body: %s", response.StatusCode, body)
}

var hotTierStatus StreamHotTier
err = json.Unmarshal([]byte(body), &hotTierStatus)
require.NoError(t, err, "The response from GET /hottier isn't of expected schema: %s", body)

return &hotTierStatus
}

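// disableHotTier issues DELETE /logstream/{stream}/hottier and asserts the expected outcome:
// a 200 normally, or a non-200 when shouldFail is true (e.g. no hot tier was configured).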
func disableHotTier(t *testing.T, shouldFail bool) {
req, _ := NewGlob.QueryClient.NewRequest("DELETE", "logstream/"+NewGlob.Stream+"/hottier", nil)
response, err := NewGlob.QueryClient.Do(req)
require.NoErrorf(t, err, "Disabling hot tier failed: %s", err)
body := readAsString(response.Body)

if shouldFail {
require.NotEqualf(t, 200, response.StatusCode, "Non-existent hot tier was disabled with response: %s", body)
} else {
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body)
}
}

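// getStreamInfo fetches GET /logstream/{stream}/info and decodes the response into StreamInfo.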
func getStreamInfo(t *testing.T) *StreamInfo {
req, err := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/info", nil)
require.NoError(t, err, "Failed to create request")

req.Header.Set("Content-Type", "application/json")

response, err := NewGlob.QueryClient.Do(req)
require.NoError(t, err, "Failed to execute GET /logstream/{stream_name}/info")
defer response.Body.Close()

body := readAsString(response.Body)

require.Equal(t, 200, response.StatusCode, "GET /logstream/{stream_name}/info failed with status code: %d & body: %s", response.StatusCode, body)

var streamInfo StreamInfo
err = json.Unmarshal([]byte(body), &streamInfo)
require.NoError(t, err, "The response from GET /info isn't of expected schema: %s", body)

return &streamInfo
}