How to submit a Python file (.py) with PySpark code to Spark submit? spark-submit is used to submit the Spark applications written in Scala, Java, R, and Python to cluster. In this article, I will cover a few examples of how to submit a python (.py) file by using several options and configurations.
1. Spark Submit Python File
Apache Spark binary comes with spark-submit.sh
script file for Linux, Mac, and spark-submit.cmd
command file for windows, these scripts are available at $SPARK_HOME/bin
directory which is used to submit the PySpark file with .py extension (Spark with python) to the cluster.
Below is a simple spark-submit
command to run python file with the most-used command options.
./bin/spark-submit \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key<=<value> \
--driver-memory <value>g \
--executor-memory <value>g \
--executor-cores <number of cores> \
--py-files file1.py,file2.py,file3.zip, file4.egg \
wordByExample.py [application-arguments]
When you wanted to spark-submit a PySpark application (Spark with Python), you need to specify the .py file you wanted to run and specify the .egg file or .zip file for dependency libraries.
Below are some of the options & configurations specific to run pyton (.py) file with spark submit. besides these, you can also use most of the options & configs that are covered in the below sections.
Note: When you submit a Python file to spark-submit make sure your python file contains PySpark code.
Python Specific Configurations | Description |
---|---|
–py-files | Use --py-files to add .py , .zip or .egg files. |
–config spark.executor.pyspark.memory | The amount of memory to be used by PySpark for each executor. |
–config spark.pyspark.driver.python | Python binary executable to use for PySpark in driver. |
–config spark.pyspark.python | Python binary executable to use for PySpark in both driver and executors. |
Note: Files specified with --py-files
are uploaded to the cluster before it runs the application. You can also upload these files ahead and refer them in your PySpark application.
2. Spark Submit Command Options
Below I have covered some of the spark-submit
options, configurations that can be used with Python files. You can also get all options available by running the --help
command.
./bin/spark-submit --help
2. 1 Deployment Modes (–deploy-mode)
Using --deploy-mode
, you specify where to run the PySpark application driver program. Spark support cluster and client deployment modes.
Value | Description |
---|---|
cluster | In cluster mode, the driver runs on one of the worker nodes, and this node shows as a driver on the Spark Web UI of your application. cluster mode is used to run production jobs. |
client | In client mode, the driver runs locally where you are submitting your application from. client mode is majorly used for interactive and debugging purposes. Note that in client mode only the driver runs locally and all other executors run on different nodes on the cluster. |
2.2 Cluster Managers (–master)
Using --master
option, you specify what cluster manager to use to run your application. PySpark currently supports Yarn, Mesos, Kubernetes, Stand-alone, and local. The uses of these are explained below.
Cluster Manager | Value | Description |
---|---|---|
Yarn | yarn | Use yarn if your cluster resources are managed by Hadoop Yarn. |
Mesos | mesos://HOST:PORT | use mesos://HOST:PORT for Mesos cluster manager, replace the host and port of Mesos cluster manager. |
Standalone | spark://HOST:PORT | Use spark://HOST:PORT for Standalone cluster, replace the host and port of stand-alone cluster. |
Kubernetes | k8s://HOST:PORT k8s://https://HOST:PORT | Use k8s://HOST:PORT for Kubernetes, replace the host and port of Kubernetes. This by default connects with https, but if you wanted to use unsecured use k8s://https://HOST:PORT |
local | local local[k] local[K,F] | Use local to run locally with a one worker thread. Use local[k] and specify k with the number of cores you have locally, this runs application with k worker threads. use local[k,F] and specify F with number of attempts it should run when failed. |
2.3 Driver and Executor Resources (Cores & Memory)
While submitting an application, you can also specify how much memory and cores you wanted to give for driver and executors.
Option | Description |
---|---|
–driver-memory | Memory to be used by the Spark driver. |
–driver-cores | CPU cores to be used by the Spark driver |
–num-executors | The total number of executors to use. |
–executor-memory | Amount of memory to use for the executor process. |
–executor-cores | Number of CPU cores to use for the executor process. |
–total-executor-cores | The total number of executor cores to use. |
Example:
./bin/spark2-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 8g \
--executor-memory 16g \
--executor-cores 2 \
--py-files file1.py,file2.py,file3.zip, file4.egg \
wordByExample.py [application-arguments]
2.4 Other Options
Options | Description |
---|---|
–files | Use comma-separated files you wanted to use. Usually, these can be files from your resource folder. Using this option, Spark submits all these files to cluster. |
–verbose | Displays the verbose information. For example, writes all configurations spark application uses to the log file. |
Note: Files specified with --files
are uploaded to the cluster.
Example: Below example submits the application to yarn cluster manager by using cluster deployment mode and with 8g driver memory, 16g, and 2 cores for each executor.
./bin/spark2-submit \
--verbose
--master yarn \
--deploy-mode cluster \
--driver-memory 8g \
--executor-memory 16g \
--executor-cores 2 \
--files /path/log4j.properties,/path/file2.conf,/path/file3.json \
--class org.apache.spark.examples.SparkPi \
--py-files file1.py,file2.py,file3.zip, file4.egg \
wordByExample.py [application-arguments]
3. Spark Submit Configurations
Spark submit supports several configurations using --config
, these configurations are used to specify application configurations, shuffle parameters, runtime configurations e.t.c. Most of these configurations are same for Spark applications written in Java, Scala, and Python(PySpark).
Configuration key | Configuration Description |
---|---|
spark.sql.shuffle.partitions | Number of partitions to create for wider shuffle transformations (joins and aggregations). |
spark.executor.memoryOverhead | The amount of additional memory to be allocated per executor process in cluster mode, it is typically memory for JVM overheads. (Not supported for PySpark) |
spark.serializer | org.apache.spark.serializer.<br>JavaSerializer (default)org.apache.spark.serializer.KryoSerializer |
spark.sql.files.maxPartitionBytes | The maximum number of bytes to be used for every partition when reading files. Default 128MB. |
spark.dynamicAllocation.enabled | Specifies whether to dynamically increase or decrease the number of executors based on the workload. Default true. |
spark.dynamicAllocation .minExecutors | A minimum number of executors to use when dynamic allocation is enabled. |
spark.dynamicAllocation .maxExecutors | A maximum number of executors to use when dynamic allocation is enabled. |
spark.executor.extraJavaOptions | Specify JVM options (see example below) |
Besides these, PySpark also supports many more configurations.
Example :
./bin/spark2-submit \
--master yarn \
--deploy-mode cluster \
--conf "spark.sql.shuffle.partitions=20000" \
--conf "spark.executor.memoryOverhead=5244" \
--conf "spark.memory.fraction=0.8" \
--conf "spark.memory.storageFraction=0.2" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.files.maxPartitionBytes=168435456" \
--conf "spark.dynamicAllocation.minExecutors=1" \
--conf "spark.dynamicAllocation.maxExecutors=200" \
--conf "spark.dynamicAllocation.enabled=true" \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--files /path/log4j.properties,/path/file2.conf,/path/file3.json \
--py-files file1.py,file2.py,file3.zip, file4.egg \
wordByExample.py [application-arguments]
Alternatively, you can also set these globally @ $SPARK_HOME/conf/spark-defaults.conf
to apply for every Spark application. And you can also set using SparkConf
programmatically.
config = SparkConf()
config.set("spark.sql.shuffle.partitions","300")
val spark=SparkSession.builder.config(config)
First preference goes to SparkConf
, then spark-submit --config
and then configs mentioned in spark-defaults.conf
Conclusion
In this article I have explained how to submit a python file using spark-submit to run it on the cluster, different options you can use with python file, configuration e.t.c
Happy Learning !!