Useful PySpark Functions & Transformations
PySpark Data Transformations: Flattening and Lookup
This section covers two reusable PySpark transformations: flattening nested DataFrame structures and replacing column values via a lookup against a reference DataFrame. Both are common steps in data cleaning and preparation for big data pipelines.
Flattening Nested Struct Columns
The `flatten` function simplifies nested data by converting struct columns into flat, top-level columns. This is particularly useful when working with the complex JSON or Avro structures common in big data processing.
```python
from pyspark.sql import DataFrame
import pyspark.sql.functions as F


def flatten(df: DataFrame, delimiter: str = "_") -> DataFrame:
    """
    Flatten nested struct columns in `df` by one level, joining parent and
    child names with `delimiter`, e.g.:

        df = [ {'a': {'b': 1, 'c': 2} } ]
        df = flatten(df, '_')
        -> [ {'a_b': 1, 'a_c': 2} ]
    """
    # Split columns into plain columns and struct columns
    flat_cols = [name for name, dtype in df.dtypes if not dtype.startswith("struct")]
    nested_cols = [name for name, dtype in df.dtypes if dtype.startswith("struct")]
    # Keep the flat columns as-is and promote each struct field to a top-level column
    flat_df = df.select(
        flat_cols
        + [
            F.col(nc + "." + c).alias(nc + delimiter + c)
            for nc in nested_cols
            for c in df.select(nc + ".*").columns
        ]
    )
    return flat_df
```
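As a quick illustration, here is a usage sketch; the `SparkSession` setup and the sample `id`/`address` data below are invented for the example:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical input: one flat column and one struct column
df = spark.createDataFrame(
    [(1, ("Oslo", "0150")), (2, ("Bergen", "5003"))],
    "id int, address struct<city: string, zip: string>",
)

flatten(df).show()
# +---+------------+-----------+
# | id|address_city|address_zip|
# +---+------------+-----------+
# |  1|        Oslo|       0150|
# |  2|      Bergen|       5003|
# +---+------------+-----------+
```

Because the function flattens only one level per call, deeply nested structs can be handled by applying it repeatedly until no struct columns remain.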
Lookup and Replace Values in a DataFrame
The `lookup_and_replace` function substitutes values in a DataFrame column based on a join with another DataFrame. This is a common operation for enriching data or standardizing values against a reference dataset.
```python
def lookup_and_replace(
    df1: DataFrame, df2: DataFrame, df1_key: str, df2_key: str, df2_value: str
) -> DataFrame:
    """
    Replace every value in `df1`'s `df1_key` column with the corresponding
    `df2_value` from `df2` where `df1_key` matches `df2_key`, keeping the
    original value when there is no match, e.g.:

        df = lookup_and_replace(people, pay_codes, 'id', 'pay_code_id', 'pay_code_desc')
    """
    return (
        df1
        # left join so rows without a match in df2 are preserved
        .join(df2[[df2_key, df2_value]], df1[df1_key] == df2[df2_key], 'left')
        # use the looked-up value if present, otherwise keep the original
        .withColumn(df1_key, F.coalesce(F.col(df2_value), F.col(df1_key)))
        .drop(df2_key)
        .drop(df2_value)
    )
```
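For example, it might be called as in the sketch below; the `people` and `pay_codes` DataFrames and their column names are made up for illustration, and the `spark` session from the previous sketch is reused:

```python
people = spark.createDataFrame(
    [(1, "A"), (2, "B"), (3, "Z")],
    ["emp_id", "pay_code"],
)
pay_codes = spark.createDataFrame(
    [("A", "Hourly"), ("B", "Salaried")],
    ["code", "description"],
)

lookup_and_replace(people, pay_codes, "pay_code", "code", "description").show()
# (row order may vary)
# +------+--------+
# |emp_id|pay_code|
# +------+--------+
# |     1|  Hourly|
# |     2|Salaried|
# |     3|       Z|
# +------+--------+
# Row 3 keeps "Z" because pay_codes has no matching entry.
```

The left join plus `coalesce` means unmatched keys are left untouched rather than nulled out, which is usually the desired behavior when standardizing values against a reference table.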
Benefits of These Transformations
- Data Simplification: Flattening makes complex nested data easier to query and analyze.
- Data Enrichment: Lookup replacements allow for the integration of external data to add context.
- Code Reusability: These functions provide modular solutions for common data engineering tasks.