Spark Submit Command Explained with Examples

The spark-submit command is a utility for running or submitting a Spark or PySpark application (or job) to a cluster by specifying options and configurations. The application you submit can be written in Scala, Java, or Python (PySpark). You can use this utility to do the following.

  1. Submit a Spark application to different cluster managers such as Yarn, Kubernetes, Mesos, and Standalone.
  2. Submit a Spark application in client or cluster deployment mode.

In this article, I will explain the different spark-submit command options and configurations, how to use an uber jar or zip file for Scala and Java, how to use a Python .py file, and finally how to submit the application on Yarn, Mesos, Kubernetes, and standalone cluster managers.

Spark Submit Command

The Spark binary distribution comes with the spark-submit script for Linux and Mac and the spark-submit.cmd command file for Windows. These scripts are available in the $SPARK_HOME/bin directory.

If you are using the Cloudera distribution, you may also find spark2-submit, which is used to run Spark 2.x applications. By adding this script, Cloudera supports running both Spark 1.x and Spark 2.x applications in parallel.
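
On hosts where either launcher may be installed, a wrapper script can probe for whichever one is present. The following is a minimal sketch and not part of Spark itself: the function name first_available is hypothetical, and it assumes the launchers are on the PATH under the names spark2-submit and spark-submit.

```shell
# Hypothetical helper: print the first command from the argument list
# that exists on PATH; return non-zero if none is found.
first_available() {
  for candidate in "$@"; do
    if command -v "$candidate" >/dev/null 2>&1; then
      echo "$candidate"
      return 0
    fi
  done
  return 1
}

# Prefer spark2-submit when present, otherwise fall back to spark-submit.
SUBMIT_CMD="$(first_available spark2-submit spark-submit)" \
  || echo "no Spark launcher found on PATH" >&2
```

The rest of a script could then call "$SUBMIT_CMD" wherever the examples in this article call ./bin/spark-submit.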

The spark-submit command internally uses the org.apache.spark.deploy.SparkSubmit class with the options and command-line arguments you specify.

Below is a spark-submit command with the most-used command options.


./bin/spark-submit \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  --driver-memory <value>g \
  --executor-memory <value>g \
  --executor-cores <number of cores> \
  --jars <comma separated dependencies> \
  --class <main-class> \
  <application-jar> \
  [application-arguments]

You can also submit the application like below without using the script.


./bin/spark-class org.apache.spark.deploy.SparkSubmit <options & arguments>

Spark Submit Options

Below I have explained some of the common options, configurations, and specific options to use with Scala and Python. You can also get all options available by running the below command.


./bin/spark-submit --help

Deployment Modes

Using --deploy-mode, you specify where to run the Spark application driver program. Spark supports cluster and client deployment modes.

| Value | Description |
|---|---|
| cluster | In cluster mode, the driver runs on one of the worker nodes, and this node shows as a driver on the Spark Web UI of your application. Cluster mode is used to run production jobs. |
| client | In client mode, the driver runs locally on the machine you submit your application from. Client mode is mainly used for interactive and debugging purposes. Note that in client mode only the driver runs locally; the executors run on different nodes in the cluster. |
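
Because only these two values are accepted, a submit script can check the mode before calling spark-submit. The sketch below is one way to do that: validate_deploy_mode and the DEPLOY_MODE environment variable are hypothetical names chosen for illustration, and the command is only printed, not executed.

```shell
# Hypothetical helper: accept only the two deploy modes Spark supports.
validate_deploy_mode() {
  case "$1" in
    client|cluster) return 0 ;;
    *) echo "unsupported deploy mode: $1" >&2; return 1 ;;
  esac
}

# DEPLOY_MODE is an assumed environment variable; default to client mode.
mode="${DEPLOY_MODE:-client}"
if validate_deploy_mode "$mode"; then
  # Dry run: print the command rather than executing it.
  echo ./bin/spark-submit --master yarn --deploy-mode "$mode" \
    --class org.apache.spark.examples.SparkPi \
    /spark-home/examples/jars/spark-examples_versionxx.jar 80
fi
```

Removing the leading echo turns the dry run into a real submission.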

Cluster Managers

Using the --master option, you specify which cluster manager to use to run your application. Spark currently supports Yarn, Mesos, Kubernetes, Standalone, and local. Their uses are explained below.

| Cluster Manager | Value | Description |
|---|---|---|
| Yarn | yarn | Use yarn if your cluster resources are managed by Hadoop Yarn. |
| Mesos | mesos://HOST:PORT | Use mesos://HOST:PORT for the Mesos cluster manager, replacing HOST and PORT with those of your Mesos cluster manager. |
| Standalone | spark://HOST:PORT | Use spark://HOST:PORT for a Standalone cluster, replacing HOST and PORT with those of your standalone master. |
| Kubernetes | k8s://HOST:PORT or k8s://https://HOST:PORT | Use k8s://HOST:PORT for Kubernetes, replacing HOST and PORT with those of your Kubernetes cluster. This connects over https by default; for an unsecured connection, use k8s://http://HOST:PORT. |
| local | local, local[K], or local[K,F] | Use local to run locally with one worker thread. Use local[K] to run the application with K worker threads, typically the number of cores you have locally. Use local[K,F] to also set F, the number of failures to tolerate before giving up. |
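
The URL formats in the table can be captured in a small lookup function. This is a sketch only: master_url is a hypothetical helper, the short manager names it accepts (yarn, mesos, standalone, k8s, local) are my own labels, and the Kubernetes branch always emits the explicit https form.

```shell
# Hypothetical helper: print the --master value for a cluster manager.
# Usage: master_url <manager> [host-or-threads] [port]
master_url() {
  case "$1" in
    yarn)       echo "yarn" ;;
    mesos)      echo "mesos://$2:$3" ;;
    standalone) echo "spark://$2:$3" ;;
    k8s)        echo "k8s://https://$2:$3" ;;
    local)      echo "local[${2:-1}]" ;;   # defaults to one worker thread
    *)          echo "unknown cluster manager: $1" >&2; return 1 ;;
  esac
}

master_url standalone 192.168.231.132 7077
master_url local 4
```

A script could then pass --master "$(master_url yarn)" to spark-submit instead of hard-coding the URL.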

Example: The command below submits an application to a Yarn-managed cluster.


./bin/spark-submit \
    --deploy-mode cluster \
    --master yarn \
    --class org.apache.spark.examples.SparkPi \
    /spark-home/examples/jars/spark-examples_versionxx.jar 80

Driver and Executor resources (Cores & memory)

While submitting an application, you can also specify how much memory and how many cores to give the driver and executors.

| Option | Description |
|---|---|
| --driver-memory | Memory to be used by the Spark driver. |
| --driver-cores | CPU cores to be used by the Spark driver. |
| --num-executors | The total number of executors to use. |
| --executor-memory | Amount of memory to use for the executor process. |
| --executor-cores | Number of CPU cores to use for the executor process. |
| --total-executor-cores | The total number of executor cores to use. |

Example:


./bin/spark2-submit \
   --master yarn \
   --deploy-mode cluster \
   --driver-memory 8g \
   --executor-memory 16g \
   --executor-cores 2  \
   --class org.apache.spark.examples.SparkPi \
   /spark-home/examples/jars/spark-examples_versionxx.jar 80
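
To see roughly what the example above asks the cluster manager for per executor, add the memory overhead to --executor-memory. The sketch below assumes the default overhead for JVM executors, 10% of executor memory with a 384 MiB minimum; your Spark version, cluster manager, or an explicit spark.executor.memoryOverhead setting may change this, and the variable names are made up for illustration.

```shell
executor_mem_mib=$((16 * 1024))            # --executor-memory 16g
overhead_mib=$((executor_mem_mib / 10))    # assumed default: 10% of executor memory
if [ "$overhead_mib" -lt 384 ]; then       # assumed default minimum of 384 MiB
  overhead_mib=384
fi
container_mib=$((executor_mem_mib + overhead_mib))
echo "per-executor request: ${container_mib} MiB"
```

With 16g executors this comes to 18022 MiB per executor, so the cluster needs noticeably more headroom than the --executor-memory value alone suggests.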

Other Options

| Option | Description |
|---|---|
| --files | Comma-separated list of files you want to use. Usually, these are files from your resource folder. Using this option, Spark submits all these files to the cluster. |
| --verbose | Displays verbose information. For example, writes all configurations the Spark application uses to the log file. |

Note: Files specified with --files are uploaded to the cluster.

Example: The example below submits the application to the Yarn cluster manager using cluster deployment mode, with 8g of driver memory and 16g of memory and 2 cores for each executor.


./bin/spark2-submit \
   --verbose \
   --master yarn \
   --deploy-mode cluster \
   --driver-memory 8g \
   --executor-memory 16g \
   --executor-cores 2 \
   --files /path/log4j.properties,/path/file2.conf,/path/file3.json \
   --class org.apache.spark.examples.SparkPi \
   /spark-home/examples/jars/spark-examples_versionxx.jar 80

Spark Submit configurations

Spark submit supports several configurations through --conf. These configurations are used to specify application configurations, shuffle parameters, and runtime configurations.

Most of these configurations are the same for Spark applications written in Java, Scala, and Python (PySpark).

| Configuration key | Description |
|---|---|
| spark.sql.shuffle.partitions | Number of partitions to create for wider shuffle transformations (joins and aggregations). |
| spark.executor.memoryOverhead | Amount of additional memory to be allocated per executor process in cluster mode. This is typically memory for JVM overheads. |
| spark.serializer | Serializer class to use: org.apache.spark.serializer.JavaSerializer (default) or org.apache.spark.serializer.KryoSerializer. |
| spark.sql.files.maxPartitionBytes | The maximum number of bytes to be used for every partition when reading files. Default 128MB. |
| spark.dynamicAllocation.enabled | Specifies whether to dynamically increase or decrease the number of executors based on the workload. Default false in Apache Spark; some vendor distributions enable it by default. |
| spark.dynamicAllocation.minExecutors | The minimum number of executors to use when dynamic allocation is enabled. |
| spark.dynamicAllocation.maxExecutors | The maximum number of executors to use when dynamic allocation is enabled. |
| spark.executor.extraJavaOptions | Specify JVM options (see example below). |

Besides these, Spark also supports many more configurations.

Example :


./bin/spark2-submit \
--master yarn \
--deploy-mode cluster \
--conf "spark.sql.shuffle.partitions=20000" \
--conf "spark.executor.memoryOverhead=5244" \
--conf "spark.memory.fraction=0.8" \
--conf "spark.memory.storageFraction=0.2" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.files.maxPartitionBytes=168435456" \
--conf "spark.dynamicAllocation.minExecutors=1" \
--conf "spark.dynamicAllocation.maxExecutors=200" \
--conf "spark.dynamicAllocation.enabled=true" \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--files /path/log4j.properties,/path/file2.conf,/path/file3.json \
--class org.apache.spark.examples.SparkPi \
/spark-home/examples/jars/spark-examples_replace-spark-version.jar 80

Alternatively, you can set these globally in $SPARK_HOME/conf/spark-defaults.conf to apply them to every Spark application. You can also set them programmatically using SparkConf.


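import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val config = new SparkConf()
config.set("spark.sql.shuffle.partitions", "300") // SparkConf values are strings
val spark = SparkSession.builder().config(config).getOrCreate()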

First preference goes to SparkConf, then to spark-submit --conf, and then to the configs in spark-defaults.conf.
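
The precedence between a defaults file and --conf can be tried out with a throwaway properties file. The sketch below writes one in the spark-defaults.conf format and passes it with spark-submit's --properties-file option; the temporary file path and the chosen values are assumptions for illustration, and the command is printed rather than executed.

```shell
# Write a throwaway properties file in the spark-defaults.conf format
# (one whitespace-separated "key value" pair per line).
props_file="$(mktemp)"
cat > "$props_file" <<'EOF'
spark.sql.shuffle.partitions  300
spark.serializer              org.apache.spark.serializer.KryoSerializer
EOF

# Dry run: print the command instead of executing it.
echo ./bin/spark-submit \
  --properties-file "$props_file" \
  --conf "spark.sql.shuffle.partitions=500" \
  --class org.apache.spark.examples.SparkPi \
  /spark-home/examples/jars/spark-examples_versionxx.jar 80
```

If this command were run, the --conf value of 500 would take effect for spark.sql.shuffle.partitions, while the serializer would still come from the file.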

Submit Scala or Java Application

Regardless of which language you use, most of the options are the same. However, a few options are specific to a language. For example, to run a Spark application written in Scala or Java, you additionally need the following options.

| Option | Description |
|---|---|
| --jars | If you have all dependency jars in a folder, you can pass them with the spark-submit --jars option. The jar files must be comma-separated, for example --jars jar1.jar,jar2.jar,jar3.jar. |
| --packages | Comma-separated list of Maven coordinates of jars to include. All transitive dependencies are handled when using this option. |
| --class | The Scala or Java class you want to run. This should be a fully qualified name including the package, for example org.apache.spark.examples.SparkPi. |

Note: Files specified with --jars and --packages are uploaded to the cluster.

Example :


./bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--conf "spark.sql.shuffle.partitions=20000" \
--jars "dependency1.jar,dependency2.jar" \
--class com.sparkbyexamples.WordCountExample \
spark-by-examples.jar 
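
When the dependency jars sit in one folder, the comma-separated value for --jars can be generated instead of typed by hand. This is a small sketch under that assumption; join_jars is a hypothetical helper and /path/to/libs in the usage note is a placeholder.

```shell
# Hypothetical helper: print every .jar in a directory as one
# comma-separated string suitable for --jars.
join_jars() {
  jars=""
  for jar in "$1"/*.jar; do
    [ -e "$jar" ] || continue          # skip the unexpanded pattern when no jars exist
    jars="${jars:+$jars,}$jar"         # add a comma only between entries
  done
  echo "$jars"
}
```

It could be used as --jars "$(join_jars /path/to/libs)" in the command above.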

Spark Submit PySpark (Python) Application

When you want to submit a PySpark application, you need to specify the .py file you want to run or specify the .egg file.

Below are some of the options & configurations specific to PySpark application.

| PySpark Specific Configuration | Description |
|---|---|
| --py-files | Use --py-files to add .py, .zip, or .egg files. |
| --conf spark.executor.pyspark.memory | The amount of memory to be used by PySpark for each executor. |
| --conf spark.pyspark.driver.python | Python binary executable to use for PySpark in the driver. |
| --conf spark.pyspark.python | Python binary executable to use for PySpark in both driver and executors. |

Note: Files specified with --py-files are also uploaded to the cluster.

Example 1 :


./bin/spark-submit \
   --master yarn \
   --deploy-mode cluster \
   wordByExample.py

Example 2 :


./bin/spark-submit \
   --master yarn \
   --deploy-mode cluster \
   --py-files file1.py,file2.py \
   wordByExample.py
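
Since --py-files takes .py, .zip, or .egg files, a submit script can check the list before handing it to spark-submit. The sketch below shows one way to do so; valid_py_files is a hypothetical helper, and it only inspects file extensions, not whether the files exist.

```shell
# Hypothetical check: succeed only if every comma-separated entry
# ends in .py, .zip, or .egg.
valid_py_files() {
  old_ifs="$IFS"
  IFS=','                              # split the argument on commas
  for entry in $1; do
    case "$entry" in
      *.py|*.zip|*.egg) ;;
      *)
        IFS="$old_ifs"
        echo "unsupported --py-files entry: $entry" >&2
        return 1
        ;;
    esac
  done
  IFS="$old_ifs"
}

valid_py_files "file1.py,file2.py" && echo "py-files list looks fine"
```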

Submitting Application to Mesos

Here, we submit a Spark application to a Mesos-managed cluster using cluster deployment mode, with 5G of memory and 8 cores for each executor.


# Running Spark application on Mesos cluster manager
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://192.168.231.132:7077 \
  --deploy-mode cluster \
  --executor-memory 5G \
  --executor-cores 8 \
   http://examples/jars/spark-examples_versionxx.jar 80

Submitting Application to Kubernetes

The example below runs a Spark application on a Kubernetes-managed cluster using cluster deployment mode, with 5G of memory and 8 cores for each executor.


# Running Spark application on Kubernetes cluster
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://192.168.231.132:443 \
  --deploy-mode cluster \
  --executor-memory 5G \
  --executor-cores 8 \
  /spark-home/examples/jars/spark-examples_versionxx.jar 80

Submitting Application to Standalone

The example below runs a Spark application on a Standalone cluster using cluster deployment mode, with 5G of memory and 8 cores for each executor.


# Running Spark application on standalone cluster
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://192.168.231.132:7077 \
  --deploy-mode cluster \
  --executor-memory 5G \
  --executor-cores 8 \
  /spark-home/examples/jars/spark-examples_versionxx.jar 80

Happy Learning !!

NNK
