
Ranking Duplicate Values of a Column in Incremental Order in PySpark

Last Updated : 11 Jul, 2024

In data processing, it is often necessary to rank or order the values within a column, especially when dealing with duplicate values. Ranking duplicate values in incremental order helps in various data analytics tasks, such as identifying the sequence of events, prioritizing tasks, or generating unique identifiers. PySpark, the Python API for Apache Spark, offers powerful tools for handling large-scale data processing and can efficiently perform ranking operations on large datasets. This article will guide you through the process of ranking duplicate values in a column using PySpark.

Understanding Window Functions in PySpark

PySpark provides several functions to rank or order data within DataFrames. Window functions in PySpark perform operations on a set of rows that are related to the current row. These functions are particularly useful for tasks such as ranking, cumulative sums, moving averages, and more. The main window functions used for ranking are:

  • rank(): Assigns ranks to rows within a window partition, leaving gaps in rank values if there are ties.
  • dense_rank(): Similar to rank(), but does not leave gaps in rank values.
  • row_number(): Assigns unique consecutive numbers to rows within a window partition.

The PySpark Solution: A Step-by-Step Approach

Let’s dive into an example to illustrate how to rank duplicate values in a column incrementally using PySpark.

Set Up the PySpark Environment

First, ensure that you have PySpark installed. You can install it using pip if you haven’t already:

pip install pyspark

Next, import the necessary modules and create a Spark session:

Python
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number

# Create a Spark session
spark = SparkSession.builder.appName("RankDuplicates").getOrCreate()

Creating a Sample DataFrame

Let’s create a sample DataFrame to demonstrate the ranking of duplicate values:

Python
# Sample data
data = [
    (1, 1101),
    (2, 1102),
    (3, 1101),
    (4, 1102)
]

# Column names
columns = ["customer_id", "trigger_id"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()

Output:

+-----------+----------+
|customer_id|trigger_id|
+-----------+----------+
|          1|      1101|
|          2|      1102|
|          3|      1101|
|          4|      1102|
+-----------+----------+

1. Ranking Duplicate Values Using rank()

To rank duplicate values in incremental order, we can use the rank() function along with a window specification:

Python
# Define the window specification
window_spec = Window.partitionBy("trigger_id").orderBy("customer_id")

# Apply the rank function
df_rank = df.withColumn("rank", rank().over(window_spec))
df_rank.show()

Output:

+-----------+----------+----+
|customer_id|trigger_id|rank|
+-----------+----------+----+
|          1|      1101|   1|
|          3|      1101|   2|
|          2|      1102|   1|
|          4|      1102|   2|
+-----------+----------+----+

2. Ranking Duplicate Values Using dense_rank()

The dense_rank() function can be used similarly to rank(), but it does not leave gaps in the ranking sequence:

Python
# Apply the dense_rank function
df_dense_rank = df.withColumn("dense_rank", dense_rank().over(window_spec))
df_dense_rank.show()

Output:

+-----------+----------+----------+
|customer_id|trigger_id|dense_rank|
+-----------+----------+----------+
|          1|      1101|         1|
|          3|      1101|         2|
|          2|      1102|         1|
|          4|      1102|         2|
+-----------+----------+----------+
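
The sample data above has no ties within a partition (each trigger_id group has distinct customer_id values), so rank() and dense_rank() happen to produce identical results here. The difference only appears when the ordering column contains duplicates. Below is a minimal sketch, reusing the Spark session and imports from the setup above; the group and score columns and their values are made up purely to illustrate tie handling:

Python
# Hypothetical data with a tie in the ordering column ("score")
tie_data = [
    ("A", 10),
    ("A", 20),
    ("A", 20),
    ("A", 30)
]
tie_df = spark.createDataFrame(tie_data, ["group", "score"])

# Order within each group by score
tie_window = Window.partitionBy("group").orderBy("score")

tie_df.withColumn("rank", rank().over(tie_window)) \
      .withColumn("dense_rank", dense_rank().over(tie_window)) \
      .show()

Here the two tied rows both receive rank 2; the row after them gets 4 from rank() (a gap) but 3 from dense_rank() (no gap).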

Comparing rank(), dense_rank(), and row_number()

To understand the differences between these functions, let’s apply all three to the DataFrame:

Python
# Apply the row_number function
df_row_number = df.withColumn("row_number", row_number().over(window_spec))

# Show all rankings
df_all_ranks = df_rank.join(df_dense_rank, ["customer_id", "trigger_id"]) \
                      .join(df_row_number, ["customer_id", "trigger_id"])
df_all_ranks.show()

Output:

+-----------+----------+----+----------+----------+
|customer_id|trigger_id|rank|dense_rank|row_number|
+-----------+----------+----+----------+----------+
|          1|      1101|   1|         1|         1|
|          3|      1101|   2|         2|         2|
|          2|      1102|   1|         1|         1|
|          4|      1102|   2|         2|         2|
+-----------+----------+----+----------+----------+
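
The join-based comparison above works, but it recomputes and re-matches the same rows. An equivalent and usually simpler approach is to add all three ranking columns to a single DataFrame with chained withColumn calls over the same window_spec; the following is a sketch of that alternative, producing the same five columns without the extra joins:

Python
# Add all three ranking columns in one pass over the same window
df_compare = (
    df.withColumn("rank", rank().over(window_spec))
      .withColumn("dense_rank", dense_rank().over(window_spec))
      .withColumn("row_number", row_number().over(window_spec))
)
df_compare.show()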

Splitting DataFrames Based on Rank

You may want to split the DataFrame into two separate DataFrames based on even and odd ranks. Here’s how you can do it:

Python
# Split DataFrame into even and odd ranks
df_even_rank = df_rank.filter(df_rank.rank % 2 == 0)
df_odd_rank = df_rank.filter(df_rank.rank % 2 != 0)

# Show even rank DataFrame
df_even_rank.show()

# Show odd rank DataFrame
df_odd_rank.show()

Even Rank Output:

+-----------+----------+----+
|customer_id|trigger_id|rank|
+-----------+----------+----+
|          3|      1101|   2|
|          4|      1102|   2|
+-----------+----------+----+

Odd Rank Output:

+-----------+----------+----+
|customer_id|trigger_id|rank|
+-----------+----------+----+
|          1|      1101|   1|
|          2|      1102|   1|
+-----------+----------+----+
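
A note of caution: rank() assigns the same value to tied rows, so if the ordering column can contain duplicates, an even/odd split based on rank may place several tied rows on the same side and skip some rank values entirely. When every row must land on exactly one side deterministically, row_number() is the safer choice, since it is always unique and gap-free within a partition. A small sketch of the same split using row_number (reusing window_spec from above):

Python
# row_number gives each row a unique, gap-free value within its partition
df_rn = df.withColumn("row_number", row_number().over(window_spec))

df_even_rn = df_rn.filter(df_rn.row_number % 2 == 0)
df_odd_rn = df_rn.filter(df_rn.row_number % 2 != 0)

df_even_rn.show()
df_odd_rn.show()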

Conclusion

Ranking duplicate values in a column incrementally is a common task in data analysis, and PySpark provides efficient tools to accomplish this. By using window functions such as rank(), dense_rank(), and row_number(), you can easily rank and manipulate your data according to your needs.

In this article, we covered:

  • Setting up PySpark and creating a sample DataFrame.
  • Using window functions to rank duplicate values.
  • Comparing different ranking functions.
  • Splitting DataFrames based on rank.

