Chapter 14 Regression Analysis and Categorization with Spark and R

Regression analysis, particularly simple linear regression (OLS), is the backbone of applied econometrics. As discussed in previous chapters, regression analysis can be computationally very intensive with a dataset of many observations and variables, as it involves matrix operations on a very large model matrix. Chapter 12 discusses in one case study the special case of a large model matrix due to fixed-effects dummy variables. In this chapter, we first look at a generally applicable approach for estimating linear regression models with large datasets (when the model matrix cannot be held in RAM). Building on the same sparklyr framework (Luraschi et al. 2022) as for the simple linear regression case, we then look at classification models, such as logit and random forest. Finally, we look at how regression analysis and machine learning tasks can be organized in machine learning pipelines to be run, stored/reloaded, and updated flexibly.

14.1 Simple linear regression analysis

Suppose we want to conduct a correlation study of what factors are associated with longer or shorter arrival delays in air travel. Via its built-in ‘MLib’ library, Spark provides several high-level functions to conduct regression analyses. When calling these functions via sparklyr (or SparkR), their usage is actually very similar to the usual R packages/functions commonly used to run regressions in R.

As a simple point of reference, we first estimate a linear model with the usual R approach (all computed in the R environment). First, we load the data as a common data.table. We could also convert a copy of the entire SparkDataFrame object to a data.frame or data.table and get essentially the same outcome. However, collecting the data from the RDD structure would take much longer than parsing the CSV with fread. In addition, we only import the first 300 rows. Running regression analysis with relatively large datasets in Spark on a small local machine might fail or be rather slow.73

# flights_r <- collect(flights) # very slow!
flights_r <- data.table::fread("data/flights.csv", nrows = 300) 

Now we run a simple linear regression (OLS) and show the summary output.

# specify the linear model
model1 <- arr_delay ~ dep_delay + distance
# fit the model with OLS
fit1 <- lm(model1, flights_r)
# compute t-tests etc.
summary(fit1)
## 
## Call:
## lm(formula = model1, data = flights_r)
## 
## Residuals:
##    Min     1Q Median     3Q    Max 
## -42.39  -9.96  -1.91   9.87  48.02 
## 
## Coefficients:
##              Estimate Std. Error t value Pr(>|t|)    
## (Intercept) -0.182662   1.676560   -0.11     0.91    
## dep_delay    0.989553   0.017282   57.26   <2e-16 ***
## distance     0.000114   0.001239    0.09     0.93    
## ---
## Signif. codes:  
## 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## Residual standard error: 15.5 on 297 degrees of freedom
## Multiple R-squared:  0.917,  Adjusted R-squared:  0.917 
## F-statistic: 1.65e+03 on 2 and 297 DF,  p-value: <2e-16

Now we aim to compute essentially the same model estimate in sparklyr.74 In order to use Spark via the sparklyr package, we need to first load the package and establish a connection with Spark (similar to SparkR::sparkR.session()).

library(sparklyr)

# connect with default configuration
sc <- spark_connect(master="local")

We then copy the data.table flights_r (previously loaded into our R session) to Spark. Again, working on a normal laptop this seems trivial, but the exact same command would allow us (when connected with Spark on a cluster computer in the cloud) to properly load and distribute the data.table on the cluster. Finally, we then fit the model with ml_linear_regression() and compute.

# load data to spark
flights_spark <- copy_to(sc, flights_r, "flights_spark")
# fit the model
fit1_spark <- ml_linear_regression(flights_spark, formula = model1)
# compute summary stats
summary(fit1_spark)
Deviance Residuals:
    Min      1Q  Median      3Q     Max 
-42.386  -9.965  -1.911   9.866  48.024 

Coefficients:
  (Intercept)     dep_delay      distance 
-0.1826622687  0.9895529018  0.0001139616 

R-Squared: 0.9172
Root Mean Squared Error: 15.42

Alternatively, we can use the spark_apply() function to run the regression analysis in R via the original R lm() function.75

# fit the model
spark_apply(flights_spark, 
            function(df){
              broom::tidy(lm(arr_delay ~ dep_delay + distance, df))},
            names = c("term", 
                      "estimate", 
                      "std.error", 
                      "statistic", 
                      "p.value")
    )
# Source: spark<?> [?? x 5]
  term         estimate std.error statistic   p.value
  <chr>           <dbl>     <dbl>     <dbl>     <dbl>
1 (Intercept) -0.183      1.68      -0.109  9.13e-  1
2 dep_delay    0.990      0.0173    57.3    1.63e-162
3 distance     0.000114   0.00124    0.0920 9.27e-  1

Finally, the parsnip package (Kuhn and Vaughan 2022) (together with the tidymodels package; Kuhn and Wickham (2020)) provides a simple interface to run the same model (or similar specifications) on different “engines” (estimators/fitting algorithms), and several of the parsnip models are also supported in sparklyr. This significantly facilitates the transition from local testing (with a small subset of the data) to running the estimation on the entire dataset on spark.

library(tidymodels)
library(parsnip)

# simple local linear regression example from above
# via tidymodels/parsnip
fit1 <- fit(linear_reg(engine="lm"), model1, data=flights_r)
tidy(fit1)
# A tibble: 3 × 5
  term         estimate std.error statistic   p.value
  <chr>           <dbl>     <dbl>     <dbl>     <dbl>
1 (Intercept) -0.183      1.68      -0.109  9.13e-  1
2 dep_delay    0.990      0.0173    57.3    1.63e-162
3 distance     0.000114   0.00124    0.0920 9.27e-  1
# run the same on Spark 
fit1_spark <- fit(linear_reg(engine="spark"), model1, data=flights_spark)
tidy(fit1_spark)
# A tibble: 3 × 5
  term         estimate std.error statistic   p.value
  <chr>           <dbl>     <dbl>     <dbl>     <dbl>
1 (Intercept) -0.183      1.68      -0.109  9.13e-  1
2 dep_delay    0.990      0.0173    57.3    1.63e-162
3 distance     0.000114   0.00124    0.0920 9.27e-  1

We will further build on this interface in the next section where we look at different machine learning procedures for a classification problem.

14.2 Machine learning for classification

Building on sparklyr, tidymodels, and parsnip, we test a set of machine learning models on the classification problem discussed in Varian (2014), predicting Titanic survivors. The data for this exercise can be downloaded from here: http://doi.org/10.3886/E113925V1.

We import and prepare the data in R.

# load into R, select variables of interest, remove missing
titanic_r <- read.csv("data/titanic3.csv")
titanic_r <- na.omit(titanic_r[, c("survived",
                           "pclass",
                           "sex",
                           "age",
                           "sibsp",
                           "parch")])
titanic_r$survived <- ifelse(titanic_r$survived==1, "yes", "no")

In order to assess the performance of the classifiers later on, we split the sample into training and test datasets. We do so with the help of the rsample package (Frick et al. 2022), which provides a number of high-level functions to facilitate this kind of pre-processing.

library(rsample)

# split into training and test set
titanic_r <- initial_split(titanic_r)
ti_training <- training(titanic_r)
ti_testing <- testing(titanic_r)

For the training and assessment of the classifiers, we transfer the two datasets to the spark cluster.

# load data to spark
ti_training_spark <- copy_to(sc, ti_training, "ti_training_spark")
ti_testing_spark <- copy_to(sc, ti_testing, "ti_testing_spark")

Now we can set up a ‘horse race’ between different ML approaches to find the best performing model. Overall, we will consider the following models/algorithms:

  • Logistic regression
  • Boosted trees
  • Random forest
# models to be used
models <- list(logit=logistic_reg(engine="spark", mode = "classification"),
               btree=boost_tree(engine = "spark", mode = "classification"),
               rforest=rand_forest(engine = "spark", mode = "classification"))
# train/fit the models
fits <- lapply(models, fit, formula=survived~., data=ti_training_spark)

The fitted models (trained algorithms) can now be assessed with the help of the test dataset. To this end, we use the high-level accuracy function provided in the yardstick package (Kuhn, Vaughan, and Hvitfeldt 2022) to compute the accuracy of the fitted models. We proceed in three steps. First, we use the fitted models to predict the outcomes (we classify cases into survived/did not survive) of the test set. Then we fetch the predictions from the Spark cluster, format the variables, and add the actual outcomes as an additional column.

# run predictions
predictions <- lapply(fits, predict, new_data=ti_testing_spark)
# fetch predictions from Spark, format, add actual outcomes
pred_outcomes <- 
     lapply(1:length(predictions), function(i){
          x_r <- collect(predictions[[i]]) # load into local R environment
          x_r$pred_class <- as.factor(x_r$pred_class) # format for predictions
          x_r$survived <- as.factor(ti_testing$survived) # add true outcomes
          return(x_r)
     
})

Finally, we compute the accuracy of the models, stack the results, and display them (ordered from best-performing to worst-performing.)

acc <- lapply(pred_outcomes, accuracy, truth="survived", estimate="pred_class")
acc <- bind_rows(acc)
acc$model <- names(fits)
acc[order(acc$.estimate, decreasing = TRUE),]
# A tibble: 3 × 4
  .metric  .estimator .estimate model  
  <chr>    <chr>          <dbl> <chr>  
1 accuracy binary         0.817 rforest
2 accuracy binary         0.790 btree  
3 accuracy binary         0.779 logit  

In this simple example, all models perform similarly well. However, none of them really performs outstandingly. In a next step, we might want to learn about which variables are considered more or less important for the predictions. Here, the tidy() function is very useful. As long as the model types are comparable (here btree and rforest), tidy() delivers essentially the same type of summary for different models.

tidy(fits[["btree"]])
# A tibble: 5 × 2
  feature  importance
  <chr>         <dbl>
1 age          0.415 
2 sex_male     0.223 
3 pclass       0.143 
4 sibsp        0.120 
5 parch        0.0987
tidy(fits[["rforest"]])
# A tibble: 5 × 2
  feature  importance
  <chr>         <dbl>
1 sex_male     0.604 
2 pclass       0.188 
3 age          0.120 
4 sibsp        0.0595
5 parch        0.0290

Finally, we clean up and disconnect from the Spark cluster.

spark_disconnect(sc)

14.3 Building machine learning pipelines with R and Spark

Spark provides a framework to implement machine learning pipelines called ML Pipelines, with the aim of facilitating the combination of various preparatory steps and ML algorithms into a pipeline/workflow. sparklyr provides a straightforward interface to ML Pipelines that allows implementing and testing the entire ML workflow in R and then easily deploying the final pipeline to a Spark cluster or more generally to the production environment. In the following example, we will revisit the e-commerce purchase prediction model (Google Analytics data from the Google Merchandise Shop) introduced in Chapter 1. That is, we want to prepare the Google Analytics data and then use lasso to find a set of important predictors for purchase decisions, all built into a machine learning pipeline.

14.3.1 Set up and data import

All of the key ingredients are provided in sparklyr. However, I recommend using the ‘piping’ syntax provided in dplyr (Wickham et al. 2023) to implement the machine learning pipeline. In this context, using this syntax is particularly helpful to make the code easy to read and understand.

# load packages
library(sparklyr)
library(dplyr)

# fix vars
INPUT_DATA <- "data/ga.csv"

Recall that the Google Analytics dataset is a small subset of the overall data generated by Google Analytics on a moderately sized e-commerce site. Hence, it makes perfect sense to first implement and test the pipeline locally (on a local Spark installation) before deploying it on an actual Spark cluster in the cloud. In a first step, we thus copy the imported data to the local Spark instance.

# import to local R session, prepare raw data
ga <- na.omit(read.csv(INPUT_DATA))
#ga$purchase <- as.factor(ifelse(ga$purchase==1, "yes", "no"))
# connect to, and copy the data to the local cluster
sc <- spark_connect(master = "local")
ga_spark <- copy_to(sc, ga, "ga_spark", overwrite = TRUE)

14.3.2 Building the pipeline

The pipeline object is initialized via ml_pipeline(), in which we refer to the connection to the local Spark cluster. We then add the model specification (the formula) to the pipeline with ft_r_formula(). ft_r_formula essentially transforms the data in accordance with the common specification syntax in R (here: purchase ~ .). Among other things, this takes care of properly setting up the model matrix. Finally, we add the model via ml_logistic_regression(). We can set the penalization parameters via elastic_net_param (with alpha=1, we get the lasso).

# ml pipeline
ga_pipeline <- 
     ml_pipeline(sc) %>%
     ft_string_indexer(input_col="city", 
                       output_col="city_output",
                       handle_invalid = "skip") %>%
     ft_string_indexer(input_col="country", 
                       output_col="country_output",
                       handle_invalid = "skip") %>%
     ft_string_indexer(input_col="source", 
                       output_col="source_output",
                       handle_invalid = "skip") %>%
     ft_string_indexer(input_col="browser", 
                       output_col="browser_output",
                       handle_invalid = "skip") %>%
     ft_r_formula(purchase ~ .) %>% 
     ml_logistic_regression(elastic_net_param = list(alpha=1))

Finally, we create a cross-validator object to train the model with a k-fold cross-validation and fit the model. For the sake of the example, we use only a 30-fold cross validation (to be run in parallel on 8 cores).

# specify the hyperparameter grid
# (parameter values to be considered in optimization)
ga_params <- list(logistic_regression=list(max_iter=80))

# create the cross-validator object
set.seed(1)
cv_lasso <- ml_cross_validator(sc,
                         estimator=ga_pipeline,
                         estimator_param_maps = ga_params,
                         ml_binary_classification_evaluator(sc),
                         num_folds = 30, 
                         parallelism = 8)

# train/fit the model
cv_lasso_fit <- ml_fit(cv_lasso, ga_spark) 
# note: this takes several minutes to run on a local machine (1 node, 8 cores)

Finally, we can inspect and further process the results – in particular the model’s performance.

# pipeline summary 
# cv_lasso_fit
# average performance
cv_lasso_fit$avg_metrics_df
  areaUnderROC max_iter_1
1    0.8666304         80

Before closing the connection to the Spark cluster, we can save the entire pipeline to work further with it later on.

# save the entire pipeline/fit
ml_save(
  cv_lasso_fit,
  "ga_cv_lasso_fit",
  overwrite = TRUE
)

To reload the pipeline later on, run ml_load(sc, "ga_cv_lasso_fit").

14.4 Wrapping up

The key take-aways from this chapter are:

  • When running econometric analysis such as linear or logistic regressions with massive amounts of data, sparklyr provides all the basic functions you need.
  • You can test your code on your local spark installation by connecting to the local ‘cluster’: spark_connect(master="local"). This allows you to test your entire regression analysis script locally (on a sub-sample) before running the exact same script via a connection to a large spark cluster on AWS EMR. To do so, simply connect to the cluster via spark_connect(master = "yarn") from RStudio server, following the setup introduced in Section 8.4.
  • The rsample package provides easy-to-use high-level functions to split your dataset into training and test datasets: See ?initial_split, ?training, and ?testing.
  • The parsnip and broom packages (Robinson, Hayes, and Couch 2022) provide a way to easily standardize regression output. This is very helpful if you want to verify your regression analysis implementation for Spark with the more familiar R regression frameworks such as lm(). For example, compare the standard R OLS output with the linear regression output computed on a Spark cluster: fit(linear_reg(engine="lm"), model1, data=flights_r) for R’s standard OLS; fit(linear_reg(engine="spark"), model1, data=flights_r) for Spark.
  • For more advanced users, sparklyr provides a straightforward way to efficiently implement entire Spark machine learning pipelines in an R script via ml_pipeline(sc) and the dplyr-style pipe operators %>%, including model specification, data preparation, and selection and specification of the estimator.

References

Frick, Hannah, Fanny Chow, Max Kuhn, Michael Mahoney, Julia Silge, and Hadley Wickham. 2022. Rsample: General Resampling Infrastructure. https://CRAN.R-project.org/package=rsample.
Kuhn, Max, and Davis Vaughan. 2022. Parsnip: A Common API to Modeling and Analysis Functions. https://CRAN.R-project.org/package=parsnip.
Kuhn, Max, Davis Vaughan, and Emil Hvitfeldt. 2022. Yardstick: Tidy Characterizations of Model Performance. https://CRAN.R-project.org/package=yardstick.
Kuhn, Max, and Hadley Wickham. 2020. “Tidymodels: A Collection of Packages for Modeling and Machine Learning Using Tidyverse Principles.” https://www.tidymodels.org [last visited: 16/05/2023].
Luraschi, Javier, Kevin Kuo, Kevin Ushey, JJ Allaire, Hossein Falaki, Lu Wang, Andy Zhang, Yitao Li, Edgar Ruiz, and The Apache Software Foundation. 2022. sparklyr: R Interface to Apache Spark. https://spark.rstudio.com/.
Robinson, David, Alex Hayes, and Simon Couch. 2022. Broom: Convert Statistical Objects into Tidy Tibbles. https://CRAN.R-project.org/package=broom.
Varian, Hal R. 2014. “Big Data: New Tricks for Econometrics.” Journal of Economic Perspectives 28 (2): 3–28. https://doi.org/10.1257/jep.28.2.3.
Wickham, Hadley, Romain François, Lionel Henry, Kirill Müller, and Davis Vaughan. 2023. Dplyr: A Grammar of Data Manipulation. https://CRAN.R-project.org/package=dplyr.

  1. Again, it is important to keep in mind that running Spark on a small local machine is only optimal for learning and testing code (based on relatively small samples). The whole framework is optimized to be run on cluster computers.↩︎

  2. Most regression models commonly used in traditional applied econometrics are provided in some form in sparklyr or SparkR. See the package documentation for more details.↩︎

  3. Note, though, that this approach might take longer.↩︎