forked from amundsen-io/amundsendatabuilder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
elasticsearch_publisher.py
147 lines (115 loc) · 5.98 KB
/
elasticsearch_publisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import json
import logging
from typing import List
from amundsen_common.models.index_map import TABLE_INDEX_MAP
from elasticsearch.exceptions import NotFoundError
from pyhocon import ConfigTree
from databuilder.publisher.base_publisher import Publisher
LOGGER = logging.getLogger(__name__)
##################################################################################################
#
# ElasticsearchPublisher is being deprecated in favor of using SearchMetadatatoElasticasearchTask
# which publishes ES metadata with mappings compatible with amundsensearch >= 4.0.0
#
##################################################################################################
class ElasticsearchPublisher(Publisher):
    """
    Elasticsearch Publisher uses Bulk API to load data from JSON file.
    A new index is created and data is uploaded into it. After the upload
    is complete, index alias is swapped to point to new index from old index
    and traffic is routed to new index.
    Old index is deleted after the alias swap is complete.

    NOTE: deprecated in favor of SearchMetadatatoElasticasearchTask (see banner above).
    """
    FILE_PATH_CONFIG_KEY = 'file_path'
    FILE_MODE_CONFIG_KEY = 'mode'
    ELASTICSEARCH_CLIENT_CONFIG_KEY = 'client'
    ELASTICSEARCH_DOC_TYPE_CONFIG_KEY = 'doc_type'
    ELASTICSEARCH_NEW_INDEX_CONFIG_KEY = 'new_index'
    ELASTICSEARCH_ALIAS_CONFIG_KEY = 'alias'
    ELASTICSEARCH_MAPPING_CONFIG_KEY = 'mapping'
    # config to control how many max documents to publish at a time
    ELASTICSEARCH_PUBLISHER_BATCH_SIZE = 'batch_size'
    DEFAULT_ELASTICSEARCH_INDEX_MAPPING = TABLE_INDEX_MAP

    def __init__(self) -> None:
        super().__init__()

    def init(self, conf: ConfigTree) -> None:
        """
        Read configuration and open the input file.

        :param conf: ConfigTree holding the keys declared on this class.
                     ``file_path`` and ``doc_type`` are required; the rest
                     have defaults (``mode='r'``, table index mapping,
                     batch size 10000).
        """
        self.conf = conf
        self.file_path = self.conf.get_string(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY)
        self.file_mode = self.conf.get_string(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY, 'r')
        self.elasticsearch_type = self.conf.get_string(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY)
        self.elasticsearch_client = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY)
        self.elasticsearch_new_index = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY)
        self.elasticsearch_alias = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY)
        self.elasticsearch_mapping = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY,
                                                   ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING)
        self.elasticsearch_batch_size = self.conf.get(ElasticsearchPublisher.ELASTICSEARCH_PUBLISHER_BATCH_SIZE,
                                                      10000)
        # Handle is kept open for the lifetime of the publisher; close() releases it.
        self.file_handler = open(self.file_path, self.file_mode)

    def _fetch_old_index(self) -> List[str]:
        """
        Retrieve all indices that currently have {elasticsearch_alias} alias.

        :return: list of elasticsearch index names; empty list when the alias
                 does not exist yet (e.g. first run against a fresh cluster).
        """
        try:
            # get_alias returns {index_name: alias_info}; materialize keys so the
            # return value actually matches the List[str] annotation.
            return list(self.elasticsearch_client.indices.get_alias(self.elasticsearch_alias).keys())
        except NotFoundError:
            # Logger.warn is deprecated; use warning()
            LOGGER.warning("Received index not found error from Elasticsearch. " +
                           "The index doesn't exist for a newly created ES. It's OK on first run.")
            # return empty list on exception
            return []

    def publish_impl(self) -> None:
        """
        Use Elasticsearch Bulk API to load data from file to a {new_index}.
        After upload, swap alias from {old_index} to {new_index} in a atomic operation
        to route traffic to {new_index}.
        """
        LOGGER.warning('ElasticsearchPublisher is being deprecated in favor of using SearchMetadatatoElasticasearchTask\
 which publishes ES metadata with mappings compatible with amundsensearch >= 4.0.0')

        # Stream the file line by line instead of materializing it with readlines().
        actions = [json.loads(line) for line in self.file_handler]

        # ensure new data exists
        if not actions:
            LOGGER.warning("received no data to upload to Elasticsearch!")
            return

        # Convert object to json for elasticsearch bulk upload
        # Bulk load JSON format is defined here:
        # https://www.elastic.co/guide/en/elasticsearch/reference/6.2/docs-bulk.html
        bulk_actions = []
        cnt = 0

        # create new index with mapping
        self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping)

        for action in actions:
            index_row = dict(index=dict(_index=self.elasticsearch_new_index))
            action['resource_type'] = self.elasticsearch_type
            bulk_actions.append(index_row)
            bulk_actions.append(action)
            cnt += 1
            # >= guards against any path that could push cnt past the threshold
            if cnt >= self.elasticsearch_batch_size:
                self.elasticsearch_client.bulk(bulk_actions)
                LOGGER.info('Publish %i of records to ES', cnt)
                cnt = 0
                bulk_actions = []

        # Do the final bulk actions
        if bulk_actions:
            self.elasticsearch_client.bulk(bulk_actions)

        # fetch indices that have {elasticsearch_alias} as alias
        elasticsearch_old_indices = self._fetch_old_index()

        # update alias to point to the new index
        actions = [{"add": {"index": self.elasticsearch_new_index, "alias": self.elasticsearch_alias}}]

        # delete old indices
        delete_actions = [{"remove_index": {"index": index}} for index in elasticsearch_old_indices]
        actions.extend(delete_actions)

        update_action = {"actions": actions}

        # perform alias update and index delete in single atomic operation
        self.elasticsearch_client.indices.update_aliases(update_action)

    def close(self) -> None:
        """
        Close the file handler opened in init().

        :return: None
        """
        if self.file_handler:
            self.file_handler.close()

    def get_scope(self) -> str:
        return 'publisher.elasticsearch'