PySpark GroupBy Examples

In this notebook, we will go through the PySpark groupBy method. For this exercise, I will be using the following data from Kaggle:
https://www.kaggle.com/code/kirichenko17roman/recommender-systems/data

If you don't have PySpark installed, install it first (for example, with pip install pyspark).

In [ ]:
# Import only what this notebook uses; importing sum or round directly would shadow Python's built-ins
from pyspark.sql.functions import avg
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session and hide the console progress bar
spark = SparkSession \
    .builder \
    .appName("Purchase") \
    .config('spark.ui.showConsoleProgress', False) \
    .getOrCreate()

Let us look at the data first.

In [2]:
df = spark.read.csv(
    "/home/notebooks/kz.csv", 
    header=True, sep=",")
# Show the first 3 rows of our DataFrame
df.show(3)
+--------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|          event_time|           order_id|         product_id|        category_id|       category_code|  brand| price|            user_id|
+--------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|2020-04-24 11:50:...|2294359932054536986|1515966223509089906|2268105426648170900|  electronics.tablet|samsung|162.01|1515915625441993984|
|2020-04-24 11:50:...|2294359932054536986|1515966223509089906|2268105426648170900|  electronics.tablet|samsung|162.01|1515915625441993984|
|2020-04-24 14:37:...|2294444024058086220|2273948319057183658|2268105430162997728|electronics.audio...| huawei| 77.52|1515915625447879434|
+--------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
only showing top 3 rows

In [3]:
df.columns
Out[3]:
['event_time',
 'order_id',
 'product_id',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_id']

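This is transaction data: each row records a product in an order, together with its category, brand, price and the user who bought it.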

PySpark GroupBy Count

Let us count the number of rows for each category. Note that count() counts rows, not distinct orders.

In [4]:
df.groupBy(['category_code']).count().show(5)
+----------------+-----+
|   category_code|count|
+----------------+-----+
|           13.87|11075|
|          350.67|    5|
|computers.ebooks|  884|
|           98.59|    2|
|            3.89| 6997|
+----------------+-----+
only showing top 5 rows

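Note that some category_code values, such as 13.87, look like prices rather than categories, which suggests that some rows in the raw CSV are malformed. groupBy and count can also be run on multiple columns; each row of the result is one distinct (category_code, brand) combination.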

In [5]:
df.groupBy(['category_code','brand']).count().show(5)
+--------------------+-------------------+-----+
|       category_code|              brand|count|
+--------------------+-------------------+-----+
|electronics.smart...|               oppo|36349|
|appliances.enviro...|            airline|   52|
|computers.periphe...|               sanc|  584|
|appliances.enviro...|            insight|   11|
|               11.55|1515915625481232307|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

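PySpark Drop Nulls Followed by GroupBy

dropna() removes every row that has a null in any column before grouping. Note that dfg below is a GroupedData object, not a DataFrame, so we still need to call an aggregation such as count() on it.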

In [6]:
dfg = df.dropna().groupBy(['category_code'])
In [7]:
dfg.count().show(2)
+--------------------+-----+
|       category_code|count|
+--------------------+-----+
|    computers.ebooks|  398|
|computers.periphe...| 3053|
+--------------------+-----+
only showing top 2 rows

PySpark GroupBy and Aggregate

Most of the time, groupBy is followed by an aggregation. Let us say we want to find the average price for each category. Here is how it can be done with agg(), passing a dictionary that maps a column name to an aggregate function name.

In [8]:
df.dropna().groupBy(['category_code']).agg({'price':'avg'}).show(5)
+--------------------+------------------+
|       category_code|        avg(price)|
+--------------------+------------------+
|    computers.ebooks| 199.6687185929649|
|computers.periphe...| 71.94989518506395|
|construction.tool...|  18.2120273065784|
|appliances.kitche...|43.298406940063074|
|electronics.video...| 401.3619130434783|
+--------------------+------------------+
only showing top 5 rows

Note that PySpark has named the average price column avg(price). We can rename it after the aggregation with the withColumnRenamed() method.

In [9]:
df.dropna().groupBy(['category_code']).agg({'price':'avg'}).withColumnRenamed("avg(price)", "price").show(5)
+--------------------+------------------+
|       category_code|             price|
+--------------------+------------------+
|    computers.ebooks| 199.6687185929649|
|computers.periphe...| 71.94989518506395|
|construction.tool...|  18.2120273065784|
|appliances.kitche...|43.298406940063074|
|electronics.video...| 401.3619130434783|
+--------------------+------------------+
only showing top 5 rows

Another way to name the column in PySpark is to call alias() on the aggregate expression.

In [10]:
df.dropna().groupBy(['category_code']).agg(avg('price').alias("avg_price")).show(3)
+--------------------+-----------------+
|       category_code|        avg_price|
+--------------------+-----------------+
|    computers.ebooks|199.6687185929649|
|computers.periphe...|71.94989518506395|
|construction.tool...| 18.2120273065784|
+--------------------+-----------------+
only showing top 3 rows

PySpark Multiple Aggregate Functions

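We can also run multiple aggregate functions after groupBy. Note that F.avg and F.max come from the pyspark.sql.functions module, which we imported above with import pyspark.sql.functions as F. Using the F. prefix also avoids shadowing Python's built-in max.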

In [11]:
df.dropna().groupBy(['category_code']).agg(F.avg('price'),F.max('price')).show(2)
+--------------------+------------------+----------+
|       category_code|        avg(price)|max(price)|
+--------------------+------------------+----------+
|     accessories.bag| 20.63646942148758|     97.20|
|accessories.umbrella|110.71249999999998|     99.28|
+--------------------+------------------+----------+
only showing top 2 rows

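Note that for accessories.umbrella, max(price) (99.28) is smaller than avg(price) (110.71). This is because spark.read.csv without inferSchema=True reads every column as a string: avg() converts the strings to numbers, but max() compares them as text, so a value like 99.28 outranks any price starting with 1. To get numeric results, cast the column first, for example with df.withColumn("price", F.col("price").cast("double")).

We can rename multiple columns at once using the toDF() method, as shown below.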

In [12]:
Data_list = ["category_code","avg_price","max_price"]
df.dropna().groupBy(['category_code']).agg(F.avg('price'),F.max('price')).toDF(*Data_list).show(2)
+--------------------+------------------+---------+
|       category_code|         avg_price|max_price|
+--------------------+------------------+---------+
|     accessories.bag| 20.63646942148758|    97.20|
|accessories.umbrella|110.71249999999998|    99.28|
+--------------------+------------------+---------+
only showing top 2 rows

Alternatively, we can name each aggregate with alias():

In [13]:
df.dropna().groupBy(['category_code']).agg(avg('price').alias("avg_price"),F.max('price').alias("max_price")).show(3)
+--------------------+------------------+---------+
|       category_code|         avg_price|max_price|
+--------------------+------------------+---------+
|     accessories.bag| 20.63646942148758|    97.20|
|accessories.umbrella|110.71249999999998|    99.28|
|     apparel.costume|21.384999999999998|    27.75|
+--------------------+------------------+---------+
only showing top 3 rows

PySpark GroupBy Followed by Aggregate and Sort

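Let us sort the table by max_price, in ascending order by default. Because price was read from the CSV as a string, max_price is a string too and the sort is lexicographic, which is why 115.72 is listed before 23.13.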

In [14]:
df.dropna().groupBy(['category_code']).agg(F.avg('price'),F.max('price')).toDF(*Data_list).sort('max_price').show(2)
+--------------+------------------+---------+
| category_code|         avg_price|max_price|
+--------------+------------------+---------+
|    kids.swing|            115.72|   115.72|
|apparel.tshirt|21.384516129032253|    23.13|
+--------------+------------------+---------+
only showing top 2 rows

PySpark GroupBy Followed by Aggregate and Filter

We can filter the aggregated results with the filter() method. The code below keeps the categories whose average price is greater than 500. Since avg_price is a double, this comparison is numeric.

In [15]:
dfg = df.dropna().groupBy(['category_code']).agg(F.avg('price').alias("avg_price"))
dfg.filter(dfg.avg_price> 500).show(4)
+--------------------+-----------------+
|       category_code|        avg_price|
+--------------------+-----------------+
|electronics.camer...| 670.243984962406|
|construction.tool...|513.4461206896547|
|  computers.notebook|571.6449383765361|
+--------------------+-----------------+

Conclusion


PySpark groupBy is a very powerful method for data analysis. I hope the examples above give you enough to get started with it. Please email me if you would like me to add more PySpark groupBy examples.