[Bug]: pickle_library option is ignored #28558

Open
1 of 15 tasks
chuckwondo opened this issue Sep 20, 2023 · 9 comments

@chuckwondo

chuckwondo commented Sep 20, 2023

What happened?

When constructing a Pipeline, the pickle_library option is validated, raising a ValueError when the value supplied is not one of the allowed values ("default", "dill", or "cloudpickle").

Unfortunately, the option is then ignored when it comes time to actually pickle anything.
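The validation itself is easy to trigger. A minimal sketch based on the behavior just described (the exact error message may vary by version):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# "bogus" is not one of "default", "dill", or "cloudpickle", so constructing
# the pipeline raises ValueError, per the validation described above.
beam.Pipeline(options=PipelineOptions(pickle_library="bogus"))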

I was able to produce a PicklingError via a pipeline that uses a MultiZarrToZarr preprocessor, like so (abridged from the linked issue, but illustrative):

import apache_beam as beam
from kerchunk.combine import drop

...

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create(pattern.items())
        | OpenWithKerchunk(file_type=pattern.file_type)
        | CombineReferences(
            concat_dims=["time"],
            identical_dims=["lat", "lon", "channel"],
            mzz_kwargs={"preprocess": drop("lst_unc_sys")},
        )
        | WriteCombinedReference(target_root, store_name)
    )

The error produced was the following:

  File "apache_beam/coders/coder_impl.py", line 270, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
  File ".../python3.11/site-packages/apache_beam/coders/coders.py", line 869, in <lambda>
    lambda x: dumps(x, protocol), pickle.loads)
              ^^^^^^^^^^^^^^^^^^
AttributeError: Can't pickle local object 'drop.<locals>.preproc'
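This is the standard pickle module's well-known limitation with local functions; a minimal reproduction, independent of Beam:

import pickle

def outer():
    def inner():
        pass
    return inner

# Standard pickle serializes functions by qualified name, so a function
# defined inside another function cannot be pickled:
pickle.dumps(outer())
# AttributeError: Can't pickle local object 'outer.<locals>.inner'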

It took quite a bit of digging, but I discovered that I should be able to address this issue by setting the following pipeline options:

  • save_main_session=True
  • pickle_library="cloudpickle"

Therefore I tried this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from kerchunk.combine import drop

...

options = PipelineOptions(pickle_library="cloudpickle", save_main_session=True)

with beam.Pipeline(options=options) as pipeline:
    _ = (
        pipeline
        | beam.Create(pattern.items())
        | OpenWithKerchunk(file_type=pattern.file_type)
        | CombineReferences(
            concat_dims=["time"],
            identical_dims=["lat", "lon", "channel"],
            mzz_kwargs={"preprocess": drop("lst_unc_sys")},
        )
        | WriteCombinedReference(str(target_root), store_name)
    )

Unfortunately, this produced the identical error.

After a bit of digging, I discovered that the problem is with the PickleCoder class's _create_impl method (v2.50.0):

class PickleCoder(_PickleCoderBase):
  """Coder using Python's pickle functionality."""
  def _create_impl(self):
    dumps = pickle.dumps
    protocol = pickle.HIGHEST_PROTOCOL
    return coder_impl.CallbackCoderImpl(
        lambda x: dumps(x, protocol), pickle.loads)

  ...

Specifically, the method refers directly to pickle.dumps and pickle.loads from the standard pickle module (coders.py imports pickle at the top), rather than to Beam's internal pickler module, which is where the pickle library selected via the pipeline options described above actually takes effect.
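The difference is easy to demonstrate by using the internal pickler module directly (a sketch; set_library is an internal API, and the behavior shown here assumes v2.50.0):

from apache_beam.internal import pickler

# This is what pickle_library="cloudpickle" selects under the hood:
pickler.set_library("cloudpickle")

def outer():
    def inner():
        return 42
    return inner

# Succeeds, because cloudpickle can serialize local functions:
payload = pickler.dumps(outer())
assert pickler.loads(payload)() == 42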

When I added the import of pickler and locally modified the _create_impl method as follows, my pipeline ran without error:

from apache_beam.internal import pickler

...

class PickleCoder(_PickleCoderBase):
  """Coder using Python's pickle functionality."""
  def _create_impl(self):
    return coder_impl.CallbackCoderImpl(pickler.dumps, pickler.loads)

  ...

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [x] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner
@chuckwondo
Author

.take-issue

@chuckwondo
Author

After digging into this more, I see that a _MemoizingPickleCoder was added via #15357, and that coder makes use of the internal pickler module, which means that it honors any pickle_library option that may have been specified. This is precisely the approach (sans memoization) that I've identified above as a potential "fix" for PickleCoder.
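For reference, the core of that coder's _create_impl amounts to the following (a paraphrased sketch, not the exact Beam source; note that lru_cache only helps for hashable values):

from functools import lru_cache

from apache_beam.coders import coder_impl
from apache_beam.internal import pickler

def _create_impl(self):
    # Memoize pickler.dumps so repeatedly encoded values are serialized once;
    # crucially, this goes through pickler, so pickle_library is honored.
    mdumps = lru_cache(maxsize=self.cache_size, typed=True)(pickler.dumps)
    return coder_impl.CallbackCoderImpl(mdumps, pickler.loads)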

However, as I read the related parts of the code, there appear to be some inconsistencies:

  1. _MemoizingPickleCoder overrides as_deterministic_coder, but DillCoder does not. Since pickle_library uses dill by default, the _create_impl methods of _MemoizingPickleCoder and DillCoder are essentially functionally equivalent (ignoring memoization) when pickle_library is "default" or "dill", so I would expect both coders to implement as_deterministic_coder the same way (i.e., either both override the method identically, or neither overrides it).
  2. PickleCoder overrides as_deterministic_coder identically to _MemoizingPickleCoder (I suspect _MemoizingPickleCoder's implementation was copied from PickleCoder), but PickleCoder uses the standard pickle module directly rather than Beam's internal pickler module (seemingly the crux of this bug). I don't know whether that is itself an inconsistency, but both overrides return FastPrimitivesCoder(self, requires_deterministic=step_label), which cannot work because FastPrimitivesCoder takes only one argument. It appears to have been confused with FastPrimitivesCoderImpl, which does take a second argument (although that argument is named requires_deterministic_step_label, not requires_deterministic). Since this appears to be a separate bug, I will create a new issue for it.
  3. The docstring at the top of apache_beam/coders/coders.py states: "In Beam, we generally use pickle for pipeline elements and dill for more complex types, like user functions." This is exactly the situation I'm facing (i.e., the need to pickle a user function), yet setting save_main_session and pickle_library does not cause my function to be pickled via dill (the default) or cloudpickle. From further digging, it appears that using cloudpickle should skip saving the main session, while using dill should still save it, but neither happens. Instead, PickleCoder, which uses the pickle module, is what is invoked to pickle the function, rather than dill.

So far, I have been unable to decipher where the logic that uses the internal pickler module should live, particularly for saving the main session or for pickling user functions, nor whether there is something else I should be doing in my transforms, beyond setting save_main_session to True, to trigger such use of pickler.

The only change I've found that works for this case is modifying PickleCoder to use pickler instead of pickle, but I don't know whether that is the correct solution. I cannot tell whether there are cases where the standard pickle module is intentionally preferred over the internal pickler module, so changing PickleCoder to fix the error I'm encountering might cause failures elsewhere.

I've forked and cloned the repo, but for the life of me I cannot get the Python test suite to run without failures. I have managed to run the word count example successfully (as mentioned in the contributing guide), but I cannot determine how to run a more comprehensive set of tests that covers pickling, so I am not confident I can verify my proposed fix (nor add new tests, if necessary). The suite also takes extremely long: I killed the test process after 15 minutes of running, then spent probably another 15 minutes killing the spawned processes that restart themselves and don't get cleaned up when the main test process is halted via Ctrl-C.

Does anybody have some guidance here?

@liferoad
Collaborator

@tvalentyn can you take a look at this issue?

@tvalentyn
Contributor

Instead PickleCoder, which uses the pickle module, is what is being invoked to pickle the function, rather than dill.

This is not the case; functions are pickled using apache_beam.internal.pickler. Coders are only used to encode PCollection elements. The --pickle_library option was never intended to influence PickleCoder - the intent of that coder is to use Python's standard pickle module.

@tvalentyn
Contributor

What is the nature of the elements in your collection? I am wondering why it is that PickleCoder is used for encoding them - is that intentional?

Since this appears to be a separate bug, I will create a new issue reporting it as such.

I think we should have been using DeterministicFastPrimitivesCoder there. Feel free to send a PR.

Also note that it is possible to create custom coders in your pipeline and use them.
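For illustration, a custom coder along those lines might look like this minimal sketch (CloudpickleCoder and MyElement are hypothetical names; Coder and registry.register_coder are the documented extension points):

import cloudpickle
import apache_beam as beam

class CloudpickleCoder(beam.coders.Coder):
    """Hypothetical coder that serializes elements with cloudpickle."""

    def encode(self, value):
        return cloudpickle.dumps(value)

    def decode(self, encoded):
        return cloudpickle.loads(encoded)

    def is_deterministic(self):
        # cloudpickle output is not guaranteed deterministic across processes
        return False

# Associate the coder with a user-defined element type:
# beam.coders.registry.register_coder(MyElement, CloudpickleCoder)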

@chuckwondo
Author

Instead PickleCoder, which uses the pickle module, is what is being invoked to pickle the function, rather than dill.

This is not the case; functions are pickled using apache_beam.internal.pickler. Coders are only used to encode PCollection elements. The --pickle_library option was never intended to influence PickleCoder - the intent of that coder is to use Python's standard pickle module.

Perhaps that should be the case, but that is not what I am experiencing, which is why I'm reporting this. Apache Beam is very new to me, so it could very well be that I simply don't know what I'm doing, and I'm missing something important.

I'll attempt to summarize and clarify what I'm doing and what I'm encountering:

  1. My code snippets above are pulled from the issue I created in pangeo-forge-recipes: "Passing preprocessor to MultiZarrToZarr raises PicklingError" (pangeo-forge/pangeo-forge-recipes#616).
  2. Since I wrote that issue, I discovered beam's save_main_session and pickle_library options as possibilities for addressing the pickling error I'm encountering.
  3. Finding that no combination of those options eliminates the pickling error, I created this issue. (Only by tweaking my locally installed apache_beam dependency's PickleCoder to use the internal pickler module was I able to eliminate the pickling error.)

My goal is to drop a problematic variable ("lst_unc_sys") from my dataset, but using the "preprocess" option of the mzz_kwargs argument to CombineReferences is failing because something in the bowels of Beam seems not to realize that it should be using apache_beam.internal.pickler to pickle the preprocess function I'm supplying.

Is there something I'm missing in order to make that happen?

@chuckwondo chuckwondo removed their assignment Nov 28, 2023
@chuckwondo
Author

@tvalentyn, would you mind expanding on the following? How would I go about doing so?

Also note that it is possible to create custom coders in your pipeline and use them.

@tvalentyn
Contributor

@chuckwondo
Author

Thanks for the link, but given that example, I think that perhaps we're talking about 2 different things.

The coder issue I'm having concerns the internal Beam mechanism used to pickle objects passed between processes. What's happening is a failure to pickle a function I'm using, not the data I'm dealing with.

Specifically, the error occurs while attempting to pickle the function returned by the higher-order drop function. Here's the specific snippet from the code I've shown earlier:

from kerchunk.combine import drop

...

        | CombineReferences(
            concat_dims=["time"],
            identical_dims=["lat", "lon", "channel"],
            mzz_kwargs={"preprocess": drop("lst_unc_sys")},
        )

The call to drop("lst_unc_sys") produces a new function, which is then used as the value of the "preprocess" kwarg to MultiZarrToZarr (under the covers). This ends up producing the following error:

AttributeError: Can't pickle local object 'drop.<locals>.preproc'

That indicates that it cannot pickle the preproc function that is local to drop.
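In other words, drop is a higher-order function shaped roughly like this (a simplified sketch, not kerchunk's actual implementation):

def drop(field):
    def preproc(refs):
        # remove all references whose keys belong to the given field
        return {k: v for k, v in refs.items() if not k.startswith(field)}
    return preproc

Standard pickle serializes functions by reference to their qualified name, so preproc, which exists only inside drop, cannot be pickled; dill and cloudpickle serialize the function body and closure instead, which is why they can handle this case.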

Given that the indicated function is not a top-level function, I created a top-level function with the same logic as the local function, and used that as the value of "preprocess" in the hopes that pickling my top-level function would fix the issue, but that produced the same error.
