Skip to content
This repository has been archived by the owner on Mar 12, 2021. It is now read-only.

Commit

Permalink
fix unclosed asyncio session error
Browse files Browse the repository at this point in the history
  • Loading branch information
RileyMShea committed Feb 28, 2021
1 parent 3bdfba0 commit d43ef14
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 28 deletions.
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "upoly"
version = "0.1.33"
version = "0.1.34"
description = "High performance asyncio REST client for polygon.io"
authors = ["Riley <[email protected]>"]
license = "AGPL"
Expand All @@ -26,13 +26,13 @@ classifiers = [
[tool.poetry.dependencies]
python = ">=3.8,<3.9"
python-dotenv = "^0.15.0"
orjson = "^3.4.6"
orjson = "^3.5.0"
nest-asyncio = "^1.4.3"
pandas-market-calendars = "^1.6.1"
joblib = "^1.0.1"
numpy = "^1.20.1"
pandas = "^1.2.2"
aiohttp = "^3.7.3"
aiohttp = "^3.7.4"
aiodns = "^2.0.0"
Brotli = "^1.0.9"
cchardet = "^2.1.7"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_upoly.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


def test_version() -> None:
assert __version__ == "0.1.33"
assert __version__ == "0.1.34"


def test_create_dataset_from_polygon():
Expand Down
2 changes: 1 addition & 1 deletion upoly/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.1.33"
__version__ = "0.1.34"

from .polygon_plus import NY as NY
from .polygon_plus import async_polygon_aggs as async_polygon_aggs
63 changes: 40 additions & 23 deletions upoly/polygon_plus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import asyncio
import logging
import os
import sys
from asyncio.proactor_events import _ProactorBasePipeTransport
Expand All @@ -19,6 +20,7 @@
import pandas as pd
import pandas_market_calendars as mcal
import pytz
from dotenv import find_dotenv, load_dotenv

# import uvloop
from joblib import Memory
Expand All @@ -35,6 +37,14 @@

NY = pytz.timezone("America/New_York")

try:
load_dotenv(find_dotenv(raise_error_if_not_found=True))
except IOError:
try:
load_dotenv(find_dotenv(raise_error_if_not_found=True, usecwd=True))
except IOError:
logging.warning("missing .env file")


# prevent errors in Jupyter/Ipython; otherwise attempt to use enhanced event loop
try:
Expand All @@ -60,7 +70,11 @@ def typed_cache(
fn: F,
) -> F:
def wrapper(*args, **kwargs):
return memory.cache(fn)(*args, **kwargs)
is_debug = os.environ.get("UPOLY_DEBUG", None)
if is_debug:
return fn(*args, **kwargs)
else:
return memory.cache(fn)(*args, **kwargs)

return cast(F, wrapper)

Expand Down Expand Up @@ -136,33 +150,36 @@ async def _dispatch_consume_polygon(

queue: asyncio.Queue[bytes] = asyncio.Queue()

POLYGON_KEY_ID = unwrap(os.getenv("POLYGON_KEY_ID"))
try:
POLYGON_KEY_ID = unwrap(os.getenv("POLYGON_KEY_ID"))
except ValueError:
raise ValueError(f"Missing POLYGON_KEY_ID, please load with from python-dotenv")

client = aiohttp.ClientSession()

await asyncio.gather(
*(
_produce_polygon_aggs(
POLYGON_KEY_ID,
client,
queue,
symbol,
timespan,
interval,
_from,
to,
unadjusted,
async with client:

await asyncio.gather(
*(
_produce_polygon_aggs(
POLYGON_KEY_ID,
client,
queue,
symbol,
timespan,
interval,
_from,
to,
unadjusted,
)
for _from, to in intervals
)
for _from, to in intervals
)
)
await client.close()
results: List[bytes] = []
while not queue.empty():
results.append(await queue.get())
queue.task_done()
results: List[bytes] = []
while not queue.empty():
results.append(await queue.get())
queue.task_done()

return results
return results


def combine_chunks(chunks: List[bytes]) -> Iterator[PolyAggResponse]:
Expand Down

0 comments on commit d43ef14

Please sign in to comment.