Skip to content

Commit

Permalink
feat: improve performance of first_over_time and last_over_time q…
Browse files Browse the repository at this point in the history
…ueries by sharding them (#11605)

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Callum Styan <callumstyan@gmail.com>
  • Loading branch information
jeschkies and cstyan committed May 13, 2024
1 parent 35e10d4 commit f66172e
Show file tree
Hide file tree
Showing 12 changed files with 527 additions and 16 deletions.
129 changes: 129 additions & 0 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,62 @@ func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) {
}
}

type MergeFirstOverTimeExpr struct {
syntax.SampleExpr
downstreams []DownstreamSampleExpr
}

func (e MergeFirstOverTimeExpr) String() string {
var sb strings.Builder
for i, d := range e.downstreams {
if i >= defaultMaxDepth {
break
}

if i > 0 {
sb.WriteString(" ++ ")
}

sb.WriteString(d.String())
}
return fmt.Sprintf("MergeFirstOverTime<%s>", sb.String())
}

func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
for _, d := range e.downstreams {
d.Walk(f)
}
}

type MergeLastOverTimeExpr struct {
syntax.SampleExpr
downstreams []DownstreamSampleExpr
}

func (e MergeLastOverTimeExpr) String() string {
var sb strings.Builder
for i, d := range e.downstreams {
if i >= defaultMaxDepth {
break
}

if i > 0 {
sb.WriteString(" ++ ")
}

sb.WriteString(d.String())
}
return fmt.Sprintf("MergeLastOverTime<%s>", sb.String())
}

func (e *MergeLastOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
for _, d := range e.downstreams {
d.Walk(f)
}
}

type Downstreamable interface {
Downstreamer(context.Context) Downstreamer
}
Expand Down Expand Up @@ -471,7 +527,80 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
}
inner := NewQuantileSketchMatrixStepEvaluator(matrix, params)
return NewQuantileSketchVectorStepEvaluator(inner, *e.quantile), nil
case *MergeFirstOverTimeExpr:
queries := make([]DownstreamQuery, len(e.downstreams))

for i, d := range e.downstreams {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: d.SampleExpr,
},
}
if shard := d.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: qry.Params,
ShardsOverride: Shards{shard.Shard}.Encode(),
}
}
queries[i] = qry
}

acc := NewBufferedAccumulator(len(queries))
results, err := ev.Downstream(ctx, queries, acc)
if err != nil {
return nil, err
}

xs := make([]promql.Matrix, 0, len(queries))
for _, res := range results {

switch data := res.Data.(type) {
case promql.Matrix:
xs = append(xs, data)
default:
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}

return NewMergeFirstOverTimeStepEvaluator(params, xs), nil
case *MergeLastOverTimeExpr:
queries := make([]DownstreamQuery, len(e.downstreams))

for i, d := range e.downstreams {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: d.SampleExpr,
},
}
if shard := d.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: qry.Params,
ShardsOverride: Shards{shard.Shard}.Encode(),
}
}
queries[i] = qry
}

acc := NewBufferedAccumulator(len(queries))
results, err := ev.Downstream(ctx, queries, acc)
if err != nil {
return nil, err
}

xs := make([]promql.Matrix, 0, len(queries))
for _, res := range results {

switch data := res.Data.(type) {
case promql.Matrix:
xs = append(xs, data)
default:
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}

return NewMergeLastOverTimeStepEvaluator(params, xs), nil
default:
return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func TestMappingEquivalence(t *testing.T) {
`,
false,
},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
Expand Down Expand Up @@ -141,7 +145,7 @@ func TestMappingEquivalenceSketches(t *testing.T) {
query string
realtiveError float64
}{
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.03},
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02},
} {
q := NewMockQuerier(
Expand Down
10 changes: 5 additions & 5 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
}
defer util.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)

next, ts, r := stepEvaluator.Next()
next, _, r := stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}
Expand All @@ -373,7 +373,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
case SampleVector:
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
return q.JoinSampleVector(next, ts, vec, stepEvaluator, maxSeries)
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries)
case ProbabilisticQuantileVector:
return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params)
default:
Expand All @@ -383,7 +383,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
return nil, nil
}

func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {

seriesIndex := map[uint64]*promql.Series{}

Expand Down Expand Up @@ -431,15 +431,15 @@ func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluato
seriesIndex[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: ts,
T: p.T,
F: p.F,
})
}
// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series.
if len(seriesIndex) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
}
next, ts, r = stepEvaluator.Next()
next, _, r = stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,28 @@ func newRangeAggEvaluator(
return &QuantileSketchStepEvaluator{
iter: iter,
}, nil
case syntax.OpRangeTypeFirstWithTimestamp:
iter := newFirstWithTimestampIterator(
it,
expr.Left.Interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)

return &RangeVectorEvaluator{
iter: iter,
}, nil
case syntax.OpRangeTypeLastWithTimestamp:
iter := newLastWithTimestampIterator(
it,
expr.Left.Interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)

return &RangeVectorEvaluator{
iter: iter,
}, nil
default:
iter, err := newRangeVectorIterator(
it, expr,
Expand Down
13 changes: 13 additions & 0 deletions pkg/logql/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,16 @@ func (e *BinOpStepEvaluator) Explain(parent Node) {
func (i *VectorIterator) Explain(parent Node) {
parent.Childf("%f vectorIterator", i.val)
}

func (e *QuantileSketchVectorStepEvaluator) Explain(parent Node) {
b := parent.Child("QuantileSketchVector")
e.inner.Explain(b)
}

func (e *mergeOverTimeStepEvaluator) Explain(parent Node) {
parent.Child("MergeFirstOverTime")
}

func (EmptyEvaluator[SampleVector]) Explain(parent Node) {
parent.Child("Empty")
}
Loading

0 comments on commit f66172e

Please sign in to comment.