diff --git a/repo_metadata/index_worker.py b/repo_metadata/index_worker.py index cca8a97a..41b147aa 100644 --- a/repo_metadata/index_worker.py +++ b/repo_metadata/index_worker.py @@ -79,8 +79,8 @@ def worker_handler(self): if len(msg) != 3: logger.info('Bad message: %s' % str(msg)) else: - repo_id, commit_id = msg[1], msg[2] - self.worker_task_handler(self.mq, repo_id, commit_id, self.should_stop) + op_type, repo_id, commit_id = msg[0], msg[1], msg[2] + self.worker_task_handler(self.mq, repo_id, commit_id, op_type, self.should_stop) except (ResponseError, NoMQAvailable, TimeoutError) as e: logger.error('The connection to the redis server failed: %s' % e) except Exception as e: @@ -89,7 +89,7 @@ def worker_handler(self): # prevent case that redis break at program running. time.sleep(0.3) - def worker_task_handler(self, mq, repo_id, commit_id, should_stop): + def worker_task_handler(self, mq, repo_id, commit_id, op_type, should_stop): # Python cannot kill threads, so stop it generate more locked key. 
if not should_stop.isSet(): # set key-value if does not exist which will expire 30 minutes later @@ -100,7 +100,7 @@ def worker_task_handler(self, mq, repo_id, commit_id, should_stop): (threading.currentThread().getName(), repo_id)) lock_key = self._get_lock_key(repo_id) self.locked_keys.add(lock_key) - self.update_metadata(repo_id) + self.update_metadata(repo_id, op_type) try: self.locked_keys.remove(lock_key) except KeyError: @@ -112,14 +112,17 @@ def worker_task_handler(self, mq, repo_id, commit_id, should_stop): # the repo is updated by other thread, push back to the queue self.add_to_undo_task(mq, repo_id, commit_id) - def update_metadata(self, repo_id): + def update_metadata(self, repo_id, op_type): commit_id = repo_data.get_repo_head_commit(repo_id) if not commit_id: # invalid repo without head commit id logger.error("invalid repo : %s " % repo_id) return try: - self.metadata_manager.update_metadata(repo_id, commit_id) + if op_type == 'init_metadata': + self.metadata_manager.create_metadata(repo_id) + else: + self.metadata_manager.update_metadata(repo_id, commit_id) except Exception as e: logger.exception('update repo: %s metadata error: %s', repo_id, e) diff --git a/repo_metadata/metadata_server_api.py b/repo_metadata/metadata_server_api.py index 7b0c7795..06c2c507 100644 --- a/repo_metadata/metadata_server_api.py +++ b/repo_metadata/metadata_server_api.py @@ -3,35 +3,43 @@ from seafevents.app.config import METADATA_SERVER_SECRET_KEY, METADATA_SERVER_URL -class StructureTable(object): - def __init__(self, id, name): - self.id = id +class MetadataTable(object): + def __init__(self, table_id, name): + self.id = table_id self.name = name + @property + def columns(self): + return MetadataColumns() -class StructureColumn(object): + +class MetadataColumns(object): + def __init__(self): + self.id = MetadataColumn('_id', '_id', 'text') + self.file_creator = MetadataColumn('_file_creator', '_file_creator', 'text') + self.file_ctime = MetadataColumn('_file_ctime', 
'_file_ctime', 'date') + self.file_modifier = MetadataColumn('_file_modifier', '_file_modifier', 'text') + self.file_mtime = MetadataColumn('_file_mtime', '_file_mtime', 'date') + self.parent_dir = MetadataColumn('_parent_dir', '_parent_dir', 'text') + self.file_name = MetadataColumn('_name', '_name', 'text') + self.is_dir = MetadataColumn('_is_dir', '_is_dir', 'text') + + +class MetadataColumn(object): def __init__(self, key, name, type): self.key = key self.name = name self.type = type - def to_build_column_dict(self): + def to_dict(self): return { 'key': self.key, 'name': self.name, 'type': self.type } -#metadata base -METADATA_TABLE = StructureTable('0001', 'Table1') -METADATA_COLUMN_ID = StructureColumn('_id', '_id', 'text') -METADATA_COLUMN_CREATOR = StructureColumn('_file_creator', '_file_creator', 'text') -METADATA_COLUMN_CREATED_TIME = StructureColumn('_file_ctime', '_file_ctime', 'date') -METADATA_COLUMN_MODIFIER = StructureColumn('_file_modifier', '_file_modifier', 'text') -METADATA_COLUMN_MODIFIED_TIME = StructureColumn('_file_mtime', '_file_mtime', 'date') -METADATA_COLUMN_PARENT_DIR = StructureColumn('_parent_dir', '_parent_dir', 'text') -METADATA_COLUMN_NAME = StructureColumn('_name', '_name', 'text') -METADATA_COLUMN_IS_DIR = StructureColumn('_is_dir', '_is_dir', 'text') + +METADATA_TABLE = MetadataTable('0001', 'Table1') def parse_response(response): diff --git a/repo_metadata/repo_metadata.py b/repo_metadata/repo_metadata.py index 2cfebc20..0a0f6dcf 100644 --- a/repo_metadata/repo_metadata.py +++ b/repo_metadata/repo_metadata.py @@ -1,9 +1,10 @@ import os import logging -from seafevents.repo_metadata.metadata_server_api import METADATA_TABLE, METADATA_COLUMN_ID, \ - METADATA_COLUMN_CREATOR, METADATA_COLUMN_CREATED_TIME, METADATA_COLUMN_MODIFIER, \ - METADATA_COLUMN_MODIFIED_TIME, METADATA_COLUMN_PARENT_DIR, METADATA_COLUMN_NAME, METADATA_COLUMN_IS_DIR +from seafevents.repo_metadata.metadata_server_api import METADATA_TABLE + # METADATA_COLUMN_ID, 
\ + # METADATA_COLUMN_CREATOR, METADATA_COLUMN_CREATED_TIME, METADATA_COLUMN_MODIFIER, \ + # METADATA_COLUMN_MODIFIED_TIME, METADATA_COLUMN_PARENT_DIR, METADATA_COLUMN_NAME, METADATA_COLUMN_IS_DIR from seafevents.utils import timestamp_to_isoformat_timestr @@ -57,13 +58,13 @@ def add_files(self, repo_id, added_files): continue row = { - METADATA_COLUMN_CREATOR.name: modifier, - METADATA_COLUMN_CREATED_TIME.name: timestamp_to_isoformat_timestr(mtime), - METADATA_COLUMN_MODIFIER.name: modifier, - METADATA_COLUMN_MODIFIED_TIME.name: timestamp_to_isoformat_timestr(mtime), - METADATA_COLUMN_PARENT_DIR.name: parent_dir, - METADATA_COLUMN_NAME.name: file_name, - METADATA_COLUMN_IS_DIR.name: 'False', + METADATA_TABLE.columns.file_creator.name: modifier, + METADATA_TABLE.columns.file_ctime.name: timestamp_to_isoformat_timestr(mtime), + METADATA_TABLE.columns.file_modifier.name: modifier, + METADATA_TABLE.columns.file_mtime.name: timestamp_to_isoformat_timestr(mtime), + METADATA_TABLE.columns.parent_dir.name: parent_dir, + METADATA_TABLE.columns.file_name.name: file_name, + METADATA_TABLE.columns.is_dir.name: 'False', } rows.append(row) if not rows: @@ -74,7 +75,7 @@ def delete_files(self, repo_id, deleted_files): if not deleted_files: return - sql = f'SELECT `{METADATA_COLUMN_ID.name}` FROM `{METADATA_TABLE.name}` WHERE' + sql = f'SELECT `{METADATA_TABLE.columns.id.name}` FROM `{METADATA_TABLE.name}` WHERE' need_deleted = False for file in deleted_files: path = file.path.rstrip('/') @@ -83,7 +84,7 @@ def delete_files(self, repo_id, deleted_files): need_deleted = True parent_dir = os.path.dirname(path) file_name = os.path.basename(path) - sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{file_name}") OR' + sql += f' (`{METADATA_TABLE.columns.parent_dir.name}` = "{parent_dir}" AND `{METADATA_TABLE.columns.file_name.name}` = "{file_name}") OR' if not need_deleted: return @@ -95,7 +96,7 @@ def delete_files(self, 
repo_id, deleted_files): row_ids = [] for row in query_result: - row_ids.append(row[METADATA_COLUMN_ID.name]) + row_ids.append(row[METADATA_TABLE.columns.id.name]) self.metadata_server_api.delete_rows(repo_id, METADATA_TABLE.id, row_ids) @@ -103,7 +104,7 @@ def update_files(self, repo_id, modified_files): if not modified_files: return - sql = f'SELECT `{METADATA_COLUMN_ID.name}`, `{METADATA_COLUMN_PARENT_DIR.name}`, `{METADATA_COLUMN_NAME.name}` FROM `{METADATA_TABLE.name}` WHERE' + sql = f'SELECT `{METADATA_TABLE.columns.id.name}`, `{METADATA_TABLE.columns.parent_dir.name}`, `{METADATA_TABLE.columns.file_name.name}` FROM `{METADATA_TABLE.name}` WHERE' path_to_file_dict = {} need_update = False for file in modified_files: @@ -116,7 +117,7 @@ def update_files(self, repo_id, modified_files): key = parent_dir + file_name path_to_file_dict[key] = file - sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{file_name}") OR' + sql += f' (`{METADATA_TABLE.columns.parent_dir.name}` = "{parent_dir}" AND `{METADATA_TABLE.columns.file_name.name}` = "{file_name}") OR' if not need_update: return @@ -128,16 +129,16 @@ def update_files(self, repo_id, modified_files): updated_rows = [] for row in query_result: - row_id = row[METADATA_COLUMN_ID.name] - parent_dir = row[METADATA_COLUMN_PARENT_DIR.name] - file_name = row[METADATA_COLUMN_NAME.name] + row_id = row[METADATA_TABLE.columns.id.name] + parent_dir = row[METADATA_TABLE.columns.parent_dir.name] + file_name = row[METADATA_TABLE.columns.file_name.name] key = parent_dir + file_name new_row = path_to_file_dict.get(key) update_row = { - METADATA_COLUMN_ID.name: row_id, - METADATA_COLUMN_MODIFIER.name: new_row.modifier, - METADATA_COLUMN_MODIFIED_TIME.name: timestamp_to_isoformat_timestr(new_row.mtime), + METADATA_TABLE.columns.id.name: row_id, + METADATA_TABLE.columns.file_modifier.name: new_row.modifier, + METADATA_TABLE.columns.file_mtime.name: 
timestamp_to_isoformat_timestr(new_row.mtime), } updated_rows.append(update_row) @@ -157,13 +158,13 @@ def add_dirs(self, repo_id, added_dirs): mtime = de.mtime row = { - METADATA_COLUMN_CREATOR.name: '', - METADATA_COLUMN_CREATED_TIME.name: timestamp_to_isoformat_timestr(mtime), - METADATA_COLUMN_MODIFIER.name: '', - METADATA_COLUMN_MODIFIED_TIME.name: timestamp_to_isoformat_timestr(mtime), - METADATA_COLUMN_PARENT_DIR.name: parent_dir, - METADATA_COLUMN_NAME.name: file_name, - METADATA_COLUMN_IS_DIR.name: 'True', + METADATA_TABLE.columns.file_creator.name: '', + METADATA_TABLE.columns.file_ctime.name: timestamp_to_isoformat_timestr(mtime), + METADATA_TABLE.columns.file_modifier.name: '', + METADATA_TABLE.columns.file_mtime.name: timestamp_to_isoformat_timestr(mtime), + METADATA_TABLE.columns.parent_dir.name: parent_dir, + METADATA_TABLE.columns.file_name.name: file_name, + METADATA_TABLE.columns.is_dir.name: 'True', } rows.append(row) @@ -176,7 +177,7 @@ def add_dirs(self, repo_id, added_dirs): def delete_dirs(self, repo_id, deleted_dirs): if not deleted_dirs: return - sql = f'SELECT `{METADATA_COLUMN_ID.name}` FROM `{METADATA_TABLE.name}` WHERE' + sql = f'SELECT `{METADATA_TABLE.columns.id.name}` FROM `{METADATA_TABLE.name}` WHERE' need_delete = False for d in deleted_dirs: path = d.path.rstrip('/') @@ -185,7 +186,7 @@ def delete_dirs(self, repo_id, deleted_dirs): need_delete = True parent_dir = os.path.dirname(path) dir_name = os.path.basename(path) - sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{dir_name}") OR' + sql += f' (`{METADATA_TABLE.columns.parent_dir.name}` = "{parent_dir}" AND `{METADATA_TABLE.columns.file_name.name}` = "{dir_name}") OR' if not need_delete: return @@ -197,7 +198,7 @@ def delete_dirs(self, repo_id, deleted_dirs): row_ids = [] for row in query_result: - row_ids.append(row[METADATA_COLUMN_ID.name]) + row_ids.append(row[METADATA_TABLE.columns.id.name]) 
self.metadata_server_api.delete_rows(repo_id, METADATA_TABLE.id, row_ids) @@ -205,7 +206,7 @@ def rename_files(self, repo_id, renamed_files): if not renamed_files: return - sql = f'SELECT `{METADATA_COLUMN_ID.name}`, `{METADATA_COLUMN_PARENT_DIR.name}`, `{METADATA_COLUMN_NAME.name}` FROM `{METADATA_TABLE.name}` WHERE' + sql = f'SELECT `{METADATA_TABLE.columns.id.name}`, `{METADATA_TABLE.columns.parent_dir.name}`, `{METADATA_TABLE.columns.file_name.name}` FROM `{METADATA_TABLE.name}` WHERE' path_to_file_dict = {} need_update = False for file in renamed_files: @@ -218,7 +219,7 @@ def rename_files(self, repo_id, renamed_files): key = parent_dir + file_name path_to_file_dict[key] = file - sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{file_name}") OR' + sql += f' (`{METADATA_TABLE.columns.parent_dir.name}` = "{parent_dir}" AND `{METADATA_TABLE.columns.file_name.name}` = "{file_name}") OR' if not need_update: return sql = sql.rstrip(' OR') @@ -229,9 +230,9 @@ def rename_files(self, repo_id, renamed_files): updated_rows = [] for row in query_result: - row_id = row[METADATA_COLUMN_ID.name] - parent_dir = row[METADATA_COLUMN_PARENT_DIR.name] - file_name = row[METADATA_COLUMN_NAME.name] + row_id = row[METADATA_TABLE.columns.id.name] + parent_dir = row[METADATA_TABLE.columns.parent_dir.name] + file_name = row[METADATA_TABLE.columns.file_name.name] key = parent_dir + file_name new_row = path_to_file_dict.get(key) @@ -240,11 +241,11 @@ def rename_files(self, repo_id, renamed_files): new_file_name = os.path.basename(new_path) update_row = { - METADATA_COLUMN_ID.name: row_id, - METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, - METADATA_COLUMN_NAME.name: new_file_name, - METADATA_COLUMN_MODIFIER.name: new_row.modifier, - METADATA_COLUMN_MODIFIED_TIME.name: timestamp_to_isoformat_timestr(new_row.mtime), + METADATA_TABLE.columns.id.name: row_id, + METADATA_TABLE.columns.parent_dir.name: new_parent_dir, + 
METADATA_TABLE.columns.file_name.name: new_file_name, + METADATA_TABLE.columns.file_modifier.name: new_row.modifier, + METADATA_TABLE.columns.file_mtime.name: timestamp_to_isoformat_timestr(new_row.mtime), } updated_rows.append(update_row) @@ -257,7 +258,7 @@ def move_files(self, repo_id, moved_files): if not moved_files: return - sql = f'SELECT `{METADATA_COLUMN_ID.name}`, `{METADATA_COLUMN_PARENT_DIR.name}`, `{METADATA_COLUMN_NAME.name}` FROM `{METADATA_TABLE.name}` WHERE' + sql = f'SELECT `{METADATA_TABLE.columns.id.name}`, `{METADATA_TABLE.columns.parent_dir.name}`, `{METADATA_TABLE.columns.file_name.name}` FROM `{METADATA_TABLE.name}` WHERE' path_to_file_dict = {} need_update = False for file in moved_files: @@ -270,7 +271,7 @@ def move_files(self, repo_id, moved_files): key = parent_dir + file_name path_to_file_dict[key] = file - sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{file_name}") OR' + sql += f' (`{METADATA_TABLE.columns.parent_dir.name}` = "{parent_dir}" AND `{METADATA_TABLE.columns.file_name.name}` = "{file_name}") OR' if not need_update: return @@ -282,17 +283,17 @@ def move_files(self, repo_id, moved_files): updated_rows = [] for row in query_result: - row_id = row[METADATA_COLUMN_ID.name] - parent_dir = row[METADATA_COLUMN_PARENT_DIR.name] - file_name = row[METADATA_COLUMN_NAME.name] + row_id = row[METADATA_TABLE.columns.id.name] + parent_dir = row[METADATA_TABLE.columns.parent_dir.name] + file_name = row[METADATA_TABLE.columns.file_name.name] key = parent_dir + file_name new_row = path_to_file_dict.get(key) new_path = new_row.new_path new_parent_dir = os.path.dirname(new_path) update_row = { - METADATA_COLUMN_ID.name: row_id, - METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, + METADATA_TABLE.columns.id.name: row_id, + METADATA_TABLE.columns.parent_dir.name: new_parent_dir, } updated_rows.append(update_row) @@ -304,7 +305,7 @@ def move_files(self, repo_id, moved_files): def 
rename_dirs(self, repo_id, renamed_dirs): if not renamed_dirs: return - sql = f'SELECT `{METADATA_COLUMN_ID.name}`, `{METADATA_COLUMN_PARENT_DIR.name}`, `{METADATA_COLUMN_NAME.name}` FROM `{METADATA_TABLE.name}` WHERE' + sql = f'SELECT `{METADATA_TABLE.columns.id.name}`, `{METADATA_TABLE.columns.parent_dir.name}`, `{METADATA_TABLE.columns.file_name.name}` FROM `{METADATA_TABLE.name}` WHERE' for d in renamed_dirs: old_path = d.path.rstrip('/') parent_dir = os.path.dirname(old_path) @@ -314,8 +315,8 @@ def rename_dirs(self, repo_id, renamed_dirs): if self.is_excluded_path(old_path): continue - sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{dir_name}") OR' \ - f' (`{METADATA_COLUMN_PARENT_DIR.name}` LIKE "{old_path}%")' + sql += f' (`{METADATA_TABLE.columns.parent_dir.name}` = "{parent_dir}" AND `{METADATA_TABLE.columns.file_name.name}` = "{dir_name}") OR' \ + f' (`{METADATA_TABLE.columns.parent_dir.name}` LIKE "{old_path}%")' query_result = self.metadata_server_api.query_rows(repo_id, sql).get('results', []) if not query_result: @@ -323,17 +324,17 @@ def rename_dirs(self, repo_id, renamed_dirs): updated_rows = [] for row in query_result: - row_id = row[METADATA_COLUMN_ID.name] - p_dir = row[METADATA_COLUMN_PARENT_DIR.name] - name = row[METADATA_COLUMN_NAME.name] + row_id = row[METADATA_TABLE.columns.id.name] + p_dir = row[METADATA_TABLE.columns.parent_dir.name] + name = row[METADATA_TABLE.columns.file_name.name] new_parent_dir = os.path.dirname(new_path) new_name = os.path.basename(new_path) if parent_dir == p_dir and dir_name == name: update_row = { - METADATA_COLUMN_ID.name: row_id, - METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, - METADATA_COLUMN_NAME.name: new_name + METADATA_TABLE.columns.id.name: row_id, + METADATA_TABLE.columns.parent_dir.name: new_parent_dir, + METADATA_TABLE.columns.file_name.name: new_name } updated_rows.append(update_row) else: @@ -342,8 +343,8 @@ def rename_dirs(self, repo_id, 
renamed_dirs): new_parent_dir = p_dir.replace(old_dir_prefix, new_dir_prefix) update_row = { - METADATA_COLUMN_ID.name: row_id, - METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, + METADATA_TABLE.columns.id.name: row_id, + METADATA_TABLE.columns.parent_dir.name: new_parent_dir, } updated_rows.append(update_row) @@ -352,7 +353,7 @@ def rename_dirs(self, repo_id, renamed_dirs): def move_dirs(self, repo_id, moved_dirs): if not moved_dirs: return - sql = f'SELECT `{METADATA_COLUMN_ID.name}`, `{METADATA_COLUMN_PARENT_DIR.name}`, `{METADATA_COLUMN_NAME.name}` FROM `{METADATA_TABLE.name}` WHERE' + sql = f'SELECT `{METADATA_TABLE.columns.id.name}`, `{METADATA_TABLE.columns.parent_dir.name}`, `{METADATA_TABLE.columns.file_name.name}` FROM `{METADATA_TABLE.name}` WHERE' for d in moved_dirs: old_path = d.path.rstrip('/') parent_dir = os.path.dirname(old_path) @@ -361,8 +362,8 @@ def move_dirs(self, repo_id, moved_dirs): continue new_path = d.new_path - sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{dir_name}") OR' \ - f' (`{METADATA_COLUMN_PARENT_DIR.name}` LIKE "{old_path}%")' + sql += f' (`{METADATA_TABLE.columns.parent_dir.name}` = "{parent_dir}" AND `{METADATA_TABLE.columns.file_name.name}` = "{dir_name}") OR' \ + f' (`{METADATA_TABLE.columns.parent_dir.name}` LIKE "{old_path}%")' query_result = self.metadata_server_api.query_rows(repo_id, sql).get('results', []) if not query_result: @@ -370,17 +371,17 @@ def move_dirs(self, repo_id, moved_dirs): updated_rows = [] for row in query_result: - row_id = row[METADATA_COLUMN_ID.name] - p_dir = row[METADATA_COLUMN_PARENT_DIR.name] - name = row[METADATA_COLUMN_NAME.name] + row_id = row[METADATA_TABLE.columns.id.name] + p_dir = row[METADATA_TABLE.columns.parent_dir.name] + name = row[METADATA_TABLE.columns.file_name.name] new_parent_dir = os.path.dirname(new_path) new_name = os.path.basename(new_path) if parent_dir == p_dir and dir_name == name: update_row = { - 
METADATA_COLUMN_ID.name: row_id, - METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, - METADATA_COLUMN_NAME.name: new_name, + METADATA_TABLE.columns.id.name: row_id, + METADATA_TABLE.columns.parent_dir.name: new_parent_dir, + METADATA_TABLE.columns.file_name.name: new_name, } updated_rows.append(update_row) else: @@ -389,8 +390,8 @@ def move_dirs(self, repo_id, moved_dirs): new_parent_dir = p_dir.replace(old_dir_prefix, new_dir_prefix) update_row = { - METADATA_COLUMN_ID.name: row_id, - METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, + METADATA_TABLE.columns.id.name: row_id, + METADATA_TABLE.columns.parent_dir.name: new_parent_dir, } updated_rows.append(update_row) @@ -405,13 +406,13 @@ def delete_base(self, repo_id): def init_columns(self, repo_id): # initial md-server base and insert records # Add columns: creator, created_time, modifier, modified_time, parent_dir, name - self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_CREATOR.to_build_column_dict()) - self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_CREATED_TIME.to_build_column_dict()) - self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_MODIFIER.to_build_column_dict()) - self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_MODIFIED_TIME.to_build_column_dict()) - self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_PARENT_DIR.to_build_column_dict()) - self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_NAME.to_build_column_dict()) - self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_IS_DIR.to_build_column_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_TABLE.columns.file_creator.to_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_TABLE.columns.file_ctime.to_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, 
METADATA_TABLE.columns.file_modifier.to_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_TABLE.columns.file_mtime.to_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_TABLE.columns.parent_dir.to_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_TABLE.columns.file_name.to_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_TABLE.columns.is_dir.to_dict()) def create_base(self, repo_id): self.metadata_server_api.create_base(repo_id) diff --git a/seafevent_server/task_manager.py b/seafevent_server/task_manager.py index 7d0410a5..52057ca9 100644 --- a/seafevent_server/task_manager.py +++ b/seafevent_server/task_manager.py @@ -65,6 +65,8 @@ def get_info(self): def __str__(self): return f'' +from seafevents.app.event_redis import RedisClient +from seafevents.repo_metadata.metadata_manager import ZERO_OBJ_ID class TaskManager: @@ -79,6 +81,7 @@ def __init__(self): 'workers': 3, 'expire_time': 30 * 60 } + self._redis_connection = None self._db_session_class = None self._metadata_server_api = None self.repo_metadata = None @@ -95,28 +98,18 @@ def init(self, app, workers, task_expire_time, config): self._metadata_server_api = MetadataServerAPI('seafevents') self.repo_metadata = RepoMetadata(self._metadata_server_api) self.metadata_manager = MetadataManager(self._db_session_class, self.repo_metadata) + self._redis_connection = RedisClient(config).connection def get_pending_or_running_task(self, readable_id): task = self.readable_id2task_map.get(readable_id) return task def add_init_metadata_task(self, username, repo_id): - - readable_id = repo_id - with self.check_task_lock: - task = self.get_pending_or_running_task(readable_id) - if task: - return task.id - - task_id = str(uuid.uuid4()) - task = IndexTask(task_id, readable_id, self.metadata_manager.create_metadata, - (repo_id, ) - ) - self.tasks_map[task_id] = task - 
self.readable_id2task_map[task.readable_id] = task - self.tasks_queue.put(task) - - return task_id + msg_content = "init_metadata\t" + repo_id + '\t' + ZERO_OBJ_ID + if self._redis_connection.publish('metadata_update', msg_content) > 0: + logging.debug('Publish event: %s' % msg_content) + else: + logging.info('No one subscribed to metadata_update channel, event (%s) has not been send' % msg_content) def query_task(self, task_id): return self.tasks_map.get(task_id)