aggregation_operations

Learn PySpark aggregation operations with examples for count, sum, mean, max, min, collect_set, collect_list, and using window functions for row aggregation.

PySpark Aggregation Operations

Understanding PySpark Aggregation Functions

PySpark provides a rich set of aggregation functions to perform data summarization and analysis on DataFrames. These operations are crucial for deriving insights from large datasets. Aggregations are typically performed after a groupBy operation, allowing you to compute aggregate values for each group.

Common Aggregation Operations

Here are some of the most frequently used PySpark aggregation functions:

  • Row Count: F.count() - Counts the number of rows in each group.
  • Sum of Rows in Group: F.sum(*cols) - Calculates the sum of values in specified columns for each group.
  • Mean of Rows in Group: F.mean(*cols) - Computes the average of values in specified columns for each group.
  • Max of Rows in Group: F.max(*cols) - Finds the maximum value in specified columns for each group.
  • Min of Rows in Group: F.min(*cols) - Finds the minimum value in specified columns for each group.
  • First Row in Group: F.alias(*cols) - Selects the first value encountered in a column within each group.

Example: Grouping by Gender and Finding Max Age

from pyspark.sql import functions as F

# Assuming 'df' is your PySpark DataFrame
df = df.groupBy('gender').agg(F.max('age').alias('max_age_by_gender'))

Collecting Data within Groups

Beyond simple numerical aggregations, PySpark allows you to collect data from within groups into lists or sets.

  • Collect a Set of all Rows in Group: F.collect_set(col) - Gathers unique values from a column into an array for each group.
  • Collect a List of all Rows in Group: F.collect_list(col) - Gathers all values (including duplicates) from a column into an array for each group.

Example: Collecting Unique Names by Age

from pyspark.sql import functions as F

# Assuming 'df' is your PySpark DataFrame
df = df.groupBy('age').agg(F.collect_set('name').alias('person_names'))

Advanced Aggregation with Window Functions

Window functions offer a powerful way to perform calculations across a set of table rows that are somehow related to the current row. This is particularly useful for tasks like ranking, calculating running totals, or selecting the latest record within partitions.

Example: Selecting the Latest Row per Person

This example demonstrates how to use window functions to filter for the most recent record for each unique combination of first and last names, ordered by date.

from pyspark.sql import functions as F
from pyspark.sql import Window as W

# Assuming 'df' is your PySpark DataFrame with columns 'first_name', 'last_name', 'date', and other data

# Define the window specification: partition by first and last name, order by date descending
window = W.partitionBy("first_name", "last_name").orderBy(F.desc("date"))

# Add a row number based on the defined window
df = df.withColumn("row_number", F.row_number().over(window))

# Filter to keep only the rows with row_number = 1 (the latest row for each partition)
df = df.filter(F.col("row_number") == 1)

# Drop the temporary row_number column
df = df.drop("row_number")

Further Reading on PySpark