PySpark expr()

expr(str) function takes in and executes a sql-like expression. It returns a pyspark Column data type. This is useful to execute statements that are not available with Column type and functional APIs. Using expr(), we can use the Pyspark column names in the expressions as shown in the examples below.

First we load the important libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, expr)
In [3]:
# initializing spark session instance
spark = SparkSession.builder.appName('snippets').getOrCreate()

Then load our initial records

In [4]:
columns = ["Name","Salary","Age","Classify"]
data = [("Sam", 1000,20,0), ("Alex", 120000,40,0), ("Peter", 5000,30,0)]

Let us convert our data to rdds. To learn more about Pyspark rdd. check out following link ...
How To Analyze Data Using Pyspark RDD

In [5]:
# converting data to rdds
rdd = spark.sparkContext.parallelize(data)
In [6]:
# Then creating a dataframe from our rdd variable
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
In [7]:
# visualizing current data before manipulation
dfFromRDD2.show()
+-----+------+---+--------+
| Name|Salary|Age|Classify|
+-----+------+---+--------+
|  Sam|  1000| 20|       0|
| Alex|120000| 40|       0|
|Peter|  5000| 30|       0|
+-----+------+---+--------+

1) Here we are changing the "Classify" column upon some condition using the case expression (rather than the built-in pyspark.sql.functions 'when' API which can also be used to achieve the same result):

If Salary less than 5000, it will change column value to 1

If Salary is less than 10000, it will change column value to 2

else, it will change it to 3

In [8]:
# here we update the column "Classify" using the CASE expression. 
# The conditions are based on the values in the Salary column
modified_dfFromRDD2 = dfFromRDD2.withColumn("Classify", expr("CASE WHEN Salary < 5000 THEN 1 "+
                                                            "WHEN Salary < 10000 THEN 2 " +
                                                            "ELSE 3 END"))
In [9]:
# visualizing the modified dataframe 
modified_dfFromRDD2.show()
+-----+------+---+--------+
| Name|Salary|Age|Classify|
+-----+------+---+--------+
|  Sam|  1000| 20|       1|
| Alex|120000| 40|       3|
|Peter|  5000| 30|       2|
+-----+------+---+--------+

2) We can also give a column alias to the SQL expression

In [45]:
# here we updated the column "Classify", CASE expression conditions based on the values in the Salary column
modified_dfFromRDD2 = dfFromRDD2.select("Name", "Salary", "Age", expr("CASE WHEN Salary < 5000 THEN 1 "+
                                                            "WHEN Salary < 10000 THEN 2 " +
                                                            "ELSE 3 END as Classify"))
In [46]:
# visualizing the modified dataframe by using the 'as' for aliasing the resulting column. 
# As you can see, it is exactly the same as the previous output. You can also see the column name by removing the 'as Classify'
modified_dfFromRDD2.show()
+-----+------+---+--------+
| Name|Salary|Age|Classify|
+-----+------+---+--------+
|  Sam|  1000| 20|       1|
| Alex|120000| 40|       3|
|Peter|  5000| 30|       2|
+-----+------+---+--------+

3) We can also use arithmetic operators to perform operations on columns. Below we add 500 to the salary column and add a new column called New_Salary

In [10]:
modified_dfFromRDD3 = dfFromRDD2.withColumn("New_Salary", expr("Salary + 500"))
In [11]:
modified_dfFromRDD3.show()
+-----+------+---+--------+----------+
| Name|Salary|Age|Classify|New_Salary|
+-----+------+---+--------+----------+
|  Sam|  1000| 20|       0|      1500|
| Alex|120000| 40|       0|    120500|
|Peter|  5000| 30|       0|      5500|
+-----+------+---+--------+----------+

We can also use SQL functions with existing column values in expr()

In [12]:
# Here we use the SQL function 'concat' to concatenate the values in two columns i.e. Name and Salary and also a constant string '_'
modified_dfFromRDD4 = dfFromRDD2.withColumn("Name_Salary", expr("concat(Name, '_', Salary)"))
In [13]:
# visualizing the resulting dataframe 
modified_dfFromRDD4.show()
+-----+------+---+--------+-----------+
| Name|Salary|Age|Classify|Name_Salary|
+-----+------+---+--------+-----------+
|  Sam|  1000| 20|       0|   Sam_1000|
| Alex|120000| 40|       0|Alex_120000|
|Peter|  5000| 30|       0| Peter_5000|
+-----+------+---+--------+-----------+

In [14]:
spark.stop()