
Big Data is Just a Lot of Small Data: using pandas UDF

The tale of two versions

PySpark 3.0 completely changed how we interact with the Pandas UDF API and added a lot of functionality and performance improvements. Because of this, I’ve structured this article with PySpark 3.0 in mind. For the convenience of those using PySpark 2.3 or 2.4, I’ve added sidebars with the appropriate syntax where applicable.

# PySpark 2.3/2.4 only: with pyarrow 0.15+, set this environment variable
# so Spark and pyarrow agree on the Arrow IPC format.
ARROW_PRE_0_15_IPC_FORMAT=1

# Conda installation
conda install pandas scikit-learn pyarrow

# Pip installation
pip install pandas scikit-learn pyarrow

Column transformations with Pandas: using Series UDF

In this section, we cover the simplest family of Pandas UDF: the Series UDF. This family shares a column-first focus with regular PySpark data transformation functions. All of the UDFs in this section take a Column object (or objects) as input and return a Column object as output. In practice, they are the most common types of UDF and fill the use case where you want to take functionality already implemented in Pandas, or in a library that plays well with Pandas, and promote it to the distributed world of PySpark.

  1. The Series to Series is the simplest: it takes Column objects as input, converts them to Pandas Series objects (giving it its name), and returns a Series object that gets promoted back to a PySpark Column object.
  2. The Iterator of Series to Iterator of Series differs in that the values of the Column get batched and then fed to the UDF as an Iterator object. It takes a single Column object as input and returns a single Column. It provides performance improvements, especially when the UDF needs to initialize an expensive state before working on the data (for instance, a local ML model created in scikit-learn).
  3. The Iterator of multiple Series to Iterator of Series is a combination of #1 and #2: it can take multiple Columns as input, like #1, yet preserves the iterator pattern from #2. A compact sketch of all three signatures follows this list.
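
To keep the three variants straight, here is a minimal sketch of all three signatures side by side, assuming PySpark 3.0+. The function names and the doubling and adding logic are made up for illustration; only the type hints and the pandas_udf() decorator matter.

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from typing import Iterator, Tuple


# 1. Series to Series: one (or more) Series in, one Series out.
@F.pandas_udf(T.DoubleType())
def double_it(values: pd.Series) -> pd.Series:
    return values * 2.0


# 2. Iterator of Series to Iterator of Series: same work, one batch at a time.
@F.pandas_udf(T.DoubleType())
def double_it_by_batch(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for batch in batches:
        yield batch * 2.0


# 3. Iterator of multiple Series to Iterator of Series: many columns, batched.
@F.pandas_udf(T.DoubleType())
def add_them(
    batches: Iterator[Tuple[pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    for left, right in batches:
        yield left + right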

Series to Series UDF: column functions, but with Pandas

In this section, we cover the most common type of Scalar UDF: the Series to Series UDF. Series to Series UDF are akin to most of the functions in the pyspark.sql module. For the most part, they work just like Python UDF, with one key difference: Python UDF work on one record at a time, and you express your logic through regular Python code. Scalar UDF work on one Series at a time, and you express your logic through Pandas code. The difference is subtle, and it’s easier to explain visually.

Figure 1. Comparing a Python UDF to a Pandas scalar UDF. The former splits a column into individual records, whereas the latter breaks it into Series.
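
For contrast, here is a sketch of the same Fahrenheit-to-Celsius logic written as a regular Python UDF. This version is not part of the original listing; it is only here to make the record-at-a-time difference concrete.

import pyspark.sql.functions as F
import pyspark.sql.types as T


# A regular Python UDF: called once per record, on plain Python scalars.
@F.udf(T.DoubleType())
def f_to_c_python(degrees: float) -> float:
    """Transforms Fahrenheit to Celsius, one record at a time."""
    return (degrees - 32) * 5 / 9

Both versions yield the same results; the Pandas version simply receives whole batches of values as a Series, which lets it rely on vectorized Pandas operations.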
  1. Instead of udf(), I use pandas_udf(), again from the pyspark.sql.functions module. Optionally (but recommended), we pass the return type of the UDF as an argument to the pandas_udf() decorator.
  2. Our function signature is also different: rather than working with scalar values (such as int or str), the UDF takes a pd.Series and returns a pd.Series.
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T


@F.pandas_udf(T.DoubleType())  # (1)
def f_to_c(degrees: pd.Series) -> pd.Series:  # (2)
    """Transforms Fahrenheit to Celsius."""
    return (degrees - 32) * 5 / 9


gsod = gsod.withColumn("temp_c", f_to_c(F.col("temp")))
gsod.select("temp", "temp_c").distinct().show(5)

# +-----+-------------------+
# | temp| temp_c|
# +-----+-------------------+
# | 37.2| 2.8888888888888906|
# | 85.9| 29.944444444444443|
# | 53.5| 11.944444444444445|
# | 71.6| 21.999999999999996|
# |-27.6|-33.111111111111114|
# +-----+-------------------+
# only showing top 5 rows

Working with complex types in Pandas UDF

PySpark has a richer data type system than Pandas, which lumps strings and complex types into a catch-all object type. Since you drop from PySpark into Pandas during the execution of the UDF, you are solely responsible for aligning the types accordingly. This is where the return type argument of the pandas_udf decorator comes in handy, as it helps diagnose bugs early.
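
As an illustration of aligning the types yourself, here is a hedged sketch of a Series to Series UDF that returns an array column; the function and its logic are made up for this example and are not part of the original listing.

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T


# Each element of the returned Series is a Python list of floats, which
# PySpark maps back to an ArrayType(DoubleType()) column.
@F.pandas_udf(T.ArrayType(T.DoubleType()))
def f_and_c(degrees: pd.Series) -> pd.Series:
    """Returns each reading as a [Fahrenheit, Celsius] pair."""
    celsius = (degrees - 32) * 5 / 9
    return pd.Series([[f, c] for f, c in zip(degrees, celsius)])

If the values you return don’t line up with the declared return type, the failure surfaces when the Series is converted back to a PySpark Column, which is why declaring the return type on the decorator pays off.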

Scalar UDF + cold start = Iterator of Series UDF

This is only available with PySpark 3.0+.

  1. The signature goes from (pd.Series) → pd.Series to (Iterator[pd.Series]) → Iterator[pd.Series]. This is a direct consequence of using an Iterator of Series UDF.
  2. When working with the Series to Series UDF, we assumed that PySpark would give us one batch at a time. Here, since we are working with an Iterator of Series, we are explicitly iterating over each batch one by one. PySpark will take care of distributing the work for us.
  3. Rather than using a return value, we yield so that our function returns an iterator.
from time import sleep
from typing import Iterator


@F.pandas_udf(T.DoubleType())
def f_to_c2(degrees: Iterator[pd.Series]) -> Iterator[pd.Series]:  # ❶
    """Transforms Fahrenheit to Celsius."""
    sleep(5)  # simulates an expensive initialization (the cold start)
    for batch in degrees:  # ❷
        yield (batch - 32) * 5 / 9  # ❸


gsod.select(
    "temp", f_to_c2(F.col("temp")).alias("temp_c")
).distinct().show(5)
# +-----+-------------------+
# | temp| temp_c|
# +-----+-------------------+
# | 37.2| 2.8888888888888906|
# | 85.9| 29.944444444444443|
# | 53.5| 11.944444444444445|
# | 71.6| 21.999999999999996|
# |-27.6|-33.111111111111114|
# +-----+-------------------+
# only showing top 5 rows
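
To make the cold-start scenario behind this section concrete, here is a sketch where an expensive object, a hypothetical scikit-learn model pickled at /tmp/model.pkl, is loaded once before the batch loop rather than once per batch. The model, the path, and the single feature column are assumptions for illustration.

import pickle
from typing import Iterator

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T


@F.pandas_udf(T.DoubleType())
def predict_temp(features: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Scores every batch with a model loaded a single time per UDF call."""
    with open("/tmp/model.pkl", "rb") as f:  # hypothetical pre-trained model
        model = pickle.load(f)  # the expensive step happens once, not per batch
    for batch in features:
        yield pd.Series(model.predict(batch.to_frame()))

For this to run, the pickled model has to be reachable from every executor; the point of the sketch is only that the loading cost is paid once per UDF invocation instead of once per batch.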
Figure 2. The transformation of three Series of values into a single date column. We iterate over each batch with a for loop, use multiple assignment to get the individual columns from the Tuple, pack them into a dictionary that feeds a data frame, and apply our to_datetime() function to that data frame.
  1. year_mo_da is an Iterator of a Tuple of Series, representing all the batches of values contained in the year, mo, and da columns.
  2. To access each batch, we use a for loop over the Iterator, the same principle as for the Iterator of Series UDF.
  3. To extract each individual series from the Tuple, we use multiple assignment. In this case, year will map to the first Series of the Tuple, mo to the second, and da to the third.
  4. Since pd.to_datetime requests a data frame containing the year, month, and day columns, we create the data frame via a dictionary, giving the keys the relevant column names. pd.to_datetime returns a Series.
  5. Finally, we yield the answer to build the Iterator of Series, fulfilling our contract.
from typing import Tuple


@F.pandas_udf(T.DateType())
def create_date(
    year_mo_da: Iterator[Tuple[pd.Series, pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    """Merges three cols (representing Y-M-D of a date) into a Date col."""
    for year, mo, da in year_mo_da:
        yield pd.to_datetime(
            pd.DataFrame(dict(year=year, month=mo, day=da))
        )


gsod.select(
    "year", "mo", "da",
    create_date(F.col("year"), F.col("mo"), F.col("da")).alias("date"),
).distinct().show(5)
