diff --git a/python/pyspark/sql/streaming/python_streaming_source_runner.py b/python/pyspark/sql/streaming/python_streaming_source_runner.py
index a7349779dc626..d5e90c4ccb60b 100644
--- a/python/pyspark/sql/streaming/python_streaming_source_runner.py
+++ b/python/pyspark/sql/streaming/python_streaming_source_runner.py
@@ -183,12 +183,6 @@ def main(infile: IO, outfile: IO) -> None:
                         },
                     )
                 outfile.flush()
-        except Exception as e:
-            error_msg = "data source {} throw exception: {}".format(data_source.name, e)
-            raise PySparkRuntimeError(
-                errorClass="PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
-                messageParameters={"msg": error_msg},
-            )
         finally:
             reader.stop()
     except BaseException as e:
diff --git a/python/pyspark/sql/worker/python_streaming_sink_runner.py b/python/pyspark/sql/worker/python_streaming_sink_runner.py
index 0d46fc9021213..3cf2f762fddde 100644
--- a/python/pyspark/sql/worker/python_streaming_sink_runner.py
+++ b/python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -89,44 +89,36 @@ def main(infile: IO, outfile: IO) -> None:
         )
         # Receive the `overwrite` flag.
         overwrite = read_bool(infile)
-        # Instantiate data source reader.
-        try:
-            # Create the data source writer instance.
-            writer = data_source.streamWriter(schema=schema, overwrite=overwrite)
+        # Create the data source writer instance.
+        writer = data_source.streamWriter(schema=schema, overwrite=overwrite)
+        # Receive the commit messages.
+        num_messages = read_int(infile)
 
-            # Receive the commit messages.
-            num_messages = read_int(infile)
-            commit_messages = []
-            for _ in range(num_messages):
-                message = pickleSer._read_with_length(infile)
-                if message is not None and not isinstance(message, WriterCommitMessage):
-                    raise PySparkAssertionError(
-                        errorClass="DATA_SOURCE_TYPE_MISMATCH",
-                        messageParameters={
-                            "expected": "an instance of WriterCommitMessage",
-                            "actual": f"'{type(message).__name__}'",
-                        },
-                    )
-                commit_messages.append(message)
+        commit_messages = []
+        for _ in range(num_messages):
+            message = pickleSer._read_with_length(infile)
+            if message is not None and not isinstance(message, WriterCommitMessage):
+                raise PySparkAssertionError(
+                    errorClass="DATA_SOURCE_TYPE_MISMATCH",
+                    messageParameters={
+                        "expected": "an instance of WriterCommitMessage",
+                        "actual": f"'{type(message).__name__}'",
+                    },
+                )
+            commit_messages.append(message)
 
-            batch_id = read_long(infile)
-            abort = read_bool(infile)
+        batch_id = read_long(infile)
+        abort = read_bool(infile)
 
-            # Commit or abort the Python data source write.
-            # Note the commit messages can be None if there are failed tasks.
-            if abort:
-                writer.abort(commit_messages, batch_id)
-            else:
-                writer.commit(commit_messages, batch_id)
-            # Send a status code back to JVM.
-            write_int(0, outfile)
-            outfile.flush()
-        except Exception as e:
-            error_msg = "data source {} throw exception: {}".format(data_source.name, e)
-            raise PySparkRuntimeError(
-                errorClass="PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
-                messageParameters={"action": "commitOrAbort", "error": error_msg},
-            )
+        # Commit or abort the Python data source write.
+        # Note the commit messages can be None if there are failed tasks.
+        if abort:
+            writer.abort(commit_messages, batch_id)
+        else:
+            writer.commit(commit_messages, batch_id)
+        # Send a status code back to JVM.
+        write_int(0, outfile)
+        outfile.flush()
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)