Skip to content

Commit

Permalink
feat(search): add wiki search
Browse files Browse the repository at this point in the history
  • Loading branch information
cir9no committed Oct 23, 2024
1 parent cb60b53 commit cba51d5
Show file tree
Hide file tree
Showing 15 changed files with 940 additions and 49 deletions.
3 changes: 3 additions & 0 deletions app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from seafevents.seafevent_server.seafevent_server import SeafEventServer
from seafevents.app.config import ENABLE_METADATA_MANAGEMENT
from seafevents.seasearch.index_task.filename_index_updater import RepoFilenameIndexUpdater
from seafevents.seasearch.index_task.wiki_index_updater import WikiIndexUpdater


class App(object):
Expand Down Expand Up @@ -42,6 +43,7 @@ def __init__(self, config, ccnet_config, seafile_config,
self._index_worker = RepoMetadataIndexWorker(config)
self._slow_task_handler = SlowTaskHandler(config)
self._repo_filename_index_updater = RepoFilenameIndexUpdater(config)
self._wiki_index_updater = WikiIndexUpdater(config)

def serve_forever(self):
if self._fg_tasks_enabled:
Expand All @@ -66,3 +68,4 @@ def serve_forever(self):
self._index_worker.start()
self._slow_task_handler.start()
self._repo_filename_index_updater.start()
self._wiki_index_updater.start()
4 changes: 2 additions & 2 deletions repo_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def _get_virtual_repo_in_repos(self, repo_ids):
if not repo_ids:
return []
try:
cmd = """SELECT repo_id from VirtualRepo WHERE repo_id IN {}""".format(tuple(repo_ids))
formatted_ids = ", ".join("'{}'".format(id) for id in repo_ids)
cmd = """SELECT repo_id from VirtualRepo WHERE repo_id IN ({})""".format(formatted_ids)
res = session.execute(text(cmd)).fetchall()
return res
except Exception as e:
Expand Down Expand Up @@ -142,5 +143,4 @@ def get_virtual_repo_in_repos(self, repo_ids):
logger.error(e)
return self._get_virtual_repo_in_repos(repo_ids)


repo_data = RepoData()
33 changes: 33 additions & 0 deletions seafevent_server/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,36 @@ def extract_file_details():
details = add_file_details(repo_id, obj_ids, metadata_server_api, face_recognition_task_manager, embedding_faces=False)

return {'details': details}, 200


@app.route('/wiki-search', methods=['POST'])
def search_wiki():
is_valid = check_auth_token(request)
if not is_valid:
return {'error_msg': 'Permission denied'}, 403

# Check seasearch is enable
if not index_task_manager.enabled:
return {'error_msg': 'Seasearch is not enabled by seafevents.conf'}
try:
data = json.loads(request.data)
except Exception as e:
logger.exception(e)
return {'error_msg': 'Bad request.'}, 400

query = data.get('query').strip()
wiki = data.get('wiki')

if not query:
return {'error_msg': 'query invalid.'}, 400
if not wiki:
return {'error_msg': 'wiki invalid.'}, 400

try:
count = int(data.get('count'))
except:
count = 20

results = index_task_manager.wiki_search(query, wiki, count)

return {'results': results}, 200
49 changes: 46 additions & 3 deletions seasearch/index_store/index_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from seafevents.seasearch.utils import need_index_metadata_info
from seafevents.db import init_db_session_class
from seafevents.seasearch.utils.constants import ZERO_OBJ_ID, REPO_FILENAME_INDEX_PREFIX
from seafevents.seasearch.utils.constants import ZERO_OBJ_ID, REPO_FILENAME_INDEX_PREFIX, \
WIKI_INDEX_PREFIX
from seafevents.repo_metadata.metadata_server_api import MetadataServerAPI
from seafevents.repo_metadata.utils import METADATA_TABLE
from seafevents.utils import timestamp_to_isoformat_timestr
Expand Down Expand Up @@ -57,9 +58,9 @@ def update_library_filename_index(self, repo_id, commit_id, repo_filename_index,
commit_id = to_commit
time.sleep(1)

repo_status_filename_index.begin_update_repo(repo_id, commit_id, new_commit_id, metadata_last_updated_time)
repo_status_filename_index.begin_update_repo(repo_id, commit_id, new_commit_id, metadata_updated_time=metadata_last_updated_time)
repo_filename_index.update(index_name, repo_id, commit_id, new_commit_id, rows, self.metadata_server_api, need_index_metadata)
repo_status_filename_index.finish_update_repo(repo_id, new_commit_id, metadata_query_time)
repo_status_filename_index.finish_update_repo(repo_id, new_commit_id, metadata_updated_time=metadata_query_time)

logger.info('repo: %s, update repo filename index success', repo_id)

Expand All @@ -74,3 +75,45 @@ def delete_repo_filename_index(self, repo_id, repo_filename_index, repo_status_f

def keyword_search(self, query, repos, repo_filename_index, count, suffixes=None, search_path=None, obj_type=None):
return repo_filename_index.search_files(repos, query, 0, count, suffixes, search_path, obj_type)

def delete_wiki_index(self, wiki_id, wiki_index, wiki_status_index):
# first delete wiki_index
wiki_index_name = WIKI_INDEX_PREFIX + wiki_id
wiki_index.delete_index_by_index_name(wiki_index_name)
wiki_status_index.delete_documents_by_repo(wiki_id)

def wiki_search(self, query, wiki, wiki_index, count):
return wiki_index.search_wiki(wiki, query, 0, count)

def update_wiki_index(self, wiki_id, commit_id, wiki_index, wiki_status_index):
try:
new_commit_id = commit_id
index_name = WIKI_INDEX_PREFIX + wiki_id

wiki_index.create_index_if_missing(index_name)

wiki_status = wiki_status_index.get_repo_status_by_id(wiki_id)
from_commit = wiki_status.from_commit
to_commit = wiki_status.to_commit

if new_commit_id == from_commit:
return

if not from_commit:
commit_id = ZERO_OBJ_ID
else:
commit_id = from_commit

if wiki_status.need_recovery():
logger.warning('%s: wiki index inrecovery', wiki_id)
wiki_index.update(index_name, wiki_id, commit_id, to_commit)
commit_id = to_commit
time.sleep(1)
wiki_status_index.begin_update_repo(wiki_id, commit_id, new_commit_id)
wiki_index.update(index_name, wiki_id, commit_id, new_commit_id)
wiki_status_index.finish_update_repo(wiki_id, new_commit_id)

logger.info('wiki: %s, update wiki index success', wiki_id)

except Exception as e:
logger.exception('wiki_id: %s, update wiki index error: %s.', wiki_id, e)
2 changes: 1 addition & 1 deletion seasearch/index_store/repo_file_name_index.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import os
import logging

Expand Down Expand Up @@ -174,6 +173,7 @@ def search_files(self, repos, keyword, start=0, size=10, suffixes=None, search_p
bulk_search_params.append(data)
search_path = None


results = self.seasearch_api.m_search(bulk_search_params)
files = []

Expand Down
98 changes: 59 additions & 39 deletions seasearch/index_store/repo_status_index.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from seafevents.seasearch.utils.constants import REPO_STATUS_FILENAME_INDEX_NAME


class RepoStatus(object):
def __init__(self, repo_id, from_commit, to_commit, metadata_updated_time):
def __init__(self, repo_id, from_commit, to_commit, **kwargs):
self.repo_id = repo_id
self.from_commit = from_commit
self.to_commit = to_commit
self.metadata_updated_time = metadata_updated_time
if 'metadata_updated_time' in kwargs:
self.metadata_updated_time = kwargs['metadata_updated_time']

def need_recovery(self):
return self.to_commit is not None
Expand Down Expand Up @@ -35,9 +39,6 @@ class RepoStatusIndex(object):
'updatingto': {
'type': 'keyword'
},
'metadata_updated_time': {
'type': 'keyword'
},
},
}

Expand All @@ -46,8 +47,15 @@ def __init__(self, seasearch_api, index_name):
self.seasearch_api = seasearch_api
self.create_index_if_missing()

def is_status_filename_index(self):
return self.index_name.startswith(REPO_STATUS_FILENAME_INDEX_NAME)

def create_index_if_missing(self):
if not self.seasearch_api.check_index_mapping(self.index_name).get('is_exist'):
if self.is_status_filename_index():
self.mapping['properties']['metadata_updated_time'] = {
'type': 'keyword'
}
data = {
'mappings': self.mapping,
}
Expand All @@ -56,35 +64,45 @@ def create_index_if_missing(self):
def check_repo_status(self, repo_id):
return self.seasearch_api.check_document_by_id(self.index_name, repo_id).get('is_exist')

def add_repo_status(self, repo_id, commit_id, updatingto, metadata_updated_time):
date = {
def add_repo_status(self, repo_id, commit_id, updatingto, **kwargs):
data = {
'repo_id': repo_id,
'commit_id': commit_id,
'updatingto': updatingto,
'metadata_updated_time': metadata_updated_time,
}

if 'metadata_updated_time' in kwargs:
data.update(metadata_updated_time=kwargs['metadata_updated_time'])

doc_id = repo_id
self.seasearch_api.create_document_by_id(self.index_name, doc_id, date)
self.seasearch_api.create_document_by_id(self.index_name, doc_id, data)

def begin_update_repo(self, repo_id, old_commit_id, new_commit_id, metadata_updated_time):
self.add_repo_status(repo_id, old_commit_id, new_commit_id, metadata_updated_time)
def begin_update_repo(self, repo_id, old_commit_id, new_commit_id, **kwargs):
self.add_repo_status(repo_id, old_commit_id, new_commit_id, **kwargs)

def finish_update_repo(self, repo_id, commit_id, metadata_updated_time):
self.add_repo_status(repo_id, commit_id, None, metadata_updated_time)
def finish_update_repo(self, repo_id, commit_id, **kwargs):
self.add_repo_status(repo_id, commit_id, None, **kwargs)

def delete_documents_by_repo(self, repo_id):
return self.seasearch_api.delete_document_by_id(self.index_name, repo_id)

def get_repo_status_by_id(self, repo_id):
doc = self.seasearch_api.get_document_by_id(self.index_name, repo_id)
if doc.get('error'):
return RepoStatus(repo_id, None, None, None)
if self.is_status_filename_index():
return RepoStatus(repo_id, None, None, metadata_updated_time=None)
else:
return RepoStatus(repo_id, None, None)

commit_id = doc['_source']['commit_id']
updatingto = doc['_source']['updatingto']
metadata_updated_time = doc['_source']['metadata_updated_time']
repo_id = doc['_source']['repo_id']

return RepoStatus(repo_id, commit_id, updatingto, metadata_updated_time)
if self.is_status_filename_index():
metadata_updated_time = doc['_source']['metadata_updated_time']
return RepoStatus(repo_id, commit_id, updatingto, metadata_updated_time=metadata_updated_time)

return RepoStatus(repo_id, commit_id, updatingto)

def update_repo_status_by_id(self, doc_id, data):
self.seasearch_api.update_document_by_id(self.index_name, doc_id, data)
Expand All @@ -93,27 +111,28 @@ def get_repo_status_by_time(self, check_time):
per_size = 2000
start = 0
repo_head_list = []
while True:
query_params = {
"query": {
"bool": {
"must": [
{"range":
{"@timestamp":
{
"lt": check_time
}
query_params = {
"query": {
"bool": {
"must": [
{"range":
{"@timestamp":
{
"lt": check_time
}
}
]
}
},
"_source": ["commit_id", "updatingto", "metadata_updated_time"],
"from": start,
"size": per_size,
"sort": ["-@timestamp"],
}

}
]
}
},
"_source": ["commit_id", "updatingto"],
"from": start,
"size": per_size,
"sort": ["-@timestamp"],
}
if self.is_status_filename_index():
query_params['_source'].append('metadata_updated_time')
while True:
repo_heads, total = self._repo_head_search(query_params)
repo_head_list.extend(repo_heads)
start += per_size
Expand Down Expand Up @@ -151,13 +170,14 @@ def _repo_head_search(self, query_params):
repo_id = hit['_id']
commit_id = hit.get('_source').get('commit_id')
updatingto = hit.get('_source').get('updatingto')
metadata_updated_time = hit.get('_source').get('metadata_updated_time')
repo_heads.append({
repo_head = {
'repo_id': repo_id,
'commit_id': commit_id,
'updatingto': updatingto,
'metadata_updated_time': metadata_updated_time,
})
}
if 'metadata_updated_time' in hit.get('_source', {}):
repo_head['metadata_updated_time'] = hit.get('_source').get('metadata_updated_time')
repo_heads.append(repo_head)
return repo_heads, total

def delete_index_by_index_name(self):
Expand Down
Loading

0 comments on commit cba51d5

Please sign in to comment.