Skip to content

Commit

Permalink
api: nested transactions usage
Browse files Browse the repository at this point in the history
* FIX Uses nested transactions instead of sub-transactions to persist
  record modifications. (addresses inveniosoftware#22)

* NEW Adds new celery task to save a new record to the database.

* Moves in record-related tasks from `invenio-collections` and the
  Elasticsearch extension.

Signed-off-by: Esteban J. G. Gabancho <[email protected]>
  • Loading branch information
egabancho committed Oct 1, 2015
1 parent d3dc21c commit 02cfd62
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 32 deletions.
2 changes: 2 additions & 0 deletions invenio_records/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

from .version import __version__

from .receivers import record_modification

__all__ = (
'__version__',
)
27 changes: 7 additions & 20 deletions invenio_records/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ def __init__(self, data, model=None):

@classmethod
def create(cls, data, schema=None):
db.session.begin(subtransactions=True)
try:
with db.session.begin_nested():
record = cls(unicodifier(data))

list(functions('recordext'))
Expand All @@ -85,24 +84,17 @@ def create(cls, data, schema=None):
metadata['id'] = record.get('recid')

db.session.add(RecordMetadata(**metadata))
db.session.commit()

toposort_send(after_record_insert, record)

return record
except Exception:
current_app.logger.exception("Problem creating a record.")
db.session.rollback()
raise
toposort_send(after_record_insert, record)
return record

def patch(self, patch):
    """Return a new record with ``patch`` applied to a copy of this one.

    The current record is not modified: the patch is applied to
    ``dict(self)`` and the result is wrapped in a new instance of the
    same class that shares this record's ``model``.

    :param patch: patch passed through to ``apply_patch``.
    :returns: new instance of ``self.__class__`` holding the patched data.
    """
    current_model = self.model
    patched = apply_patch(dict(self), patch)
    return self.__class__(patched, model=current_model)

def commit(self):
db.session.begin(subtransactions=True)
try:
with db.session.begin_nested():
list(functions('recordext'))

toposort_send(before_record_update, self)
Expand All @@ -112,15 +104,10 @@ def commit(self):

self.model.json = dict(self)

db.session.add(self.model)
db.session.commit()

toposort_send(after_record_update, self)
db.session.merge(self.model)

return self
except Exception:
db.session.rollback()
raise
toposort_send(after_record_update, self)
return self

@classmethod
def get_record(cls, recid, *args, **kwargs):
Expand Down
16 changes: 11 additions & 5 deletions invenio_records/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,23 @@ def convert_marcxml(source):
help="URL or path to a JSON Schema.")
@manager.option('-t', '--input-type', dest='input_type', default='json',
help="Format of input file.")
def create(source, schema=None, input_type='json'):
"""Create new bibliographic record."""
from .api import Record
@manager.option('--force', dest='force', action='store_true',
help="Force insert.")
def create(source, schema=None, input_type='json', force=False):
"""Create new bibliographic record(s)."""
from .tasks.api import create_record

processor = current_app.config['RECORD_PROCESSORS'][input_type]
if isinstance(processor, six.string_types):
processor = import_string(processor)
data = processor(source)

if isinstance(data, dict):
Record.create(data)
create_record.delay(json=data, force=force)
else:
[Record.create(item) for item in data]
from celery import group
job = group([create_record.s(json=item, force=force) for item in data])
result = job.apply_async()


@manager.option('-p', '--patch', dest='patch',
Expand All @@ -78,6 +82,8 @@ def patch(patch, recid=None, schema=None, input_type='jsonpatch'):
for r in recid or []:
Record.get_record(r).patch(patch_content).commit()

db.session.commit()


def main():
"""Run manager."""
Expand Down
8 changes: 8 additions & 0 deletions invenio_records/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@

from flask import current_app
from intbitset import intbitset
from sqlalchemy.event import listen
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.schema import Index
from werkzeug import cached_property

from invenio_collections.models import Collection
from invenio_ext.sqlalchemy import db, utils

from .receivers import new_collection

class Record(db.Model):

Expand Down Expand Up @@ -149,6 +152,11 @@ class RecordMetadata(db.Model):
}


# FIXME add after_delete
listen(Collection, 'after_insert', new_collection)
listen(Collection, 'after_update', new_collection)


__all__ = (
'Record',
'RecordMetadata',
Expand Down
6 changes: 1 addition & 5 deletions invenio_records/providers/recid.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,8 @@ def create_new_pid(self, pid_value):
record = Record(id=int(pid_value))
else:
record = Record()
try:
with db.session.begin_nested():
db.session.add(record)
db.session.commit()
except SQLAlchemyError:
db.session.rollback()
raise ValueError("Duplicate value for %s" % (pid_value, ))
return str(record.id)

def reserve(self, pid, *args, **kwargs):
Expand Down
36 changes: 36 additions & 0 deletions invenio_records/receivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

from flask import current_app
from flask_sqlalchemy import models_committed


# FIXME add after_delete
@models_committed.connect
def record_modification(sender, changes):
    """Queue an indexing task for every committed record metadata change.

    Connected to the ``models_committed`` signal. Only ``RecordMetadata``
    objects that were inserted or updated are sent to ``index_record``.

    :param sender: signal sender (unused).
    :param changes: iterable of ``(obj, change)`` pairs.
    """
    from .models import RecordMetadata
    from .tasks.index import index_record

    indexable_changes = ('insert', 'update')
    for obj, change in changes:
        if not isinstance(obj, RecordMetadata):
            continue
        if change not in indexable_changes:
            continue
        index_record.delay(obj.id, obj.json)


def new_collection(mapper, connection, target):
    """Queue a percolator update for a collection that has a query.

    Has the ``(mapper, connection, target)`` signature of a mapper-level
    event listener. Collections whose ``dbquery`` is ``None`` are ignored.

    :param mapper: mapper that triggered the event (unused).
    :param connection: connection used for the operation (unused).
    :param target: object exposing ``name`` and ``dbquery``.
    """
    if target.dbquery is None:
        return

    # The task was previously referenced without ever being imported, which
    # raised NameError for every collection with a query. Imported here, at
    # call time, the same way record_modification imports index_record.
    from .tasks.index import index_collection_percolator

    index_collection_percolator.delay(target.name, target.dbquery)
72 changes: 72 additions & 0 deletions invenio_records/recordext/functions/get_record_collections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

"""Record field function."""

from invenio_records.signals import (
before_record_insert,
before_record_update,
)
from six import iteritems

from invenio_utils.datastructures import LazyDict
from invenio_search.api import Query

# Query template: the collection's own ``dbquery`` combined with a clause
# excluding anything in the "DELETED" collection.
COLLECTIONS_DELETED_RECORDS = '{dbquery} AND NOT collection:"DELETED"'


def _queries():
    """Preprocess collection queries.

    Selects every collection that has a ``dbquery`` not starting with
    ``hostedcollection:`` and returns a dict keyed by collection name.
    Each value is a dict with:

    * ``query``: a ``Query`` built from ``COLLECTIONS_DELETED_RECORDS``
      formatted with the collection's ``dbquery``;
    * ``ancestors``: set of names of the collection's ancestors whose
      ``dbquery`` is ``None``.
    """
    from invenio_ext.sqlalchemy import db
    from invenio_collections.models import Collection

    selected = Collection.query.filter(
        Collection.dbquery.isnot(None),
        db.not_(Collection.dbquery.like('hostedcollection:%'))
    ).all()

    prepared = {}
    for collection in selected:
        query_string = COLLECTIONS_DELETED_RECORDS.format(
            dbquery=collection.dbquery)
        query = Query(query_string)
        parents = set()
        for ancestor in collection.ancestors:
            if ancestor.dbquery is None:
                parents.add(ancestor.name)
        prepared[collection.name] = dict(query=query, ancestors=parents)
    return prepared

# Collection name -> {'query': ..., 'ancestors': ...}, as built by _queries().
# NOTE(review): LazyDict presumably defers calling _queries() until the
# mapping is first accessed -- confirm against invenio_utils.
queries = LazyDict(_queries)


def get_record_collections(record):
    """Return the names of the collections a record belongs to.

    For every entry in ``queries`` whose query matches the record, the
    collection name and the ancestor names stored with it are collected.

    :param record: Record instance
    :returns: list of collection names (built from a set, so unordered)
    """
    matched = set()
    for name, data in iteritems(queries):
        if not data['query'].match(record):
            continue
        matched.add(name)
        matched |= data['ancestors']
    return list(matched)


@before_record_insert.connect
@before_record_update.connect
def update_collections(sender, *args, **kwargs):
    """Store the record's collection names under ``_collections``.

    Connected to both ``before_record_insert`` and ``before_record_update``;
    ``sender`` is the record being saved and is modified in place.
    """
    collection_names = get_record_collections(sender)
    sender['_collections'] = collection_names
9 changes: 7 additions & 2 deletions invenio_records/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@

from __future__ import absolute_import

from .api import create_record
from .datacite import datacite_delete, datacite_register, datacite_sync, \
datacite_update, datacite_update_all
from .index import index_record

__all__ = ['datacite_delete', 'datacite_register', 'datacite_sync',
'datacite_update', 'datacite_update_all']
__all__ = ('create_record',
'datacite_delete', 'datacite_register', 'datacite_sync',
'datacite_update', 'datacite_update_all',
'index_record'
)
43 changes: 43 additions & 0 deletions invenio_records/tasks/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

from __future__ import absolute_import


from celery.utils.log import get_task_logger

from invenio_celery import celery

from ..api import Record

logger = get_task_logger(__name__)


@celery.task
def create_record(json, force=False):
    """Create a record from ``json`` and commit the database session.

    :param json: record data handed to ``Record.create``.
    :param force: when true, an ``IntegrityError`` raised while creating
        the record is answered by committing ``Record(json)`` instead.
    :returns: the record's ``recid``, or ``None`` when creation hit an
        ``IntegrityError`` and ``force`` is false.
    """
    from invenio_ext.sqlalchemy import db
    # Imported here because this module never imported ``exc``; the bare
    # ``exc.IntegrityError`` used before raised NameError as soon as any
    # exception reached the ``except`` clause.
    from sqlalchemy.exc import IntegrityError

    try:
        return Record.create(json).get('recid')
    except IntegrityError:
        if not force:
            # Still a non-fatal outcome, as before, but no longer silent.
            logger.warning(
                "Integrity error creating record; skipped because force "
                "is not set.")
            return None
        # Uses the module-level task logger; ``current_app`` was never
        # imported in this module.
        logger.warning("Trying to force insert: {0}".format(json))
        # NOTE(review): Record(json) is built without a model while
        # Record.commit() assigns to self.model.json -- confirm that the
        # forced path can actually succeed.
        return Record(json).commit().get('recid')
    finally:
        # Runs on every exit path, including when an exception propagates.
        db.session.commit()
45 changes: 45 additions & 0 deletions invenio_records/tasks/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.

from invenio_celery import celery
from invenio_ext.es import es


@celery.task
def index_record(recid, json):
    """Index a record in elasticsearch.

    :param recid: used as the document ``id``.
    :param json: used as the document ``body``.
    """
    document = dict(
        index='records',
        doc_type='record',
        id=recid,
        body=json,
    )
    es.index(**document)


@celery.task
def index_collection_percolator(name, dbquery):
    """Create an elasticsearch percolator for a given query.

    :param name: used as the percolator document ``id``.
    :param dbquery: query string, parsed with ``Query`` and converted
        through the ``ElasticSearchDSL`` walker to form the document body.
    """
    from invenio_search.api import Query
    from invenio_search.walkers.elasticsearch import ElasticSearchDSL

    parsed_query = Query(dbquery).query
    percolator = {'query': parsed_query.accept(ElasticSearchDSL())}
    es.index(
        index='records',
        doc_type='.percolator',
        id=name,
        body=percolator,
    )

0 comments on commit 02cfd62

Please sign in to comment.