diff --git a/app/app.py b/app/app.py index ad0e5ebf..7e45b3f6 100644 --- a/app/app.py +++ b/app/app.py @@ -4,6 +4,11 @@ WorkWinxinNoticeSender, FileUpdatesSender, RepoOldFileAutoDelScanner,\ DeletedFilesCountCleaner +from seafevents.repo_metadata.index_master import RepoMetadataIndexMaster +from seafevents.repo_metadata.index_worker import RepoMetadataIndexWorker +from seafevents.seafevent_server.seafevent_server import SeafEventServer +from seafevents.app.config import ENABLE_METADATA_MANAGEMENT + class App(object): def __init__(self, config, ccnet_config, seafile_config, @@ -17,6 +22,7 @@ def __init__(self, config, ccnet_config, seafile_config, self._events_handler = EventsHandler(config) self._count_traffic_task = CountTrafficInfo(config) self._update_login_record_task = CountUserActivity(config) + self._seafevent_server = SeafEventServer(self, config) if self._bg_tasks_enabled: self._index_updater = IndexUpdater(config) @@ -29,12 +35,16 @@ def __init__(self, config, ccnet_config, seafile_config, self._file_updates_sender = FileUpdatesSender() self._repo_old_file_auto_del_scanner = RepoOldFileAutoDelScanner(config) self._deleted_files_count_cleaner = DeletedFilesCountCleaner(config) + if ENABLE_METADATA_MANAGEMENT: + self._index_master = RepoMetadataIndexMaster(config) + self._index_worker = RepoMetadataIndexWorker(config) def serve_forever(self): if self._fg_tasks_enabled: self._events_handler.start() self._update_login_record_task.start() self._count_traffic_task.start() + self._seafevent_server.start() if self._bg_tasks_enabled: self._file_updates_sender.start() @@ -47,3 +57,6 @@ def serve_forever(self): self._content_scanner.start() self._repo_old_file_auto_del_scanner.start() self._deleted_files_count_cleaner.start() + if ENABLE_METADATA_MANAGEMENT: + self._index_master.start() + self._index_worker.start() diff --git a/app/config.py b/app/config.py index acd4853b..1e6dcec6 100644 --- a/app/config.py +++ b/app/config.py @@ -22,6 +22,10 @@ DTABLE_WEB_SERVER = getattr(seahub_settings, 'DTABLE_WEB_SERVER', None) SEATABLE_EX_PROPS_BASE_API_TOKEN = getattr(seahub_settings, 'SEATABLE_EX_PROPS_BASE_API_TOKEN', None) EX_PROPS_TABLE = getattr(seahub_settings, 'EX_PROPS_TABLE', None) + SEAHUB_SECRET_KEY = getattr(seahub_settings, 'SECRET_KEY', '') + METADATA_SERVER_SECRET_KEY = getattr(seahub_settings, 'METADATA_SERVER_SECRET_KEY', '') + METADATA_SERVER_URL = getattr(seahub_settings, 'METADATA_SERVER_URL', '') + ENABLE_METADATA_MANAGEMENT = getattr(seahub_settings, 'ENABLE_METADATA_MANAGEMENT', False) except ImportError: logger.critical("Can not import seahub settings.") raise RuntimeError("Can not import seahub settings.") diff --git a/app/event_redis.py b/app/event_redis.py index d182d358..5109f0f7 100644 --- a/app/event_redis.py +++ b/app/event_redis.py @@ -15,11 +15,8 @@ def __init__(self, config, socket_connect_timeout=30, socket_timeout=None): self._parse_config(config, socket_connect_timeout, socket_timeout) def _parse_config(self, config, socket_connect_timeout, socket_timeout): - mq_type = '' - if config.has_option('EVENTS PUBLISH', 'mq_type'): - mq_type = config.get('EVENTS PUBLISH', 'mq_type').upper() - if mq_type != 'REDIS': - logging.warning("Unknown database backend: %s" % mq_type) + + if not config.has_section('REDIS'): return if config.has_option('REDIS', 'server'): diff --git a/app/mq_handler.py b/app/mq_handler.py index ae1f69a0..24a52d73 100644 --- a/app/mq_handler.py +++ b/app/mq_handler.py @@ -9,6 +9,7 @@ import seafevents.statistics.handlers as stats_handlers from 
seafevents.db import init_db_session_class from seafevents.app.event_redis import RedisClient +import seafevents.repo_metadata.handlers as metadata_handler logger = logging.getLogger(__name__) @@ -47,7 +48,7 @@ def handle_message(self, config, session, redis_connection, channel, msg): funcs = self._handlers.get(msg_type) for func in funcs: try: - if func.__name__ == 'RepoUpdatePublishHandler': + if func.__name__ == 'RepoUpdatePublishHandler' or func.__name__ == 'RepoMetadataUpdateHandler': func(config, redis_connection, msg) else: func(config, session, msg) @@ -83,6 +84,7 @@ def init_message_handlers(config): events_handlers.register_handlers(message_handler, enable_audit) stats_handlers.register_handlers(message_handler) publisher_handlers.register_handlers(message_handler) + metadata_handler.register_handlers(message_handler) class EventsHandler(object): diff --git a/main.py b/main.py index 68aaf5fa..b77c5f13 100644 --- a/main.py +++ b/main.py @@ -60,6 +60,8 @@ def main(background_tasks_only=False): foreground_tasks_enabled = True background_tasks_enabled = False + from gevent import monkey; monkey.patch_all() + app = App(config, ccnet_config, seafile_config, foreground_tasks_enabled=foreground_tasks_enabled, background_tasks_enabled=background_tasks_enabled) diff --git a/mq.py b/mq.py new file mode 100644 index 00000000..90286eb0 --- /dev/null +++ b/mq.py @@ -0,0 +1,18 @@ +import redis +import logging + +logger = logging.getLogger(__name__) + + +def get_mq(server, port, password): + rdp = redis.ConnectionPool(host=server, port=port, + password=password, retry_on_timeout=True, decode_responses=True) + mq = redis.StrictRedis(connection_pool=rdp) + try: + mq.ping() + except Exception as e: + logger.error("Redis server can't be connected: host %s, port %s, error %s", + server, port, e) + finally: + # python redis is a client, each operation tries to connect and retry exec + return mq diff --git a/repo_data/__init__.py b/repo_data/__init__.py new file mode 100644 index 00000000..bebac191 --- /dev/null +++ b/repo_data/__init__.py @@ -0,0 +1,129 @@ +import os +import logging +from sqlalchemy.sql import text + +from seafevents.repo_data.db import init_db_session_class + +logger = logging.getLogger(__name__) + + +class RepoData(object): + def __init__(self): + if 'SEAFILE_CENTRAL_CONF_DIR' in os.environ: + confdir = os.environ['SEAFILE_CENTRAL_CONF_DIR'] + else: + confdir = os.environ['SEAFILE_CONF_DIR'] + self.seafile_conf = os.path.join(confdir, 'seafile.conf') + self.db_session = init_db_session_class(self.seafile_conf) + + def to_dict(self, result_proxy): + res = [] + for i in result_proxy.mappings(): + res.append(i) + return res + + def _get_repo_id_commit_id(self, start, count): + session = self.db_session() + try: + cmd = """SELECT repo_id, commit_id + FROM Branch WHERE name = :name + AND repo_id NOT IN (SELECT repo_id from VirtualRepo) + limit :start, :count""" + res = [(r[0], r[1]) for r in session.execute(text(cmd), + {'name': 'master', + 'start': start, + 'count': count})] + return res + except Exception as e: + raise e + finally: + session.close() + + + def _get_all_trash_repo_list(self): + session = self.db_session() + try: + cmd = """SELECT repo_id, repo_name, head_id, owner_id, + size, org_id, del_time FROM RepoTrash ORDER BY del_time DESC""" + res = session.execute(text(cmd)) + return self.to_dict(res) + except Exception as e: + raise e + finally: + session.close() + + def _get_all_repo_list(self): + session = self.db_session() + try: + cmd = """SELECT r.repo_id, c.file_count 
FROM Repo r LEFT JOIN RepoFileCount c + ON r.repo_id = c.repo_id""" + res = session.execute(text(cmd)) + return self.to_dict(res) + except Exception as e: + raise e + finally: + session.close() + + def _get_repo_head_commit(self, repo_id): + session = self.db_session() + try: + cmd = """SELECT b.commit_id + from Branch as b inner join Repo as r + where b.repo_id=r.repo_id and b.repo_id=:repo_id""" + res = session.execute(text(cmd), {'repo_id': repo_id}).fetchone() + return res[0] if res else None + except Exception as e: + raise e + finally: + session.close() + + def _get_repo_name_mtime_size(self, repo_id): + session = self.db_session() + try: + cmd = """SELECT RepoInfo.name, RepoInfo.update_time, RepoSize.size + FROM RepoInfo INNER JOIN RepoSize ON RepoInfo.repo_id = RepoSize.repo_id + AND RepoInfo.repo_id = :repo_id""" + res = session.execute(text(cmd), {'repo_id': repo_id}) + return self.to_dict(res) + except Exception as e: + raise e + finally: + session.close() + + def get_repo_name_mtime_size(self, repo_id): + try: + return self._get_repo_name_mtime_size(repo_id) + except Exception as e: + logger.error(e) + return self._get_repo_name_mtime_size(repo_id) + + def get_all_repo_list(self): + try: + return self._get_all_repo_list() + except Exception as e: + logger.error(e) + return self._get_all_repo_list() + + def get_all_trash_repo_list(self): + try: + return self._get_all_trash_repo_list() + except Exception as e: + logger.error(e) + return self._get_all_trash_repo_list() + + def get_repo_id_commit_id(self, start, count): + try: + return self._get_repo_id_commit_id(start, count) + except Exception as e: + logger.error(e) + return self._get_repo_id_commit_id(start, count) + + def get_repo_head_commit(self, repo_id): + try: + return self._get_repo_head_commit(repo_id) + except Exception as e: + logger.error(e) + return self._get_repo_head_commit(repo_id) + + +repo_data = RepoData() diff --git a/repo_data/db.py b/repo_data/db.py new file mode 100644 index 00000000..2c766c4d --- /dev/null +++ b/repo_data/db.py @@ -0,0 +1,79 @@ +import logging +import configparser + +from urllib.parse import quote_plus + +from sqlalchemy import create_engine +from sqlalchemy.event import contains as has_event_listener, listen as add_event_listener +from sqlalchemy.exc import DisconnectionError +from sqlalchemy.orm import DeclarativeBase +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import Pool + + +# base class of model classes in events.models and stats.models +class Base(DeclarativeBase): + pass + + +logger = logging.getLogger('seafes') + +def create_engine_from_conf(config_file): + seaf_conf = configparser.ConfigParser() + seaf_conf.read(config_file) + backend = seaf_conf.get('database', 'type') + if backend == 'mysql': + db_server = 'localhost' + db_port = 3306 + + if seaf_conf.has_option('database', 'host'): + db_server = seaf_conf.get('database', 'host') + if seaf_conf.has_option('database', 'port'): + db_port =seaf_conf.getint('database', 'port') + db_username = seaf_conf.get('database', 'user') + db_passwd = seaf_conf.get('database', 'password') + db_name = seaf_conf.get('database', 'db_name') + db_url = "mysql+pymysql://%s:%s@%s:%s/%s?charset=utf8" % \ + (db_username, quote_plus(db_passwd), + db_server, db_port, db_name) + else: + logger.critical("Unknown Database backend: %s" % backend) + raise RuntimeError("Unknown Database backend: %s" % backend) + + kwargs = dict(pool_recycle=300, echo=False, echo_pool=False) + + engine = create_engine(db_url, **kwargs) + if not 
has_event_listener(Pool, 'checkout', ping_connection):
+        # We use has_event_listener to double check in case we call create_engine
+        # multiple times in the same process.
+        add_event_listener(Pool, 'checkout', ping_connection)
+
+    return engine
+
+def init_db_session_class(config_file):
+    """Configure Session class for mysql according to the config file."""
+    try:
+        engine = create_engine_from_conf(config_file)
+    except (configparser.NoOptionError, configparser.NoSectionError) as e:
+        logger.error(e)
+        raise RuntimeError("invalid config file %s" % config_file)
+
+    Session = sessionmaker(bind=engine)
+    return Session
+
+# This is used to fix the problem of "MySQL has gone away" that happens when
+# the mysql server is restarted or the pooled connections are closed by the mysql
+# server because they have been idle for too long.
+#
+# See http://stackoverflow.com/a/17791117/1467959
+def ping_connection(dbapi_connection, connection_record, connection_proxy): # pylint: disable=unused-argument
+    cursor = dbapi_connection.cursor()
+    try:
+        cursor.execute("SELECT 1")
+        cursor.close()
+    except Exception:
+        logger.info('fail to ping database server, disposing all cached connections')
+        connection_proxy._pool.dispose() # pylint: disable=protected-access
+
+        # Raise DisconnectionError so the pool would create a new connection
+        raise DisconnectionError()
diff --git a/repo_metadata/__init__.py b/repo_metadata/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/repo_metadata/handlers.py b/repo_metadata/handlers.py
new file mode 100644
index 00000000..e96894ed
--- /dev/null
+++ b/repo_metadata/handlers.py
@@ -0,0 +1,22 @@
+import logging
+
+
+def RepoMetadataUpdateHandler(config, redis_connection, msg):
+
+    elements = msg['content'].split('\t')
+    if len(elements) != 3:
+        logging.warning("got bad message: %s", elements)
+        return
+
+    try:
+        if redis_connection.publish('metadata_update', msg['content']) > 0:
+            logging.debug('Publish event: %s' % msg['content'])
+        else:
+            logging.info('No one subscribed to metadata_update channel, event (%s) has not been sent' % msg['content'])
+    except Exception as e:
+        logging.error(e)
+        logging.error("Failed to publish metadata_update event: %s " % msg['content'])
+
+
+def register_handlers(handlers):
+    handlers.add_handler('seaf_server.event:repo-update', RepoMetadataUpdateHandler)
diff --git a/repo_metadata/index_master.py b/repo_metadata/index_master.py
new file mode 100644
index 00000000..50f8660a
--- /dev/null
+++ b/repo_metadata/index_master.py
@@ -0,0 +1,64 @@
+import time
+import logging
+from threading import Thread
+
+from seafevents.mq import get_mq
+from seafevents.utils import get_opt_from_conf_or_env
+
+logger = logging.getLogger(__name__)
+
+
+class RepoMetadataIndexMaster(Thread):
+    """Subscribe to repo update events from Redis and push them to the metadata task queue.
+    """
+    def __init__(self, config):
+        Thread.__init__(self)
+
+        self.mq_server = '127.0.0.1'
+        self.mq_port = 6379
+        self.mq_password = ''
+
+        self._parse_config(config)
+
+        self.mq = get_mq(self.mq_server, self.mq_port, self.mq_password)
+
+    def _parse_config(self, config):
+        section_name = 'REDIS'
+        key_server = 'server'
+        key_port = 'port'
+        key_password = 'password'
+
+        if not config.has_section(section_name):
+            return
+
+        self.mq_server = get_opt_from_conf_or_env(config, section_name, key_server, default='')
+        self.mq_port = get_opt_from_conf_or_env(config, section_name, key_port, default=6379)
+        self.mq_password = get_opt_from_conf_or_env(config, section_name, key_password, default='')
+
+    def run(self):
logger.info('metadata master starting work')
+        while True:
+            try:
+                self.master_handler()
+            except Exception as e:
+                logger.error('Error handling master task: %s' % e)
+            # avoid wasting resources if redis or other services are not connected
+            time.sleep(0.2)
+
+    def master_handler(self):
+        p = self.mq.pubsub(ignore_subscribe_messages=True)
+        try:
+            p.subscribe('metadata_update')
+        except Exception as e:
+            logger.error('The connection to the redis server failed: %s' % e)
+        else:
+            logger.info('metadata master starting to listen')
+            while True:
+                message = p.get_message()
+                if message is not None and isinstance(message['data'], str) and message['data'].count('\t') == 2:
+                    self.mq.lpush('metadata_task', message['data'])
+                    logger.info('%s has been added to the metadata task queue' % message['data'])
+
+                if message is None:
+                    # avoid wasting resources when no message has been sent
+                    time.sleep(5)
diff --git a/repo_metadata/index_worker.py b/repo_metadata/index_worker.py
new file mode 100644
index 00000000..cca8a97a
--- /dev/null
+++ b/repo_metadata/index_worker.py
@@ -0,0 +1,152 @@
+import time
+import logging
+import threading
+
+from redis.exceptions import ConnectionError as NoMQAvailable, ResponseError, TimeoutError
+
+from seafevents.repo_data import repo_data
+from seafevents.mq import get_mq
+from seafevents.utils import get_opt_from_conf_or_env
+from seafevents.db import init_db_session_class
+from seafevents.repo_metadata.metadata_server_api import MetadataServerAPI
+from seafevents.repo_metadata.repo_metadata import RepoMetadata
+from seafevents.repo_metadata.metadata_manager import MetadataManager
+
+logger = logging.getLogger(__name__)
+
+
+class RepoMetadataIndexWorker(object):
+    """The handler for the redis message queue
+    """
+
+    def __init__(self, config):
+        self._db_session_class = init_db_session_class(config)
+        self.metadata_server_api = MetadataServerAPI('seafevents')
+        self.repo_metadata = RepoMetadata(self.metadata_server_api)
+
+        self.metadata_manager = MetadataManager(self._db_session_class, self.repo_metadata)
+
+        self.should_stop = threading.Event()
+        self.LOCK_TIMEOUT = 1800  # 30 minutes
+        self.REFRESH_INTERVAL = 600
+        self.locked_keys = set()
+        self.mq_server = '127.0.0.1'
+        self.mq_port = 6379
+        self.mq_password = ''
+        self._parse_config(config)
+
+        self.mq = get_mq(self.mq_server, self.mq_port, self.mq_password)
+
+    def _parse_config(self, config):
+        section_name = 'REDIS'
+        key_server = 'server'
+        key_port = 'port'
+        key_password = 'password'
+
+        if not config.has_section(section_name):
+            return
+
+        self.mq_server = get_opt_from_conf_or_env(config, section_name, key_server, default='')
+        self.mq_port = get_opt_from_conf_or_env(config, section_name, key_port, default=6379)
+        self.mq_password = get_opt_from_conf_or_env(config, section_name, key_password, default='')
+
+    def _get_lock_key(self, repo_id):
+        """Return the lock key in redis.
+        """
+        return 'v2_' + repo_id
+
+    @property
+    def tname(self):
+        return threading.current_thread().name
+
+    def start(self):
+        for i in range(2):
+            threading.Thread(target=self.worker_handler, name='subscribe_' + str(i),
+                             daemon=True).start()
+        threading.Thread(target=self.refresh_lock, name='refresh_thread', daemon=True).start()
+
+    def worker_handler(self):
+        logger.info('%s starting update metadata work' % self.tname)
+        try:
+            while not self.should_stop.is_set():
+                self.should_stop.wait(5)
+                if not self.should_stop.is_set():
+                    try:
+                        res = self.mq.brpop('metadata_task', timeout=30)
+                        if res is not None:
+                            key, value = res
+                            msg = value.split('\t')
+                            if len(msg) != 3:
+                                logger.info('Bad message: %s' % str(msg))
+                            else:
+                                repo_id, commit_id = msg[1], msg[2]
+                                self.worker_task_handler(self.mq, repo_id, commit_id, self.should_stop)
+                    except (ResponseError, NoMQAvailable, TimeoutError) as e:
+                        logger.error('The connection to the redis server failed: %s' % e)
+        except Exception as e:
+            logger.error('%s Handle Worker Task Error' % self.tname)
+            logger.error(e, exc_info=True)
+            # avoid a busy loop if redis breaks while the program is running
+            time.sleep(0.3)
+
+    def worker_task_handler(self, mq, repo_id, commit_id, should_stop):
+        # Python cannot kill threads, so stop them from generating more locked keys.
+        if not should_stop.is_set():
+            # set the key only if it does not exist; it expires 30 minutes later
+            if mq.set(self._get_lock_key(repo_id), time.time(),
+                      ex=self.LOCK_TIMEOUT, nx=True):
+                # acquired the lock
+                logger.info('%s start updating repo %s' %
+                            (self.tname, repo_id))
+                lock_key = self._get_lock_key(repo_id)
+                self.locked_keys.add(lock_key)
+                self.update_metadata(repo_id)
+                try:
+                    self.locked_keys.remove(lock_key)
+                except KeyError:
+                    logger.error("%s is already removed. SHOULD NOT HAPPEN!" % lock_key)
+                mq.delete(lock_key)
+                logger.info("%s Finish updating repo: %s, delete redis lock %s" %
+                            (self.tname, repo_id, lock_key))
+            else:
+                # the repo is being updated by another thread, push the task back to the queue
+                self.add_to_undo_task(mq, repo_id, commit_id)
+
+    def update_metadata(self, repo_id):
+        commit_id = repo_data.get_repo_head_commit(repo_id)
+        if not commit_id:
+            # invalid repo without head commit id
+            logger.error("invalid repo : %s " % repo_id)
+            return
+        try:
+            self.metadata_manager.update_metadata(repo_id, commit_id)
+        except Exception as e:
+            logger.exception('update repo: %s metadata error: %s', repo_id, e)
+
+    def add_to_undo_task(self, mq, repo_id, commit_id):
+        """Push the task back to the end of the queue.
+ """ + mq.lpush('metadata_task', '\t'.join(['repo-update', repo_id, commit_id])) + logger.debug('%s push back task (%s, %s) to the queue' % + (self.tname, repo_id, commit_id)) + + # avoid get the same task repeatedly + time.sleep(0.5) + + def refresh_lock(self): + logger.info('%s Starting refresh locks' % self.tname) + while True: + try: + # workaround for the RuntimeError: Set changed size during iteration + copy = self.locked_keys.copy() + + for lock in copy: + ttl = self.mq.ttl(lock) + new_ttl = ttl + self.REFRESH_INTERVAL + self.mq.expire(lock, new_ttl) + logger.debug('%s Refresh lock [%s] timeout from %s to %s' % + (self.tname, lock, ttl, new_ttl)) + time.sleep(self.REFRESH_INTERVAL) + except Exception as e: + logger.error(e) + time.sleep(1) diff --git a/repo_metadata/metadata_manager.py b/repo_metadata/metadata_manager.py new file mode 100644 index 00000000..5aa34101 --- /dev/null +++ b/repo_metadata/metadata_manager.py @@ -0,0 +1,145 @@ +# coding: UTF-8 + +import logging +from datetime import datetime +from sqlalchemy import text + +from seafobj.exceptions import GetObjectError +from seafobj import CommitDiffer, commit_mgr, fs_mgr +from seafevents.repo_data import repo_data + + +logger = logging.getLogger(__name__) + +ZERO_OBJ_ID = '0000000000000000000000000000000000000000' + + +def get_diff_files(repo_id, old_commit_id, new_commit_id): + if old_commit_id == new_commit_id: + return + + old_root = None + if old_commit_id: + try: + old_commit = commit_mgr.load_commit(repo_id, 0, old_commit_id) + old_root = old_commit.root_id + except GetObjectError as e: + logger.debug(e) + old_root = None + + try: + new_commit = commit_mgr.load_commit(repo_id, 0, new_commit_id) + except GetObjectError as e: + # new commit should exists in the obj store + logger.warning(e) + return + + new_root = new_commit.root_id + version = new_commit.get_version() + + if old_root == new_root: + return + + old_root = old_root if old_root else ZERO_OBJ_ID + + differ = CommitDiffer(repo_id, version, old_root, new_root, False, False) + + return differ.diff() + + +class MetadataManager(object): + + def __init__(self, session, repo_metadata): + self.session = session + self.repo_metadata = repo_metadata + + def update_metadata_index(self, repo_id, old_commit_id, new_commit_id): + added_files, deleted_files, added_dirs, deleted_dirs, modified_files, renamed_files, moved_files, \ + renamed_dirs, moved_dirs = get_diff_files(repo_id, old_commit_id, new_commit_id) + + self.repo_metadata.update(repo_id, added_files, deleted_files, added_dirs, deleted_dirs, modified_files, + renamed_files, moved_files, renamed_dirs, moved_dirs) + + def recovery(self, repo_id, from_commit, to_commit): + logger.warning('%s: metadata in recovery', repo_id) + self.update_metadata_index(repo_id, from_commit, to_commit) + + self.finish_update_metadata(repo_id, to_commit) + + def update_metadata(self, repo_id, latest_commit_id): + with self.session() as session: + sql = "SELECT enabled, from_commit, to_commit FROM repo_metadata WHERE repo_id='%s'" % repo_id + + record = session.execute(text(sql)).fetchone() + + if not record or not record[0]: + return + + from_commit = record[1] + to_commit = record[2] + + if to_commit: + self.recovery(repo_id, from_commit, to_commit) + from_commit = to_commit + + if latest_commit_id != from_commit: + logger.info('Updating repo %s' % repo_id) + logger.debug('latest_commit_id: %s, from_commit: %s' % + (latest_commit_id, from_commit)) + + self.begin_update_metadata(repo_id, from_commit, latest_commit_id) + 
self.update_metadata_index(repo_id, from_commit, latest_commit_id) + self.finish_update_metadata(repo_id, latest_commit_id) + else: + logger.debug('Repo %s already uptdate', repo_id) + + def begin_update_metadata(self, repo_id, old_commit_id, new_commit_id): + with self.session() as session: + session.execute( + text('update repo_metadata set from_commit=:from_commit, to_commit=:to_commit where repo_id=:repo_id'), + {'from_commit': old_commit_id, 'to_commit': new_commit_id, 'repo_id': repo_id}) + session.commit() + + def finish_update_metadata(self, repo_id, new_commit_id): + with self.session() as session: + session.execute( + text('update repo_metadata set from_commit=:from_commit, to_commit=:to_commit where repo_id=:repo_id'), + {'from_commit': new_commit_id, 'to_commit': '', 'repo_id': repo_id}) + session.commit() + + def delete_metadata(self, repo_id): + self.repo_metadata.delete_base(repo_id) + + def begin_create_metadata(self, repo_id, commit_id, new_commit_id): + with self.session() as session: + sql = "SELECT enabled, from_commit, to_commit FROM repo_metadata WHERE repo_id='%s'" % repo_id + record = session.execute(text(sql)).fetchone() + if not record: + with self.session() as session: + session.execute( + text(""" + INSERT INTO repo_metadata (`repo_id`, `enabled`, `from_commit`, `to_commit`, `modified_time`, `created_time`) VALUES (:repo_id, :enabled, :from_commit, :to_commit, :modified_time, :created_time) + """), + {'from_commit': commit_id, 'to_commit': new_commit_id, 'enabled': 1, 'repo_id': repo_id, + 'modified_time': datetime.utcnow(), 'created_time': datetime.utcnow()}) + session.commit() + else: + with self.session() as session: + session.execute( + text( + 'update repo_metadata set from_commit=:from_commit, to_commit=:to_commit, enabled=:enabled where repo_id=:repo_id'), + {'from_commit': commit_id, 'to_commit': new_commit_id, 'repo_id': repo_id, 'enabled': 1}) + session.commit() + + def create_metadata(self, repo_id): + new_commit_id = repo_data.get_repo_head_commit(repo_id) + old_commit_id = ZERO_OBJ_ID + + # delete base to prevent dirty data caused by last failure + self.repo_metadata.delete_base(repo_id) + + self.repo_metadata.create_base(repo_id) + self.begin_create_metadata(repo_id, old_commit_id, new_commit_id) + self.update_metadata_index(repo_id, old_commit_id, new_commit_id) + self.finish_update_metadata(repo_id, new_commit_id) + diff --git a/repo_metadata/metadata_server_api.py b/repo_metadata/metadata_server_api.py new file mode 100644 index 00000000..7b0c7795 --- /dev/null +++ b/repo_metadata/metadata_server_api.py @@ -0,0 +1,128 @@ +import requests, jwt, time + +from seafevents.app.config import METADATA_SERVER_SECRET_KEY, METADATA_SERVER_URL + + +class StructureTable(object): + def __init__(self, id, name): + self.id = id + self.name = name + + +class StructureColumn(object): + def __init__(self, key, name, type): + self.key = key + self.name = name + self.type = type + + def to_build_column_dict(self): + return { + 'key': self.key, + 'name': self.name, + 'type': self.type + } + +#metadata base +METADATA_TABLE = StructureTable('0001', 'Table1') +METADATA_COLUMN_ID = StructureColumn('_id', '_id', 'text') +METADATA_COLUMN_CREATOR = StructureColumn('_file_creator', '_file_creator', 'text') +METADATA_COLUMN_CREATED_TIME = StructureColumn('_file_ctime', '_file_ctime', 'date') +METADATA_COLUMN_MODIFIER = StructureColumn('_file_modifier', '_file_modifier', 'text') +METADATA_COLUMN_MODIFIED_TIME = StructureColumn('_file_mtime', '_file_mtime', 'date') 
+METADATA_COLUMN_PARENT_DIR = StructureColumn('_parent_dir', '_parent_dir', 'text') +METADATA_COLUMN_NAME = StructureColumn('_name', '_name', 'text') +METADATA_COLUMN_IS_DIR = StructureColumn('_is_dir', '_is_dir', 'text') + + +def parse_response(response): + if response.status_code >= 400 or response.status_code < 200: + raise ConnectionError(response.status_code, response.text) + else: + try: + return response.json() + except: + pass + + +class MetadataServerAPI: + def __init__(self, user, timeout=30): + self.user = user + self.timeout = timeout + self.secret_key = METADATA_SERVER_SECRET_KEY + self.server_url = METADATA_SERVER_URL + + def gen_headers(self, base_id): + payload = { + 'exp': int(time.time()) + 3600, + 'base_id': base_id, + 'user': self.user + } + token = jwt.encode(payload, self.secret_key, algorithm='HS256') + return {"Authorization": "Bearer %s" % token} + + def create_base(self, base_id): + headers = self.gen_headers(base_id) + url = f'{self.server_url}/api/v1/base/{base_id}' + response = requests.post(url, headers=headers, timeout=self.timeout) + return parse_response(response) + + def delete_base(self, base_id): + headers = self.gen_headers(base_id) + url = f'{self.server_url}/api/v1/base/{base_id}' + response = requests.delete(url, headers=headers, timeout=self.timeout) + + if response.status_code == 404: + return {'success': True} + return parse_response(response) + + def add_column(self, base_id, table_id, column): + headers = self.gen_headers(base_id) + url = f'{self.server_url}/api/v1/base/{base_id}/columns' + data = { + 'table_id': table_id, + 'column': column + } + response = requests.post(url, json=data, headers=headers, timeout=self.timeout) + return parse_response(response) + + def insert_rows(self, base_id, table_id, rows): + headers = self.gen_headers(base_id) + url = f'{self.server_url}/api/v1/base/{base_id}/rows' + data = { + 'table_id': table_id, + 'rows': rows + } + response = requests.post(url, json=data, headers=headers, timeout=self.timeout) + return parse_response(response) + + def update_rows(self, base_id, table_id, rows): + headers = self.gen_headers(base_id) + url = f'{self.server_url}/api/v1/base/{base_id}/rows' + data = { + 'table_id': table_id, + 'rows': rows + } + response = requests.put(url, json=data, headers=headers, timeout=self.timeout) + return parse_response(response) + + def delete_rows(self, base_id, table_id, row_ids): + headers = self.gen_headers(base_id) + url = f'{self.server_url}/api/v1/base/{base_id}/rows' + data = { + 'table_id': table_id, + 'row_ids': row_ids + } + response = requests.delete(url, json=data, headers=headers, timeout=self.timeout) + return parse_response(response) + + def query_rows(self, base_id, sql, params=[]): + headers = self.gen_headers(base_id) + post_data = { + 'sql': sql + } + + if params: + post_data['params'] = params + url = f'{self.server_url}/api/v1/base/{base_id}/query' + response = requests.post(url, json=post_data, headers=headers, timeout=self.timeout) + return parse_response(response) diff --git a/repo_metadata/repo_metadata.py b/repo_metadata/repo_metadata.py new file mode 100644 index 00000000..2cfebc20 --- /dev/null +++ b/repo_metadata/repo_metadata.py @@ -0,0 +1,418 @@ +import os +import logging + +from seafevents.repo_metadata.metadata_server_api import METADATA_TABLE, METADATA_COLUMN_ID, \ + METADATA_COLUMN_CREATOR, METADATA_COLUMN_CREATED_TIME, METADATA_COLUMN_MODIFIER, \ + METADATA_COLUMN_MODIFIED_TIME, METADATA_COLUMN_PARENT_DIR, METADATA_COLUMN_NAME, METADATA_COLUMN_IS_DIR + +from 
seafevents.utils import timestamp_to_isoformat_timestr + +logger = logging.getLogger(__name__) + +EXCLUDED_PATHS = ['/_Internal', '/images'] + + +class RepoMetadata: + + def __init__(self, metadata_server_api): + self.metadata_server_api = metadata_server_api + + def update(self, repo_id, added_files, deleted_files, added_dirs, deleted_dirs, modified_files, + renamed_files, moved_files, renamed_dirs, moved_dirs): + + # delete added_files delete added dirs for preventing duplicate insertions + self.delete_files(repo_id, added_files) + self.delete_dirs(repo_id, added_dirs) + + self.add_files(repo_id, added_files) + self.delete_files(repo_id, deleted_files) + self.add_dirs(repo_id, added_dirs) + self.delete_dirs(repo_id, deleted_dirs) + self.update_files(repo_id, modified_files) + + # self.rename_files(repo_id, renamed_files) + # self.move_files(repo_id, moved_files) + # self.rename_dirs(repo_id, renamed_dirs) + # self.move_dirs(repo_id, moved_dirs) + + def is_excluded_path(self, path): + if not path or path == '/': + return True + for ex_path in EXCLUDED_PATHS: + if path.startswith(ex_path): + return True + + def add_files(self, repo_id, added_files): + if not added_files: + return + rows = [] + for de in added_files: + path = de.path.rstrip('/') + mtime = de.mtime + parent_dir = os.path.dirname(path) + file_name = os.path.basename(path) + modifier = de.modifier + + if self.is_excluded_path(path): + continue + + row = { + METADATA_COLUMN_CREATOR.name: modifier, + METADATA_COLUMN_CREATED_TIME.name: timestamp_to_isoformat_timestr(mtime), + METADATA_COLUMN_MODIFIER.name: modifier, + METADATA_COLUMN_MODIFIED_TIME.name: timestamp_to_isoformat_timestr(mtime), + METADATA_COLUMN_PARENT_DIR.name: parent_dir, + METADATA_COLUMN_NAME.name: file_name, + METADATA_COLUMN_IS_DIR.name: 'False', + } + rows.append(row) + if not rows: + return + self.metadata_server_api.insert_rows(repo_id, METADATA_TABLE.id, rows) + + def delete_files(self, repo_id, deleted_files): + if not deleted_files: + return + + sql = f'SELECT `{METADATA_COLUMN_ID.name}` FROM `{METADATA_TABLE.name}` WHERE' + need_deleted = False + for file in deleted_files: + path = file.path.rstrip('/') + if self.is_excluded_path(path): + continue + need_deleted = True + parent_dir = os.path.dirname(path) + file_name = os.path.basename(path) + sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{file_name}") OR' + + if not need_deleted: + return + sql = sql.rstrip(' OR') + query_result = self.metadata_server_api.query_rows(repo_id, sql).get('results', []) + + if not query_result: + return + + row_ids = [] + for row in query_result: + row_ids.append(row[METADATA_COLUMN_ID.name]) + + self.metadata_server_api.delete_rows(repo_id, METADATA_TABLE.id, row_ids) + + def update_files(self, repo_id, modified_files): + if not modified_files: + return + + sql = f'SELECT `{METADATA_COLUMN_ID.name}`, `{METADATA_COLUMN_PARENT_DIR.name}`, `{METADATA_COLUMN_NAME.name}` FROM `{METADATA_TABLE.name}` WHERE' + path_to_file_dict = {} + need_update = False + for file in modified_files: + path = file.path.rstrip('/') + if self.is_excluded_path(path): + continue + need_update = True + parent_dir = os.path.dirname(path) + file_name = os.path.basename(path) + key = parent_dir + file_name + path_to_file_dict[key] = file + + sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{file_name}") OR' + + if not need_update: + return + sql = sql.rstrip(' OR') + query_result = 
self.metadata_server_api.query_rows(repo_id, sql).get('results', []) + + if not query_result: + return + + updated_rows = [] + for row in query_result: + row_id = row[METADATA_COLUMN_ID.name] + parent_dir = row[METADATA_COLUMN_PARENT_DIR.name] + file_name = row[METADATA_COLUMN_NAME.name] + key = parent_dir + file_name + new_row = path_to_file_dict.get(key) + + update_row = { + METADATA_COLUMN_ID.name: row_id, + METADATA_COLUMN_MODIFIER.name: new_row.modifier, + METADATA_COLUMN_MODIFIED_TIME.name: timestamp_to_isoformat_timestr(new_row.mtime), + } + updated_rows.append(update_row) + + self.metadata_server_api.update_rows(repo_id, METADATA_TABLE.id, updated_rows) + + def add_dirs(self, repo_id, added_dirs): + if not added_dirs: + return + + rows = [] + for de in added_dirs: + path = de.path.rstrip('/') + if self.is_excluded_path(path): + continue + parent_dir = os.path.dirname(path) + file_name = os.path.basename(path) + mtime = de.mtime + + row = { + METADATA_COLUMN_CREATOR.name: '', + METADATA_COLUMN_CREATED_TIME.name: timestamp_to_isoformat_timestr(mtime), + METADATA_COLUMN_MODIFIER.name: '', + METADATA_COLUMN_MODIFIED_TIME.name: timestamp_to_isoformat_timestr(mtime), + METADATA_COLUMN_PARENT_DIR.name: parent_dir, + METADATA_COLUMN_NAME.name: file_name, + METADATA_COLUMN_IS_DIR.name: 'True', + + } + rows.append(row) + + if not rows: + return + + self.metadata_server_api.insert_rows(repo_id, METADATA_TABLE.id, rows) + + def delete_dirs(self, repo_id, deleted_dirs): + if not deleted_dirs: + return + sql = f'SELECT `{METADATA_COLUMN_ID.name}` FROM `{METADATA_TABLE.name}` WHERE' + need_delete = False + for d in deleted_dirs: + path = d.path.rstrip('/') + if self.is_excluded_path(path): + continue + need_delete = True + parent_dir = os.path.dirname(path) + dir_name = os.path.basename(path) + sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{dir_name}") OR' + + if not need_delete: + return + sql = sql.rstrip(' OR') + query_result = self.metadata_server_api.query_rows(repo_id, sql).get('results', []) + + if not query_result: + return + + row_ids = [] + for row in query_result: + row_ids.append(row[METADATA_COLUMN_ID.name]) + + self.metadata_server_api.delete_rows(repo_id, METADATA_TABLE.id, row_ids) + + def rename_files(self, repo_id, renamed_files): + if not renamed_files: + return + + sql = f'SELECT `{METADATA_COLUMN_ID.name}`, `{METADATA_COLUMN_PARENT_DIR.name}`, `{METADATA_COLUMN_NAME.name}` FROM `{METADATA_TABLE.name}` WHERE' + path_to_file_dict = {} + need_update = False + for file in renamed_files: + path = file.path.rstrip('/') + if self.is_excluded_path(path): + continue + need_update = True + parent_dir = os.path.dirname(path) + file_name = os.path.basename(path) + key = parent_dir + file_name + path_to_file_dict[key] = file + + sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{file_name}") OR' + if not need_update: + return + sql = sql.rstrip(' OR') + query_result = self.metadata_server_api.query_rows(repo_id, sql).get('results', []) + + if not query_result: + return + + updated_rows = [] + for row in query_result: + row_id = row[METADATA_COLUMN_ID.name] + parent_dir = row[METADATA_COLUMN_PARENT_DIR.name] + file_name = row[METADATA_COLUMN_NAME.name] + key = parent_dir + file_name + new_row = path_to_file_dict.get(key) + + new_path = new_row.new_path + new_parent_dir = os.path.dirname(new_path) + new_file_name = os.path.basename(new_path) + + update_row = { + METADATA_COLUMN_ID.name: 
row_id, + METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, + METADATA_COLUMN_NAME.name: new_file_name, + METADATA_COLUMN_MODIFIER.name: new_row.modifier, + METADATA_COLUMN_MODIFIED_TIME.name: timestamp_to_isoformat_timestr(new_row.mtime), + } + updated_rows.append(update_row) + + if not updated_rows: + return + + self.metadata_server_api.update_rows(repo_id, METADATA_TABLE.id, updated_rows) + + def move_files(self, repo_id, moved_files): + if not moved_files: + return + + sql = f'SELECT `{METADATA_COLUMN_ID.name}`, `{METADATA_COLUMN_PARENT_DIR.name}`, `{METADATA_COLUMN_NAME.name}` FROM `{METADATA_TABLE.name}` WHERE' + path_to_file_dict = {} + need_update = False + for file in moved_files: + path = file.path.rstrip('/') + if self.is_excluded_path(path): + continue + need_update = True + parent_dir = os.path.dirname(path) + file_name = os.path.basename(path) + key = parent_dir + file_name + path_to_file_dict[key] = file + + sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{file_name}") OR' + + if not need_update: + return + sql = sql.rstrip(' OR') + query_result = self.metadata_server_api.query_rows(repo_id, sql).get('results', []) + + if not query_result: + return + + updated_rows = [] + for row in query_result: + row_id = row[METADATA_COLUMN_ID.name] + parent_dir = row[METADATA_COLUMN_PARENT_DIR.name] + file_name = row[METADATA_COLUMN_NAME.name] + key = parent_dir + file_name + new_row = path_to_file_dict.get(key) + + new_path = new_row.new_path + new_parent_dir = os.path.dirname(new_path) + update_row = { + METADATA_COLUMN_ID.name: row_id, + METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, + } + updated_rows.append(update_row) + + if not updated_rows: + return + + self.metadata_server_api.update_rows(repo_id, METADATA_TABLE.id, updated_rows) + + def rename_dirs(self, repo_id, renamed_dirs): + if not renamed_dirs: + return + sql = f'SELECT `{METADATA_COLUMN_ID.name}`, `{METADATA_COLUMN_PARENT_DIR.name}`, `{METADATA_COLUMN_NAME.name}` FROM `{METADATA_TABLE.name}` WHERE' + for d in renamed_dirs: + old_path = d.path.rstrip('/') + parent_dir = os.path.dirname(old_path) + dir_name = os.path.basename(old_path) + new_path = d.new_path + + if self.is_excluded_path(old_path): + continue + + sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{dir_name}") OR' \ + f' (`{METADATA_COLUMN_PARENT_DIR.name}` LIKE "{old_path}%")' + query_result = self.metadata_server_api.query_rows(repo_id, sql).get('results', []) + + if not query_result: + return + + updated_rows = [] + for row in query_result: + row_id = row[METADATA_COLUMN_ID.name] + p_dir = row[METADATA_COLUMN_PARENT_DIR.name] + name = row[METADATA_COLUMN_NAME.name] + new_parent_dir = os.path.dirname(new_path) + new_name = os.path.basename(new_path) + + if parent_dir == p_dir and dir_name == name: + update_row = { + METADATA_COLUMN_ID.name: row_id, + METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, + METADATA_COLUMN_NAME.name: new_name + } + updated_rows.append(update_row) + else: + old_dir_prefix = os.path.join(parent_dir, dir_name) + new_dir_prefix = os.path.join(new_parent_dir, new_name) + new_parent_dir = p_dir.replace(old_dir_prefix, new_dir_prefix) + + update_row = { + METADATA_COLUMN_ID.name: row_id, + METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, + } + updated_rows.append(update_row) + + self.metadata_server_api.update_rows(repo_id, METADATA_TABLE.id, updated_rows) + + def move_dirs(self, repo_id, moved_dirs): + if not moved_dirs: + return + 
sql = f'SELECT `{METADATA_COLUMN_ID.name}`, `{METADATA_COLUMN_PARENT_DIR.name}`, `{METADATA_COLUMN_NAME.name}` FROM `{METADATA_TABLE.name}` WHERE' + for d in moved_dirs: + old_path = d.path.rstrip('/') + parent_dir = os.path.dirname(old_path) + dir_name = os.path.basename(old_path) + if self.is_excluded_path(old_path): + continue + + new_path = d.new_path + sql += f' (`{METADATA_COLUMN_PARENT_DIR.name}` = "{parent_dir}" AND `{METADATA_COLUMN_NAME.name}` = "{dir_name}") OR' \ + f' (`{METADATA_COLUMN_PARENT_DIR.name}` LIKE "{old_path}%")' + query_result = self.metadata_server_api.query_rows(repo_id, sql).get('results', []) + + if not query_result: + return + + updated_rows = [] + for row in query_result: + row_id = row[METADATA_COLUMN_ID.name] + p_dir = row[METADATA_COLUMN_PARENT_DIR.name] + name = row[METADATA_COLUMN_NAME.name] + new_parent_dir = os.path.dirname(new_path) + new_name = os.path.basename(new_path) + + if parent_dir == p_dir and dir_name == name: + update_row = { + METADATA_COLUMN_ID.name: row_id, + METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, + METADATA_COLUMN_NAME.name: new_name, + } + updated_rows.append(update_row) + else: + old_dir_prefix = os.path.join(parent_dir, dir_name) + new_dir_prefix = os.path.join(new_parent_dir, new_name) + new_parent_dir = p_dir.replace(old_dir_prefix, new_dir_prefix) + + update_row = { + METADATA_COLUMN_ID.name: row_id, + METADATA_COLUMN_PARENT_DIR.name: new_parent_dir, + } + updated_rows.append(update_row) + + if not updated_rows: + continue + + self.metadata_server_api.update_rows(repo_id, METADATA_TABLE.id, updated_rows) + + def delete_base(self, repo_id): + self.metadata_server_api.delete_base(repo_id) + + def init_columns(self, repo_id): + # initial md-server base and insert records + # Add columns: creator, created_time, modifier, modified_time, parent_dir, name + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_CREATOR.to_build_column_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_CREATED_TIME.to_build_column_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_MODIFIER.to_build_column_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_MODIFIED_TIME.to_build_column_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_PARENT_DIR.to_build_column_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_NAME.to_build_column_dict()) + self.metadata_server_api.add_column(repo_id, METADATA_TABLE.id, METADATA_COLUMN_IS_DIR.to_build_column_dict()) + + def create_base(self, repo_id): + self.metadata_server_api.create_base(repo_id) + self.init_columns(repo_id) diff --git a/requirements.txt b/requirements.txt index f8c5442f..c3427434 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,6 @@ mock pytest pyjwt pymysql +gevent==24.2.* +Flask==2.2.* +apscheduler diff --git a/seafevent_server/__init__.py b/seafevent_server/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/seafevent_server/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/seafevent_server/request_handler.py b/seafevent_server/request_handler.py new file mode 100644 index 00000000..531cb727 --- /dev/null +++ b/seafevent_server/request_handler.py @@ -0,0 +1,50 @@ +import jwt +import logging + +from flask import Flask, request, make_response +from seafevents.app.config import SEAHUB_SECRET_KEY +from 
seafevents.seafevent_server.task_manager import task_manager
+
+app = Flask(__name__)
+logger = logging.getLogger(__name__)
+
+
+def check_auth_token(req):
+    auth = req.headers.get('Authorization', '').split()
+    if not auth or auth[0].lower() != 'token' or len(auth) != 2:
+        return False, 'Token invalid.'
+
+    token = auth[1]
+    if not token:
+        return False, 'Token invalid.'
+
+    private_key = SEAHUB_SECRET_KEY
+    try:
+        jwt.decode(token, private_key, algorithms=['HS256'])
+    except (jwt.ExpiredSignatureError, jwt.InvalidSignatureError) as e:
+        return False, str(e)
+
+    return True, None
+
+
+@app.route('/add-init-metadata-task', methods=['GET'])
+def add_init_metadata_task():
+    is_valid, error = check_auth_token(request)
+    if not is_valid:
+        return make_response((error, 403))
+
+    if task_manager.tasks_queue.full():
+        logger.warning('seafevent server busy, queue size: %d' % (task_manager.tasks_queue.qsize(), ))
+        return make_response(('seafevent server busy.', 400))
+
+    username = request.args.get('username')
+    repo_id = request.args.get('repo_id')
+
+    try:
+        task_id = task_manager.add_init_metadata_task(
+            username, repo_id)
+    except Exception as e:
+        logger.error(e)
+        return make_response((str(e), 500))
+
+    return make_response(({'task_id': task_id}, 200))
diff --git a/seafevent_server/seafevent_server.py b/seafevent_server/seafevent_server.py
new file mode 100644
index 00000000..fd3e4bce
--- /dev/null
+++ b/seafevent_server/seafevent_server.py
@@ -0,0 +1,42 @@
+from threading import Thread
+
+from gevent.pywsgi import WSGIServer
+from seafevents.seafevent_server.request_handler import app as application
+from seafevents.seafevent_server.task_manager import task_manager
+
+
+class SeafEventServer(Thread):
+
+    def __init__(self, app, config):
+        Thread.__init__(self)
+        self._parse_config(config)
+        self.app = app
+        task_manager.init(self.app, self._workers, self._task_expire_time, config)
+
+        task_manager.run()
+
+        self._server = WSGIServer((self._host, int(self._port)), application)
+
+    def _parse_config(self, config):
+        if config.has_option('SEAF-EVENT-SERVER', 'host'):
+            self._host = config.get('SEAF-EVENT-SERVER', 'host')
+        else:
+            self._host = '127.0.0.1'
+
+        if config.has_option('SEAF-EVENT-SERVER', 'port'):
+            self._port = config.getint('SEAF-EVENT-SERVER', 'port')
+        else:
+            self._port = 8889
+
+        if config.has_option('SEAF-EVENT-SERVER', 'workers'):
+            self._workers = config.getint('SEAF-EVENT-SERVER', 'workers')
+        else:
+            self._workers = 3
+
+        if config.has_option('SEAF-EVENT-SERVER', 'task_expire_time'):
+            self._task_expire_time = config.getint('SEAF-EVENT-SERVER', 'task_expire_time')
+        else:
+            self._task_expire_time = 30 * 60
+
+    def run(self):
+        self._server.serve_forever()
diff --git a/seafevent_server/task_manager.py b/seafevent_server/task_manager.py
new file mode 100644
index 00000000..7d0410a5
--- /dev/null
+++ b/seafevent_server/task_manager.py
@@ -0,0 +1,178 @@
+import logging
+import queue
+import uuid
+from datetime import datetime
+from threading import Thread, Lock
+
+from apscheduler.triggers.cron import CronTrigger
+from apscheduler.schedulers.gevent import GeventScheduler
+
+from seafevents.db import init_db_session_class
+from seafevents.repo_metadata.metadata_server_api import MetadataServerAPI
+from seafevents.repo_metadata.repo_metadata import RepoMetadata
+from seafevents.repo_metadata.metadata_manager import MetadataManager
+
+
+logger = logging.getLogger(__name__)
+
+
+class IndexTask:
+
+    def __init__(self, task_id, readable_id, func, args):
+        self.id = task_id
self.readable_id = readable_id + self.func = func + self.args = args + + self.status = 'init' + + self.started_at = None + self.finished_at = None + + self.result = None + self.error = None + + @staticmethod + def get_readable_id(readable_id): + return readable_id + + def run(self): + self.status = 'running' + self.started_at = datetime.now() + return self.func(*self.args) + + def set_result(self, result): + self.result = result + self.status = 'success' + self.finished_at = datetime.now() + + def set_error(self, error): + self.error = error + self.status = 'error' + self.finished_at = datetime.now() + + def is_finished(self): + return self.status in ['error', 'success'] + + def get_cost_time(self): + if self.started_at and self.finished_at: + return (self.finished_at - self.started_at).seconds + return None + + def get_info(self): + return f'{self.id}--{self.readable_id}--{self.func}' + + def __str__(self): + return f'' + + +class TaskManager: + + def __init__(self): + self.tasks_queue = queue.Queue() + self.tasks_map = {} # {task_id: task} all tasks + self.readable_id2task_map = {} # {task_readable_id: task} in queue or running + self.check_task_lock = Lock() # lock access to readable_id2task_map + self.sched = GeventScheduler() + self.app = None + self.conf = { + 'workers': 3, + 'expire_time': 30 * 60 + } + self._db_session_class = None + self._metadata_server_api = None + self.repo_metadata = None + self.metadata_manager = None + + self.sched.add_job(self.clear_expired_tasks, CronTrigger(minute='*/30')) + + def init(self, app, workers, task_expire_time, config): + self.app = app + self.conf['expire_time'] = task_expire_time + self.conf['workers'] = workers + + self._db_session_class = init_db_session_class(config) + self._metadata_server_api = MetadataServerAPI('seafevents') + self.repo_metadata = RepoMetadata(self._metadata_server_api) + self.metadata_manager = MetadataManager(self._db_session_class, self.repo_metadata) + + def get_pending_or_running_task(self, readable_id): + task = self.readable_id2task_map.get(readable_id) + return task + + def add_init_metadata_task(self, username, repo_id): + + readable_id = repo_id + with self.check_task_lock: + task = self.get_pending_or_running_task(readable_id) + if task: + return task.id + + task_id = str(uuid.uuid4()) + task = IndexTask(task_id, readable_id, self.metadata_manager.create_metadata, + (repo_id, ) + ) + self.tasks_map[task_id] = task + self.readable_id2task_map[task.readable_id] = task + self.tasks_queue.put(task) + + return task_id + + def query_task(self, task_id): + return self.tasks_map.get(task_id) + + def handle_task(self): + while True: + try: + task = self.tasks_queue.get(timeout=2) + except queue.Empty: + continue + except Exception as e: + logger.error(e) + continue + + try: + task_info = task.get_info() + logger.info('Run task: %s' % task_info) + + # run + task.run() + task.set_result('success') + + logger.info('Run task success: %s cost %ds \n' % (task_info, task.get_cost_time())) + except Exception as e: + task.set_error(e) + logger.exception('Failed to handle task %s, error: %s \n' % (task.get_info(), e)) + finally: + with self.check_task_lock: + self.readable_id2task_map.pop(task.readable_id, None) + + def run(self): + thread_num = self.conf['workers'] + for i in range(thread_num): + t_name = 'TaskManager Thread-' + str(i) + t = Thread(target=self.handle_task, name=t_name) + t.setDaemon(True) + t.start() + self.sched.start() + + def clear_expired_tasks(self): + """clear tasks finished for conf['expire_time'] in 
tasks_map.
+
+        When a task finishes it is not popped from tasks_map immediately,
+        because several HTTP requests (not just one) may still query its status.
+
+        Tasks are not kept forever, though, so expired ones need to be cleared.
+        """
+        expire_tasks = []
+        for task in self.tasks_map.values():
+            if not task.is_finished():
+                continue
+            if (datetime.now() - task.finished_at).seconds >= self.conf['expire_time']:
+                expire_tasks.append(task)
+        logger.info('expired tasks: %s', len(expire_tasks))
+        for task in expire_tasks:
+            self.tasks_map.pop(task.id, None)
+
+
+task_manager = TaskManager()
diff --git a/utils/__init__.py b/utils/__init__.py
index 544eb5ba..4336293d 100644
--- a/utils/__init__.py
+++ b/utils/__init__.py
@@ -4,6 +4,9 @@
 import atexit
 import configparser
 import subprocess
+import datetime
+import pytz
+from seafevents.app.config import TIME_ZONE
 
 logger = logging.getLogger(__name__)
 pyexec = None
@@ -196,3 +199,34 @@ def parse_interval(interval, default):
         return default
     else:
         return val
+
+
+def dt(value):
+    """Convert 32/64 bits timestamp to datetime object.
+    """
+    try:
+        return datetime.datetime.utcfromtimestamp(value)
+    except ValueError:
+        # TODO: need a better way to handle 64 bits timestamp.
+        return datetime.datetime.utcfromtimestamp(value / 1000000)
+
+
+def timestamp_to_isoformat_timestr(timestamp):
+    try:
+        min_ts = -(1 << 31)
+        max_ts = (1 << 31) - 1
+        if min_ts <= timestamp <= max_ts:
+            dt_obj = datetime.datetime.fromtimestamp(timestamp)
+        else:
+            dt_obj = datetime.datetime.fromtimestamp(timestamp / 1000000)
+
+        time_zone = pytz.timezone(TIME_ZONE)
+        dt_obj = dt_obj.replace(microsecond=0)
+        aware_datetime = dt_obj.replace(tzinfo=time_zone)
+        target_timezone = pytz.timezone(str(time_zone))
+        localized_datetime = target_timezone.normalize(aware_datetime.astimezone(pytz.UTC))
+        isoformat_timestr = localized_datetime.isoformat()
+        return isoformat_timestr
+    except Exception as e:
+        logger.error(e)
+        return ''
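
Usage note: the new /add-init-metadata-task endpoint in seafevent_server/request_handler.py expects an "Authorization: Token <jwt>" header signed with Seahub's SECRET_KEY (HS256) plus username/repo_id query parameters, and returns the id of the queued task. Below is a minimal client-side sketch, not part of this patch; it assumes the default 127.0.0.1:8889 address from seafevent_server.py, and the SEAFEVENTS_SERVER_URL constant and the exp claim are illustrative assumptions.

    # Illustrative client sketch (not part of this patch).
    import time

    import jwt
    import requests

    SEAFEVENTS_SERVER_URL = 'http://127.0.0.1:8889'  # default host/port in seafevent_server.py
    SEAHUB_SECRET_KEY = 'replace-with-seahub-SECRET_KEY'


    def add_init_metadata_task(username, repo_id):
        # request_handler.py only checks that the JWT is HS256-signed with
        # Seahub's SECRET_KEY; the payload content itself is not inspected.
        payload = {'exp': int(time.time()) + 300, 'username': username}
        token = jwt.encode(payload, SEAHUB_SECRET_KEY, algorithm='HS256')

        resp = requests.get(
            f'{SEAFEVENTS_SERVER_URL}/add-init-metadata-task',
            headers={'Authorization': f'Token {token}'},
            params={'username': username, 'repo_id': repo_id},
            timeout=30,
        )
        resp.raise_for_status()
        # The handler returns {'task_id': ...}.
        return resp.json()['task_id']

The returned id is the key under which TaskManager tracks the job (see query_task in seafevent_server/task_manager.py); this patch does not add an HTTP endpoint for polling it.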