Chapter 14 Regression Analysis and Categorization with Spark and R
Regression analysis, particularly simple linear regression (OLS), is the backbone of applied econometrics. As discussed in previous chapters, regression analysis can be computationally very intensive with a dataset of many observations and variables, as it involves matrix operations on a very large model matrix. Chapter 12 discusses, in one case study, the special case of a large model matrix due to fixed-effects dummy variables. In this chapter, we first look at a generally applicable approach for estimating linear regression models with large datasets (when the model matrix cannot be held in RAM). Building on the same sparklyr framework (Luraschi et al. 2022) as for the simple linear regression case, we then look at classification models, such as logit and random forest. Finally, we look at how regression analysis and machine learning tasks can be organized in machine learning pipelines to be run, stored/reloaded, and updated flexibly.
14.1 Simple linear regression analysis
Suppose we want to conduct a correlation study of what factors are associated with longer or shorter arrival delays in air travel. Via its built-in ‘MLlib’ library, Spark provides several high-level functions to conduct regression analyses. When calling these functions via sparklyr (or SparkR), their usage is actually very similar to the usual R packages/functions commonly used to run regressions in R.
As a simple point of reference, we first estimate a linear model with the usual R approach (all computed in the local R environment). First, we load the data as a common data.table. We could also convert a copy of the entire SparkDataFrame object to a data.frame or data.table and get essentially the same outcome. However, collecting the data from the RDD structure would take much longer than parsing the CSV with fread. In addition, we import only the first 300 rows, since running regression analysis with relatively large datasets in Spark on a small local machine might fail or be rather slow.73
# flights_r <- collect(flights) # very slow!
flights_r <- data.table::fread("data/flights.csv", nrows = 300)
Now we run a simple linear regression (OLS) and show the summary output.
# specify the linear model
model1 <- arr_delay ~ dep_delay + distance
# fit the model with OLS
fit1 <- lm(model1, flights_r)
# compute t-tests etc.
summary(fit1)
##
## Call:
## lm(formula = model1, data = flights_r)
##
## Residuals:
## Min 1Q Median 3Q Max
## -42.39 -9.96 -1.91 9.87 48.02
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) -0.182662 1.676560 -0.11 0.91
## dep_delay 0.989553 0.017282 57.26 <2e-16 ***
## distance 0.000114 0.001239 0.09 0.93
## ---
## Signif. codes:
## 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
##
## Residual standard error: 15.5 on 297 degrees of freedom
## Multiple R-squared: 0.917, Adjusted R-squared: 0.917
## F-statistic: 1.65e+03 on 2 and 297 DF, p-value: <2e-16
Now we aim to compute essentially the same model estimate in sparklyr.74 In order to use Spark via the sparklyr package, we first need to load the package and establish a connection with Spark (similar to SparkR::sparkR.session()).
library(sparklyr)
# connect with default configuration
sc <- spark_connect(master = "local")
We then copy the data.table flights_r (previously loaded into our R session) to Spark. Again, working on a normal laptop this seems trivial, but the exact same command would allow us (when connected with Spark on a cluster computer in the cloud) to properly load and distribute the data.table on the cluster. Finally, we fit the model with ml_linear_regression() and compute the summary statistics.
# load data to spark
flights_spark <- copy_to(sc, flights_r, "flights_spark")
# fit the model
fit1_spark <- ml_linear_regression(flights_spark, formula = model1)
# compute summary stats
summary(fit1_spark)
Deviance Residuals:
Min 1Q Median 3Q Max
-42.386 -9.965 -1.911 9.866 48.024
Coefficients:
(Intercept) dep_delay distance
-0.1826622687 0.9895529018 0.0001139616
R-Squared: 0.9172
Root Mean Squared Error: 15.42
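Note that the point estimates agree with the local lm() fit up to numerical precision, even though the Spark summary does not report standard errors and t-statistics. As a quick sanity check, we can place the two coefficient vectors side by side. The following is a minimal sketch, assuming the fitted objects from above are still in memory; accessing the coefficients element of the sparklyr fit is an assumption that is worth verifying for your sparklyr version.
# compare point estimates from the local and the Spark-based fit
coef(fit1)               # local OLS fit via lm()
fit1_spark$coefficients  # Spark fit via ml_linear_regression() (assumed accessor)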
Alternatively, we can use the spark_apply()
function to run the regression analysis in R via the original R lm()
function.75
# fit the model
spark_apply(flights_spark,
            function(df){
              broom::tidy(lm(arr_delay ~ dep_delay + distance, df))},
            names = c("term",
                      "estimate",
                      "std.error",
                      "statistic",
                      "p.value")
            )
# Source: spark<?> [?? x 5]
term estimate std.error statistic p.value
<chr> <dbl> <dbl> <dbl> <dbl>
1 (Intercept) -0.183 1.68 -0.109 9.13e- 1
2 dep_delay 0.990 0.0173 57.3 1.63e-162
3 distance 0.000114 0.00124 0.0920 9.27e- 1
Finally, the parsnip package (Kuhn and Vaughan 2022), together with the tidymodels package (Kuhn and Wickham 2020), provides a simple interface to run the same model (or similar specifications) on different “engines” (estimators/fitting algorithms), and several of the parsnip models are also supported in sparklyr. This significantly facilitates the transition from local testing (with a small subset of the data) to running the estimation on the entire dataset on Spark.
library(tidymodels)
library(parsnip)
# simple local linear regression example from above
# via tidymodels/parsnip
fit1 <- fit(linear_reg(engine="lm"), model1, data=flights_r)
tidy(fit1)
# A tibble: 3 × 5
term estimate std.error statistic p.value
<chr> <dbl> <dbl> <dbl> <dbl>
1 (Intercept) -0.183 1.68 -0.109 9.13e- 1
2 dep_delay 0.990 0.0173 57.3 1.63e-162
3 distance 0.000114 0.00124 0.0920 9.27e- 1
# run the same on Spark
fit1_spark <- fit(linear_reg(engine="spark"), model1, data=flights_spark)
tidy(fit1_spark)
# A tibble: 3 × 5
term estimate std.error statistic p.value
<chr> <dbl> <dbl> <dbl> <dbl>
1 (Intercept) -0.183 1.68 -0.109 9.13e- 1
2 dep_delay 0.990 0.0173 57.3 1.63e-162
3 distance 0.000114 0.00124 0.0920 9.27e- 1
We will further build on this interface in the next section where we look at different machine learning procedures for a classification problem.
14.2 Machine learning for classification
Building on sparklyr
, tidymodels
, and parsnip
, we test a set of machine learning models on the classification problem discussed in Varian (2014), predicting Titanic survivors. The data for this exercise can be downloaded from here: http://doi.org/10.3886/E113925V1.
We import and prepare the data in R.
# load into R, select variables of interest, remove missing
titanic_r <- read.csv("data/titanic3.csv")
titanic_r <- na.omit(titanic_r[, c("survived",
                                   "pclass",
                                   "sex",
                                   "age",
                                   "sibsp",
                                   "parch")])
titanic_r$survived <- ifelse(titanic_r$survived==1, "yes", "no")
In order to assess the performance of the classifiers later on, we split the sample into training and test datasets. We do so with the help of the rsample
package (Frick et al. 2022), which provides a number of high-level functions to facilitate this kind of pre-processing.
library(rsample)
# split into training and test set
titanic_r <- initial_split(titanic_r)
ti_training <- training(titanic_r)
ti_testing <- testing(titanic_r)
For the training and assessment of the classifiers, we transfer the two datasets to the Spark cluster.
# load data to spark
ti_training_spark <- copy_to(sc, ti_training, "ti_training_spark")
ti_testing_spark <- copy_to(sc, ti_testing, "ti_testing_spark")
Now we can set up a ‘horse race’ between different ML approaches to find the best performing model. Overall, we will consider the following models/algorithms:
- Logistic regression
- Boosted trees
- Random forest
# models to be used
models <- list(logit=logistic_reg(engine="spark", mode = "classification"),
               btree=boost_tree(engine = "spark", mode = "classification"),
               rforest=rand_forest(engine = "spark", mode = "classification"))
# train/fit the models
fits <- lapply(models, fit, formula=survived~., data=ti_training_spark)
The fitted models (trained algorithms) can now be assessed with the help of the test dataset. To this end, we use the high-level accuracy
function provided in the yardstick
package (Kuhn, Vaughan, and Hvitfeldt 2022) to compute the accuracy of the fitted models. We proceed in three steps. First, we use the fitted models to predict the outcomes (we classify cases into survived/did not survive) of the test set. Then we fetch the predictions from the Spark cluster, format the variables, and add the actual outcomes as an additional column.
# run predictions
predictions <- lapply(fits, predict, new_data=ti_testing_spark)
# fetch predictions from Spark, format, add actual outcomes
pred_outcomes <-
  lapply(1:length(predictions), function(i){
    x_r <- collect(predictions[[i]])               # load into local R environment
    x_r$pred_class <- as.factor(x_r$pred_class)    # format the predictions
    x_r$survived <- as.factor(ti_testing$survived) # add true outcomes
    return(x_r)
  })
Finally, we compute the accuracy of the models, stack the results, and display them (ordered from best-performing to worst-performing).
acc <- lapply(pred_outcomes, accuracy, truth="survived", estimate="pred_class")
acc <- bind_rows(acc)
acc$model <- names(fits)
acc[order(acc$.estimate, decreasing = TRUE), ]
# A tibble: 3 × 4
.metric .estimator .estimate model
<chr> <chr> <dbl> <chr>
1 accuracy binary 0.817 rforest
2 accuracy binary 0.790 btree
3 accuracy binary 0.779 logit
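The prediction tables assembled above can also feed other yardstick metrics. As a brief illustrative aside (not part of the original analysis), a confusion matrix for the random forest model could be computed as follows:
library(yardstick)
# confusion matrix for the random forest predictions
conf_mat(pred_outcomes[[which(names(fits) == "rforest")]],
         truth = survived,
         estimate = pred_class)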
In this simple example, all models perform similarly well; however, none of them performs outstandingly. In a next step, we might want to learn which variables are considered more or less important for the predictions. Here, the tidy() function is very useful: as long as the model types are comparable (here btree and rforest), tidy() delivers essentially the same type of summary for different models.
tidy(fits[["btree"]])
# A tibble: 5 × 2
feature importance
<chr> <dbl>
1 age 0.415
2 sex_male 0.223
3 pclass 0.143
4 sibsp 0.120
5 parch 0.0987
tidy(fits[["rforest"]])
# A tibble: 5 × 2
feature importance
<chr> <dbl>
1 sex_male 0.604
2 pclass 0.188
3 age 0.120
4 sibsp 0.0595
5 parch 0.0290
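Because tidy() returns the same columns for both models, the two importance tables can be stacked for a direct comparison. This is a small illustrative sketch (not part of the original analysis), assuming the fits list from above is still available:
library(dplyr)
# stack the two variable-importance tables and sort within each model
imp <- bind_rows(
  mutate(tidy(fits[["btree"]]),   model = "btree"),
  mutate(tidy(fits[["rforest"]]), model = "rforest")
)
arrange(imp, model, desc(importance))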
Finally, we clean up and disconnect from the Spark cluster.
spark_disconnect(sc)
14.3 Building machine learning pipelines with R and Spark
Spark provides a framework to implement machine learning pipelines called ML Pipelines, with the aim of facilitating the combination of various preparatory steps and ML algorithms into a pipeline/workflow. sparklyr
provides a straightforward interface to ML Pipelines that allows implementing and testing the entire ML workflow in R and then easily deploying the final pipeline to a Spark cluster or more generally to the production environment. In the following example, we will revisit the e-commerce purchase prediction model (Google Analytics data from the Google Merchandise Shop) introduced in Chapter 1. That is, we want to prepare the Google Analytics data and then use lasso to find a set of important predictors for purchase decisions, all built into a machine learning pipeline.
14.3.1 Set up and data import
All of the key ingredients are provided in sparklyr. However, I recommend using the ‘piping’ syntax provided in dplyr (Wickham et al. 2023) to implement the machine learning pipeline. In this context, the piping syntax is particularly helpful for keeping the code easy to read and understand; a toy illustration follows.
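The pipe simply passes the result of the left-hand expression on as the first argument of the function on the right, so x %>% f(y) is equivalent to f(x, y). A minimal example with made-up values (unrelated to our data):
library(dplyr)
# the two lines below compute the same result
round(mean(c(1.25, 2.75, 3.5)), 1)
c(1.25, 2.75, 3.5) %>% mean() %>% round(1)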
# load packages
library(sparklyr)
library(dplyr)
# fix vars
<- "data/ga.csv" INPUT_DATA
Recall that the Google Analytics dataset is a small subset of the overall data generated by Google Analytics on a moderately sized e-commerce site. Hence, it makes perfect sense to first implement and test the pipeline locally (on a local Spark installation) before deploying it on an actual Spark cluster in the cloud. In a first step, we thus copy the imported data to the local Spark instance.
# import to local R session, prepare raw data
ga <- na.omit(read.csv(INPUT_DATA))
#ga$purchase <- as.factor(ifelse(ga$purchase==1, "yes", "no"))

# connect to, and copy the data to, the local cluster
sc <- spark_connect(master = "local")
ga_spark <- copy_to(sc, ga, "ga_spark", overwrite = TRUE)
14.3.2 Building the pipeline
The pipeline object is initialized via ml_pipeline(), in which we refer to the connection to the local Spark cluster. We first add several ft_string_indexer() steps to encode the categorical variables (city, country, source, and browser) as numeric indices. We then add the model specification (the formula) to the pipeline with ft_r_formula(). ft_r_formula() essentially transforms the data in accordance with the common model specification syntax in R (here: purchase ~ .); among other things, this takes care of properly setting up the model matrix. Finally, we add the model via ml_logistic_regression(), where we can set the penalization parameters via elastic_net_param (with alpha=1, we get the lasso).
# ml pipeline
ga_pipeline <-
  ml_pipeline(sc) %>%
  ft_string_indexer(input_col="city",
                    output_col="city_output",
                    handle_invalid = "skip") %>%
  ft_string_indexer(input_col="country",
                    output_col="country_output",
                    handle_invalid = "skip") %>%
  ft_string_indexer(input_col="source",
                    output_col="source_output",
                    handle_invalid = "skip") %>%
  ft_string_indexer(input_col="browser",
                    output_col="browser_output",
                    handle_invalid = "skip") %>%
  ft_r_formula(purchase ~ .) %>%
  ml_logistic_regression(elastic_net_param = list(alpha=1))
Finally, we create a cross-validator object to train the model via k-fold cross-validation, and then fit the model. For the sake of the example, we use a 30-fold cross-validation (run in parallel on 8 cores).
# specify the hyperparameter grid
# (parameter values to be considered in optimization)
ga_params <- list(logistic_regression=list(max_iter=80))
# create the cross-validator object
set.seed(1)
cv_lasso <- ml_cross_validator(sc,
                               estimator = ga_pipeline,
                               estimator_param_maps = ga_params,
                               evaluator = ml_binary_classification_evaluator(sc),
                               num_folds = 30,
                               parallelism = 8)
# train/fit the model
cv_lasso_fit <- ml_fit(cv_lasso, ga_spark)
# note: this takes several minutes to run on a local machine (1 node, 8 cores)
Finally, we can inspect and further process the results – in particular the model’s performance.
# pipeline summary
# cv_lasso_fit
# average performance
cv_lasso_fit$avg_metrics_df
areaUnderROC max_iter_1
1 0.8666304 80
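Beyond the average metrics, we can also pull out the best-performing fitted pipeline and use it to generate predictions. The following is a minimal sketch, assuming (as sparklyr cross-validator fits provide) that the fitted object exposes the best model via best_model:
# extract the best fitted pipeline from the cross-validation results
best_model <- cv_lasso_fit$best_model   # assumed accessor on the CV fit
# apply the full fitted pipeline (indexers, formula, lasso) to data on Spark
preds <- ml_transform(best_model, ga_spark)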
Before closing the connection to the Spark cluster, we can save the entire pipeline to work further with it later on.
# save the entire pipeline/fit
ml_save(
  cv_lasso_fit,
  "ga_cv_lasso_fit",
overwrite = TRUE
)
To reload the pipeline later on, run ml_load(sc, "ga_cv_lasso_fit").
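For completeness, a minimal sketch of what such a later session might look like (assuming the pipeline was saved to the working directory, as above):
# in a fresh R session: reconnect and reload the fitted pipeline
library(sparklyr)
sc <- spark_connect(master = "local")
cv_lasso_fit <- ml_load(sc, "ga_cv_lasso_fit")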
14.4 Wrapping up
The key take-aways from this chapter are:
- When running econometric analyses such as linear or logistic regressions with massive amounts of data, sparklyr provides all the basic functions you need.
- You can test your code on your local Spark installation by connecting to the local ‘cluster’: spark_connect(master="local"). This allows you to test your entire regression analysis script locally (on a sub-sample) before running the exact same script via a connection to a large Spark cluster on AWS EMR. To do so, simply connect to the cluster via spark_connect(master="yarn") from RStudio Server, following the setup introduced in Section 8.4 (see the sketch after this list).
- The rsample package provides easy-to-use high-level functions to split your dataset into training and test datasets: see ?initial_split, ?training, and ?testing.
- The parsnip and broom packages (Robinson, Hayes, and Couch 2022) provide a way to easily standardize regression output. This is very helpful if you want to verify your regression analysis implementation for Spark against the more familiar R regression frameworks such as lm(). For example, compare the standard R OLS output with the linear regression output computed on a Spark cluster: fit(linear_reg(engine="lm"), model1, data=flights_r) for R’s standard OLS; fit(linear_reg(engine="spark"), model1, data=flights_spark) for Spark.
- For more advanced users, sparklyr provides a straightforward way to efficiently implement entire Spark machine learning pipelines in an R script via ml_pipeline(sc) and the dplyr-style pipe operator %>%, including model specification, data preparation, and selection and specification of the estimator.
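As a compact illustration of the local-to-cluster workflow mentioned in the second point above, the following sketch (with a hypothetical local_mode switch) shows that only the connection line changes, while the analysis script stays identical:
library(sparklyr)

# hypothetical switch: TRUE while developing locally, FALSE on AWS EMR
local_mode <- TRUE

sc <-
  if (local_mode) {
    spark_connect(master = "local")  # local testing on a sub-sample
  } else {
    spark_connect(master = "yarn")   # on the cluster, from RStudio Server
  }

# ... the rest of the analysis script is identical in both settings ...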
Footnotes

73. Again, it is important to keep in mind that running Spark on a small local machine is only optimal for learning and testing code (based on relatively small samples). The whole framework is optimized to be run on cluster computers.

74. Most regression models commonly used in traditional applied econometrics are provided in some form in sparklyr or SparkR. See the package documentation for more details.

75. Note, though, that this approach might take longer.