Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 97 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,97 @@
# rest
gateio Rest Api
# A股30分钟K线数据同步工具

本项目是一个使用Python编写的工具,用于从Tushare Pro获取所有A股的30分钟K线数据,并将其存储到本地的MySQL数据库中。

## 主要功能

- **全量数据下载**: 一次性获取指定股票或所有股票的全部历史30分钟K线数据。
- **增量数据更新**: 每日定时任务,自动获取当天最新的K线数据,并补充到数据库中。
- **进度条显示**: 在进行大量数据下载时,提供清晰的进度条。
- **数据表自动创建**: 首次运行时可自动创建所需的数据库和数据表。
- **定时任务**: 内置一个基于APScheduler的定时任务,可在每个交易日收盘后自动执行增量更新。
- **灵活的命令行接口**: 支持通过命令行参数执行不同的任务。

## 环境要求

- Python 3.7+
- MySQL 5.7+ 或 MariaDB

## 安装与配置

**1. 获取代码**

克隆或下载本项目到你的本地机器。

**2. 安装依赖**

进入项目根目录,通过 `requirements.txt` 文件安装所有必需的Python库。

```bash
pip install -r requirements.txt
```

**3. 配置 Tushare Token 和数据库**

打开 `download_tushare/config.py` 文件,填入你的个人信息:

- `TUSHARE_TOKEN`: 你的Tushare Pro API Token。你可以在 [Tushare Pro官网](https://tushare.pro/user/token) 免费注册并获取。
- `DB_CONFIG`: 你的MySQL数据库连接信息,包括主机、端口、用户名、密码和数据库名。

**4. 创建数据库**

在你的MySQL服务器中,手动创建一个数据库。数据库的名称应与你在 `config.py` 中 `DB_CONFIG['database']` 字段设置的名称一致。

例如,如果你设置的数据库名是 `stock_data`,则执行以下SQL命令:

```sql
CREATE DATABASE stock_data CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
```

## 使用说明

本工具可以通过 `python -m download_tushare` 命令来运行,后跟不同的子命令。

**1. 初始化数据库表**

在第一次运行时,你需要初始化数据库,创建所需的数据表。

```bash
python -m download_tushare initdb
```
该命令会根据 `database.py` 中的定义,自动创建 `stock_basic` 和 `stock_30min` 两张表。

**2. 全量同步历史数据**

如果你是第一次使用,或者想要完整地获取所有历史数据,可以执行全量同步。

**警告**: 此过程会遍历所有A股,下载它们自上市以来的全部30分钟K线数据,将消耗大量时间和网络流量,并对Tushare积分有一定要求。

```bash
python -m download_tushare full
```

**3. 增量更新数据**

用于获取最新的数据。它会自动查找每只股票在数据库中的最新记录,并从该时间点之后开始同步。如果某只股票是新加入的,则会自动进行全量同步。

建议每日收盘后执行此命令。

```bash
python -m download_tushare update
```

## 运行定时任务

为了实现自动化更新,你可以直接运行 `scheduler.py` 脚本。它会启动一个常驻进程,在每个交易日(周一至周五)的下午16:00自动执行增量更新任务。

```bash
python -m download_tushare.scheduler
```

你可以使用 `nohup` (Linux/macOS) 或其他工具让它在后台持续运行。

```bash
nohup python -m download_tushare.scheduler > scheduler.log 2>&1 &
```

按 `Ctrl+C` 可以停止调度器。
Empty file added download_tushare/__init__.py
Empty file.
4 changes: 4 additions & 0 deletions download_tushare/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .main import main

if __name__ == "__main__":
main()
24 changes: 24 additions & 0 deletions download_tushare/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# ------------------ Tushare Configuration ------------------
# 在这里填入你的Tushare Pro API Token。
# 你可以访问 https://tushare.pro/user/token 来获取你的token。
TUSHARE_TOKEN = "YOUR_TUSHARE_TOKEN_HERE"

# ------------------ MySQL Database Configuration ------------------
# 在这里填入你的MySQL数据库连接信息。
DB_CONFIG = {
'host': '127.0.0.1', # 数据库主机地址
'port': 3306, # 端口号
'user': 'root', # 数据库用户名
'password': 'YOUR_DATABASE_PASSWORD_HERE', # 数据库密码
'database': 'stock_data', # 数据库名称
'charset': 'utf8mb4' # 字符集
}

# ------------------ Data Sync Configuration ------------------
# 获取历史数据时的起始日期
# 如果设置为None,将从股票的上市日期开始获取
START_DATE = "2010-01-01"

# 每次API请求的K线数量
# Tushare pro接口单次提取最多5000条
BARS_PER_REQUEST = 5000
136 changes: 136 additions & 0 deletions download_tushare/data_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import tushare as ts
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import func
from tqdm import tqdm

from . import config
from . import database as db

# ------------------ Tushare API Initialization ------------------
try:
ts.set_token(config.TUSHARE_TOKEN)
pro = ts.pro_api()
print("Tushare API 初始化成功。")
except Exception as e:
print(f"Tushare API 初始化失败: {e}")
print("请检查 config.py 中的 TUSHARE_TOKEN 是否正确。")
exit()

def update_stock_basic():
"""
更新本地的股票基础信息表。
"""
print("正在更新股票基础信息...")
try:
# 获取所有A股列表
data = pro.stock_basic(exchange='', list_status='L', fields='ts_code,symbol,name,industry,list_date')

# 对'industry'列的None值进行处理,替换为空字符串或其他默认值
data['industry'] = data['industry'].fillna('')

# 使用我们封装的批量插入函数
db.bulk_insert_data(data, db.StockBasic)
print(f"股票基础信息更新完成,共获取 {len(data)} 条数据。")
except Exception as e:
print(f"更新股票基础信息时发生错误: {e}")

def get_stock_list():
"""从数据库获取所有需要同步的股票列表"""
session = db.get_session()
# 选择所有未退市的股票
stocks = session.query(db.StockBasic).filter(db.StockBasic.delist_date == None).all()
session.close()
return stocks

def full_sync():
"""
全量同步所有A股的30分钟K线数据。
这是一个耗时操作。
"""
print("开始全量同步30分钟K线数据,这将是一个非常耗时的操作。")
update_stock_basic() # 首先确保股票列表是最新的
stocks_to_sync = get_stock_list()

with tqdm(total=len(stocks_to_sync), desc="全量同步进度") as pbar:
for stock in stocks_to_sync:
pbar.set_description(f"正在同步 {stock.ts_code}")
_sync_stock_data(stock.ts_code, start_date=config.START_DATE or stock.list_date.strftime('%Y%m%d'))
pbar.update(1)
print("所有股票全量同步完成。")

def update_sync():
"""
增量同步所有A股的30分钟K线数据。
"""
print("开始增量同步30分钟K线数据...")
update_stock_basic()
stocks_to_sync = get_stock_list()

session = db.get_session()

with tqdm(total=len(stocks_to_sync), desc="增量同步进度") as pbar:
for stock in stocks_to_sync:
pbar.set_description(f"增量同步 {stock.ts_code}")

# 查找该股票的最新一条记录时间
latest_record = session.query(func.max(db.Stock30Min.trade_time)).filter(db.Stock30Min.ts_code == stock.ts_code).scalar()

if latest_record:
# 从最新记录的后一天开始同步
start_date = (latest_record + timedelta(days=1)).strftime('%Y%m%d')
else:
# 如果没有记录,则从上市日期开始全量同步
start_date = config.START_DATE or stock.list_date.strftime('%Y%m%d')

_sync_stock_data(stock.ts_code, start_date=start_date)
pbar.update(1)

session.close()
print("所有股票增量同步完成。")


def _sync_stock_data(ts_code: str, start_date: str, end_date: str = None):
"""
获取并存储单个股票在指定时间范围内的30分钟K线数据。
Tushare的get_k_data接口已不推荐,这里使用pro_bar接口。
"""
try:
df = ts.pro_bar(ts_code=ts_code, freq='30min', start_date=start_date, end_date=end_date)
if df is None or df.empty:
return

# 数据清洗和格式化
df.rename(columns={'vol': 'vol', 'amount': 'amount'}, inplace=True)
df['trade_time'] = pd.to_datetime(df['trade_time'])

# 选择需要的列
df = df[['ts_code', 'trade_time', 'open', 'high', 'low', 'close', 'vol', 'amount']]

# 批量插入数据库
db.bulk_insert_data(df, db.Stock30Min)

except Exception as e:
print(f"同步 {ts_code} 数据时出错: {e}")

def fix_missing_data():
"""
检查并修复缺失的交易日数据。
这是一个复杂的功能,我们先实现一个简化版本:
检查最近N天的数据完整性。
"""
# 此功能较为复杂,将在后续迭代中实现。
# 基本思路:
# 1. 获取交易日历
# 2. 对每只股票,获取其在数据库中的所有交易时间
# 3. 对比交易日历和已有数据,找出缺失的交易日
# 4. 对缺失的交易日,重新调用_sync_stock_data来获取数据
print("数据修复功能待实现。")

if __name__ == '__main__':
# 可以添加一些直接运行此脚本时的测试代码
print("这是一个数据同步模块,请通过 main.py 来调用。")
# 例如,测试更新股票列表
# update_stock_basic()
# full_sync()
pass
93 changes: 93 additions & 0 deletions download_tushare/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import pandas as pd
from sqlalchemy import create_engine, inspect, Column, String, Date, DateTime, DECIMAL, BigInteger, UniqueConstraint
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from .config import DB_CONFIG

# 定义ORM基类
Base = declarative_base()

# 数据库连接字符串
db_uri = (
f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@"
f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}?charset={DB_CONFIG['charset']}"
)

# 创建数据库引擎
try:
engine = create_engine(db_uri, echo=False)
# 创建Session类
Session = sessionmaker(bind=engine)
except Exception as e:
print(f"数据库连接失败: {e}")
print(f"请检查 config.py 中的数据库配置是否正确。")
exit()

class StockBasic(Base):
"""股票基础信息表"""
__tablename__ = 'stock_basic'
ts_code = Column(String(10), primary_key=True, comment='TS股票代码')
symbol = Column(String(10), comment='股票代码')
name = Column(String(30), comment='股票名称')
industry = Column(String(50), comment='所属行业')
list_date = Column(Date, comment='上市日期')
delist_date = Column(Date, nullable=True, comment='退市日期')

def __repr__(self):
return f"<StockBasic(ts_code='{self.ts_code}', name='{self.name}')>"

class Stock30Min(Base):
"""30分钟K线数据表"""
__tablename__ = 'stock_30min'
id = Column(BigInteger, primary_key=True, autoincrement=True)
ts_code = Column(String(10), index=True, comment='TS股票代码')
trade_time = Column(DateTime, index=True, comment='交易时间')
open = Column(DECIMAL(10, 2), comment='开盘价')
high = Column(DECIMAL(10, 2), comment='最高价')
low = Column(DECIMAL(10, 2), comment='最低价')
close = Column(DECIMAL(10, 2), comment='收盘价')
vol = Column(DECIMAL(20, 2), comment='成交量(手)')
amount = Column(DECIMAL(20, 4), comment='成交额(千元)')

# 创建联合唯一索引,防止数据重复
__table_args__ = (UniqueConstraint('ts_code', 'trade_time', name='_ts_code_trade_time_uc'),)

def __repr__(self):
return f"<Stock30Min(ts_code='{self.ts_code}', trade_time='{self.trade_time}')>"

def init_db():
"""
初始化数据库,创建所有定义的表。
"""
try:
print("正在初始化数据库,检查并创建数据表...")
Base.metadata.create_all(engine)
print("数据表创建/检查完成。")
except Exception as e:
print(f"创建数据表时发生错误: {e}")

def get_session():
"""
获取一个新的数据库会话。
"""
return Session()

def bulk_insert_data(df: pd.DataFrame, model_class):
"""
使用原生INSERT IGNORE来批量插入数据,如果主键或唯一索引冲突则忽略。
这比ORM逐条检查快得多。
"""
if df.empty:
return

table_name = model_class.__tablename__

with engine.connect() as connection:
# 使用pandas的to_sql方法,如果已存在则忽略
# 注意:'append'配合唯一索引可以实现 'INSERT IGNORE' 的效果
# 对于MySQL,需要确保主键或唯一索引已建立
df.to_sql(name=table_name, con=connection, if_exists='append', index=False)

if __name__ == '__main__':
# 作为脚本直接运行时,执行数据库初始化
init_db()
Loading