Skip to content

Commit

Permalink
fix: raw loader
Browse files Browse the repository at this point in the history
  • Loading branch information
mmantyla authored and bakhtos committed Sep 26, 2024
1 parent 2ddb8fc commit c243a36
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
8 changes: 4 additions & 4 deletions demo/RawLoader_NoLabels.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def write_seqs_to_files(df_redu_seq, output_dir):



def load_and_enhance (file: str, multi_file = False):
def load_and_enhance (file: str, pattern = None):
"""
Loads raw data from a file and enhances it by normalizing and splitting to words.
Expand All @@ -134,7 +134,7 @@ def load_and_enhance (file: str, multi_file = False):
Returns:
- pl.DataFrame: The enhanced DataFrame with normalized and processed event log messages.
"""
loader = RawLoader(file, multi_file)
loader = RawLoader(file, filename_pattern=pattern)
df = loader.execute()
#Normalize data.
enhancer = EventLogEnhancer(df)
Expand Down Expand Up @@ -279,9 +279,9 @@ def measure_distance_random_files(df_train, df_analyze, field="e_message_normali

#HDFS
create_seq_in_file_HDFS()
df_train = load_and_enhance("HDFS_train/*.log", multi_file=True)
df_train = load_and_enhance("HDFS_train", pattern="*.log")
train_line_models(df_train)
df_analyze = load_and_enhance("HDFS_test/*.log", multi_file=True)
df_analyze = load_and_enhance("HDFS_test", pattern="*.log")
analyse_with_line_models(df_analyze)
measure_distance_random_files(df_train, df_analyze, field="e_message_normalized")
measure_distance_random_files(df_train, df_analyze, field="m_message")
Expand Down
44 changes: 27 additions & 17 deletions loglead/loaders/raw.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,45 @@
import polars as pl
import glob
import os

import warnings
from .base import BaseLoader

__all__ = ['RawLoader']


# Processor for an arbitrary log file. One log event per line.
# Processor for an arbitrary log file. One log event per line.
# You are using RawLoader. This results in a DataFrame with a single column only titled: 'm_message'.
# Consider implementing a dataset-specific loader for better functionality.
class RawLoader(BaseLoader):
def __init__(self, filename, multi_file=False):
self.multi_file = multi_file
def __init__(self, filename, filename_pattern=None, min_file_size=0, strip_full_data_path=None):
self.min_file_size = min_file_size
self.filename_pattern = filename_pattern
self.strip_full_data_prefix = strip_full_data_path
super().__init__(filename)

def load(self):
print(f"WARNING! You are using RawLoader. This results in dataframe with single column only titled: m_message. "
f"Consider implementing dataset specific loader")
if self.multi_file:
force_schema = {'column_1': pl.String} # We should not need this infer_schem = False should be enough. However, it is not.
if self.filename_pattern: #self.nested
queries = []
for file in glob.glob(self.filename):
try:
q = pl.scan_csv(file, has_header=False, infer_schema_length=0, separator=self._csv_separator)
q = q.with_columns((pl.lit(os.path.basename(file))).alias('file_name'))
queries.append(q)
except pl.exceptions.NoDataError: # some CSV files can be empty.
continue
for subdir, _, _ in os.walk(self.filename):
#seq_id = os.path.basename(subdir)
file_pattern = os.path.join(subdir, self.filename_pattern)
# Iterate over all files in the subdirectory that match the given pattern
for file in glob.glob(file_pattern):
if os.path.getsize(file) > self.min_file_size:
#These should be the default settings also in other loaders
q = pl.scan_csv(file, has_header=False, schema = force_schema, infer_schema=False, quote_char=None,
separator=self._csv_separator,
encoding="utf8-lossy", include_file_paths="file_name", truncate_ragged_lines=True)
if self.strip_full_data_prefix:
q = q.with_columns(pl.col("file_name").str.strip_prefix(self.strip_full_data_prefix))
queries.append(q)
dataframes = pl.collect_all(queries)
self.df = pl.concat(dataframes)

else:
self.df = pl.read_csv(self.filename, has_header=False, infer_schema_length=0,
separator=self._csv_separator, ignore_errors=True)
self.df = pl.read_csv(self.filename, has_header=False, schema = force_schema, infer_schema=False, quote_char=None,
separator=self._csv_separator, encoding="utf8-lossy",truncate_ragged_lines=True)

self.df = self.df.rename({"column_1": "m_message"})

Expand Down

0 comments on commit c243a36

Please sign in to comment.