Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] File staging to user worker support #34208

Merged
merged 11 commits into from
Mar 20, 2025

Conversation

stankiewicz
Copy link
Contributor

@stankiewicz stankiewicz commented Mar 7, 2025

Add files to stage flag for python sdk to support uploading arbitrary files to user worker.

Avoiding modification of container boot.
Files are staged to location that depends on runner (for python it's: semi_persistent_directory = environment.get('SEMI_PERSISTENT_DIRECTORY', None) + '/staged')

@stankiewicz stankiewicz changed the title File staging [Python] File staging to user worker support Mar 7, 2025
@stankiewicz stankiewicz changed the title [Python] File staging to user worker support [WIP][Python] File staging to user worker support Mar 7, 2025
@stankiewicz stankiewicz changed the title [WIP][Python] File staging to user worker support [Python] File staging to user worker support Mar 11, 2025
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @liferoad for label python.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@liferoad
Copy link
Contributor

I think this might have the similar issues for FlinkRunner: #32743

@stankiewicz
Copy link
Contributor Author

stankiewicz commented Mar 11, 2025

I think this might have the similar issues for FlinkRunner: #32743

Here I want to plumb staging arbitrary files using stager and documenting how it should be read on portability. I guess for other runners there has to be some future work to make those files available under environment.get('SEMI_PERSISTENT_DIRECTORY', None) + '/staged') path.

@liferoad
Copy link
Contributor

I mean this might not work for other runners. Can you test this with Dataflow to confirm this works?

@stankiewicz
Copy link
Contributor Author

Sure. This works for python sdk, runner V2. I'm trying to test in with expansion service (no env variable for sure but I want to check if the stage directory with those files is there).

@tvalentyn
Copy link
Contributor

tvalentyn commented Mar 19, 2025

Thanks for this change! I am seeing some errors in precommit tests:

2025-03-13T19:44:26.9812107Z [gw0] [ 57%] FAILED apache_beam/runners/portability/stager_test.py::StagerTest::test_files_to_stage 

@tvalentyn
Copy link
Contributor

The logs are a bit painful to dig through, but seeing this:

2025-03-13T19:47:11.3815653Z =================================== FAILURES ===================================
2025-03-13T19:47:11.3818275Z ________________________ StagerTest.test_files_to_stage ________________________
2025-03-13T19:47:11.3820773Z [gw0] linux -- Python 3.9.21 /runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/bin/python
2025-03-13T19:47:11.3823360Z 
2025-03-13T19:47:11.3824705Z self = <apache_beam.runners.portability.stager_test.StagerTest testMethod=test_files_to_stage>
2025-03-13T19:47:11.3826190Z 
2025-03-13T19:47:11.3826723Z     def test_files_to_stage(self):
2025-03-13T19:47:11.3829344Z       staging_dir = self.make_temp_dir()
2025-03-13T19:47:11.3832503Z       source_dir = self.make_temp_dir()
2025-03-13T19:47:11.3834541Z     
2025-03-13T19:47:11.3836213Z       foo_ca = os.path.join(source_dir, 'foo.ca')
2025-03-13T19:47:11.3838552Z       self.create_temp_file(foo_ca, 'ca content')
2025-03-13T19:47:11.3840929Z       test_txt = os.path.join(source_dir, 'test.txt')
2025-03-13T19:47:11.3843499Z       self.create_temp_file(test_txt, 'test content')
2025-03-13T19:47:11.3845079Z       files_to_stage = ','.join([foo_ca, test_txt])
2025-03-13T19:47:11.3847306Z       options = PipelineOptions()
2025-03-13T19:47:11.3849055Z       self.update_options(options)
2025-03-13T19:47:11.3850985Z       options.view_as(SetupOptions).files_to_stage = files_to_stage
2025-03-13T19:47:11.3852461Z     
2025-03-13T19:47:11.3853584Z       self.assertEqual(
2025-03-13T19:47:11.3854856Z           ['foo.ca', 'test.txt', stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
2025-03-13T19:47:11.3857036Z >         self.stager.create_and_stage_job_resources(
2025-03-13T19:47:11.3858954Z               options, staging_location=staging_dir)[1])
2025-03-13T19:47:11.3859776Z 
2025-03-13T19:47:11.3860425Z apache_beam/runners/portability/stager_test.py:446: 
2025-03-13T19:47:11.3861756Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
2025-03-13T19:47:11.3864703Z target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/stager.py:473: in create_and_stage_job_resources
2025-03-13T19:47:11.3867274Z     staged_resources = self.stage_job_resources(
2025-03-13T19:47:11.3871029Z target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/stager.py:419: in stage_job_resources
2025-03-13T19:47:11.3872197Z     self.stage_artifact(
2025-03-13T19:47:11.3873064Z apache_beam/runners/portability/stager_test.py:872: in stage_artifact
2025-03-13T19:47:11.3875318Z     shutil.copyfile(local_path_to_artifact, artifact_name)
2025-03-13T19:47:11.3877725Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
2025-03-13T19:47:11.3879358Z 
2025-03-13T19:47:11.3880198Z src = '/'
2025-03-13T19:47:11.3883788Z dst = '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmp_ql9ukkm/tmpipf9odwi/'
2025-03-13T19:47:11.3886848Z 
2025-03-13T19:47:11.3887635Z     def copyfile(src, dst, *, follow_symlinks=True):
2025-03-13T19:47:11.3889620Z         """Copy data from src to dst in the most efficient way possible.
2025-03-13T19:47:11.3891616Z     
2025-03-13T19:47:11.3893647Z         If follow_symlinks is not set and src is a symbolic link, a new
2025-03-13T19:47:11.3897752Z         symlink will be created instead of copying the file it points to.
2025-03-13T19:47:11.3900118Z     
2025-03-13T19:47:11.3901504Z         """
2025-03-13T19:47:11.3903384Z         sys.audit("shutil.copyfile", src, dst)
2025-03-13T19:47:11.3905493Z     
2025-03-13T19:47:11.3906932Z         if _samefile(src, dst):
2025-03-13T19:47:11.3909438Z             raise SameFileError("{!r} and {!r} are the same file".format(src, dst))
2025-03-13T19:47:11.3911690Z     
2025-03-13T19:47:11.3913141Z         file_size = 0
2025-03-13T19:47:11.3914729Z         for i, fn in enumerate([src, dst]):
2025-03-13T19:47:11.3916615Z             try:
2025-03-13T19:47:11.3919122Z                 st = _stat(fn)
2025-03-13T19:47:11.3920134Z             except OSError:
2025-03-13T19:47:11.3920907Z                 # File most likely does not exist
2025-03-13T19:47:11.3921769Z                 pass
2025-03-13T19:47:11.3923362Z             else:
2025-03-13T19:47:11.3925363Z                 # XXX What about other special files? (sockets, devices...)
2025-03-13T19:47:11.3927687Z                 if stat.S_ISFIFO(st.st_mode):
2025-03-13T19:47:11.3929880Z                     fn = fn.path if isinstance(fn, os.DirEntry) else fn
2025-03-13T19:47:11.3932269Z                     raise SpecialFileError("`%s` is a named pipe" % fn)
2025-03-13T19:47:11.3934053Z                 if _WINDOWS and i == 0:
2025-03-13T19:47:11.3936113Z                     file_size = st.st_size
2025-03-13T19:47:11.3939119Z     
2025-03-13T19:47:11.3940984Z         if not follow_symlinks and _islink(src):
2025-03-13T19:47:11.3943201Z             os.symlink(os.readlink(src), dst)
2025-03-13T19:47:11.3945231Z         else:
2025-03-13T19:47:11.3946945Z >           with open(src, 'rb') as fsrc:
2025-03-13T19:47:11.3949370Z E           IsADirectoryError: [Errno 21] Is a directory: '/'
2025-03-13T19:47:11.3951018Z 
2025-03-13T19:47:11.3952846Z /opt/hostedtoolcache/Python/3.9.21/x64/lib/python3.9/shutil.py:264: IsADirectoryError

@stankiewicz
Copy link
Contributor Author

thanks, will take a look at it!

@liferoad liferoad added this to the 2.64.0 Release milestone Mar 19, 2025
@tvalentyn
Copy link
Contributor

Thanks!

@tvalentyn tvalentyn merged commit 8527d43 into apache:master Mar 20, 2025
92 checks passed
talatuyarer pushed a commit to talatuyarer/beam that referenced this pull request Mar 20, 2025
* file staging

* add tests

* help message

* format

* typo

* yapf

* docs

* support for multiple files

* changes.md

* fix test
prodriguezdefino pushed a commit to prodriguezdefino/beam-pabs that referenced this pull request Mar 26, 2025
* file staging

* add tests

* help message

* format

* typo

* yapf

* docs

* support for multiple files

* changes.md

* fix test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants