
Big Data is Just a Lot of Small Data: using pandas UDF

From Data Analysis with Python and PySpark by Jonathan Rioux

Manning Publications
11 min read · Apr 29, 2022


This article covers

· Using pandas Series UDF to accelerate column transformation compared to Python UDF.

· Addressing the cold start of some UDF using Iterator of Series UDF.

Take 37% off Data Analysis with Python and PySpark by entering fccrioux into the discount code box at checkout at manning.com.

This article approaches the distributed nature of PySpark a little differently. If we take a few seconds to think about it, we read data into a data frame and Spark distributes the data across partitions on nodes. What if we could operate directly on those partitions as if they were single-node data frames? More interestingly, what if we could control how those single-node partitions were created and used? Using a tool we know? What about pandas?

PySpark’s interoperability with Pandas (also colloquially called Pandas UDF) is a huge selling point when performing data analysis at scale. Pandas is the dominant in-memory Python data manipulation library, where PySpark is the dominant distributed one. Combining the two unlocks additional possibilities: we start by scaling some basic Pandas data manipulation functionality. We then look into operations on GroupedData and how PySpark+Pandas implement the split-apply-combine pattern common to data analysis. We finish with the ultimate interaction between pandas and PySpark: treating a PySpark data frame like a small collection of Pandas data frames.

This article obviously makes great use of the Pandas (http://pandas.pydata.org) library. Extensive Pandas knowledge is a nice-to-have but is in no way expected. This chapter will cover the necessary Pandas skills to use within a basic Pandas UDF. If you wish to level up your Pandas skills to become a Pandas UDF ninja, I warmly recommend the Pandas in Action book, by Boris Paskhaver (Manning, 2021).

The tales of two versions

PySpark 3.0 completely changed how we interact with the Pandas UDF API and added a lot of functionality and performance improvements. Because of this, I’ve structured this article with PySpark 3.0 in mind. For the convenience of those using PySpark 2.3 or 2.4, I’ve added sidebars with the appropriate syntax where applicable.

Pandas UDF were introduced in PySpark 2.3. If you are using Spark 2.2 or below, you’re out of luck!

For the examples in the chapter, we will need three previously unused libraries: pandas, scikit-learn, and PyArrow. If you have installed Anaconda (see appendix B), you can use conda to install the libraries; otherwise, you can use pip.

If you are using Spark 2.3 or 2.4, you additionally need to set a flag in the conf/spark-env.sh file of your Spark root directory to account for a change in Arrow serialization format. In the conf/ directory, you should find a spark-env.sh.template file. Make a copy, name it spark-env.sh and add this line in the file.

ARROW_PRE_0_15_IPC_FORMAT=1

This will tell PyArrow to use a serialization format compatible with Spark 2.X, instead of the newer one only compatible with Spark 3.0. The Spark JIRA ticket contains more information about this (https://issues.apache.org/jira/browse/SPARK-29367). You can also use PyArrow version 0.14 and avoid the problem altogether.

# Conda installation (the package is named scikit-learn, not sklearn)
conda install pandas scikit-learn pyarrow

# Pip installation
pip install pandas scikit-learn pyarrow

Column transformations with Pandas: using Series UDF

In this section, we cover the simplest family of Pandas UDF: the Series UDF. This family shares a column-first focus with regular PySpark data transformation functions. All of our UDF in this section will take a Column object (or objects) as input and return a Column object as output. In practice, they serve as the most common types of UDF and fill the use-case where you want to bring a functionality already implemented in Pandas — or a library that plays well with Pandas — and promote it to the distributed world of PySpark.

PySpark provides three types of Series UDF. Here is a summary of them; we will explore them further in the rest of the section.

  1. The Series to Series is the simplest: it takes Column objects as inputs, converts them to Pandas Series objects (giving it its name), and returns a Series object that gets promoted back to a PySpark Column object.
  2. The Iterator of Series to Iterator of Series differs in the sense that the Column object gets split into batches that are then fed as an Iterator object. It takes a single Column object as input and returns a single Column. It provides performance improvements, especially when the UDF needs to initialize an expensive state before working on the data (for instance, local ML models created in scikit-learn).
  3. The Iterator of multiple Series to Iterator of Series is a combination of #1 and #2: it can take multiple Columns as input, like #1, yet preserves the iterator pattern from #2.

Before we start exploring Series UDF, let’s grab a data set to experiment with.

Series to Series UDF: column functions, but with Pandas

In this section, we cover the most common type of Scalar UDF: the Series to Series UDF. Series to Series UDF are akin to most of the functions in the pyspark.sql module. For the most part, they work just like Python UDF, with one key difference. Python UDF work on one record at a time and you express your logic through regular Python code. Scalar UDF work on one Series at a time and you express your logic through Pandas code. The difference is subtle and it’s easier to explain visually.

In a Python UDF, when you pass column objects to your UDF, PySpark will unpack each value, perform the computation, and then return the value for each record in a Column object. In a Scalar UDF, depicted in figure 1, PySpark will serialize (through a library called PyArrow, which we installed at the beginning of the chapter) each partitioned column into a pandas Series object (https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html). You then perform the operations on the Series object directly, returning a Series of the same dimension from your UDF. From an end-user perspective, they are functionally the same. Because Pandas is optimized for rapid data manipulation, it is preferable to use a Series to Series UDF instead of a regular Python UDF when you can, as it’ll be much faster.

Figure 1. Comparing a Python UDF to a Pandas scalar UDF. The former splits a column into individual records, where the latter breaks it into Series.
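To make the contrast in figure 1 concrete, here is a minimal pandas-only sketch (no Spark session involved). The names f_to_c_scalar and f_to_c_series, as well as the sample values, are made up for illustration: the first function mimics what the body of a Python UDF sees (one value at a time), the second what the body of a Series to Series UDF sees (a whole batch at once).

```python
import pandas as pd


def f_to_c_scalar(degrees: float) -> float:
    """Record-at-a-time logic, as a regular Python UDF body would see it."""
    return (degrees - 32) * 5 / 9


def f_to_c_series(degrees: pd.Series) -> pd.Series:
    """Batch-at-a-time logic, as a Series to Series UDF body would see it."""
    return (degrees - 32) * 5 / 9


# A stand-in for one batch of a column (values are made up for illustration).
batch = pd.Series([32.0, 212.0, -40.0])

# Python UDF style: one function call per record.
one_by_one = pd.Series([f_to_c_scalar(value) for value in batch])

# Series UDF style: a single vectorized call for the whole batch.
vectorized = f_to_c_series(batch)

print(vectorized.tolist())  # [0.0, 100.0, -40.0]
```

Both approaches give the same values; the difference is how many times Python has to call the function to get there.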

Now armed with the “how it works” of Series to Series UDF, let’s create one ourselves. I chose to create a simple function that will transform Fahrenheit degrees to Celsius. In Canada, we use both scales depending on the usage: F for cooking, C for body or outside temperature. As a true Canadian, I cook my dinner at 350F yet know that 10C is sweater weather. The function is depicted in listing 1. The building blocks are eerily similar to those of a regular Python UDF, but we can pick out two main differences.

  1. Instead of udf(), I use pandas_udf(), again, from the pyspark.sql.functions module. Optionally (but recommended), we pass the return type of the UDF as an argument to the pandas_udf() decorator.
  2. Our function signature is also different: rather than using scalar values (such as int or str), the UDF takes pd.Series and returns a pd.Series.

The code within the function could be used as-is for a regular Python UDF. I am (ab)using the fact that you can do arithmetic operations with pandas Series.

Listing 1. Creating a pandas scalar UDF that transforms Fahrenheit into Celsius. I use the pandas_udf decorator with a UDF type of PandasUDFType.SCALAR.

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T


@F.pandas_udf(T.DoubleType())  # (1)
def f_to_c(degrees: pd.Series) -> pd.Series:  # (2)
    """Transforms Fahrenheit to Celsius."""
    return (degrees - 32) * 5 / 9

(1) For scalar UDF, the biggest change happens in the decorator used. I could use the pandas_udf function directly too.

(2) The signature for a Series to Series UDF is a function that takes one or multiple pandas.Series and returns a pandas.Series.

For Spark 2.X users, you need to add another parameter to the decorator here, as only Spark 3.0 and above recognize function signatures for Pandas UDF. The decorator in listing 1 would read @F.pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR), where PandasUDFType also lives in the pyspark.sql.functions module.

In listing 2, we apply our newly created Series to Series UDF to the temp column of the gsod data frame, which contains the temperature (in Fahrenheit) of each station-day combination. Just like regular Python UDF, Series to Series (and all Scalar UDF) are used like any data manipulation function: here, I create a new column temp_c with withColumn() and apply the f_to_c function to the temp column.

Listing 2. Using a Series to Series UDF like any other column manipulation function.

gsod = gsod.withColumn("temp_c", f_to_c(F.col("temp")))
gsod.select("temp", "temp_c").distinct().show(5)

# +-----+-------------------+
# | temp|             temp_c|
# +-----+-------------------+
# | 37.2| 2.8888888888888906|
# | 85.9| 29.944444444444443|
# | 53.5| 11.944444444444445|
# | 71.6| 21.999999999999996|
# |-27.6|-33.111111111111114|
# +-----+-------------------+
# only showing top 5 rows

Series to Series UDF, just like regular Python UDF, are very convenient when the record-wise transformation (or “mapping”) you want to apply to your data frame is not available within the stock PySpark functions (pyspark.sql.functions). Creating a “Fahrenheit to Celsius” converter as part of core Spark would be a little intense, so using a Python or a Pandas Series to Series UDF is a way to extend the core functionality with a minimum of fuss. Next, we see how to gain more control over the split and use the split-apply-combine pattern in PySpark.

Working with complex types in Pandas UDF

PySpark has a richer data type system than Pandas, which lumps strings and complex types into a catchall object type. Since you are dropping from PySpark into Pandas during the execution of the UDF, you are solely responsible for aligning the types accordingly. This is where the return type argument of the pandas_udf decorator comes in handy, as it’ll help you diagnose bugs early.

What if you want to accept or return complex types, such as the array or the struct? Pandas will accept, as values within a Series, lists of items that will be promoted to an ArrayType column. For StructType columns, you will need to replace the relevant pd.Series with a pd.DataFrame. In chapter 6, we saw that struct columns are like mini data frames: the equivalence continues here!
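As a rough illustration of the two shapes described above, here is a pandas-only sketch of what the function bodies could look like. The function names, field names, and return types mentioned in the docstrings are assumptions for the example, not something taken from the book; nothing here is wrapped in pandas_udf, so it runs without Spark.

```python
import pandas as pd


def both_scales(degrees: pd.Series) -> pd.Series:
    """Returns a Series whose values are lists: [fahrenheit, celsius].

    Assumption: wrapped in pandas_udf with an array return type such as
    T.ArrayType(T.DoubleType()), each list would become one array value.
    """
    celsius = (degrees - 32) * 5 / 9
    return pd.Series([[f, c] for f, c in zip(degrees, celsius)])


def both_scales_struct(degrees: pd.Series) -> pd.DataFrame:
    """Returns a data frame: one column per field of the struct.

    Assumption: wrapped in pandas_udf with a struct return type such as
    "fahrenheit double, celsius double", each row would become one struct.
    """
    return pd.DataFrame(
        {"fahrenheit": degrees, "celsius": (degrees - 32) * 5 / 9}
    )


# A made-up batch of temperatures.
batch = pd.Series([32.0, 212.0])
as_arrays = both_scales(batch)
as_struct = both_scales_struct(batch)

print(as_arrays.tolist())  # [[32.0, 0.0], [212.0, 100.0]]
print(as_struct.columns.tolist())  # ['fahrenheit', 'celsius']
```

Note that the Series of lists ends up with the catchall object dtype on the pandas side, which is exactly why declaring the return type on the PySpark side matters.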

Scalar UDF + cold start = Iterator of Series UDF

This is only available with PySpark 3.0+.

This section combines the two other types of Scalar UDF: the Iterator of Series to Iterator of Series UDF and the Iterator of multiple Series to Iterator of Series. Because they are so similar to the Series to Series UDF in their application, I will focus on the Iterator portion that gives them their power. Iterator of Series UDF are very useful when you have a cold start operation you need to perform, such as deserializing a local ML model (fitted with scikit-learn or another Python modeling library). In our case here, I’ll reuse our f_to_c function but will add a cold start to demonstrate the usage.

Our UDF in listing 3 is really similar to the Series to Series UDF from listing 1. A few differences pop out:

  1. The signature goes from (pd.Series) → pd.Series to (Iterator[pd.Series]) → Iterator[pd.Series]. This is a consequence of using an Iterator of Series UDF.
  2. When working with the Series to Series UDF, we assumed that PySpark would give us one batch at a time. Here, since we are working with an Iterator of Series, we are explicitly iterating over each batch one by one. PySpark will take care of distributing the work for us.
  3. Rather than using a return value, we yield so that our function returns an iterator.

Listing 3. Using an Iterator of Series to Iterator of Series UDF to convert the temperature. sleep(5) will only be called once.

from time import sleep
from typing import Iterator


@F.pandas_udf(T.DoubleType())
def f_to_c2(degrees: Iterator[pd.Series]) -> Iterator[pd.Series]:  # ❶
    """Transforms Fahrenheit to Celsius."""
    sleep(5)  # ❷
    for batch in degrees:  # ❸
        yield (batch - 32) * 5 / 9  # ❸


gsod.select(
    "temp", f_to_c2(F.col("temp")).alias("temp_c")
).distinct().show(5)
# +-----+-------------------+
# | temp|             temp_c|
# +-----+-------------------+
# | 37.2| 2.8888888888888906|
# | 85.9| 29.944444444444443|
# | 53.5| 11.944444444444445|
# | 71.6| 21.999999999999996|
# |-27.6|-33.111111111111114|
# +-----+-------------------+
# only showing top 5 rows

❶ The signature is now (Iterator[pd.Series]) → Iterator[pd.Series]. Notice the addition of the Iterator type hint (from the typing module).

❷ We simulate a cold start using sleep() for 5 seconds. The cold start will happen on each worker once, rather than for every batch.

❸ Since we are working with an iterator here, we iterate over each batch, using yield (instead of return).
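If you want to convince yourself that the setup code runs once and not once per batch, the generator can be exercised locally without Spark. The sketch below is a simplified stand-in: expensive_setup and the setup_calls counter are hypothetical helpers that replace sleep(5), and the three one-record batches are made up.

```python
from typing import Iterator

import pandas as pd

setup_calls = 0  # counts how many times the (simulated) cold start runs


def expensive_setup() -> float:
    """Hypothetical stand-in for a cold start, such as loading a model."""
    global setup_calls
    setup_calls += 1
    return 32.0  # the "state" we need: the Fahrenheit offset


def f_to_c_batches(degrees: Iterator[pd.Series]) -> Iterator[pd.Series]:
    offset = expensive_setup()  # runs once per iterator...
    for batch in degrees:  # ...no matter how many batches follow
        yield (batch - offset) * 5 / 9


# Three made-up batches, fed as an iterator just like PySpark would do.
batches = iter([pd.Series([32.0]), pd.Series([212.0]), pd.Series([-40.0])])
results = [batch.tolist() for batch in f_to_c_batches(batches)]

print(results)  # [[0.0], [100.0], [-40.0]]
print(setup_calls)  # 1
```

Had the setup been placed inside the for loop, the counter would read 3 instead of 1, which is the cost the Iterator of Series UDF lets us avoid.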

We have covered here the Iterator of Series to Iterator of Series case. What about the Iterator of multiple Series to Iterator of Series? This special case is to wrap multiple columns in a single iterator. For this example, I’ll assemble the year, mo, and da columns (representing the year, month, and day) into a single column. This example requires more data transformation than when using an Iterator of a single Series; I illustrate the process of data transformation in figure 2.

Figure 2. The transformation of three Series of values into a single date column. We iterate over each batch using a for loop, use multiple assignment to get the individual columns from the Tuple, and pack them in a dictionary that feeds into a data frame where we can apply our to_datetime() function.

  1. year_mo_da is an Iterator of a Tuple of Series, representing all the batches of values contained in the year, mo, and da columns.
  2. To access each batch, we use a for loop over the Iterator, the same principle as for the Iterator of Series UDF.
  3. To extract each individual series from the Tuple, we use multiple assignment. In this case, year will map to the first Series of the Tuple, mo to the second, and da to the third.
  4. Since pd.to_datetime requests a data frame containing the year, month, and day columns, we create the data frame via a dictionary, giving the keys the relevant column names. pd.to_datetime returns a Series.
  5. Finally, we yield the answer to build the Iterator of Series, fulfilling our contract.

Listing 4. Assembling the date from three columns using an Iterator of multiple Series UDF.

from typing import Tuple


@F.pandas_udf(T.DateType())
def create_date(
    year_mo_da: Iterator[Tuple[pd.Series, pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    """Merges three cols (representing Y-M-D of a date) into a Date col."""
    for year, mo, da in year_mo_da:
        yield pd.to_datetime(
            pd.DataFrame(dict(year=year, month=mo, day=da))
        )


gsod.select(
    "year", "mo", "da",
    create_date(F.col("year"), F.col("mo"), F.col("da")).alias("date"),
).distinct().show(5)
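To see the shape of the data flowing through this kind of UDF, here is a pandas-only sketch that feeds the same loop a hand-built iterator of tuples. The function name create_date_batches and the sample values are made up, and I use integers for the year, month, and day, which is an assumption about the column types.

```python
from typing import Iterator, Tuple

import pandas as pd


def create_date_batches(
    year_mo_da: Iterator[Tuple[pd.Series, pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    """Same loop as the UDF body, minus the pandas_udf decorator."""
    for year, mo, da in year_mo_da:
        # pd.to_datetime assembles dates from year/month/day columns.
        yield pd.to_datetime(pd.DataFrame(dict(year=year, month=mo, day=da)))


# Two made-up batches; each one is a tuple of three Series.
batches = iter(
    [
        (pd.Series([2019]), pd.Series([1]), pd.Series([15])),
        (pd.Series([2020]), pd.Series([12]), pd.Series([31])),
    ]
)

dates = pd.concat(create_date_batches(batches), ignore_index=True)
print(dates.tolist())
# [Timestamp('2019-01-15 00:00:00'), Timestamp('2020-12-31 00:00:00')]
```

Each tuple yields exactly one Series of dates, so the output iterator has as many elements as the input one.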

This concludes our overview of how to use the Scalar UDF. Scalar UDF are very useful when you make column-level transformations, just like the functions in pyspark.sql.functions. When using any Scalar UDF, you need to remember that PySpark will not guarantee the order or the composition of the batches when applying the UDF. If you follow the same “Columns in, Columns out” approach we use when working with PySpark Column functions, you’ll do great.

By default, Spark will aim for 10,000 records per batch; you can customize the maximum size of each batch using the spark.sql.execution.arrow.maxRecordsPerBatch config when creating the SparkSession object.
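Why does batch composition matter? The pandas-only sketch below simulates batching with a hypothetical split_in_batches helper (a stand-in for what Spark does with spark.sql.execution.arrow.maxRecordsPerBatch, not Spark's actual implementation) and compares an element-wise function with one that depends on the content of the batch. All names and values are made up for illustration.

```python
from typing import Callable, Iterator, List

import pandas as pd


def split_in_batches(column: pd.Series, max_records: int) -> Iterator[pd.Series]:
    """Hypothetical helper: slices a column in batches of at most max_records."""
    for start in range(0, len(column), max_records):
        yield column.iloc[start:start + max_records]


def f_to_c(batch: pd.Series) -> pd.Series:
    """Element-wise: the result for a record ignores the rest of its batch."""
    return (batch - 32) * 5 / 9


def center(batch: pd.Series) -> pd.Series:
    """Batch-dependent: subtracts the mean of whatever batch it receives."""
    return batch - batch.mean()


def apply_by_batch(
    func: Callable[[pd.Series], pd.Series], column: pd.Series, max_records: int
) -> List[float]:
    """Applies func to each simulated batch and stitches the results back."""
    parts = [func(batch) for batch in split_in_batches(column, max_records)]
    return pd.concat(parts).tolist()


temps = pd.Series([32.0, 212.0, -40.0, 50.0])

# Same answer whatever the batch size: safe as a Scalar UDF.
print(apply_by_batch(f_to_c, temps, 2) == apply_by_batch(f_to_c, temps, 4))  # True

# Different answers depending on the batch size: not safe as a Scalar UDF.
print(apply_by_batch(center, temps, 2) == apply_by_batch(center, temps, 4))  # False
```

A function like center needs control over which records land together, which a Scalar UDF does not give you.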

Should you need to worry about batch composition based on one or more Columns, the next section addresses how to apply UDF on a GroupedData object to have a finer level of control over the records. We will not only create aggregate functions (an example of this is sum()) but also apply functions while controlling the batch composition.

That’s all for now. If you want to see more, you can check out the book on Manning’s liveBook platform here.
