How to Calculate the Difference Between Rows in PySpark

In PySpark, the difference between rows can be calculated using window functions, which let us partition the rows of a DataFrame by one or more columns and order them within each partition. Once the window is defined, the lag() function returns the value from the previous row in the window, which makes it easy to subtract the previous row's value from the current row's value.

You can use the following syntax to calculate the difference between rows in a PySpark DataFrame:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

#define window
w = Window.partitionBy('employee').orderBy('period')

#calculate difference between rows of sales values, grouped by employee
df_new = df.withColumn('sales_diff', F.col('sales')-F.lag(F.col('sales'), 1).over(w))

This particular example calculates the difference in values between consecutive rows in the sales column, grouped by values in the employee column.
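Note that the second argument to lag() is the offset, i.e. how many rows back to look. For example, if you instead wanted the difference between the current row and the value from two periods earlier, a minimal sketch (reusing the same employee, period, and sales column names, with the hypothetical column name sales_diff_2 chosen purely for illustration) might look like this:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

#define window partitioned by employee and ordered by period
w = Window.partitionBy('employee').orderBy('period')

#calculate difference between each sales value and the value from two rows back
df_lag2 = df.withColumn('sales_diff_2', F.col('sales') - F.lag(F.col('sales'), 2).over(w))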

The following example shows how to use this syntax in practice.

Example: Calculate Difference Between Rows in PySpark

Suppose we have the following PySpark DataFrame that contains information about sales made by various employees at some company during different sales periods:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#define data
data = [['A', 1, 18], 
        ['A', 2, 20], 
        ['A', 3, 25], 
        ['A', 4, 40], 
        ['B', 1, 34], 
        ['B', 2, 32],
        ['B', 3, 19]] 
  
#define column names
columns = ['employee', 'period', 'sales'] 
  
#create dataframe using data and column names
df = spark.createDataFrame(data, columns) 
  
#view dataframe
df.show()

+--------+------+-----+
|employee|period|sales|
+--------+------+-----+
|       A|     1|   18|
|       A|     2|   20|
|       A|     3|   25|
|       A|     4|   40|
|       B|     1|   34|
|       B|     2|   32|
|       B|     3|   19|
+--------+------+-----+

We can use the following syntax to calculate the difference in values between consecutive rows in the sales column, grouped by values in the employee column:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

#define window
w = Window.partitionBy('employee').orderBy('period')

#calculate difference between rows of sales values, grouped by employee
df_new = df.withColumn('sales_diff', F.col('sales')-F.lag(F.col('sales'), 1).over(w))

#view new DataFrame
df_new.show()

+--------+------+-----+----------+
|employee|period|sales|sales_diff|
+--------+------+-----+----------+
|       A|     1|   18|      null|
|       A|     2|   20|         2|
|       A|     3|   25|         5|
|       A|     4|   40|        15|
|       B|     1|   34|      null|
|       B|     2|   32|        -2|
|       B|     3|   19|       -13|
+--------+------+-----+----------+

The new column named sales_diff shows the difference in values between consecutive rows in the sales column.

Note that the first value in the sales_diff column for each employee is null since there is no previous value to calculate the difference for.
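One option is simply to drop these rows. The following is a minimal sketch that uses the df_new DataFrame created above together with the isNotNull() column method:

#keep only rows where a previous row existed within each employee partition
df_new.filter(F.col('sales_diff').isNotNull()).show()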

Alternatively, you can use the fillna function to replace these null values with zero:

#replace null values with 0 in sales_diff column
df_new.fillna(0, 'sales_diff').show()

+--------+------+-----+----------+
|employee|period|sales|sales_diff|
+--------+------+-----+----------+
|       A|     1|   18|         0|
|       A|     2|   20|         2|
|       A|     3|   25|         5|
|       A|     4|   40|        15|
|       B|     1|   34|         0|
|       B|     2|   32|        -2|
|       B|     3|   19|       -13|
+--------+------+-----+----------+

Each of the null values in the sales_diff column has now been replaced with zero.

Note: You can find the complete documentation for the PySpark lag function in the official PySpark API reference.
