Robust Machine Learning with ML Pipelines
From Data Analysis with Python and PySpark by Jonathan Rioux
This article covers using transformers and estimators to prepare data into ML features.
This article refers to data and code that have been covered in chapter 12, as seen below:
- Read a CSV file containing dish names and multiple columns as feature candidates.
- Sanitized the column names (lowered the case, fixed the punctuation, spacing, and non-printable characters)
- Removed illogical and irrelevant records
- Filled the null values of binary columns
- Capped the amounts for sodium to the 99th percentile
- Created ratio features (number of calories from a macro over number of calories for the dish)
- Imputed the mean of continuous features.
- Scaled continuous features between 0 and 1.
The code leading to food_features is available in the book’s repository.
In this excerpt, we continue our journey to a robust machine learning training program. To help us, we delve deeper into transformers and estimators, this time in the context of an ML pipeline.
ML pipelines are how PySpark implements machine learning capabilities. They provide better code organization and flexibility, at the expense of a little preparation up front. This article starts by explaining what an ML pipeline is, using the dessert prediction data set. We review just enough theory about transformers, estimators, and ML pipelines to get us started.
Transformers and estimators: the building blocks of ML in Spark
We’re going to cover the two main components of ML pipelines: transformers and estimators. We take a second look at them as reusable and parameterizable building blocks. From a 36,000-foot view, an ML pipeline is an ordered list of transformers and estimators. Before we can use ML pipelines efficiently, however, it is crucial that we understand not only how to create those building blocks, but also how to modify them.
Transformers and estimators are very useful classes for ML modelling. When we train an ML model, we get back a fitted model, which is akin to a new program that we did not code explicitly. This new data-driven program has a sole purpose: taking a properly formatted data set and transforming it by appending a prediction column. Transformers and estimators not only provide a useful abstraction for ML modelling, they also provide portability through serialization and deserialization. This means that you can train and save your ML model and deploy it in another environment.
To illustrate how a transformer and an estimator are parameterized, we will use a transformer and an estimator defined and used in chapter 12:
- A VectorAssembler transformer that takes five columns and creates a Vector column to be used for model training.
- A MinMaxScaler estimator that scales values contained in a Vector column, returning values between 0 and 1 for each element in the vectors.
For convenience, I include in listing 1 the relevant code. We start with the transformer and then build on it to introduce the estimator.
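Listing 1. The VectorAssembler and MinMaxScaler example we’ll explore in this section
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
CONTINUOUS_NB = ["rating", "calories_i", "protein_i", "fat_i", "sodium_i"]
continuous_assembler = VectorAssembler(inputCols=CONTINUOUS_NB, outputCol="continuous")
# The scaler's arguments are cut off in this excerpt; the column names below are assumed.
continuous_scaler = MinMaxScaler(inputCol="continuous", outputCol="continuous_scaled")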
Data comes in, data comes out
This section formally introduces the Transformer as the first building block of an ML pipeline. We introduce the general transformer blueprint and how to access and modify its parameterization. This added context on the transformer plays a crucial role when we want to run experiments with our ML code or optimize our ML models.
In our VectorAssembler transformer example, we provide two arguments to the constructor: inputCols and outputCol. Those arguments are all we need to create a fully functional VectorAssembler transformer. This transformer’s sole purpose, through its transform() method, is to take the values in inputCols (assembled values) and return a single column, named outputCol, that contains a vector of all the assembled values.
The parameterization of a transformer is called Params (capital P). When instantiating a transformer class, just like with any Python class, we pass the parameters we want as arguments, making sure to explicitly specify each keyword. Once the transformer has been instantiated, PySpark provides us with a set of methods to extract and modify Params. The next sections cover retrieving and modifying Params after the transformer’s instantiation.
Let’s have a look at the signature of VectorAssembler: keyword-only arguments
If you look at the signature for VectorAssembler (and pretty much every transformer and estimator in the pyspark.ml modules), you’ll see an asterisk at the beginning of the parameters list.
class pyspark.ml.feature.VectorAssembler(*, inputCols=None, outputCol=None, handleInvalid='error')
In Python, every parameter after the asterisk * is called a keyword-only argument, meaning that we need to mention the keyword. For instance, we couldn’t do VectorAssembler("input_column", "output_column"). For more information, refer to PEP (Python Enhancement Proposal) 3102 at https://www.python.org/dev/peps/pep-3102/.
As a fun add-on, Python also supports positional-only parameters with the slash / character. See PEP 570 (https://www.python.org/dev/peps/pep-0570/).
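To see keyword-only arguments in action without a Spark session, here is a minimal plain-Python sketch. The function make_assembler_config is hypothetical: it only borrows the shape of the VectorAssembler signature shown above and is not part of PySpark.

```python
# Hypothetical helper that copies the keyword-only shape of VectorAssembler's signature.
def make_assembler_config(*, inputCols=None, outputCol=None, handleInvalid="error"):
    """Return the arguments as a dict; every parameter after * is keyword-only."""
    return {
        "inputCols": inputCols,
        "outputCol": outputCol,
        "handleInvalid": handleInvalid,
    }


# Spelling out the keywords works.
config = make_assembler_config(inputCols=["rating", "calories_i"], outputCol="continuous")
print(config["outputCol"])  # continuous

# Passing the same values positionally is rejected before the body runs.
positional_error = None
try:
    make_assembler_config(["rating", "calories_i"], "continuous")
except TypeError as error:
    positional_error = error
    print(type(error).__name__)  # TypeError
```

The practical upside is that call sites stay readable: nobody has to remember whether the input or the output column comes first.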
Peeking under the hood: getting and explaining the Params
Looking back at figure 1, the instantiation of VectorAssembler accepted three arguments: inputCols, outputCol, and handleInvalid. We also hinted that the configuration of a transformer (and, by the same token, estimator) class instance relied on Params, which drive the behavior of the transformer. In this section, we explore Params, highlight their similarities and differences compared to regular class attributes, and explain why those differences matter. You might think, “Well, I know how to get attributes out of a Python class, and transformers are Python classes.” While that is correct, transformers (and estimators) follow a more Java/Scala-like design, and I recommend not skipping over this section. It’s short, useful, and will help you avoid headaches when working with ML pipelines.
First, let’s do what any Python developer would do and access one of the attributes of the transformer directly. In listing 2, we see that accessing the outputCol attribute of continuous_assembler does not yield continuous, like we passed to the constructor. Instead, we get a reference to an object called a Param (class pyspark.ml.param.Param), which wraps each of our transformer’s attributes.
Listing 2. Accessing a transformer’s parameters directly yields an object (called a Param)
print(continuous_assembler.outputCol)
# VectorAssembler_e18a6589d2d5__outputCol ❶
❶ Rather than returning the continuous value passed as an argument to outputCol, we get an object called a Param.
To directly access the value of a specific Param, we use a getter method, which is simply the word get followed by the name of our Param in CamelCase. In the case of outputCol, shown in listing 3, the getter method is called getOutputCol() (note the capital O).
Listing 3. Accessing the value of the outputCol Param through getOutputCol()
print(continuous_assembler.getOutputCol()) # => continuous
So far, Params seem like they add boilerplate with little benefit. explainParam() changes this. This method provides documentation about the Param as well as its value. This is best explained by an example: listing 4 shows the output of explaining the outputCol Param.
If you want to see all the Params at once, you can also use the pluralized version, explainParams(). This method takes no argument and returns a newline-delimited string of all the Params.
The string output contains:
- the name of the Param: outputCol;
- a short description of the Param: output column name.;
- the default value of the Param: VectorAssembler_e18a6589d2d5__output, used if we don’t explicitly pass a value ourselves;
- and the current value of the Param: continuous.
Listing 4. Explaining the outputCol Param with explainParam()
print(continuous_assembler.explainParam("outputCol"))
# outputCol: output column name. ❶
# (default: VectorAssembler_e18a6589d2d5__output, current: continuous) ❷
❶ The name and a short description of the outputCol Param.
❷ Even though we defined a value for outputCol, the default value is still displayed next to the current one.
In this section, we learned how to get the relevant information out of our transformer’s Params. Everything here applies verbatim to estimators as well. In the next section, we stop looking at Params and start changing them. Afterwards, transformers will have no more secrets!
What about the plain getParam()?
Transformers (and estimators) provide the plain getParam() method. It simply returns the Param, just like accessing the outputCol attribute did at the beginning of the section. I believe that this is done so that PySpark transformers can have a consistent API with their Java/Scala equivalent.
Setting params of an instantiated transformer using getters and setters
Just like the previous section on getting Params, setting Params works the same for estimators.
In this section, we modify the Params of a transformer! Simple as that! This is mainly useful in two scenarios:
- You are building your transformer in the REPL and you want to experiment with different Param-eterization;
- You are optimizing your ML pipeline Params.
How do we change the Params of a transformer? For every getter, there is a setter, which is simply the word set followed by the name of our Param in CamelCase. Unlike getters, setters take the new value as their sole argument. In listing 5, we change the outputCol Param to more_continuous using the relevant setter method. This operation returns the transformer but also makes the modification in-place, which means that you do not have to assign the result of a setter to a variable.
Listing 5. Setting the outputCol Param to more_continuous. The modification is done in-place.
continuous_assembler.setOutputCol("more_continuous")  # ❶
print(continuous_assembler.getOutputCol()) # => more_continuous
❶ The setOutputCol() method returns the transformer object itself (not a new one) and makes the modification in place, so we don’t have to assign the result to a variable.
If you need to change multiple Params at once (for instance, you want to change the input and output columns in one fell swoop while experimenting with different scenarios), you can use the setParams() method. setParams() has the exact same signature as the constructor: you just pass the new values as keywords, as shown in listing 6.
Listing 6. Changing multiple Params at once using setParams()
continuous_assembler.setParams(
    inputCols=["one", "two", "three"], handleInvalid="skip"
)
print(continuous_assembler.explainParams())
# handleInvalid: How to handle invalid data (NULL and NaN values). [...]
# (default: error, current: skip)
# inputCols: input column names. (current: ['one', 'two', 'three'])
# outputCol: output column name.
# (default: VectorAssembler_e18a6589d2d5__output, current: more_continuous) ❶
❶ Params not passed to setParams() keep their previous value (set in listing 5).
Finally, if you want to return a Param to its default value, you can use the clear() method. This time, you need to pass the Param object: for instance, in listing 7, we reset the handleInvalid Param by using clear(). We pass the actual Param as an argument, accessed via the attribute slot seen at the beginning of the section, continuous_assembler.handleInvalid. This will prove useful if you have a transformer that has both inputCol/outputCol and inputCols/outputCols as possible Params. PySpark only allows one set to be active at once, so if you want to move between one column and multiple columns, you need to clear() the ones not being used.
Listing 7. Clearing the current value of the handleInvalid Param with clear()
continuous_assembler.clear(continuous_assembler.handleInvalid)
print(continuous_assembler.getHandleInvalid()) # => error ❶
❶ handleInvalid returned to its original value, error.
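To make the get/set/clear behavior less mysterious, here is a minimal plain-Python sketch of the idea. It is a simplified model written for illustration, not PySpark’s actual implementation: the classes ToyParam and ToyAssembler and the default value "ToyAssembler__output" are made up. The point it tries to convey is that the attribute holds a descriptive Param object, while default and current values live in separate lookups.

```python
class ToyParam:
    """A named, documented slot; it carries no value of its own."""

    def __init__(self, name, doc):
        self.name = name
        self.doc = doc


class ToyAssembler:
    """Simplified stand-in for a transformer; not PySpark's actual implementation."""

    def __init__(self, *, outputCol=None, handleInvalid=None):
        self.outputCol = ToyParam("outputCol", "output column name.")
        self.handleInvalid = ToyParam("handleInvalid", "how to handle invalid data.")
        # Defaults and explicitly set values live in two separate maps.
        self._defaults = {"outputCol": "ToyAssembler__output", "handleInvalid": "error"}
        self._current = {}
        self.setParams(outputCol=outputCol, handleInvalid=handleInvalid)

    def setParams(self, **kwargs):
        # Only keywords that were actually supplied overwrite the current value.
        for name, value in kwargs.items():
            if value is not None:
                self._current[name] = value
        return self  # returning the same object is what makes the change in place

    def setOutputCol(self, value):
        return self.setParams(outputCol=value)

    def getOutputCol(self):
        return self._current.get("outputCol", self._defaults["outputCol"])

    def getHandleInvalid(self):
        return self._current.get("handleInvalid", self._defaults["handleInvalid"])

    def clear(self, param):
        # Dropping the current value makes the getter fall back to the default.
        self._current.pop(param.name, None)


toy = ToyAssembler(outputCol="continuous", handleInvalid="skip")
print(type(toy.outputCol).__name__)  # ToyParam
print(toy.getOutputCol())  # continuous
same_object = toy.setOutputCol("more_continuous")
print(same_object is toy)  # True
toy.clear(toy.handleInvalid)
print(toy.getHandleInvalid())  # error
```

Keeping defaults apart from current values is what allows a clear() to restore the default without the object having to remember what the constructor was originally given.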
This is it, folks! In this section, we learned in greater detail the how and why of a transformer, as well as how to get, set, and clear its Params. In the next section, we apply this useful knowledge to speed through the second building block of an ML pipeline, the estimator.
Transformers and estimators are passed by reference: the copy() method
With the data frame’s fluent API, each transformation generates a new data frame, which enables method chaining and makes our data transformation code very readable. Transformers and estimators do not work this way.
When working with transformers (and estimators), remember that they are passed by reference and that setters modify the object in-place. If you assign your transformer to a new variable name and then use a setter on either of those variables, it’ll modify the Param for both references.
new_continuous_assembler = continuous_assembler
new_continuous_assembler.setOutputCol("new_output")
print(new_continuous_assembler.getOutputCol()) # => new_output
print(continuous_assembler.getOutputCol()) # => new_output ❶
❶ Both the outputCol of continuous_assembler and new_continuous_assembler were modified by the setter.
The solution to this is to copy() the transformer and then assign the copy to the new variable.
copy_continuous_assembler = continuous_assembler.copy()
copy_continuous_assembler.setOutputCol("copy_output")
print(copy_continuous_assembler.getOutputCol()) # => copy_output
print(continuous_assembler.getOutputCol()) # => new_output ❶
❶ When making a copy, modifications to the Params of copy_continuous_assembler don’t impact continuous_assembler.
Data comes in, transformer comes out
This section covers the
Estimator, the second half of the ML pipeline. Just like with transformers, understanding how to operate and configure an estimator is an invaluable step in creating an efficient ML pipeline. Where a transformer transforms an input data frame into an output data frame, an estimator is fitted on an input data frame and returns an output transformer. In this section, we see that this relationship between transformers and estimators means that they are Param-eterized the same way as explained in. We focus on estimator usage through the
fit() method (vs.
transform() for the transformer), really the only notable difference for the end-user.
Where a transformer uses a
transform() method, applied to a data frame, to return a transformed data frame, an estimator uses a
fit() method, applied to a data frame, to return a fully parameterized transformer called a
Model. This distinction enables estimators to configure transformers based on the input data.
As an example, the MinMaxScaler estimator in figure 2 takes four parameters, two of which we leave at their default value.
- min and max, which are the minimum and maximum values our scaled column will take. We keep both at their defaults of 0.0 and 1.0, respectively.
- inputCol and outputCol are the input and output column, respectively. They follow the same conventions as with the transformer.
In order to scale the values between min and max, we need to extract from the input column the minimum value (which I call E_min) as well as the maximum value (E_max). E_min is transformed to 0.0, E_max is transformed to 1.0, and any value x in between takes a value between min and max using the following formula (see the exercises at the end of the section for a corner (or edge) case when E_max and E_min are the same):
$$x' = \frac{x - E_{\text{min}}}{E_{\text{max}} - E_{\text{min}}} \times (\text{max} - \text{min}) + \text{min}$$
Because the transformation relies on actual values from the data, we can’t use a plain transformer, which expects to “know” everything (through its Param-eterization) before it can apply the transform() method. In the case of the MinMaxScaler, we can translate E_min and E_max as simple operations (min and max come from pyspark.sql.functions):
- E_min = min(inputCol)
- E_max = max(inputCol)
Once those values are computed (during the fit() method), PySpark creates, Param-eterizes, and returns a transformer/model.
This fit()/transform() approach applies to estimators far more complex than MinMaxScaler. Case in point: ML models are actually implemented as estimators in Spark.
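The estimator-to-model hand-off can be sketched in plain Python, without Spark. The classes ToyMinMaxScaler and ToyMinMaxModel and the parameter names lower and upper are hypothetical, and the sketch works on a list of numbers rather than a data frame column. It assumes the usual min-max rescaling, (x - E_min) / (E_max - E_min) * (upper - lower) + lower.

```python
class ToyMinMaxModel:
    """The 'transformer' returned by fit(): it already knows E_min and E_max."""

    def __init__(self, e_min, e_max, lower, upper):
        self.e_min = e_min
        self.e_max = e_max
        self.lower = lower
        self.upper = upper

    def transform(self, values):
        span = self.e_max - self.e_min
        if span == 0:
            # Corner case deliberately not handled in this sketch.
            raise ValueError("E_max equals E_min: rescaling is undefined here")
        return [
            (value - self.e_min) / span * (self.upper - self.lower) + self.lower
            for value in values
        ]


class ToyMinMaxScaler:
    """The 'estimator': configured up front, but it needs data before it can transform."""

    def __init__(self, *, lower=0.0, upper=1.0):
        self.lower = lower
        self.upper = upper

    def fit(self, values):
        # The data-dependent quantities are computed here, once.
        return ToyMinMaxModel(min(values), max(values), self.lower, self.upper)


training_values = [1.0, 3.0, 5.0]
model = ToyMinMaxScaler().fit(training_values)
scaled = model.transform(training_values)
print(scaled)  # [0.0, 0.5, 1.0]

# The fitted model can be reused on new values without refitting.
print(model.transform([2.0]))  # [0.25]
```

Splitting the work this way means the data-dependent values are computed once, during fit(), and then travel with the returned model.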
That’s all for this article.