From 09f88bfe1fd805a2f7189bc61a236843641092e9 Mon Sep 17 00:00:00 2001 From: cir9no <44470218+cir9no@users.noreply.github.com> Date: Mon, 26 Aug 2024 15:26:20 +0800 Subject: [PATCH] feat/seasearch: add wiki index script --- seasearch/index_store/index_manager.py | 8 +- seasearch/index_store/wiki_index.py | 7 +- seasearch/index_task/wiki_index_updater.py | 26 +- seasearch/script/wiki_index.sh.template | 11 + seasearch/script/wiki_index_local.py | 284 +++++++++++++++++++++ 5 files changed, 320 insertions(+), 16 deletions(-) create mode 100755 seasearch/script/wiki_index.sh.template create mode 100644 seasearch/script/wiki_index_local.py diff --git a/seasearch/index_store/index_manager.py b/seasearch/index_store/index_manager.py index f006e1d3..d2da257a 100644 --- a/seasearch/index_store/index_manager.py +++ b/seasearch/index_store/index_manager.py @@ -49,13 +49,19 @@ def delete_repo_filename_index(self, repo_id, repo_filename_index, repo_status_f repo_filename_index.delete_index_by_index_name(repo_filename_index_name) repo_status_filename_index.delete_documents_by_repo(repo_id) + def delete_wiki_index(self, wiki_id, wiki_index, wiki_status_index): + # first delete wiki_index + wiki_index_name = WIKI_INDEX_PREFIX + wiki_id + wiki_index.delete_index_by_index_name(wiki_index_name) + wiki_status_index.delete_documents_by_repo(wiki_id) + def keyword_search(self, query, repos, repo_filename_index, count, suffixes=None): return repo_filename_index.search_files(repos, query, 0, count, suffixes) def wiki_search(self, query, wikis, wiki_index, count): return wiki_index.search_wikis(wikis, query, 0, count) - def update_wiki_page_index(self, wiki_id, commit_id, wiki_index, wiki_status_index): + def update_wiki_index(self, wiki_id, commit_id, wiki_index, wiki_status_index): try: new_commit_id = commit_id index_name = WIKI_INDEX_PREFIX + wiki_id diff --git a/seasearch/index_store/wiki_index.py b/seasearch/index_store/wiki_index.py index 3d5eb5ea..7f90c0b2 100644 --- a/seasearch/index_store/wiki_index.py +++ b/seasearch/index_store/wiki_index.py @@ -138,7 +138,7 @@ def get_wiki_conf(self, wiki_id): return json.loads(f.get_content().decode()) def extract_doc_uuids(self, config): - '''Extract the uuid of the undeleted wikis''' + """Extract the uuid of the undeleted wiki pages not in the recycle bin""" def extract_ids_from_navigation(navigation_items, navigation_ids): for item in navigation_items: navigation_ids.add(item['id']) @@ -151,7 +151,7 @@ def extract_ids_from_navigation(navigation_items, navigation_ids): return doc_uuids def extract_deleted_doc_uuids(self, config): - """Extract the uuid of the deleted wikis""" + """Extract the uuid of the deleted wiki pages in the recycle bin""" def extract_ids_from_navigation(navigation_items, navigation_ids): for item in navigation_items: navigation_ids.add(item['id']) @@ -312,3 +312,6 @@ def search_wikis(self, wikis, keyword, start=0, size=10): query_match.extend(res_wikis) return query_match + + def delete_index_by_index_name(self, index_name): + self.seasearch_api.delete_index_by_name(index_name) diff --git a/seasearch/index_task/wiki_index_updater.py b/seasearch/index_task/wiki_index_updater.py index c3869d4e..2abcf99f 100644 --- a/seasearch/index_task/wiki_index_updater.py +++ b/seasearch/index_task/wiki_index_updater.py @@ -78,19 +78,19 @@ def start(self): ).start() -def clear_deleted_repo(wiki_status_index, wiki_index, index_manager, repos): - logger.info("start to clear wiki index deleted repo") +def clear_deleted_wiki(wiki_status_index, wiki_index, index_manager, wikis): + logger.info("start to clear deleted wiki index") - repo_list = wiki_status_index.get_all_repos_from_index() - repo_all = [e.get('repo_id') for e in repo_list] + wiki_list = wiki_status_index.get_all_repos_from_index() + wiki_all = [e.get('repo_id') for e in wiki_list] - repo_deleted = set(repo_all) - set(repos) + wiki_deleted = set(wiki_all) - set(wikis) - logger.info("wiki index %d repos need to be deleted." % len(repo_deleted)) - for repo_id in repo_deleted: - index_manager.delete_repo_filename_index(repo_id, wiki_index, wiki_status_index) - logger.info('Repo %s has been deleted from wiki index.' % repo_id) - logger.info("wiki index deleted repo has been cleared") + logger.info("wiki index %d need to be deleted." % len(wiki_deleted)) + for wiki_id in wiki_deleted: + index_manager.delete_wiki_index(wiki_id, wiki_index, wiki_status_index) + logger.info('Wiki %s has been deleted from wiki index.' % wiki_id) + logger.info("wiki index deleted wiki has been cleared") def update_wiki_indexes(wiki_status_index, wiki_index, index_manager, repo_data): @@ -109,11 +109,11 @@ def update_wiki_indexes(wiki_status_index, wiki_index, index_manager, repo_data) for wiki_id, commit_id in wiki_commits: all_wikis.append(wiki_id) - index_manager.update_wiki_page_index(wiki_id, commit_id, wiki_index, wiki_status_index) + index_manager.update_wiki_index(wiki_id, commit_id, wiki_index, wiki_status_index) logger.info("Finish update wiki index") - clear_deleted_repo(wiki_status_index, wiki_index, index_manager, all_wikis) + clear_deleted_wiki(wiki_status_index, wiki_index, index_manager, all_wikis) class WikiIndexUpdaterTimer(Thread): @@ -128,7 +128,7 @@ def run(self): sched = GeventScheduler() logging.info('Start to update wiki index...') try: - sched.add_job(update_wiki_indexes, CronTrigger(minute='*/5'), + sched.add_job(update_wiki_indexes, CronTrigger(minute='*/15'), args=(self.wiki_status_index, self.wiki_index, self.index_manager, self.repo_data)) except Exception as e: logging.exception('periodical update wiki index error: %s', e) diff --git a/seasearch/script/wiki_index.sh.template b/seasearch/script/wiki_index.sh.template new file mode 100755 index 00000000..25f6c9f5 --- /dev/null +++ b/seasearch/script/wiki_index.sh.template @@ -0,0 +1,11 @@ +export CCNET_CONF_DIR=$CONF_PATH +export SEAFILE_CONF_DIR=$CONF_PATH/seafile-data +export EVENTS_CONFIG_FILE=$CONF_PATH/seafevents.conf +export PYTHONPATH=$COMPILE_PATH:$CONF_PATH:$PYTHONPATH:/usr/lib/python3.8/dist-packages:/usr/lib/python3.8/site-packages:/usr/local/lib/python3.8/dist-packages:/usr/local/lib/python3.8/site-packages:/data/dev/seahub/thirdpart:/data/dev/pyes/pyes:/data/dev/portable-python-libevent/libevent:/data/dev/seafobj:/data/dev/seahub/seahub/:/data/dev/ +export SEAHUB_DIR=/data/dev/seahub/ + +if [[ $# == 1 && $1 == "clear" ]]; then + python -m seafevents.seasearch.script.wiki_index_local clear +else + python -m seafevents.seasearch.script.wiki_index_local update +fi diff --git a/seasearch/script/wiki_index_local.py b/seasearch/script/wiki_index_local.py new file mode 100644 index 00000000..162127fb --- /dev/null +++ b/seasearch/script/wiki_index_local.py @@ -0,0 +1,284 @@ +import os +import sys +import time +import queue +import logging +import argparse +import threading + +from seafobj import commit_mgr, fs_mgr, block_mgr +from seafevents.utils import get_opt_from_conf_or_env +from seafevents.app.config import get_config +from seafevents.seasearch.utils import init_logging +from seafevents.repo_data import repo_data +from seafevents.seasearch.index_store.index_manager import IndexManager +from seafevents.seasearch.utils.seasearch_api import SeaSearchAPI +from seafevents.seasearch.index_store.repo_status_index import RepoStatusIndex +from seafevents.seasearch.utils.constants import WIKI_INDEX_PREFIX, WIKI_STATUS_INDEX_NAME +from seafevents.seasearch.index_store.wiki_index import WikiIndex + +logger = logging.getLogger('seasearch') + +UPDATE_FILE_LOCK = os.path.join(os.path.dirname(__file__), 'update.lock') +lockfile = None +NO_TASKS = False + + +class WikiIndexLocal(object): + """ Independent update wiki page index. + """ + def __init__(self, index_manager, wiki_status_index, wiki_index, repo_data, workers=3): + self.index_manager = index_manager + self.wiki_status_index = wiki_status_index + self.wiki_index = wiki_index + self.repo_data = repo_data + self.error_counter = 0 + self.worker_list = [] + self.workers = workers + + def clear_worker(self): + for th in self.worker_list: + th.join() + logger.info("All worker threads has stopped.") + + def run(self): + time_start = time.time() + wikis_queue = queue.Queue(0) + for i in range(self.workers): + thread_name = "worker" + str(i) + logger.info("starting %s worker threads for wiki indexing" + % thread_name) + t = threading.Thread(target=self.thread_task, args=(wikis_queue, ), name=thread_name) + t.start() + self.worker_list.append(t) + + start, per_size = 0, 1000 + wikis = {} + while True: + global NO_TASKS + try: + wiki_commits = self.repo_data.get_wiki_id_commit_id(start, per_size) + except Exception as e: + logger.error("Error: %s" % e) + NO_TASKS = True + self.clear_worker() + return + else: + if len(wiki_commits) == 0: + NO_TASKS = True + break + for wiki_id, commit_id in wiki_commits: + wikis_queue.put((wiki_id, commit_id)) + wikis[wiki_id] = commit_id + start += per_size + + self.clear_worker() + logger.info("wiki index updated, total time %s seconds" % str(time.time() - time_start)) + try: + self.clear_deleted_wiki(list(wikis.keys())) + except Exception as e: + logger.exception('Delete Wiki Error: %s' % e) + self.incr_error() + + def thread_task(self, wikis_queue): + while True: + try: + queue_data = wikis_queue.get(False) + except queue.Empty: + if NO_TASKS: + logger.debug( + "Queue is empty, %s worker threads stop" + % (threading.currentThread().getName()) + ) + break + else: + time.sleep(2) + else: + wiki_id = queue_data[0] + commit_id = queue_data[1] + try: + self.index_manager.update_wiki_index(wiki_id, commit_id, self.wiki_index, self.wiki_status_index) + except Exception as e: + logger.exception('Wiki index error: %s, wiki_id: %s' % (e, wiki_id), exc_info=True) + self.incr_error() + + logger.info( + "%s worker updated at %s time" + % (threading.currentThread().getName(), + time.strftime("%Y-%m-%d %H:%M", time.localtime(time.time()))) + ) + logger.info( + "%s worker get %s error" + % (threading.currentThread().getName(), + str(self.error_counter)) + ) + + def clear_deleted_wiki(self, wikis): + logger.info("start to clear deletde wiki") + wiki_all = [e.get('repo_id') for e in self.wiki_status_index.get_all_repos_from_index()] + + wiki_deleted = set(wiki_all) - set(wikis) + logger.info("wiki index %d need to be deleted." % len(wiki_deleted)) + for wiki_id in wiki_deleted: + self.delete_wiki_index(wiki_id) + logger.info('Wiki %s has been deleted from index.' % wiki_id) + logger.info("deleted wiki has been cleared") + + def incr_error(self): + self.error_counter += 1 + + def delete_wiki_index(self, wiki_id): + if len(wiki_id) != 36: + return + self.index_manager.delete_wiki_index(wiki_id, self.wiki_index, self.wiki_status_index) + + +def start_index_local(): + if not check_concurrent_update(): + return + section_name = 'SEASEARCH' + seafevents_conf = os.environ.get('EVENTS_CONFIG_FILE') + config = get_config(seafevents_conf) + seasearch_url = get_opt_from_conf_or_env( + config, section_name, 'seasearch_url' + ) + seasearch_token = get_opt_from_conf_or_env( + config, section_name, 'seasearch_token' + ) + + index_manager = IndexManager() + seasearch_api = SeaSearchAPI(seasearch_url, seasearch_token) + wiki_status_index = RepoStatusIndex(seasearch_api, WIKI_STATUS_INDEX_NAME) + wiki_index = WikiIndex(seasearch_api, repo_data, shard_num=1) + + try: + index_local = WikiIndexLocal(index_manager, wiki_status_index, wiki_index, repo_data) + except Exception as e: + logger.error("Index wiki process init error: %s." % e) + return + + logger.info("Index wiki process initialized.") + index_local.run() + + logger.info('\n\nWiki index updated, statistic report:\n') + logger.info('[commit read] %s', commit_mgr.read_count()) + + +def delete_indices(): + section_name = 'SEASEARCH' + conf_path = os.environ.get('CONF_PATH') or os.environ.get('SEAFILE_CENTRAL_CONF_DIR') + seafevents_conf = os.path.join(conf_path, 'seafevents.conf') + config = get_config(seafevents_conf) + seasearch_url = get_opt_from_conf_or_env( + config, section_name, 'seasearch_url' + ) + seasearch_token = get_opt_from_conf_or_env( + config, section_name, 'seasearch_token' + ) + + seasearch_api = SeaSearchAPI(seasearch_url, seasearch_token) + wiki_status_index = RepoStatusIndex(seasearch_api, WIKI_STATUS_INDEX_NAME) + wiki_index = WikiIndex(seasearch_api, repo_data, shard_num=1) + + start, count = 0, 1000 + while True: + try: + wiki_commits = repo_data.get_wiki_id_commit_id(start, count) + except Exception as e: + logger.error("Error: %s" % e) + return + start += 1000 + + if len(wiki_commits) == 0: + break + + for wiki_id, commit_id in wiki_commits: + wiki_index_name = WIKI_INDEX_PREFIX + wiki_id + wiki_index.delete_index_by_index_name(wiki_index_name) + + wiki_status_index.delete_index_by_index_name() + + +def main(): + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(title='subcommands', description='') + + parser.add_argument( + '--logfile', + default=sys.stdout, + type=argparse.FileType('a'), + help='log file') + + parser.add_argument( + '--loglevel', + default='info', + help='log level') + + # update index + parser_update = subparsers.add_parser('update', help='update seafile wiki index') + parser_update.set_defaults(func=start_index_local) + + # clear + parser_clear = subparsers.add_parser('clear', help='clear all wiki index') + parser_clear.set_defaults(func=delete_indices) + + if len(sys.argv) == 1: + print(parser.format_help()) + return + + args = parser.parse_args() + init_logging(args) + + logger.info('storage: using ' + commit_mgr.get_backend_name()) + + args.func() + + +def do_lock(fn): + if os.name == 'nt': + return do_lock_win32(fn) + else: + return do_lock_linux(fn) + + +def do_lock_win32(fn): + import ctypes + + CreateFileW = ctypes.windll.kernel32.CreateFileW + GENERIC_WRITE = 0x40000000 + OPEN_ALWAYS = 4 + + def lock_file(path): + lock_file_handle = CreateFileW(path, GENERIC_WRITE, 0, None, OPEN_ALWAYS, 0, None) + + return lock_file_handle + + global lockfile + + lockfile = lock_file(fn) + + return lockfile != -1 + + +def do_lock_linux(fn): + from seafevents.seasearch.script import portalocker + global lockfile + lockfile = open(fn, 'w') + try: + portalocker.lock(lockfile, portalocker.LOCK_NB | portalocker.LOCK_EX) + return True + except portalocker.LockException: + return False + + +def check_concurrent_update(): + """Use a lock file to ensure only one task can be running""" + if not do_lock(UPDATE_FILE_LOCK): + logger.error('another index task is running, quit now') + return False + + return True + + +if __name__ == "__main__": + main()