Skip to content

Commit

Permalink
Add support for templated logsource data
Browse files Browse the repository at this point in the history
  • Loading branch information
kelnage committed Jul 3, 2024
1 parent eacfc51 commit 98cc45e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
17 changes: 13 additions & 4 deletions sigma/pipelines/loki/loki.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import string
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Union
Expand Down Expand Up @@ -65,6 +66,7 @@ class CustomLogSourceTransformation(Transformation):
referencing log source and/or detection fields from the rule"""

selection: Dict[str, Union[str, List[str]]]
template: bool = False

def apply(
self, pipeline: ProcessingPipeline, rule: Union[SigmaRule, SigmaCorrelationRule]
Expand All @@ -89,7 +91,7 @@ def apply(
detection.to_plain()
for detection in rule.detection.detections.values()
]
field_values: list[dict[str, Union[str, int, None]]] = [
field_values: List[Dict[str, Union[str, int, None]]] = [
d for d in plain if isinstance(d, dict)
]
if len(field_values) > 0:
Expand All @@ -111,9 +113,16 @@ def apply(
label, [str(v) for v in values]
)
)
rule.custom_attributes[LokiCustomAttributes.LOGSOURCE_SELECTION.value] = (
"{" + ",".join(selectors) + "}"
)
formatted_selectors = "{" + ",".join(selectors) + "}"
if self.template:
formatted_selectors = string.Template(formatted_selectors).safe_substitute(
category=rule.logsource.category,
product=rule.logsource.product,
service=rule.logsource.service,
)
rule.custom_attributes[
LokiCustomAttributes.LOGSOURCE_SELECTION.value
] = formatted_selectors
else:
raise SigmaFeatureNotSupportedByBackendError(
"custom log source transforms are not supported for Correlation rules"
Expand Down
9 changes: 7 additions & 2 deletions tests/test_pipelines_loki.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,15 +395,20 @@ def test_simple_custom_log_source_pipeline(sigma_rules: SigmaCollection):
ProcessingItem(
identifier="complex_custom_log_source",
transformation=CustomLogSourceTransformation(
selection={"job": ["a", "b", "c"], "message|fieldref": "msg"}
selection={
"job": ["a", "b", "c"],
"message|fieldref": "msg",
"env": "$product",
},
template=True,
),
)
],
)
backend = LogQLBackend(processing_pipeline=pipeline)
loki_rule = backend.convert(sigma_rules)
assert loki_rule == [
"{job=~`a|b|c`,message=`testing`} | logfmt | msg=~`(?i)^testing$`"
"{job=~`a|b|c`,env=`test`,message=`testing`} | logfmt | msg=~`(?i)^testing$`"
]


Expand Down

0 comments on commit 98cc45e

Please sign in to comment.