!pip install pyspark
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import (lit, desc, col, size, array_contains,
                                   isnan, udf, hour, array_min, array_max,
                                   countDistinct, mean, split, regexp_extract, when)
from pyspark.sql.types import *
from pyspark.ml import Pipeline
For this exercise, I will use the purchase data. Let us take a look at this data using the Unix head command. We can run Unix commands in a Python Jupyter notebook by putting ! in front of the command.
!head -1 purchases.csv
First, we need to create a Spark session by calling SparkSession. This step is necessary before doing anything else with the Spark API.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
#create a session in order to access all of the Spark API
spark = SparkSession \
    .builder \
    .appName("Purchase") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
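As a quick sanity check (not part of the original notebook), we can confirm the session is up by printing the Spark version and the application name; both are standard attributes of SparkSession and its SparkContext.
# optional sanity check: confirm the session is alive
print("Spark version:", spark.version)          # version string of the running Spark
print("App name:", spark.sparkContext.appName)  # the name we set with appName()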
#define the data schema for the file we want to read
purchaseSchema = StructType([
    StructField("Date", DateType(), True),
    StructField("Time", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Item", StringType(), True),
    StructField("Total", FloatType(), True),
    StructField("Payment", StringType(), True),
])
# read the csv file with our defined schema into a Spark DataFrame, using a tab delimiter
purchaseDataframe = spark.read.csv(
    "purchases.csv",
    header=True, schema=purchaseSchema, sep="\t")
#show 3 rows of our DataFrame
purchaseDataframe.show(3)
#count number of rows of our dataFrame
num_rows = purchaseDataframe.count()
print("number of rows: ", num_rows)
#show our dataFrame schema
purchaseDataframe.printSchema()
#show statistics of the column we want
purchaseDataframe.describe('Total').show()
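For finer control than describe(), the same kind of summary can be computed with an explicit aggregation. A minimal sketch using the min, max, and avg functions from pyspark.sql.functions (imported above as the functions module):
# equivalent summary via an explicit aggregation on the "Total" column
purchaseDataframe.agg(
    functions.min("Total").alias("min_total"),
    functions.max("Total").alias("max_total"),
    functions.avg("Total").alias("avg_total")
).show()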
Find the number of unique values in a column, for example the number of unique city names.
purchaseDataframe.select('City').distinct().count()
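Since countDistinct is already imported, the same result can also be obtained as an aggregation, which is convenient when counting distinct values of several columns at once. A short sketch:
# distinct city count via aggregation; the column name comes from the schema above
purchaseDataframe.select(countDistinct("City").alias("unique_cities")).show()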
#create a new dataFrame from the "City" and "Total" columns
newDataframe = purchaseDataframe.select(purchaseDataframe['City'],
                                        purchaseDataframe['Total'])
# show the top 5 rows
newDataframe.show(5)
print('=========================')
# schema of dataframe
newDataframe.printSchema()
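Selecting columns often goes hand in hand with deriving new ones. As an illustration only (the 1.0 surcharge is made up, not part of the original data), a derived column can be added with withColumn using the col and lit functions imported earlier:
# hypothetical example: add a derived column; the 1.0 surcharge is an assumption for illustration
withSurcharge = newDataframe.withColumn('TotalWithSurcharge', col('Total') + lit(1.0))
withSurcharge.show(5)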
#filter to rows whose "Total" column value is greater than 300
purchaseDataframe.filter(purchaseDataframe['Total'] > 300).show(5)
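Filter conditions can be combined with the & and | operators, with each condition wrapped in parentheses. A short sketch, assuming the Payment column contains a value such as 'Cash' (the actual payment labels in the file may differ):
# combine conditions: high-value purchases with a (hypothetical) 'Cash' payment type
purchaseDataframe.filter(
    (purchaseDataframe['Total'] > 300) & (purchaseDataframe['Payment'] == 'Cash')
).show(5)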
# sort the dataframe by city and show the first 10 rows
# note: show() returns None, so we keep the sorted DataFrame separately
sortedByCity = purchaseDataframe.orderBy('City')
sortedByCity.show(10)
#count the number of transactions in each city
numTransactionEachCity = purchaseDataframe.groupBy("City").count()
numTransactionEachCity.show(5)
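groupBy can also feed richer aggregations. This sketch computes the average purchase total per city and orders the result with the desc function imported earlier:
# average purchase total per city, largest first
avgTotalEachCity = purchaseDataframe.groupBy("City").agg(
    functions.avg("Total").alias("avgTotal"))
avgTotalEachCity.orderBy(desc("avgTotal")).show(5)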
Since a Spark DataFrame is distributed across the cluster, we cannot access it by [row, column] the way we can with a pandas DataFrame. An alternative in PySpark is to create a new "index" column with monotonically_increasing_id() and then use the ".filter()" function on that "index" column. Note that the generated IDs are guaranteed to be increasing and unique, but not necessarily consecutive when the data is split across multiple partitions.
#import monotonically_increasing_id
from pyspark.sql.functions import monotonically_increasing_id
newPurchasedDataframe = purchaseDataframe.withColumn(
    "index", monotonically_increasing_id())
newPurchasedDataframe.show(10)
row2Till4 = newPurchasedDataframe.filter((newPurchasedDataframe['index'] >= 2) &
                                         (newPurchasedDataframe['index'] <= 4))
row2Till4.show()
Then, to access a particular row and column, combine ".filter()" on the "index" column with the ".select()" function we used above.
#get the value of the "Total" column at index 2
dataRow2ColumnTotal = newPurchasedDataframe.filter(newPurchasedDataframe['index']==2).select('Total')
dataRow2ColumnTotal.show()
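show() only prints the result; to pull the value back into plain Python, collect the row and index it by column name. A small sketch using the standard first() method:
# first() returns a Row object (or None if no row matched); index it by column name
row = dataRow2ColumnTotal.first()
if row is not None:
    totalAtIndex2 = row['Total']
    print("Total at index 2:", totalAtIndex2)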
#show rows where the "City" column is null
purchaseDataframe.filter(purchaseDataframe.City.isNull()).show()
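To get an overview of missing data across every column at once, a common pattern is to count nulls per column with when and count. A minimal sketch using the functions imported above:
# count null values in every column of the DataFrame
purchaseDataframe.select(
    [functions.count(when(col(c).isNull(), c)).alias(c)
     for c in purchaseDataframe.columns]
).show()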
The snippet below shows how to drop duplicate rows and how to count the number of duplicate rows in PySpark.
#count the number of original data rows
n1 = purchaseDataframe.count()
print("number of original data rows: ", n1)
#count the number of data rows after deleting duplicated data
n2 = purchaseDataframe.dropDuplicates().count()
print("number of data rows after deleting duplicated data: ", n2)
n3 = n1 - n2
print("number of duplicate rows: ", n3)
Delete a row if at least one of its columns has missing data.
PurchaseNoMissingValue = purchaseDataframe.dropDuplicates().dropna(
    how="any")  # how="any" drops a row with at least one null; how="all" drops it only if every column is null
# compare against the deduplicated count n2, since duplicates were already removed above
numberOfMissingValueAny = n2 - PurchaseNoMissingValue.count()
print("number of rows with missing data: ", numberOfMissingValueAny)
purchaseDataframe.show(5)
# compute the mean of the "Total" column over the whole DataFrame
meanTotal = purchaseDataframe.groupBy().avg("Total").take(1)[0][0]
print('Mean total:', meanTotal)
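The same value can be obtained a little more directly with the mean function imported earlier, and it is often used right away, for example to select purchases above the average. A short sketch:
# equivalent: aggregate with the imported mean function, then use the value in a filter
meanTotalAlt = purchaseDataframe.select(mean('Total')).first()[0]
purchaseDataframe.filter(purchaseDataframe['Total'] > meanTotalAlt).show(5)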