-
Notifications
You must be signed in to change notification settings - Fork 85
/
tasks.py
131 lines (104 loc) · 3.75 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# -*- coding: utf-8 -*-
import sys
from whoosh.index import create_in
from whoosh.fields import *
from celery import group
from config import sqla, celery_app
from whoosh.writing import AsyncWriter
from webs import douban, bilibili
from gevent import monkey
from helpers import upload_qiniu_by_filenames
from config import config
monkey.patch_socket()
'''Fix: try ... except inside the tasks doesn't work, because of
monkey.patch_socket().
'''
@celery_app.task
def douban_movie_base_task(pool_number):
    """Run the Douban "main movies base data" job as a Celery task.

    :param pool_number: forwarded unchanged to
        ``douban.tasks.get_main_movies_base_data.task``.
    :return: whatever the underlying job returns (the original code names
        it ``movie_douban_ids``, so presumably a collection of Douban ids).
    """
    return douban.tasks.get_main_movies_base_data.task(pool_number)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def douban_movie_full_task(self, douban_ids, pool_number):
    """Run the Douban "main movies full data" job, retrying on failure.

    :param self: the bound Celery task instance (``bind=True``).
    :param douban_ids: forwarded to
        ``douban.tasks.get_main_movies_full_data.task``.
    :param pool_number: forwarded to the same call.

    Any exception is printed and the task is re-queued with a 10 second
    countdown (at most ``max_retries=3`` retries).
    """
    try:
        douban.tasks.get_main_movies_full_data.task(douban_ids, pool_number)
    except Exception as exc:
        message = 'Error: ' + str(exc)
        print(message)
        self.retry(countdown=10)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def douban_celebrity_full_task(self, douban_ids, pool_number):
    """Run the Douban "celebrities full data" job, retrying on failure.

    :param self: the bound Celery task instance (``bind=True``).
    :param douban_ids: forwarded to
        ``douban.tasks.get_celebrities_full_data.task``.
    :param pool_number: forwarded to the same call.

    Any exception is printed and the task is re-queued with a 10 second
    countdown (at most ``max_retries=3`` retries).
    """
    try:
        douban.tasks.get_celebrities_full_data.task(douban_ids, pool_number)
    except Exception as exc:
        message = 'Error: ' + str(exc)
        print(message)
        self.retry(countdown=10)
@celery_app.task
def bilibili_animation_base_task(pool_number):
    """Run the Bilibili "animations base data" job as a Celery task.

    :param pool_number: forwarded unchanged to
        ``bilibili.tasks.get_animations_base_data.task``.
    :return: whatever the underlying job returns (the original code names
        it ``animation_bilibili_ids``, so presumably Bilibili ids).
    """
    return bilibili.tasks.get_animations_base_data.task(pool_number)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def bilibili_animation_full_task(self, bilibili_ids, pool_number):
    """Run the Bilibili "animations full data" job, retrying on failure.

    :param self: the bound Celery task instance (``bind=True``).
    :param bilibili_ids: forwarded to
        ``bilibili.tasks.get_animations_full_data.task``.
    :param pool_number: forwarded to the same call.

    Any exception is printed and the task is re-queued with a 10 second
    countdown (at most ``max_retries=3`` retries).
    """
    try:
        bilibili.tasks.get_animations_full_data.task(bilibili_ids, pool_number)
    except Exception as e:
        print('Error: ' + str(e))
        # Retry through the bound instance, like the other bound tasks in
        # this module, instead of looking the task up by its global name.
        self.retry(countdown=10)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def down_video_images_task(self, douban_ids, pool_number):
    """Run the Douban video-image download job, retrying on failure.

    :param self: the bound Celery task instance (``bind=True``).
    :param douban_ids: forwarded to ``douban.tasks.down_video_images.task``.
    :param pool_number: forwarded to the same call.

    Any exception is printed and the task is re-queued with a 10 second
    countdown (at most ``max_retries=3`` retries).
    """
    try:
        douban.tasks.down_video_images.task(douban_ids, pool_number)
    except Exception as e:
        print('Error: ' + str(e))
        # The original called ``down_images_task.retry``; no such name
        # exists, so a failure raised NameError instead of retrying.
        self.retry(countdown=10)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def down_celebrity_images_task(self, douban_ids, pool_number):
    """Run the Douban celebrity-image download job, retrying on failure.

    :param self: the bound Celery task instance (``bind=True``).
    :param douban_ids: forwarded to
        ``douban.tasks.down_celebrity_images.task``.
    :param pool_number: forwarded to the same call.

    Any exception is printed and the task is re-queued with a 10 second
    countdown (at most ``max_retries=3`` retries).
    """
    try:
        douban.tasks.down_celebrity_images.task(douban_ids, pool_number)
    except Exception as e:
        print('Error: ' + str(e))
        # The original called ``down_images_task.retry``; no such name
        # exists, so a failure raised NameError instead of retrying.
        self.retry(countdown=10)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def upload_images_task(self, filenames, pool_number):
    """Upload image files to Qiniu, retrying on failure.

    :param self: the bound Celery task instance. The task is declared with
        ``bind=True``, so Celery passes it as the first argument; the
        original signature omitted it, which shifted every argument.
    :param filenames: passed to ``upload_qiniu_by_filenames`` as the list
        of files to upload.
    :param pool_number: passed to ``upload_qiniu_by_filenames``.

    Credentials and bucket come from the ``qiniu`` config section and the
    local directory from ``photo.path``. Any exception raised by the upload
    is printed and the task is re-queued with a 10 second countdown (at
    most ``max_retries=3`` retries).
    """
    access_key = config.get('qiniu', 'access_key')
    secret_key = config.get('qiniu', 'secret_key')
    bucket_name = config.get('qiniu', 'bucket_name')
    try:
        upload_qiniu_by_filenames(
            access_key,
            secret_key,
            bucket_name,
            '/static/img/',
            pool_number,
            config.get('photo', 'path'),
            filenames,
            True
        )
    except Exception as e:
        print('Error: ' + str(e))
        self.retry(countdown=10)
@celery_app.task
def whoosh_task(ids, pool_number, ix, model_class):
    """Add the title and summary of the given rows to a Whoosh index.

    :param ids: ids of the ``model_class`` rows to index.
    :param pool_number: not used by this task; accepted because
        ``get_task_group_by_id`` always passes it.
    :param ix: the index object handed to ``AsyncWriter``.
    :param model_class: model queried through ``sqla['session']``; its rows
        must expose ``title`` and ``summary``.
    :raises Exception: anything raised by the lookup (``.one()`` requires
        exactly one matching row) or by ``add_document``; the writer is
        cancelled before the exception propagates.

    Rows whose ``title`` or ``summary`` is ``None`` are skipped. All
    documents are committed together after the loop.
    """
    session = sqla['session']
    writer = AsyncWriter(ix)
    try:
        for id_ in ids:
            obj = session.query(model_class).filter_by(id=id_).one()
            if obj.title is None or obj.summary is None:
                continue
            writer.add_document(
                title=obj.title,
                summary=obj.summary
            )
    except Exception:
        # Don't leave a half-filled writer open when a row fails; release
        # it explicitly, then let the error surface.
        writer.cancel()
        raise
    writer.commit()
def get_task_group_by_id(ids, task, **kwargs):
    """Split ``ids`` into chunks and build a Celery group of subtasks.

    :param ids: sliceable sequence of ids to distribute.
    :param task: Celery task whose ``.s()`` signature is created once per
        chunk, with the chunk as the first positional argument.
    :param kwargs: extra keyword arguments forwarded to every signature.
        ``group_size`` (default 20) is consumed here as the chunk length,
        and ``pool_number`` is always set to that same value.
    :return: a ``celery.group`` of the per-chunk signatures.
    """
    total = len(ids)
    chunk_len = kwargs.pop('group_size', 20)
    kwargs['pool_number'] = chunk_len
    signatures = []
    for start in range(0, total, chunk_len):
        chunk = ids[start: start + chunk_len]
        signatures.append(task.s(chunk, **kwargs))
    return group(signatures)