diff --git a/mygpo/administration/group.py b/mygpo/administration/group.py index 0adaf98b6..95729a1ef 100644 --- a/mygpo/administration/group.py +++ b/mygpo/administration/group.py @@ -29,15 +29,18 @@ def __get_episodes(self): def group(self, get_features): + """ Groups the episodes by features extracted using ``get_features`` + + get_features is a callable that expects an episode as parameter, and + returns a value representing the extracted feature(s). + """ episodes = self.__get_episodes() episode_groups = defaultdict(list) - episode_features = map(get_features, episodes.items()) - - for features, episode_id in episode_features: - episode = episodes[episode_id] + for episode in episodes.values(): + features = get_features(episode) episode_groups[features].append(episode) groups = sorted(episode_groups.values(), key=_SORT_KEY) diff --git a/mygpo/administration/tasks.py b/mygpo/administration/tasks.py index ac6af2008..1607e5bc5 100644 --- a/mygpo/administration/tasks.py +++ b/mygpo/administration/tasks.py @@ -1,15 +1,17 @@ +import uuid from collections import Counter from mygpo.podcasts.models import Podcast from mygpo.celery import celery from mygpo.maintenance.merge import PodcastMerger +from mygpo.maintenance.models import MergeTask from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @celery.task -def merge_podcasts(podcast_ids, num_groups): +def merge_podcasts(podcast_ids, num_groups, queue_id=''): """ Task to merge some podcasts""" logger.info('merging podcast ids %s', podcast_ids) @@ -18,11 +20,14 @@ def merge_podcasts(podcast_ids, num_groups): logger.info('merging podcasts %s', podcasts) - actions = Counter() - - pm = PodcastMerger(podcasts, actions, num_groups) + pm = PodcastMerger(podcasts, num_groups) podcast = pm.merge() - logger.info('merging result: %s', actions) + logger.info('merging successful') + + if queue_id: + qid = uuid.UUID(queue_id) + logger.info('Deleting merge queue entry {}'.format(qid)) + 
MergeTask.objects.filter(id=qid).delete() - return actions, podcast + return podcast diff --git a/mygpo/administration/templates/admin/merge-grouping.html b/mygpo/administration/templates/admin/merge-grouping.html index 94a5c8dae..7b48c5fb6 100644 --- a/mygpo/administration/templates/admin/merge-grouping.html +++ b/mygpo/administration/templates/admin/merge-grouping.html @@ -24,7 +24,7 @@

{% trans "Merge Podcasts and Episodes" %}

{% trans "Episodes that have the same number will be merged. Please verify all your changes by clicking on 'Renew Groups' before starting the Merge." %} -
+ {% csrf_token %} {% for podcast in podcasts %} @@ -39,18 +39,21 @@

{% trans "Merge Podcasts and Episodes" %}

{% endfor %} - {% for n, episodes in groups %} + {% for line in groups %} - {{ n }} + {{ forloop.counter }} - {% for podcast in podcasts %} + {% for episode in line %} - {% for episode in episodes %} - {% if episode.podcast == podcast.get_id %} - - {% episode_link episode podcast %}
- {% endif %} - {% endfor %} + {% if episode %} + + {% episode_link episode episode.podcast %}
+ {% endif %} {% endfor %} @@ -59,11 +62,12 @@

{% trans "Merge Podcasts and Episodes" %}

- - + + +
diff --git a/mygpo/administration/templates/admin/merge-select.html b/mygpo/administration/templates/admin/merge-select.html index 818438b1b..56d2170e4 100644 --- a/mygpo/administration/templates/admin/merge-select.html +++ b/mygpo/administration/templates/admin/merge-select.html @@ -13,7 +13,14 @@

{% trans "Merge Podcasts and Episodes" %}

{% endblock %} {% block content %} -
+
+ Queue Length: {{ queue_length }} + {% if task %} + - Take from Queue + {% endif %} +
+ + {% csrf_token %} {% for url in urls %}
@@ -22,6 +29,7 @@

{% trans "Merge Podcasts and Episodes" %}

{% endfor %} +
{% endblock %} diff --git a/mygpo/administration/templates/admin/task-status.html b/mygpo/administration/templates/admin/task-status.html index 9abe6bdef..d7da3dff1 100644 --- a/mygpo/administration/templates/admin/task-status.html +++ b/mygpo/administration/templates/admin/task-status.html @@ -34,16 +34,6 @@

{% if ready %} -

{% trans "The following actions were recorded:" %} -

-

-

{% trans "Go to podcast" %} {% podcast_group_link podcast %}

{% else %}

{% trans "The operation is still ongoing..." %}

diff --git a/mygpo/administration/tests.py b/mygpo/administration/tests.py index 3a181c868..d0f230af2 100644 --- a/mygpo/administration/tests.py +++ b/mygpo/administration/tests.py @@ -66,13 +66,11 @@ def test_merge(self): # we need that for later e3_id = e3.pk - actions = Counter() - # decide which episodes to merge groups = [(0, [e1]), (1, [e2, e3]), (2, [e4])] # carry out the merge - pm = PodcastMerger([p1, p2], actions, groups) + pm = PodcastMerger([p1, p2], groups) pm.merge() e1 = Episode.objects.get(pk=e1.pk) diff --git a/mygpo/administration/urls.py b/mygpo/administration/urls.py index 1e033b84f..bc5004a62 100644 --- a/mygpo/administration/urls.py +++ b/mygpo/administration/urls.py @@ -16,11 +16,23 @@ views.MergeSelect.as_view(), name='admin-merge'), + path('merge/create', + views.CreateMergeTask.as_view(), + name='admin-merge-create'), + + path('merge/verify/', + views.MergeVerify.as_view(), + name='admin-merge-verify'), + + path('merge/update/', + views.UpdateMergeTask.as_view(), + name='admin-merge-update'), + path('merge/verify', views.MergeVerify.as_view(), name='admin-merge-verify'), - path('merge/process', + path('merge/process/', views.MergeProcess.as_view(), name='admin-merge-process'), diff --git a/mygpo/administration/views.py b/mygpo/administration/views.py index 16353feea..788eff4c5 100644 --- a/mygpo/administration/views.py +++ b/mygpo/administration/views.py @@ -1,5 +1,6 @@ import re import socket +import uuid from itertools import count, chain from collections import Counter from datetime import datetime @@ -17,15 +18,14 @@ from django.template import RequestContext from django.utils.translation import ugettext as _ from django.contrib.sites.requests import RequestSite -from django.views.generic import TemplateView +from django.views.generic import TemplateView, View from django.utils.decorators import method_decorator from django.conf import settings from django.contrib.auth import get_user_model from mygpo.podcasts.models import Podcast, 
Episode from mygpo.administration.auth import require_staff -from mygpo.administration.group import PodcastGrouper -from mygpo.maintenance.merge import PodcastMerger, IncorrectMergeException +from mygpo.maintenance.models import MergeTask from mygpo.administration.clients import UserAgentStats, ClientStats from mygpo.administration.tasks import merge_podcasts from mygpo.utils import get_git_head @@ -106,18 +106,33 @@ class MergeSelect(AdminView): template_name = 'admin/merge-select.html' def get(self, request): + queue_length = MergeTask.objects.count() + task = MergeTask.objects.first() + num = int(request.GET.get('podcasts', 2)) urls = [''] * num + queue_id = '' return self.render_to_response({ + 'queue_length': queue_length, 'urls': urls, + 'task': task, }) -class MergeBase(AdminView): +class CreateMergeTask(AdminView): + + def post(self, request): + podcasts = self._get_podcasts(request) + + task = MergeTask.objects.create_from_podcasts(podcasts) + + return HttpResponseRedirect( + reverse('admin-merge-verify', args=[task.id]) + ) def _get_podcasts(self, request): - podcasts = [] + for n in count(): podcast_url = request.POST.get('feed%d' % n, None) if podcast_url is None: @@ -127,82 +142,68 @@ def _get_podcasts(self, request): continue p = Podcast.objects.get(urls__url=podcast_url) - podcasts.append(p) + yield p + - return podcasts +class MergeBase(AdminView): + pass class MergeVerify(MergeBase): template_name = 'admin/merge-grouping.html' - def post(self, request): - - try: - podcasts = self._get_podcasts(request) - - grouper = PodcastGrouper(podcasts) - - get_features = lambda id_e: ((id_e[1].url, id_e[1].title), id_e[0]) - - num_groups = grouper.group(get_features) - - - except InvalidPodcast as ip: - messages.error(request, - _('No podcast with URL {url}').format(url=str(ip))) - - podcasts = [] - num_groups = [] - + def get(self, request, task_id): + task = MergeTask.objects.get(id=uuid.UUID(task_id)) + podcasts = list(sorted(task.podcasts, key=lambda p: 
p.subscribers)) + groups = task.episode_groups() return self.render_to_response({ - 'podcasts': podcasts, - 'groups': num_groups, - }) + 'podcasts': podcasts, + 'groups': groups, + 'task': task, + }) -class MergeProcess(MergeBase): +class UpdateMergeTask(View): RE_EPISODE = re.compile(r'episode_([0-9a-fA-F]{32})') - def post(self, request): + def post(self, request, task_id): + task = MergeTask.objects.get(id=uuid.UUID(task_id)) + podcasts = task.podcasts - try: - podcasts = self._get_podcasts(request) + features = self._features_from_post(request.POST) + get_features = lambda episode: features[episode.id] - except InvalidPodcast as ip: - messages.error(request, - _('No podcast with URL {url}').format(url=str(ip))) + # update groups within MergeTask + task.set_groups(get_features) + task.save() - grouper = PodcastGrouper(podcasts) + return HttpResponseRedirect( + reverse('admin-merge-verify', args=[task.id]) + ) + def _features_from_post(self, post): features = {} - for key, feature in request.POST.items(): + for key, feature in post.items(): m = self.RE_EPISODE.match(key) if m: - episode_id = m.group(1) + episode_id = uuid.UUID(m.group(1)) features[episode_id] = feature - get_features = lambda id_e: (features.get(id_e[0], id_e[0]), id_e[0]) + return features - num_groups = grouper.group(get_features) - - if 'renew' in request.POST: - return render(request, 'admin/merge-grouping.html', { - 'podcasts': podcasts, - 'groups': num_groups, - }) +class MergeProcess(MergeBase): - elif 'merge' in request.POST: + def post(self, request, task_id): - podcast_ids = [p.get_id() for p in podcasts] - num_groups = list(num_groups) + task = MergeTask.objects.get(id=uuid.UUID(task_id)) - res = merge_podcasts.delay(podcast_ids, num_groups) + res = merge_podcasts.delay(task.pk) - return HttpResponseRedirect(reverse('admin-merge-status', - args=[res.task_id])) + return HttpResponseRedirect(reverse('admin-merge-status', + args=[res.task_id])) class MergeStatus(AdminView): @@ -222,16 
+223,11 @@ def get(self, request, task_id): # TODO: what to do with multiple frontends? cache.clear() - try: - actions, podcast = result.get() - - except IncorrectMergeException as ime: - messages.error(request, str(ime)) - return HttpResponseRedirect(reverse('admin-merge')) + podcast_id = result.get() + podcast = Podcast.objects.get(id=podcast_id) return self.render_to_response({ 'ready': True, - 'actions': actions.items(), 'podcast': podcast, }) diff --git a/mygpo/maintenance/admin.py b/mygpo/maintenance/admin.py new file mode 100644 index 000000000..7ace1acee --- /dev/null +++ b/mygpo/maintenance/admin.py @@ -0,0 +1,28 @@ +from django.contrib import admin + +from . import models + + +class MergeTaskEntryInline(admin.TabularInline): + model = models.MergeTaskEntry + + fields = ['podcast', ] + readonly_fields = ['podcast', ] + + +@admin.register(models.MergeTask) +class MergeTaskAdmin(admin.ModelAdmin): + + model = models.MergeTask + + readonly_fields = ['id', ] + list_display = ['id', 'num_entries',] + + show_full_result_count = False + + inlines = [ + MergeTaskEntryInline, + ] + + def num_entries(self, obj): + return obj.entries.count() diff --git a/mygpo/maintenance/merge.py b/mygpo/maintenance/merge.py index 6a8c72b4f..740f5ea2b 100644 --- a/mygpo/maintenance/merge.py +++ b/mygpo/maintenance/merge.py @@ -12,6 +12,7 @@ from mygpo.history.models import HistoryEntry, EpisodeHistoryEntry from mygpo.publisher.models import PublishedPodcast from mygpo.subscriptions.models import Subscription +from . 
import models import logging logger = logging.getLogger(__name__) @@ -27,7 +28,7 @@ class IncorrectMergeException(Exception): class PodcastMerger(object): """ Merges podcasts and their related objects """ - def __init__(self, podcasts, actions, groups): + def __init__(self, podcasts, groups): """ Prepares to merge podcasts[1:] into podcasts[0] """ for n, podcast1 in enumerate(podcasts): @@ -38,7 +39,6 @@ def __init__(self, podcasts, actions, groups): (podcast1.get_id(), podcast2.get_id())) self.podcasts = podcasts - self.actions = actions self.groups = groups def merge(self): @@ -66,46 +66,9 @@ def merge_episodes(self): merge_model_objects(episode, episodes) -def reassign_urls(obj1, obj2): - # Reassign all URLs of obj2 to obj1 - max_order = max([0] + [u.order for u in obj1.urls.all()]) - - for n, url in enumerate(obj2.urls.all(), max_order+1): - url.content_object = obj1 - url.order = n - url.scope = obj1.scope - try: - url.save() - except IntegrityError as ie: - logger.warn('Moving URL failed: %s. Deleting.', str(ie)) - url.delete() - - -def reassign_merged_uuids(obj1, obj2): - # Reassign all IDs of obj2 to obj1 - MergedUUID.objects.create(uuid=obj2.id, content_object=obj1) - for m in obj2.merged_uuids.all(): - m.content_object = obj1 - m.save() - - -def reassign_slugs(obj1, obj2): - # Reassign all Slugs of obj2 to obj1 - max_order = max([0] + [s.order for s in obj1.slugs.all()]) - for n, slug in enumerate(obj2.slugs.all(), max_order+1): - slug.content_object = obj1 - slug.order = n - slug.scope = obj1.scope - try: - slug.save() - except IntegrityError as ie: - logger.warn('Moving Slug failed: %s. Deleting', str(ie)) - slug.delete() - - # based on https://djangosnippets.org/snippets/2283/ @transaction.atomic -def merge_model_objects(primary_object, alias_objects=[], keep_old=False): +def merge_model_objects(primary_object, alias_objects, keep_old=False): """ Use this function to merge model objects (i.e. Users, Organizations, Polls, etc.) 
and migrate all of the related fields from the alias objects to the @@ -115,10 +78,8 @@ def merge_model_objects(primary_object, alias_objects=[], keep_old=False): from django.contrib.auth.models import User primary_user = User.objects.get(email='good_email@example.com') duplicate_user = User.objects.get(email='good_email+duplicate@example.com') - merge_model_objects(primary_user, duplicate_user) + merge_model_objects(primary_user, [duplicate_user]) """ - if not isinstance(alias_objects, list): - alias_objects = [alias_objects] # check that all aliases are the same class as primary one and that # they are subclass of model @@ -142,11 +103,6 @@ def merge_model_objects(primary_object, alias_objects=[], keep_old=False): for field_name, field in fields: generic_fields.append(field) - blank_local_fields = set( - [field.attname for field - in primary_object._meta.local_fields - if getattr(primary_object, field.attname) in [None, '']]) - # Loop through all alias objects and migrate their data to # the primary object. for alias_object in alias_objects: @@ -160,8 +116,9 @@ def merge_model_objects(primary_object, alias_objects=[], keep_old=False): related_objects = getattr(alias_object, alias_varname) for obj in related_objects.all(): setattr(obj, obj_varname, primary_object) - reassigned(obj, primary_object) - obj.save() + deleted = reassigned(obj, primary_object) + if not deleted: + obj.save() # Migrate all many to many references from alias object to # primary object. @@ -180,8 +137,9 @@ def merge_model_objects(primary_object, alias_objects=[], keep_old=False): obj_varname).all() for obj in related_many_objects.all(): getattr(obj, obj_varname).remove(alias_object) - reassigned(obj, primary_object) - getattr(obj, obj_varname).add(primary_object) + deleted = reassigned(obj, primary_object) + if not deleted: + getattr(obj, obj_varname).add(primary_object) # Migrate all generic foreign key references from alias # object to primary object. 
@@ -193,7 +151,10 @@ def merge_model_objects(primary_object, alias_objects=[], keep_old=False): related = field.model.objects.filter(**filter_kwargs) for generic_related_object in related: setattr(generic_related_object, field.name, primary_object) - reassigned(generic_related_object, primary_object) + deleted = reassigned(generic_related_object, primary_object) + if deleted: + continue + try: # execute save in a savepoint, so we can resume in the # transaction @@ -203,20 +164,10 @@ def merge_model_objects(primary_object, alias_objects=[], keep_old=False): if ie.__cause__.pgcode == PG_UNIQUE_VIOLATION: merge(generic_related_object, primary_object) - # Try to fill all missing values in primary object by - # values of duplicates - filled_up = set() - for field_name in blank_local_fields: - val = getattr(alias_object, field_name) - if val not in [None, '']: - setattr(primary_object, field_name, val) - filled_up.add(field_name) - blank_local_fields -= filled_up - if not keep_old: before_delete(alias_object, primary_object) alias_object.delete() - primary_object.save() + return primary_object @@ -236,6 +187,17 @@ def _get_all_related_many_to_many_objects(obj): def reassigned(obj, new): + """ handles changes necessary when reassigning `obj` to `new` + + Some objects have a dependent object (e.g. a URL has a Podcast or Episode). + During merging, the object might be reassigned to a new Episode. + The re-assignment requires the "scope" field to be set to the value + of the new episode. In some cases it might require the existing object to + be deleted, to preserve uniqueness. + + Returns whether the object was deleted. 
+ """ + if isinstance(obj, URL): # a URL has its parent's scope obj.scope = new.scope @@ -244,11 +206,27 @@ def reassigned(obj, new): max_order = max([-1] + [u.order for u in existing_urls]) obj.order = max_order+1 + elif isinstance(obj, Slug): + # a Slug has its parent's scope + obj.scope = new.scope + + existing_slugs = new.slugs.all() + max_order = max([-1] + [s.order for s in existing_slugs]) + obj.order = max_order+1 + elif isinstance(obj, Episode): # obj is an Episode, new is a podcast for url in obj.urls.all(): url.scope = new.as_scope - url.save() + try: + with transaction.atomic(): + url.save() + except IntegrityError as ie: + if 'podcasts_url_url_scope_key' in str(ie): + url.delete() + return True + else: + raise elif isinstance(obj, Subscription): pass @@ -259,10 +237,17 @@ def reassigned(obj, new): elif isinstance(obj, HistoryEntry): pass + elif isinstance(obj, models.MergeTaskEntry): + obj.delete() + return True + else: raise TypeError('unknown type for reassigning: {objtype}'.format( objtype=type(obj))) + # Object was not deleted + return False + def before_delete(old, new): diff --git a/mygpo/maintenance/migrations/0001_initial.py b/mygpo/maintenance/migrations/0001_initial.py new file mode 100644 index 000000000..c12a00c51 --- /dev/null +++ b/mygpo/maintenance/migrations/0001_initial.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.4 on 2017-08-08 18:00 +from __future__ import unicode_literals + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ('podcasts', '0037_index_podcast_lastupdate'), + ] + + operations = [ + migrations.CreateModel( + name='MergeQueue', + fields=[ + ('id', models.UUIDField(primary_key=True, serialize=False)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='MergeQueueEntry', + fields=[ + ('id', models.UUIDField(primary_key=True, serialize=False)), + ('podcast', 
models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to='podcasts.Podcast', + )), + ('queue', models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to='maintenance.MergeQueue', + )), + ], + options={ + 'abstract': False, + }, + ), + ] diff --git a/mygpo/maintenance/migrations/0002_podcast_mergequeue_unique.py b/mygpo/maintenance/migrations/0002_podcast_mergequeue_unique.py new file mode 100644 index 000000000..b50f960b1 --- /dev/null +++ b/mygpo/maintenance/migrations/0002_podcast_mergequeue_unique.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.4 on 2017-08-08 18:14 +from __future__ import unicode_literals + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('podcasts', '0037_index_podcast_lastupdate'), + ('maintenance', '0001_initial'), + ] + + operations = [ + migrations.AlterUniqueTogether( + name='mergequeueentry', + unique_together=set([('podcast',)]), + ), + ] diff --git a/mygpo/maintenance/migrations/0003_rel_name.py b/mygpo/maintenance/migrations/0003_rel_name.py new file mode 100644 index 000000000..b27a2b251 --- /dev/null +++ b/mygpo/maintenance/migrations/0003_rel_name.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.4 on 2017-08-12 14:34 +from __future__ import unicode_literals + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('maintenance', '0002_podcast_mergequeue_unique'), + ] + + operations = [ + migrations.AlterField( + model_name='mergequeueentry', + name='queue', + field=models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name='entries', + related_query_name='entry', + to='maintenance.MergeQueue', + ), + ), + ] diff --git a/mygpo/maintenance/migrations/0004_rename_mergetask.py b/mygpo/maintenance/migrations/0004_rename_mergetask.py new file mode 100644 index 000000000..a06238377 --- /dev/null +++ 
b/mygpo/maintenance/migrations/0004_rename_mergetask.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.4 on 2017-08-13 07:23 +from __future__ import unicode_literals + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('maintenance', '0003_rel_name'), + ] + + operations = [ + migrations.RenameModel( + old_name='MergeQueue', + new_name='MergeTask', + ), + migrations.RenameModel( + old_name='MergeQueueEntry', + new_name='MergeTaskEntry', + ), + ] diff --git a/mygpo/maintenance/migrations/0005_task.py b/mygpo/maintenance/migrations/0005_task.py new file mode 100644 index 000000000..f6a39f58e --- /dev/null +++ b/mygpo/maintenance/migrations/0005_task.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.4 on 2017-08-13 08:55 +from __future__ import unicode_literals + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('maintenance', '0004_rename_mergetask'), + ] + + operations = [ + migrations.RenameField( + model_name='mergetaskentry', + old_name='queue', + new_name='task', + ), + ] diff --git a/mygpo/maintenance/migrations/0006_mergetask_groups.py b/mygpo/maintenance/migrations/0006_mergetask_groups.py new file mode 100644 index 000000000..1dcbbf9f4 --- /dev/null +++ b/mygpo/maintenance/migrations/0006_mergetask_groups.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.4 on 2017-08-13 09:01 +from __future__ import unicode_literals + +import django.contrib.postgres.fields.jsonb +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('maintenance', '0005_task'), + ] + + operations = [ + migrations.AddField( + model_name='mergetask', + name='groups', + field=django.contrib.postgres.fields.jsonb.JSONField(default=dict), + ), + ] diff --git a/mygpo/maintenance/migrations/__init__.py b/mygpo/maintenance/migrations/__init__.py new file mode 100644 index 
000000000..e69de29bb diff --git a/mygpo/maintenance/models.py b/mygpo/maintenance/models.py new file mode 100644 index 000000000..bf6d1fdf1 --- /dev/null +++ b/mygpo/maintenance/models.py @@ -0,0 +1,171 @@ +import uuid +from datetime import datetime +from collections import defaultdict + +from django.db import models, transaction +from django.contrib.postgres.fields import JSONField + +from mygpo.core.models import UUIDModel +from mygpo.podcasts.models import Podcast +from mygpo.maintenance.merge import merge_model_objects + +import logging +logger = logging.getLogger(__name__) + +DEFAULT_RELEASE = datetime(1970, 1, 1) +_SORT_KEY = lambda ep: ep.released or DEFAULT_RELEASE + + +class MergeTaskManager(models.Manager): + + @transaction.atomic + def create_from_podcasts(self, podcasts): + task = self.create(id=uuid.uuid4()) + + for podcast in podcasts: + mte = MergeTaskEntry.objects.create( + id=uuid.uuid4(), + podcast=podcast, + task=task, + ) + + get_features = lambda episode: (episode.url, episode.title) + + # update groups within MergeTask + task.set_groups(get_features) + task.save() + + return task + + +class MergeTask(UUIDModel): + """ A Group of podcasts that could be merged """ + + objects = MergeTaskManager() + + groups = JSONField(default=dict) + + @property + def podcasts(self): + """ Returns the podcasts of the task, sorted by subscribers """ + podcasts = [entry.podcast for entry in self.entries.all()] + podcasts = sorted(podcasts, + key=lambda p: p.subscribers, reverse=True) + return podcasts + + + def set_groups(self, get_features): + """ Groups the episodes by features extracted using ``get_features`` + + get_features is a callable that expects an episode as parameter, and + returns a value representing the extracted feature(s). 
+ """ + + episodes = self.episodes + + episode_groups = defaultdict(list) + + for episode in episodes.values(): + features = get_features(episode) + episode_groups[features].append(episode.pk.hex) + + groups = sorted(episode_groups.values())#, key=_SORT_KEY) + self.groups = list(groups) + + @property + def episodes(self): + episodes = {} + for podcast in self.podcasts: + episodes.update(dict((e.id.hex, e) for e in podcast.episode_set.all())) + + return episodes + + def episode_groups(self): + """ Return a list of episode lists + + podcasts = [p1, p2, p3] + + Returns + groups = [ + [ep1 of p1, ep1 of p2, None], + [ep2 of p2, None, ep2 of p3], + ] + """ + + episodes = self.episodes + podcasts = self.podcasts + groups = [] + print(episodes) + print(podcasts) + print(self.groups) + + for episode_ids in self.groups: + line = [] + # go through the podcasts in order + for podcast in podcasts: + for episode_id in episode_ids: + episode = episodes.get(episode_id, None) + if episode is None: + continue + + if episode.podcast == podcast: + line.append(episode) + break + else: + # if nothing was found, add None + line.append(None) + + groups.append(line) + print(groups) + return groups + + def merge(self): + """ Carries out the actual merging """ + + logger.info('Start merging of podcasts: %r', self.podcasts) + + podcasts = self.podcasts + podcast1 = podcasts.pop(0) + logger.info('Merge target: %r', podcast1) + + self.merge_episodes() + merge_model_objects(podcast1, podcasts) + + return podcast1 + + def merge_episodes(self): + """ Merges the episodes according to the groups """ + + for episodes in self.episode_groups(): + print('Episodes') + print(episodes) + + if not episodes: + continue + + episode = episodes.pop(0) + + if not episode: + continue + + # the list can contain Nones + episodes = list(filter(None, episodes)) + + logger.info('Merging %d episodes', len(episodes)) + merge_model_objects(episode, episodes) + + +class MergeTaskEntry(UUIDModel): + """ An entry in a 
MergeTask """ + + podcast = models.ForeignKey(Podcast, on_delete=models.CASCADE) + + task = models.ForeignKey(MergeTask, + on_delete=models.CASCADE, + related_name='entries', + related_query_name='entry') + + class Meta: + unique_together = [ + ['podcast', ] # a podcast can only belong to one task + ] diff --git a/mygpo/maintenance/tasks.py b/mygpo/maintenance/tasks.py new file mode 100644 index 000000000..c30c3bb61 --- /dev/null +++ b/mygpo/maintenance/tasks.py @@ -0,0 +1,67 @@ +import uuid + +from mygpo.celery import celery +from mygpo.podcasts.models import Podcast + +from . import models + +from celery.utils.log import get_task_logger +logger = get_task_logger(__name__) + + +@celery.task +def populate_merge_queue(): + """ Populate the merge queue with merge candidates """ + + candidates = Podcast.objects.filter(mergequeueentry__isnull=True) + + for podcast in candidates.iterator(): + + # randomly pick an episode + episode = podcast.episode_set.order_by('?').first() + + if not episode: + continue + + if not episode.url: + continue + + # get a group of similar podcasts + # it is sufficient to have one episode in common -- this could be + # extended to require multiple common episodes + podcasts = Podcast.objects.filter(episode__urls__url=episode.url) + + # a group of one is no real group -- there'd be nothing to merge + if podcasts.count() <= 1: + continue + + mqs = _get_merge_queues(podcasts) + + if len(mqs) == 0: + mq = models.MergeTask.objects.create(id=uuid.uuid4()) + + if len(mqs) == 1: + mq = mqs.pop() + + if len(mqs) > 1: + continue # merge queues would need to be merged -- not yet supported + + for podcast in podcasts.iterator(): + + # already in a merge queue + if podcast.entries.exists(): + continue + + # add to merge queue + mqe = models.MergeTaskEntry.objects.create( + id=uuid.uuid4(), + podcast=podcast, + queue=mq, + ) + + +def _get_merge_queues(podcasts): + mqs = models.MergeTask.objects.filter( + mergequeueentry__podcast__in=podcasts, + 
).distinct() + return set(mqs) diff --git a/mygpo/maintenance/tests.py b/mygpo/maintenance/tests.py index 544d7d351..8029b6a4c 100644 --- a/mygpo/maintenance/tests.py +++ b/mygpo/maintenance/tests.py @@ -42,8 +42,7 @@ def setUp(self): def test_merge_podcasts(self): # decide which episodes to merge groups = [(0, [self.episode1, self.episode2])] - counter = Counter() - pm = PodcastMerger([self.podcast1, self.podcast2], counter, groups) + pm = PodcastMerger([self.podcast1, self.podcast2], groups) pm.merge() @@ -94,9 +93,8 @@ def test_merge_podcasts(self): # decide which episodes to merge groups = [(0, [self.episode1, self.episode2])] - counter = Counter() - pm = PodcastMerger([self.podcast1, self.podcast2], counter, groups) + pm = PodcastMerger([self.podcast1, self.podcast2], groups) pm.merge() history = EpisodeHistoryEntry.objects.filter( @@ -186,11 +184,10 @@ def test_merge_podcasts(self): # decide which episodes to merge groups = [(0, [self.episode1, self.episode2])] - counter = Counter() episode2_id = self.episode2.id - pm = PodcastMerger([podcast2, podcast1], counter, groups) + pm = PodcastMerger([podcast2, podcast1], groups) pm.merge() history = EpisodeHistoryEntry.objects.filter( diff --git a/mygpo/settings.py b/mygpo/settings.py index 7731c5f2f..091618016 100644 --- a/mygpo/settings.py +++ b/mygpo/settings.py @@ -121,6 +121,10 @@ def get_intOrNone(name, default): }, }] +if DEBUG: + # don't use cached template loader + TEMPLATES[0]['OPTIONS']['loaders'] = TEMPLATES[0]['OPTIONS']['loaders'][0][1] + MIDDLEWARE = [ 'django.middleware.common.CommonMiddleware', diff --git a/mygpo/users/tests.py b/mygpo/users/tests.py index 6d5f137c4..e83fd86fe 100644 --- a/mygpo/users/tests.py +++ b/mygpo/users/tests.py @@ -84,7 +84,7 @@ def test_merge_podcasts(self): subscribe(self.podcast2, self.user, self.device) # merge podcast2 into podcast1 - pm = PodcastMerger([self.podcast1, self.podcast2], Counter(), []) + pm = PodcastMerger([self.podcast1, self.podcast2], []) pm.merge() # 
get podcast for URL of podcast2 and unsubscribe from it diff --git a/mygpo/web/templatetags/episodes.py b/mygpo/web/templatetags/episodes.py index b75c19c7b..fb41dce04 100644 --- a/mygpo/web/templatetags/episodes.py +++ b/mygpo/web/templatetags/episodes.py @@ -131,11 +131,6 @@ def episode_link(episode, podcast, title=None): title=title) -@register.simple_tag -def get_id(obj): - return obj._id - - @register.simple_tag def episode_number(episode, podcast): num = episode.get_episode_number(podcast.common_episode_title)