StreamingDataset and MutableDataset have now deprecated warning messa… #376

Open · wants to merge 2 commits into base: master
10 changes: 8 additions & 2 deletions scrunch/__init__.py
@@ -1,8 +1,14 @@
from .session import connect
from .datasets import (
-    get_user, get_project, get_dataset, get_team, create_team)
+    get_user,
+    get_project,
+    get_dataset,
+    get_team,
+    create_team,
+    create_dataset
+)
from .streaming_dataset import get_streaming_dataset
-from .mutable_dataset import get_mutable_dataset, create_dataset
+from .mutable_dataset import get_mutable_dataset
from .version import __version__


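Net effect of this hunk: create_dataset is re-exported from scrunch.datasets instead of scrunch.mutable_dataset, so the top-level import surface is unchanged. A minimal sketch of the intended usage after this change; the credentials and dataset name are hypothetical, and connect() is shown with its commonly documented keyword arguments:

```python
# Hypothetical connection details; substitute your own.
from scrunch import connect, get_dataset, create_dataset

connect(user="me@example.com", pw="secret",
        site="https://your-brand.crunch.io/api/")
ds = get_dataset("My Survey")  # preferred over get_mutable_dataset
```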
389 changes: 359 additions & 30 deletions scrunch/datasets.py

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions scrunch/exceptions.py
@@ -18,10 +18,6 @@ class InvalidReferenceError(ValueError):
pass


-class InvalidDatasetTypeError(Exception):
-    pass


class InvalidVariableTypeError(Exception):
pass

2 changes: 1 addition & 1 deletion scrunch/expressions.py
@@ -707,7 +707,7 @@ def _resolve_variable(var):

if not is_url:
return var
-    elif not isinstance(ds, scrunch.datasets.BaseDataset):
+    elif not isinstance(ds, scrunch.datasets.Dataset):
raise Exception(
'Valid Dataset instance is required to resolve variable urls '
'in the expression'
312 changes: 9 additions & 303 deletions scrunch/mutable_dataset.py
@@ -1,320 +1,26 @@
-import json
-
-from pycrunch.shoji import wait_progress
-from scrunch.datasets import (LOG, BaseDataset, _get_connection, _get_dataset,
-                              CATEGORICAL_TYPES)
-from scrunch.exceptions import InvalidDatasetTypeError
-from scrunch.expressions import parse_expr, process_expr
-from scrunch.helpers import shoji_entity_wrapper
+from scrunch.datasets import LOG, Dataset, _get_dataset


def get_mutable_dataset(dataset, connection=None, editor=False, project=None):
"""
A simple wrapper of _get_dataset with streaming=False
"""
LOG.warning("""MutableDataset is deprecated, instead use now
Dataset with it's corresponding get_dataset() method""") # noqa: E501
shoji_ds, root = _get_dataset(dataset, connection, editor, project)
# make sure the Dataset is of type streaming != "streaming"
if shoji_ds['body'].get('streaming') == 'streaming':
raise InvalidDatasetTypeError("Dataset %s is of type 'streaming',\
use get_streaming_dataset method instead" % dataset)
ds = MutableDataset(shoji_ds)
ds = Dataset(shoji_ds)
if editor is True:
ds.change_editor(root.session.email)
return ds
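A migration sketch for callers of the deprecated helper; both calls now return a plain Dataset, but the old one logs the warning (the dataset name is hypothetical):

```python
from scrunch import get_dataset
from scrunch.mutable_dataset import get_mutable_dataset

ds_old = get_mutable_dataset("My Survey")  # still works, logs the deprecation warning
ds_new = get_dataset("My Survey")          # the replacement this PR points to
```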


def create_dataset(name, variables, connection=None, **kwargs):
if connection is None:
connection = _get_connection()
if not connection:
raise AttributeError(
"Authenticate first with scrunch.connect() or by providing "
"config/environment variables")

dataset_doc = {
'name': name,
'table': {
'element': 'crunch:table',
'metadata': variables
}
}
dataset_doc.update(**kwargs)

shoji_ds = connection.datasets.create(shoji_entity_wrapper(dataset_doc)).refresh()
return MutableDataset(shoji_ds)
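create_dataset is removed here rather than dropped: the scrunch/__init__.py hunk above now imports it from scrunch.datasets (whose large diff is not rendered). A usage sketch with illustrative crunch:table metadata:

```python
from scrunch import create_dataset

# Illustrative metadata mapping variable aliases to definitions.
variables = {
    "age": {"name": "Age", "type": "numeric"},
}
ds = create_dataset("My New Dataset", variables)
```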


-class MutableDataset(BaseDataset):
+class MutableDataset(Dataset):
"""
Class that enclose mutable dataset methods or any
method that varies the state of the dataset and/or it's data.
"""

def delete(self):
"""
Delete a dataset.
"""
self.resource.delete()

def join(self, left_var, right_ds, right_var, columns=None,
filter=None, wait=True):
"""
        Joins a given variable. In Crunch, joins are left joins, where
        left is this dataset's variable and right is the other dataset's
        variable. For more information see:
        http://docs.crunch.io/?http#merging-and-joining-datasets

        :param: columns: Specify a list of variables from the right dataset
            to bring into the merge:
            http://docs.crunch.io/?http#joining-a-subset-of-variables

:param: wait: Wait for the join progress to finish by polling
or simply return a url to the progress resource

:param: filter: Filters out rows based on the given expression,
or on a given url for an existing filter. TODO: for the moment
we only allow expressions
"""
right_var_url = right_ds[right_var].url
left_var_url = self[left_var].url
# this dictionary sets the main part of the join
adapter = {
'function': 'adapt',
'args': [
{'dataset': right_ds.url},
{'variable': right_var_url},
{'variable': left_var_url}
]
}

# wrap the adapter method on a shoji and body entity
payload = shoji_entity_wrapper(adapter)

if columns and isinstance(columns, list):
# overwrite body to new format
payload['body'] = {
'frame': adapter,
'function': 'select',
'args': [
{'map': {}}
]
}
# add the individual variable columns to the payload
for var in columns:
var_url = right_ds[var].url
payload['body']['args'][0]['map'][var_url] = {'variable': var_url}

if filter:
# in the case of a filter, convert it to crunch
# and attach the filter to the payload
expr = process_expr(parse_expr(filter), right_ds)
payload['body']['filter'] = {'expression': expr}

progress = self.resource.variables.post(payload)
# poll for progress to finish or return the url to progress
if wait:
return wait_progress(r=progress, session=self.resource.session, entity=self)
return progress.json()['value']
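For reference, a sketch of how this join() is typically called, based on the signature above; every dataset name and alias is hypothetical:

```python
from scrunch import get_dataset

left = get_dataset("Survey Wave 1")
right = get_dataset("Demographics")
left.join(
    left_var="respondent_id",   # key alias in this dataset
    right_ds=right,
    right_var="respondent_id",  # key alias in the right dataset
    columns=["age", "gender"],  # only bring these variables across
    filter="age > 18",          # Crunch filter expression
    wait=True,                  # poll until the join completes
)
```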

def compare_dataset(self, dataset, use_crunch=False):
"""
        Compare the differences in structure between datasets. The
        criteria are the following:

(1) variables that, when matched across datasets by alias, have different types.
(2) variables that have the same name but don't match on alias.
(3) for variables that match and have categories, any categories that have the
same id but don't match on name.
(4) for array variables that match, any subvariables that have the same name but
don't match on alias.
        (5) array variables that, after assembling the union of their subvariables,
            point to subvariables that belong to the other dataset (not implemented)
(6) missing rules of the variable.

        :param: dataset: Dataset instance to compare against
        :param: use_crunch: Use the Crunch comparison to compare
        :return: a dictionary of differences

        NOTE: this should be done via http://docs.crunch.io/#post217,
        but it doesn't seem to be a working feature of Crunch
"""

if use_crunch:
resp = self.resource.batches.follow(
'compare', 'dataset={}'.format(dataset.url))
return resp

diff = {
'variables': {
'by_type': [],
'by_alias': [],
'by_missing_rules': [],
},
'categories': {},
'subvariables': {}
}

array_types = ['multiple_response', 'categorical_array']

vars_a = {v.alias: v.type for v in self.values()}
vars_b = {v.alias: v.type for v in dataset.values()}

# 1. match variables by alias and compare types
common_aliases = frozenset(vars_a.keys()) & frozenset(vars_b.keys())
for alias in common_aliases:
if vars_a[alias] != vars_b[alias]:
diff['variables']['by_type'].append(dataset[alias].name)

            # 3. match variables by alias and flag distinct category names for the same ids
if vars_b[alias] == 'categorical' and vars_a[alias] == 'categorical':
a_ids = frozenset([v.id for v in self[alias].categories.values()])
b_ids = frozenset([v.id for v in dataset[alias].categories.values()])
common_ids = a_ids & b_ids

for id in common_ids:
a_name = self[alias].categories[id].name
b_name = dataset[alias].categories[id].name
if a_name != b_name:
if diff['categories'].get(dataset[alias].name):
diff['categories'][dataset[alias].name].append(id)
else:
diff['categories'][dataset[alias].name] = []
diff['categories'][dataset[alias].name].append(id)

# 2. match variables by names and compare aliases
common_names = frozenset(self.variable_names()) & frozenset(dataset.variable_names())
for name in common_names:
if self[name].alias != dataset[name].alias:
diff['variables']['by_alias'].append(name)

# 4. array types that match, subvars with same name and != alias
            if dataset[name].type == self[name].type and \
                    self[name].type in array_types:

a_names = frozenset(self[name].variable_names())
b_names = frozenset(dataset[name].variable_names())
common_subnames = a_names & b_names

for sv_name in common_subnames:
if self[name][sv_name].alias != dataset[name][sv_name].alias:
if diff['subvariables'].get(name):
diff['subvariables'][name].append(dataset[name][sv_name].alias)
else:
diff['subvariables'][name] = []
diff['subvariables'][name].append(dataset[name][sv_name].alias)

# 6. missing rules mismatch
if self[name].type not in CATEGORICAL_TYPES and dataset[name].type not in CATEGORICAL_TYPES:
if self[name].missing_rules != dataset[name].missing_rules:
rules1 = self[name].missing_rules
rules2 = dataset[name].missing_rules
if len(rules1.keys()) == len(rules2.keys()):
for key, value in rules1.items():
if key not in rules2 or rules2[key] != value:
diff['variables']['by_missing_rules'].append(name)
else:
diff['variables']['by_missing_rules'].append(name)
return diff
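A usage sketch for the comparison, both local and Crunch-side; dataset names are hypothetical:

```python
from scrunch import get_dataset

ds_a = get_dataset("Wave 1")
ds_b = get_dataset("Wave 2")

diff = ds_a.compare_dataset(ds_b)     # local, rule-by-rule comparison
print(diff["variables"]["by_alias"])  # e.g. same name, different alias

resp = ds_a.compare_dataset(ds_b, use_crunch=True)  # server-side comparison
```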

def append_dataset(self, dataset, filter=None, variables=None,
autorollback=True, delete_pk=True):
""" Append dataset into self. If this operation fails, the
append is rolledback. Dataset variables and subvariables
are matched on their aliases and categories are matched by name.

:param: dataset: Daatset instance to append from
:param: filter: An expression to filter dataset rows. cannot be a Filter
according to: http://docs.crunch.io/#get211
:param: variables: A list of variable names to include from dataset
"""
if self.url == dataset.url:
raise ValueError("Cannot append dataset to self")

if variables and not isinstance(variables, list):
raise AttributeError("'variables' must be a list of variable names")

if delete_pk:
            LOG.info("Any pk's found will be deleted; to avoid this, pass delete_pk=False")
self.resource.pk.delete()
dataset.resource.pk.delete()

payload = shoji_entity_wrapper({'dataset': dataset.url})
payload['autorollback'] = autorollback

if variables:
id_vars = []
for var in variables:
id_vars.append(dataset[var].url)
# build the payload with selected variables
payload['body']['where'] = {
'function': 'select',
'args': [{
'map': {
x: {'variable': x} for x in id_vars
}
}]
}

if filter:
# parse the filter expression
payload['body']['filter'] = process_expr(parse_expr(filter), dataset.resource)

return self.resource.batches.create(payload)
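A sketch of an append with a row filter and a variable subset; names and the filter expression are hypothetical:

```python
from scrunch import get_dataset

target = get_dataset("Master")
source = get_dataset("New Batch")
target.append_dataset(
    source,
    filter="wave == 2",                  # expression, not a Filter entity
    variables=["respondent_id", "age"],  # subset of source variables
    autorollback=True,                   # roll back the append on failure
    delete_pk=False,                     # keep existing primary keys
)
```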

def move_to_categorical_array(
self, name, alias, subvariables, description='', notes=''):
"""
This is a dangerous method that allows moving variables (effectively
translating them as variables in a dataset) as subvariables in the
newly created categorical_array created.

:param: name: Name of the new variable.
:param: alias: Alias of the new variable
:param: subvariables: A list of existing Dataset variables aliases
to move into the new variable as subvariables .i.e;
subvariables = ['var1_alias', 'var2_alias']
:param: description: A description of the new variable
:param: notes: Notes to attach to the new variable
"""
payload = {
'name': name,
'alias': alias,
'description': description,
'notes': notes,
'type': 'categorical_array',
'subvariables': [self[v].url for v in subvariables]
}
self.resource.variables.create(shoji_entity_wrapper(payload))
self._reload_variables()
return self[alias]
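A sketch of assembling an array from standalone variables; all aliases are hypothetical:

```python
from scrunch import get_dataset

ds = get_dataset("My Survey")
arr = ds.move_to_categorical_array(
    name="Brand ratings",
    alias="brand_ratings",
    subvariables=["rating_a", "rating_b", "rating_c"],
    description="Grid assembled from standalone rating variables",
)
```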

def move_to_multiple_response(
self, name, alias, subvariables, description='', notes=''):
"""
This method is a replication of the method move_to_categorical_array,
only this time we are creting a multiple_response variable.
Note: the subvariables need to have at least 1 selected catagory.
"""
payload = {
'name': name,
'alias': alias,
'description': description,
'notes': notes,
'type': 'multiple_response',
'subvariables': [self[v].url for v in subvariables]
}
self.resource.variables.create(shoji_entity_wrapper(payload))
self._reload_variables()
return self[alias]

def move_as_subvariable(self, destination, source):
"""
Moves a variable as a subvariable of an existing array
type variable.

:param: destination: The alias of the variable that will receive the subvariable
:param: source: Alias of the variable to move into destination as subvariable
"""
payload = json.dumps({"element": "shoji:catalog", "index": {self[source].url: {}}})
self[destination].resource.subvariables.patch(payload)
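And a sketch of moving one more variable into that (hypothetical) array afterwards:

```python
from scrunch import get_dataset

ds = get_dataset("My Survey")
ds.move_as_subvariable(destination="brand_ratings", source="rating_d")
```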
+    def __init__(self, resource):
+        LOG.warning("""MutableDataset is deprecated; use Dataset and its
+        corresponding get_dataset() method instead""")  # noqa: E501
+        super(MutableDataset, self).__init__(resource)
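What the shim buys existing callers, assuming LOG is a standard logging logger: old code keeps running and the deprecation warning surfaces in the log.

```python
import logging
from scrunch.mutable_dataset import get_mutable_dataset

logging.basicConfig(level=logging.WARNING)
ds = get_mutable_dataset("My Survey")  # works, logs the deprecation warning
```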
6 changes: 3 additions & 3 deletions scrunch/order.py
@@ -93,7 +93,7 @@ def __init__(self, obj, order, parent=None):
# TODO unreached code
elif isinstance(element, scrunch.datasets.Variable):
self.elements[element.alias] = element
-        elif isinstance(element, scrunch.datasets.BaseDataset):
+        elif isinstance(element, scrunch.datasets.Dataset):
self.elements[element.id] = element
else:
raise TypeError('Invalid OrderObject %s' % element)
@@ -106,7 +106,7 @@ def _get_elements(group):
elements.append({key: _get_elements(obj)})
# TODO unreached code
elif isinstance(obj, (scrunch.datasets.Variable,
-                                  scrunch.datasets.BaseDataset)):
+                                  scrunch.datasets.Dataset)):
elements.append(obj.name)
else:
elements.append(obj.name)
@@ -479,7 +479,7 @@ def place(self, entity, path, position=-1, before=None, after=None):
target_group = self.group[str(path)]
if isinstance(entity, scrunch.datasets.Variable):
element = entity.alias
-        elif isinstance(entity, scrunch.datasets.BaseDataset):
+        elif isinstance(entity, scrunch.datasets.Dataset):
element = entity.id
else:
raise TypeError('entity must be a `Variable` or `Dataset`')