Merge pull request #48 from maciejzj/develop
Release changes v0.1.0 to master

Changelog included in the release git tag.
maciejzj authored Nov 1, 2022
2 parents a051e29 + 167ab4c commit 2daf4ba
Showing 8 changed files with 76 additions and 27 deletions.
19 changes: 11 additions & 8 deletions README.md
@@ -48,14 +48,17 @@ configuration is stored in the `setup.cfg` file.

### Installation

To install the application system-wide use a PEP517 compatible build tool. E.g.
this can be done with [build](https://github.com/pypa/build) inside the project
directory:

```sh
pip install build
python -m build
pip install dist/it_jobs_meta*.whl
You can install this application with pip, either from a local repository:

```
git clone [email protected]:maciejzj/it-jobs-meta.github
pip install ./it_jobs_meta
```

or directly from the GitHub repository with:

```
pip install [email protected]:maciejzj/it-jobs-meta.git
```

All runtime dependencies will be installed alongside the application. From now,
9 changes: 7 additions & 2 deletions it_jobs_meta/__main__.py
@@ -6,14 +6,19 @@
DashboardApp,
DashboardDataProviderFactory,
)
from it_jobs_meta.data_pipeline.data_etl import EtlLoaderFactory
from it_jobs_meta.data_pipeline.data_lake import DataLakeFactory
from it_jobs_meta.data_pipeline.data_pipeline import DataPipeline
from it_jobs_meta.data_pipeline.data_etl import EtlLoaderFactory


def main():
parser = CliArgumentParser()
setup_logging(parser.args['log_path'])
setup_logging(
parser.args['log_path'],
log_level=CliArgumentParser.LOG_LEVEL_OPTIONS[
parser.args['log_level']
],
)

match parser.args['command']:
case 'pipeline':
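To make the effect of the new flag concrete, here is a minimal standalone sketch (not part of the commit) of how the parsed `--log-level` string is translated through `LOG_LEVEL_OPTIONS` into a numeric `logging` constant before it reaches `setup_logging`; the dictionary mirrors the one added in `cli.py` below.

```python
import logging

# Mirrors CliArgumentParser.LOG_LEVEL_OPTIONS from the cli.py diff below.
LOG_LEVEL_OPTIONS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}


def configure(log_level_name: str) -> None:
    # The CLI stores the user's choice as a string; the numeric logging
    # constant is looked up only when configuring the root logger.
    logging.basicConfig(
        level=LOG_LEVEL_OPTIONS[log_level_name],
        format='%(asctime)s [%(levelname)s] %(message)s',
    )


configure('debug')
logging.debug('visible, because the root logger level is now DEBUG')
```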
18 changes: 17 additions & 1 deletion it_jobs_meta/common/cli.py
@@ -1,12 +1,13 @@
"""Command line parser for the it-jobs-meta application."""

import argparse
import logging
from pathlib import Path
from typing import Any

from it_jobs_meta.dashboard.dashboard import DashboardProviderImpl
from it_jobs_meta.data_pipeline.data_lake import DataLakeImpl
from it_jobs_meta.data_pipeline.data_etl import EtlLoaderImpl
from it_jobs_meta.data_pipeline.data_lake import DataLakeImpl


class CliArgumentParser:
@@ -21,6 +22,13 @@ class CliArgumentParser:
'data lake, load processed data to the data warehouse.'
)
DASHBOARD_DESCRIPTION = 'Run data visualization dashboard server.'
LOG_LEVEL_OPTIONS = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'critical': logging.CRITICAL,
'error': logging.ERROR,
}

def __init__(self):
self._args: dict[str, Any] | None = None
@@ -92,6 +100,14 @@ def extract_data_provider(self) -> tuple[DashboardProviderImpl, Path]:
)

def _build_main_command(self):
self._parser.add_argument(
'-v',
'--log-level',
default='info',
type=str,
choices=self.LOG_LEVEL_OPTIONS.keys(),
help='set verbosity/log level of the program',
)
self._parser.add_argument(
'-l',
'--log-path',
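A small sketch (synthetic, not from the repository) of the argparse pattern used above: passing the dictionary keys as `choices` makes argparse reject unknown level names at parse time, while the dictionary itself translates the accepted string into a `logging` constant.

```python
import argparse
import logging

LOG_LEVEL_OPTIONS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}

parser = argparse.ArgumentParser()
# choices restricts accepted values to the dictionary keys, so an invalid
# value such as '--log-level verbose' fails at parse time with a clear error.
parser.add_argument(
    '-v',
    '--log-level',
    default='info',
    type=str,
    choices=LOG_LEVEL_OPTIONS.keys(),
    help='set verbosity/log level of the program',
)

args = parser.parse_args(['--log-level', 'warning'])
print(LOG_LEVEL_OPTIONS[args.log_level])  # 30 (logging.WARNING)
```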
4 changes: 2 additions & 2 deletions it_jobs_meta/common/utils.py
@@ -8,7 +8,7 @@
import yaml


def setup_logging(*args: Path):
def setup_logging(*args: Path, log_level: int = logging.INFO):
"""Enable logging to stdout and the given files.
:param *args: Paths to log output files.
@@ -19,7 +19,7 @@ def setup_logging(*args: Path):
log_file_handlers.append(logging.FileHandler(log_path))

logging.basicConfig(
level=logging.INFO,
level=log_level,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
*log_file_handlers,
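Because the new `log_level` parameter follows the variadic `*args` of file paths, it is keyword-only. A minimal sketch of the same shape (handler details are assumed for illustration, not copied verbatim from `utils.py`):

```python
import logging
from pathlib import Path


def setup_logging(*args: Path, log_level: int = logging.INFO) -> None:
    # log_level follows *args, so it must be passed by keyword; every
    # positional argument is always interpreted as a log file path.
    handlers: list[logging.Handler] = [logging.StreamHandler()]
    handlers += [logging.FileHandler(path) for path in args]
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=handlers,
    )


setup_logging(Path('it_jobs_meta.log'), log_level=logging.DEBUG)
logging.debug('written to stdout and to it_jobs_meta.log')
```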
3 changes: 2 additions & 1 deletion it_jobs_meta/dashboard/dashboard.py
@@ -34,9 +34,10 @@ def __init__(
@property
def app(self) -> dash.Dash:
if self._app is None:
dashboard_module_path = Path(__file__).parent
self._app = dash.Dash(
'it-jobs-meta-dashboard',
assets_folder='it_jobs_meta/dashboard/assets',
assets_folder=dashboard_module_path / 'assets',
external_stylesheets=[
dbc.themes.BOOTSTRAP,
dbc.icons.FONT_AWESOME,
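The `assets_folder` fix anchors the assets directory at the module's own location instead of the current working directory, so the dashboard finds its stylesheets no matter where the server process is launched from. A tiny illustration of the difference:

```python
from pathlib import Path

# Path(__file__).parent is the directory containing this module, so the
# assets directory is located relative to the package itself rather than to
# whatever directory the process happened to be started from.
dashboard_module_path = Path(__file__).parent
assets_folder = dashboard_module_path / 'assets'

# A CWD-relative string such as 'it_jobs_meta/dashboard/assets' only works
# when the server is started from the repository root.
print(assets_folder)
```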
13 changes: 10 additions & 3 deletions it_jobs_meta/dashboard/dashboard_components.py
@@ -242,7 +242,7 @@ def make_fig(cls, postings_df: pd.DataFrame) -> go.Figure:
@GraphRegistry.register(key=Graph.SALARIES_MAP)
class SalariesMap(GraphFigure):
TITLE = 'Mean salary by location (PLN)'
MIN_CITY_FREQ = 25
N_MOST_FREQ = 15

@classmethod
def make_fig(cls, postings_df) -> go.Figure:
@@ -251,15 +251,16 @@ def make_fig(cls, postings_df) -> go.Figure:
lambda city: pd.Series([city[0], city[1], city[2]])
)

postings_df = get_rows_with_n_most_frequent_vals_in_col(
postings_df, 'city', cls.N_MOST_FREQ
)
job_counts = postings_df.groupby('city')['_id'].count()
salaries = postings_df.groupby('city')[
['salary_mean', 'lat', 'lon']
].mean()
cities_salaries = pd.concat(
[job_counts.rename('job_counts'), salaries], axis=1
)
more_than_min = cities_salaries['job_counts'] > cls.MIN_CITY_FREQ
cities_salaries = cities_salaries[more_than_min]
cities_salaries = cities_salaries.reset_index()

fig = px.scatter_geo(
@@ -367,6 +368,12 @@ def make_fig(
limited['seniority'].isin(('Junior', 'Mid', 'Senior'))
]
limited = sort_by_seniority(limited)
# Plotly has problems creating violin plots when there are too few
# samples, so we filter out seniority and technology pairs that don't
# have enough data points to form a smooth curve.
limited = limited.groupby(['seniority', 'technology']).filter(
lambda x: x['technology'].count() > 3
)

fig = px.violin(
limited,
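Both edits in this file prune sparse groups before plotting: `SalariesMap` now keeps the `N_MOST_FREQ` most common cities instead of applying a minimum-count threshold, and the violin plot drops seniority/technology pairs with too few samples. A small pandas sketch of the two patterns on synthetic data (the helper `rows_with_n_most_frequent` is an illustrative stand-in for `get_rows_with_n_most_frequent_vals_in_col`):

```python
import pandas as pd

df = pd.DataFrame({
    'city': ['Warszawa'] * 4 + ['Kraków'] * 3 + ['Radom'],
    'technology': ['python'] * 5 + ['scala'] * 3,
    'seniority': ['Mid'] * 8,
})


# Keep only rows whose column value is among the n most frequent values.
def rows_with_n_most_frequent(df: pd.DataFrame, col: str, n: int) -> pd.DataFrame:
    top = df[col].value_counts().nlargest(n).index
    return df[df[col].isin(top)]


print(rows_with_n_most_frequent(df, 'city', 2))  # the single 'Radom' row is dropped

# Drop (seniority, technology) pairs with too few samples to draw a violin.
filtered = df.groupby(['seniority', 'technology']).filter(
    lambda group: len(group) > 3
)
print(filtered['technology'].unique())  # ['python'] -- 'scala' had only 3 rows
```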
29 changes: 23 additions & 6 deletions it_jobs_meta/data_pipeline/data_etl.py
@@ -61,6 +61,14 @@ class EtlTransformationEngine(Generic[ProcessDataType], ABC):
'referralBonusCurrency',
]

# See the ETL pipeline implementation for the predefined order of the
# string formatting operations applied to the columns.
COLS_TO_LOWER = {
'technology',
'category',
}

# Strings to replace wherever they appear in any column
VALS_TO_REPLACE = {
'node.js': 'node',
'angular': 'javascript',
@@ -96,6 +104,10 @@ def drop_unwanted(self, data: ProcessDataType) -> ProcessDataType:
def drop_duplicates(self, data: ProcessDataType) -> ProcessDataType:
"""Drop duplicated rows in the dataset."""

@abstractmethod
def unify_to_lower(self, data: ProcessDataType) -> ProcessDataType:
"""Unify strings to lower capitalization."""

@abstractmethod
def replace_values(self, data: ProcessDataType) -> ProcessDataType:
"""Replace values specified in COLS_TO_DROP."""
@@ -192,6 +204,7 @@ def transform(self, data: ProcessDataType) -> ProcessDataType:
data = self._transformation_engine.extract_locations(data)
data = self._transformation_engine.extract_contract_type(data)
data = self._transformation_engine.extract_salaries(data)
data = self._transformation_engine.unify_to_lower(data)
data = self._transformation_engine.replace_values(data)
data = self._transformation_engine.to_title_case(data)
data = self._transformation_engine.to_capitalized(data)
@@ -255,6 +268,13 @@ def drop_unwanted(self, data: pd.DataFrame) -> pd.DataFrame:
def drop_duplicates(self, data: pd.DataFrame) -> pd.DataFrame:
return data[~data.index.duplicated(keep='first')]

def unify_to_lower(self, data: pd.DataFrame) -> pd.DataFrame:
for col in EtlTransformationEngine.COLS_TO_LOWER:
data[col] = data[col][data[col].notna()].transform(
lambda s: s.lower()
)
return data

def replace_values(self, data: pd.DataFrame) -> pd.DataFrame:
return data.replace(to_replace=EtlTransformationEngine.VALS_TO_REPLACE)

@@ -267,13 +287,10 @@ def to_title_case(self, data: pd.DataFrame) -> pd.DataFrame:

def to_capitalized(self, data: pd.DataFrame) -> pd.DataFrame:
specials = EtlTransformationEngine.CAPITALIZE_SPECIAL_NAMES

def transform_func(s: str) -> str:
s = s.lower()
return specials[s] if s in specials else s.capitalize()

for col in EtlTransformationEngine.COLS_TO_CAPITALIZE:
data[col] = data[col][data[col].notna()].transform(transform_func)
data[col] = data[col][data[col].notna()].transform(
lambda s: specials[s] if s in specials else s.capitalize()
)
return data

def extract_remote(self, data: pd.DataFrame) -> pd.DataFrame:
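The new `unify_to_lower` step lowercases the configured columns before `replace_values` runs, so keys in `VALS_TO_REPLACE` such as `'node.js'` match regardless of how the source postings capitalize them. A minimal pandas sketch of the same pattern on synthetic data:

```python
import pandas as pd

COLS_TO_LOWER = {'technology', 'category'}
VALS_TO_REPLACE = {'node.js': 'node', 'angular': 'javascript'}

data = pd.DataFrame({
    'technology': ['Node.JS', 'Angular', None],
    'category': ['Backend', None, 'DevOps'],
})

# Lowercase only the non-null entries of the selected columns, mirroring the
# notna() guard in the implementation above; missing values stay NaN.
for col in COLS_TO_LOWER:
    data[col] = data[col][data[col].notna()].transform(lambda s: s.lower())

# With capitalization unified, the replacement map matches reliably.
data = data.replace(to_replace=VALS_TO_REPLACE)
print(data)
#    technology category
# 0        node  backend
# 1  javascript      NaN
# 2         NaN   devops
```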
8 changes: 4 additions & 4 deletions it_jobs_meta/data_pipeline/data_pipeline.py
@@ -7,17 +7,17 @@

import croniter

from it_jobs_meta.data_pipeline.data_ingestion import (
NoFluffJobsPostingsDataSource,
)
from it_jobs_meta.data_pipeline.data_lake import DataLakeFactory
from it_jobs_meta.data_pipeline.data_etl import (
EtlLoaderFactory,
EtlPipeline,
PandasEtlExtractionFromJsonStr,
PandasEtlMongodbLoadingEngine,
PandasEtlTransformationEngine,
)
from it_jobs_meta.data_pipeline.data_ingestion import (
NoFluffJobsPostingsDataSource,
)
from it_jobs_meta.data_pipeline.data_lake import DataLakeFactory


class DataPipeline:
