diff --git a/invenio_records/__init__.py b/invenio_records/__init__.py index 408fd48c..7c546945 100644 --- a/invenio_records/__init__.py +++ b/invenio_records/__init__.py @@ -26,6 +26,8 @@ from .version import __version__ +from .receivers import record_modification + __all__ = ( '__version__', ) diff --git a/invenio_records/api.py b/invenio_records/api.py index a25ce358..3a0cba48 100644 --- a/invenio_records/api.py +++ b/invenio_records/api.py @@ -69,8 +69,7 @@ def __init__(self, data, model=None): @classmethod def create(cls, data, schema=None): - db.session.begin(subtransactions=True) - try: + with db.session.begin_nested(): record = cls(unicodifier(data)) list(functions('recordext')) @@ -85,15 +84,9 @@ def create(cls, data, schema=None): metadata['id'] = record.get('recid') db.session.add(RecordMetadata(**metadata)) - db.session.commit() - - toposort_send(after_record_insert, record) - return record - except Exception: - current_app.logger.exception("Problem creating a record.") - db.session.rollback() - raise + toposort_send(after_record_insert, record) + return record def patch(self, patch): model = self.model @@ -101,8 +94,7 @@ def patch(self, patch): return self.__class__(data, model=model) def commit(self): - db.session.begin(subtransactions=True) - try: + with db.session.begin_nested(): list(functions('recordext')) toposort_send(before_record_update, self) @@ -112,15 +104,10 @@ def commit(self): self.model.json = dict(self) - db.session.add(self.model) - db.session.commit() - - toposort_send(after_record_update, self) + db.session.merge(self.model) - return self - except Exception: - db.session.rollback() - raise + toposort_send(after_record_update, self) + return self @classmethod def get_record(cls, recid, *args, **kwargs): diff --git a/invenio_records/manage.py b/invenio_records/manage.py index 23563200..49efb1fa 100644 --- a/invenio_records/manage.py +++ b/invenio_records/manage.py @@ -48,9 +48,11 @@ def convert_marcxml(source): help="URL or path to a 
JSON Schema.") @manager.option('-t', '--input-type', dest='input_type', default='json', help="Format of input file.") -def create(source, schema=None, input_type='json'): - """Create new bibliographic record.""" - from .api import Record +@manager.option('--force', dest='force', action='store_true', + help="Force insert.") +def create(source, schema=None, input_type='json', force=False): + """Create new bibliographic record(s).""" + from .tasks.api import create_record processor = current_app.config['RECORD_PROCESSORS'][input_type] if isinstance(processor, six.string_types): @@ -58,9 +60,11 @@ def create(source, schema=None, input_type='json'): data = processor(source) if isinstance(data, dict): - Record.create(data) + create_record.delay(json=data, force=force) else: - [Record.create(item) for item in data] + from celery import group + job = group([create_record.s(json=item, force=force) for item in data]) + result = job.apply_async() @manager.option('-p', '--patch', dest='patch', @@ -78,6 +82,8 @@ def patch(patch, recid=None, schema=None, input_type='jsonpatch'): for r in recid or []: Record.get_record(r).patch(patch_content).commit() + db.session.commit() + def main(): """Run manager.""" diff --git a/invenio_records/models.py b/invenio_records/models.py index 980bdcb2..62150303 100644 --- a/invenio_records/models.py +++ b/invenio_records/models.py @@ -21,12 +21,15 @@ from flask import current_app from intbitset import intbitset +from sqlalchemy.event import listen from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.schema import Index from werkzeug import cached_property +from invenio_collections.models import Collection from invenio_ext.sqlalchemy import db, utils +from .receivers import new_collection class Record(db.Model): @@ -149,6 +152,11 @@ class RecordMetadata(db.Model): } +# FIXME add after_delete +listen(Collection, 'after_insert', new_collection) +listen(Collection, 'after_update', new_collection) + + __all__ = ( 'Record', 
'RecordMetadata',
diff --git a/invenio_records/providers/recid.py b/invenio_records/providers/recid.py
index fb8c5830..17e088ca 100644
--- a/invenio_records/providers/recid.py
+++ b/invenio_records/providers/recid.py
@@ -43,12 +43,8 @@ def create_new_pid(self, pid_value):
             record = Record(id=int(pid_value))
         else:
             record = Record()
-        try:
+        with db.session.begin_nested():
             db.session.add(record)
-            db.session.commit()
-        except SQLAlchemyError:
-            db.session.rollback()
-            raise ValueError("Duplicate value for %s" % (pid_value, ))
         return str(record.id)
 
     def reserve(self, pid, *args, **kwargs):
diff --git a/invenio_records/receivers.py b/invenio_records/receivers.py
new file mode 100644
index 00000000..4f2161ce
--- /dev/null
+++ b/invenio_records/receivers.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Invenio.
+# Copyright (C) 2015 CERN.
+#
+# Invenio is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# Invenio is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Invenio; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+
+"""Receivers that schedule search indexing after database changes."""
+
+from flask import current_app
+from flask_sqlalchemy import models_committed
+
+
+# FIXME add after_delete
+@models_committed.connect
+def record_modification(sender, changes):
+    """Schedule indexing of inserted or updated record metadata.
+
+    :param sender: object that emitted ``models_committed``.
+    :param changes: iterable of ``(object, operation)`` pairs.
+    """
+    # Imported lazily because ``.models`` itself imports this module.
+    from .models import RecordMetadata
+    from .tasks.index import index_record
+    for obj, change in changes:
+        if isinstance(obj, RecordMetadata) and change in ('insert', 'update'):
+            index_record.delay(obj.id, obj.json)
+
+
+def new_collection(mapper, connection, target):
+    """Schedule percolator indexing for a collection that has a query.
+
+    Has the ``(mapper, connection, target)`` signature of a SQLAlchemy
+    mapper event listener; only ``target`` is used.
+    """
+    # Imported lazily, like the task used by ``record_modification``.
+    from .tasks.index import index_collection_percolator
+    if target.dbquery is not None:
+        index_collection_percolator.delay(target.name, target.dbquery)
diff --git a/invenio_records/recordext/functions/get_record_collections.py b/invenio_records/recordext/functions/get_record_collections.py
new file mode 100644
index 00000000..aa249d5b
--- /dev/null
+++ b/invenio_records/recordext/functions/get_record_collections.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Invenio.
+# Copyright (C) 2015 CERN.
+#
+# Invenio is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# Invenio is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Invenio; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+
+"""Record field function."""
+
+from invenio_records.signals import (
+    before_record_insert,
+    before_record_update,
+)
+from six import iteritems
+
+from invenio_utils.datastructures import LazyDict
+from invenio_search.api import Query
+
+COLLECTIONS_DELETED_RECORDS = '{dbquery} AND NOT collection:"DELETED"'
+
+
+def _queries():
+    """Preprocess collection queries."""
+    from invenio_ext.sqlalchemy import db
+    from invenio_collections.models import Collection
+    return dict(
+        (collection.name, dict(
+            query=Query(COLLECTIONS_DELETED_RECORDS.format(
+                dbquery=collection.dbquery)
+            ),
+            ancestors=set(c.name for c in collection.ancestors
+                          if c.dbquery is None)
+        ))
+        for collection in Collection.query.filter(
+            Collection.dbquery.isnot(None),
+            db.not_(Collection.dbquery.like('hostedcollection:%'))
+        ).all()
+    )
+
+queries = LazyDict(_queries)
+
+
+def get_record_collections(record):
+    """Return list of collections to which record belongs to.
+
+    :record: Record instance
+    :returns: list of collection names
+    """
+    output = set()
+    for name, data in iteritems(queries):
+        if data['query'].match(record):
+            output.add(name)
+            output |= data['ancestors']
+    return list(output)
+
+
+@before_record_insert.connect
+@before_record_update.connect
+def update_collections(sender, *args, **kwargs):
+    sender['_collections'] = get_record_collections(sender)
diff --git a/invenio_records/tasks/__init__.py b/invenio_records/tasks/__init__.py
index 07c8db9c..d8a91972 100644
--- a/invenio_records/tasks/__init__.py
+++ b/invenio_records/tasks/__init__.py
@@ -21,8 +21,13 @@
 
 from __future__ import absolute_import
 
+from .api import create_record
 from .datacite import datacite_delete, datacite_register, datacite_sync, \
     datacite_update, datacite_update_all
+from .index import index_record
 
-__all__ = ['datacite_delete', 'datacite_register', 'datacite_sync',
-           'datacite_update', 'datacite_update_all']
+__all__ = ('create_record',
+           'datacite_delete', 'datacite_register', 'datacite_sync',
+           'datacite_update', 'datacite_update_all',
+           'index_record'
+           )
diff --git a/invenio_records/tasks/api.py b/invenio_records/tasks/api.py
new file mode 100644
index 00000000..e5fcd932
--- /dev/null
+++ b/invenio_records/tasks/api.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Invenio.
+# Copyright (C) 2015 CERN.
+#
+# Invenio is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# Invenio is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Invenio; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+
+"""Celery tasks built on top of the record API."""
+
+from __future__ import absolute_import
+
+from celery.utils.log import get_task_logger
+from sqlalchemy.exc import IntegrityError
+
+from invenio_celery import celery
+
+from ..api import Record
+
+logger = get_task_logger(__name__)
+
+
+@celery.task
+def create_record(json, force=False):
+    """Create a record and commit the database session.
+
+    :param json: record data handed to ``Record.create``.
+    :param force: when true, an ``IntegrityError`` raised by the insert
+        is logged and the data is stored through ``Record.commit``
+        instead of failing the task.
+    :returns: the ``recid`` value of the stored record.
+    :raises sqlalchemy.exc.IntegrityError: if the insert fails and
+        ``force`` is false.
+    """
+    from invenio_ext.sqlalchemy import db
+    try:
+        try:
+            recid = Record.create(json).get('recid')
+        except IntegrityError:
+            if not force:
+                raise
+            logger.warning("Trying to force insert: {0}".format(json))
+            # NOTE(review): ``Record(json)`` is created without a model;
+            # confirm that ``commit()`` can persist such an instance.
+            recid = Record(json).commit().get('recid')
+        db.session.commit()
+    except Exception:
+        # Roll back instead of committing a failed unit of work.
+        db.session.rollback()
+        raise
+    return recid
diff --git a/invenio_records/tasks/index.py b/invenio_records/tasks/index.py
new file mode 100644
index 00000000..45d1b470
--- /dev/null
+++ b/invenio_records/tasks/index.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of Invenio.
+# Copyright (C) 2015 CERN.
+#
+# Invenio is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# Invenio is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Invenio; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+
+from invenio_celery import celery
+from invenio_ext.es import es
+
+
+@celery.task
+def index_record(recid, json):
+    """Index a record in elasticsearch."""
+    es.index(
+        index='records',
+        doc_type='record',
+        body=json,
+        id=recid
+    )
+
+
+@celery.task
+def index_collection_percolator(name, dbquery):
+    """Create an elasticsearch percolator for a given query."""
+    from invenio_search.api import Query
+    from invenio_search.walkers.elasticsearch import ElasticSearchDSL
+    es.index(
+        index='records',
+        doc_type='.percolator',
+        body={'query': Query(dbquery).query.accept(ElasticSearchDSL())},
+        id=name
+    )