Skip to content

Commit

Permalink
Async export excel 12.0 (#333)
Browse files Browse the repository at this point in the history
* update file access export

* optimize

* Update __init__.py

* optimize code

* update

* update

* optimize code

* update

* update

* update

---------

Co-authored-by: 孙永强 <[email protected]>
Co-authored-by: r350178982 <[email protected]>
Co-authored-by: ‘JoinTyang’ <[email protected]>
  • Loading branch information
4 people committed Jul 10, 2024
1 parent 22b3800 commit ae3404e
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 9 deletions.
12 changes: 5 additions & 7 deletions events/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,15 @@ def _get_user_activities(session, username, start, limit):
logger.error('limit must be positive')
raise RuntimeError('limit must be positive')

sub_query = (
sub_query = (
select(UserActivity.activity_id)
.where(UserActivity.username == username)

)
stmt = (
select(Activity)
.where(Activity.id.in_(sub_query))
.order_by(desc(Activity.timestamp))
.slice(start, start + limit)

)
events = session.scalars(stmt).all()

Expand All @@ -81,12 +79,12 @@ def get_user_activities(session, username, start, limit):
def _get_user_activities_by_timestamp(session, username, start, end):
events = []
try:
sub_query = (
sub_query = (
select(UserActivity.activity_id)
.where(
and_(
UserActivity.username == username,
UserActivity.timestamp.between(start, end)
and_(
UserActivity.username == username,
UserActivity.timestamp.between(start, end)
)
)
)
Expand Down
18 changes: 18 additions & 0 deletions events/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,21 @@ def __str__(self):
RepoID = %s, FilePath = %s, Permission = %s>" % \
(self.etype, self.from_user, self.to,
self.repo_id, self.file_path, self.permission)


class UserLogin(Base):
__tablename__ = 'sysadmin_extra_userloginlog'

id = mapped_column(Integer, primary_key=True, autoincrement=True)
login_date = mapped_column(DateTime, nullable=False)
username = mapped_column(String(length=255), nullable=False, index=True)
login_ip = mapped_column(String(length=45), nullable=False)
login_success = mapped_column(Integer, nullable=False)

def __init__(self, login_date, username, login_ip, login_success):
super().__init__()
self.login_date = login_date
self.username = username
self.login_ip = login_ip
self.login_success = login_success

103 changes: 103 additions & 0 deletions seafevent_server/export_task_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import queue
import threading
import logging
import time
import uuid
from seafevents.db import init_db_session_class
from seafevents.seafevent_server.utils import get_event_log_by_time_to_excel

logger = logging.getLogger('seafevents')


class EventExportTaskManager(object):

def __init__(self):
self.app = None
self._db_session_class = None
self.tasks_map = {}
self.task_results_map = {}
self.tasks_queue = queue.Queue(10)
self.current_task_info = {}
self.threads = []
self.conf = {
'workers': 3,
'expire_time': 30 * 60
}

def init(self, app, workers, task_expire_time, config):
self.app = app
self.conf['expire_time'] = task_expire_time
self.conf['workers'] = workers
self._db_session_class = init_db_session_class(config)

def is_valid_task_id(self, task_id):
return task_id in (self.tasks_map.keys() | self.task_results_map.keys())

def add_export_logs_task(self, start_time, end_time, log_type):
task_id = str(uuid.uuid4())
task = (get_event_log_by_time_to_excel, (self._db_session_class, start_time, end_time, log_type, task_id))

self.tasks_queue.put(task_id)
self.tasks_map[task_id] = task
return task_id

def query_status(self, task_id):
task_result = self.task_results_map.pop(task_id, None)
if task_result == 'success':
return True, None
if isinstance(task_result, str) and task_result.startswith('error_'):
return True, task_result[6:]
return False, None

def threads_is_alive(self):
info = {}
for t in self.threads:
info[t.name] = t.is_alive()
return info

def handle_task(self):
while True:
try:
task_id = self.tasks_queue.get(timeout=2)
except queue.Empty:
continue
except Exception as e:
logger.error(e)
continue
task = self.tasks_map.get(task_id)
if type(task) != tuple or len(task) < 1:
continue
if type(task[0]).__name__ != 'function':
continue
task_info = task_id + ' ' + str(task[0])
try:
self.current_task_info[task_id] = task_info
logging.info('Run task: %s' % task_info)
start_time = time.time()

# run
task[0](*task[1])
self.task_results_map[task_id] = 'success'

finish_time = time.time()
logging.info('Run task success: %s cost %ds \n' % (task_info, int(finish_time - start_time)))
self.current_task_info.pop(task_id, None)
except Exception as e:
self.task_results_map[task_id] = 'error_' + str(e.args[0])
logger.exception(e)
logger.error('Failed to handle task %s, error: %s \n' % (task_info, e))
self.current_task_info.pop(task_id, None)
finally:
self.tasks_map.pop(task_id, None)

def run(self):
thread_num = self.conf['workers']
for i in range(thread_num):
t_name = 'TaskManager Thread-' + str(i)
t = threading.Thread(target=self.handle_task, name=t_name)
self.threads.append(t)
t.setDaemon(True)
t.start()


event_export_task_manager = EventExportTaskManager()
50 changes: 49 additions & 1 deletion seafevent_server/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from flask import Flask, request, make_response
from seafevents.app.config import SEAHUB_SECRET_KEY
from seafevents.seafevent_server.task_manager import task_manager
from seafevents.seafevent_server.export_task_manager import event_export_task_manager


app = Flask(__name__)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -35,7 +37,7 @@ def add_init_metadata_task():

if task_manager.tasks_queue.full():
logger.warning('seafevent server busy, queue size: %d' % (task_manager.tasks_queue.qsize(), ))
return make_response(('dtable io server busy.', 400))
return make_response(('seafevent server busy.', 400))

username = request.args.get('username')
repo_id = request.args.get('repo_id')
Expand All @@ -48,3 +50,49 @@ def add_init_metadata_task():
return make_response((e, 500))

return make_response(({'task_id': task_id}, 200))


@app.route('/add-init-export-log-task', methods=['GET'])
def get_sys_logs_task():
is_valid, error = check_auth_token(request)
if not is_valid:
return make_response((error, 403))

if event_export_task_manager.tasks_queue.full():
logger.warning('seafevent server busy, queue size: %d, current tasks: %s, threads is_alive: %s'
% (event_export_task_manager.tasks_queue.qsize(), task_manager.current_task_info,
event_export_task_manager.threads_is_alive()))
return make_response(('seafevent server busy,.', 400))

start_time = request.args.get('start_time')
end_time = request.args.get('end_time')
log_type = request.args.get('log_type')
try:
task_id = event_export_task_manager.add_export_logs_task(start_time, end_time, log_type)
except Exception as e:
logger.error(e)
return make_response((e, 500))

return make_response(({'task_id': task_id}, 200))


@app.route('/query-export-status', methods=['GET'])
def query_status():
is_valid, error = check_auth_token(request)
if not is_valid:
return make_response((error, 403))

task_id = request.args.get('task_id')
if not event_export_task_manager.is_valid_task_id(task_id):
return make_response(('task_id not found.', 404))

try:
is_finished, error = event_export_task_manager.query_status(task_id)
except Exception as e:
logger.debug(e)
return make_response((e, 500))

if error:
return make_response((error, 500))
return make_response(({'is_finished': is_finished}, 200))

4 changes: 3 additions & 1 deletion seafevent_server/seafevent_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from gevent.pywsgi import WSGIServer
from seafevents.seafevent_server.request_handler import app as application
from seafevents.seafevent_server.task_manager import task_manager
from seafevents.seafevent_server.export_task_manager import event_export_task_manager


class SeafEventServer(Thread):
Expand All @@ -12,9 +13,10 @@ def __init__(self, app, config):
self._parse_config(config)
self.app = app
task_manager.init(self.app, self._workers, self._task_expire_time, config)
event_export_task_manager.init(self.app, self._workers, self._task_expire_time, config)

task_manager.run()

event_export_task_manager.run()
self._server = WSGIServer((self._host, int(self._port)), application)

def _parse_config(self, config):
Expand Down
Loading

0 comments on commit ae3404e

Please sign in to comment.