In this notebook, we will go through the PySpark GroupBy method. For this exercise, I will be using the following data from Kaggle...
https://www.kaggle.com/code/kirichenko17roman/recommender-systems/data
If you don't have PySpark installed, install PySpark on Linux by clicking here.
from pyspark.sql.functions import sum, col, desc, avg, round, count
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("Purchase") \
    .config('spark.ui.showConsoleProgress', False) \
    .getOrCreate()
Let us look at the data first.
df = spark.read.csv(
    "/home/notebooks/kz.csv",
    header=True, sep=",")
#show 3 rows of our DataFrame
df.show(3)
df.columns
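One thing worth checking: since we did not pass inferSchema=True to spark.read.csv, every column is read as a string. Below is a minimal sketch (assuming the same file path) of inspecting the schema and re-reading the file with schema inference so that numeric columns such as price get proper types.
#inspect the column types -- without inferSchema every column is a string
df.printSchema()
#re-read the file with schema inference so price becomes a numeric column
df = spark.read.csv(
    "/home/notebooks/kz.csv",
    header=True, sep=",", inferSchema=True)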
This is transaction data.
Let us count the number of transactions in each category.
df.groupBy(['category_code']).count().show(5)
PySpark groupBy and count can also be run on multiple columns.
df.groupBy(['category_code','brand']).count().show(5)
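If we want to see the most frequent category and brand combinations first, we can sort the counts in descending order. Here is a small sketch using the desc function we imported at the top.
#sort the category/brand counts with the most frequent pairs first
df.groupBy(['category_code', 'brand']) \
    .count() \
    .sort(desc('count')) \
    .show(5)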
Let us also drop the rows with null values before grouping.
#drop nulls, then group by category
dfg = df.dropna().groupBy(['category_code'])
dfg.count().show(2)
Most of the time, groupBy is followed by an aggregate method. Let us say we want to find the average price for each category. Here is how it can be done.
df.dropna().groupBy(['category_code']).agg({'price':'avg'}).show(5)
Note that PySpark has named the average price column avg(price). We can rename the column after the aggregation with the withColumnRenamed method.
df.dropna().groupBy(['category_code']).agg({'price':'avg'}).withColumnRenamed("avg(price)", "price").show(5)
Another way to rename the column in PySpark is to use the alias method.
df.dropna().groupBy(['category_code']).agg(avg('price').alias("avg_price")).show(3)
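Since we also imported round above, we can combine it with alias to keep the output tidy. A small sketch rounding the average price to two decimal places:
#round the average price to 2 decimals and rename it in one step
df.dropna().groupBy(['category_code']) \
    .agg(round(avg('price'), 2).alias("avg_price")) \
    .show(3)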
We can also run multiple aggregate functions after groupBy. Note F.avg and F.max, which we imported above from pyspark.sql.functions.
import pyspark.sql.functions as F
df.dropna().groupBy(['category_code']).agg(F.avg('price'),F.max('price')).show(2)
We can rename multiple columns using the toDF() method, as shown below.
Data_list = ["category_code","avg_price","max_price"]
df.dropna().groupBy(['category_code']).agg(F.avg('price'),F.max('price')).toDF(*Data_list).show(2)
Or we can use the alias method this way...
df.dropna().groupBy(['category_code']).agg(avg('price').alias("avg_price"),F.max('price').alias("max_price")).show(3)
Let us sort the table by max_price.
df.dropna().groupBy(['category_code']).agg(F.avg('price'),F.max('price')).toDF(*Data_list).sort('max_price').show(2)
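Note that sort is ascending by default. If we want the most expensive categories at the top, we can sort in descending order instead; a small sketch:
#sort by max_price in descending order
df.dropna().groupBy(['category_code']) \
    .agg(F.avg('price'), F.max('price')) \
    .toDF(*Data_list) \
    .sort(desc('max_price')) \
    .show(2)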
We can filter results using the filter method. The code below keeps only the categories whose average price is greater than 500.
dfg = df.dropna().groupBy(['category_code']).agg(F.avg('price').alias("avg_price"))
dfg.filter(dfg.avg_price > 500).show(4)
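We can also filter on more than one aggregated column. Below is a sketch (the 1000-transaction threshold is just an example) that keeps categories whose average price exceeds 500 and which have more than 1000 transactions, using the col and count functions imported at the top.
#filter on two aggregated columns: average price and number of transactions
dfg = df.dropna().groupBy(['category_code']) \
    .agg(avg('price').alias("avg_price"), count('price').alias("n_transactions"))
dfg.filter((col('avg_price') > 500) & (col('n_transactions') > 1000)).show(4)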