How to Spark Submit Python | PySpark File (.py)?

Submitting a Python file (.py) containing PySpark code to a Spark cluster involves using the spark-submit command. This command is used to submit Spark applications written in various languages, including Scala, Java, R, and Python, to a Spark cluster. In this article, I will demonstrate several examples of submitting a PySpark program file (.py) using different options and configurations.

Key Points:

  • Use the spark-submit command to submit PySpark applications to a Spark cluster. This command initiates the execution of the application on the cluster.
  • Configure the cluster settings, such as the number of executors, memory allocation, and other Spark properties, either programmatically using SparkConf or through configuration files like spark-defaults.conf.
  • Ensure that all necessary dependencies for your PySpark application are included or available on the Spark cluster’s environment. This includes Python packages, libraries, and any external resources required for the application to run successfully.
  • Monitor the execution of your PySpark application using Spark’s built-in monitoring tools, such as Spark UI, to track job progress, resource utilization, task execution, and other metrics.
  • Implement error handling mechanisms within your PySpark application to gracefully handle exceptions, failures, and unexpected conditions during execution (a minimal sketch follows this list).
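
For the error handling point above, below is a minimal sketch of how a PySpark job might wrap its work; the input path and the job logic are hypothetical and only illustrate the pattern.


import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("wordByExample").getOrCreate()

try:
    # Hypothetical job logic: read a text file and count its rows
    df = spark.read.text("/path/to/input.txt")
    print(f"Row count: {df.count()}")
except Exception as e:
    # Log the failure and exit non-zero so the cluster marks the run as failed
    print(f"Job failed: {e}", file=sys.stderr)
    sys.exit(1)
finally:
    # Always release cluster resources
    spark.stop()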

1. Spark Submit Python File

The Apache Spark binary distribution comes with a spark-submit shell script for Linux and macOS and a spark-submit.cmd command file for Windows. These scripts are located in the $SPARK_HOME/bin directory and are used to submit a PySpark file with the .py extension (Spark with Python) to the cluster.

Below is a simple spark-submit command to run a Python file, with the command options that are used most of the time.


./bin/spark-submit \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  --driver-memory <value>g \
  --executor-memory <value>g \
  --executor-cores <number of cores>  \
  --py-files file1.py,file2.py,file3.zip,file4.egg \
  wordByExample.py [application-arguments]

When you spark-submit a PySpark application (Spark with Python), you specify the .py file you want to run and pass any dependency libraries as .zip or .egg files.

Below are some of the options and configurations specific to running a Python (.py) file with spark-submit. Besides these, you can also use most of the options and configurations covered in the sections below.

Note: When you submit a Python file to spark-submit, make sure your Python file contains PySpark code.
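
For reference, below is a minimal sketch of such a file, loosely matching the wordByExample.py name used in the commands above; the word-count logic and the input argument are assumptions made for illustration.


# wordByExample.py - a minimal, hypothetical PySpark word count
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    spark = SparkSession.builder.appName("wordByExample").getOrCreate()

    # Read the input path passed as the first application argument
    lines = spark.read.text(sys.argv[1])

    # Split each line into words and count the occurrences of each word
    words = lines.select(explode(split(lines.value, r"\s+")).alias("word"))
    words.groupBy("word").count().show()

    spark.stop()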

Python Specific Configuration | Description
--py-files | Use --py-files to add .py, .zip, or .egg files.
--conf spark.executor.pyspark.memory | The amount of memory to be used by PySpark for each executor.
--conf spark.pyspark.driver.python | Python binary executable to use for PySpark in the driver.
--conf spark.pyspark.python | Python binary executable to use for PySpark in both driver and executors.
spark-submit Python-specific options

Note: Files specified with --py-files are uploaded to the cluster before the application runs. You can also upload these files ahead of time and reference them in your PySpark application.
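
If you prefer not to list every dependency on the command line, an archive can also be attached from inside the application using SparkContext.addPyFile; the archive path below is an assumption for illustration.


from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("wordByExample").getOrCreate()

# Ship a dependency archive to the driver and executors, similar to passing it with --py-files
spark.sparkContext.addPyFile("/path/to/file3.zip")

# Modules packaged inside the archive can be imported after this point
# import mymodule   # hypothetical module inside file3.zip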

2. Options

Below, I have covered some of the spark-submit options and configurations that can be used with Python files. You can also list all available options by running spark-submit with --help.


./bin/spark-submit --help

2.1 --deploy-mode

Using --deploy-mode, you specify where the PySpark application driver program runs. Spark supports cluster and client deployment modes.

Value | Description
cluster | In cluster mode, the driver runs on one of the nodes inside the cluster, and the cluster manager manages both the driver and the executors. Use this mode for production applications.
client | In client mode, the driver runs locally on the machine from which you submit the application, while all other executors run on different nodes of the cluster. This mode is mostly used for interactive and debugging purposes.
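
To confirm which deploy mode the application actually ran in, you can read the spark.submit.deployMode property at runtime; a small sketch, assuming the application was launched through spark-submit.


from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# spark.submit.deployMode is set by spark-submit and holds either "client" or "cluster";
# fall back to "client" if the property was never set
print(spark.sparkContext.getConf().get("spark.submit.deployMode", "client"))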

2.2 --master

Using the --master option, you specify which cluster manager to use to run your application. PySpark currently supports YARN, Mesos, Kubernetes, Standalone, and local modes. Their uses are explained below.

Master | Value | Description
Yarn | yarn | Use yarn if your cluster resources are managed by Hadoop YARN.
Mesos | mesos://HOST:PORT | Use mesos://HOST:PORT for the Mesos cluster manager, replacing HOST and PORT with those of the Mesos master.
Standalone | spark://HOST:PORT | Use spark://HOST:PORT for a Standalone cluster, replacing HOST and PORT with those of the Standalone master.
Kubernetes | k8s://HOST:PORT or k8s://https://HOST:PORT | To run applications on Kubernetes.
Local | local, local[k], local[k,F] | Use local to run locally with a single worker thread. Use local[k] to run with k worker threads, typically set to the number of cores on your machine. With local[k,F], F stands for the number of attempts a failed task is allowed.
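
During development you can also set the master programmatically, which makes it easy to run the same script locally before submitting it to a cluster; the thread count below is just an example.


from pyspark.sql import SparkSession

# Run locally with 4 worker threads; on a cluster, pass --master to spark-submit instead
spark = (SparkSession.builder
         .master("local[4]")
         .appName("wordByExample-local")
         .getOrCreate())

print(spark.sparkContext.master)   # prints: local[4]
spark.stop()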

2.3 CPU Core & Memory

While submitting an application, you can also specify how much memory and how many cores to allocate to the driver and executors.

Option | Description
--driver-memory | Memory to be used by the Spark driver.
--driver-cores | CPU cores to be used by the Spark driver.
--num-executors | The total number of executors to use.
--executor-memory | Amount of memory to use for each executor process.
--executor-cores | Number of CPU cores to use for each executor process.
--total-executor-cores | The total number of executor cores to use.

Example:


./bin/spark-submit \
   --master yarn \
   --deploy-mode cluster \
   --driver-memory 8g \
   --executor-memory 16g \
   --executor-cores 2  \
   --py-files file1.py,file2.py,file3.zip,file4.egg \
   wordByExample.py [application-arguments]

2.4 Other Commonly Used Options

Option | Description
--files | Comma-separated list of files to ship with the application, typically files from your resources folder. Spark uploads all of these files to the cluster.
--verbose | Show verbose information; writes all configurations the Spark application uses to the log.
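
Files shipped with --files are copied to each node's working directory and can be located at runtime through SparkFiles; a brief sketch, assuming a JSON configuration file like the file3.json used in the example below.


import json
from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Resolve the local copy of a file that was distributed with --files /path/file3.json
config_path = SparkFiles.get("file3.json")
with open(config_path) as f:
    settings = json.load(f)
print(settings)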

Example: The below example submits the application to the YARN cluster manager using cluster deploy mode, with 8g of driver memory and, for each executor, 16g of memory and 2 cores.


./bin/spark-submit \
   --verbose \
   --master yarn \
   --deploy-mode cluster \
   --driver-memory 8g \
   --executor-memory 16g \
   --executor-cores 2 \
   --files /path/log4j.properties,/path/file2.conf,/path/file3.json \
   --py-files file1.py,file2.py,file3.zip,file4.egg \
   wordByExample.py [application-arguments]

3. Configurations

Spark submit supports several configurations using --conf. These are used to specify application settings, shuffle parameters, runtime configurations, etc. Most of these configurations are the same for Spark applications written in Java, Scala, and Python (PySpark).

Key | Description
spark.sql.shuffle.partitions | Number of partitions to create after wide (shuffle) transformations. Defaults to 200.
spark.executor.memoryOverhead | The amount of additional memory to be allocated per executor process in cluster mode; it is typically memory for JVM overheads.
spark.serializer | Option to change the serializer from the default JavaSerializer to KryoSerializer.
spark.sql.files.maxPartitionBytes | The maximum number of bytes to pack into a single partition when reading files. Defaults to 128MB.
spark.dynamicAllocation.enabled | Specifies whether to dynamically increase or decrease the number of executors based on the workload. Defaults to false.
spark.dynamicAllocation.minExecutors | The minimum number of executors to use when dynamic allocation is enabled.
spark.dynamicAllocation.maxExecutors | The maximum number of executors to use when dynamic allocation is enabled.
spark.executor.extraJavaOptions | Specify JVM options (see the example below).

Besides these, PySpark also supports many more configurations.

Example:


./bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--conf "spark.sql.shuffle.partitions=20000" \
--conf "spark.executor.memoryOverhead=5244" \
--conf "spark.memory.fraction=0.8" \
--conf "spark.memory.storageFraction=0.2" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.files.maxPartitionBytes=168435456" \
--conf "spark.dynamicAllocation.minExecutors=1" \
--conf "spark.dynamicAllocation.maxExecutors=200" \
--conf "spark.dynamicAllocation.enabled=true" \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--files /path/log4j.properties,/path/file2.conf,/path/file3.json \
--py-files file1.py,file2.py,file3.zip,file4.egg \
wordByExample.py [application-arguments]

Alternatively, you have the option to set these configurations globally in $SPARK_HOME/conf/spark-defaults.conf to apply them to every Spark application. Additionally, you can set them programmatically using SparkConf.


from pyspark import SparkConf
from pyspark.sql import SparkSession

config = SparkConf()
config.set("spark.sql.shuffle.partitions", "300")
spark = SparkSession.builder.config(conf=config).getOrCreate()

The priority order for configurations is as follows: settings made programmatically with SparkConf take precedence, followed by configurations passed to spark-submit with --conf, and finally settings in spark-defaults.conf.
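
Whichever way a setting is provided, you can verify the effective value from inside a running application, for example:


from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read one effective setting; returns the default of 200 if it was never overridden
print(spark.conf.get("spark.sql.shuffle.partitions"))

# List everything that was set explicitly via spark-defaults.conf, spark-submit, or SparkConf
for key, value in spark.sparkContext.getConf().getAll():
    print(key, "=", value)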

Conclusion

In conclusion, submitting PySpark (Spark with Python) applications to a Spark cluster involves leveraging the spark-submit command. Through this process, developers can effectively deploy their applications to the cluster, utilizing various options and configurations as needed. Whether setting configurations globally in spark-defaults.conf, specifying them via spark-submit --conf, or setting them programmatically using SparkConf, there are multiple avenues available to tailor the deployment process to specific requirements.

By understanding and utilizing these submission methods, developers can seamlessly integrate their Python-based Spark applications into the cluster environment for efficient execution and processing of large-scale data tasks.

Happy Learning !!