Commit 91ef4f9

update metadata in seafevents (#328)
* update metadata in seafevents

* optimize
JoinTyang committed Jun 29, 2024
1 parent 687ae24 commit 91ef4f9
Showing 21 changed files with 1,487 additions and 6 deletions.
13 changes: 13 additions & 0 deletions app/app.py
@@ -4,6 +4,11 @@
WorkWinxinNoticeSender, FileUpdatesSender, RepoOldFileAutoDelScanner,\
DeletedFilesCountCleaner

from seafevents.repo_metadata.index_master import RepoMetadataIndexMaster
from seafevents.repo_metadata.index_worker import RepoMetadataIndexWorker
from seafevents.seafevent_server.seafevent_server import SeafEventServer
from seafevents.app.config import ENABLE_METADATA_MANAGEMENT


class App(object):
def __init__(self, config, ccnet_config, seafile_config,
@@ -17,6 +22,7 @@ def __init__(self, config, ccnet_config, seafile_config,
self._events_handler = EventsHandler(config)
self._count_traffic_task = CountTrafficInfo(config)
self._update_login_record_task = CountUserActivity(config)
self._seafevent_server = SeafEventServer(self, config)

if self._bg_tasks_enabled:
self._index_updater = IndexUpdater(config)
@@ -29,12 +35,16 @@ def __init__(self, config, ccnet_config, seafile_config,
self._file_updates_sender = FileUpdatesSender()
self._repo_old_file_auto_del_scanner = RepoOldFileAutoDelScanner(config)
self._deleted_files_count_cleaner = DeletedFilesCountCleaner(config)
if ENABLE_METADATA_MANAGEMENT:
self._index_master = RepoMetadataIndexMaster(config)
self._index_worker = RepoMetadataIndexWorker(config)

def serve_forever(self):
if self._fg_tasks_enabled:
self._events_handler.start()
self._update_login_record_task.start()
self._count_traffic_task.start()
self._seafevent_server.start()

if self._bg_tasks_enabled:
self._file_updates_sender.start()
@@ -47,3 +57,6 @@ def serve_forever(self):
self._content_scanner.start()
self._repo_old_file_auto_del_scanner.start()
self._deleted_files_count_cleaner.start()
if ENABLE_METADATA_MANAGEMENT:
self._index_master.start()
self._index_worker.start()
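
The RepoMetadataIndexMaster and RepoMetadataIndexWorker classes themselves are not part of this commit view; as a rough sketch of the pattern they imply (class internals, the queue name, and the field layout here are assumptions for illustration), the master listens for repo-update events on Redis and fans work out to workers:

# Hypothetical sketch only -- the real classes live in
# seafevents/repo_metadata/index_master.py and index_worker.py.
import redis

def master_loop(mq):
    # subscribe to the channel that RepoMetadataUpdateHandler publishes to
    pubsub = mq.pubsub()
    pubsub.subscribe('metadata_update')
    for item in pubsub.listen():
        if item['type'] == 'message':
            # queue name is an assumption for illustration
            mq.lpush('metadata_index_task', item['data'])

def worker_loop(mq):
    while True:
        _, content = mq.brpop('metadata_index_task')  # blocking pop
        repo_id, commit_id, op_info = content.split('\t')  # field layout assumed
        # ... update the repo's metadata index here ...
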
4 changes: 4 additions & 0 deletions app/config.py
@@ -22,6 +22,10 @@
DTABLE_WEB_SERVER = getattr(seahub_settings, 'DTABLE_WEB_SERVER', None)
SEATABLE_EX_PROPS_BASE_API_TOKEN = getattr(seahub_settings, 'SEATABLE_EX_PROPS_BASE_API_TOKEN', None)
EX_PROPS_TABLE = getattr(seahub_settings, 'EX_PROPS_TABLE', None)
SEAHUB_SECRET_KEY = getattr(seahub_settings, 'SECRET_KEY', '')
METADATA_SERVER_SECRET_KEY = getattr(seahub_settings, 'METADATA_SERVER_SECRET_KEY', '')
METADATA_SERVER_URL = getattr(seahub_settings, 'METADATA_SERVER_URL', '')
ENABLE_METADATA_MANAGEMENT = getattr(seahub_settings, 'ENABLE_METADATA_MANAGEMENT', False)
except ImportError:
logger.critical("Can not import seahub settings.")
raise RuntimeError("Can not import seahub settings.")
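
These values are read from seahub's settings module at import time; a minimal seahub_settings.py fragment enabling the feature might look like the following (the URL and key values are placeholders, not defaults from this commit):

# seahub_settings.py -- placeholder values
ENABLE_METADATA_MANAGEMENT = True
METADATA_SERVER_URL = 'http://127.0.0.1:8084'
METADATA_SERVER_SECRET_KEY = '<shared-secret>'
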
7 changes: 2 additions & 5 deletions app/event_redis.py
@@ -15,11 +15,8 @@ def __init__(self, config, socket_connect_timeout=30, socket_timeout=None):
self._parse_config(config, socket_connect_timeout, socket_timeout)

def _parse_config(self, config, socket_connect_timeout, socket_timeout):
mq_type = ''
if config.has_option('EVENTS PUBLISH', 'mq_type'):
mq_type = config.get('EVENTS PUBLISH', 'mq_type').upper()
if mq_type != 'REDIS':
logging.warning("Unknown database backend: %s" % mq_type)

if not config.has_section('REDIS'):
return

if config.has_option('REDIS', 'server'):
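
With the mq_type check gone, the client reads the [REDIS] section directly; a seafevents.conf fragment it would accept might look like this (only the server option is visible in this hunk; the port and password option names are assumptions):

[REDIS]
server = 127.0.0.1
port = 6379
password = <redis-password>
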
4 changes: 3 additions & 1 deletion app/mq_handler.py
@@ -9,6 +9,7 @@
import seafevents.statistics.handlers as stats_handlers
from seafevents.db import init_db_session_class
from seafevents.app.event_redis import RedisClient
import seafevents.repo_metadata.handlers as metadata_handler

logger = logging.getLogger(__name__)

@@ -47,7 +48,7 @@ def handle_message(self, config, session, redis_connection, channel, msg):
funcs = self._handlers.get(msg_type)
for func in funcs:
try:
if func.__name__ == 'RepoUpdatePublishHandler':
                if func.__name__ in ('RepoUpdatePublishHandler', 'RepoMetadataUpdateHandler'):
func(config, redis_connection, msg)
else:
func(config, session, msg)
@@ -83,6 +84,7 @@ def init_message_handlers(config):
events_handlers.register_handlers(message_handler, enable_audit)
stats_handlers.register_handlers(message_handler)
publisher_handlers.register_handlers(message_handler)
metadata_handler.register_handlers(message_handler)


class EventsHandler(object):
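
The special-casing in handle_message means publish-style handlers receive the Redis connection while every other handler receives a database session. A condensed sketch of the registry implied by add_handler and handle_message, with storage details assumed since they sit outside this hunk:

# assumed registry shape -- not shown in this diff
class MessageHandler(object):
    def __init__(self):
        self._handlers = {}  # msg_type -> [callable, ...]

    def add_handler(self, msg_type, func):
        self._handlers.setdefault(msg_type, []).append(func)
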
2 changes: 2 additions & 0 deletions main.py
@@ -60,6 +60,8 @@ def main(background_tasks_only=False):
foreground_tasks_enabled = True
background_tasks_enabled = False

from gevent import monkey; monkey.patch_all()

app = App(config, ccnet_config, seafile_config, foreground_tasks_enabled=foreground_tasks_enabled,
background_tasks_enabled=background_tasks_enabled)

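
monkey.patch_all() swaps blocking stdlib primitives (sockets, threads, time.sleep) for gevent's cooperative versions; gevent's documentation recommends doing this as early in the process as possible, before other modules capture references to the originals. A quick way to confirm the patch took effect:

from gevent import monkey; monkey.patch_all()

import socket
print(socket.socket)  # now gevent's cooperative socket class
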
18 changes: 18 additions & 0 deletions mq.py
@@ -0,0 +1,18 @@
import redis
import logging

logger = logging.getLogger(__name__)


def get_mq(server, port, password):
rdp = redis.ConnectionPool(host=server, port=port,
password=password, retry_on_timeout=True, decode_responses=True)
mq = redis.StrictRedis(connection_pool=rdp)
    try:
        mq.ping()
    except Exception as e:
        logger.error("Cannot connect to Redis server: host %s, port %s, error %s",
                     server, port, e)
    finally:
        # the redis client is lazy: every operation (re)connects and retries on
        # demand, so return the client even if the initial ping failed
        return mq
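
Intended usage of the helper, mirroring how the handlers publish events (the host, port, and message payload below are placeholders):

from seafevents.mq import get_mq

mq = get_mq('127.0.0.1', 6379, password=None)
mq.publish('metadata_update', '<repo_id>\t<commit_id>\t<op_info>')
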
129 changes: 129 additions & 0 deletions repo_data/__init__.py
@@ -0,0 +1,129 @@
import os
import logging
from sqlalchemy.sql import text

from seafevents.repo_data.db import init_db_session_class

logger = logging.getLogger(__name__)


class RepoData(object):
def __init__(self):
if 'SEAFILE_CENTRAL_CONF_DIR' in os.environ:
confdir = os.environ['SEAFILE_CENTRAL_CONF_DIR']
else:
confdir = os.environ['SEAFILE_CONF_DIR']
self.seafile_conf = os.path.join(confdir, 'seafile.conf')
self.db_session = init_db_session_class(self.seafile_conf)

def to_dict(self, result_proxy):
res = []
for i in result_proxy.mappings():
res.append(i)
return res

def _get_repo_id_commit_id(self, start, count):
session = self.db_session()
try:
cmd = """SELECT repo_id, commit_id
FROM Branch WHERE name = :name
AND repo_id NOT IN (SELECT repo_id from VirtualRepo)
limit :start, :count"""
res = [(r[0], r[1]) for r in session.execute(text(cmd),
{'name': 'master',
'start': start,
'count': count})]
return res
except Exception as e:
raise e
finally:
session.close()


def _get_all_trash_repo_list(self):
session = self.db_session()
try:
cmd = """SELECT repo_id, repo_name, head_id, owner_id,
size, org_id, del_time FROM RepoTrash ORDER BY del_time DESC"""
res = session.execute(text(cmd))
return self.to_dict(res)
except Exception as e:
raise e
finally:
session.close()

def _get_all_repo_list(self):
session = self.db_session()
try:
cmd = """SELECT r.repo_id, c.file_count FROM Repo r LEFT JOIN RepoFileCount c
ON r.repo_id = c.repo_id"""
res = session.execute(text(cmd))
return self.to_dict(res)
except Exception as e:
raise e
finally:
session.close()

def _get_repo_head_commit(self, repo_id):
session = self.db_session()
try:
cmd = """SELECT b.commit_id
from Branch as b inner join Repo as r
where b.repo_id=r.repo_id and b.repo_id=:repo_id"""
res = session.execute(text(cmd), {'repo_id': repo_id}).fetchone()
return res[0] if res else None
except Exception as e:
raise e
finally:
session.close()

def _get_repo_name_mtime_size(self, repo_id):
session = self.db_session()
try:
cmd = """SELECT RepoInfo.name, RepoInfo.update_time, RepoSize.size
FROM RepoInfo INNER JOIN RepoSize ON RepoInfo.repo_id = RepoSize.repo_id
AND RepoInfo.repo_id = :repo_id"""
res = session.execute(text(cmd), {'repo_id': repo_id})
return self.to_dict(res)
except Exception as e:
raise e
finally:
session.close()

def get_repo_name_mtime_size(self, repo_id):
try:
return self._get_repo_name_mtime_size(repo_id)
except Exception as e:
logger.error(e)
return self._get_repo_name_mtime_size(repo_id)

def get_all_repo_list(self):
try:
return self._get_all_repo_list()
except Exception as e:
logger.error(e)
return self._get_all_repo_list()

def get_all_trash_repo_list(self):
try:
return self._get_all_trash_repo_list()
except Exception as e:
logger.error(e)
return self._get_all_trash_repo_list()

def get_repo_id_commit_id(self, start, count):
try:
return self._get_repo_id_commit_id(start, count)
except Exception as e:
logger.error(e)
return self._get_repo_id_commit_id(start, count)

def get_repo_head_commit(self, repo_id):
try:
return self._get_repo_head_commit(repo_id)
except Exception as e:
logger.error(e)
return self._get_repo_head_commit(repo_id)


repo_data = RepoData()
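
Each public method wraps its private counterpart in a retry-once pattern, which papers over dropped MySQL connections: the first failure is logged and the query is simply reissued. Typical usage of the module-level singleton (the repo id is a placeholder):

from seafevents.repo_data import repo_data

repos = repo_data.get_all_repo_list()  # list of mappings: repo_id, file_count
head_commit = repo_data.get_repo_head_commit('<repo-id>')
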
79 changes: 79 additions & 0 deletions repo_data/db.py
@@ -0,0 +1,79 @@
import logging
import configparser

from urllib.parse import quote_plus

from sqlalchemy import create_engine
from sqlalchemy.event import contains as has_event_listener, listen as add_event_listener
from sqlalchemy.exc import DisconnectionError
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import Pool


# base class of model classes in events.models and stats.models
class Base(DeclarativeBase):
pass


logger = logging.getLogger('seafes')

def create_engine_from_conf(config_file):
seaf_conf = configparser.ConfigParser()
seaf_conf.read(config_file)
backend = seaf_conf.get('database', 'type')
if backend == 'mysql':
db_server = 'localhost'
db_port = 3306

if seaf_conf.has_option('database', 'host'):
db_server = seaf_conf.get('database', 'host')
if seaf_conf.has_option('database', 'port'):
            db_port = seaf_conf.getint('database', 'port')
db_username = seaf_conf.get('database', 'user')
db_passwd = seaf_conf.get('database', 'password')
db_name = seaf_conf.get('database', 'db_name')
db_url = "mysql+pymysql://%s:%s@%s:%s/%s?charset=utf8" % \
(db_username, quote_plus(db_passwd),
db_server, db_port, db_name)
else:
logger.critical("Unknown Database backend: %s" % backend)
raise RuntimeError("Unknown Database backend: %s" % backend)

kwargs = dict(pool_recycle=300, echo=False, echo_pool=False)

engine = create_engine(db_url, **kwargs)
if not has_event_listener(Pool, 'checkout', ping_connection):
        # We use has_event_listener to double-check in case we call create_engine
        # multiple times in the same process.
add_event_listener(Pool, 'checkout', ping_connection)

return engine

def init_db_session_class(config_file):
"""Configure Session class for mysql according to the config file."""
try:
engine = create_engine_from_conf(config_file)
except (configparser.NoOptionError, configparser.NoSectionError) as e:
logger.error(e)
        raise RuntimeError("invalid config file %s" % config_file)

Session = sessionmaker(bind=engine)
return Session

# This is used to fix the problem of "MySQL has gone away" that happens when
# the mysql server is restarted or the pooled connections are closed by the
# mysql server because they have been idle for too long.
#
# See http://stackoverflow.com/a/17791117/1467959
def ping_connection(dbapi_connection, connection_record, connection_proxy): # pylint: disable=unused-argument
cursor = dbapi_connection.cursor()
try:
cursor.execute("SELECT 1")
cursor.close()
    except Exception:
        logger.info('failed to ping database server, disposing all cached connections')
connection_proxy._pool.dispose() # pylint: disable=protected-access

# Raise DisconnectionError so the pool would create a new connection
raise DisconnectionError()
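
init_db_session_class returns a sessionmaker bound to the engine, so callers instantiate and close a session per query, as repo_data/__init__.py above does (the config path is a placeholder):

from sqlalchemy.sql import text

Session = init_db_session_class('/opt/seafile/conf/seafile.conf')  # placeholder path
session = Session()
try:
    session.execute(text('SELECT 1'))  # any query; matches the pattern above
finally:
    session.close()
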
Empty file added repo_metadata/__init__.py
22 changes: 22 additions & 0 deletions repo_metadata/handlers.py
@@ -0,0 +1,22 @@
import logging


def RepoMetadataUpdateHandler(config, redis_connection, msg):
    elements = msg['content'].split('\t')
if len(elements) != 3:
logging.warning("got bad message: %s", elements)
return

try:
if redis_connection.publish('metadata_update', msg['content']) > 0:
logging.debug('Publish event: %s' % msg['content'])
else:
            logging.info('No one subscribed to metadata_update channel, event (%s) has not been sent' % msg['content'])
except Exception as e:
logging.error(e)
logging.error("Failed to publish metadata_update event: %s " % msg['content'])


def register_handlers(handlers):
handlers.add_handler('seaf_server.event:repo-update', RepoMetadataUpdateHandler)
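
The three tab-separated fields checked above come from seaf-server's repo-update event; the exact field layout is not shown in this commit, but given the len == 3 check the content presumably looks like this (field names are assumptions):

msg = {'content': '<repo_id>\t<commit_id>\t<op_info>'}
RepoMetadataUpdateHandler(config, redis_connection, msg)
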