Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: logs window based pagination to pageSize offset instead of using… #6830

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions pkg/query-service/app/logs/v4/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,19 +534,17 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan
return "", fmt.Errorf("max limit exceeded")
}

// when pageSize is provided, we need to fetch the logs in chunks
if mq.PageSize > 0 {
if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit {
query = logsV3.AddLimitToQuery(query, mq.Limit-mq.Offset)
} else {
query = logsV3.AddLimitToQuery(query, mq.PageSize)
}

// add offset to the query only if it is not ordered by timestamp.
if !logsV3.IsOrderByTs(mq.OrderBy) {
query = logsV3.AddOffsetToQuery(query, mq.Offset)
}

query = logsV3.AddOffsetToQuery(query, mq.Offset)
} else {
// when pageSize is not provided, we fetch all the logs in the limit
query = logsV3.AddLimitToQuery(query, mq.Limit)
}
} else if panelType == v3.PanelTypeTable {
Expand Down
169 changes: 81 additions & 88 deletions pkg/query-service/app/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,110 +328,95 @@ func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryR
for name, v := range params.CompositeQuery.BuilderQueries {
qName = name
pageSize = v.PageSize

// for traces specifically
limit = v.Limit
offset = v.Offset
}

// check if it is a logs query
isLogs := false
if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
isLogs = true
}

data := []*v3.Row{}

tracesLimit := limit + offset
limitWithOffset := limit + offset
if isLogs {
// for logs we use pageSize to define the current limit and limit to define the absolute limit
limitWithOffset = pageSize + offset
if limit > 0 && offset >= limit {
return nil, nil, fmt.Errorf("max limit exceeded")
}
}

for _, v := range tsRanges {
params.Start = v.Start
params.End = v.End

length := uint64(0)
// this will run only once

// appending the filter to get the next set of data
if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
queries, err := q.builder.PrepareQueries(params)
if err != nil {
return nil, nil, err
}
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQueriesByName := map[string]error{
name: err,
}
return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
length += uint64(len(rowList))
data = append(data, rowList...)
}

if length > 0 {
params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
Key: v3.AttributeKey{
Key: "id",
IsColumn: true,
DataType: "string",
},
Operator: v3.FilterOperatorLessThan,
Value: data[len(data)-1].Data["id"],
})
}
// max limit + offset is 10k for pagination for traces/logs
// TODO(nitya): define something for logs
if !isLogs && limitWithOffset > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
}

if uint64(len(data)) >= pageSize {
break
}
// we are updating the offset and limit based on the number of traces/logs we have found in the current timerange
// eg -
// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
//
// if 100 traces/logs are there in [t1, t10] then 100 will return immediately.
// if 10 traces/logs are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100

//
// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
//
// If we find 150 traces/logs with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces/logs
// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0

params.CompositeQuery.BuilderQueries[qName].Offset = 0
// if datasource is logs
if isLogs {
// for logs we use limit to define the absolute limit and pagesize to define the current limit
params.CompositeQuery.BuilderQueries[qName].PageSize = limitWithOffset
} else {
// TRACE
// we are updating the offset and limit based on the number of traces we have found in the current timerange
// eg -
// 1)offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
//
// if 100 traces are there in [t1, t10] then 100 will return immediately.
// if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, search in the next timerange of [t10, 20]
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100

//
// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
//
// If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we return immediately 100 traces
// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0

// max limit + offset is 10k for pagination
if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
}
params.CompositeQuery.BuilderQueries[qName].Limit = limitWithOffset
}

params.CompositeQuery.BuilderQueries[qName].Offset = 0
params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
queries, err := q.builder.PrepareQueries(params)
queries, err := q.builder.PrepareQueries(params)
if err != nil {
return nil, nil, err
}
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, nil, err
}
for name, query := range queries {
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
errs := []error{err}
errQueriesByName := map[string]error{
name: err,
}
return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
errs := []error{err}
errQueriesByName := map[string]error{
name: err,
}
length += uint64(len(rowList))

// skip the traces unless offset is 0
for _, row := range rowList {
if offset == 0 {
data = append(data, row)
} else {
offset--
}
return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
}
length += uint64(len(rowList))

// skip the traces unless offset is 0
for _, row := range rowList {
if offset == 0 {
data = append(data, row)
} else {
offset--
}
}
tracesLimit = tracesLimit - length
}

if uint64(len(data)) >= limit {
break
}
limitWithOffset = limitWithOffset - length

if isLogs && uint64(len(data)) >= pageSize {
// for logs
break
} else if !isLogs && uint64(len(data)) >= limit {
// for traces
break
}
}
res = append(res, &v3.Result{
Expand All @@ -453,14 +438,22 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
break
}

// only allow of logs queries with timestamp ordering desc
// TODO(nitya): allow for timestamp asc
if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
// for traces: allow only with timestamp ordering desc
// for logs: allow only with timestamp ordering desc and id ordering desc
if v.DataSource == v3.DataSourceTraces &&
len(v.OrderBy) == 1 &&
v.OrderBy[0].ColumnName == "timestamp" &&
v.OrderBy[0].Order == "desc" {
startEndArr := utils.GetListTsRanges(params.Start, params.End)
return q.runWindowBasedListQuery(ctx, params, startEndArr)
} else if v.DataSource == v3.DataSourceLogs &&
len(v.OrderBy) == 2 &&
v.OrderBy[0].ColumnName == "timestamp" &&
v.OrderBy[0].Order == "desc" &&
v.OrderBy[1].ColumnName == "id" &&
v.OrderBy[1].Order == "desc" {
startEndArr := utils.GetListTsRanges(params.Start, params.End)
return q.runWindowBasedListQuery(ctx, params, startEndArr)
}
}
}
Expand Down
Loading
Loading