diff --git a/.dlt/.pipelines b/.dlt/.pipelines new file mode 100644 index 0000000000..52f1a972fe --- /dev/null +++ b/.dlt/.pipelines @@ -0,0 +1,23 @@ +engine_version: 1 +pipelines: + chess: + is_dirty: false + last_commit_sha: 6792c713c2a0343e468f89792a58850195bf9528 + last_commit_timestamp: '2023-06-05T14:09:46+02:00' + files: + chess/__init__.py: + commit_sha: 6792c713c2a0343e468f89792a58850195bf9528 + git_sha: 66ccbbdf9e59e8bcb0b932a80eb033a298c3e9f5 + sha3_256: fe7d5923ae62f7bba3b6aa0447af9f7c187f719636e62410d3d0c0c193abed26 + chess/README.md: + commit_sha: 6792c713c2a0343e468f89792a58850195bf9528 + git_sha: 4d2c6988018890d8834b32874887f59d3c4ef4a1 + sha3_256: 7c0de2dd5788615f1a18f5b1018ad0013070b82204a780becb8706869447a1c8 + chess/settings.py: + commit_sha: 6792c713c2a0343e468f89792a58850195bf9528 + git_sha: e7c335449972e4e4de5ee1035114571f135fd9cf + sha3_256: 01728e594272c541af519f58ebb3ef1b13bcd9253fafc8cfdea532449f96d017 + chess/helpers.py: + commit_sha: 6792c713c2a0343e468f89792a58850195bf9528 + git_sha: c607c5172afbf52578ec1563cc5eb45f4c209cfd + sha3_256: ed62520d9d09960ce88719306d5f080876435b6425808c5587a6078eaa093538 diff --git a/.dlt/config.toml b/.dlt/config.toml new file mode 100644 index 0000000000..bd81efa07e --- /dev/null +++ b/.dlt/config.toml @@ -0,0 +1,9 @@ +# put your configuration values here + +[runtime] +log_level="WARNING" # the system log level of dlt +# use the dlthub_telemetry setting to enable/disable anonymous usage data reporting, see https://dlthub.com/docs/telemetry +dlthub_telemetry = true + +[sources.chess] +config_int = 0 # please set me up! diff --git a/chess/README.md b/chess/README.md new file mode 100644 index 0000000000..4d2c698801 --- /dev/null +++ b/chess/README.md @@ -0,0 +1,75 @@ +--- +title: Chess.com +description: dlt pipeline for Chess.com API +keywords: [chess.com api, chess.com pipeline, chess.com] +--- + +# Chess.com + +This pipeline can be used to load player data from the [Chess.com API](https://www.chess.com/news/view/published-data-api) into a [destination](../general-usage/glossary.md#destination) of your choice. + +## Initialize the pipeline + +Initialize the pipeline with the following command: +``` +dlt init chess bigquery +``` +Here, we chose BigQuery as the destination. To choose a different destination, replace `bigquery` with your choice of destination. + +Running this command will create a directory with the following structure: +```bash +├── .dlt +│ ├── .pipelines +│ ├── config.toml +│ └── secrets.toml +├── chess +│ └── __pycache__ +│ └── __init__.py +├── .gitignore +├── chess_pipeline.py +└── requirements.txt +``` + +## Add credentials + +Before running the pipeline you may need to add credentials in the `.dlt/secrets.toml` file for your chosen destination. For instructions on how to do this, follow the steps detailed under the desired destination in the [destinations](https://dlthub.com/docs/destinations) page. + +## Run the pipeline + +1. Install the necessary dependencies by running the following command: +``` +pip install -r requirements.txt +``` +2. Now the pipeline can be run by using the command: +``` +python3 chess_pipeline.py +``` +3. To make sure that everything is loaded as expected, use the command: +``` +dlt pipeline chess_pipeline show +``` + +## Customize parameters + +Without any modifications, the chess pipeline will load data for a default list of players over a default period of time. You can change these values in the `chess_pipeline.py` script. + +For example, if you wish to load player games for a specific set of players, add the player list to the function `load_player_games_example` as below. +```python +def load_players_games_example(start_month: str, end_month: str): + + pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination='bigquery', dataset_name="chess_players_games_data") + + data = chess( + [], # Specify your list of players here + start_month=start_month, + end_month=end_month + ) + + info = pipeline.run(data.with_resources("players_games", "players_profiles")) + print(info) +``` +To specify the time period, pass the starting and ending months as parameters when calling the function in the `__main__` block: +```python +if __name__ == "__main__" : + load_players_games_example("2022/11", "2022/12") # Replace the strings "2022/11" and "2022/12" with different months in the "YYYY/MM" format +``` diff --git a/chess/__init__.py b/chess/__init__.py new file mode 100644 index 0000000000..66ccbbdf9e --- /dev/null +++ b/chess/__init__.py @@ -0,0 +1,166 @@ +"""A pipeline loading player profiles and games from chess.com api""" + +from typing import Callable, Iterator, List, Sequence, Dict, Any + +import dlt +from dlt.common import pendulum +from dlt.common.typing import TDataItem, StrAny +from dlt.extract.source import DltResource +from dlt.sources.helpers import requests +from .helpers import get_url_with_retry, get_path_with_retry, validate_month_string + +from .settings import UNOFFICIAL_CHESS_API_URL + + +@dlt.source(name="chess") +def source( + players: List[str], start_month: str = None, end_month: str = None +) -> Sequence[DltResource]: + """ + A dlt source for the chess.com api. It groups several resources (in this case chess.com API endpoints) containing + various types of data: user profiles or chess match results + Args: + players (List[str]): A list of the player usernames for which to get the data. + start_month (str, optional): Filters out all the matches happening before `start_month`. Defaults to None. + end_month (str, optional): Filters out all the matches happening after `end_month`. Defaults to None. + Returns: + Sequence[DltResource]: A sequence of resources that can be selected from including players_profiles, + players_archives, players_games, players_online_status + """ + return ( + players_profiles(players), + players_archives(players), + players_games(players, start_month=start_month, end_month=end_month), + players_online_status(players), + ) + + +@dlt.resource(write_disposition="replace") +def players_profiles(players: List[str]) -> Iterator[TDataItem]: + """ + Yields player profiles for a list of player usernames. + Args: + players (List[str]): List of player usernames to retrieve profiles for. + Yields: + Iterator[TDataItem]: An iterator over player profiles data. + """ + + # get archives in parallel by decorating the http request with defer + @dlt.defer + def _get_profile(username: str) -> TDataItem: + return get_path_with_retry(f"player/{username}") + + for username in players: + yield _get_profile(username) + + +@dlt.resource(write_disposition="replace", selected=False) +def players_archives(players: List[str]) -> Iterator[List[TDataItem]]: + """ + Yields url to game archives for specified players. + Args: + players (List[str]): List of player usernames to retrieve archives for. + Yields: + Iterator[List[TDataItem]]: An iterator over list of player archive data. + """ + for username in players: + data = get_path_with_retry(f"player/{username}/games/archives") + yield data.get("archives", []) + + +@dlt.resource(write_disposition="append") +def players_games( + players: List[str], start_month: str = None, end_month: str = None +) -> Iterator[Callable[[], List[TDataItem]]]: + """ + Yields `players` games that happened between `start_month` and `end_month`. + Args: + players (List[str]): List of player usernames to retrieve games for. + start_month (str, optional): The starting month in the format "YYYY/MM". Defaults to None. + end_month (str, optional): The ending month in the format "YYYY/MM". Defaults to None. + Yields: + Iterator[Callable[[], List[TDataItem]]]: An iterator over callables that return a list of games for each player. + """ # do a simple validation to prevent common mistakes in month format + validate_month_string(start_month) + validate_month_string(end_month) + + # get a list of already checked archives + # from your point of view, the state is python dictionary that will have the same content the next time this function is called + checked_archives = dlt.current.resource_state().setdefault("archives", []) + # get player archives, note that you can call the resource like any other function and just iterate it like a list + archives = players_archives(players) + + # get archives in parallel by decorating the http request with defer + @dlt.defer + def _get_archive(url: str) -> List[TDataItem]: + print(f"Getting archive from {url}") + try: + games = get_url_with_retry(url).get("games", []) + return games # type: ignore + except requests.HTTPError as http_err: + # sometimes archives are not available and the error seems to be permanent + if http_err.response.status_code == 404: + return [] + raise + + # enumerate the archives + url: str = None + for url in archives: + # the `url` format is https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM} + if start_month and url[-7:] < start_month: + continue + if end_month and url[-7:] > end_month: + continue + # do not download archive again + if url in checked_archives: + continue + else: + checked_archives.append(url) + # get the filtered archive + yield _get_archive(url) + + +@dlt.resource(write_disposition="append") +def players_online_status(players: List[str]) -> Iterator[TDataItem]: + """ + Returns current online status for a list of players. + Args: + players (List[str]): List of player usernames to check online status for. + Yields: + Iterator[TDataItem]: An iterator over the online status of each player. + """ + # we'll use unofficial endpoint to get online status, the official seems to be removed + for player in players: + status = get_url_with_retry( + "%suser/popup/%s" % (UNOFFICIAL_CHESS_API_URL, player) + ) + # return just relevant selection + yield { + "username": player, + "onlineStatus": status["onlineStatus"], + "lastLoginDate": status["lastLoginDate"], + "check_time": pendulum.now(), # dlt can deal with native python dates + } + + +@dlt.source +def chess_dlt_config_example( + secret_str: str = dlt.secrets.value, + secret_dict: Dict[str, Any] = dlt.secrets.value, + config_int: int = dlt.config.value, +) -> DltResource: + """ + An example of a source that uses dlt to provide secrets and config values. + Args: + secret_str (str, optional): Secret string provided by dlt.secrets.value. Defaults to dlt.secrets.value. + secret_dict (Dict[str, Any], optional): Secret dictionary provided by dlt.secrets.value. Defaults to dlt.secrets.value. + config_int (int, optional): Config integer provided by dlt.config.value. Defaults to dlt.config.value. + Returns: + DltResource: Returns a resource yielding the configured values. + """ + print(secret_str) + print(secret_dict) + print(config_int) + + # returns a resource yielding the configured values - it is just a test + return dlt.resource([secret_str, secret_dict, config_int], name="config_values") diff --git a/chess/helpers.py b/chess/helpers.py new file mode 100644 index 0000000000..c607c5172a --- /dev/null +++ b/chess/helpers.py @@ -0,0 +1,20 @@ +"""Chess pipeline helpers""" + +from dlt.common.typing import StrAny +from dlt.sources.helpers import requests +from .settings import OFFICIAL_CHESS_API_URL + + +def get_url_with_retry(url: str) -> StrAny: + r = requests.get(url) + return r.json() # type: ignore + + +def get_path_with_retry(path: str) -> StrAny: + return get_url_with_retry(f"{OFFICIAL_CHESS_API_URL}{path}") + + +def validate_month_string(string: str) -> None: + """Validates that the string is in YYYY/MM format""" + if string and string[4] != "/": + raise ValueError(string) diff --git a/chess/settings.py b/chess/settings.py new file mode 100644 index 0000000000..e7c3354499 --- /dev/null +++ b/chess/settings.py @@ -0,0 +1,4 @@ +"""Chess pipeline settings and constants""" + +OFFICIAL_CHESS_API_URL = "https://api.chess.com/pub/" +UNOFFICIAL_CHESS_API_URL = "https://www.chess.com/callback/" diff --git a/chess_pipeline.py b/chess_pipeline.py new file mode 100644 index 0000000000..0a3ed6c1be --- /dev/null +++ b/chess_pipeline.py @@ -0,0 +1,52 @@ +import dlt +from chess import source +import os + +def load_table_counts(p: dlt.Pipeline, *table_names: str): + """Returns row counts for `table_names` as dict""" + + # try sql, could be other destination though + query = "\nUNION ALL\n".join([f"SELECT '{name}' as name, COUNT(1) as c FROM {name}" for name in table_names]) + with p.sql_client() as c: + with c.execute_query(query) as cur: + rows = list(cur.fetchall()) + return {r[0]: r[1] for r in rows} + + +def load_players_games_example(start_month: str, end_month: str) -> None: + """Constructs a pipeline that will load chess games of specific players for a range of months.""" + os.environ['NORMALIZE__SCHEMA_UPDATE_MODE'] = "freeze-and-raise" + + # configure the pipeline: provide the destination and dataset name to which the data should go + pipeline = dlt.pipeline( + import_schema_path="schemas/import", + export_schema_path="schemas/export", + pipeline_name="chess_pipeline", + destination='duckdb', + dataset_name="chess_players_games_data", + full_refresh=True + ) + # create the data source by providing a list of players and start/end month in YYYY/MM format + data = source( + ["magnuscarlsen", "vincentkeymer", "dommarajugukesh", "rpragchess"], + start_month=start_month, + end_month=end_month, + ) + # load the "players_games" and "players_profiles" out of all the possible resources + info = pipeline.run(data.with_resources("players_games", "players_profiles")) + print(info) + table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) + print(table_counts) + + +def load_players_games_incrementally() -> None: + """Pipeline will not load the same game archive twice""" + # loads games for 11.2022 + load_players_games_example("2022/11", "2022/11") + # second load skips games for 11.2022 but will load for 12.2022 + load_players_games_example("2022/11", "2022/12") + + +if __name__ == "__main__": + # run our main example + load_players_games_example("2022/11", "2022/12") diff --git a/schemas/export/chess.schema.yaml b/schemas/export/chess.schema.yaml new file mode 100644 index 0000000000..ee5fc1de60 --- /dev/null +++ b/schemas/export/chess.schema.yaml @@ -0,0 +1,232 @@ +version: 4 +version_hash: QXP0jgGWoD++eoDNy0CGSMirwZofWtzN92kKXkAh9g0= +engine_version: 6 +name: chess +tables: + _dlt_version: + columns: + version: + nullable: false + data_type: bigint + engine_version: + nullable: false + data_type: bigint + inserted_at: + nullable: false + data_type: timestamp + schema_name: + nullable: false + data_type: text + version_hash: + nullable: false + data_type: text + schema: + nullable: false + data_type: text + write_disposition: skip + description: Created by DLT. Tracks schema updates + _dlt_loads: + columns: + load_id: + nullable: false + data_type: text + schema_name: + nullable: true + data_type: text + status: + nullable: false + data_type: bigint + inserted_at: + nullable: false + data_type: timestamp + schema_version_hash: + nullable: true + data_type: text + write_disposition: skip + description: Created by DLT. Tracks completed loads + players_profiles: + columns: + avatar: + nullable: true + data_type: text + player_id: + nullable: true + data_type: bigint + aid: + nullable: true + data_type: text + url: + nullable: true + data_type: text + name: + nullable: true + data_type: text + username: + nullable: true + data_type: text + title: + nullable: true + data_type: text + followers: + nullable: true + data_type: bigint + country: + nullable: true + data_type: text + location: + nullable: true + data_type: text + last_online: + nullable: true + data_type: timestamp + joined: + nullable: true + data_type: bigint + status: + nullable: true + data_type: text + is_streamer: + nullable: true + data_type: bool + verified: + nullable: true + data_type: bool + league: + nullable: true + data_type: text + _dlt_load_id: + nullable: false + data_type: text + _dlt_id: + nullable: false + unique: true + data_type: text + write_disposition: replace + players_games: + columns: + url: + nullable: true + data_type: text + pgn: + nullable: true + data_type: text + time_control: + nullable: true + data_type: text + end_time: + nullable: true + data_type: timestamp + rated: + nullable: true + data_type: bool + accuracies__white: + nullable: true + data_type: double + accuracies__black: + nullable: true + data_type: double + tcn: + nullable: true + data_type: text + uuid: + nullable: true + data_type: text + initial_setup: + nullable: true + data_type: text + fen: + nullable: true + data_type: text + time_class: + nullable: true + data_type: text + rules: + nullable: true + data_type: text + white__rating: + nullable: true + data_type: bigint + white__result: + nullable: true + data_type: text + white__aid: + nullable: true + data_type: text + white__username: + nullable: true + data_type: text + white__uuid: + nullable: true + data_type: text + black__rating: + nullable: true + data_type: bigint + black__result: + nullable: true + data_type: text + black__aid: + nullable: true + data_type: text + black__username: + nullable: true + data_type: text + black__uuid: + nullable: true + data_type: text + _dlt_load_id: + nullable: false + data_type: text + _dlt_id: + nullable: false + unique: true + data_type: text + tournament: + nullable: true + data_type: text + write_disposition: append + _dlt_pipeline_state: + columns: + version: + nullable: false + data_type: bigint + engine_version: + nullable: false + data_type: bigint + pipeline_name: + nullable: false + data_type: text + state: + nullable: false + data_type: text + created_at: + nullable: false + data_type: timestamp + _dlt_load_id: + nullable: false + data_type: text + _dlt_id: + nullable: false + unique: true + data_type: text + write_disposition: append +settings: + detections: + - timestamp + - iso_timestamp + default_hints: + not_null: + - _dlt_parent_id + - _dlt_list_idx + - _dlt_load_id + - _dlt_root_id + - _dlt_id + foreign_key: + - _dlt_parent_id + root_key: + - _dlt_root_id + unique: + - _dlt_id +normalizers: + names: duck_case + json: + module: dlt.common.normalizers.json.relational diff --git a/schemas/import/chess.schema.yaml b/schemas/import/chess.schema.yaml new file mode 100644 index 0000000000..a1fe6fe083 --- /dev/null +++ b/schemas/import/chess.schema.yaml @@ -0,0 +1,217 @@ +version: 4 +version_hash: g+cn8CnyAaoquAnZDAl2vBBdfLkKFnbK6OYtxUApiCA= +engine_version: 6 +name: chess +tables: + _dlt_version: + columns: + version: + nullable: false + data_type: bigint + engine_version: + nullable: false + data_type: bigint + inserted_at: + nullable: false + data_type: timestamp + schema_name: + nullable: false + data_type: text + version_hash: + nullable: false + data_type: text + schema: + nullable: false + data_type: text + write_disposition: skip + description: Created by DLT. Tracks schema updates + _dlt_loads: + columns: + load_id: + nullable: false + data_type: text + schema_name: + nullable: true + data_type: text + status: + nullable: false + data_type: bigint + inserted_at: + nullable: false + data_type: timestamp + schema_version_hash: + nullable: true + data_type: text + write_disposition: skip + description: Created by DLT. Tracks completed loads + players_profiles: + columns: + avatar: + nullable: true + data_type: text + player_id: + nullable: true + data_type: bigint + aid: + nullable: true + data_type: text + url: + nullable: true + data_type: text + name: + nullable: true + data_type: text + username: + nullable: true + data_type: text + title: + nullable: true + data_type: text + followers: + nullable: true + data_type: bigint + country: + nullable: true + data_type: text + location: + nullable: true + data_type: text + last_online: + nullable: true + data_type: timestamp + joined: + nullable: true + data_type: bigint + status: + nullable: true + data_type: text + is_streamer: + nullable: true + data_type: bool + verified: + nullable: true + data_type: bool + league: + nullable: true + data_type: text + _dlt_load_id: + nullable: false + data_type: text + _dlt_id: + nullable: false + unique: true + data_type: text + write_disposition: replace + players_games: + columns: + url: + nullable: true + data_type: text + end_time: + nullable: true + data_type: timestamp + rated: + nullable: true + data_type: bool + accuracies__white: + nullable: true + data_type: double + accuracies__black: + nullable: true + data_type: double + tcn: + nullable: true + data_type: text + uuid: + nullable: true + data_type: text + initial_setup: + nullable: true + data_type: text + fen: + nullable: true + data_type: text + time_class: + nullable: true + data_type: text + rules: + nullable: true + data_type: text + white__rating: + nullable: true + data_type: bigint + white__result: + nullable: true + data_type: text + white__aid: + nullable: true + data_type: text + white__username: + nullable: true + data_type: text + white__uuid: + nullable: true + data_type: text + black__rating: + nullable: true + data_type: bigint + black__result: + nullable: true + data_type: text + _dlt_load_id: + nullable: false + data_type: text + _dlt_id: + nullable: false + unique: true + data_type: text + tournament: + nullable: true + data_type: text + write_disposition: append + _dlt_pipeline_state: + columns: + version: + nullable: false + data_type: bigint + engine_version: + nullable: false + data_type: bigint + pipeline_name: + nullable: false + data_type: text + state: + nullable: false + data_type: text + created_at: + nullable: false + data_type: timestamp + _dlt_load_id: + nullable: false + data_type: text + _dlt_id: + nullable: false + unique: true + data_type: text + write_disposition: append +settings: + detections: + - timestamp + - iso_timestamp + default_hints: + not_null: + - _dlt_root_id + - _dlt_parent_id + - _dlt_load_id + - _dlt_id + - _dlt_list_idx + foreign_key: + - _dlt_parent_id + root_key: + - _dlt_root_id + unique: + - _dlt_id +normalizers: + names: duck_case + json: + module: dlt.common.normalizers.json.relational