Merge pull request #74 from tubular/fix/elasticsearch-7
IMP support elasticsearch 7
xym333 authored Sep 10, 2019
2 parents 29e61ad + c45334d commit 987d715
Showing 17 changed files with 392 additions and 55 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
## 2.8.1
* Fix support for using multiple sparkly sessions during tests
* SparklySession does not persist modifications to os.environ
* Support ElasticSearch 7 by making type optional.

## 2.8.0
* Extend `SparklyCatalog` to work with database properties:
- `spark.catalog_ext.set_database_property`
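As a quick illustration of the headline change, the sketch below reads an Elasticsearch 7 index by URL with no mapping type in the path. It is a hedged example rather than part of the diff: the host and index names are made up, and `spark` is assumed to be an existing `SparklySession` configured with the elasticsearch-spark connector package.

```python
# Illustrative sketch (not from the diff): with es_type optional, an
# Elasticsearch 7 index can be addressed by an index-only URL.
df = spark.read_ext.by_url('elastic://elastic7.docker/sparkly_test?parallelism=4')
df.show()
```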
31 changes: 27 additions & 4 deletions docker-compose.yml
@@ -21,7 +21,9 @@ services:
depends_on:
cassandra.docker:
condition: service_healthy
elastic.docker:
elastic6.docker:
condition: service_healthy
elastic7.docker:
condition: service_healthy
kafka.docker:
condition: service_healthy
@@ -37,7 +39,9 @@ services:
depends_on:
cassandra.docker:
condition: service_healthy
elastic.docker:
elastic6.docker:
condition: service_healthy
elastic7.docker:
condition: service_healthy
kafka.docker:
condition: service_healthy
@@ -56,10 +60,29 @@ services:
healthcheck:
test: ps ax | grep cassandra

elastic.docker:
elastic6.docker:
image: docker.elastic.co/elasticsearch/elasticsearch:6.5.4
environment:
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
healthcheck:
test: "curl -f http://localhost:9200/_cat/health | grep green"
interval: 5s
timeout: 5s
retries: 20

elastic7.docker:
image: docker.elastic.co/elasticsearch/elasticsearch:7.3.0
environment:
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
healthcheck:
test: ps ax | grep elastic
test: "curl -f http://localhost:9200/_cat/health | grep green"
interval: 5s
timeout: 5s
retries: 20

mysql.docker:
image: mysql:5.7
2 changes: 1 addition & 1 deletion sparkly/__init__.py
@@ -19,4 +19,4 @@
assert SparklySession


__version__ = '2.8.0'
__version__ = '2.8.1'
10 changes: 6 additions & 4 deletions sparkly/reader.py
@@ -149,7 +149,7 @@ def elastic(self, host, es_index, es_type, query='', fields=None, port=None,
Args:
host (str): Elastic server host.
es_index (str): Elastic index.
es_type (str): Elastic type.
es_type (str|None): Elastic type. Deprecated in Elasticsearch 7, but required for versions below 7.
query (str): Pre-filter es documents, e.g. '?q=views:>10'.
fields (list[str]|None): Select only specified fields.
port (int|None): Elastic server port.
@@ -164,7 +164,7 @@ def elastic(self, host, es_index, es_type, query='', fields=None, port=None,
assert self._spark.has_package('org.elasticsearch:elasticsearch-spark')

reader_options = {
'path': '{}/{}'.format(es_index, es_type),
'path': '{}/{}'.format(es_index, es_type) if es_type else es_index,
'format': 'org.elasticsearch.spark.sql',
'es.nodes': host,
'es.query': query,
@@ -330,10 +330,12 @@ def _resolve_elastic(self, parsed_url, parsed_qs):
if 'fields' in parsed_qs:
kwargs['fields'] = parsed_qs.pop('fields').split(',')

path_segments = parsed_url.path.split('/')

return self.elastic(
host=parsed_url.netloc,
es_index=parsed_url.path.split('/')[1],
es_type=parsed_url.path.split('/')[2],
es_index=path_segments[1],
es_type=path_segments[2] if len(path_segments) > 2 else None,
port=parsed_url.port,
parallelism=parsed_qs.pop('parallelism', None),
options=parsed_qs,
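The same behaviour through the explicit reader API, as a hedged sketch: `spark` is again an assumed `SparklySession` with the connector package, and note that the reader keeps `es_type` as a positional argument, so `None` is passed explicitly for Elasticsearch 7.

```python
# Illustrative sketch: an Elasticsearch 7 read with no type vs. a pre-7 read
# that still supplies the mapping type. Hosts, index and type are made up.
df_es7 = spark.read_ext.elastic('elastic7.docker', 'sparkly_test', None)
df_es6 = spark.read_ext.elastic(
    'elastic6.docker',
    'sparkly_test',
    'test',                     # mapping type, still required below ES 7
    query='?q=age:(>30)',
)
```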
5 changes: 5 additions & 0 deletions sparkly/session.py
@@ -14,6 +14,7 @@
# limitations under the License.
#

from copy import deepcopy
import os
import signal
import sys
@@ -77,8 +78,10 @@ class MySession(sparkly.SparklySession):
repositories = []

_instantiated_session = None
_original_environment = None

def __init__(self, additional_options=None):
SparklySession._original_environment = deepcopy(os.environ)
os.environ['PYSPARK_PYTHON'] = sys.executable

submit_args = [
@@ -138,6 +141,8 @@ def stop(cls):
if SparklySession._instantiated_session is not None:
SparkSession.stop(SparklySession._instantiated_session)
SparklySession._instantiated_session = None
os.environ = SparklySession._original_environment
SparklySession._original_environment = None

@property
def builder(self):
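A minimal sketch of the os.environ behaviour described in the changelog, under the assumption that a local Spark installation is available to actually start the session:

```python
# Illustrative sketch: stop() now restores the environment captured in
# __init__, so variables such as PYSPARK_PYTHON do not leak between sessions.
import os
from sparkly import SparklySession

env_before = dict(os.environ)
spark = SparklySession()
spark.stop()
assert dict(os.environ) == env_before
```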
35 changes: 33 additions & 2 deletions sparkly/testing.py
@@ -27,12 +27,14 @@
import os
import pprint
import shutil
import signal
import sys
import tempfile
from unittest import TestCase
from unittest.util import safe_repr
import warnings

from pyspark.context import SparkContext
from pyspark.sql import types as T
import six

@@ -74,6 +76,32 @@
_test_session_cache = None


def _ensure_gateway_is_down():
# Apparently, the gateway and underlying JVM stay alive between different
# invocations of SparkContext, even when the context is explicitly stopped.
# This makes it impossible to have multiple SparklySessions for testing,
# with different JAR requirements etc; once the first one initializes the
# gateway / JVM, the other ones just re-use the existing gateway. So we have
# to kill it explicitly here.
if not SparkContext._gateway:
return

jvm_pid = int(
# Get the still active JVM
SparkContext._gateway.jvm
# Extract its process name (pid@hostname)
.java.lang.management.ManagementFactory.getRuntimeMXBean().getName()
# And keep the pid (yeah, unfortunately there's no easier way to
# get it in Java 8...)
.split('@')[0]
)
SparkContext._gateway.shutdown()
SparkContext._gateway = None
os.kill(jvm_pid, signal.SIGKILL)
os.environ.pop('PYSPARK_GATEWAY_PORT', None)
os.environ.pop('PYSPARK_GATEWAY_SECRET', None)


class SparklyTest(TestCase):
Base test for spark script tests.
@@ -122,6 +150,7 @@ def _init_session(cls):
logger.info('Found a global session, stopping it %r', _test_session_cache)
_test_session_cache.stop()
_test_session_cache = None
_ensure_gateway_is_down()

cls.spark = cls.setup_session()

@@ -151,6 +180,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
cls.spark.stop()
_ensure_gateway_is_down()
super(SparklyTest, cls).tearDownClass()

for fixture in cls.class_fixtures:
@@ -505,6 +535,7 @@ def _init_session(cls):
if _test_session_cache:
logger.info('Stopping the previous global session %r', _test_session_cache)
_test_session_cache.stop()
_ensure_gateway_is_down()

logger.info('Starting the new global session for %r', cls.session)
spark = _test_session_cache = cls.setup_session()
@@ -625,7 +656,7 @@ class ElasticFixture(Fixture):
...
"""

def __init__(self, host, es_index, es_type, mapping=None, data=None, port=None):
def __init__(self, host, es_index, es_type=None, mapping=None, data=None, port=None):
self.host = host
self.port = port or 9200
self.es_index = es_index
@@ -649,7 +680,7 @@ def setup_data(self):
)
self._request(
'PUT',
'/{}/_mapping/{}'.format(self.es_index, self.es_type),
'/{}/_mapping/{}'.format(self.es_index, self.es_type or ''),
self.read_file(self.mapping),
)

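For tests against Elasticsearch 7, the fixture can now be built without a type. A hedged sketch, with an illustrative host and data-file path:

```python
# Illustrative sketch: ElasticFixture for ES 7, es_type left at its new
# default of None.
from sparkly.testing import ElasticFixture

fixture = ElasticFixture(
    host='elastic7.docker',
    es_index='sparkly_test_fixture',
    data='/path/to/data_for_es7.json',  # bulk-format file, path is illustrative
)
fixture.setup_data()
```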
13 changes: 8 additions & 5 deletions sparkly/writer.py
@@ -81,6 +81,7 @@ def by_url(self, url):
elastic://localhost:9200/my_index/my_type?&parallelism=3&mode=overwrite
&es.write.operation=upsert
Note: type is deprecated in Elasticsearch 7.
Is an equivalent for::
@@ -154,14 +155,14 @@ def cassandra(self, host, keyspace, table, consistency=None, port=None, mode=Non

return self._basic_write(writer_options, options, parallelism, mode)

def elastic(self, host, es_index, es_type, port=None, mode=None,
def elastic(self, host, es_index, es_type=None, port=None, mode=None,
parallelism=None, options=None):
"""Write a dataframe into an ElasticSearch index.
Args:
host (str): Elastic server host.
es_index (str): Elastic index.
es_type (str): Elastic type.
es_type (str|None): Elastic type. Deprecated in Elasticsearch 7, but required for versions below 7.
port (int|None): Elastic server port.
mode (str|None): Spark save mode,
http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes
@@ -173,7 +174,7 @@ def elastic(self, host, es_index, es_type, port=None, mode=None,
assert self._spark.has_package('org.elasticsearch:elasticsearch-spark')

writer_options = {
'path': '{}/{}'.format(es_index, es_type),
'path': '{}/{}'.format(es_index, es_type) if es_type else es_index,
'format': 'org.elasticsearch.spark.sql',
'es.nodes': host,
}
@@ -512,10 +513,12 @@ def _resolve_csv(self, parsed_url, parsed_qs):
)

def _resolve_elastic(self, parsed_url, parsed_qs):
path_segments = parsed_url.path.split('/')

return self.elastic(
host=parsed_url.netloc,
es_index=parsed_url.path.split('/')[1],
es_type=parsed_url.path.split('/')[2],
es_index=path_segments[1],
es_type=path_segments[2] if len(path_segments) > 2 else None,
port=parsed_url.port,
mode=parsed_qs.pop('mode', None),
parallelism=parsed_qs.pop('parallelism', None),
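On the write side, the equivalent hedged sketch; `df` is an assumed DataFrame, `write_ext` is sparkly's writer accessor on DataFrames, and the host and index names are illustrative.

```python
# Illustrative sketch: writing to Elasticsearch 7 with no type, explicitly
# and by index-only URL.
df.write_ext.elastic(host='elastic7.docker', es_index='sparkly_test', mode='overwrite')
df.write_ext.by_url('elastic://elastic7.docker/sparkly_test?mode=overwrite&parallelism=4')
```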
12 changes: 11 additions & 1 deletion tests/integration/base.py
@@ -25,7 +25,7 @@
class SparklyTestSession(SparklySession):
packages = [
'datastax:spark-cassandra-connector:2.4.0-s_2.11',
'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4',
'org.elasticsearch:elasticsearch-spark-20_2.11:7.3.0',
'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0',
'mysql:mysql-connector-java:6.0.6',
'io.confluent:kafka-avro-serializer:3.0.1',
@@ -43,3 +43,13 @@ class SparklyTestSession(SparklySession):
'collect_max': 'brickhouse.udf.collect.CollectMaxUDAF',
'length_of_text': (lambda text: len(text), StringType())
}


class SparklyTestSessionWithES6(SparklySession):
packages = [
'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4',
]

repositories = [
'http://packages.confluent.io/maven/',
]
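A hedged sketch of how the two sessions above could be exercised side by side, assuming `SparklyTest` selects the session via its `session` class attribute; the gateway teardown added in sparkly/testing.py is what makes switching between them possible.

```python
# Illustrative sketch: independent test cases bound to different
# elasticsearch-spark connector versions.
from sparkly.testing import SparklyTest


class ElasticSearch7Test(SparklyTest):
    session = SparklyTestSession          # ES 7.3.0 connector


class ElasticSearch6Test(SparklyTest):
    session = SparklyTestSessionWithES6   # ES 6.5.4 connector
```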
2 changes: 2 additions & 0 deletions tests/integration/resources/test_fixtures/data_for_es7.json
@@ -0,0 +1,2 @@
{ "index" : { "_index" : "sparkly_test_fixture", "_id": "1" } }
{ "name" : "John", "age": 56}
6 changes: 6 additions & 0 deletions tests/integration/resources/test_read/elastic7_setup.json
@@ -0,0 +1,6 @@
{ "index" : { "_index" : "sparkly_test", "_id": "1" } }
{ "name" : "John2", "topics": [1, 2, 3, 4, 5], "age": 56, "demo": { "age_30": 20, "age_10": 50 } }
{ "index" : { "_index" : "sparkly_test", "_id": "2" } }
{ "name" : "Smith3", "topics": [1, 4, 5], "age": 31, "demo": { "age_30": 110, "age_10": 50 } }
{ "index" : { "_index" : "sparkly_test", "_id": "3" } }
{ "name" : "Smith4", "topics": [4, 5], "age": 12, "demo": { "age_30": 20, "age_10": 1 } }
2 changes: 2 additions & 0 deletions tests/integration/resources/test_write/elastic7_setup.json
@@ -0,0 +1,2 @@
{ "index" : { "_index" : "sparkly_test", "_id": "1111" } }
{ "uid": "1111", "title": "xxxx", "views": 1111}
