PySpark Distinct Examples

In this notebook, we will go through PySpark distinct(). For this exercise, I will be using the following data from Kaggle:
https://www.kaggle.com/code/kirichenko17roman/recommender-systems/data

If you don't have PySpark installed, install PySpark on Linux first.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("Purchase") \
    .config('spark.ui.showConsoleProgress', False) \
    .getOrCreate()

Let us look at the data first.

In [2]:
df = spark.read.csv(
    "/home/notebooks/kz.csv", 
    header=True, sep=",")
# Show the first 3 rows of the DataFrame
df.show(3)
+--------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|          event_time|           order_id|         product_id|        category_id|       category_code|  brand| price|            user_id|
+--------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|2020-04-24 11:50:...|2294359932054536986|1515966223509089906|2268105426648170900|  electronics.tablet|samsung|162.01|1515915625441993984|
|2020-04-24 11:50:...|2294359932054536986|1515966223509089906|2268105426648170900|  electronics.tablet|samsung|162.01|1515915625441993984|
|2020-04-24 14:37:...|2294444024058086220|2273948319057183658|2268105430162997728|electronics.audio...| huawei| 77.52|1515915625447879434|
+--------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
only showing top 3 rows

In [3]:
df.columns
Out[3]:
['event_time',
 'order_id',
 'product_id',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_id']

This is transaction data.

PySpark Distinct

Let us check how many rows are in our data.

In [4]:
df.count()
Out[4]:
2633521

To count the distinct rows, we can call the distinct() method on the PySpark DataFrame, followed by count().

In [5]:
df.distinct().count()
Out[5]:
2632846
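
distinct() compares entire rows. If we instead want to drop rows that repeat only on certain columns, the related dropDuplicates() method accepts a subset of columns; a minimal sketch (the column choice here is just illustrative):

In [ ]:
# dropDuplicates() with a subset keeps one arbitrary row per duplicate
# group; with no arguments it behaves exactly like distinct().
df.dropDuplicates(['order_id', 'product_id']).count()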

PySpark countDistinct

In [6]:
from pyspark.sql.functions import countDistinct, col

countDistinct can be passed to the PySpark agg() function. In the snippet below, we count the number of unique brands.

In [7]:
df.agg(countDistinct('brand').alias('cnt')).collect()[0].cnt
Out[7]:
23021
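
An exact distinct count on large data forces a shuffle. If an estimate is acceptable, approx_count_distinct (HyperLogLog based) is cheaper; a sketch, with the rsd tolerance chosen purely for illustration:

In [ ]:
from pyspark.sql.functions import approx_count_distinct

# Approximate distinct count; rsd is the maximum allowed relative
# standard deviation of the estimate (5% here is illustrative).
df.agg(approx_count_distinct('brand', rsd=0.05).alias('approx_cnt')).show()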

We can apply the same aggregation to multiple columns at once, as shown below.

In [8]:
items = df.agg(*(countDistinct(col(c)).alias(c) for c in ['category_code','brand'])).collect()[0]
In [9]:
print('category_code\tbrand\n')
print('%s\t\t%s\n'%(items.category_code,items.brand))
category_code	brand

510		23021
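
countDistinct also accepts several columns in one call, in which case it counts distinct combinations rather than per-column counts; a small sketch:

In [ ]:
# Counts distinct (category_code, brand) pairs in a single aggregate.
df.agg(countDistinct('category_code', 'brand').alias('pairs')).show()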

We can also use groupby, agg, and countDistinct together. Say we want to calculate the average price for each brand and also find out how many categories each brand spans.

In [10]:
from pyspark.sql import functions as F
In [11]:
df.groupby('brand').agg(F.avg('price'), F.countDistinct('category_code')).show(5)
+-------------------+------------------+--------------------+
|              brand|        avg(price)|count(category_code)|
+-------------------+------------------+--------------------+
|1515915625450324494|              null|                   3|
|1515915625484616467|              null|                   1|
|1515915625484629529|              null|                   1|
|           sibrtekh| 16.85457142857143|                   2|
|            edifier|15.202325581395337|                   2|
+-------------------+------------------+--------------------+
only showing top 5 rows

It looks like there are a lot of rows with no price. Let us re-run the above command after dropping rows that contain nulls.

In [12]:
df.dropna().groupby('brand').agg(F.avg('price'), F.countDistinct('category_code')).show(5)
+--------+------------------+--------------------+
|   brand|        avg(price)|count(category_code)|
+--------+------------------+--------------------+
|sibrtekh|1.9322222222222223|                   2|
| edifier|15.029576719576713|                   2|
|  vortex| 6.505000000000001|                   1|
| ruggear|54.053461538461534|                   1|
|   sigma| 744.8535714285715|                   1|
+--------+------------------+--------------------+
only showing top 5 rows
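
Note that dropna() without arguments drops a row if any of its columns is null. If we only care about missing prices, we can restrict it with subset; a sketch:

In [ ]:
# Drop rows only where 'price' is null; rows with nulls in other
# columns are kept, unlike the bare dropna() above.
df.dropna(subset=['price']) \
  .groupby('brand') \
  .agg(F.avg('price'), F.countDistinct('category_code')) \
  .show(5)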

PySpark Select Distinct

We can also apply distinct() after selecting specific columns with select(). Note that distinct() keeps null as a value of its own, while countDistinct ignores nulls, which is why the brand count below is one higher than the countDistinct result above (23022 vs. 23021).

In [13]:
df.select('brand').distinct().count()
Out[13]:
23022
In [14]:
df.select('category_code').distinct().count()
Out[14]:
511

We can repeat the above command on multiple columns; this counts distinct (category_code, brand) combinations.

In [15]:
df.select('category_code','brand').distinct().count()
Out[15]:
37631
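
The same counts can be written in plain SQL by registering the DataFrame as a temporary view; a minimal sketch (the view name purchases is my own choice, not from the dataset):

In [ ]:
# Register a temporary view so the DataFrame can be queried with SQL.
df.createOrReplaceTempView('purchases')

# COUNT(DISTINCT ...) ignores nulls, matching countDistinct above.
spark.sql('SELECT COUNT(DISTINCT brand) AS cnt FROM purchases').show()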

Conclusion


I hope the above examples gave you enough to get started with PySpark distinct().