How To Analyze Data Using Pyspark RDD
In this article, I will go over the basics of rdd and use an example to walk through Pyspark rdd.
Before we delve into our rdd example, make sure you have the following libraries installed.
Spark - Check out how to install Spark
Pyspark - Check out how to install pyspark in Python 3
Now let's import the necessary packages to initialize our SparkSession.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)
For this example, I will be using College.csv. Please check out the following URL for details about the data.
!wget http://faculty.marshall.usc.edu/gareth-james/ISL/College.csv
ls College.csv
How to use parallelization in Pyspark rdd
Spark's real power can be leveraged when we use its parallelization feature. My machine has 4 cores, so I can ask Spark to use these 4 cores while performing data operations. The easiest way to do that is to specify the core option while building the SparkContext using SparkConf.
conf = SparkConf().setAppName("rdd basic").setMaster("local[4]")
The conf variable above contains the settings which we can pass to the SparkContext.
sc = SparkContext(conf=conf)
You will run into the following error, because there is already a Spark context running.
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext
Therefore we will have to stop the running context first before we initialize it again with the new config settings. Let's do that.
spark.stop()
Let's run the following code to start our SparkSession.
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = SparkSession(sc)
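If we want to confirm that Spark really picked up the 4 cores, we can check the default parallelism of the new context. This is just a sanity check on my setup; the value you see depends on your machine and master setting.
# With setMaster("local[4]"), the default parallelism should be 4
print(sc.defaultParallelism)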
How to read a csv file in Pyspark
Let's read our csv file using the Pyspark sqlContext.
df = sqlContext.read.csv('College.csv',header=True)
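As an aside, the csv reader can also guess numeric column types for us via the inferSchema option. I am deliberately keeping all columns as strings here so that we can practice converting data types on the rdd later. A minimal sketch if you did want typed columns:
# Optional: let Spark infer numeric column types while reading
df_typed = sqlContext.read.csv('College.csv', header=True, inferSchema=True)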
df is a Pyspark dataframe, similar in nature to a Pandas dataframe. Let's check the datatype using type(df).
type(df)
To see the first row, we can use df.first()
df.first()
If you want to see more than one row, use the df.show(n) method, where n is the number of records or rows to print.
df.show(2)
What is pyspark rdd
Ok, now let's talk about rdd. rdd stands for Resilient Distributed Dataset, a data set which is distributed across the nodes of a cluster. Data operations in rdd are done in memory, which is why parallel data operations work very efficiently.
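To get a feel for rdd independent of our csv data, here is a minimal sketch that builds an rdd directly from a plain Python list using sc.parallelize and applies a simple transformation. The numbers are just made-up sample values.
# Build an rdd from a Python list; Spark splits it across partitions
numbers = sc.parallelize([1, 2, 3, 4, 5])
# map runs in parallel across the cores we configured earlier
print(numbers.map(lambda x: x * 2).collect())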
Convert Dataframe to rdd
Let's convert our dataframe to rdd first using the df.rdd command.
df_rdd = df.rdd
Let's check the data type again to see that it is rdd now.
type(df_rdd)
Every row in the rdd consists of key-value pairs.
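We can peek at those keys by converting a row to a plain Python dictionary with asDict(). A small sketch, just to illustrate:
# Each Row behaves like a dict of column name -> value
first_row = df_rdd.take(1)[0]
print(list(first_row.asDict().keys())[:5])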
Let's count the number of records or rows in our rdd.
df_rdd.count()
Let us print our first row from the rdd using df_rdd.take(1)
df_rdd.take(1)
Operations in Pyspark are lazy operations, meaning that unless we ask it to compute, Spark doesn't evaluate the operations on the data. Let me explain through an example.
rdd map function in Pyspark
Let us print the value of the field Apps from the first row in our data. For that we will have to use the rdd.map command. rdd.map applies a function, here a Python lambda, to every row.
print(df_rdd.map(lambda x: x.Apps))
As we see above, printing the command didn't show the value because the command has not been executed yet. To execute it, we will have to use the collect() method.
results = df_rdd.map(lambda x: x.Apps).collect()
Let's check the datatype now. It should be a list.
type(results)
How to select nth row in Pyspark rdd
To select the nth row, we can use the rdd.take(n) method. This way we can select n rows and then index the row we want. Let's select the 2nd row. This is how we can do it.
df_rdd.take(2)[1]
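Note that take(n) ships the first n rows to the driver, so for a large n a more scalable sketch is to pair each row with its index using zipWithIndex and filter on that index:
# Attach a 0-based index to every row, then keep only index 1 (the 2nd row)
df_rdd.zipWithIndex().filter(lambda pair: pair[1] == 1).map(lambda pair: pair[0]).collect()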
To do some data operations, we will have to change the data type for some of the fields. For example, if we want to do mathematical operations on the field 'Apps', it should be a number, but currently it is a string. Let's change the data type of the 'Apps' field.
How to convert data type string to integer or number in Pyspark rdd
Let's write a small Python function which will do this conversion for us.
from pyspark.sql import Row

def convert_to_int(row, col):
    row_dict = row.asDict()
    row_dict[col] = int(row_dict[col])
    newrow = Row(**row_dict)
    return newrow
Ok, the above function takes a row, which is a Pyspark Row datatype, and the name of the field whose data type we want to convert.
Now we can feed the above function to our rdd to convert the data type to integer.
df_rdd_new = df_rdd.map(lambda x : convert_to_int(x,'Apps'))
Let's check the data type of the 'Apps' field.
df_rdd_new.map(lambda x: type(x.Apps)).take(1)
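If we need to convert several numeric fields at once, the same idea extends to a list of column names. The helper below is my own hypothetical variant of convert_to_int, not part of the original example; 'Accept' is another numeric column in College.csv.
def convert_columns_to_int(row, cols):
    # Cast every listed column of the Row to int and rebuild the Row
    row_dict = row.asDict()
    for col in cols:
        row_dict[col] = int(row_dict[col])
    return Row(**row_dict)

df_rdd_ints = df_rdd.map(lambda x: convert_columns_to_int(x, ['Apps', 'Accept']))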
How to filter rows in Pyspark rdd
Let's say we want universities with more than 2000 applications.
df_rdd_new.filter(lambda x: x['Apps'] > 2000).count()
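Filters can also be chained with other transformations before an action runs. For example, a quick sketch to look at the first few application counts above 2000:
# Chain filter and map; nothing is computed until take() is called
df_rdd_new.filter(lambda x: x['Apps'] > 2000).map(lambda x: x['Apps']).take(5)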
How to sort by key in Pyspark rdd
Since our data has key-value pairs, we can use the sortByKey() function of the rdd to sort the rows by keys. By default it will first sort the keys by name from a to z, then sort the rows by the value of the first key from smallest to largest. As we see below, the keys in each row have been sorted from a to z, and then for the first key, which is 'Accept', it sorts the values from smallest to largest.
df_rdd_new.sortByKey().take(2)
However, we can control both the key to sort by and the sorting order, as shown below.
df_rdd_new.sortByKey(False,keyfunc=(lambda x: x[1])).map(lambda x: x['Apps']).take(2)
In the above example, we have provided a lambda function to choose the key. x[1] refers to the key 'Apps'. The first option, 'False', means the order will be from biggest to smallest, that is, descending.
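If the key-value semantics of sortByKey feel indirect, rdd.sortBy() is an alternative that sorts directly by whatever value the supplied function returns. A minimal sketch sorting by 'Apps' in descending order:
# sortBy takes a key function and an ascending flag
df_rdd_new.sortBy(lambda x: x['Apps'], ascending=False).map(lambda x: x['Apps']).take(2)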
Wrap Up!
That's it for now. I will be adding more examples to this post in the coming days. Stay tuned!
Related Notebooks
- How To Analyze Wikipedia Data Tables Using Python Pandas
- How to Analyze the CSV data in Pandas
- How To Analyze Yahoo Finance Data With R
- How to Visualize Data Using Python - Matplotlib
- How To Read CSV File Using Python PySpark
- Data Analysis With Pyspark Dataframe
- How To Read JSON Data Using Python Pandas
- PySpark concat_ws
- Analyze Corona Virus Cases In India