We at Ambee deal with large chunks of data every day ( ~4 TB/day ). Now that’s huge.
You obviously can’t handle it with pandas ( or can you? ). This is where the “big data” part actually comes into play.
There are a lot of things we do to the data: preprocessing, feature engineering, outlier detection, normalisation and what not.
Now, if it were a single CSV file of a small sample size, things would be very simple. Sadly, that isn’t the case.
Working with big data requires tools built to handle it.
In this blog, let’s focus on a small part of the huge pipeline that we have.
ETL
Yes, Extract Transform and Load.
Now, how do we do it?
We need a service or a platform that:
- Runs everything in parallel
- Is fast
- Is serverless
And that service is AWS Glue.
AWS Glue helps us build event-driven ETL pipelines. It’s pretty awesome, so let’s dive right into the hands-on part.
Here is the skeletal structure:
- Import libraries
- Extract
- Transform
- Load
Import Libraries
We import the libraries required for Glue to work, along with helper functions from PySpark.
Note: Glue works with Glue tables and the Data Catalog, so make sure you create them before writing Glue jobs. Glue tables can be created by crawlers or by defining the schema manually.
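If you don’t have a Glue table yet, one way to create it is to point a crawler at your data in S3. Here is a minimal boto3 sketch (not part of the pipeline below; the crawler name, IAM role, database and S3 path are placeholders you would replace with your own):
#Create and run a crawler that registers an S3 prefix as a Glue table (sketch)
import boto3
glue_client = boto3.client("glue")
glue_client.create_crawler(
    Name="my_crawler",  #placeholder crawler name
    Role="MyGlueCrawlerRole",  #placeholder IAM role with Glue and S3 access
    DatabaseName="glue_db_name",
    Targets={"S3Targets": [{"Path": "s3://my-bucket/raw-data/"}]},
)
glue_client.start_crawler(Name="my_crawler")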
#Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from random import randint,shuffle,sample
import pyspark.sql.functions as f
from datetime import datetime
from awsglue.transforms import *
#Import glue modules
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
We also need to initialise the Spark and Glue contexts and set the parameters that point to the Glue catalog and the S3 path we will write to.
#Initialize contexts and session
spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)
session = glue_context.spark_session
#Parameters
glue_db = "glue_db_name"
glue_table = "glue_table_name"
s3_write_path = "path_to_write_the_files_to"
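Hard-coding these values works for a demo, but in a real job you may want to pass them in as job arguments. A small sketch of that, assuming the job is started with arguments named glue_db, glue_table and s3_write_path (the names are our own choice):
#Read parameters from the job arguments instead of hard-coding them (sketch)
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv, ["JOB_NAME", "glue_db", "glue_table", "s3_write_path"])
glue_db = args["glue_db"]
glue_table = args["glue_table"]
s3_write_path = args["s3_write_path"]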
Extract
As the heading suggests, we pull the data from the databases or S3 buckets.
df = glue_context.create_data_frame_from_catalog(database = glue_db, table_name = glue_table)
print("Read the dataframe")
The above code creates a Spark DataFrame from the Glue catalog table.
This is the base dataframe that we shall use for all our transformations.
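If the catalog table is partitioned, it is often cheaper to filter at read time instead of loading everything. Here is a sketch of that (assuming a partition column named createdAt, which may differ in your schema), reading a DynamicFrame with a push-down predicate and converting it to a Spark DataFrame:
#Read only the partitions we need, then convert to a Spark DataFrame (sketch)
dyf = glue_context.create_dynamic_frame.from_catalog(
    database=glue_db,
    table_name=glue_table,
    push_down_predicate="createdAt >= '2023-01-01'",  #placeholder filter
)
df = dyf.toDF()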
Transform
This is where you shape the data to your needs, with whatever transformations you want. Below are some examples.
## Transform
#Convert the 'createdAt' column from string to date type.
df = df.withColumn('createdAt', f.to_date(f.col('createdAt')))
#Create a new column 'ts' that contains the unix timestamp of 'createdAt'
df = df.withColumn('ts', f.unix_timestamp(f.col('createdAt')))
#Group by latitude, longitude and unix timestamp, and compute the mean of the remaining numeric columns.
df = df.groupBy(['latitude', 'longitude', 'ts']).mean()
print("Done with transformations!")
Load
So we read the data from a source ( S3 buckets, databases ) and modified it to our needs. Now it’s time to write it back, in our case to S3.
Note: write_dynamic_frame slows down when the data being written is large. What matters is the size of the transformed output, not the size of the source data.
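If that slowdown bites you, one workaround (a sketch, not what the job below does) is to skip the DynamicFrame conversion, control the number of output files and write the Spark DataFrame directly; the partition count here is just an example:
#Alternative: write the Spark DataFrame directly to S3 (sketch)
df.repartition(200).write.mode("overwrite").partitionBy("ts").parquet(s3_write_path)
Otherwise, we convert back to a DynamicFrame and write it out with Glue: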
#Convert back to a dynamic frame
dynamic_frame_write = DynamicFrame.fromDF(df, glue_context, "dynamic_frame_write")
#Write data back to S3
S3bucket_node3 = glue_context.write_dynamic_frame.from_options(
    frame=dynamic_frame_write,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": s3_write_path,
        "partitionKeys": ["ts"],
    },
    format_options={"compression": "gzip"},
    transformation_ctx="S3bucket_node3",
)
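If you plan to use Glue job bookmarks (so reruns skip data that was already processed), the Job class we imported earlier comes into play. A minimal sketch of how it wraps the script, assuming the job is started with the standard JOB_NAME argument:
#Initialise and commit the job so bookmarks work (sketch)
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
job = Job(glue_context)
job.init(args["JOB_NAME"], args)
#... extract, transform and load code goes here ...
job.commit()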
And that’s it. A small, simple, straight-to-the-point ETL pipeline.
Find the entire code here:
#Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from random import randint,shuffle,sample
import pyspark.sql.functions as f
from datetime import datetime
from awsglue.transforms import *
#Import glue modules
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
#Initialize contexts and session
spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)
session = glue_context.spark_session
#Parameters
glue_db = "glue_db_name"
glue_table = "glue_table_name"
s3_write_path = "path_to_write_the_files_to"
#Log starting time
dt_start = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("Start time:", dt_start)
df = glue_context.create_data_frame_from_catalog(database = glue_db, table_name = glue_table)
print("Read the dataframe")
#Transform Operations
'''
Add your transformation code in pyspark here
'''
print("Done with transformations!")
#Convert back to dynamic frame
dynamic_frame_write = DynamicFrame.fromDF(df, glue_context, "dynamic_frame_write")
#Write data back to S3
S3bucket_node3 = glue_context.write_dynamic_frame.from_options(
    frame=dynamic_frame_write,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": s3_write_path,
    },
    format_options={"compression": "gzip"},
    transformation_ctx="S3bucket_node3",
)
'''
If there is any column that you want to partition by, use the following code.
connection_options = {
    "path": s3_write_path,
    "partitionKeys": ["ts"],
},
'''
#Log end time
dt_end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("End time:", dt_end)
Now we just create a Glue job, add the script, choose the number of workers and run it.
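That can be done from the AWS console, or programmatically. Here is a boto3 sketch (the job name, IAM role, script location and worker settings are placeholders):
#Create and start the Glue job with boto3 (sketch)
import boto3
glue_client = boto3.client("glue")
glue_client.create_job(
    Name="ambee-etl-job",  #placeholder job name
    Role="MyGlueJobRole",  #placeholder IAM role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-bucket/scripts/etl.py",  #where the script above is uploaded
        "PythonVersion": "3",
    },
    GlueVersion="3.0",
    WorkerType="G.1X",
    NumberOfWorkers=10,
)
glue_client.start_job_run(JobName="ambee-etl-job")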
Thanks for reading.
Did you know you can use GeoPandas to filter big data? Learn more about how we do it at Ambee. Click here to read on.
Nithin loves to code and create new things every day. He spends his leisure time reading and running his YouTube channel.