  • Post category:Amazon AWS
  • Post last modified:April 2, 2024

Amazon Elastic MapReduce (EMR) is a cloud-based big data platform for processing large volumes of data quickly and cost-effectively. It uses a hosted Hadoop framework running on Amazon Elastic Compute Cloud (EC2) and Amazon Simple Storage Service (S3), providing a robust, scalable solution for managing big data workloads.


In addition to the regular EMR offering, Amazon also offers EMR Serverless, a deployment option that runs big data frameworks such as Apache Spark and Hive without requiring you to configure, manage, or scale clusters. With EMR Serverless, you create an application and submit jobs to it, and AWS provisions and scales the underlying resources for you, typically against data stored in S3. This removes cluster setup from the workflow, so you can get from code to results sooner.

Amazon EMR supports a wide variety of big data frameworks, including Apache Spark, Hadoop, HBase, Presto, and Flink, while EMR Serverless, at the time of writing, supports Spark and Hive. These tools are crucial for running big data analysis tasks such as clickstream analysis, data transformations (ETL), financial analysis, machine learning, and computational science.

A key advantage of Amazon EMR lies in its scalability. It can dynamically provision and deprovision Amazon EC2 instances based on the demands of the processing task. This adaptability helps optimize costs, improve performance, and effectively manage capacity.

Moreover, Amazon EMR integrates smoothly with other AWS services, offering a comprehensive solution for data analysis. You can store your data in Amazon S3 and access it directly from your Amazon EMR cluster, or use AWS Glue Data Catalog as a centralized metadata repository across a range of data analytics frameworks like Spark and Hive on EMR.

In this tutorial, we’ll focus on Apache Spark on Amazon EMR. We’ll walk you through setting up a cluster, running a Spark job, and interpreting the results.

Prerequisites:

  1. An AWS account: You will need an AWS account to create and configure your Amazon EMR resources.
  2. AWS CLI: The AWS Command Line Interface is a unified tool to manage your AWS services. You’ll need to have it installed on your local machine.
  3. Basic knowledge of Apache Spark: Spark is a big data processing framework and engine that is commonly used for data analytics.
  4. Sample dataset: You can use any dataset of your choice, but for this tutorial, we will use SalesData.csv. Here is what the data looks like:

Date,Salesperson,Lead Name,Segment,Region,Target Close,Forecasted Monthly Revenue,Opportunity Stage,Weighted Revenue,Closed Opportunity,Active Opportunity,Latest Status Entry
1/2/2011,Gerri Hinds,US_SMB_1317,SMB,US,2/2/2011,103699,Lead,10370,FALSE,FALSE,FALSE
1/3/2011,David King,EMEA_Enterprise_405,Enterprise,EMEA,4/9/2011,393841,Lead,39384,FALSE,FALSE,FALSE
1/6/2011,James Swanger,US_Enterprise_1466,Enterprise,US,5/4/2011,326384,Lead,32638,FALSE,FALSE,FALSE
1/11/2011,Gerri Hinds,US_SMB_2291,SMB,US,2/14/2011,76316,Lead,7632,FALSE,FALSE,FALSE
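
To get a feel for the file before moving it to S3, the rows above can be parsed locally with Python's standard library. This is only an illustrative sketch: the inline SAMPLE string holds the header and the first two rows shown above, and the helper names normalize and load_rows are hypothetical.

```python
import csv
import io
from datetime import datetime

# Header plus the first two data rows from the sample shown above.
SAMPLE = """Date,Salesperson,Lead Name,Segment,Region,Target Close,Forecasted Monthly Revenue,Opportunity Stage,Weighted Revenue,Closed Opportunity,Active Opportunity,Latest Status Entry
1/2/2011,Gerri Hinds,US_SMB_1317,SMB,US,2/2/2011,103699,Lead,10370,FALSE,FALSE,FALSE
1/3/2011,David King,EMEA_Enterprise_405,Enterprise,EMEA,4/9/2011,393841,Lead,39384,FALSE,FALSE,FALSE
"""


def normalize(name):
    """Hypothetical helper: replace spaces in a column name with underscores."""
    return name.replace(" ", "_")


def load_rows(text):
    """Parse CSV text into dicts with normalized keys and typed values."""
    rows = []
    for raw in csv.DictReader(io.StringIO(text)):
        row = {normalize(key): value for key, value in raw.items()}
        # Dates in the file use month/day/year without zero padding.
        row["Date"] = datetime.strptime(row["Date"], "%m/%d/%Y").date()
        row["Forecasted_Monthly_Revenue"] = int(row["Forecasted_Monthly_Revenue"])
        row["Closed_Opportunity"] = row["Closed_Opportunity"] == "TRUE"
        rows.append(row)
    return rows


rows = load_rows(SAMPLE)
print(len(rows), sorted(rows[0])[:3])
```

Several column names contain spaces, which is why the Spark job later in this tutorial renames them with underscores before aggregating.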

1. Step-by-step tutorial:

1.1 Step 1: Set Up Your AWS CLI

First, you need to set up the AWS CLI on your computer. If you haven’t already installed it, you can do so by following the instructions in the AWS CLI documentation.

After the installation, configure your AWS CLI by entering your Access Key ID, Secret Access Key, AWS region, and output format. You can do this by running the aws configure command in your terminal:


$ aws configure
AWS Access Key ID [None]: YOUR_ACCESS_KEY
AWS Secret Access Key [None]: YOUR_SECRET_KEY
Default region name [None]: us-west-2
Default output format [None]: json

1.2 Step 2: Set up an S3 Bucket

  1. Create an S3 bucket to store your sample data. Navigate to the S3 service in the AWS Management Console and click on “Create bucket.” Provide a unique name and select your desired region. 
  2. Once the bucket is created, create two sub-folders named:
    1. cleaned_data
    2. raw_data

Step 2 (continued): Prepare the Sample Data

  1. Upload the SalesData.csv file to the raw_data sub-folder: click your bucket name, open raw_data, then click “Upload.”
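
If you would rather script the upload than use the console, the sketch below shows one possible approach. It assumes boto3 is installed and your credentials are configured; the function name upload_sample and the bucket name in the usage note are hypothetical. The client is passed in as an argument so the function can be exercised without touching AWS.

```python
import os


def upload_sample(s3_client, bucket, local_path="SalesData.csv"):
    """Upload a local file under raw_data/ and return its S3 URI.

    s3_client is expected to behave like boto3.client("s3").
    """
    key = "raw_data/" + os.path.basename(local_path)
    s3_client.upload_file(local_path, bucket, key)
    return f"s3://{bucket}/{key}"
```

With boto3 available, a call could look like upload_sample(boto3.client("s3"), "my-example-bucket"), where the bucket name is a placeholder for your own.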

1.3 Step 3: Launch an EMR cluster

We will use the create-cluster command to launch an EMR cluster. This command specifies the applications (such as Spark) that will be installed on the cluster. Here is a basic example:


aws emr create-cluster --name "Spark cluster" --release-label emr-6.4.0 \
--applications Name=Spark --ec2-attributes KeyName=myKey --instance-type m5.xlarge \
--instance-count 3 --use-default-roles

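You need to replace “myKey” with your own EC2 key pair name. The --use-default-roles option tells EMR to use the default roles (EMR_DefaultRole and EMR_EC2_DefaultRole). If they do not exist in your account yet, create them first by running aws emr create-default-roles.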

This command outputs JSON that contains your ClusterId. Make sure to note down this ClusterId, as it is needed for later steps.
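
If you script the launch, the ClusterId can be pulled from that JSON rather than copied by hand. A minimal sketch, assuming the command's output was captured as a string; the ID below is a made-up placeholder and extract_cluster_id is a hypothetical helper.

```python
import json


def extract_cluster_id(cli_output):
    """Return the ClusterId field from create-cluster JSON output."""
    payload = json.loads(cli_output)
    return payload["ClusterId"]


# Placeholder output; a real response carries your own cluster's ID.
example_output = '{"ClusterId": "j-EXAMPLE12345"}'
cluster_id = extract_cluster_id(example_output)
print(cluster_id)
```

On the command line, appending the global options --query ClusterId --output text to the create-cluster call should give the same value directly.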

1.4 Step 4: Connect to the Master Node

Once the cluster is ready, you can SSH into the master node. First, you need to get the public DNS of the master node. 

Use the following command to describe your cluster. The master public DNS appears in the MasterPublicDnsName field of the output:


aws emr describe-cluster --cluster-id <your-cluster-id>
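
The JSON returned by describe-cluster nests the fields of interest under a top-level Cluster key. The sketch below, which assumes the output has been captured as a string, reads the cluster state and the master public DNS. The sample payloads are heavily trimmed and use placeholder values, and master_dns_if_ready is a hypothetical helper.

```python
import json

# States indicating the cluster has finished starting up.
READY_STATES = {"RUNNING", "WAITING"}


def master_dns_if_ready(cli_output):
    """Return the master public DNS once the cluster is ready, else None."""
    cluster = json.loads(cli_output)["Cluster"]
    state = cluster["Status"]["State"]
    if state not in READY_STATES:
        return None
    return cluster.get("MasterPublicDnsName")


# Trimmed, illustrative payloads (real output has many more fields).
starting = '{"Cluster": {"Status": {"State": "STARTING"}}}'
waiting = (
    '{"Cluster": {"Status": {"State": "WAITING"}, '
    '"MasterPublicDnsName": "ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com"}}'
)
print(master_dns_if_ready(starting))
print(master_dns_if_ready(waiting))
```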

Then, use the SSH command to connect to the master node:


ssh -i ~/path/my-key-pair.pem hadoop@ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com

Replace ~/path/my-key-pair.pem with the path to your key pair file, and replace ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com with the master public DNS.

1.5 Step 5: Running a Spark Job

Now that you are connected to the master node, you can run Spark jobs.

We will write a PySpark application for the data processing tasks. This application performs the following operations:

  1. Create a Spark session
  2. Read the data from the SalesData.csv file, which is stored in the raw_data sub-folder of our S3 bucket, and perform the following transformations:
    1. Conversion of Date: The “Date” column is converted from string type to date type using the to_date function. This enables easier date-based analysis and operations.
    2. Total Sales by Salesperson: The code calculates the total sales by salesperson by grouping the DataFrame by “Salesperson” and calculating the sum of the “Forecasted_Monthly_Revenue” column.
    3. Average Revenue by Opportunity Stage: The code calculates the average revenue per opportunity stage by grouping the DataFrame by “Opportunity_Stage” and calculating the average of the “Weighted_Revenue” column.
    4. Filtering Closed Opportunities: The code filters the DataFrame to include only closed opportunities by selecting rows where the “Closed_Opportunity” column is set to True.
    5. Selection of Specific Columns: The code selects specific columns from the DataFrame to include in the cleaned dataset. Adjust the column names as needed based on your requirements.
  3. Finally, save all the results to the cleaned_data sub-folder of our S3 bucket.

Remember to replace the bucket locations assigned to S3_INPUT_DATA and S3_OUTPUT_DATA with your own S3 bucket locations. Feel free to modify the transformations or add additional ones based on your specific needs.


from pyspark.sql import SparkSession
from pyspark.sql import functions as F

S3_INPUT_DATA = 's3://mybuckect-spark-by-example/raw_data/'
S3_OUTPUT_DATA = 's3://mybuckect-spark-by-example/cleaned_data/'

def main():

    # Creating the SparkSession
    spark = SparkSession.builder.appName("My Demo ETL App").getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')

    # Spark DataFrame (Raw) - read the CSV with a header row and inferred column types
    df = spark.read.option("header", True).option("inferSchema", True).csv(S3_INPUT_DATA)

    # Define a dictionary of replacements to replace spaces in column names with underscores
    replacements = {c: c.replace(' ', '_') for c in df.columns if ' ' in c}
    # Select columns from the df using the replacements dict to rename columns with spaces
    df = df.select([F.col(c).alias(replacements.get(c, c)) for c in df.columns])
    # Convert the "Date" column from string type to date type
    df = df.withColumn("Date", F.to_date(F.col("Date"), "M/d/yyyy"))

    # Calculate the total sales by salesperson
    sales_by_salesperson = df.groupBy("Salesperson") \
                             .agg(F.sum("Forecasted_Monthly_Revenue") \
                             .alias("Total_Sales"))

    # Calculate the average revenue per opportunity stage
    avg_revenue_by_stage = df.groupBy("Opportunity_Stage") \
                             .agg(F.avg("Weighted_Revenue") \
                             .alias("Avg_Revenue"))

    # Filter the dataset to include only closed opportunities
    closed_opportunities = df.filter(F.col("Closed_Opportunity") == True)

    # Select specific columns for the cleaned dataset
    cleaned_df = df.select("Date", "Salesperson", "Segment",
                           "Region", "Opportunity_Stage", "Weighted_Revenue")

    # Print the total number of records in the cleaned dataset
    print(f"Total no. of records in the cleaned dataset is: {cleaned_df.count()}")

    try:
        # Save the DataFrames under different folders within the S3_OUTPUT_DATA location.
        # S3_OUTPUT_DATA already ends with "/", so the folder names carry no leading slash.
        sales_by_salesperson.write.mode('overwrite') \
                            .parquet(S3_OUTPUT_DATA + "sales_by_salesperson")
        avg_revenue_by_stage.write.mode('overwrite') \
                            .parquet(S3_OUTPUT_DATA + "avg_revenue_by_stage")
        closed_opportunities.write.mode('overwrite') \
                            .parquet(S3_OUTPUT_DATA + "closed_opportunities")
        cleaned_df.write.mode('overwrite') \
                  .parquet(S3_OUTPUT_DATA + "cleaned_df")
        print('The cleaned data is uploaded')
    except Exception as err:
        # Report the cause instead of silently swallowing every error
        print(f'Something went wrong, please check the logs: {err}')

if __name__ == '__main__':
    main()

Save it as count.py, and run it with spark-submit:


spark-submit count.py

Once the job completes, check the cleaned_data folder in your S3 bucket. You will see the transformed and processed data in Parquet format.
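
One way to sanity-check the Parquet output is to compare it against values worked out by hand for a few rows. The plain-Python sketch below (no Spark involved) applies the same grouping logic to the four sample rows shown in the prerequisites. It is a cross-check for intuition, not part of the Spark application, and the tuple layout is an assumption made for brevity.

```python
from collections import defaultdict

# (Salesperson, Opportunity_Stage, Forecasted_Monthly_Revenue,
#  Weighted_Revenue, Closed_Opportunity) for the four sample rows.
ROWS = [
    ("Gerri Hinds", "Lead", 103699, 10370, False),
    ("David King", "Lead", 393841, 39384, False),
    ("James Swanger", "Lead", 326384, 32638, False),
    ("Gerri Hinds", "Lead", 76316, 7632, False),
]


def total_sales_by_salesperson(rows):
    """Sum the forecasted monthly revenue per salesperson."""
    totals = defaultdict(int)
    for person, _stage, forecast, _weighted, _closed in rows:
        totals[person] += forecast
    return dict(totals)


def avg_revenue_by_stage(rows):
    """Average the weighted revenue per opportunity stage."""
    sums, counts = defaultdict(int), defaultdict(int)
    for _person, stage, _forecast, weighted, _closed in rows:
        sums[stage] += weighted
        counts[stage] += 1
    return {stage: sums[stage] / counts[stage] for stage in sums}


def closed_opportunities(rows):
    """Keep only rows whose Closed_Opportunity flag is true."""
    return [row for row in rows if row[4]]


totals = total_sales_by_salesperson(ROWS)
averages = avg_revenue_by_stage(ROWS)
closed = closed_opportunities(ROWS)
print(totals)
print(averages)
print(len(closed))
```

For these four rows, Gerri Hinds totals 180015, the only stage present is Lead with an average weighted revenue of 22506.0, and no opportunity is closed. The full dataset will of course produce different numbers.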

1.6 Step 6: Terminating the Cluster

Don’t forget to terminate the cluster after you’re done to avoid unnecessary charges. You can terminate your cluster using the terminate-clusters command:


aws emr terminate-clusters --cluster-ids <your-cluster-id>
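
Termination is not instantaneous, so it can be worth confirming that the cluster actually reached a terminal state. The sketch below polls the cluster status. It assumes a client object that behaves like boto3.client("emr"); the function name and the polling defaults are hypothetical choices. The client and sleep function are passed in so the loop can be tested without AWS access.

```python
import time

# Cluster states after which no further charges accrue for the cluster.
TERMINAL_STATES = {"TERMINATED", "TERMINATED_WITH_ERRORS"}


def wait_until_terminated(emr_client, cluster_id, poll_seconds=30,
                          max_polls=40, sleep=time.sleep):
    """Poll describe_cluster until the cluster reaches a terminal state.

    Returns the final state, or raises TimeoutError once max_polls is used up.
    """
    for _ in range(max_polls):
        response = emr_client.describe_cluster(ClusterId=cluster_id)
        state = response["Cluster"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
    raise TimeoutError(f"Cluster {cluster_id} did not terminate in time")
```

With boto3 installed, a call could look like wait_until_terminated(boto3.client("emr"), "<your-cluster-id>"). boto3 also provides a cluster_terminated waiter, which may be simpler if you do not need custom polling.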

2. Conclusion

In conclusion, Amazon EMR provides a robust and scalable environment for processing large datasets using popular open-source tools like Apache Spark. Whether you’re performing complex data transformations, training machine learning models, or running interactive SQL queries, EMR provides the tools and the scalability you need to effectively handle your big data workloads.

In this tutorial, we learned how to set up an Amazon EMR cluster, run a Spark job, and terminate the cluster when it’s no longer needed. We’ve only scratched the surface of what’s possible with Amazon EMR and Spark. These tools are highly configurable, and you can optimize and adjust them according to the specific needs of your data processing tasks.

debnsuma

Suman Debnath is a Principal Developer Advocate (Data Engineering) at Amazon Web Services, primarily focusing on Data Engineering, Data Analysis and Machine Learning. He is passionate about large-scale distributed systems and is an avid fan of Python. His background is in storage performance and tool development, where he has developed various performance benchmarking and monitoring tools.