From a48b5a6c9a2f3feb1fe5b4426bb8ebeaeaee1ab7 Mon Sep 17 00:00:00 2001 From: mmantyla Date: Thu, 16 May 2024 11:02:45 +0300 Subject: [PATCH] Drain and Tipping allow templates to be fetched to df --- loglead/enhancer.py | 51 +++++++++++++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/loglead/enhancer.py b/loglead/enhancer.py index 812593f..6fa8370 100644 --- a/loglead/enhancer.py +++ b/loglead/enhancer.py @@ -20,6 +20,9 @@ ("${start}${end}", r"(?P[^A-Za-z0-9]|^)([0-9a-f]{6,} ?){3,}(?P[^A-Za-z0-9]|$)"), ("${start}${end}", r"(?P[^A-Za-z0-9]|^)([0-9A-F]{4} ?){4,}(?P[^A-Za-z0-9]|$)"), ("${start}${end}", r"(?P[^A-Za-z0-9]|^)(0x[a-f0-9A-F]+)(?P[^A-Za-z0-9]|$)"), +# ("${start}${end}", r"(?P[^A-Za-z0-9]|^)([a-f0-9A-F]+)(?P[^A-Za-z0-9]|$)"), +# ("${start}${end}", r"(?P[^A-Za-z0-9]|^)(0x[a-f0-9A-F]+|[a-f0-9A-F]+)(?P[^A-Za-z0-9]|$)"), +# ("${start}${end}", r"(?P[^A-Za-z0-9]|^)(0x[a-f0-9A-F]{2,}(?:[a-f0-9A-F]{2})*|[a-f0-9A-F]{2}(?:[a-f0-9A-F]{2})*)(?P[^A-Za-z0-9]|$)"), ("${start}${end}", r"(?P[^A-Za-z0-9]|^)([\-\+]?\d+)(?P[^A-Za-z0-9]|$)"), ("${cmd}", r"(?Pexecuted cmd )(\".+?\")") ] @@ -85,7 +88,7 @@ def _create_cngram(self, message, ngram=3): return [message[i:i + ngram] for i in range(len(message) - ngram + 1)] # Enrich with drain parsing results - def parse_drain(self, field = "e_message_normalized", drain_masking=False, reparse=False): + def parse_drain(self, field = "e_message_normalized", drain_masking=False, reparse=False, templates=False): self._handle_prerequisites([field]) if reparse or "e_event_drain_id" not in self.df.columns: import drain3 as dr @@ -125,10 +128,15 @@ def parse_drain(self, field = "e_message_normalized", drain_masking=False, repar self.df = self.df.with_columns( drain=pl.col(field).map_elements(lambda x: tm.add_log_message(x), return_dtype=return_dtype)) - self.df = self.df.with_columns( - # extra letter to ensure we e1 e2 instead of 1 2 - e_event_drain_id=pl.lit("e") + pl.col("drain").struct.field("cluster_id").cast(pl.Utf8), - e_template=pl.col("drain").struct.field("template_mined")) + if templates: + self.df = self.df.with_columns( + # extra letter to ensure we get e1 e2 instead of 1 2 + e_event_drain_id=pl.lit("e") + pl.col("drain").struct.field("cluster_id").cast(pl.Utf8), + e_event_drain_template=pl.col("drain").struct.field("template_mined")) + else: + self.df = self.df.with_columns( + # extra letter to ensure we get e1 e2 instead of 1 2 + e_event_drain_id=pl.lit("e") + pl.col("drain").struct.field("cluster_id").cast(pl.Utf8)) self.df = self.df.drop("drain") # Drop the dictionary produced by drain. Event_id and template are the most important. # tm.drain.print_tree() return self.df @@ -159,7 +167,7 @@ def parse_ael(self,field = "e_message_normalized", reparse=False): return self.df #New parser not yet released to public. Coming early 2024 - def parse_tip(self, field = "e_message_normalized", reparse=False): + def parse_tip(self, field = "e_message_normalized", reparse=False, templates=False): self._handle_prerequisites([field]) if reparse or "e_event_tip_id" not in self.df.columns: if "e_event_tip_id" in self.df.columns: @@ -168,12 +176,27 @@ def parse_tip(self, field = "e_message_normalized", reparse=False): if "row_nr" in self.df.columns: self.df = self.df.drop("row_nr") self.df = self.df.with_row_count() - tipping_clusters = tip.parse(self.df[field], return_templates=False, return_masks=False) - df_new = pl.DataFrame( - { - "e_event_tip_id": tipping_clusters[0], - } - ) + tipping_clusters, tipping_masks, tipping_templates = tip.parse(self.df[field], return_templates=templates, return_masks=False) + if templates: + df_new = pl.DataFrame( + { + "e_event_tip_id": tipping_clusters, + } + ) + #convert sets to lists + tipping_templates = [list(s)[0] if s else None for s in tipping_templates] + df_templates = pl.DataFrame({ + "e_event_tip_id": range(len(tipping_templates)), + "e_event_tip_template": tipping_templates + }) + df_new = df_new.join(df_templates, on="e_event_tip_id", how="left") + df_new = df_new.with_columns(pl.col("e_event_tip_template").cast(pl.Utf8)) + else: + df_new = pl.DataFrame( + { + "e_event_tip_id": tipping_clusters, + } + ) df_new = df_new.with_columns( e_event_tip_id=pl.when(pl.col("e_event_tip_id").is_null()) .then(pl.lit("e_null")) @@ -349,7 +372,9 @@ def normalize(self, regexs=masking_patterns_drain, to_lower=False, twice=True): # print (base_code) # return base_code - def item_cumsum2(self, column="e_message_normalized", chronological_order=1, ano_only=True, unique_only=True, out_column=""): + def item_cumsum2(self, column="e_message_normalized", chronological_order=1, ano_only=True, unique_only=True, out_column=None): + if out_column is None: + out_column = column + "_cumsum" column_name = out_column self._handle_prerequisites([column, 'm_timestamp']) if ano_only: