update err
allisonwang-db committed Jan 16, 2025
1 parent e2cff22 commit 3124755
Showing 2 changed files with 27 additions and 41 deletions.
@@ -183,12 +183,6 @@ def main(infile: IO, outfile: IO) -> None:
                     },
                 )
             outfile.flush()
-        except Exception as e:
-            error_msg = "data source {} throw exception: {}".format(data_source.name, e)
-            raise PySparkRuntimeError(
-                errorClass="PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
-                messageParameters={"msg": error_msg},
-            )
         finally:
             reader.stop()
     except BaseException as e:
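For context (inferred from the hunk, not stated in the commit): the deleted `except Exception` block re-wrapped reader failures as `PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR`; after this change they propagate to the outer `except BaseException`, which reports them via `handle_worker_exception` before the worker exits, while `finally: reader.stop()` still guarantees cleanup. A minimal runnable sketch of that control flow; `handle_worker_exception_stub` and `FakeReader` are stand-ins invented here, since the full worker is not shown in this diff:

```python
from io import BytesIO


def handle_worker_exception_stub(e: BaseException, outfile) -> None:
    # Stand-in for pyspark's handle_worker_exception: serialize the error
    # back to the peer instead of re-raising a wrapped copy.
    outfile.write(f"worker error: {type(e).__name__}: {e}\n".encode("utf-8"))
    outfile.flush()


class FakeReader:
    # Stand-in for the data source's stream reader; only stop() matters here.
    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True


def run(reader: FakeReader, outfile, fail: bool) -> None:
    try:
        try:
            if fail:
                # With this commit, such errors propagate as-is rather than
                # being re-wrapped in a runtime-error class.
                raise RuntimeError("data source failed")
            outfile.write(b"ok\n")
            outfile.flush()
        finally:
            reader.stop()  # cleanup runs whether or not an error occurred
    except BaseException as e:
        handle_worker_exception_stub(e, outfile)
        # the real worker also calls sys.exit(-1) here


out = BytesIO()
reader = FakeReader()
run(reader, out, fail=True)
assert reader.stopped
assert out.getvalue().startswith(b"worker error: RuntimeError")
```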
62 changes: 27 additions & 35 deletions python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -89,44 +89,36 @@ def main(infile: IO, outfile: IO) -> None:
         )
         # Receive the `overwrite` flag.
         overwrite = read_bool(infile)
-        # Instantiate data source reader.
-        try:
-            # Create the data source writer instance.
-            writer = data_source.streamWriter(schema=schema, overwrite=overwrite)
+        # Create the data source writer instance.
+        writer = data_source.streamWriter(schema=schema, overwrite=overwrite)
+        # Receive the commit messages.
+        num_messages = read_int(infile)
 
-            # Receive the commit messages.
-            num_messages = read_int(infile)
-            commit_messages = []
-            for _ in range(num_messages):
-                message = pickleSer._read_with_length(infile)
-                if message is not None and not isinstance(message, WriterCommitMessage):
-                    raise PySparkAssertionError(
-                        errorClass="DATA_SOURCE_TYPE_MISMATCH",
-                        messageParameters={
-                            "expected": "an instance of WriterCommitMessage",
-                            "actual": f"'{type(message).__name__}'",
-                        },
-                    )
-                commit_messages.append(message)
+        commit_messages = []
+        for _ in range(num_messages):
+            message = pickleSer._read_with_length(infile)
+            if message is not None and not isinstance(message, WriterCommitMessage):
+                raise PySparkAssertionError(
+                    errorClass="DATA_SOURCE_TYPE_MISMATCH",
+                    messageParameters={
+                        "expected": "an instance of WriterCommitMessage",
+                        "actual": f"'{type(message).__name__}'",
+                    },
+                )
+            commit_messages.append(message)
 
-            batch_id = read_long(infile)
-            abort = read_bool(infile)
+        batch_id = read_long(infile)
+        abort = read_bool(infile)
 
-            # Commit or abort the Python data source write.
-            # Note the commit messages can be None if there are failed tasks.
-            if abort:
-                writer.abort(commit_messages, batch_id)
-            else:
-                writer.commit(commit_messages, batch_id)
-            # Send a status code back to JVM.
-            write_int(0, outfile)
-            outfile.flush()
-        except Exception as e:
-            error_msg = "data source {} throw exception: {}".format(data_source.name, e)
-            raise PySparkRuntimeError(
-                errorClass="PYTHON_STREAMING_DATA_SOURCE_RUNTIME_ERROR",
-                messageParameters={"action": "commitOrAbort", "error": error_msg},
-            )
+        # Commit or abort the Python data source write.
+        # Note the commit messages can be None if there are failed tasks.
+        if abort:
+            writer.abort(commit_messages, batch_id)
+        else:
+            writer.commit(commit_messages, batch_id)
+        # Send a status code back to JVM.
+        write_int(0, outfile)
+        outfile.flush()
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
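The surviving logic above encodes a small wire protocol between the JVM and the Python sink worker: an int count, that many length-prefixed pickled WriterCommitMessage objects, a long batch id, and a bool abort flag, answered with status code 0. A self-contained sketch of that exchange; the struct-based helpers, `read_pickled`, and `EchoWriter` are simplified stand-ins for `pyspark.serializers` and a real stream writer, and the big-endian framing is an assumption of this sketch, not shown in the diff:

```python
import io
import pickle
import struct


# Simplified stand-ins for pyspark.serializers helpers (framing assumed).
def read_int(stream) -> int:
    return struct.unpack("!i", stream.read(4))[0]


def read_long(stream) -> int:
    return struct.unpack("!q", stream.read(8))[0]


def read_bool(stream) -> bool:
    return struct.unpack("!?", stream.read(1))[0]


def write_int(value: int, stream) -> None:
    stream.write(struct.pack("!i", value))


def read_pickled(stream):
    # Stand-in for pickleSer._read_with_length: a length-prefixed pickle.
    # (The real serializer also handles null/EOF markers, omitted here.)
    length = read_int(stream)
    return pickle.loads(stream.read(length))


class EchoWriter:
    # Stand-in for a user-defined DataSourceStreamWriter.
    def commit(self, messages, batch_id):
        print(f"commit batch {batch_id} with {len(messages)} message(s)")

    def abort(self, messages, batch_id):
        print(f"abort batch {batch_id}")


def run_commit_phase(infile, outfile, writer) -> None:
    # Mirrors the new, un-wrapped flow in python_streaming_sink_runner.py.
    num_messages = read_int(infile)
    commit_messages = [read_pickled(infile) for _ in range(num_messages)]
    batch_id = read_long(infile)
    abort = read_bool(infile)
    if abort:
        writer.abort(commit_messages, batch_id)
    else:
        writer.commit(commit_messages, batch_id)
    write_int(0, outfile)  # status code 0 back to the JVM
    outfile.flush()


# Build a frame the way the JVM side would: one message, batch 7, commit.
buf = io.BytesIO()
payload = pickle.dumps("message-0")
write_int(1, buf)                    # num_messages
write_int(len(payload), buf)         # pickled message, length-prefixed
buf.write(payload)
buf.write(struct.pack("!q", 7))      # batch_id
buf.write(struct.pack("!?", False))  # abort flag
buf.seek(0)
run_commit_phase(buf, io.BytesIO(), EchoWriter())
```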
