AWS EMR Serverless
On June 1st 2022, AWS announced the general availability of serverless Elastic MapReduce (EMR). Amazon EMR is a cloud platform for running large-scale big data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto, all running in a distributed manner on a cluster of servers.
Before now, it could be a bit of a faff to correctly specify the type, number and size of the cluster nodes needed to process whatever job you were running, successfully and in a timely manner. Often the cluster would be under-provisioned and the job would fail through lack of resources, or over-provisioned, meaning you spent more than you needed to get the job done.
With EMR Serverless, provisioning a compute cluster just became much, much easier, and issues such as those I mentioned should be far less likely, since you can now specify a minimum cluster size at the outset of your job. The cluster can then grow - up to a user-specified limit if required - and shrink as needed, depending on the processing requirements of the job at hand. Also, by default, a cluster will spin down after 15 minutes of inactivity. All of this should save users money in the long term and is, in my view, a real game-changer in the AWS serverless offering.
In this article, we're going to describe how to set up an EMR Serverless cluster and run a PySpark job on it to perform some simple analytics on data residing in S3. I'm using a Windows-based system throughout. Note that since we're not specifying a VPC for our serverless set-up, the list of AWS services that EMR Serverless can access is limited to S3, AWS Glue, DynamoDB, CloudWatch, KMS, and Secrets Manager within the same AWS Region. To access services such as Redshift and RDS, for instance, you would need to specify a VPC. The details of how to do that are in the documentation.
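For illustration only, the sketch below shows roughly what attaching a VPC could look like if you created the application with boto3 rather than the CLI (we create ours without a VPC later on). The subnet and security group IDs are placeholders, and you should confirm the exact parameter shape against the EMR Serverless documentation before relying on it.

import boto3

# Hypothetical sketch: create an EMR Serverless application inside a VPC so that
# jobs can reach services such as RDS or Redshift. The subnet and security group
# IDs below are placeholders - substitute your own.
emr = boto3.client("emr-serverless")

response = emr.create_application(
    name="serverless-demo-vpc",
    releaseLabel="emr-6.6.0",
    type="SPARK",
    networkConfiguration={
        "subnetIds": ["subnet-0123456789abcdef0"],
        "securityGroupIds": ["sg-0123456789abcdef0"],
    },
)
print(response["applicationId"])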
Before following along, make sure you have an AWS account and have set up login credentials. The two main ways to set up an EMR Serverless environment are via the AWS EMR console or via the AWS command-line interface (CLI). We'll be using the latter, so the first thing you should do is ensure you have the latest version of the AWS CLI available. To check whether you have a suitable version, type the following at your system command-line prompt:
C:\> aws emr-serverless help
If the above command returns sensible output, you're good to go; otherwise, follow the link below to get the latest CLI version.
https://aws.amazon.com/cli/
Before we get to the EMR set-up part let’s look at the pyspark job we'll run on our cluster and the data it's going to process.
On S3 I have a data set I frequently use for big data-type exploratory analysis. It’s a pipe-separated 21GB text file containing approximately 335 million records. The first 10 records look like this:
+-----+------+-------+-------------------+---+------+---+----+-------+-------+------+----+----+----+
|   id|period|     c3|                 c4| c5|    c6| c7|  c8|     c9|    c10|   c11| c12| c13| c14|
+-----+------+-------+-------------------+---+------+---+----+-------+-------+------+----+----+----+
|18511|     1|2587198|2004-03-31 00:00:00|  0|100000|  0|1.97|0.49988| 100000|  null|null|null|null|
|18511|     2|2587198|2004-06-30 00:00:00|  0|160000|  0| 3.2|0.79669|  60000|  60.0|null|null|null|
|18511|     3|2587198|2004-09-30 00:00:00|  0|160000|  0|2.17|0.79279|      0|   0.0|null|null|null|
|18511|     4|2587198|2004-09-30 00:00:00|  0|160000|  0|1.72|0.79118|      0|   0.0|null|null|null|
|18511|     5|2587198|2005-03-31 00:00:00|  0|     0|  0| 0.0|    0.0|-160000|-100.0|null|null|  19|
|18511|     1|2587940|2004-03-31 00:00:00|  0|240000|  0|0.78|0.27327| 240000|  null|null|null|null|
|18511|     2|2587940|2004-06-30 00:00:00|  0|560000|  0|1.59|0.63576| 320000|133.33|null|  24|null|
|18511|     3|2587940|2004-09-30 00:00:00|  0|560000|  0|1.13|0.50704|      0|   0.0|null|null|null|
|18511|     4|2587940|2004-09-30 00:00:00|  0|560000|  0|0.96|0.50704|      0|   0.0|null|null|null|
|18511|     5|2587940|2005-03-31 00:00:00|  0|     0|  0| 0.0|    0.0|-560000|-100.0|null|null|  14|
+-----+------+-------+-------------------+---+------+---+----+-------+-------+------+----+----+----+
The content of this data set is not that important; suffice it to say that the second field in the above file (period) contains 56 unique integer values ranging from 1 to 56. For the purpose of this article, our PySpark code will simply read the file contents into a data frame, then group by the period column and display a list of unique periods and their associated counts. If the above data were in a database table, the equivalent SQL for this operation would be:
select period, count(*) as cnt from mytable group by period
The pyspark code (period_group.py) to do this is pretty straightforward and shown below.
# period_group.py
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StructType, DoubleType,
                               LongType, IntegerType, TimestampType)

if __name__ == "__main__":
    spark = SparkSession.builder.appName("GroupCount").getOrCreate()

    # define the schema of our data file
    myschema = StructType([
        StructField("id", IntegerType(), True), StructField("period", IntegerType(), True),
        StructField("c3", IntegerType(), True), StructField("c4", TimestampType(), True),
        StructField("c5", IntegerType(), True), StructField("c6", LongType(), True),
        StructField("c7", IntegerType(), True), StructField("c8", DoubleType(), True),
        StructField("c9", DoubleType(), True), StructField("c10", LongType(), True),
        StructField("c11", DoubleType(), True), StructField("c12", IntegerType(), True),
        StructField("c13", IntegerType(), True), StructField("c14", IntegerType(), True)
    ])

    # read the pipe-delimited file from S3 using the schema above
    df = spark.read.format("csv").option("header", "false")\
        .schema(myschema).option("delimiter", "|")\
        .load("s3a://mybucket-serverless/myinput/myfile.csv")

    print(df.count())  # 335021097 on my system

    # group by the period column and count the records in each group
    periods = df.groupBy("period").count()
    periods.show(5)
    # e.g. sample output
    # +------+-------+
    # |period|  count|
    # +------+-------+
    # |    31|6005203|
    # |    53|7362091|
    # |    34|6265175|
    # |    28|6106842|
    # |    26|6082665|
    # +------+-------+
    # only showing top 5 rows

    # save the periods dataframe to S3 in JSON format
    periods.write.format("json").save("s3a://mybucket-serverless/myoutput/")
Now we can look at how we set up our EMR serverless cluster and submit a spark job to run on it.
1) Copy your PySpark file to an S3 location in the same region you intend for your EMR serverless cluster.
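If you'd rather script this step than use the console or the aws s3 cp command, a minimal boto3 sketch might look like the one below. The local filename, bucket and key are simply the ones used elsewhere in this article; substitute your own.

import boto3

# Upload the PySpark script to S3 so it can be used as the job's entry point.
# Bucket and key are placeholders matching the paths used later in this article.
s3 = boto3.client("s3")
s3.upload_file("period_group.py", "mybucket-serverless", "mycode/period_group.py")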
2) Create an IAM role with an attached policy that grants permissions for EMR Serverless.
# First, create a file called role-policy.json with the following content
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "emr-serverless.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

# now create the IAM role with
c:\> aws iam create-role --role-name emr-serverless-job-role --assume-role-policy-document file://role-policy.json
NB: if the above command executes successfully, it will return some JSON text that includes the job role ARN. Note this down, as you will need it when submitting jobs to your cluster.
3) As our Spark job will be reading from and writing to S3, assign required S3 permissions to the role we just created.
# First, create a file called s3perm.json with the following content
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadS3Data",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::mybucket-serverless",
        "arn:aws:s3:::mybucket-serverless/*"
      ]
    },
    {
      "Sid": "WriteS3Data",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::mybucket-serverless/*"
      ]
    }
  ]
}

# now attach the policy to the IAM role previously created
c:\> aws iam put-role-policy --role-name emr-serverless-job-role --policy-name S3Access --policy-document file://s3perm.json
4) Create our serverless application.
Note that "application" in these terms refers to the EMR cluster, not the pyspark code. The spark code that runs on the cluster is referred to as a job in the EMR serverless world. We create two JSON files for this part, one that specifies our initial cluster capacity (init-cap.json) and one that specifies the maximum cluster capacity our job can provision (max-cap.json).
# init-cap.json
#
{
  "DRIVER": {
    "workerCount": 2,
    "workerConfiguration": {
      "cpu": "2vCPU",
      "memory": "4GB"
    }
  },
  "EXECUTOR": {
    "workerCount": 10,
    "workerConfiguration": {
      "cpu": "2vCPU",
      "memory": "4GB"
    }
  }
}

# max-cap.json
#
{
  "cpu": "200 vCPU",
  "memory": "200 GB",
  "disk": "1000 GB"
}

# Now we create our application
c:\> aws emr-serverless create-application --type SPARK --name serverless-demo --release-label "emr-6.6.0" --initial-capacity file://init-cap.json --maximum-capacity file://max-cap.json
The only mandatory parts of the create-application command are the type, name and release label. Using these will get you a cluster with AWS default values, but in our example, we've specified exactly how we want our cluster to be provisioned. The JSON in the init-cap.json file specifies what is called a pre-initialised capacity. This effectively creates a warm pool of workers for an application which means that submitted jobs start within seconds. We have also specified the maximum resources that we want to be provisioned using the JSON in the max-cap.json file. The different types of worker configurations are shown below.
CPU      Memory                                            Default ephemeral storage
------   -----------------------------------------------   -------------------------
1 vCPU   Minimum 2 GB, maximum 8 GB, in 1 GB increments     20 GB
2 vCPU   Minimum 4 GB, maximum 16 GB, in 1 GB increments    20 GB
4 vCPU   Minimum 8 GB, maximum 30 GB, in 1 GB increments    20 GB
An application by default is configured to auto-stop when idle for 15 minutes. When an application changes to the STOPPED state, it releases any configured pre-initialized capacity. You can modify the amount of idle time before an application auto-stops or turn this feature off.
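As a rough illustration, and assuming the boto3 update_application call and its autoStopConfiguration parameter behave as described in the EMR Serverless API reference, changing the idle timeout might look something like this:

import boto3

emr = boto3.client("emr-serverless")

# Sketch only: raise the idle timeout so the application auto-stops after 30 minutes
# of inactivity instead of the default 15. Setting "enabled" to False would turn
# auto-stop off entirely. The application ID is a placeholder.
emr.update_application(
    applicationId="<YOUR_APPLICATION_ID>",
    autoStopConfiguration={
        "enabled": True,
        "idleTimeoutMinutes": 30,
    },
)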
If the create-application command executes successfully, it will return some JSON text showing the main properties of the application, including the application ID and the application state. Initially, the state field will show as CREATING. Before running our actual PySpark code on the cluster, we need to wait until the state changes to CREATED.
You can check which state the application is in by typing the following (substituting the application ID that was returned above):
c:\> aws emr-serverless get-application --application-id <your_application_id>
When the state shows as CREATED - which might take a minute or two - you can move on to the next step.
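If you want to script that wait rather than re-running the command by hand, a small polling sketch using boto3 could look like the following. It assumes the get_application response exposes the application state in the same way as the CLI output above; the application ID is a placeholder.

import time
import boto3

emr = boto3.client("emr-serverless")
app_id = "<YOUR_APPLICATION_ID>"

# Poll every 10 seconds until the application has finished creating.
while True:
    state = emr.get_application(applicationId=app_id)["application"]["state"]
    print(f"Application state: {state}")
    if state == "CREATED":
        break
    time.sleep(10)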
5) Submit a Spark job to the EMR Serverless cluster.
As before, we need to create two JSON files. The first, driver.json, should contain the following:
# driver.json
#
{
  "sparkSubmit": {
    "entryPoint": "s3://mybucket-serverless/mycode/period_group.py",
    "sparkSubmitParameters": "--conf spark.driver.cores=1 --conf spark.driver.memory=3g --conf spark.executor.cores=2 --conf spark.executor.memory=3g --conf spark.executor.instances=10"
  }
}
And the second, monitor.json, contains:
# monitor.json
#
{
  "monitoringConfiguration": {
    "s3MonitoringConfiguration": {
      "logUri": "s3://mybucket-serverless/logs/"
    }
  }
}
We submit the job via:
c:\> aws emr-serverless start-job-run --application-id <YOUR_APPLICATION_ID> --execution-role-arn <YOUR_JOB_ROLE_ARN> --job-driver file://driver.json --configuration-overrides file://monitor.json --execution-timeout-minutes 15
When this command executes successfully, it returns a snippet of JSON text that indicates your job run ID. You can use this to monitor how the Spark job is proceeding with the command below. Each job run also has a set timeout; if the job runs beyond this duration, it will be automatically cancelled. The default timeout is 12 hours, but when you start your job run you can set the timeout to a value that meets your job's requirements with the executionTimeoutMinutes property (--execution-timeout-minutes in the CLI), as we did above.
c:\> aws emr-serverless get-job-run --application-id <YOUR_APPLICATION_ID> --job-run-id <YOUR_JOB_RUN_ID>
When the above command runs, it will display JSON text indicating the status of your job. One of the fields shown is the state, which will move from RUNNING to SUCCESS. After your job has succeeded, you can look up the Spark driver log output on S3, under the logUri prefix we configured in monitor.json. There you should see two files, stderr.gz and stdout.gz, which can be examined to make sure the job has done what you expected.
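To save some manual checking, here is a hedged sketch that polls the job run until it reaches a terminal state and then pulls the driver's stdout log from S3. The key layout under the logUri prefix (applications/<application id>/jobs/<job run id>/SPARK_DRIVER/) is an assumption on my part, so verify it against what actually lands in your bucket.

import gzip
import time
import boto3

emr = boto3.client("emr-serverless")
s3 = boto3.client("s3")

app_id = "<YOUR_APPLICATION_ID>"
run_id = "<YOUR_JOB_RUN_ID>"

# Poll the job run every 30 seconds until it reaches a terminal state.
while True:
    state = emr.get_job_run(applicationId=app_id, jobRunId=run_id)["jobRun"]["state"]
    print(f"Job run state: {state}")
    if state in ("SUCCESS", "FAILED", "CANCELLED"):
        break
    time.sleep(30)

# Assumed log location under the logUri set in monitor.json - check your bucket.
key = f"logs/applications/{app_id}/jobs/{run_id}/SPARK_DRIVER/stdout.gz"
obj = s3.get_object(Bucket="mybucket-serverless", Key=key)
print(gzip.decompress(obj["Body"].read()).decode("utf-8"))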
It's also possible to monitor the job via the Spark UI as it's running, by using a pre-built Docker image supplied by AWS, but that's perhaps for another article. You can check all that out via the link provided at the end of this article.
If you’re happy that all has worked as expected and you don’t have any more use for your cluster you can spin it down and delete your application and role.
c:\> aws emr-serverless stop-application --application-id <YOUR_APPLICATION_ID>
c:\> aws emr-serverless delete-application --application-id <YOUR_APPLICATION_ID>
# remove the inline S3 policy before deleting the role
c:\> aws iam delete-role-policy --role-name emr-serverless-job-role --policy-name S3Access
c:\> aws iam delete-role --role-name emr-serverless-job-role
More information on AWS EMR Serverless can be found at the following link:
https://github.com/aws-samples/emr-serverless-samples
That's all I have for now. If you found the contents of this article useful please like and help spread the knowledge around too by re-sharing.