This project was created using publicly available APIs and was created for educational purposes. Contents of this project should ONLY be used for NON-COMMERCIAL purposes.

Table of Contents

  1. Authors
  2. Codes and Resources Used
  3. Data Ingestion Sources
  4. Getting Started
  5. Usage
  6. Contact
  7. Acknowledgements

SFYR Data Pipeline Implementation

Our company, Sfyr (pronounced: Sapphire /sæfaɪə(r)/), aims to provide retail investors with the tools and data required to support their high-frequency retail trading decisions and needs. Through the use of a data warehouse, Apache Airflow and public data sources, Sfyr provides its clients with accurate, consolidated and up-to-date stock data.

Pipeline Architecture:

(Pipeline architecture diagram)

Keywords:

Data Pipeline, Data Engineering, Data Architecture, Data Warehouse, Scheduler, DAG, Apache Airflow, BigQuery, Firestore, Google Cloud Monitoring, Hugging Face, FinBERT, Sentiment Analysis, Natural Language Processing, Stocks Analysis, Investment Insights, Web Scraping, Data Visualisation, Time Series Data

Authors:

  • Loh Hong Tak Edmund (A0XXX943H)
  • Ng Ting You (A0XXX672N)
  • Tan Yi Bing (A0XXX181U)
  • Wong Zhou Wai (A0XXX509R)
  • Yap Hui Yi (A0XXX707M)

(back to top)

Codes and Resources Used

Python Version: 3.8.10 and 3.7.2

Airflow Version: 2.2.3

Built with: Microsoft Visual Studio Code, Oracle VM VirtualBox, Git, Apache Airflow

Notable Packages: apache-airflow, beautifulsoup4, datetime, firebase-admin, json, numpy, pandas, pandas-gbq, telethon, parse, pendulum, regex, tokenizers, torch, transformers, virtualenv, yahoo-fin, yfinance (view requirements.txt for the full list)

(back to top)

Data Ingestion Sources

Our team extracted both structured and unstructured data from the following sources:

(back to top)

Getting Started

Prerequisites

Make sure you have installed all of the following on your development machine:

  • Python 3.7.X - 3.8.X (Airflow 2.2.3 does not support Python 3.9)
  • Linux Virtual Machine (highly recommended for running Airflow). The pipeline was tested on Oracle VM VirtualBox

(back to top)

Installation

We recommend setting up a virtual machine and virtual environment to run this project.

1. Oracle Virtual Machine

To set up a VM Virtual Box, please follow the steps detailed here.

2. Python Virtual Environment

⚠️ This step should be run within the Linux Virtual Machine

Installing and Creation of a Virtual Environment

pip install virtualenv
virtualenv <your_env_name>
source <your_env_name>/bin/activate

The requirements.txt file contains the Python libraries this project depends on; install them using:

pip install -r requirements.txt

(back to top)

3. Setting Up Airflow

⚠️ This step should be run within the Linux Virtual Machine

Install Airflow in your Virtual Machine and Virtual Environment

export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.2.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Create an Airflow Admin account using the following commands in the shell:

airflow db init
airflow users create \
--username <USERNAME> \
--firstname <YOUR NAME> \
--lastname <YOUR NAME> \
--role Admin \
--email <YOUR EMAIL>

4. Editing Airflow Configs

Edit the airflow.cfg file (located in the airflow directory) with the following rules:

πŸ’‘ A sample airflow.cfg file is located in the utils folder

dags_folder = /home/airflow/<folder_name_containing_code>

enable_xcom_pickling = True

dagbag_import_timeout = 100    # Prevents timeout when downloading FinBERT

load_examples = False    # OPTIONAL: Skips loading of DAG Examples

At this point, also move the code downloaded/pulled from this repository into the dags_folder. This is crucial for the DAG to be picked up by Airflow.
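The larger dagbag_import_timeout is needed because importing the DAG downloads the FinBERT model from Hugging Face on first run. As a rough illustration (assuming the ProsusAI/finbert checkpoint, which may differ from the exact model used in this repository), the load looks like this:

# Illustrative FinBERT load via transformers; the checkpoint name is an
# assumption, and the first call downloads the model weights, which is why
# dagbag_import_timeout is raised above.
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline

tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")
sentiment = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)

print(sentiment("Quarterly revenue beat analyst expectations."))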

(back to top)

5. Setting Up Databases Access and Error Email Alerts

Creating Google Service Account

This data pipeline makes use of the Google Cloud suite of products as our data warehouse. To enable the data pipeline to interface with the BigQuery and Firestore databases, we will have to create Service Accounts with localised access to BigQuery and Firestore.


❗ Keep your JSON Key Files Safe!

  1. Create a project on Google Cloud Platform using an existing Google Account
  2. Head to IAM & Admin
  3. Create a BigQuery Service Account with the following permissions and download JSON Key File
    • BigQuery Data Owner
    • BigQuery Job User
  4. Create a Firestore Service Account with the following permissions and download JSON Key File
    • Firebase Service Management Service Agent

Update Credential JSON Key Files

❗ Keep your JSON Key Files Safe!

Place the Google Authentication Credential JSON Key files in utils/.

Update utils/serviceAccount.json with the names of the credential files.
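For reference, below is a minimal sketch of how the pipeline could authenticate against BigQuery and Firestore with these key files. The file names are placeholders and should match the entries in utils/serviceAccount.json; the exact wiring in this repository may differ.

# Minimal sketch: authenticating BigQuery and Firestore clients with the
# downloaded JSON key files (file names below are placeholders).
import firebase_admin
import pandas_gbq
from firebase_admin import credentials, firestore
from google.oauth2 import service_account

# Firestore: initialise firebase-admin with the Firestore service account key
firebase_cred = credentials.Certificate("utils/<firestore_key_file>.json")
firebase_admin.initialize_app(firebase_cred)
firestore_client = firestore.client()

# BigQuery (via pandas-gbq): load the BigQuery service account key
bq_credentials = service_account.Credentials.from_service_account_file(
    "utils/<bigquery_key_file>.json"
)
pandas_gbq.context.credentials = bq_credentials
pandas_gbq.context.project = "<Google Cloud projectID>"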


Set-up Airflow Email Alerts

The current implementation of Airflow Email Alerts uses the Gmail SMTP service through a Gmail account.

❗ Keep your SMTP Password Safe!

  1. Generate your Gmail SMTP Password here
  2. Edit the airflow.cfg file (located in the airflow directory) with the following information:
# Assumes the usage of Gmail as SMTP Server

smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = <Email Address>
smtp_password = <SMTP Password>
smtp_port = 587
smtp_mail_from = <Email Address>
smtp_timeout = 30
smtp_retry_limit = 5

If you are not using Gmail as your SMTP Service, edit the information according to your SMTP Server Configurations.

  3. Update the default_args in your DAG file. This will set up the DAG to trigger an email if a task retries or fails.
default_args = {
    'email': [<Email Address>],
    'email_on_failure': True,
    'email_on_retry': True,
}
  4. Optional: set up the number of retries for a failed task and the duration between retries
default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
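Putting the two snippets together, the combined default_args could look like the sketch below (the values shown are illustrative):

# Illustrative combination of the alerting and retry arguments above
from datetime import timedelta

default_args = {
    'email': ['<Email Address>'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}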

(back to top)

6. Setting Up Google Cloud Telegram Alerts (Optional)

Before setting up the alerts on Google Cloud Monitoring, users will have to set up a Telegram bot and enable the sending of messages from the bot to the relevant channels.

❗ Keep your API Key Files Safe!

  1. Start a new conversation with the BotFather.
  2. Send /newbot to create a new Telegram bot.
  3. When asked, enter a name for the bot.
  4. Give the Telegram bot a unique username. Note that the username must end with the word "bot" (case-insensitive).
  5. Copy and save the Telegram bot's access token for later steps.
  6. Add the bot to your alert channel and enable it to send messages.

After this is completed, we will have to set up a new alert webhook on Google Cloud Monitoring.

Webhook Set-up

The webhook should utilise the Telegram Bot API's sendMessage method, and you can define custom notification messages by editing the message_encoded_in_http field.

πŸ’‘ You should test connection for errors before using it for alerts

https://api.telegram.org/bot<api_key>/sendMessage?chat_id=<chat_id>&text=<message_encoded_in_http>
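One way to run such a test is a short script like the sketch below (the requests library and the placeholder values are assumptions, not part of this repository):

# Quick sanity check for the Telegram webhook URL (placeholders assumed)
import requests

API_KEY = "<api_key>"   # Telegram bot access token from the BotFather
CHAT_ID = "<chat_id>"   # ID of the alert channel the bot was added to

response = requests.get(
    f"https://api.telegram.org/bot{API_KEY}/sendMessage",
    params={"chat_id": CHAT_ID, "text": "Test alert from Google Cloud Monitoring"},
)
print(response.status_code, response.json())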

After setting up the alert channels, you can create monitoring policies to suit your needs. Below are some alerts recommended by the Sfyr team.

  1. Firebase High Error
# Log Query
resource.type="firebase_domain" severity=ERROR
  2. Firestore High Error - needs to be enabled through the Firestore dashboard
  3. BigQuery High Error
# Log Query
resource.type="bigquery_project" resource.labels.project_id="is3107-group-7" severity=ERROR

# Notification Rate Limit
One notification per 10 minutes

# Incident autoclose duration
30 minutes

(back to top)

7. Setting Up Telegram Scraper Credentials

Before being able to run the Telegram Scraper module, users will have to register on Telegram and sign in to their account via Telegram Login.

From there, access the API development tools, create a new project and note down your app's:

  • api_id
  • api_hash

❗ Keep your API id and hash Safe!

After getting the api_id and api_hash, update the telegramConfig object in utils/serviceAccount.json with your teleNumber, api_id and api_hash.

{
  "telegramConfig": {
    "teleNumber": "<Phone_Number>",
    "api_id": ,
    "api_hash": "<API_HASH>"
  },
}

Initialise Telegram Session

Telegram security settings require an additional 2FA password (if set up) and a login code before a session is created and stored after the initial login. You will have to initialise the Telegram session on the machine you will be running the DAG on.

  1. Ensure the following code is uncommented in main.py
Telegram_layer = TelegramExtractor()
Telegram_layer.init_tele_session()
  2. cd to the directory with main.py and run main.py with the following command
python main.py
  3. Enter the code sent to your Telegram number as specified in utils/serviceAccount.json.

After this first setup and login, the Telegram scraper module should work as intended without requiring further logins. You can then comment out the code mentioned above.
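For context, below is a minimal sketch of what the session initialisation does with telethon; it is not the repository's exact TelegramExtractor implementation, and the session file name is an assumption. It prompts for the login code (and 2FA password, if set) on the first run and reuses the saved session file afterwards.

# Minimal sketch of a one-off Telethon session initialisation.
# Values are read from utils/serviceAccount.json; the session file name
# ("sfyr_session") is an assumed placeholder and is reused on later runs.
import json
from telethon.sync import TelegramClient

with open("utils/serviceAccount.json") as f:
    config = json.load(f)["telegramConfig"]

client = TelegramClient("sfyr_session", config["api_id"], config["api_hash"])
client.start(phone=config["teleNumber"])  # prompts for the Telegram code (and 2FA password, if set)
print("Telegram session initialised and saved.")
client.disconnect()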


(back to top)

8. Setting Up serviceAccount.json

With the Google Cloud project created in Step 5, we will have to update serviceAccount.json, which provides a centralised local copy of the configurations, datasets and data tables.

{
  "bigQueryConfig": {
    "PROJECT_ID": "<Google Cloud projectID>"
  }
}
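As an illustration (a sketch, not necessarily the repository's exact code), the project ID can then be read back from the central config as follows:

# Sketch: reading the BigQuery project ID from the central config file
import json

with open("utils/serviceAccount.json") as f:
    service_account_config = json.load(f)

PROJECT_ID = service_account_config["bigQueryConfig"]["PROJECT_ID"]
print(f"Using Google Cloud project: {PROJECT_ID}")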

(back to top)

Usage

1. Initialising Airflow Instance

Initialise Airflow in your Virtual Machine via the following commands:

airflow standalone

By default, Airflow is hosted at http://localhost:8080. Both the webserver and scheduler run as daemon processes. Alternatively, users can set up systemd by following this guide.

2. Activating DAG Task

After logging in, you will be brought to the main Airflow dashboard. To enable the DAG Task to run as per schedule, simply toggle the switch next to the task.


By default, the DAG is scheduled to execute twice a day, at 0930HRS and 2130HRS (GMT+8). To run it instantly for demonstration/testing purposes, set the toggle in main_DAG.py:

TOGGLE_TEST = True
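For illustration, the sketch below shows one way such a toggle could switch the DAG between the twice-daily schedule and an immediate test run; the DAG ID, cron expression and wiring are assumptions rather than a copy of main_DAG.py.

# Sketch: switching between the production schedule and a test run.
# "30 9,21 * * *" fires at 0930HRS and 2130HRS in the Asia/Singapore timezone.
import pendulum
from airflow import DAG

TOGGLE_TEST = True

with DAG(
    dag_id="sfyr_data_pipeline",  # assumed name for illustration
    start_date=pendulum.datetime(2022, 1, 1, tz="Asia/Singapore"),
    schedule_interval="@once" if TOGGLE_TEST else "30 9,21 * * *",
    catchup=False,
) as dag:
    pass  # tasks omitted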

(back to top)

Contact

If you would like to request a feature or report a bug, please contact us at [email protected].

(back to top)

Acknowledgements

We would like to thank Assistant Professor Frank Xing, Dr Gao Yu Ting and Mr Quek Jian Hong Joel of the National University of Singapore for the opportunity to embark on this project.

We would also like to thank Nescafe and Redbull for providing us with the caffeine required to complete this project. We would also like to provide great thanks to the sitcom The Office for the inspirational words "Why say lot word, when few word do trick".


Disclaimer

The information contained herein is provided for general information only and should not be used as a basis for making any investment or business decisions. Past performances are not necessarily indicative of future performances. Investment involves various risks, and you should ensure that you fully understand the risks, characteristics, restrictions and determine if the investment products are suitable for your financial situation, investment objective, and risk tolerance before making any investment decisions. You are recommended to obtain independent professional advice where appropriate.



Copyright (C) 2022. This project was created using publicly available APIs and was created for educational purposes. Any part of this project should ONLY be used for NON-COMMERCIAL purposes. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see http://www.gnu.org/licenses.
