Merge pull request #37 from EvoTestOps/update/polars-v1.5
Fix issue #36: Update polars to v1.5
mmantyla authored Aug 26, 2024
2 parents (2762735 + 13c092a), commit c31a819
Showing 14 changed files with 53 additions and 45 deletions.
demo/RawLoader_NoLabels.py: 19 changes (14 additions, 5 deletions)
@@ -82,6 +82,7 @@ def split_dataframe(df: pl.DataFrame, label_col: str, drop: list):
write_to_str(train_df_label, "train.label")
write_to_str(test_df_no_labels, "test.log")
write_to_str(test_df_label, "test.label")
+ print (f"Unlabeled BGL data subsets created to files: train.log and test.log. Labels are in files: train.label and test.label.")


def create_seq_in_file_HDFS(reduce_df_ratio: float = 0.0005, test_ratio: float = 0.5):
@@ -116,6 +117,8 @@ def write_seqs_to_files(df_redu_seq, output_dir):
output_test_dir = "HDFS_test"
os.makedirs(output_test_dir, exist_ok=True)
write_seqs_to_files(test_df, output_test_dir)
+ print (f"HDFS data with one sequence at one file created to folders: {output_train_dir} and {output_test_dir}.")
+ print (f"Anomaly files start with filename: A_ . Normal files start with filename: N_")



@@ -161,6 +164,7 @@ def train_line_models(df):
joblib.dump(sad.model, 'IF_model.joblib')
sad.train_RarityModel(filter_anos=False)
joblib.dump(sad.model, 'RM_model.joblib')
+ print (f"Log line anomaly detectors created. Isolation Forrest in: IF_model.joblib, KMeans in: kmeans.joblib, RarityModel in RM_model.joblib")
#sad.train_OOVDetector() #OOV detector does not need training. Vectorizer is enough
#joblib.dump(sad.model, 'OOV_model.joblib')
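Usage note (not part of the commit): the persisted detectors named in the print above can be reloaded with joblib without retraining; the variable names below are illustrative.

import joblib

if_model = joblib.load("IF_model.joblib")   # IsolationForest line model
km_model = joblib.load("kmeans.joblib")     # KMeans line model
rm_model = joblib.load("RM_model.joblib")   # RarityModel line model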

@@ -203,13 +207,15 @@ def analyse_with_line_models(df):
df_anos = df_anos.with_columns(predictions)
df_anos.drop("e_words").write_csv("test_log_predicted.csv", quote_style="always", separator='\t')

+ print (f"Log line anomaly detectors executed. Scored logfile is in file: test_log_predicted.csv with columns separated with \\t")

def containment_similarity(v_binary1, v_binary2):
"""Containment Similarity: Intersection divided by the smaller vector's size"""
intersection = np.logical_and(v_binary1, v_binary2).sum()
return intersection / min(v_binary1.sum(), v_binary2.sum()) if min(v_binary1.sum(), v_binary2.sum()) > 0 else 0


- def measure_distance (df_train, df_analyze, field = "m_message", vectorizer = CountVectorizer):
+ def measure_distance (df_train, df_analyze, field = "m_message", vectorizer = CountVectorizer, print_values=True):
#setup
s_train = df_train.select(pl.col(field).str.concat(" ")).item()
s_analyze = df_analyze.select(pl.col(field).str.concat(" ")).item()
@@ -227,8 +233,8 @@ def measure_distance (df_train, df_analyze, field = "m_message", vectorizer = Co
v_binary1 = (v_train > 0).astype(int)
v_binary2 = (v_analyse > 0).astype(int)
#print (v_binary1)
- j1 = float(jaccard_score(v_binary1, v_binary2, average="samples"))
- #containment
+ jaccard = float(jaccard_score(v_binary1, v_binary2, average="samples"))
+ #containment (smaller is subset of larger)
intersection = v_binary1.multiply(v_binary2).sum()
containment = float(intersection / min(v_binary1.sum(), v_binary2.sum()) if min(v_binary1.sum(), v_binary2.sum()) > 0 else 0)

@@ -237,21 +243,24 @@ def measure_distance (df_train, df_analyze, field = "m_message", vectorizer = Co
len2 = len(zlib.compress(s_analyze.encode()))
combined_len = len(zlib.compress((s_train + s_analyze).encode()))
compression = combined_len / (len1 + len2)
+ if print_values:
+ print(f"Distance of column {field} is Cosine: {cosine_sim}, Jaccard: {jaccard}, Compression: {compression}, Containment: {containment} ")

- return cosine_sim, j1, compression, containment
+ return cosine_sim, jaccard, compression, containment

def measure_distance_random_files(df_train, df_analyze, field="e_message_normalized", sample_size=10):
# Select unique file names and sample 10 from each DataFrame
unique_train = df_train.select(pl.col("file_name")).unique().sample(sample_size)
unique_analyze = df_analyze.select(pl.col("file_name")).unique().sample(sample_size)

# Loop through the samples and calculate metrics
+ print (f"File distances of random file pairs: ")
for i in range(sample_size):
df1 = df_train.filter(pl.col("file_name") == unique_train[i, 0])
df2 = df_analyze.filter(pl.col("file_name") == unique_analyze[i, 0])

# Calculate the distances
- cos, jaccard, compression, containment = measure_distance(df1, df2, field=field)
+ cos, jaccard, compression, containment = measure_distance(df1, df2, field=field, print_values=False)

# Print the file names and metrics
print(f"{unique_train[i, 0]} - {unique_analyze[i, 0]}, "
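For reference, a minimal standalone sketch of the four similarities the demo reports (cosine, Jaccard, compression, containment), assuming scikit-learn, NumPy, and zlib; the two strings below are illustrative and not from the dataset.

import zlib
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import jaccard_score
from sklearn.metrics.pairwise import cosine_similarity

s_train = "user login ok user logout"
s_analyze = "user login failed user login"

counts = CountVectorizer().fit_transform([s_train, s_analyze]).toarray()
v_train, v_analyze = counts[0:1], counts[1:2]          # keep 2-D shape (1, vocab)

cosine_sim = float(cosine_similarity(v_train, v_analyze)[0, 0])

b_train = (v_train > 0).astype(int)
b_analyze = (v_analyze > 0).astype(int)
jaccard = float(jaccard_score(b_train, b_analyze, average="samples"))

# Containment: shared vocabulary divided by the smaller vocabulary.
intersection = np.logical_and(b_train, b_analyze).sum()
containment = intersection / min(b_train.sum(), b_analyze.sum())

# Compression: how well the concatenation compresses relative to the parts.
len1 = len(zlib.compress(s_train.encode()))
len2 = len(zlib.compress(s_analyze.encode()))
compression = len(zlib.compress((s_train + s_analyze).encode())) / (len1 + len2)

print(cosine_sim, jaccard, compression, containment)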
loglead/OOV_detector.py: 2 changes (1 addition, 1 deletion)
@@ -19,7 +19,7 @@ def predict(self, X_test):
# Give array of 0s if the needed length column is lacking in the df
if self.len_col not in self.test_df.columns:
print("Column not found for OOVD")
- return np.zeros(self.test_df.select(pl.count()).item())
+ return np.zeros(self.test_df.select(pl.len()).item())
else:
msglen = self.test_df[self.len_col]
test_word_count_np = np.array(X_test.tocsr().sum(axis=1)).squeeze()
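A minimal sketch of the rename picked up here: Polars 1.x spells the row count pl.len() instead of pl.count(); the DataFrame below is illustrative.

import numpy as np
import polars as pl

test_df = pl.DataFrame({"m_message": ["a", "b", "c"]})

n_rows = test_df.select(pl.len()).item()   # was: test_df.select(pl.count()).item()
scores = np.zeros(n_rows)                  # one placeholder score per log line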
loglead/enhancers/eventlog.py: 23 changes (12 additions, 11 deletions)
@@ -44,7 +44,7 @@ def words(self, column="m_message"):
if "e_words" not in self.df.columns:
self.df = self.df.with_columns(pl.col(column).str.split(by=" ").alias("e_words"))
self.df = self.df.with_columns(
- e_words_len = pl.col("e_words").list.lengths(),
+ e_words_len = pl.col("e_words").list.len(),
)
else:
print("e_words already found")
@@ -58,7 +58,7 @@ def alphanumerics(self, column="m_message"):
pl.col(column).str.extract_all(r"[a-zA-Z\d]+").alias("e_alphanumerics")
)
self.df = self.df.with_columns(
- e_alphanumerics_len = pl.col("e_alphanumerics").list.lengths(),
+ e_alphanumerics_len = pl.col("e_alphanumerics").list.len(),
)
return self.df

@@ -75,7 +75,7 @@ def trigrams(self, column="m_message"):
lambda mes: self._create_cngram(message=mes, ngram=3), return_dtype=pl.List(pl.Utf8)).alias("e_trigrams")
)
self.df = self.df.with_columns(
- e_trigrams_len = pl.col("e_trigrams").list.lengths()
+ e_trigrams_len = pl.col("e_trigrams").list.len()
)
return self.df

@@ -159,7 +159,8 @@ def parse_ael(self,field = "e_message_normalized", reparse=False):
self.df = pl.concat([self.df, df_new], how="horizontal")
return self.df

- #New parser not yet released to public. Coming early 2024
+ #See https://pypi.org/project/tipping/
+ #and https://arxiv.org/abs/2408.00645
def parse_tip(self, field = "e_message_normalized", reparse=False, templates=False):
self._handle_prerequisites([field])
if reparse or "e_event_tip_id" not in self.df.columns:
@@ -168,7 +169,7 @@ def parse_tip(self, field = "e_message_normalized", reparse=False, templates=Fal
import tipping as tip #See https://pypi.org/project/tipping/
if "row_nr" in self.df.columns:
self.df = self.df.drop("row_nr")
- self.df = self.df.with_row_count()
+ self.df = self.df.with_row_index("row_nr", )
tipping_clusters, tipping_masks, tipping_templates = tip.parse(self.df[field], return_templates=templates, return_masks=False)
if templates:
df_new = pl.DataFrame(
@@ -205,7 +206,7 @@ def parse_iplom(self, field = "e_message_normalized", reparse=False, CT=0.35, PS
self.df = self.df.drop("e_event_iplom_id")
if "row_nr" in self.df.columns:
self.df = self.df.drop("row_nr")
- self.df = self.df.with_row_count()
+ self.df = self.df.with_row_index("row_nr", )
from loglead.parsers import IPLoMParser
#TODO Storing each parser in self might eat a lot of memeory
iplom_parser = IPLoMParser(messages=self.df[field], CT=CT, PST=PST, lowerBound=lower_bound)#FST not implemented
@@ -233,7 +234,7 @@ def parse_pliplom(self, field = "e_message_normalized", reparse=False, CT=0.35,
self.df = self.df.drop("e_event_pliplom_id")
if "row_nr" in self.df.columns:
self.df = self.df.drop("row_nr")
- self.df = self.df.with_row_count()
+ self.df = self.df.with_row_index("row_nr", )
from loglead.parsers import PL_IPLoMParser
pliplom_parser = PL_IPLoMParser(self.df, CT=CT, FST=FST, PST=PST, lower_bound=lower_bound, single_outlier_event=single_outlier_event)
df_new = pliplom_parser.parse()
@@ -254,7 +255,7 @@ def parse_lenma(self, field = "e_message_normalized", reparse=False):
lenma_tm = LenmaTemplateManager(threshold=0.9)
if "row_nr" in self.df.columns:
self.df = self.df.drop("row_nr")
- self.df = self.df.with_row_count()
+ self.df = self.df.with_row_index("row_nr", )
self.df = self.df.with_columns(
lenma_obj=pl.struct(["e_words", "row_nr"])
.map_elements(lambda x:lenma_tm.infer_template(x["e_words"], x["row_nr"]), return_dtype=pl.Object))
@@ -327,7 +328,7 @@ def length(self, column="m_message"):
self._handle_prerequisites(["m_message"])
if "e_chars_len" not in self.df.columns:
self.df = self.df.with_columns(
- e_chars_len=pl.col(column).str.n_chars(),
+ e_chars_len=pl.col(column).str.len_chars(),
e_lines_len=pl.col(column).str.count_matches(r"(\n|\r|\r\n)"),
e_event_id_len = 1 #Messages are always one event. Added to simplify code later on.

@@ -387,7 +388,7 @@ def item_cumsum2(self, column="e_message_normalized", chronological_order=1, ano
# In my tests cumsum needs a column in the table
self.df = self.df.with_columns(condition.cast(pl.Int32).alias('count_support'))
self.df = self.df.with_columns(
- pl.col('count_support').cumsum().alias(column_name)
+ pl.col('count_support').cum_sum().alias(column_name)
)
self.df = self.df.drop('count_support')

@@ -416,7 +417,7 @@ def item_cumsum(self, column="e_message_normalized", chronological_order=True, a
# In my tests cumsum needs a column in the table
self.df = self.df.with_columns(condition.cast(pl.Int32).alias('count_support'))
self.df = self.df.with_columns(
- pl.col('count_support').cumsum().alias(column_name)
+ pl.col('count_support').cum_sum().alias(column_name)
)
self.df = self.df.drop('count_support')

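The renames applied throughout this enhancer, collected into one sketch on a toy DataFrame (column names follow the enhancer; the data is illustrative).

import polars as pl

df = pl.DataFrame({"m_message": ["one two", "three four five"]})

df = df.with_columns(pl.col("m_message").str.split(by=" ").alias("e_words"))
df = df.with_columns(
    e_words_len=pl.col("e_words").list.len(),         # was .list.lengths()
    e_chars_len=pl.col("m_message").str.len_chars(),  # was .str.n_chars()
)
# with_row_count() -> with_row_index(); the old default name "row_nr" is now explicit.
df = df.with_row_index("row_nr")
df = df.with_columns(pl.col("e_words_len").cum_sum().alias("e_words_cumsum"))  # was .cumsum()
print(df)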
loglead/enhancers/sequence.py: 4 changes (2 additions, 2 deletions)
@@ -28,7 +28,7 @@ def time_stamp(self):

def seq_len(self):
# Count the number of rows for each seq_id
- df_temp = self.df.group_by('seq_id').agg(pl.count().alias('seq_len'))
+ df_temp = self.df.group_by('seq_id').agg(pl.len().alias('seq_len'))
# Join this result with df_sequences on seq_id
self.df_seq = self.df_seq.join(df_temp, on='seq_id')
# Add an alias that is compatible with the token len naming.
@@ -60,7 +60,7 @@ def duration(self):
# Calculate the sequence duration for each seq_id as the difference between max and min timestamps
df_temp = self.df.group_by('seq_id').agg(
(pl.col('m_timestamp').max() - pl.col('m_timestamp').min()).alias('duration'),
- (pl.col('m_timestamp').max() - pl.col('m_timestamp').min()).dt.seconds().alias('duration_sec')
+ (pl.col('m_timestamp').max() - pl.col('m_timestamp').min()).dt.total_seconds().alias('duration_sec')
)
# Join this result with df_sequences on seq_id
self.df_seq = self.df_seq.join(df_temp, on='seq_id')
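A sketch of the two renames in this file, pl.count() to pl.len() and .dt.seconds() to .dt.total_seconds(), on illustrative timestamps.

from datetime import datetime

import polars as pl

df = pl.DataFrame({
    "seq_id": ["s1", "s1", "s2"],
    "m_timestamp": [datetime(2024, 1, 1, 0, 0, 0),
                    datetime(2024, 1, 1, 0, 0, 30),
                    datetime(2024, 1, 1, 1, 0, 0)],
})

df_seq = df.group_by("seq_id").agg(
    pl.len().alias("seq_len"),                                # was pl.count()
    (pl.col("m_timestamp").max() - pl.col("m_timestamp").min())
        .dt.total_seconds().alias("duration_sec"),            # was .dt.seconds()
)
print(df_seq)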
loglead/loaders/adfa.py: 2 changes (1 addition, 1 deletion)
@@ -61,7 +61,7 @@ def preprocess(self):
# Here the sequence based dataframe is created
self.df_seq = self.df.with_columns(
(pl.col('label') != "Normal").alias('is_anomaly')
- ).groupby('seq_id').agg([
+ ).group_by('seq_id').agg([
pl.col('m_message'), # Aggregates all 'm_message' into a list
pl.any('is_anomaly').alias('anomaly'), # Checks if there's any label not equal to 'Normal'
(~pl.any('is_anomaly')).alias('normal') # Adds the opposite of 'anomaly' as 'normal'
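The groupby() to group_by() rename in context, sketched on illustrative ADFA-style rows; pl.any('is_anomaly') is written as the equivalent pl.col('is_anomaly').any() here.

import polars as pl

df = pl.DataFrame({
    "seq_id": ["s1", "s1", "s2"],
    "label": ["Normal", "Hydra_SSH", "Normal"],
    "m_message": ["6", "63", "174"],
})

df_seq = (
    df.with_columns((pl.col("label") != "Normal").alias("is_anomaly"))
      .group_by("seq_id")                                  # was .groupby()
      .agg(
          pl.col("m_message"),                             # all messages as a list
          pl.col("is_anomaly").any().alias("anomaly"),
          (~pl.col("is_anomaly").any()).alias("normal"),
      )
)
print(df_seq)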
loglead/loaders/awsctd.py: 6 changes (3 additions, 3 deletions)
@@ -48,13 +48,13 @@ def preprocess(self):
)

self.df_seq = self.df_seq.with_columns(
- pl.col('m_message').apply(lambda x: x[-1] if len(x) > 0 else None, return_dtype=pl.String).alias('label')
+ pl.col('m_message').map_elements(lambda x: x[-1] if len(x) > 0 else None, return_dtype=pl.String).alias('label')
)
self.df_seq = self.df_seq.with_columns(
- pl.col('m_message').apply(lambda x: x[:-1] if len(x) > 1 else None, return_dtype=pl.List(pl.String))
+ pl.col('m_message').map_elements(lambda x: x[:-1] if len(x) > 1 else None, return_dtype=pl.List(pl.String))
)
self.df_seq = self.df_seq.with_columns(
- pl.col('label').apply(lambda label: "Normal" if label == "Clean" else label, return_dtype=pl.String).alias('label')
+ pl.col('label').map_elements(lambda label: "Normal" if label == "Clean" else label, return_dtype=pl.String).alias('label')
)

# Explode the 'split_message' while retaining 'seq_id' and 'label' for each exploded item
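The .apply() to .map_elements() rename, sketched on an illustrative sequence column; as in the loader, the last list element is treated as the label and 'Clean' maps to 'Normal'.

import polars as pl

df_seq = pl.DataFrame({"m_message": [["ls", "ps", "Clean"], ["whoami", "nc", "Exploit"]]})

# Expression-level .apply() became .map_elements() in newer Polars.
df_seq = df_seq.with_columns(
    pl.col("m_message")
      .map_elements(lambda x: x[-1] if len(x) > 0 else None, return_dtype=pl.String)
      .alias("label"),
    pl.col("m_message")
      .map_elements(lambda x: x[:-1] if len(x) > 1 else None, return_dtype=pl.List(pl.String))
      .alias("calls"),
)
df_seq = df_seq.with_columns(
    pl.col("label")
      .map_elements(lambda label: "Normal" if label == "Clean" else label, return_dtype=pl.String)
      .alias("label")
)
print(df_seq)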
loglead/loaders/hadoop.py: 7 changes (3 additions, 4 deletions)
@@ -40,7 +40,7 @@ def load(self):
for file in glob.glob(file_pattern):
try:
q = pl.scan_csv(file, has_header=False, infer_schema_length=0,
- separator=self._csv_separator, row_count_name="row_nr_per_file")
+ separator=self._csv_separator, row_index_name="row_nr_per_file")
q = q.with_columns(
pl.lit(seq_id).alias('seq_id'), #Folder is seq_id
pl.lit(os.path.basename(file)).alias('seq_id_sub') #File is seq_id_sub
@@ -65,11 +65,11 @@ def _merge_multiline_entries(self):


# Generate groups by taking a cumulative sum over the flag. This will group multi-line entries together.
- self.df = self.df.with_columns(pl.col("flag").cumsum().alias("group"))
+ self.df = self.df.with_columns(pl.col("flag").cum_sum().alias("group"))
# Calculate number of lines in each group

# Merge the entries in each group
- df_grouped = self.df.groupby("group").agg(
+ df_grouped = self.df.group_by("group").agg(
pl.col("column_1").str.concat("\n").alias("column_1"),
pl.col("seq_id").first().alias("seq_id"),
pl.col("seq_id_sub").first().alias("seq_id_sub"),
@@ -143,4 +143,3 @@ def _parse_datetimes(self):
pl.col("m_timestamp").str.strptime(pl.Datetime, "%Y-%m-%d%H:%M:%S,%3f", strict=False)
)
self.df = self.df.with_columns(parsed_times)
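A sketch of the multi-line merge after the renames (.cumsum() to .cum_sum(), .groupby() to .group_by()); the toy rows and flag column stand in for the loader's real header-line detection.

import polars as pl

df = pl.DataFrame({
    "column_1": ["2015-10-18 18:01:47,978 INFO main",
                 "  at SomeClass.method(Some.java:1)",
                 "2015-10-18 18:01:48,963 WARN stop"],
    "flag": [1, 0, 1],   # 1 = starts a new log entry, 0 = continuation line
})

# Cumulative sum over the flag gives one group id per multi-line entry.
df = df.with_columns(pl.col("flag").cum_sum().alias("group"))        # was .cumsum()
df_grouped = df.group_by("group", maintain_order=True).agg(          # was .groupby()
    pl.col("column_1").str.concat("\n").alias("column_1"),
)
print(df_grouped)

# The lazy CSV scan keyword also changed: row_count_name -> row_index_name, e.g.
# pl.scan_csv(path, has_header=False, row_index_name="row_nr_per_file")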

loglead/loaders/nezha.py: 15 changes (7 additions, 8 deletions)
@@ -187,7 +187,7 @@ def load_trace(self, folder_path, date_str, queries, ano_folder):
file_pattern = os.path.join(folder_path, "*.csv")
for file in glob.glob(file_pattern):
try:
- q = pl.scan_csv(file, has_header=True, separator=",", row_count_name="row_nr_per_file")
+ q = pl.scan_csv(file, has_header=True, separator=",", row_index_name="row_nr_per_file")
q = q.with_columns(
pl.lit(date_str).alias('date_folder'), # Folder is date info
pl.lit(os.path.basename(file)).alias('file_name'), # File name storage
@@ -202,7 +202,7 @@ def load_log(self, folder_path, date_str, queries, ano_folder):
for file in glob.glob(file_pattern):
try:
q = pl.scan_csv(file, has_header=True, infer_schema_length=0, separator=",",
- row_count_name="row_nr_per_file", truncate_ragged_lines=True)
+ row_index_name="row_nr_per_file", truncate_ragged_lines=True)

q = q.with_columns(
pl.lit(date_str).alias('date_folder'), # Folder is date info
@@ -227,8 +227,8 @@ def preprocess(self):
self.parse_timestamps()
self.process_metrics()
self.add_labels_to_metrics()
- self.df = self.df.with_row_count()
- self.df_trace = self.df_trace.with_row_count()
+ self.df = self.df.with_row_index("row_nr", )
+ self.df_trace = self.df_trace.with_row_index("row_nr", )
self.df = self.add_labels_to_df(self.df)
self.df_trace = self.add_labels_to_df(self.df_trace)

@@ -248,7 +248,7 @@ def process_metrics(self):
(pl.col("m_timestamp") + pl.duration(minutes=3)).alias("m_timestamp+3"),
(pl.col("m_timestamp") + pl.duration(minutes=4)).alias("m_timestamp+4"),
)
- self.df_metric_default = self.df_metric_default.with_row_count()
+ self.df_metric_default = self.df_metric_default.with_row_index("row_nr", )
column_names = [
"CpuUsage(m)", "CpuUsageRate(%)", "MemoryUsage(Mi)", "MemoryUsageRate(%)",
"SyscallRead","SyscallWrite","NetworkReceiveBytes", "NetworkTransmitBytes",
@@ -261,7 +261,7 @@ def process_metrics(self):
self.df_metric_default = self.df_metric_default.cast({col:pl.Float64})

def _extract_log_message(self):
- self.df = self.df.with_row_count("row_key")#Used for matching later
+ self.df = self.df.with_row_index("row_key")#Used for matching later
#Splitting criteria. We have valid json and invalid flag
if self.system == "WebShop":

@@ -388,7 +388,6 @@ def _extract_log_message(self):
#df_t3 = df_t3.drop(["message", "redu1", "redu2", "message_part", "SpanID", "normal_json"])
df_t3 = df_t3.select(["row_key", "severity", "m_message"])
self.df =self.df.join(df_t3, "row_key", "left")
- self.df = self.df.drop(["normal_json"])

#Epoch is corrupted using human readable format
# https://github.com/IntelligentDDS/Nezha/issues/8
@@ -534,7 +533,7 @@ def add_labels_to_df_seq(self, df_for_agg):
#Also system type
df_seq = df_for_agg.select(pl.col("seq_id")).unique()
df_temp = df_for_agg.group_by('seq_id').agg(
- (pl.col("anomaly").sum()/ pl.count()).alias("ano_count"))
+ (pl.col("anomaly").sum()/ pl.len()).alias("ano_count"))
# Join this result with df_sequences on seq_id
df_seq = df_seq.join(df_temp, on='seq_id')
return df_seq
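The two renames that dominate this loader, with_row_count() to with_row_index() and pl.count() to pl.len(), sketched on illustrative data.

import polars as pl

df = pl.DataFrame({"seq_id": ["s1", "s1", "s2"], "anomaly": [1, 0, 0]})

# with_row_index() defaults to a column named "index", so the old
# with_row_count() default "row_nr" is now passed explicitly.
df = df.with_row_index("row_nr")

df_seq = df.group_by("seq_id").agg(
    (pl.col("anomaly").sum() / pl.len()).alias("ano_count")   # was pl.count()
)
print(df_seq)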
loglead/loaders/pro.py: 4 changes (1 addition, 3 deletions)
@@ -13,12 +13,10 @@ class ProLoader(BaseLoader):
def load(self):
queries = []
for file in glob.glob(self.filename):
- try:
+ if os.path.getsize(file) > 0: #some files can be empty
q = pl.scan_csv(file, has_header=False, infer_schema_length=0, separator=self._csv_separator)
q = q.with_columns((pl.lit(os.path.basename(file))).alias('seq_id'))
queries.append(q)
- except pl.exceptions.NoDataError: # some CSV files can be empty.
- continue
dataframes = pl.collect_all(queries)
self.df = pl.concat(dataframes)

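A sketch of the new empty-file guard; the glob pattern and separator below are illustrative, not the loader's own.

import glob
import os

import polars as pl

queries = []
for file in glob.glob("pro_logs/*.log"):                 # illustrative pattern
    if os.path.getsize(file) > 0:                        # some files can be empty
        q = pl.scan_csv(file, has_header=False, infer_schema_length=0, separator="\t")
        q = q.with_columns(pl.lit(os.path.basename(file)).alias("seq_id"))
        queries.append(q)

df = pl.concat(pl.collect_all(queries)) if queries else None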
loglead/loaders/supercomputers.py: 4 changes (2 additions, 2 deletions)
@@ -37,8 +37,8 @@ def _split_component_and_pid(self):
component_and_pid = component_and_pid.alias("fields")
component_and_pid = component_and_pid.to_frame()
component_and_pid = component_and_pid.unnest("fields")
- component_and_pid = component_and_pid.with_columns(pl.col("component").str.rstrip(":"))
- component_and_pid = component_and_pid.with_columns(pl.col("pid").str.rstrip("]:"))
+ component_and_pid = component_and_pid.with_columns(pl.col("component").str.strip_chars_end(":"))
+ component_and_pid = component_and_pid.with_columns(pl.col("pid").str.strip_chars_end("]:"))
self.df = pl.concat([self.df, component_and_pid], how="horizontal")
self.df = self.df.drop("component_pid")
self.df = self.df.select(["label", "timestamp", "date", "userid", "month",
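The .str.rstrip() to .str.strip_chars_end() rename, sketched on illustrative component and pid values.

import polars as pl

df = pl.DataFrame({"component": ["kernel:", "gige7:"], "pid": ["[1234]:", "[99]:"]})

# .str.rstrip(chars) -> .str.strip_chars_end(chars); both strip any trailing
# characters that appear in the given set.
df = df.with_columns(
    pl.col("component").str.strip_chars_end(":"),
    pl.col("pid").str.strip_chars_end("]:"),
)
print(df)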
