Common PySpark DataFrame Patterns
This section outlines common and essential patterns for working with PySpark DataFrames, providing concise code examples for efficient data manipulation and analysis.
Importing Functions & Types
It's good practice to import PySpark SQL functions and types with aliases for cleaner code.
# Easily reference these as F.my_function() and T.my_type() below
from pyspark.sql import functions as F, types as T
Filtering DataFrames
Learn how to filter rows based on various conditions, including equality, range comparisons, and multiple criteria.
Filtering by Equality
# Filter on equals condition
df = df.filter(df.is_adult == 'Y')
Filtering by Range
# Filter on >, <, >=, <= condition
df = df.filter(df.age > 25)
Filtering with Multiple Conditions
Combine conditions using logical AND (&) and OR (|) operators. Ensure each condition is enclosed in parentheses.
# Multiple conditions require parentheses around each condition
df = df.filter((df.age > 25) & (df.is_adult == 'Y'))
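Combining with OR works the same way; the row is kept when either condition holds.
# Either condition may pass
df = df.filter((df.age > 25) | (df.is_adult == 'Y'))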
Filtering Against a List
# Compare against a list of allowed values
df = df.filter(F.col('first_name').isin(['Tom', 'Alice', 'Rob']))
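To exclude values instead, negate the condition with ~.
# Keep rows whose value is NOT in the list
df = df.filter(~F.col('first_name').isin(['Tom', 'Alice', 'Rob']))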
Sorting Results
Order your DataFrame based on one or more columns in ascending or descending order.
# Sort results ascending
df = df.orderBy(df.age.asc())
# Sort results descending
df = df.orderBy(df.age.desc())
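Sorting on multiple columns just takes more arguments, and directions can be mixed.
# Sort by age descending, then last_name ascending
df = df.orderBy(df.age.desc(), df.last_name.asc())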
Joining DataFrames
Understand different types of joins and how to match columns between DataFrames.
Left Join
# Left join in another dataset
df = df.join(person_lookup_table, 'person_id', 'left')
Joining on Different Columns
# Match on different columns in left & right datasets
df = df.join(other_table, df.id == other_table.person_id, 'left')
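Note that the join above keeps both key columns (id and person_id). If the right-hand copy is redundant, it can be dropped by column reference.
# Remove the duplicate key column carried over from the right dataset
df = df.drop(other_table.person_id)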
Joining on Multiple Columns
# Match on multiple columns
df = df.join(other_table, ['first_name', 'last_name'], 'left')
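The third argument selects the join type. Besides 'left', Spark accepts 'inner' (the default), 'right', 'full', 'left_semi', and 'left_anti', all with the same call shape.
# Inner join keeps only the rows that match in both datasets
df = df.join(other_table, 'person_id', 'inner')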
Column Operations
Perform various operations on DataFrame columns, including adding, modifying, selecting, and renaming.
Adding a Static Column
# Add a new static column
df = df.withColumn('status', F.lit('PASS'))
Constructing a Dynamic Column
Create new columns based on conditional logic using when() and otherwise().
# Construct a new dynamic column
df = df.withColumn('full_name', F.when(
    (df.fname.isNotNull() & df.lname.isNotNull()),
    F.concat_ws(' ', df.fname, df.lname)  # concat_ws joins the parts with a separator
).otherwise(F.lit('N/A')))
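when() calls can also be chained to build multiple branches; the first matching condition wins. The age_group column below is just an illustrative name.
# Chain conditions; otherwise() supplies the fallback
df = df.withColumn('age_group', F.when(df.age < 13, 'child')
    .when(df.age < 20, 'teen')
    .otherwise('adult'))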
Selecting and Renaming Columns
Choose specific columns to keep and rename them for clarity.
# Pick which columns to keep, optionally rename some
df = df.select(
    'name',
    'age',
    F.col('dob').alias('date_of_birth'),
)
Removing Columns
# Remove columns
df = df.drop('mod_dt', 'mod_username')
Renaming a Column
# Rename a column
df = df.withColumnRenamed('dob', 'date_of_birth')
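On Spark 3.4 or later, several columns can be renamed in one call with withColumnsRenamed.
# Rename multiple columns at once (Spark 3.4+)
df = df.withColumnsRenamed({'dob': 'date_of_birth', 'fname': 'first_name'})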
Selecting Columns from Another DataFrame
# Keep only the columns that also occur in another dataset
df = df.select(*(c for c in df.columns if c in df2.columns))
Batch Rename/Clean Columns
A common task is to clean up column names by converting them to lowercase and replacing spaces or hyphens.
# Lowercase column names and replace spaces/hyphens with underscores
for col_name in df.columns:
    df = df.withColumnRenamed(col_name, col_name.lower().replace(' ', '_').replace('-', '_'))
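An equivalent one-liner uses toDF, which replaces every column name positionally.
# Same cleanup without the loop
df = df.toDF(*[c.lower().replace(' ', '_').replace('-', '_') for c in df.columns])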
Casting, Null Handling, and Duplicates
Manage data types, replace null values, and handle duplicate records effectively.
Casting Column Types
# Cast a column to a different type
df = df.withColumn('price', df.price.cast(T.DoubleType()))
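The target type can also be given as a short string name instead of a types object.
# Equivalent cast using the string form
df = df.withColumn('price', F.col('price').cast('double'))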
Filling Null Values
Replace nulls in specific columns with default values.
# Replace all nulls with a specific value
df = df.fillna({
'first_name': 'Tom',
'age': 0,
})
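Passing a single scalar instead of a dict fills every column of a compatible type; the subset keyword limits the scope. The fill values here are only illustrative.
# Fill all numeric columns with 0
df = df.fillna(0)
# Fill only the listed columns
df = df.fillna('unknown', subset=['first_name'])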
Coalescing Values
Select the first non-null value from a list of columns.
# Take the first value that is not null
df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A')))
Dropping Duplicates
Remove duplicate rows from a DataFrame.
# Drop duplicate rows in a dataset (distinct)
df = df.dropDuplicates() # or
df = df.distinct()
# Drop duplicate rows, but consider only specific columns
df = df.dropDuplicates(['name', 'height'])
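dropDuplicates keeps an arbitrary row per key. To keep a specific one instead, such as the most recently modified, a window-function sketch like the following works (mod_dt is assumed to be a modification timestamp, as in the earlier drop example).
from pyspark.sql import Window

# Keep the newest row per name, then discard the helper column
w = Window.partitionBy('name').orderBy(F.col('mod_dt').desc())
df = (df.withColumn('_rn', F.row_number().over(w))
        .filter(F.col('_rn') == 1)
        .drop('_rn'))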
Replacing Empty Strings with Null
# Replace empty strings with null (leave out subset keyword arg to replace in all columns)
df = df.replace({"": None}, subset=["name"])
Replacing NaN Values
# Convert NaN values (Python's float('nan') / NumPy's np.nan) to null
df = df.replace(float("nan"), None)
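For a per-column variant, F.isnan can target NaN explicitly; note that NaN only occurs in float/double columns. This sketch reuses the price column from the casting example.
# Replace NaN with null in a single column
df = df.withColumn('price', F.when(F.isnan('price'), None).otherwise(df.price))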