Release/3.2.24 #526

Merged · 3 commits · Jun 20, 2024
17 changes: 16 additions & 1 deletion promql/index.js
@@ -117,8 +117,15 @@ const getIdxSubquery = (conds, fromMs, toMs) => {
).groupBy('fingerprint')
}

-module.exports.getData = async (matchers, fromMs, toMs) => {
+module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
const db = DATABASE_NAME()
+const subq = (subqueries || {})[getMetricName(matchers)]
+if (subq) {
+  console.log(subq)
+  const data = await rawRequest(subq + ' FORMAT RowBinary',
+    null, db, { responseType: 'arraybuffer' })
+  return new Uint8Array(data.data)
+}
const matches = getMatchersIdxCond(matchers)
const idx = getIdxSubquery(matches, fromMs, toMs)
const withIdx = new Sql.With('idx', idx, !!clusterName)
@@ -176,4 +183,12 @@ module.exports.getData = async (matchers, fromMs, toMs) => {
return new Uint8Array(data.data)
}

+function getMetricName(matchers) {
+  for (const matcher of matchers) {
+    if (matcher[0] === '__name__' && matcher[1] === '=') {
+      return matcher[2]
+    }
+  }
+}

prometheus.getData = module.exports.getData
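
Below, a hedged sketch (not part of the PR) of how the new optional subqueries argument might be used: it maps a metric name to a raw ClickHouse query that replaces the generated fingerprint-index lookup for that metric. The metric name, SQL text, and time range are hypothetical; matcher triples follow the ['__name__', '=', 'up'] shape that getMetricName() walks.

const { getData } = require('./promql') // assumed require path

async function fetchUp () {
  const matchers = [['__name__', '=', 'up'], ['job', '=', 'node']]
  const subqueries = {
    // keyed by metric name; the value runs verbatim with FORMAT RowBinary
    up: 'SELECT fingerprint, timestamp_ns, value FROM metrics_15s' // hypothetical SQL
  }
  // With an override present the index lookup is skipped entirely;
  // either way the result is a RowBinary response as a Uint8Array.
  return getData(matchers, Date.now() - 3600 * 1000, Date.now(), subqueries)
}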
5 changes: 5 additions & 0 deletions test/traceql_parser.test.js
@@ -44,3 +44,8 @@ it('traceql: max duration', () => {
const res = parser.ParseScript('{.testId="12345" &&.spanN>=8.9} | max(duration) > 8ms')
expect(res.rootToken.value).toEqual('{.testId="12345" &&.spanN>=8.9} | max(duration) > 8ms')
})

+it('traceql: select', () => {
+  const res = parser.ParseScript('{.testId="12345" &&.spanN>=8.9} | select(a, b)')
+  expect(res.rootToken.value).toEqual('{.testId="12345" &&.spanN>=8.9} | select(a, b)')
+})
3 changes: 3 additions & 0 deletions traceql/clickhouse_transpiler/index.js
@@ -292,6 +292,9 @@ module.exports.Planner = class Planner {
if (!agg) {
return
}
+if (['count', 'sum', 'min', 'max', 'avg'].indexOf(agg.Child('fn').value) < 0) {
+  return
+}
this.aggFn = agg.Child('fn').value
const labelName = agg.Child('attr').Child('label_name')
this.aggregatedAttr = labelName ? labelName.value : ''
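
The new guard keeps anything other than the five aggregate functions out of the SQL planner. A minimal sketch of its effect, assuming it mirrors the check above rather than the real Planner class:

const SQL_AGG_FNS = ['count', 'sum', 'min', 'max', 'avg']

function isSqlAggregation (fnName) {
  // 'select' now parses as an <aggregator> (see traceql.bnf below) but is
  // deliberately not planned as SQL; the post-processor handles it instead.
  return SQL_AGG_FNS.indexOf(fnName) >= 0
}

console.log(isSqlAggregation('avg'))    // true  -> planner records aggFn
console.log(isSqlAggregation('select')) // false -> planner returns early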
4 changes: 3 additions & 1 deletion traceql/clickhouse_transpiler/traces_data.js
@@ -24,7 +24,9 @@ const processFn = (sel, ctx) => {
[new Sql.Raw(
'toFloat64(max(traces.timestamp_ns + traces.duration_ns) - min(traces.timestamp_ns)) / 1000000'
), 'duration_ms'],
-[new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name']
+[new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name'],
+[new Sql.Raw(`groupArrayIf(base64Encode(traces.payload), (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'payload'],
+[new Sql.Raw(`groupArrayIf(traces.payload_type, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'payload_type']
).from([table, 'traces']).where(Sql.And(
new Sql.In(new Sql.Raw('traces.trace_id'), 'in', new Sql.WithReference(withTraceIds))
)).groupBy('traces.trace_id')
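
For orientation, a hedged sketch of roughly the SQL the two added columns should render to. Here trace_ids_span_ids stands in for the rendered Sql.WithReference alias, which may differ in the actual query:

const withAlias = 'trace_ids_span_ids' // assumed alias for the WITH reference
const payloadCol = `groupArrayIf(base64Encode(traces.payload), ` +
  `(traces.trace_id, traces.span_id) IN ${withAlias}) AS payload`
const payloadTypeCol = `groupArrayIf(traces.payload_type, ` +
  `(traces.trace_id, traces.span_id) IN ${withAlias}) AS payload_type`
// Grouped by traces.trace_id, these yield parallel arrays: payload[i] is a
// base64-encoded span blob and payload_type[i] says how to decode it
// (1 = Zipkin JSON, 2 = OTLP protobuf; see the post_processor below).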
20 changes: 12 additions & 8 deletions traceql/index.js
@@ -4,6 +4,7 @@ const logger = require('../lib/logger')
const { DATABASE_NAME } = require('../lib/utils')
const { clusterName } = require('../common')
const { rawRequest } = require('../lib/db/clickhouse')
+const { postProcess } = require('./post_processor')

/**
*
@@ -39,12 +40,14 @@ const search = async (query, limit, from, to) => {
} else {
res = await processSmallResult(ctx, scrpit.rootToken)
}
+res = postProcess(res, scrpit.rootToken)
res.forEach(t =>
t.spanSets.forEach(
ss => ss.spans.sort(
(a, b) => b.startTimeUnixNano.localeCompare(a.startTimeUnixNano))
)
)
+console.log(JSON.stringify(res, 2))
return res
}

@@ -70,11 +73,12 @@ const evaluateComplexity = async (ctx, script) => {
async function processComplexResult (ctx, script, complexity) {
const planner = ctx.planner.plan()
const maxFilter = Math.floor(complexity / 10000000)
-let traces = []
+//let traces = []
+let response = null
for (let i = 0; i < maxFilter; i++) {
ctx.randomFilter = [maxFilter, i]
const sql = planner(ctx)
-const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
+response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
if (response.data.data.length === parseInt(ctx.limit)) {
const minStart = response.data.data.reduce((acc, row) =>
acc === 0 ? row.start_time_unix_nano : Math.min(acc, row.start_time_unix_nano), 0
@@ -88,7 +92,7 @@ async function processComplexResult (ctx, script, complexity) {
ctx.randomFilter = [maxFilter, i]
}
ctx.cachedTraceIds = response.data.data.map(row => row.trace_id)
-traces = response.data.data.map(row => ({
+/*traces = response.data.data.map(row => ({
traceID: row.trace_id,
rootServiceName: row.root_service_name,
rootTraceName: row.root_trace_name,
@@ -105,9 +109,9 @@
matched: row.span_id.length
}
]
-}))
+}))*/
}
-return traces
+return response.data.data
}

/**
@@ -119,7 +123,7 @@ async function processSmallResult (ctx, script) {
const planner = ctx.planner.plan()
const sql = planner(ctx)
const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
-const traces = response.data.data.map(row => ({
+/*const traces = response.data.data.map(row => ({
traceID: row.trace_id,
rootServiceName: row.root_service_name,
rootTraceName: row.root_trace_name,
@@ -146,8 +150,8 @@
matched: row.span_id.length
}
]
-}))
-return traces
+}))*/
+return response.data.data
}

module.exports = {
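
Taken together: both result helpers now return raw ClickHouse rows, and trace assembly moves into postProcess(). A condensed, hedged sketch of search() after this change; buildContext is a hypothetical stand-in for the real context setup and the branch condition is only indicative:

async function searchSketch (query, limit, from, to) {
  const scrpit = parser.ParseScript(query) // variable name as in the real code
  const ctx = buildContext(scrpit, limit, from, to) // hypothetical helper
  const complexity = await evaluateComplexity(ctx, scrpit.rootToken)
  const rows = complexity
    ? await processComplexResult(ctx, scrpit.rootToken, complexity)
    : await processSmallResult(ctx, scrpit.rootToken)
  const res = postProcess(rows, scrpit.rootToken) // decode payloads, build spanSets
  res.forEach(t => t.spanSets.forEach(ss => ss.spans.sort(
    (a, b) => b.startTimeUnixNano.localeCompare(a.startTimeUnixNano))))
  return res
}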
74 changes: 74 additions & 0 deletions traceql/post_processor/index.js
@@ -0,0 +1,74 @@
const Otlp = require('../../lib/db/otlp')
const Zipkin = require('../../lib/db/zipkin')
const protobufjs = require('protobufjs')
const path = require('path')
const OTLPSpan = protobufjs.loadSync(path.join(__dirname, '..', '..',
'lib', 'otlp.proto')).lookupType('Span')
/**
*
* @param rows {Row[]}
* @param script {Token}
*/
function postProcess (rows, script) {
const selectAttrs = script.Children('aggregator')
.filter(x => x.Child('fn').value === 'select')
.map(x => x.Children('label_name'))
.reduce((acc, x) => {
let attrs = x.map(y => ({
name: y.value,
path: y.value.split('.').filter(y => y)
}))
if (attrs[0] === 'span' || attrs[0] === 'resource') {
attrs = attrs.slice(1)
}
return [...acc, ...attrs]
}, [])
rows = rows.map(row => ({
...row,
objs: row.payload.map((payload, i) => {
let span = null
switch (row.payload_type[i]) {
case 1:
return new Zipkin(JSON.parse(Buffer.from(payload, 'base64').toString()))
case 2:
span = OTLPSpan.toObject(
OTLPSpan.decode(Buffer.from(payload, 'base64')), {
longs: String,
bytes: String
})
return new Otlp(span)
}
return null
})
}))
const spans = (row) => row.span_id.map((spanId, i) => ({
spanID: spanId,
startTimeUnixNano: row.timestamp_ns[i],
durationNanos: row.duration[i],
attributes: selectAttrs.map(attr => ({
key: attr.name,
value: {
stringValue: (row.objs[i].tags.find(t => t[0] === attr.path.join('.')) || [null, null])[1]
}
})).filter(x => x.value.stringValue)
}))
const traces = rows.map(row => ({
traceID: row.trace_id,
rootServiceName: row.root_service_name,
rootTraceName: row.root_trace_name,
startTimeUnixNano: row.start_time_unix_nano,
durationMs: row.duration_ms,
spanSet: { spans: spans(row) },
spanSets: [
{
spans: spans(row),
matched: row.span_id.length
}
]
}))
return traces
}

module.exports = {
postProcess
}
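
A hedged usage sketch for postProcess(), with rows shaped like the Row interface declared in types.d.ts below. The parser require path and the Zipkin payload are placeholders:

const parser = require('../parser') // assumed location of the traceql parser
const { postProcess } = require('./post_processor')

const script = parser.ParseScript('{.testId="12345"} | select(.http.method)')
const rows = [{
  trace_id: 'd6e9329d67b6146b',
  span_id: ['4f5e0b437d4a8c1f'],
  duration: ['1000000'],
  timestamp_ns: ['1718880000000000000'],
  start_time_unix_nano: '1718880000000000000',
  duration_ms: 1,
  root_service_name: 'frontend',
  // payload_type 1 = base64 Zipkin JSON, 2 = base64 OTLP protobuf
  payload: [Buffer.from(JSON.stringify({ id: '4f5e0b437d4a8c1f', tags: {} })).toString('base64')],
  payload_type: [1]
}]
// Decodes each payload and builds Tempo-style traces; attributes named in
// select() are copied from the decoded span's tags onto each span.
const traces = postProcess(rows, script.rootToken)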
11 changes: 11 additions & 0 deletions traceql/post_processor/types.d.ts
@@ -0,0 +1,11 @@
export interface Row {
trace_id: string;
span_id: string[];
duration: string[];
timestamp_ns: string[];
start_time_unix_nano: string;
duration_ms: number;
root_service_name: string;
payload: string[];
payload_type: number[];
}
7 changes: 4 additions & 3 deletions traceql/traceql.bnf
@@ -6,14 +6,15 @@ complex_head ::= "(" <OWSP> <attr_selector_exp> <OWSP> ")"
tail ::= <attr_selector_exp>
and_or ::= "&&" | "||"

-aggregator ::= "|" <OWSP> <fn> <OWSP> <attr> <OWSP> <cmp> <OWSP> <cmp_val>
-fn ::= "count"|"sum"|"min"|"max"|"avg"
-attr ::= "(" <OWSP> [ <label_name> ] <OWSP> ")"
+aggregator ::= "|" <OWSP> <fn> <OWSP> <attr> <OWSP> [ <cmp> <OWSP> <cmp_val> ]
+fn ::= "count"|"sum"|"min"|"max"|"avg"|"select"
+attr ::= "(" <OWSP> [ <label_names> ] <OWSP> ")"
cmp ::= "="|"!="|"<="|">="|"<"|">"
cmp_val ::= <number> [<measurement>]
measurement ::= "ns"|"us"|"ms"|"s"|"m"|"h"|"d"

label_name ::= ("." | <ALPHA> | "-" | "_") *("." | <ALPHA> | "_" | "-" | <DIGITS>)
+label_names ::= <label_name> *(<OWSP> "," <OWSP> <label_name>)
number ::= ["-"] <DIGITS> ["." <DIGITS>]

attr_selector ::= <label_name> <OWSP> <op> <OWSP> <value>
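
With the revised rules, select takes a comma-separated label list and the comparison tail of an aggregator is now optional. A short sketch of queries the grammar accepts after this change, assuming the parser module exercised in the tests above:

const parser = require('./parser') // assumed path
parser.ParseScript('{.testId="12345"} | select(a, b)')        // new: select + label list
parser.ParseScript('{.testId="12345"} | avg(duration)')       // new: comparison optional
parser.ParseScript('{.testId="12345"} | max(duration) > 8ms') // unchanged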