
Issue submitting Spark Job from code when Spark Job is a Python program. #4

sehunley opened this issue Feb 15, 2016 · 2 comments

@sehunley

When submitting a Java or Scala program, everything works fine. When submitting a Python program, it gets to the ACCEPTED state and then stalls. It eventually times out, but it never gets picked up to run. Is this interface just for Java/Scala programs/jobs, or should it be able to submit PySpark/Python jobs as well?

I am trying to invoke the pi.py sample program that comes with Spark 1.6.0.

Below is the java program that I am testing with. I'm new to Spark so apologies for any "newbie" errors.

import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;
import org.apache.hadoop.conf.Configuration;
// import org.apache.log4j.Logger;

/**
 * This class submits SparkPi to YARN from a Java client (as opposed
 * to submitting a Spark job from a shell command line using spark-submit).
 *
 * To accomplish submitting a Spark job from a Java client, we use
 * the org.apache.spark.deploy.yarn.Client class described below:
 *
 * Usage: org.apache.spark.deploy.yarn.Client [options]
 * Options:
 *   --jar JAR_PATH          Path to your application's JAR file (required in yarn-cluster mode)
 *   --class CLASS_NAME      Name of your application's main class (required)
 *   --primary-py-file       A main Python file
 *   --arg ARG               Argument to be passed to your application's main class.
 *                           Multiple invocations are possible, each will be passed in order.
 *   --num-executors NUM     Number of executors to start (Default: 2)
 *   --executor-cores NUM    Number of cores per executor (Default: 1).
 *   --driver-memory MEM     Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
 *   --driver-cores NUM      Number of cores used by the driver (Default: 1).
 *   --executor-memory MEM   Memory per executor (e.g. 1000M, 2G) (Default: 1G)
 *   --name NAME             The name of your application (Default: Spark)
 *   --queue QUEUE           The hadoop queue to use for allocation requests (Default: 'default')
 *   --addJars jars          Comma separated list of local jars that want SparkContext.addJar to work with.
 *   --py-files PY_FILES     Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
 *   --files files           Comma separated list of files to be distributed with the job.
 *   --archives archives     Comma separated list of archives to be distributed with the job.
 *
 * How to call this program, for example:
 *
 *   export SPARK_HOME="/Users/mparsian/spark-1.6.0"
 *   java -DSPARK_HOME="$SPARK_HOME" org.dataalgorithms.client.SubmitSparkPiToYARNFromJavaCode 10
 */
public class SubmitSparkPiToYARNFromJavaCode {

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        // the number of slices is passed through to the SparkPi program
        // THE_LOGGER.info("Slices Passed=" + args[0]);
        String slices = args[0];
        // String slices = "10";

        // String SPARK_HOME = System.getProperty("SPARK_HOME");
        String SPARK_HOME = "/opt/spark/spark-1.6.0";
        // THE_LOGGER.info("SPARK_HOME=" + SPARK_HOME);

        pi(SPARK_HOME, slices); // ... the code being measured ...

        long elapsedTime = System.currentTimeMillis() - startTime;
        // THE_LOGGER.info("elapsedTime (millis)=" + elapsedTime);
    }

    static void pi(String SPARK_HOME, String slices) throws Exception {
        String[] args = new String[]{
            "--name",
            "Submit-SparkPi-To-Yarn",
            //
            "--driver-memory",
            "512MB",
            //
            "--jar",
            SPARK_HOME + "/examples/target/spark-examples_2.11-1.6.0.jar",
            //
            "--class",
            "org.apache.spark.examples.JavaSparkPi",

            // argument 1 to my Spark program
            "--arg",
            slices,

            // argument 2 to my Spark program (helper argument to create a proper JavaSparkContext object)
            "--arg",
            "yarn-cluster"
        };

        Configuration config = new Configuration();
        System.setProperty("SPARK_YARN_MODE", "true");

        SparkConf sparkConf = new SparkConf();
        ClientArguments clientArgs = new ClientArguments(args, sparkConf);
        Client client = new Client(clientArgs, config, sparkConf);

        client.run();
        // done!
    }
}
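
The usage text above lists a --primary-py-file option; this is what spark-submit itself appears to pass to this same Client class for Python applications in yarn-cluster mode, together with --class org.apache.spark.deploy.PythonRunner. Below is a minimal, untested sketch of a pi() variant that submits pi.py instead of the examples JAR (assuming Spark 1.6 behavior; the path to pi.py is a placeholder):

    // Sketch: submit a Python application (pi.py) through the same YARN Client.
    // Assumption: this mirrors what spark-submit does internally for Python apps
    // in yarn-cluster mode on Spark 1.6 (--primary-py-file plus the PythonRunner class).
    static void pythonPi(String SPARK_HOME, String slices) throws Exception {
        String[] args = new String[]{
            "--name",
            "Submit-PythonPi-To-Yarn",
            //
            "--primary-py-file",
            SPARK_HOME + "/examples/src/main/python/pi.py", // placeholder path to pi.py
            //
            "--class",
            "org.apache.spark.deploy.PythonRunner", // wrapper class that launches the Python driver
            //
            // argument passed through to pi.py as sys.argv[1] (number of partitions)
            "--arg",
            slices
        };

        Configuration config = new Configuration();
        System.setProperty("SPARK_YARN_MODE", "true");

        SparkConf sparkConf = new SparkConf();
        ClientArguments clientArgs = new ClientArguments(args, sparkConf);
        Client client = new Client(clientArgs, config, sparkConf);
        client.run();
    }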

Thanks,

-Scott

@mahmoudparsian (Owner)

Hello Scott,

Thank you very much for your email and introduction. I am not an expert in Python, but I would like to take a look at your Python program; please share it so that I can look into it.

Thank you again!
Best regards,
Mahmoud


@sehunley (Author)

Hello Mahmoud,
My Python program is pretty simple; it's a sample program that comes with Spark 1.6.0. Our data scientists write their machine learning scripts in Python, hence the need to call Python programmatically. Here is the program:

from __future__ import print_function

import sys
from random import random
from operator import add

from pyspark import SparkContext

if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    sc = SparkContext(appName="PythonPi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    sc.stop()

Thanks in advance for any help that you can give. Here is a link to the actual output that happens when I run the code and try to submit the program above: http://stackoverflow.com/questions/35373367/issue-submitting-a-python-application-to-yarn-from-java-code

-Scott
