From a0bab0f698d204369962625f03e7fb3b801e67f2 Mon Sep 17 00:00:00 2001 From: Guido Petri <18634426+guidopetri@users.noreply.github.com> Date: Sun, 21 Apr 2024 17:00:19 -0500 Subject: [PATCH] move code around --- src/chess_pipeline.py | 384 +------------------------------------ src/feature_engineering.py | 106 ++++++++++ src/inference.py | 55 ++++++ src/utils/db.py | 30 +++ src/vendors/lichess.py | 109 +++++++++++ src/vendors/stockfish.py | 93 +++++++++ 6 files changed, 403 insertions(+), 374 deletions(-) create mode 100644 src/feature_engineering.py create mode 100644 src/inference.py create mode 100644 src/utils/db.py create mode 100644 src/vendors/lichess.py create mode 100644 src/vendors/stockfish.py diff --git a/src/chess_pipeline.py b/src/chess_pipeline.py index a024f22..d2ed425 100644 --- a/src/chess_pipeline.py +++ b/src/chess_pipeline.py @@ -1,395 +1,31 @@ #! /usr/bin/env python3 -import hashlib import os -from calendar import timegm from datetime import datetime, timedelta -from typing import Any, Type -import lichess.api import pandas as pd -import psycopg2 -from chess.pgn import Game -from lichess.format import JSON, PYCHESS +from feature_engineering import ( + clean_chess_df, + explode_clocks, + explode_materials, + explode_moves, + explode_positions, +) +from inference import estimate_win_probabilities from luigi import LocalTarget, Task from luigi.format import Nop from luigi.parameter import BoolParameter, DateParameter, Parameter from luigi.util import inherits, requires -from pipeline_import.configs import lichess_token, postgres_cfg, stockfish_cfg -from pipeline_import.models import predict_wp from pipeline_import.postgres_templates import ( CopyWrapper, HashableDict, TransactionFactTable, ) from pipeline_import.transforms import ( - convert_clock_to_seconds, - fix_provisional_columns, - get_clean_fens, - get_sf_evaluation, - parse_headers, transform_game_data, ) -from pipeline_import.visitors import ( - CastlingVisitor, - ClocksVisitor, - EvalsVisitor, - MaterialVisitor, - PositionsVisitor, - PromotionsVisitor, - QueenExchangeVisitor, -) -from utils.types import Json, Visitor - - -def run_remote_sql_query(sql, **params) -> pd.DataFrame: - pg_cfg = postgres_cfg() - user = pg_cfg.user - password = pg_cfg.password - host = pg_cfg.host - port = pg_cfg.port - database = pg_cfg.database - - db = psycopg2.connect(host=host, - database=database, - user=user, - password=password, - port=port, - ) - - df: pd.DataFrame = pd.read_sql_query(sql, db, params=params) - - return df - - -def query_for_column(table, column): - sql = f"""SELECT DISTINCT {column} FROM {table};""" - df = run_remote_sql_query(sql) - return df[column] - - -def fetch_lichess_api_json(player: str, - perf_type: str, - since: datetime, - single_day: bool, - ) -> pd.DataFrame: - if single_day: - unix_time_until: int = timegm((since + timedelta(days=1)).timetuple()) - else: - unix_time_until = timegm(datetime.today().date().timetuple()) - until: int = int(1000 * unix_time_until) - - unix_time_since: int = timegm(since.timetuple()) - since_unix: int = int(1000 * unix_time_since) - - games: list[Json] = lichess.api.user_games(player, - since=since_unix, - until=until, - perfType=perf_type, - auth=lichess_token().token, - evals='false', - clocks='false', - moves='false', - format=JSON) - - df: pd.DataFrame = pd.json_normalize([game for game in games], sep='_') - return df - - -def fetch_lichess_api_pgn(player: str, - perf_type: str, - since: datetime, - single_day: bool, - game_count: int, - task: Task, - ) -> pd.DataFrame: - if single_day: - unix_time_until: int = timegm((since + timedelta(days=1)).timetuple()) - else: - unix_time_until = timegm(datetime.today().date().timetuple()) - until: int = int(1000 * unix_time_until) - - unix_time_since: int = timegm(since.timetuple()) - since_unix: int = int(1000 * unix_time_since) - - games: list[Game] = lichess.api.user_games(player, - since=since_unix, - until=until, - perfType=perf_type, - auth=lichess_token().token, - clocks='true', - evals='true', - opening='true', - format=PYCHESS) - - visitors: list[Type[Visitor]] = [EvalsVisitor, - ClocksVisitor, - QueenExchangeVisitor, - CastlingVisitor, - PromotionsVisitor, - PositionsVisitor, - MaterialVisitor, - ] - - header_infos = [] - - counter: int = 0 - - for game in games: - game_infos: Json = parse_headers(game, visitors) - header_infos.append(game_infos) - - # progress bar stuff - counter += 1 - - current: str = f'{game_infos["UTCDate"]} {game_infos["UTCTime"]}' - - current_progress: float = counter / game_count - task.set_status_message(f'Parsed until {current} :: ' - f'{counter} / {game_count}') - task.set_progress_percentage(round(current_progress * 100, 2)) - - df: pd.DataFrame = pd.DataFrame(header_infos) - - task.set_status_message('Parsed all games') - task.set_progress_percentage(100) - return df - - -def clean_chess_df(pgn: pd.DataFrame, json: pd.DataFrame) -> pd.DataFrame: - json['Site'] = 'https://lichess.org/' + json['id'] - - json = fix_provisional_columns(json) - - json = json[['Site', - 'speed', - 'status', - 'players_black_provisional', - 'players_white_provisional', - ]] - - df: pd.DataFrame = pd.merge(pgn, json, on='Site') - - # rename columns - df.rename(columns={'Black': 'black', - 'BlackElo': 'black_elo', - 'BlackRatingDiff': 'black_rating_diff', - 'Date': 'date_played', - 'ECO': 'opening_played', - 'Event': 'event_type', - 'Result': 'result', - 'Round': 'round', - 'Site': 'game_link', - 'Termination': 'termination', - 'TimeControl': 'time_control', - 'UTCDate': 'utc_date_played', - 'UTCTime': 'time_played', - 'Variant': 'chess_variant', - 'White': 'white', - 'WhiteElo': 'white_elo', - 'WhiteRatingDiff': 'white_rating_diff', - 'Opening': 'lichess_opening', - 'players_black_provisional': 'black_elo_tentative', - 'players_white_provisional': 'white_elo_tentative', - }, - inplace=True) - return df - - -def get_evals(df: pd.DataFrame, - local_stockfish: bool, - task: Task, - ) -> pd.DataFrame: - sf_params: Any = stockfish_cfg() - - df = df[['evaluations', 'eval_depths', 'positions']] - - # explode the two different list-likes separately, then concat - no_evals: pd.DataFrame = df[~df['evaluations'].astype(bool)] - df = df[df['evaluations'].astype(bool)] - - no_evals = pd.DataFrame(no_evals['positions'].explode()) - no_evals['positions'] = get_clean_fens(no_evals['positions']) - - evals: pd.Series = df['evaluations'].explode().reset_index(drop=True) - depths: pd.Series = df['eval_depths'].explode().reset_index(drop=True) - positions: pd.Series = df['positions'].explode().reset_index(drop=True) - positions = get_clean_fens(positions) - - sql: str = """SELECT fen, evaluation, eval_depth - FROM position_evals - WHERE fen IN %(positions)s; - """ - db_evaluations = run_remote_sql_query(sql, - positions=tuple(positions.tolist() + no_evals['positions'].tolist()), # noqa - ) - positions_evaluated = db_evaluations['fen'].drop_duplicates() - - df = pd.concat([positions, evals, depths], axis=1) - - if local_stockfish: - - local_evals: list[float | None] = [] - - counter: int = 0 - position_count: int = len(no_evals['positions']) - evaluation: float | None = None - - for position in no_evals['positions'].tolist(): - if position in positions_evaluated.values: - # position will be dropped later if evaluation is None - evaluation = None - else: - sf_eval: float | None = get_sf_evaluation(position + ' 0', - sf_params.location, - sf_params.depth) - if sf_eval is not None: - # TODO: this is implicitly setting evaluation = last - # eval if in a checkmate position. handle this better - evaluation = sf_eval - - local_evals.append(evaluation) - - # progress bar stuff - counter += 1 - - current_progress = counter / position_count - task.set_status_message(f'Analyzed :: ' - f'{counter} / {position_count}') - task.set_progress_percentage(round(current_progress * 100, 2)) - - task.set_status_message(f'Analyzed all {position_count} positions') - task.set_progress_percentage(100) - - no_evals['evaluations'] = local_evals - no_evals['eval_depths'] = sf_params.depth - no_evals.dropna(inplace=True) - - df = pd.concat([df, no_evals], axis=0, ignore_index=True) - - df = df[~df['positions'].isin(positions_evaluated)] - - df.rename(columns={'evaluations': 'evaluation', - 'eval_depths': 'eval_depth', - 'positions': 'fen'}, - inplace=True) - df['evaluation'] = pd.to_numeric(df['evaluation'], - errors='coerce') - - df.dropna(inplace=True) - df = pd.concat([df, db_evaluations], axis=0, ignore_index=True) - - return df - - -def explode_moves(df: pd.DataFrame) -> pd.DataFrame: - df = df[['game_link', 'moves']] - - df = df.explode('moves') - df.rename(columns={'moves': 'move'}, - inplace=True) - df['half_move'] = df.groupby('game_link').cumcount() + 1 - return df - - -def explode_clocks(df: pd.DataFrame) -> pd.DataFrame: - df = df[['game_link', 'clocks']] - - df = df.explode('clocks') - df.rename(columns={'clocks': 'clock'}, - inplace=True) - df['half_move'] = df.groupby('game_link').cumcount() + 1 - df['clock'] = convert_clock_to_seconds(df['clock']) - return df - - -def explode_positions(df: pd.DataFrame) -> pd.DataFrame: - df = df[['game_link', 'positions']] - - df = df.explode('positions') - df.rename(columns={'positions': 'position'}, - inplace=True) - df['half_move'] = df.groupby('game_link').cumcount() + 1 - - df['fen'] = get_clean_fens(df['position']) - return df - - -def explode_materials(df: pd.DataFrame) -> pd.DataFrame: - df = df[['game_link', 'material_by_move']] - - df = df.explode('material_by_move') - - df = pd.concat([df['game_link'], - df['material_by_move'].apply(pd.Series) - .fillna(0) - .astype(int)], - axis=1) - df.rename(columns={'r': 'rooks_black', - 'n': 'knights_black', - 'b': 'bishops_black', - 'q': 'queens_black', - 'p': 'pawns_black', - 'P': 'pawns_white', - 'R': 'rooks_white', - 'N': 'knights_white', - 'B': 'bishops_white', - 'Q': 'queens_white', - }, - inplace=True) - - df['half_move'] = df.groupby('game_link').cumcount() + 1 - return df - - -def estimate_win_probabilities(game_infos: pd.DataFrame, - evals: pd.DataFrame, - game_positions: pd.DataFrame, - game_clocks: pd.DataFrame, - local_stockfish: bool, - ) -> pd.DataFrame: - game_infos['has_increment'] = (game_infos['increment'] > 0).astype(int) - - game_infos_cols = ['game_link', - 'has_increment', - 'player_color', - 'player_elo', - 'opponent_elo', - ] - - # evals isn't always populated - df = pd.merge(game_positions, evals, on='fen', how='left') - - # if there are missing evals, set to 0 so it doesn't influence the WP - if not local_stockfish: - df['evaluation'].fillna(0, inplace=True) - # this is actually kind of incorrect - evaluation was never scaled - # so the mean isn't 0, but rather something like 0.2 probably. - # since the LR model inputs weren't scaled in the first place, - # i am just ignoring this for now - - df = pd.merge(df, game_clocks, on=['game_link', 'half_move']) - df = pd.merge(df, - game_infos[game_infos_cols], - on='game_link', - ) - - loss, draw, win = predict_wp(df) - - df['win_probability_white'] = win - df['draw_probability'] = draw - df['win_probability_black'] = loss - - model_path = os.path.join(os.path.dirname(__file__), - 'pipeline_import', - 'wp_model.pckl', - ) - - with open(model_path, 'rb') as f: - md5 = hashlib.md5(f.read()).hexdigest() - - df['win_prob_model_version'] = md5[:7] - return df +from vendors.lichess import fetch_lichess_api_json, fetch_lichess_api_pgn +from vendors.stockfish import get_evals class FetchLichessApiJSON(Task): diff --git a/src/feature_engineering.py b/src/feature_engineering.py new file mode 100644 index 0000000..1dea693 --- /dev/null +++ b/src/feature_engineering.py @@ -0,0 +1,106 @@ +import pandas as pd +from pipeline_import.transforms import ( + convert_clock_to_seconds, + fix_provisional_columns, + get_clean_fens, +) + + +def clean_chess_df(pgn: pd.DataFrame, json: pd.DataFrame) -> pd.DataFrame: + json['Site'] = 'https://lichess.org/' + json['id'] + + json = fix_provisional_columns(json) + + json = json[['Site', + 'speed', + 'status', + 'players_black_provisional', + 'players_white_provisional', + ]] + + df: pd.DataFrame = pd.merge(pgn, json, on='Site') + + # rename columns + df.rename(columns={'Black': 'black', + 'BlackElo': 'black_elo', + 'BlackRatingDiff': 'black_rating_diff', + 'Date': 'date_played', + 'ECO': 'opening_played', + 'Event': 'event_type', + 'Result': 'result', + 'Round': 'round', + 'Site': 'game_link', + 'Termination': 'termination', + 'TimeControl': 'time_control', + 'UTCDate': 'utc_date_played', + 'UTCTime': 'time_played', + 'Variant': 'chess_variant', + 'White': 'white', + 'WhiteElo': 'white_elo', + 'WhiteRatingDiff': 'white_rating_diff', + 'Opening': 'lichess_opening', + 'players_black_provisional': 'black_elo_tentative', + 'players_white_provisional': 'white_elo_tentative', + }, + inplace=True) + return df + + +def explode_moves(df: pd.DataFrame) -> pd.DataFrame: + df = df[['game_link', 'moves']] + + df = df.explode('moves') + df.rename(columns={'moves': 'move'}, + inplace=True) + df['half_move'] = df.groupby('game_link').cumcount() + 1 + return df + + +def explode_clocks(df: pd.DataFrame) -> pd.DataFrame: + df = df[['game_link', 'clocks']] + + df = df.explode('clocks') + df.rename(columns={'clocks': 'clock'}, + inplace=True) + df['half_move'] = df.groupby('game_link').cumcount() + 1 + df['clock'] = convert_clock_to_seconds(df['clock']) + return df + + +def explode_positions(df: pd.DataFrame) -> pd.DataFrame: + df = df[['game_link', 'positions']] + + df = df.explode('positions') + df.rename(columns={'positions': 'position'}, + inplace=True) + df['half_move'] = df.groupby('game_link').cumcount() + 1 + + df['fen'] = get_clean_fens(df['position']) + return df + + +def explode_materials(df: pd.DataFrame) -> pd.DataFrame: + df = df[['game_link', 'material_by_move']] + + df = df.explode('material_by_move') + + df = pd.concat([df['game_link'], + df['material_by_move'].apply(pd.Series) + .fillna(0) + .astype(int)], + axis=1) + df.rename(columns={'r': 'rooks_black', + 'n': 'knights_black', + 'b': 'bishops_black', + 'q': 'queens_black', + 'p': 'pawns_black', + 'P': 'pawns_white', + 'R': 'rooks_white', + 'N': 'knights_white', + 'B': 'bishops_white', + 'Q': 'queens_white', + }, + inplace=True) + + df['half_move'] = df.groupby('game_link').cumcount() + 1 + return df diff --git a/src/inference.py b/src/inference.py new file mode 100644 index 0000000..27606d0 --- /dev/null +++ b/src/inference.py @@ -0,0 +1,55 @@ +import hashlib +import os + +import pandas as pd +from pipeline_import.models import predict_wp + + +def estimate_win_probabilities(game_infos: pd.DataFrame, + evals: pd.DataFrame, + game_positions: pd.DataFrame, + game_clocks: pd.DataFrame, + local_stockfish: bool, + ) -> pd.DataFrame: + game_infos['has_increment'] = (game_infos['increment'] > 0).astype(int) + + game_infos_cols = ['game_link', + 'has_increment', + 'player_color', + 'player_elo', + 'opponent_elo', + ] + + # evals isn't always populated + df = pd.merge(game_positions, evals, on='fen', how='left') + + # if there are missing evals, set to 0 so it doesn't influence the WP + if not local_stockfish: + df['evaluation'].fillna(0, inplace=True) + # this is actually kind of incorrect - evaluation was never scaled + # so the mean isn't 0, but rather something like 0.2 probably. + # since the LR model inputs weren't scaled in the first place, + # i am just ignoring this for now + + df = pd.merge(df, game_clocks, on=['game_link', 'half_move']) + df = pd.merge(df, + game_infos[game_infos_cols], + on='game_link', + ) + + loss, draw, win = predict_wp(df) + + df['win_probability_white'] = win + df['draw_probability'] = draw + df['win_probability_black'] = loss + + model_path = os.path.join(os.path.dirname(__file__), + 'pipeline_import', + 'wp_model.pckl', + ) + + with open(model_path, 'rb') as f: + md5 = hashlib.md5(f.read()).hexdigest() + + df['win_prob_model_version'] = md5[:7] + return df diff --git a/src/utils/db.py b/src/utils/db.py new file mode 100644 index 0000000..3d35663 --- /dev/null +++ b/src/utils/db.py @@ -0,0 +1,30 @@ + +import pandas as pd +import psycopg2 +from pipeline_import.configs import postgres_cfg + + +def run_remote_sql_query(sql, **params) -> pd.DataFrame: + pg_cfg = postgres_cfg() + user = pg_cfg.user + password = pg_cfg.password + host = pg_cfg.host + port = pg_cfg.port + database = pg_cfg.database + + db = psycopg2.connect(host=host, + database=database, + user=user, + password=password, + port=port, + ) + + df: pd.DataFrame = pd.read_sql_query(sql, db, params=params) + + return df + + +def query_for_column(table, column): + sql = f"""SELECT DISTINCT {column} FROM {table};""" + df = run_remote_sql_query(sql) + return df[column] diff --git a/src/vendors/lichess.py b/src/vendors/lichess.py new file mode 100644 index 0000000..b060a3f --- /dev/null +++ b/src/vendors/lichess.py @@ -0,0 +1,109 @@ +from calendar import timegm +from datetime import datetime, timedelta +from typing import Type + +import lichess.api +import pandas as pd +from chess.pgn import Game +from lichess.format import JSON, PYCHESS +from luigi import Task +from pipeline_import.configs import lichess_token +from pipeline_import.transforms import parse_headers +from pipeline_import.visitors import ( + CastlingVisitor, + ClocksVisitor, + EvalsVisitor, + MaterialVisitor, + PositionsVisitor, + PromotionsVisitor, + QueenExchangeVisitor, +) +from utils.types import Json, Visitor + + +def fetch_lichess_api_json(player: str, + perf_type: str, + since: datetime, + single_day: bool, + ) -> pd.DataFrame: + if single_day: + unix_time_until: int = timegm((since + timedelta(days=1)).timetuple()) + else: + unix_time_until = timegm(datetime.today().date().timetuple()) + until: int = int(1000 * unix_time_until) + + unix_time_since: int = timegm(since.timetuple()) + since_unix: int = int(1000 * unix_time_since) + + games: list[Json] = lichess.api.user_games(player, + since=since_unix, + until=until, + perfType=perf_type, + auth=lichess_token().token, + evals='false', + clocks='false', + moves='false', + format=JSON) + + df: pd.DataFrame = pd.json_normalize([game for game in games], sep='_') + return df + + +def fetch_lichess_api_pgn(player: str, + perf_type: str, + since: datetime, + single_day: bool, + game_count: int, + task: Task, + ) -> pd.DataFrame: + if single_day: + unix_time_until: int = timegm((since + timedelta(days=1)).timetuple()) + else: + unix_time_until = timegm(datetime.today().date().timetuple()) + until: int = int(1000 * unix_time_until) + + unix_time_since: int = timegm(since.timetuple()) + since_unix: int = int(1000 * unix_time_since) + + games: list[Game] = lichess.api.user_games(player, + since=since_unix, + until=until, + perfType=perf_type, + auth=lichess_token().token, + clocks='true', + evals='true', + opening='true', + format=PYCHESS) + + visitors: list[Type[Visitor]] = [EvalsVisitor, + ClocksVisitor, + QueenExchangeVisitor, + CastlingVisitor, + PromotionsVisitor, + PositionsVisitor, + MaterialVisitor, + ] + + header_infos = [] + + counter: int = 0 + + for game in games: + game_infos: Json = parse_headers(game, visitors) + header_infos.append(game_infos) + + # progress bar stuff + counter += 1 + + current: str = f'{game_infos["UTCDate"]} {game_infos["UTCTime"]}' + + current_progress: float = counter / game_count + task.set_status_message(f'Parsed until {current} :: ' + f'{counter} / {game_count}') + task.set_progress_percentage(round(current_progress * 100, 2)) + + df: pd.DataFrame = pd.DataFrame(header_infos) + + task.set_status_message('Parsed all games') + task.set_progress_percentage(100) + return df diff --git a/src/vendors/stockfish.py b/src/vendors/stockfish.py new file mode 100644 index 0000000..81a56d4 --- /dev/null +++ b/src/vendors/stockfish.py @@ -0,0 +1,93 @@ +from typing import Any + +import pandas as pd +from luigi import Task +from pipeline_import.configs import stockfish_cfg +from pipeline_import.transforms import get_clean_fens, get_sf_evaluation +from utils.db import run_remote_sql_query + + +def get_evals(df: pd.DataFrame, + local_stockfish: bool, + task: Task, + ) -> pd.DataFrame: + sf_params: Any = stockfish_cfg() + + df = df[['evaluations', 'eval_depths', 'positions']] + + # explode the two different list-likes separately, then concat + no_evals: pd.DataFrame = df[~df['evaluations'].astype(bool)] + df = df[df['evaluations'].astype(bool)] + + no_evals = pd.DataFrame(no_evals['positions'].explode()) + no_evals['positions'] = get_clean_fens(no_evals['positions']) + + evals: pd.Series = df['evaluations'].explode().reset_index(drop=True) + depths: pd.Series = df['eval_depths'].explode().reset_index(drop=True) + positions: pd.Series = df['positions'].explode().reset_index(drop=True) + positions = get_clean_fens(positions) + + sql: str = """SELECT fen, evaluation, eval_depth + FROM position_evals + WHERE fen IN %(positions)s; + """ + db_evaluations = run_remote_sql_query(sql, + positions=tuple(positions.tolist() + no_evals['positions'].tolist()), # noqa + ) + positions_evaluated = db_evaluations['fen'].drop_duplicates() + + df = pd.concat([positions, evals, depths], axis=1) + + if local_stockfish: + + local_evals: list[float | None] = [] + + counter: int = 0 + position_count: int = len(no_evals['positions']) + evaluation: float | None = None + + for position in no_evals['positions'].tolist(): + if position in positions_evaluated.values: + # position will be dropped later if evaluation is None + evaluation = None + else: + sf_eval: float | None = get_sf_evaluation(position + ' 0', + sf_params.location, + sf_params.depth) + if sf_eval is not None: + # TODO: this is implicitly setting evaluation = last + # eval if in a checkmate position. handle this better + evaluation = sf_eval + + local_evals.append(evaluation) + + # progress bar stuff + counter += 1 + + current_progress = counter / position_count + task.set_status_message(f'Analyzed :: ' + f'{counter} / {position_count}') + task.set_progress_percentage(round(current_progress * 100, 2)) + + task.set_status_message(f'Analyzed all {position_count} positions') + task.set_progress_percentage(100) + + no_evals['evaluations'] = local_evals + no_evals['eval_depths'] = sf_params.depth + no_evals.dropna(inplace=True) + + df = pd.concat([df, no_evals], axis=0, ignore_index=True) + + df = df[~df['positions'].isin(positions_evaluated)] + + df.rename(columns={'evaluations': 'evaluation', + 'eval_depths': 'eval_depth', + 'positions': 'fen'}, + inplace=True) + df['evaluation'] = pd.to_numeric(df['evaluation'], + errors='coerce') + + df.dropna(inplace=True) + df = pd.concat([df, db_evaluations], axis=0, ignore_index=True) + + return df