How To Analyze Data Using Pyspark RDD

In this article, I will go over RDD basics and use an example to walk through the PySpark RDD API.

Before we delve into our RDD example, make sure you have the following libraries installed.

Spark - Check out how to install Spark

Pyspark - Check out how to install PySpark in Python 3

Now let's import the necessary library packages to initialize our SparkSession.

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

For this example, I will be using College.csv. Please check out the following url for details about the data.

In [2]:
ls College.csv

How to use parallelization in Pyspark rdd

Spark's real power can be leveraged when we use its parallelization feature. My machine has 4 cores, so I can ask Spark to use these 4 cores while performing the data operations. The easiest way to do that is to specify the number of cores in the master setting while building the SparkContext using SparkConf.

In [4]:
conf = SparkConf().setAppName("rdd basic").setMaster("local[4]")

The conf variable above contains the settings which we can pass to the SparkContext.

In [5]:
sc = SparkContext(conf=conf)

You will run into the following error, because there is already a Spark context running.

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext

Therefore we will have to stop the running context first before we initialize it again with the new config settings. Let's do that.

In [6]:
sc.stop()

Let's run the following code to start our SparkSession again.

In [7]:
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

Read csv file in Pyspark

Let's read our csv file using the pyspark sqlContext.

In [8]:
df ='College.csv', header=True)

df is a pyspark dataframe, similar in nature to a Pandas dataframe. Let's check the datatype using type(df).

In [9]:
type(df)
pyspark.sql.dataframe.DataFrame

To see the first row, we can use df.first()

In [10]:
df.first()
Row(_c0='Abilene Christian University', Private='Yes', Apps='1660', Accept='1232', Enroll='721', Top10perc='23', Top25perc='52', F.Undergrad='2885', P.Undergrad='537', Outstate='7440', Room.Board='3300', Books='450', Personal='2200', PhD='70', Terminal='78', S.F.Ratio='18.1', perc.alumni='12', Expend='7041', Grad.Rate='60')

If you want to see more than one row, use the method, where n is the number of records or rows to print.

In [11]:
|                 _c0|Private|Apps|Accept|Enroll|Top10perc|Top25perc|F.Undergrad|P.Undergrad|Outstate|Room.Board|Books|Personal|PhD|Terminal|S.F.Ratio|perc.alumni|Expend|Grad.Rate|
|Abilene Christian...|    Yes|1660|  1232|   721|       23|       52|       2885|        537|    7440|      3300|  450|    2200| 70|      78|     18.1|         12|  7041|       60|
|  Adelphi University|    Yes|2186|  1924|   512|       16|       29|       2683|       1227|   12280|      6450|  750|    1500| 29|      30|     12.2|         16| 10527|       56|
only showing top 2 rows

What is pyspark rdd

Ok. Now let's talk about the rdd. An rdd is a resilient distributed dataset which is distributed across the nodes of a cluster. Data operations on an rdd are done in memory, which is why parallel data operations work very efficiently.

Convert Dataframe to rdd

Let's convert our dataframe to an rdd first using the df.rdd command.

In [12]:
df_rdd = df.rdd

Let's check the data type again to see that it is an rdd now.

In [13]:
type(df_rdd)
pyspark.rdd.RDD

Every row in the rdd consists of key-value pairs.

Let's count the number of records or rows in our rdd.

In [14]:
df_rdd.count()

Let us print our first row from the rdd using df_rdd.take(1)

In [15]:
df_rdd.take(1)
[Row(_c0='Abilene Christian University', Private='Yes', Apps='1660', Accept='1232', Enroll='721', Top10perc='23', Top25perc='52', F.Undergrad='2885', P.Undergrad='537', Outstate='7440', Room.Board='3300', Books='450', Personal='2200', PhD='70', Terminal='78', S.F.Ratio='18.1', perc.alumni='12', Expend='7041', Grad.Rate='60')]

Operations in pyspark are lazy. That means that unless we ask it to compute, pyspark doesn't evaluate the operations on the data. Let me explain through an example.

rdd map function in Pyspark

Let us print the value of the field Apps from the first row in our data. For that we will have to use the command. takes a python lambda function and applies it to every row of the rdd.

In [16]:
print( x: x.Apps))
PythonRDD[26] at RDD at PythonRDD.scala:53

As we see above, printing the command didn't show the value because the command has not been executed yet. To execute it, we will have to use the collect() method.

In [17]:
results = x: x.Apps).collect()

Let's check the datatype now. It should be a list.

In [18]:
type(results)
list

How to select nth row in Pyspark rdd

To select the nth row, we can use the rdd.take(n) method. This way we can take n rows and then index the row we want. Let's select the 2nd row. This is how we can do it.

In [19]:
df_rdd.take(2)[1]
Row(_c0='Adelphi University', Private='Yes', Apps='2186', Accept='1924', Enroll='512', Top10perc='16', Top25perc='29', F.Undergrad='2683', P.Undergrad='1227', Outstate='12280', Room.Board='6450', Books='750', Personal='1500', PhD='29', Terminal='30', S.F.Ratio='12.2', perc.alumni='16', Expend='10527', Grad.Rate='56')

To do some data operations, we will have to change the data type of some of the fields. For example, if we want to do mathematical operations on the field 'Apps', then it should be a number, but currently it is a string. Let's change the data type of the 'Apps' field.

How to convert data type string to integer or number in Pyspark rdd

Let's write a small Python function which will do this conversion for us.

In [20]:
from pyspark.sql import Row
def convert_to_int(row,col):
  # convert the Row to a dict so the field can be modified
  row_dict = row.asDict()
  row_dict[col] = int(row_dict[col])
  # build a new Row from the updated dict
  newrow = Row(**row_dict)
  return newrow

Ok, the above function takes a row, which is a pyspark Row datatype, and the name of the field for which we want to convert the data type.

Now we can apply the above function to our rdd to convert the data type to integer.

In [21]:
df_rdd_new = x: convert_to_int(x, 'Apps'))

Let's check out the data type of the 'Apps' field.

In [22]:
 x: type(x.Apps)).take(1)
[<class 'int'>]

How to filter rows in Pyspark rdd

Let's say we want the universities with more than 2000 applications.

In [23]:
df_rdd_new.filter(lambda x: x['Apps'] > 2000).count()

How to sort by key in Pyspark rdd

Since our data has key-value pairs, we can use the sortByKey() function of the rdd to sort the rows by key. By default it will first sort the keys by name from a to z, then look at the key at location 1 and sort the rows by the value of that key from smallest to largest. As we see below, the keys have been sorted from a to z in each row, and then for the key at location 1, which is 'Accept', the values are sorted from smallest to largest.

In [24]:
df_rdd_new.sortByKey().take(2)
[Row(Accept='1005', Apps=1286, Books='500', Enroll='363', Expend='8024', F.Undergrad='1363', Grad.Rate='72', Outstate='13900', P.Undergrad='74', Personal='900', PhD='75', Private='Yes', Room.Board='4300', S.F.Ratio='14', Terminal='81', Top10perc='16', Top25perc='37', _c0='Lycoming College', perc.alumni='32'),
 Row(Accept='1005', Apps=1563, Books='500', Enroll='240', Expend='6562', F.Undergrad='1380', Grad.Rate='57', Outstate='5542', P.Undergrad='136', Personal='1000', PhD='65', Private='No', Room.Board='4330', S.F.Ratio='14.2', Terminal='71', Top10perc='1', Top25perc='19', _c0='North Adams State College', perc.alumni='17')]

However, we can control both the key to sort on and the sorting order, as shown below.

In [25]:
df_rdd_new.sortByKey(False,keyfunc=(lambda x: x[1])).map(lambda x: x['Apps']).take(2)
[2186, 582]

In the above example, we have provided a lambda function to choose the key. x[1] is referring to the key 'Apps'. The first option 'False' means the order will be from biggest to smallest, that is descending.

Wrap Up!

That's it for now. I will be adding more examples to this post in the coming days. Stay tuned!
