diff --git a/sigma/pipelines/loki/loki.py b/sigma/pipelines/loki/loki.py index 47552a7..657f359 100644 --- a/sigma/pipelines/loki/loki.py +++ b/sigma/pipelines/loki/loki.py @@ -1,3 +1,4 @@ +import string from dataclasses import dataclass from enum import Enum from typing import Any, Dict, List, Union @@ -65,6 +66,7 @@ class CustomLogSourceTransformation(Transformation): referencing log source and/or detection fields from the rule""" selection: Dict[str, Union[str, List[str]]] + template: bool = False def apply( self, pipeline: ProcessingPipeline, rule: Union[SigmaRule, SigmaCorrelationRule] @@ -89,7 +91,7 @@ def apply( detection.to_plain() for detection in rule.detection.detections.values() ] - field_values: list[dict[str, Union[str, int, None]]] = [ + field_values: List[Dict[str, Union[str, int, None]]] = [ d for d in plain if isinstance(d, dict) ] if len(field_values) > 0: @@ -111,9 +113,16 @@ def apply( label, [str(v) for v in values] ) ) - rule.custom_attributes[LokiCustomAttributes.LOGSOURCE_SELECTION.value] = ( - "{" + ",".join(selectors) + "}" - ) + formatted_selectors = "{" + ",".join(selectors) + "}" + if self.template: + formatted_selectors = string.Template(formatted_selectors).safe_substitute( + category=rule.logsource.category, + product=rule.logsource.product, + service=rule.logsource.service, + ) + rule.custom_attributes[ + LokiCustomAttributes.LOGSOURCE_SELECTION.value + ] = formatted_selectors else: raise SigmaFeatureNotSupportedByBackendError( "custom log source transforms are not supported for Correlation rules" diff --git a/tests/test_pipelines_loki.py b/tests/test_pipelines_loki.py index 023b143..b45fa66 100644 --- a/tests/test_pipelines_loki.py +++ b/tests/test_pipelines_loki.py @@ -395,7 +395,12 @@ def test_simple_custom_log_source_pipeline(sigma_rules: SigmaCollection): ProcessingItem( identifier="complex_custom_log_source", transformation=CustomLogSourceTransformation( - selection={"job": ["a", "b", "c"], "message|fieldref": "msg"} + selection={ + "job": ["a", "b", "c"], + "message|fieldref": "msg", + "env": "$product", + }, + template=True, ), ) ], @@ -403,7 +408,7 @@ def test_simple_custom_log_source_pipeline(sigma_rules: SigmaCollection): backend = LogQLBackend(processing_pipeline=pipeline) loki_rule = backend.convert(sigma_rules) assert loki_rule == [ - "{job=~`a|b|c`,message=`testing`} | logfmt | msg=~`(?i)^testing$`" + "{job=~`a|b|c`,env=`test`,message=`testing`} | logfmt | msg=~`(?i)^testing$`" ]