Skip to content

Commit

Permalink
S3 drivers (#44)
Browse files Browse the repository at this point in the history
* initial working commit for s3 driver and database driver

* removing 3.6 formatting

* adding extra requirements list

* adding basic s3 driver test

* Removing unnecessary function

* This ain't 2007

* test updates

* adding s3driver to new corpus structure
  • Loading branch information
mgasvoda authored and OliverSherouse committed Apr 13, 2018
1 parent 2513283 commit ebcb7d2
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 9 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ python:
install:
- pip install ".[testing]"
- pip install ".[nlp]"
- pip install ".[s3driver]"
- python -m nltk.downloader punkt stopwords wordnet
script: pytest
deploy:
Expand Down
4 changes: 3 additions & 1 deletion quantgov/corpora/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
FlatFileCorpusDriver,
RecursiveDirectoryCorpusDriver,
NamePatternCorpusDriver,
IndexDriver
IndexDriver,
S3Driver,
S3DatabaseDriver
)

warnings.warn(
Expand Down
4 changes: 3 additions & 1 deletion quantgov/corpus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@
FlatFileCorpusDriver,
RecursiveDirectoryCorpusDriver,
NamePatternCorpusDriver,
IndexDriver
IndexDriver,
S3Driver,
S3DatabaseDriver
)
85 changes: 85 additions & 0 deletions quantgov/corpus/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,40 @@
import csv
import logging

from decorator import decorator
from collections import namedtuple
from pathlib import Path

from .. import utils as qgutils

try:
import boto3
except ImportError:
boto3 = None
try:
import sqlalchemy
except ImportError:
sqlalchemy = None

log = logging.getLogger(__name__)

Document = namedtuple('Document', ['index', 'text'])


@decorator
def check_boto(func, *args, **kwargs):
if boto3 is None:
raise RuntimeError('Must install boto3 to use {}'.format(func))
return func(*args, **kwargs)


@decorator
def check_sqlalchemy(func, *args, **kwargs):
if sqlalchemy is None:
raise RuntimeError('Must install sqlalchemy to use {}'.format(func))
return func(*args, **kwargs)


class CorpusStreamer(object):
"""
A knowledgable wrapper for a CorpusDriver stream
Expand Down Expand Up @@ -243,3 +267,64 @@ def gen_indices_and_paths(self):
next(reader)
for row in reader:
yield tuple(row[:-1]), Path(row[-1])


class S3Driver(IndexDriver):
"""
Serve a whole or partial corpus from a remote file location in s3.
Filtering can be done using the values provided in the index file.
"""

@check_boto
def __init__(self, index, bucket, encoding='utf-8', cache=True):
self.index = Path(index)
self.bucket = bucket
self.client = boto3.client('s3')
self.encoding = encoding
with self.index.open(encoding=encoding) as inf:
index_labels = next(csv.reader(inf))[:-1]
super(IndexDriver, self).__init__(
index_labels=index_labels, encoding=encoding, cache=cache)

def read(self, docinfo):
idx, path = docinfo
body = self.client.get_object(Bucket=self.bucket,
Key=str(path))['Body']
return Document(idx, body.read().decode(self.encoding))

def filter(self, pattern):
""" Filter paths based on index values. """
raise NotImplementedError

def stream(self):
"""Yield text from an object stored in s3. """
return qgutils.lazy_parallel(self.read, self.gen_indices_and_paths())


class S3DatabaseDriver(S3Driver):
"""
Retrieves an index table from a database with an arbitrary, user-provided
query and serves documents like a normal S3Driver.
"""

@check_boto
@check_sqlalchemy
def __init__(self, protocol, user, password, host, db, port, query,
bucket, cache=True, encoding='utf-8'):
self.bucket = bucket
self.client = boto3.client('s3')
self.index = []
engine = sqlalchemy.create_engine('{}://{}:{}@{}:{}/{}'
.format(protocol, user, password,
host, port, db))
conn = engine.connect()
result = conn.execute(query)
for doc in result:
self.index.append(doc)
index_labels = doc.keys()
super(IndexDriver, self).__init__(
index_labels=index_labels, encoding=encoding, cache=cache)

def gen_indices_and_paths(self):
for row in self.index:
yield tuple(row[:-1]), row[-1]
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def find_version(*file_paths):
'nlp': [
'textblob',
'nltk',
],
's3driver': [
'sqlalchemy',
'boto3'
]
},
entry_points={
Expand Down
30 changes: 23 additions & 7 deletions tests/test_corpora.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@


def build_recursive_directory_corpus(directory):
for path, text in (('a/1.txt', u'foo'), ('b/2.txt', u'bar')):
for path, text in (('a/1.txt', 'foo'), ('b/2.txt', 'bar')):
directory.join(path).write_text(text, encoding='utf-8', ensure=True)
return quantgov.corpus.RecursiveDirectoryCorpusDriver(
directory=str(directory), index_labels=('letter', 'number'))


def build_name_pattern_corpus(directory):
for path, text in (('a_1.txt', u'foo'), ('b_2.txt', u'bar')):
for path, text in (('a_1.txt', 'foo'), ('b_2.txt', 'bar')):
path = directory.join(path).write_text(
text, encoding='utf-8', ensure=True)
return quantgov.corpus.NamePatternCorpusDriver(
Expand All @@ -25,23 +25,39 @@ def build_name_pattern_corpus(directory):
def build_index_corpus(directory):
rows = []
for letter, number, path, text in (
('a', '1', 'first.txt', u'foo'),
('b', '2', 'second.txt', u'bar')
('a', '1', 'first.txt', 'foo'),
('b', '2', 'second.txt', 'bar')
):
outpath = directory.join(path, abs=1)
outpath.write_text(text, encoding='utf-8')
rows.append((letter, number, str(outpath)))
index_path = directory.join('index.csv')
with index_path.open('w', encoding='utf-8') as outf:
outf.write(u'letter,number,path\n')
outf.write(u'\n'.join(','.join(row) for row in rows))
return quantgov.corpus.IndexDriver(str(index_path))
outf.write('letter,number,path\n')
outf.write('\n'.join(','.join(row) for row in rows))
return quantgov.corpora.IndexDriver(str(index_path))


def build_s3_corpus(directory):
rows = []
for letter, number, path in (
('a', '1', 'quantgov_tests/first.txt'),
('b', '2', 'quantgov_tests/second.txt')
):
rows.append((letter, number, path))
index_path = directory.join('index.csv')
with index_path.open('w', encoding='utf-8') as outf:
outf.write('letter,number,path\n')
outf.write('\n'.join(','.join(row) for row in rows))
return quantgov.corpora.S3Driver(str(index_path),
bucket='quantgov-databanks')


BUILDERS = {
'RecursiveDirectoryCorpusDriver': build_recursive_directory_corpus,
'NamePatternCorpusDriver': build_name_pattern_corpus,
'IndexDriver': build_index_corpus,
'S3Driver': build_s3_corpus
}


Expand Down

0 comments on commit ebcb7d2

Please sign in to comment.