PySpark lit()

lit and typedLit functions are used to add a new Column to the DataFrame using a constant/literal value. Both these functions take in a constant and return a Column data type.

Let us first we load the important libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (array, col, create_map, lit, when)
In [3]:
# initializing spark session instance
spark = SparkSession.builder.appName('snippets').getOrCreate()

Then load our initial records

In [4]:
columns = ["Name","Salary","Age"]
data = [("Sam", 1000,20), ("Alex", 120000,40), ("Peter", 5000,30)]
In [5]:
# converting data to rdds
rdd = spark.sparkContext.parallelize(data)
In [6]:
# Then creating a dataframe from our rdd variable
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
In [7]:
# visualizing current data before manipulation
dfFromRDD2.show()
+-----+------+---+
| Name|Salary|Age|
+-----+------+---+
|  Sam|  1000| 20|
| Alex|120000| 40|
|Peter|  5000| 30|
+-----+------+---+

1) Here we are adding a column called "Classify" to all the rows of the dataframe and assigning a value 0 using lit function. In the first example, we will achieve this by using withColumn

In [8]:
# here we add the column "Classify" using withColumn and lit
modified_dfFromRDD2 = dfFromRDD2.withColumn("Classify",lit(0))
In [9]:
# visualizing the modified dataframe 
modified_dfFromRDD2.show()
+-----+------+---+--------+
| Name|Salary|Age|Classify|
+-----+------+---+--------+
|  Sam|  1000| 20|       0|
| Alex|120000| 40|       0|
|Peter|  5000| 30|       0|
+-----+------+---+--------+

2) We can also add a new column using 'select' function to select all the columns including the new column we want to add. Then we use the 'alias' function to give column name and using 'lit' to assign column value

In [10]:
# here we updated the column "Classify", CASE expression conditions based on the values in the Salary column
modified_dfFromRDD2 = dfFromRDD2.select("Name", "Salary", "Age", lit(0).alias("Classify"))
# visualizing the modified dataframe by using the 'alias' for aliasing the new column. 
In [11]:
# As you can see, it is exactly the same as the previous output.
modified_dfFromRDD2.show()
+-----+------+---+--------+
| Name|Salary|Age|Classify|
+-----+------+---+--------+
|  Sam|  1000| 20|       0|
| Alex|120000| 40|       0|
|Peter|  5000| 30|       0|
+-----+------+---+--------+

3) We can also apply some conditions using when/otherwise functions and use them with withColumn and lit functions to add a new column "Classify" with different constant values of each row.

In [12]:
# Note in the below conditions, the resultant constant values 1, 2 and 3 can assigned without 'lit' as well
modified_dfFromRDD3 = dfFromRDD2.withColumn("Classify", when(col('Salary') < 5000, lit(1))\
                                                       .when(col('Salary') < 10000, lit(2))\
                                                       .otherwise(lit(3)))
In [13]:
modified_dfFromRDD3.show()
+-----+------+---+--------+
| Name|Salary|Age|Classify|
+-----+------+---+--------+
|  Sam|  1000| 20|       1|
| Alex|120000| 40|       3|
|Peter|  5000| 30|       2|
+-----+------+---+--------+

TypedLit: The Scala and Java APIs provide a function called typedLit to handle collection types like arrays (or lists), maps (or dictionaries) etc. However, this API doesn't exist in Pyspark (as of now). The workaround to handle collection types in Pyspark is shown below.

In [14]:
# We use the array function to create a array column called Scores using 'lit' each item in array
modified_dfFromRDD4 = dfFromRDD2.withColumn("Scores", array([lit(0), lit(1), lit(2)]))
In [14]:
# visualizing the resulting dataframe 
modified_dfFromRDD4.show()
+-----+------+---+---------+
| Name|Salary|Age|   Scores|
+-----+------+---+---------+
|  Sam|  1000| 20|[0, 1, 2]|
| Alex|120000| 40|[0, 1, 2]|
|Peter|  5000| 30|[0, 1, 2]|
+-----+------+---+---------+

In [15]:
# We use the create_map function to create a map column called Day_Score_Map using 'lit' each key and value in the map
modified_dfFromRDD5 = dfFromRDD2.withColumn("Day_Score_Map", create_map([lit('D1'), lit(1), lit('D2'), lit(2)]))
In [16]:
# visualizing the resulting dataframe 
modified_dfFromRDD5.show()
+-----+------+---+------------------+
| Name|Salary|Age|     Day_Score_Map|
+-----+------+---+------------------+
|  Sam|  1000| 20|{D1 -> 1, D2 -> 2}|
| Alex|120000| 40|{D1 -> 1, D2 -> 2}|
|Peter|  5000| 30|{D1 -> 1, D2 -> 2}|
+-----+------+---+------------------+

In [17]:
spark.stop()