Skip to content

Commit

Permalink
feat: collect and serve pre-aggregated bytes and counts (#13020)
Browse files Browse the repository at this point in the history
Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
trevorwhitney and cyriltovena committed Jun 19, 2024
1 parent 384f451 commit 467eb1b
Show file tree
Hide file tree
Showing 54 changed files with 6,533 additions and 350 deletions.
7 changes: 7 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ auth_enabled: false
server:
http_listen_port: 3100
grpc_listen_port: 9096
log_level: debug

common:
instance_addr: 127.0.0.1
Expand Down Expand Up @@ -33,6 +34,12 @@ schema_config:
prefix: index_
period: 24h

pattern_ingester:
enabled: true
metric_aggregation:
enabled: true
log_push_observations: true

ruler:
alertmanager_url: http://localhost:9093

Expand Down
11 changes: 11 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,17 @@ pattern_ingester:
# CLI flag: -pattern-ingester.flush-check-period
[flush_check_period: <duration> | default = 30s]

# Configures the metric aggregation and storage behavior of the pattern
# ingester.
metric_aggregation:
# Whether the pattern ingester metric aggregation is enabled.
# CLI flag: -pattern-ingester.metric-aggregation.enabled
[enabled: <boolean> | default = false]

# Whether to log push observations.
# CLI flag: -pattern-ingester.metric-aggregation.log-push-observations
[log_push_observations: <boolean> | default = false]

# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ require (

require (
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect
Expand Down Expand Up @@ -230,7 +231,6 @@ require (
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/analysis v0.22.2 // indirect
Expand Down
2 changes: 1 addition & 1 deletion pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func TestCacheTest(t *testing.T) {

queryResultsDiff = &mockCounter{} // reset counter
mr.countOverTime = 2.3 // value not important
mr.noCacheCountOvertime = 2.30000005 // different than `countOverTime` value but withing tolerance
mr.noCacheCountOvertime = 2.30000005 // different than `countOverTime` value but within tolerance
c.cacheTest(now)
assert.Equal(t, 0, queryResultsDiff.(*mockCounter).count)

Expand Down
30 changes: 18 additions & 12 deletions pkg/iter/sample_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,25 +119,31 @@ func (it *peekingSampleIterator) Error() error {
return it.iter.Error()
}

type sampleIteratorHeap struct {
type SampleIteratorHeap struct {
its []SampleIterator
}

func (h sampleIteratorHeap) Len() int { return len(h.its) }
func (h sampleIteratorHeap) Swap(i, j int) { h.its[i], h.its[j] = h.its[j], h.its[i] }
func (h sampleIteratorHeap) Peek() SampleIterator { return h.its[0] }
func (h *sampleIteratorHeap) Push(x interface{}) {
func NewSampleIteratorHeap(its []SampleIterator) SampleIteratorHeap {
return SampleIteratorHeap{
its: its,
}
}

func (h SampleIteratorHeap) Len() int { return len(h.its) }
func (h SampleIteratorHeap) Swap(i, j int) { h.its[i], h.its[j] = h.its[j], h.its[i] }
func (h SampleIteratorHeap) Peek() SampleIterator { return h.its[0] }
func (h *SampleIteratorHeap) Push(x interface{}) {
h.its = append(h.its, x.(SampleIterator))
}

func (h *sampleIteratorHeap) Pop() interface{} {
func (h *SampleIteratorHeap) Pop() interface{} {
n := len(h.its)
x := h.its[n-1]
h.its = h.its[0 : n-1]
return x
}

func (h sampleIteratorHeap) Less(i, j int) bool {
func (h SampleIteratorHeap) Less(i, j int) bool {
s1, s2 := h.its[i].Sample(), h.its[j].Sample()
if s1.Timestamp == s2.Timestamp {
if h.its[i].StreamHash() == 0 {
Expand All @@ -150,7 +156,7 @@ func (h sampleIteratorHeap) Less(i, j int) bool {

// mergeSampleIterator iterates over a heap of iterators by merging samples.
type mergeSampleIterator struct {
heap *sampleIteratorHeap
heap *SampleIteratorHeap
is []SampleIterator
prefetched bool
stats *stats.Context
Expand All @@ -170,7 +176,7 @@ type mergeSampleIterator struct {
// This means using this iterator with a single iterator will result in the same result as the input iterator.
// If you don't need to deduplicate sample, use `NewSortSampleIterator` instead.
func NewMergeSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator {
h := sampleIteratorHeap{
h := SampleIteratorHeap{
its: make([]SampleIterator, 0, len(is)),
}
return &mergeSampleIterator{
Expand Down Expand Up @@ -350,7 +356,7 @@ func (i *mergeSampleIterator) Close() error {

// sortSampleIterator iterates over a heap of iterators by sorting samples.
type sortSampleIterator struct {
heap *sampleIteratorHeap
heap *SampleIteratorHeap
is []SampleIterator
prefetched bool

Expand All @@ -369,7 +375,7 @@ func NewSortSampleIterator(is []SampleIterator) SampleIterator {
if len(is) == 1 {
return is[0]
}
h := sampleIteratorHeap{
h := SampleIteratorHeap{
its: make([]SampleIterator, 0, len(is)),
}
return &sortSampleIterator{
Expand All @@ -378,7 +384,7 @@ func NewSortSampleIterator(is []SampleIterator) SampleIterator {
}
}

// init initialize the underlaying heap
// init initialize the underlying heap
func (i *sortSampleIterator) init() {
if i.prefetched {
return
Expand Down
35 changes: 35 additions & 0 deletions pkg/loghttp/samples.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package loghttp

import (
"net/http"

"github.com/grafana/loki/v3/pkg/logproto"
)

func ParseSamplesQuery(r *http.Request) (*logproto.QuerySamplesRequest, error) {
req := &logproto.QuerySamplesRequest{}

req.Query = query(r)
start, end, err := bounds(r)
if err != nil {
return nil, err
}
req.Start = start
req.End = end

calculatedStep, err := step(r, start, end)
if err != nil {
return nil, err
}
if calculatedStep <= 0 {
return nil, errZeroOrNegativeStep
}
// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (req.End.Sub(req.Start) / calculatedStep) > 11000 {
return nil, errStepTooSmall
}
req.Step = calculatedStep.Milliseconds()

return req, nil
}
122 changes: 122 additions & 0 deletions pkg/loghttp/samples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package loghttp

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
)

func TestParseSamplesQuery(t *testing.T) {
t.Parallel()

tests := []struct {
name string
path string
want *logproto.QuerySamplesRequest
wantErr bool
}{
{
name: "should correctly parse valid params",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=5s",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (5 * time.Second).Milliseconds(),
},
},
{
name: "should default empty step param to sensible step for the range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (14 * time.Second).Milliseconds(),
},
},
{
name: "should default start to zero for empty start param",
path: "/loki/api/v1/patterns?query={}&end=3600000000000",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: (14 * time.Second).Milliseconds(),
},
},
{
name: "should accept step with no units as seconds",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (10 * time.Second).Milliseconds(),
},
},
{
name: "should accept step as string duration in seconds",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=15s",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (15 * time.Second).Milliseconds(),
},
},
{
name: "should correctly parse long duration for step",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10h",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (10 * time.Hour).Milliseconds(),
},
},
{
name: "should reject negative step value",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=-5s",
want: nil,
wantErr: true,
},
{
name: "should reject very small step for big range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=50ms",
want: nil,
wantErr: true,
},
{
name: "should accept very small step for small range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=110000000000&step=50ms",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(110, 0),
Step: (50 * time.Millisecond).Milliseconds(),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, tt.path, nil)
require.NoError(t, err)
err = req.ParseForm()
require.NoError(t, err)

got, err := ParseSamplesQuery(req)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
assert.Equalf(t, tt.want, got, "Incorrect response from input path: %s", tt.path)
})
}
}
33 changes: 33 additions & 0 deletions pkg/logproto/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter {
return result
}

func FromMetricsToLabels(metric model.Metric) labels.Labels {
return FromLabelAdaptersToLabels(FromMetricsToLabelAdapters(metric))
}

type byLabel []LabelAdapter

func (s byLabel) Len() int { return len(s) }
Expand Down Expand Up @@ -591,3 +595,32 @@ func (m *DetectedLabelsRequest) LogToSpan(sp opentracing.Span) {
}
sp.LogFields(fields...)
}

func (m *QuerySamplesRequest) GetCachingOptions() (res definitions.CachingOptions) { return }

func (m *QuerySamplesRequest) WithStartEnd(start, end time.Time) definitions.Request {
clone := *m
clone.Start = start
clone.End = end
return &clone
}

func (m *QuerySamplesRequest) WithStartEndForCache(start, end time.Time) resultscache.Request {
return m.WithStartEnd(start, end).(resultscache.Request)
}

func (m *QuerySamplesRequest) WithQuery(query string) definitions.Request {
clone := *m
clone.Query = query
return &clone
}

func (m *QuerySamplesRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
otlog.String("query", m.GetQuery()),
otlog.String("start", m.Start.String()),
otlog.String("end", m.End.String()),
otlog.String("step", time.Duration(m.Step).String()),
}
sp.LogFields(fields...)
}
Loading

0 comments on commit 467eb1b

Please sign in to comment.