Implement runner for apache spark #133
Can this be ticketed or asked somewhere? Sounds like a pretty incredible bug if so.
I'm thinking it might be appropriate to add some detail to the issue mentioned above that already deals with docker perms on the beam repo (apache/beam#20568).
OK, so @ranchodeluxe's pushback here was so sensible, and my understanding of what on earth was going wrong so inchoate, that I went back to double-check my usage. Slightly longer story: the next matter is how best to approach environment construction. The most recent Beam release advertises Spark support up to 3.2.1, but I've spent a bit of time digging around and it appears that people have used it with later versions. 3.3.0, at least, seems fine (if it seems like this shouldn't matter, rest assured that, at least when dealing with Spark, minor versions demand respect). The reason this turns out to be useful is that the earliest EMR Serverless version that allows container customization is 6.9.0, and EMR Serverless 6.9.0 is pinned to Spark 3.3.0. So this relatively minimal image is built and then pushed up to ECR.
With that container in place, a few permissions need to be set (VPC settings for internet access, but also a trust policy allowing the EMR Serverless principal to access ECR data). Also, any special runtime permissions will need to be added to an execution role at this point; the default in the console may well be fine for accessing S3 on the owner's account. When constructing a serverless application, remember to refer to the image pushed up to ECR.

With this infra in place, I wrote a very simple, very stupid test script that counts words in Moby Dick and hopefully invokes some parallelism. Here's that:

```python
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    options = PipelineOptions([])
    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadFromText' >> beam.io.ReadFromText('s3://nzimmerman-testing/texts/mobydick.txt')
         | 'SplitWords' >> beam.FlatMap(lambda x: x.split())
         | 'CountWords' >> beam.combiners.Count.PerElement()
         | 'FormatResults' >> beam.Map(lambda word_count: f"{word_count[0]}: {word_count[1]}")
         | 'WriteToText' >> beam.io.WriteToText('s3://nzimmerman-testing/beam/output2')
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
```

From there, running the beam job is just a matter of referring to the script in the job submission. Oh, and here are some output files with counts of words in Moby Dick.
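For reference, kicking off a script like this on EMR Serverless can be sketched with boto3's `emr-serverless` client and its `start_job_run` call. Everything else here (application id, role ARN, bucket paths, the helper names) is made up for illustration:

```python
# Hypothetical sketch of submitting the word-count script to EMR Serverless.
# boto3's "emr-serverless" client and start_job_run are the real API; the
# identifiers and paths below are illustrative stand-ins.

def build_spark_job_driver(entry_point, entry_point_args=None, spark_params=""):
    """Assemble the jobDriver payload expected by start_job_run."""
    return {
        "sparkSubmit": {
            "entryPoint": entry_point,
            "entryPointArguments": entry_point_args or [],
            "sparkSubmitParameters": spark_params,
        }
    }


def submit(application_id, execution_role_arn, job_driver):
    """Submit the job; requires AWS credentials, so it is not invoked here."""
    import boto3
    client = boto3.client("emr-serverless")
    return client.start_job_run(
        applicationId=application_id,
        executionRoleArn=execution_role_arn,
        jobDriver=job_driver,
    )


driver = build_spark_job_driver(
    "s3://my-bucket/scripts/wordcount.py",  # hypothetical script location
    spark_params="--conf spark.executor.memory=4g",
)
```

A real call would then be `submit("<app-id>", "<role-arn>", driver)`, with the application configured to use the custom ECR image described above.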
@moradology this is awesome, and also I love that you have your own copy of Moby Dick stored on s3 😃
Good news/bad news after further exploration. Bad first, I suppose:

Bad

I've come to the conclusion that EMR-Serverless is a non-starter. This absolutely sucks because serverless is just about the best option available for running a Spark cluster with relatively little effort put into getting the infrastructure right. Spark just runs there, and it generally does so faster and cheaper than a naively, manually provisioned cluster on EMR classic. The Moby Dick job I ran there was tiny and probably worked because it avoided serializing work over to executor nodes (👎). The PROCESS SDK harness works, and it is not too bad (just yum install git and golang) to build the boot binary in a custom image, but the expectations baked into the tooling are a problem here. For context, the wrapper script in question:

```python
import argparse
import subprocess

import boto3


def fetch_config_from_s3(s3_uri, local_path):
    """Fetch a file from S3 and save it locally."""
    bucket, key = parse_s3_uri(s3_uri)
    s3 = boto3.client('s3')
    s3.download_file(Bucket=bucket, Key=key, Filename=local_path)


def parse_s3_uri(s3_uri):
    """Parse an s3:// URI into bucket name and key."""
    if not s3_uri.startswith("s3://"):
        raise ValueError("Invalid S3 URI")
    parts = s3_uri[5:].split("/", 1)
    if len(parts) != 2:
        raise ValueError("Invalid S3 URI")
    return parts[0], parts[1]


def main():
    parser = argparse.ArgumentParser(description="Run pangeo forge aimed at process harness to support EMR")
    parser.add_argument('--s3_config_uri', required=True, help="S3 URI for the config file")
    args, unknown_args = parser.parse_known_args()
    local_config_path = '/tmp/pangeo_config.py'
    fetch_config_from_s3(args.s3_config_uri, local_config_path)
    # Construct the CLI command with the remaining arguments
    command = ["pangeo-forge-runner", "bake"] + unknown_args + ["-f", local_config_path]
    subprocess.run(command, check=True)


if __name__ == "__main__":
    main()
```

Good

As for good news:
This is super helpful and interesting work, thanks so much for the summary @moradology. And I like the repo name!
OK, so I read a bunch of Beam source and I'm starting to think that there is a path to running pipelines on EMR-Serverless (which is kind of the holy grail for doing this kind of work on AWS). It will require providing a new (non-portable, non-job-server-dependent) implementation of beam's runner class rather than using any of the off-the-shelf solutions they provide. A sketch of the relevant lines of work:
```python
TRANSLATIONS = {
    _Create: Create,
    apache_beam.ParDo: ParDo,
    apache_beam.Map: Map,
    _GroupByKeyOnly: GroupByKey,
    _Flatten: Flatten,
}
```
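To make the shape of that translation table concrete, here is a dependency-free sketch of the dispatch pattern such a runner would use: walk the pipeline's transforms and hand each to a Spark-side translator. The transform and translator names below are toy stand-ins, not Beam's actual classes:

```python
# Toy model of a runner's translation pass. Each transform type maps, via a
# TRANSLATIONS-style dict, to a function that emits its Spark counterpart.
# Everything here is illustrative; Beam's real runner API is much richer.

class ParDo:
    pass

class GroupByKey:
    pass

def translate_pardo(transform):
    return f"rdd.flatMap({transform.__class__.__name__})"

def translate_gbk(transform):
    return "rdd.groupByKey()"

TRANSLATIONS = {
    ParDo: translate_pardo,
    GroupByKey: translate_gbk,
}

def translate_pipeline(transforms):
    """Map each transform to its Spark translation, failing loudly on gaps."""
    plan = []
    for t in transforms:
        translator = TRANSLATIONS.get(type(t))
        if translator is None:
            raise NotImplementedError(f"No translation for {type(t).__name__}")
        plan.append(translator(t))
    return plan

plan = translate_pipeline([ParDo(), GroupByKey()])
```

The important property is the loud `NotImplementedError`: a new runner can start with a handful of translations and grow coverage transform by transform.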
The goal here is fairly straightforward and the introductory materials mostly live here: https://beam.apache.org/documentation/runners/spark/. They indicate, however, that there will likely be (at least?) two different spark runners to accommodate different spark cluster deployments. I'm going to try to capture what I've learned so far, as the devil is really in the (fairly obscure) details here and familiarity with Spark isn't quite sufficient to make things legible without some heavy iteration.
Rationale for multiple spark runners
Roughly, spark clusters can run in 'standalone' mode with the spark master listening on port 7077 (this is what the apache-beam documentation writers seem to greatly prefer) or via spark-submit (which is how YARN- and Mesos-orchestrated clusters do their thing and so is probably valuable).
Beam jobserver route
In the case of the port 7077 job submissions, apache beam has a pretty convenient docker image which serves as the intermediary between a python process which describes/returns the beam pipeline and the cluster to which it is connected via port 7077 on the cluster's master. Here, any runner we'd implement probably needs little (nothing?) more than these two implementation-specific parameters:
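Concretely, the two knobs in question are the job server endpoint and the worker environment. A hedged sketch of the options a driver script would be launched with (the endpoint value is illustrative; `PortableRunner`, `job_endpoint`, and `environment_type` are real Beam pipeline options):

```python
# The two implementation-specific parameters when going through the Beam job
# server: where the job server's gRPC endpoint lives, and how the worker SDK
# harness should run. 8099 is the job server's default port; adjust as needed.
portable_args = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=DOCKER",  # or PROCESS / EXTERNAL
]
```

These would simply be handed to `PipelineOptions(portable_args)` in the driver script.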
This isn't the way to work with EMR, as EMR is YARN backed. A different strategy is necessary.
Beam stuff-everything-into-a-jar-and-pray-your-workers-have-docker-permissions route
As stated, EMR orchestrates Spark via YARN. YARN wants to see spark-submit being used and doesn't care what happens on port 7077 (it isn't listening). That's fine. Actually, apache-beam has some tooling for this.

What happens when this is run is that a jar file pops out at the specified path. This jar file is exactly the kind of thing YARN wants to consume. EMR "steps" are totally capable of running a spark-submit pointing out to some location on S3 and, in fact, this is how things are designed.

There's a bit of a wrinkle here, unfortunately. As it turns out, regardless of the SDK harness one chooses (EXTERNAL, PROCESS, or, of course, DOCKER), beam jobs specified in Python will absolutely, positively, 100% try to spawn docker containers to actually run the code. This is somewhat surprising, as the SDK docs would seem to indicate that PROCESS ought to use a compiled boot binary. Well, I compiled that binary and included it in the EMR bootstrap step and it absolutely still attempts to run everything via docker (😢).

OK, so why is this a problem? Well, as it turns out, EMR does not grant the hadoop user appropriate permissions to spawn docker containers, and there's not really an obvious workaround via AWS' standard customization path of providing a bootstrap script (short of hacking the expected permissions in the various files which the docker group grants access to, which seems bad and kind of dangerous).

Having unravelled this mysterious bad behavior, I stumbled across a relevant issue which seems to be current:
https://issues.apache.org/jira/browse/BEAM-11378
apache/beam#20568
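For reference, the jar-producing tooling mentioned above is, as best I can tell, driven by Beam's `output_executable_path` pipeline option, which asks the runner to emit a self-contained jar rather than submitting the job. A hedged sketch of the flags (path illustrative):

```python
# Flags asking Beam to write a self-contained pipeline jar to a path instead
# of executing the job; the jar can then be fed to spark-submit by an EMR
# step. The output path is illustrative.
jar_args = [
    "--runner=SparkRunner",
    "--output_executable_path=/tmp/beam-pipeline.jar",
]
```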
Here's a taste of what I learned from the bootstrap script I was attempting to modify permissions with: newgrp is a dead end in bootstrap and will cause your cluster to hang for potentially hours without ANY useful debugging information.

Is there any workaround?
Yeah, I think so, though I've not proven them out yet.