diff --git a/README.md b/README.md
index 99102bb..2f88a49 100644
--- a/README.md
+++ b/README.md
@@ -48,14 +48,17 @@ configuration is stored in the `setup.cfg` file.
 
 ### Installation
 
-To install the application system-wide use a PEP517 compatible build tool. E.g.
-this can be done with [build](https://github.com/pypa/build) inside the project
-directory:
-
-```sh
-pip install build
-python -m build
-pip install dist/it_jobs_meta*.whl
+You can install this application with pip, either from a local repository:
+
+```sh
+git clone git@github.com:maciejzj/it-jobs-meta.git
+pip install ./it-jobs-meta
+```
+
+or directly from the GitHub repository with:
+
+```sh
+pip install git+ssh://git@github.com/maciejzj/it-jobs-meta.git
 ```
 
 All runtime dependencies will be installed alongside the application. From now,
diff --git a/it_jobs_meta/__main__.py b/it_jobs_meta/__main__.py
index 0ea16e2..6de9e45 100644
--- a/it_jobs_meta/__main__.py
+++ b/it_jobs_meta/__main__.py
@@ -6,14 +6,19 @@
     DashboardApp,
     DashboardDataProviderFactory,
 )
+from it_jobs_meta.data_pipeline.data_etl import EtlLoaderFactory
 from it_jobs_meta.data_pipeline.data_lake import DataLakeFactory
 from it_jobs_meta.data_pipeline.data_pipeline import DataPipeline
-from it_jobs_meta.data_pipeline.data_etl import EtlLoaderFactory
 
 
 def main():
     parser = CliArgumentParser()
-    setup_logging(parser.args['log_path'])
+    setup_logging(
+        parser.args['log_path'],
+        log_level=CliArgumentParser.LOG_LEVEL_OPTIONS[
+            parser.args['log_level']
+        ],
+    )
 
     match parser.args['command']:
         case 'pipeline':
diff --git a/it_jobs_meta/common/cli.py b/it_jobs_meta/common/cli.py
index e2817f2..0d9768c 100644
--- a/it_jobs_meta/common/cli.py
+++ b/it_jobs_meta/common/cli.py
@@ -1,12 +1,13 @@
 """Command line parser for the it-jobs-meta application."""
 
 import argparse
+import logging
 from pathlib import Path
 from typing import Any
 
 from it_jobs_meta.dashboard.dashboard import DashboardProviderImpl
-from it_jobs_meta.data_pipeline.data_lake import DataLakeImpl
 from it_jobs_meta.data_pipeline.data_etl import EtlLoaderImpl
+from it_jobs_meta.data_pipeline.data_lake import DataLakeImpl
 
 
 class CliArgumentParser:
@@ -21,6 +22,13 @@ class CliArgumentParser:
         'data lake, load processed data to the data warehouse.'
     )
     DASHBOARD_DESCRIPTION = 'Run data visualization dashboard server.'
+    LOG_LEVEL_OPTIONS = {
+        'debug': logging.DEBUG,
+        'info': logging.INFO,
+        'warning': logging.WARNING,
+        'critical': logging.CRITICAL,
+        'error': logging.ERROR,
+    }
 
     def __init__(self):
         self._args: dict[str, Any] | None = None
@@ -92,6 +100,14 @@ def extract_data_provider(self) -> tuple[DashboardProviderImpl, Path]:
         )
 
     def _build_main_command(self):
+        self._parser.add_argument(
+            '-v',
+            '--log-level',
+            default='info',
+            type=str,
+            choices=self.LOG_LEVEL_OPTIONS.keys(),
+            help='set verbosity/log level of the program',
+        )
         self._parser.add_argument(
             '-l',
             '--log-path',
diff --git a/it_jobs_meta/common/utils.py b/it_jobs_meta/common/utils.py
index 4287b35..9486806 100644
--- a/it_jobs_meta/common/utils.py
+++ b/it_jobs_meta/common/utils.py
@@ -8,7 +8,7 @@
 import yaml
 
 
-def setup_logging(*args: Path):
+def setup_logging(*args: Path, log_level: int = logging.INFO):
     """Enable logging to stdout and the given files.
 
     :param *args: Paths to log output files.
@@ -19,7 +19,7 @@ def setup_logging(*args: Path):
         log_file_handlers.append(logging.FileHandler(log_path))
 
     logging.basicConfig(
-        level=logging.INFO,
+        level=log_level,
         format='%(asctime)s [%(levelname)s] %(message)s',
         handlers=[
             *log_file_handlers,
diff --git a/it_jobs_meta/dashboard/dashboard.py b/it_jobs_meta/dashboard/dashboard.py
index f8ad531..ad58f10 100644
--- a/it_jobs_meta/dashboard/dashboard.py
+++ b/it_jobs_meta/dashboard/dashboard.py
@@ -34,9 +34,10 @@ def __init__(
     @property
     def app(self) -> dash.Dash:
         if self._app is None:
+            dashboard_module_path = Path(__file__).parent
             self._app = dash.Dash(
                 'it-jobs-meta-dashboard',
-                assets_folder='it_jobs_meta/dashboard/assets',
+                assets_folder=dashboard_module_path / 'assets',
                 external_stylesheets=[
                     dbc.themes.BOOTSTRAP,
                     dbc.icons.FONT_AWESOME,
diff --git a/it_jobs_meta/dashboard/dashboard_components.py b/it_jobs_meta/dashboard/dashboard_components.py
index d20620e..53383f5 100644
--- a/it_jobs_meta/dashboard/dashboard_components.py
+++ b/it_jobs_meta/dashboard/dashboard_components.py
@@ -242,7 +242,7 @@ def make_fig(cls, postings_df: pd.DataFrame) -> go.Figure:
 @GraphRegistry.register(key=Graph.SALARIES_MAP)
 class SalariesMap(GraphFigure):
     TITLE = 'Mean salary by location (PLN)'
-    MIN_CITY_FREQ = 25
+    N_MOST_FREQ = 15
 
     @classmethod
     def make_fig(cls, postings_df) -> go.Figure:
@@ -251,6 +251,9 @@ def make_fig(cls, postings_df) -> go.Figure:
             lambda city: pd.Series([city[0], city[1], city[2]])
         )
 
+        postings_df = get_rows_with_n_most_frequent_vals_in_col(
+            postings_df, 'city', cls.N_MOST_FREQ
+        )
         job_counts = postings_df.groupby('city')['_id'].count()
         salaries = postings_df.groupby('city')[
             ['salary_mean', 'lat', 'lon']
@@ -258,8 +261,6 @@ def make_fig(cls, postings_df) -> go.Figure:
         cities_salaries = pd.concat(
             [job_counts.rename('job_counts'), salaries], axis=1
         )
-        more_than_min = cities_salaries['job_counts'] > cls.MIN_CITY_FREQ
-        cities_salaries = cities_salaries[more_than_min]
         cities_salaries = cities_salaries.reset_index()
 
         fig = px.scatter_geo(
@@ -367,6 +368,12 @@ def make_fig(
             limited['seniority'].isin(('Junior', 'Mid', 'Senior'))
         ]
         limited = sort_by_seniority(limited)
+        # Plotly struggles to render violin plots when there are too few
+        # samples, so filter out seniority and technology pairs that do not
+        # have enough data points to draw a smooth curve.
+        limited = limited.groupby(['seniority', 'technology']).filter(
+            lambda x: x['technology'].count() > 3
+        )
 
         fig = px.violin(
             limited,
diff --git a/it_jobs_meta/data_pipeline/data_etl.py b/it_jobs_meta/data_pipeline/data_etl.py
index 575756e..08918d3 100644
--- a/it_jobs_meta/data_pipeline/data_etl.py
+++ b/it_jobs_meta/data_pipeline/data_etl.py
@@ -61,6 +61,14 @@ class EtlTransformationEngine(Generic[ProcessDataType], ABC):
         'referralBonusCurrency',
     ]
 
+    # See the ETL pipeline implementation for the predefined order of the
+    # string formatting operations applied to columns.
+    COLS_TO_LOWER = {
+        'technology',
+        'category',
+    }
+
+    # Strings to replace in any column of the dataset
     VALS_TO_REPLACE = {
         'node.js': 'node',
         'angular': 'javascript',
@@ -96,6 +104,10 @@ def drop_unwanted(self, data: ProcessDataType) -> ProcessDataType:
     def drop_duplicates(self, data: ProcessDataType) -> ProcessDataType:
         """Drop duplicated rows in the dataset."""
 
+    @abstractmethod
+    def unify_to_lower(self, data: ProcessDataType) -> ProcessDataType:
+        """Unify string values to lowercase."""
+
     @abstractmethod
     def replace_values(self, data: ProcessDataType) -> ProcessDataType:
         """Replace values specified in COLS_TO_DROP."""
@@ -192,6 +204,7 @@ def transform(self, data: ProcessDataType) -> ProcessDataType:
         data = self._transformation_engine.extract_locations(data)
         data = self._transformation_engine.extract_contract_type(data)
         data = self._transformation_engine.extract_salaries(data)
+        data = self._transformation_engine.unify_to_lower(data)
         data = self._transformation_engine.replace_values(data)
         data = self._transformation_engine.to_title_case(data)
         data = self._transformation_engine.to_capitalized(data)
@@ -255,6 +268,13 @@ def drop_unwanted(self, data: pd.DataFrame) -> pd.DataFrame:
     def drop_duplicates(self, data: pd.DataFrame) -> pd.DataFrame:
         return data[~data.index.duplicated(keep='first')]
 
+    def unify_to_lower(self, data: pd.DataFrame) -> pd.DataFrame:
+        for col in EtlTransformationEngine.COLS_TO_LOWER:
+            data[col] = data[col][data[col].notna()].transform(
+                lambda s: s.lower()
+            )
+        return data
+
     def replace_values(self, data: pd.DataFrame) -> pd.DataFrame:
         return data.replace(to_replace=EtlTransformationEngine.VALS_TO_REPLACE)
 
@@ -267,13 +287,10 @@ def to_title_case(self, data: pd.DataFrame) -> pd.DataFrame:
 
     def to_capitalized(self, data: pd.DataFrame) -> pd.DataFrame:
         specials = EtlTransformationEngine.CAPITALIZE_SPECIAL_NAMES
-
-        def transform_func(s: str) -> str:
-            s = s.lower()
-            return specials[s] if s in specials else s.capitalize()
-
         for col in EtlTransformationEngine.COLS_TO_CAPITALIZE:
-            data[col] = data[col][data[col].notna()].transform(transform_func)
+            data[col] = data[col][data[col].notna()].transform(
+                lambda s: specials[s] if s in specials else s.capitalize()
+            )
         return data
 
     def extract_remote(self, data: pd.DataFrame) -> pd.DataFrame:
diff --git a/it_jobs_meta/data_pipeline/data_pipeline.py b/it_jobs_meta/data_pipeline/data_pipeline.py
index 26a4b1b..f425a10 100644
--- a/it_jobs_meta/data_pipeline/data_pipeline.py
+++ b/it_jobs_meta/data_pipeline/data_pipeline.py
@@ -7,10 +7,6 @@
 
 import croniter
 
-from it_jobs_meta.data_pipeline.data_ingestion import (
-    NoFluffJobsPostingsDataSource,
-)
-from it_jobs_meta.data_pipeline.data_lake import DataLakeFactory
 from it_jobs_meta.data_pipeline.data_etl import (
     EtlLoaderFactory,
     EtlPipeline,
@@ -18,6 +14,10 @@
     PandasEtlMongodbLoadingEngine,
     PandasEtlTransformationEngine,
 )
+from it_jobs_meta.data_pipeline.data_ingestion import (
+    NoFluffJobsPostingsDataSource,
+)
+from it_jobs_meta.data_pipeline.data_lake import DataLakeFactory
 
 
 class DataPipeline:
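For reference, the snippet below is a minimal sketch of how the reworked logging setup from this diff could be exercised directly; the import path and the `setup_logging(*args: Path, log_level: int = logging.INFO)` signature come from `it_jobs_meta/common/utils.py` above, while the log file name is purely illustrative. On the command line the same behaviour is exposed through the new `-v`/`--log-level` option added in `it_jobs_meta/common/cli.py`.

```python
# Minimal sketch, assuming the it_jobs_meta package from this diff is
# installed and importable; the log file path is an illustrative placeholder.
import logging
from pathlib import Path

from it_jobs_meta.common.utils import setup_logging

# Log to stdout and to a file, raising verbosity to DEBUG via the new
# log_level keyword argument.
setup_logging(Path('it_jobs_meta.log'), log_level=logging.DEBUG)
logging.debug('Debug messages are now emitted to stdout and the log file.')
```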