Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return StructuredDataset which is a field in a dataclass #3071

Closed
wants to merge 14 commits into from
Prev Previous commit
Next Next commit
test
Signed-off-by: Nelson Chen <[email protected]>
arbaobao committed Feb 7, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 60142d0e644710052e4f9da489c4c85743e4d90a
2 changes: 1 addition & 1 deletion tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
@@ -893,7 +893,7 @@ def test_attr_access_dc_sd():
file_transfer = SimpleFileTransfer()
remote_file_path = file_transfer.upload_file(file_type="parquet")

execution_id = run("attr_access_dc_sd.py", "wf")
execution_id = run("attr_access_dc_sd.py", "wf", "--uri", remote_file_path)
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
execution = remote.fetch_execution(name=execution_id)
execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
Original file line number Diff line number Diff line change
@@ -14,13 +14,13 @@ class DC:


@task
def create_dc() -> DC:
def create_dc(uri: str) -> DC:
"""Create a dataclass with a StructuredDataset attribute.
Returns:
dc: A dataclass with a StructuredDataset attribute.
"""
dc = DC(sd=StructuredDataset(dataframe=pd.DataFrame({"a": [5]})))
dc = DC(sd=StructuredDataset(uri=uri, file_format="parquet"))

return dc

@@ -35,10 +35,10 @@ def read_sd(dc: DC) -> StructuredDataset:


@workflow
def wf() -> None:
dc = create_dc()
def wf(uri: str) -> None:
dc = create_dc(uri=uri)
r = read_sd(dc=dc)


if __name__ == "__main__":
wf()
wf(uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider configurable parquet file path

Consider making the parquet file path configurable through environment variables or configuration files instead of hardcoding it. This would make the code more flexible and easier to maintain across different environments.

Code suggestion
Check the AI-generated fix before applying
Suggested change
wf(uri="tests/flytekit/integration/remote/workflows/basic/data/df.parquet")
import os
default_path = "tests/flytekit/integration/remote/workflows/basic/data/df.parquet"
wf(uri=os.getenv("PARQUET_FILE_PATH", default_path))

Code Review Run #7e2d92


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Sorry, something went wrong.


Unchanged files with check annotations Beta

# def extract(d: Data) -> StructuredDataset:
# return d.f
if isinstance(python_val._literal_sd, StructuredDataset):
sdt = StructuredDatasetType(format=python_val._literal_sd.file_format)
metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
sd_literal = literals.StructuredDataset(uri=python_val._literal_sd.uri, metadata=metad)
return Literal(scalar=Scalar(structured_dataset=sd_literal))

Check warning on line 744 in flytekit/types/structured/structured_dataset.py

Codecov / codecov/patch

flytekit/types/structured/structured_dataset.py#L741-L744

Added lines #L741 - L744 were not covered by tests
return Literal(scalar=Scalar(structured_dataset=python_val._literal_sd))