PySpark Common DataFrame Patterns
This section outlines common and essential patterns for working with PySpark DataFrames, providing concise code examples for efficient data manipulation and analysis.
Importing Functions & Types
It's good practice to import PySpark SQL functions and types with aliases for cleaner code.
# Easily reference these as F.my_function() and T.my_type() below
from pyspark.sql import functions as F, types as T
Filtering DataFrames
Learn how to filter rows based on various conditions, including equality, range comparisons, and multiple criteria.
Filtering by Equality
# Filter on equals condition
df = df.filter(df.is_adult == 'Y')
Filtering by Range
# Filter on >, <, >=, <= condition
df = df.filter(df.age > 25)
Filtering with Multiple Conditions
Combine conditions using logical AND (&) and OR (|) operators. Ensure each condition is enclosed in parentheses.
# Multiple conditions require parentheses around each condition
df = df.filter((df.age > 25) & (df.is_adult == 'Y'))
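The same pattern works with OR; a minimal sketch reusing the columns above:
# OR condition: keep rows where either condition holds
df = df.filter((df.age > 25) | (df.is_adult == 'Y'))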
Filtering Against a List
# Compare against a list of allowed values
df = df.filter(F.col('first_name').isin(['Tom', 'Alice', 'Bob']))
Sorting Results
Order your DataFrame based on one or more columns in ascending or descending order.
# Sort results ascending
df = df.orderBy(df.age.asc())
# Sort results descending
df = df.orderBy(df.age.desc())
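orderBy also accepts several sort expressions; a sketch assuming a last_name column alongside age:
# Sort by multiple columns, mixing directions
df = df.orderBy(df.age.desc(), df.last_name.asc())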
Joining DataFrames
Understand different types of joins and how to match columns between DataFrames.
Left Join
# Left join another dataset on a shared key
df = df.join(person_lookup_table, 'person_id', 'left')
Joining on Different Columns
# Match on different columns in left & right datasets
df = df.join(other_table, df.id == other_table.person_id, 'left')
Joining on Multiple Columns
# Match on multiple columns
df = df.join(other_table, ['first_name', 'last_name'], 'left')
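The third argument selects the join type; besides 'left', PySpark supports values such as 'inner', 'full', and 'left_anti'. A sketch assuming both datasets share a person_id column:
# Inner join: keep only rows with a match in both datasets
df = df.join(other_table, 'person_id', 'inner')
# Left anti join: keep only rows with no match in the other dataset
df = df.join(other_table, 'person_id', 'left_anti')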
Column Operations
Perform various operations on DataFrame columns, including adding, modifying, selecting, and renaming.
Adding a Static Column
# Add a new static column
df = df.withColumn('status', F.lit('PASS'))
Constructing a Dynamic Column
Create new columns based on conditional logic using when() and otherwise().
# Construct a new dynamic column
df = df.withColumn('full_name', F.when(
    (df.fname.isNotNull() & df.lname.isNotNull()),
    F.concat(df.fname, F.lit(' '), df.lname)  # insert a space between the names
).otherwise(F.lit('N/A')))
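when calls can be chained to express multiple branches before the final otherwise. A minimal sketch, assuming an age column and a hypothetical life_stage output column:
# Multiple branches: the first matching condition wins
df = df.withColumn('life_stage', F.when(df.age < 13, 'child')
    .when(df.age < 18, 'teen')
    .otherwise('adult'))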
Selecting and Renaming Columns
Choose specific columns to keep and rename them for clarity.
# Pick which columns to keep, optionally rename some
df = df.select(
    'name',
    'age',
    F.col('dob').alias('date_of_birth'),
)
Removing Columns
# Remove columns
df = df.drop('mod_dt', 'mod_username')
Renaming a Column
# Rename a column
df = df.withColumnRenamed('dob', 'date_of_birth')
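On Spark 3.4 and later, several columns can be renamed in one call with withColumnsRenamed; on older versions, chain withColumnRenamed instead:
# Rename several columns at once (requires Spark 3.4+)
df = df.withColumnsRenamed({'dob': 'date_of_birth', 'fname': 'first_name'})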
Selecting Columns from Another DataFrame
# Keep only the columns that also occur in another dataset
df = df.select(*(F.col(c) for c in df2.columns if c in df.columns))
Batch Rename/Clean Columns
A common task is to clean up column names by converting them to lowercase and replacing spaces or hyphens.
# Batch Rename/Clean Columns
for col_name in df.columns:
    df = df.withColumnRenamed(col_name, col_name.lower().replace(' ', '_').replace('-', '_'))
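An equivalent one-liner builds the cleaned names first and applies them all at once with toDF:
# Same cleanup in a single call
df = df.toDF(*[c.lower().replace(' ', '_').replace('-', '_') for c in df.columns])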
Casting, Null Handling, and Duplicates
Manage data types, replace null values, and handle duplicate records effectively.
Casting Column Types
# Cast a column to a different type
df = df.withColumn('price', df.price.cast(T.DoubleType()))
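cast also accepts a type name as a string, which avoids the types import for common cases:
# Equivalent cast using a type-name string
df = df.withColumn('price', df.price.cast('double'))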
Filling Null Values
Replace nulls in specific columns with default values.
# Replace all nulls with a specific value
df = df.fillna({
    'first_name': 'Tom',
    'age': 0,
})
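fillna also accepts a single value with an optional subset, applying it only to columns of a compatible type:
# Fill nulls in specific string columns only
df = df.fillna('N/A', subset=['first_name', 'last_name'])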
Coalescing Values
Select the first non-null value from a list of columns.
# Take the first value that is not null
df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A')))
Dropping Duplicates
Remove duplicate rows from a DataFrame.
# Drop duplicate rows in a dataset (distinct)
df = df.dropDuplicates() # or
df = df.distinct()
# Drop duplicate rows, but consider only specific columns
df = df.dropDuplicates(['name', 'height'])
Replacing Empty Strings with Null
# Replace empty strings with null (leave out subset keyword arg to replace in all columns)
df = df.replace({"": None}, subset=["name"])
Replacing NaN Values
# Convert Python/NumPy NaN values to null
df = df.replace(float("nan"), None)