Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve performance of first_over_time and last_over_time queries by sharding them #11605

Merged
merged 39 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
de536b6
Step 1
jeschkies Jan 11, 2024
235e386
Step 2
jeschkies Jan 11, 2024
689c161
Step 3
jeschkies Jan 11, 2024
09f6d66
Step 4
jeschkies Jan 11, 2024
f7463ae
Step 5
jeschkies Jan 11, 2024
750cec7
Step 6
jeschkies Jan 11, 2024
9281f44
Step 7
jeschkies Jan 11, 2024
8f589fe
Step 8
jeschkies Jan 11, 2024
810b07c
downstreaming of last over time expr
cstyan Jan 13, 2024
5e106bf
add expr type for merging of last over time
cstyan Jan 13, 2024
7b3cadc
add shardmapping for last over time
cstyan Jan 13, 2024
b10c1de
wire up new op type in ast
cstyan Jan 13, 2024
928ccfa
setup last iterator
cstyan Jan 13, 2024
f89ea65
add iterators functionality
cstyan Jan 13, 2024
5ddf9df
add evaluator
cstyan Jan 13, 2024
104892b
add test cases for last over time
cstyan Jan 13, 2024
67dd511
fix some things I misunderstood
cstyan Jan 13, 2024
8e22df0
the last timestamp iterator aggregator needs to select the last sample
cstyan Jan 15, 2024
6404308
Merge remote-tracking branch 'grafana/main' into karsten/first-over-time
jeschkies Jan 18, 2024
0d420dc
Merge branch 'main' into karsten/first-over-time
jeschkies Jan 18, 2024
f8c9c2b
simplify evaluators next functions
cstyan Jan 18, 2024
2bfd0b5
Merge commit 'f8c9c2bdbf2013cc151e986330263bf08dbeeefc' into karsten/…
jeschkies Jan 23, 2024
af67edd
Correct vector check.
jeschkies Jan 23, 2024
361babc
Unify first and last
jeschkies Jan 23, 2024
2fb80d7
Increase relative error
jeschkies Jan 23, 2024
1b6cc02
Document some code
jeschkies Jan 23, 2024
1f04c45
Merge remote-tracking branch 'grafana/main' into karsten/first-over-time
jeschkies Jan 23, 2024
d2b055b
Merge remote-tracking branch 'grafana/main' into karsten/first-over-time
jeschkies Jan 29, 2024
87bbd75
Split up documentation
jeschkies Jan 29, 2024
307bfc1
Drop commented TS
jeschkies Jan 29, 2024
4dc234f
Merge branch 'main' into karsten/first-over-time
jeschkies Jan 30, 2024
995b982
Merge branch 'main' into karsten/first-over-time
jeschkies Mar 6, 2024
5231b6f
Merge branch 'main' into karsten/first-over-time
cstyan Apr 23, 2024
9e10b2c
fixes after merging in main
cstyan Apr 23, 2024
2325e85
Merge branch 'main' into karsten/first-over-time
cstyan May 3, 2024
69f504b
Return an empty result instead of nil
jeschkies May 7, 2024
27a5d66
fix other minor issues
cstyan May 8, 2024
1b09ad3
handle sharding noop case for first/last over time
cstyan May 8, 2024
bd7f275
fix formatting
cstyan May 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,62 @@ func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) {
}
}

type MergeFirstOverTimeExpr struct {
syntax.SampleExpr
downstreams []DownstreamSampleExpr
}

func (e MergeFirstOverTimeExpr) String() string {
var sb strings.Builder
for i, d := range e.downstreams {
if i >= defaultMaxDepth {
break
}

if i > 0 {
sb.WriteString(" ++ ")
}

sb.WriteString(d.String())
}
return fmt.Sprintf("MergeFirstOverTime<%s>", sb.String())
}

func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
for _, d := range e.downstreams {
d.Walk(f)
}
}

type MergeLastOverTimeExpr struct {
syntax.SampleExpr
downstreams []DownstreamSampleExpr
}

func (e MergeLastOverTimeExpr) String() string {
var sb strings.Builder
for i, d := range e.downstreams {
if i >= defaultMaxDepth {
break
}

if i > 0 {
sb.WriteString(" ++ ")
}

sb.WriteString(d.String())
}
return fmt.Sprintf("MergeLastOverTime<%s>", sb.String())
}

func (e *MergeLastOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
for _, d := range e.downstreams {
d.Walk(f)
}
}

type Shards []astmapper.ShardAnnotation

func (xs Shards) Encode() (encoded []string) {
Expand Down Expand Up @@ -394,7 +450,78 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
}
inner := NewQuantileSketchMatrixStepEvaluator(matrix, params)
return NewQuantileSketchVectorStepEvaluator(inner, *e.quantile), nil
case *MergeFirstOverTimeExpr:
queries := make([]DownstreamQuery, len(e.downstreams))

for i, d := range e.downstreams {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: d.SampleExpr,
},
}
if shard := d.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: qry.Params,
ShardsOverride: Shards{*shard}.Encode(),
}
}
queries[i] = qry
}

results, err := ev.Downstream(ctx, queries)
if err != nil {
return nil, err
}

xs := make([]promql.Matrix, 0, len(queries))
for _, res := range results {

switch data := res.Data.(type) {
case promql.Matrix:
xs = append(xs, data)
default:
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
Comment on lines +561 to +562
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible that a shard which only retrieves data for a single relevant stream could return a vector? or would the vector for that single series always be wrapped in a matrix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be a matrix with one vector I believe. Only for instant queries we return a vector https://github.com/grafana/loki/blob/main/pkg/logql/engine.go#L388

}
}

return NewMergeFirstOverTimeStepEvaluator(params, xs), nil
case *MergeLastOverTimeExpr:
queries := make([]DownstreamQuery, len(e.downstreams))

for i, d := range e.downstreams {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: d.SampleExpr,
},
}
if shard := d.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: qry.Params,
ShardsOverride: Shards{*shard}.Encode(),
}
}
queries[i] = qry
}

results, err := ev.Downstream(ctx, queries)
if err != nil {
return nil, err
}

xs := make([]promql.Matrix, 0, len(queries))
for _, res := range results {

switch data := res.Data.(type) {
case promql.Matrix:
xs = append(xs, data)
default:
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}

return NewMergeLastOverTimeStepEvaluator(params, xs), nil
default:
return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func TestMappingEquivalence(t *testing.T) {
`,
false,
},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
Expand Down Expand Up @@ -132,7 +136,7 @@ func TestMappingEquivalenceSketches(t *testing.T) {
query string
realtiveError float64
}{
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.03},
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did this need to change from 0.03 to 0.05?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I change the test series to have the values spread out this would fail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which change is that? the addition of nanos in the timestamp within test_utils.go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02},
} {
q := NewMockQuerier(
Expand Down
11 changes: 6 additions & 5 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
}
defer util.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)

next, ts, r := stepEvaluator.Next()
next, _, r := stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}
Expand All @@ -361,7 +361,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
case SampleVector:
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
return q.JoinSampleVector(next, ts, vec, stepEvaluator, maxSeries)
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries)
case ProbabilisticQuantileVector:
return JoinQuantileSketchVector(next, vec, stepEvaluator, q.params)
default:
Expand All @@ -371,7 +371,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
return nil, nil
}

func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {

seriesIndex := map[uint64]*promql.Series{}

Expand Down Expand Up @@ -419,15 +419,16 @@ func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluato
seriesIndex[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: ts,
//T: ts,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we can do it like this. What do you think, @cstyan?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current code on main overrides he timestamps in each vector.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the timestamp from the actual vector samples more accurate than the one got from stepEvaluator.Next()?

Can you help me understand why it won't make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does make sense for us. I'm just not sure some other component is relying on this overridden timestamp.

T: p.T,
F: p.F,
})
}
// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series.
if len(seriesIndex) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
}
next, ts, r = stepEvaluator.Next()
next, _, r = stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,28 @@ func newRangeAggEvaluator(
return &QuantileSketchStepEvaluator{
iter: iter,
}, nil
case syntax.OpRangeTypeFirstWithTimestamp:
iter := newFirstWithTimestampIterator(
it,
expr.Left.Interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)

return &RangeVectorEvaluator{
iter: iter,
}, nil
case syntax.OpRangeTypeLastWithTimestamp:
iter := newLastWithTimestampIterator(
it,
expr.Left.Interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)

return &RangeVectorEvaluator{
iter: iter,
}, nil
default:
iter, err := newRangeVectorIterator(
it, expr,
Expand Down
13 changes: 13 additions & 0 deletions pkg/logql/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,16 @@ func (e *BinOpStepEvaluator) Explain(parent Node) {
func (i *VectorIterator) Explain(parent Node) {
parent.Childf("%f vectorIterator", i.val)
}

func (e *QuantileSketchVectorStepEvaluator) Explain(parent Node) {
b := parent.Child("QuantileSketchVector")
e.inner.Explain(b)
}

func (e *mergeOverTimeStepEvaluator) Explain(parent Node) {
parent.Child("MergeFirstOverTime")
}

func (EmptyEvaluator) Explain(parent Node) {
parent.Child("Empty")
}
Loading
Loading