How can I calculate the difference between rows in PySpark?

Calculating the difference between rows in PySpark refers to finding the change in values between consecutive rows of a PySpark DataFrame. This is typically done with window functions such as lag and lead, which let you reference the value in a previous or following row and compare it to the value in the current row. By combining these functions with a window specification, you can easily calculate the change or variance between rows, which is useful in many data analysis and manipulation tasks.
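As a quick sketch of the two window functions side by side (assuming a DataFrame named df with employee, period and sales columns, like the one created in the example below), lag references the previous row while lead references the next row:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

#window that orders each employee's rows by period
w = Window.partitionBy('employee').orderBy('period')

#lag references the previous row, lead references the next row
df_sketch = (df
    .withColumn('diff_from_prev', F.col('sales') - F.lag('sales', 1).over(w))
    .withColumn('diff_from_next', F.lead('sales', 1).over(w) - F.col('sales')))

The rest of this article walks through the lag-based version in detail.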

PySpark: Calculate the Difference Between Rows


You can use the following syntax to calculate the difference between rows in a PySpark DataFrame:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

#define window
w = Window.partitionBy('employee').orderBy('period')

#calculate difference between rows of sales values, grouped by employee
df_new = df.withColumn('sales_diff', F.col('sales')-F.lag(F.col('sales'), 1).over(w))

This particular example calculates the difference in values between consecutive rows in the sales column, grouped by values in the employee column.

The following example shows how to use this syntax in practice.

Example: Calculate Difference Between Rows in PySpark

Suppose we have the following PySpark DataFrame that contains information about sales made by various employees at some company during different sales periods:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#define data
data = [['A', 1, 18], 
        ['A', 2, 20], 
        ['A', 3, 25], 
        ['A', 4, 40], 
        ['B', 1, 34], 
        ['B', 2, 32],
        ['B', 3, 19]] 
  
#define column names
columns = ['employee', 'period', 'sales'] 
  
#create dataframe using data and column names
df = spark.createDataFrame(data, columns) 
  
#view dataframe
df.show()

+--------+------+-----+
|employee|period|sales|
+--------+------+-----+
|       A|     1|   18|
|       A|     2|   20|
|       A|     3|   25|
|       A|     4|   40|
|       B|     1|   34|
|       B|     2|   32|
|       B|     3|   19|
+--------+------+-----+

We can use the following syntax to calculate the difference in values between consecutive rows in the sales column, grouped by values in the employee column:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

#define window
w = Window.partitionBy('employee').orderBy('period')

#calculate difference between rows of sales values, grouped by employee
df_new = df.withColumn('sales_diff', F.col('sales')-F.lag(F.col('sales'), 1).over(w))

#view new DataFrame
df_new.show()

+--------+------+-----+----------+
|employee|period|sales|sales_diff|
+--------+------+-----+----------+
|       A|     1|   18|      null|
|       A|     2|   20|         2|
|       A|     3|   25|         5|
|       A|     4|   40|        15|
|       B|     1|   34|      null|
|       B|     2|   32|        -2|
|       B|     3|   19|       -13|
+--------+------+-----+----------+

The new column named sales_diff shows the difference in values between consecutive rows in the sales column.

Note that the first value in the sales_diff column for each employee is null since there is no previous value to calculate the difference for.

If you’d like, you can use the fillna function to replace these null values with zero:

#replace null values with 0 in sales_diff column
df_new.fillna(0, 'sales_diff').show()

+--------+------+-----+----------+
|employee|period|sales|sales_diff|
+--------+------+-----+----------+
|       A|     1|   18|         0|
|       A|     2|   20|         2|
|       A|     3|   25|         5|
|       A|     4|   40|        15|
|       B|     1|   34|         0|
|       B|     2|   32|        -2|
|       B|     3|   19|       -13|
+--------+------+-----+----------+

Each of the null values in the sales_diff column has now been replaced with zero.
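
Alternatively, if you would rather remove these rows than fill them, you can use the dropna function (a minimal sketch using the sales_diff column from the example above):

#drop rows where sales_diff is null instead of filling them
df_new.dropna(subset=['sales_diff']).show()

This keeps only the rows that have a previous sales value within each employee group.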

Note: You can find the complete documentation for the PySpark lag function in the official PySpark API reference.
