Chapter 9 Big Data Cleaning and Transformation

Before the observations and variables of interest can be filtered, selected, or aggregated, data cleaning and transformation typically have to be run on large volumes of raw data. Typical data cleaning tasks involve the following (a few of them are illustrated in the short code sketch after this list):

  • Normalization/standardization (across entities, categories, observation periods).
  • Coding of additional variables (indicators, strings to categorical, etc.).
  • Removing/adding covariates.
  • Merging/joining datasets.
  • Properly defining data types for each variable.
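
To fix ideas, here is a minimal sketch of a few of these steps performed on a small, made-up in-memory dataset (all object names and values below are purely illustrative):

# toy example data (purely illustrative)
sales <- data.frame(id = c("A", "B", "C"),
                    region = c("north", "south", "south"),
                    revenue = c("1200", "950", "1830"),  # numbers stored as strings
                    stringsAsFactors = FALSE)
regions <- data.frame(region = c("north", "south"),
                      manager = c("Jones", "Smith"),
                      stringsAsFactors = FALSE)

# properly define data types
sales$revenue <- as.numeric(sales$revenue)
# code an additional (indicator) variable
sales$large_deal <- sales$revenue > 1000
# convert a string variable to a categorical variable
sales$region <- as.factor(sales$region)
# merge/join the two datasets
sales <- merge(sales, regions, by = "region")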

All of these steps are very common tasks when working with data for analytics purposes, independent of the size of the dataset. However, as most of the techniques and software developed for such tasks are meant to process data in memory, performing these tasks on large datasets can be challenging. Data cleaning workflows you are perfectly familiar with might slow down substantially or crash due to a lack of memory (RAM), particularly if the data preparation step involves merging/joining two datasets. Other potential bottlenecks are the parsing of large files (CPU) and intensive reading from and writing to the hard disk (mass storage).

In practice, the most critical bottleneck of common data preparation tasks is often a lack of RAM. In the following, we thus explore two strategies that broadly build on the idea of virtual memory (using parts of the hard disk as RAM) and/or lazy evaluation (only loading/processing the part of a dataset really required).

9.1 Out-of-memory strategies and lazy evaluation: Practical basics

Virtual memory is, simply put, an approach to combining RAM and mass storage in order to cope with a lack of RAM. Modern operating systems come with a virtual memory manager that automatically handles the swapping between RAM and the hard disk when running processes use up too much RAM. However, a virtual memory manager is not specifically designed to perform this task in the context of data analysis. Several strategies have thus been developed that build on the basic idea of virtual memory in the context of data analysis tasks.

  • Chunked data files on disk: The data analytics software ‘partitions’ the dataset, and maps and stores the chunks of raw data on disk. What is actually ‘read’ into RAM when importing the data file with this approach is the mapping to the partitions of the actual dataset (the data structure) and some metadata describing the dataset. In R, this approach is implemented in the ff package (Adler et al. 2022) and several packages building on ff. In this approach, the usage of disk space and the linking between RAM and files on disk is very explicit (and clearly visible to the user).

  • Memory mapped files and shared memory: The data analytics software uses segments of virtual memory for the dataset and allows different programs/processes to access it in the same memory segment. Thus, virtual memory is explicitly allocated for one or several specific data analytics tasks. In R, this approach is notably implemented in the bigmemory package (Kane, Emerson, and Weston 2013) and several packages building on bigmemory.

A conceptually related but differently focused approach is the lazy evaluation implemented in Apache Arrow and the corresponding arrow package (Richardson et al. 2022). While Apache Arrow is primarily a platform for in-memory columnar data, it is optimized for processing large amounts of data and for working with datasets that do not fit into memory. The key idea is that instructions on what to do with a dataset are not evaluated step by step on the spot but all together at the point of actually loading the data into R. That is, we can connect to a dataset via arrow, inspect its variables, and give instructions on which observations to filter out and which columns to select, all before we read the dataset into RAM. In comparison to the strategies outlined above, this approach is usually much faster but might still lead to a situation with a lack of memory.

In the following subsections we briefly look at how to set up an R session for data preparation purposes with any of these approaches (ff, bigmemory, arrow) and look at some of the conceptual basics behind the approaches.

9.1.1 Chunking data with the ff package

We first install and load the ff and ffbase (de Jonge, Wijffels, and van der Laan 2023) packages; for comparison purposes, we also load the data.table package. We use the familiar flights.csv dataset.54 For the sake of the example, we only use a fraction of the original dataset.55 On disk, the dataset is about 30MB:

fs::file_size("data/flights.csv")
## 29.5M

However, loading the entire original dataset of several GB would work just as well with the ff approach.

When importing data via the ff package, we first have to set up a directory where ff can store the partitioned dataset (recall that this is explicitly/visibly done on disk). We call this new directory ff_files.

# SET UP --------------
# install.packages(c("ff", "ffbase"))
# you might have to install the ffbase package directly from GitHub:
# devtools::install_github("edwindj/ffbase", subdir="pkg")
# load packages
library(ff)
library(ffbase)
library(data.table) # for comparison


# create directory for ff chunks, and assign directory to ff 
system("mkdir ff_files")
options(fftempdir = "ff_files")

Now we can read in the data with read.table.ffdf. In order to better understand the underlying concept, we also import the data into a common data.table object via fread() and then look at the size of the objects resulting from the two ‘import’ approaches in the R environment with object.size().

# usual in-memory csv import
flights_dt <- fread("data/flights.csv")

# out-of-memory approach
flights <- 
     read.table.ffdf(file="data/flights.csv",
                     sep=",",
                     VERBOSE=TRUE,
                     header=TRUE,
                     next.rows=100000,
                     colClasses=NA)
## read.table.ffdf 1..100000 (100000)  csv-read=0.609sec ffdf-write=0.065sec
## read.table.ffdf 100001..200000 (100000)  csv-read=0.479sec ffdf-write=0.044sec
## read.table.ffdf 200001..300000 (100000)  csv-read=0.446sec ffdf-write=0.046sec
## read.table.ffdf 300001..336776 (36776)  csv-read=0.184sec ffdf-write=0.04sec
##  csv-read=1.718sec  ffdf-write=0.195sec  TOTAL=1.913sec
# compare object sizes
object.size(flights) # out-of-memory approach
## 949976 bytes
object.size(flights_dt) # common data.table
## 32569024 bytes

Note that there are two substantial differences from what we have previously seen when using fread(): importing the CSV into the ff_files structure takes much longer, but the RAM allocated to the resulting object is much smaller. This is exactly what we would expect, keeping in mind what read.table.ffdf() does in comparison to what fread() does. Now we can actually have a look at the data chunks created by ff.

# show the files in the directory keeping the chunks
head(list.files("ff_files"))
## [1] "ffdf42b781703dcfe.ff" "ffdf42b781d18cdf9.ff"
## [3] "ffdf42b781d5105fb.ff" "ffdf42b781d6ccc74.ff"
## [5] "ffdf42b782494a6a1.ff" "ffdf42b782df9670d.ff"

9.1.2 Memory mapping with bigmemory

The bigmemory package handles data in matrices and therefore only accepts data values of identical data type. Before importing data via the bigmemory package, we thus have to ensure that all variables in the raw data can be imported in a common type.

# SET UP ----------------

# load packages
library(bigmemory)
library(biganalytics)

# import the data
flights <- read.big.matrix("data/flights.csv",
                     type="integer",
                     header=TRUE,
                     backingfile="flights.bin",
                     descriptorfile="flights.desc")

Note that, similar to the ff example, read.big.matrix() creates a local file flights.bin on disk that is linked to the flights object in RAM. Inspecting the imported data, we see that various variable values have been discarded. This is because we have forced all variables to be of type "integer" when importing the dataset.

object.size(flights)
## 696 bytes
str(flights)
## Formal class 'big.matrix' [package "bigmemory"] with 1 slot
##   ..@ address:<externalptr>

Again, the object representing the dataset in R does not contain the actual data (it does not even take up a KB of memory).
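
Since the actual data resides in the file-backed matrix flights.bin, we can later re-attach the dataset via the descriptor file instead of parsing the CSV again. A minimal sketch, assuming the backing and descriptor files created above are located in the current working directory:

# re-attach the file-backed matrix (e.g., in a fresh R session)
library(bigmemory)

flights <- attach.big.matrix("flights.desc")
dim(flights)
# reading a small block pulls only these values into RAM
flights[1:3, 1:4]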

9.1.3 Connecting to Apache Arrow

# SET UP ----------------

# load packages
library(arrow)

# import the data
flights <- read_csv_arrow("data/flights.csv",
                     as_data_frame = FALSE)

Note the as_data_frame=FALSE in the function call. This instructs Arrow to establish a connection to the file and read some of the data (to understand what is in the file), but not actually import the whole CSV.

summary(flights)
##                Length Class        Mode       
## year           336776 ChunkedArray environment
## month          336776 ChunkedArray environment
## day            336776 ChunkedArray environment
## dep_time       336776 ChunkedArray environment
## sched_dep_time 336776 ChunkedArray environment
## dep_delay      336776 ChunkedArray environment
## arr_time       336776 ChunkedArray environment
## sched_arr_time 336776 ChunkedArray environment
## arr_delay      336776 ChunkedArray environment
## carrier        336776 ChunkedArray environment
## flight         336776 ChunkedArray environment
## tailnum        336776 ChunkedArray environment
## origin         336776 ChunkedArray environment
## dest           336776 ChunkedArray environment
## air_time       336776 ChunkedArray environment
## distance       336776 ChunkedArray environment
## hour           336776 ChunkedArray environment
## minute         336776 ChunkedArray environment
## time_hour      336776 ChunkedArray environment
object.size(flights)
## 488 bytes

Again, we notice that the flights object is much smaller than the actual dataset on disk.
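
To get a first glimpse of the lazy evaluation at work, we can already chain a few dplyr verbs to the flights table. The verbs merely record the query; nothing is materialized in RAM until collect() is called. A minimal sketch, assuming the dplyr package is installed (the column names are those shown in the summary above; the full tutorial follows in Section 9.3):

library(dplyr)

# chain dplyr verbs on the arrow table: this only records the query
delayed_query <- 
     flights %>%
     filter(dep_delay > 60) %>%
     select(carrier, origin, dest, dep_delay)

# only now is the result computed and pulled into RAM as a data frame
delayed_flights <- collect(delayed_query)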

9.2 Big Data preparation tutorial with ff

9.2.1 Set up

The following code and data examples build on Walkowiak (2016), Chapter 3.56 The setup for our analysis script involves loading the ff, ffbase, and pryr packages, initializing fixed variables to hold the paths to the datasets, and creating (and assigning to ff) a new local directory ff_files in which the chunked binary flat files of the original datasets will be stored.

## SET UP ------------------------

# create and set directory for ff files
system("mkdir ff_files")
options(fftempdir = "ff_files")

# load packages
library(ff)
library(ffbase)
library(pryr)

# fix vars
FLIGHTS_DATA <- "data/flights_sep_oct15.txt"
AIRLINES_DATA <- "data/airline_id.csv"

9.2.2 Data import

In a first step we read (or ‘upload’) the data into R. This step involves the creation of the chunked binary files as well as the mapping of these files and the metadata. In comparison to the traditional read.csv approach, you will notice two things: on the one hand, the data import takes longer; on the other hand, it uses up much less RAM than with read.csv.

# DATA IMPORT ------------------

# check memory used
mem_used()
## 1.79 GB
# 1. Upload flights_sep_oct15.txt and airline_id.csv files from flat files. 

system.time(flights.ff <- read.table.ffdf(file=FLIGHTS_DATA,
                                          sep=",",
                                          VERBOSE=TRUE,
                                          header=TRUE,
                                          next.rows=100000,
                                          colClasses=NA))
## read.table.ffdf 1..100000 (100000)  csv-read=0.564sec ffdf-write=0.095sec
## read.table.ffdf 100001..200000 (100000)  csv-read=0.603sec ffdf-write=0.072sec
## read.table.ffdf 200001..300000 (100000)  csv-read=0.611sec ffdf-write=0.068sec
## read.table.ffdf 300001..400000 (100000)  csv-read=0.625sec ffdf-write=0.08sec
## read.table.ffdf 400001..500000 (100000)  csv-read=0.626sec ffdf-write=0.072sec
## read.table.ffdf 500001..600000 (100000)  csv-read=0.681sec ffdf-write=0.075sec
## read.table.ffdf 600001..700000 (100000)  csv-read=0.638sec ffdf-write=0.069sec
## read.table.ffdf 700001..800000 (100000)  csv-read=0.6sec ffdf-write=0.081sec
## read.table.ffdf 800001..900000 (100000)  csv-read=0.612sec ffdf-write=0.075sec
## read.table.ffdf 900001..951111 (51111)  csv-read=0.329sec ffdf-write=0.047sec
##  csv-read=5.889sec  ffdf-write=0.734sec  TOTAL=6.623sec
##    user  system elapsed 
##   5.659   0.750   6.626
system.time(airlines.ff <- read.csv.ffdf(file= AIRLINES_DATA,
                             VERBOSE=TRUE,
                             header=TRUE,
                             next.rows=100000,
                             colClasses=NA))
## read.table.ffdf 1..1607 (1607)  csv-read=0.005sec ffdf-write=0.004sec
##  csv-read=0.005sec  ffdf-write=0.004sec  TOTAL=0.009sec
##    user  system elapsed 
##   0.009   0.001   0.010
# check memory used
mem_used()
## 1.79 GB

Comparison with read.table

# Using read.table()
system.time(flights.table <- read.table(FLIGHTS_DATA, 
                                        sep=",",
                                        header=TRUE))
##    user  system elapsed 
##   5.164   0.684   5.976
system.time(airlines.table <- read.csv(AIRLINES_DATA,
                                       header = TRUE))
##    user  system elapsed 
##   0.002   0.000   0.003
# check the memory used
mem_used()
## 1.93 GB

9.2.3 Inspect imported files

A particularly useful aspect of working with the ff package and the packages building on it is that many of the simple R functions that work on normal data.frames in RAM also work on ff_files objects. Hence, without actually having loaded the entire raw data of a large dataset into RAM, we can quickly get an overview of its key characteristics, such as the number of observations and the number of variables.

# 2. Inspect the ff_files objects.
## For flights.ff object:
class(flights.ff)
## [1] "ffdf"
dim(flights.ff)
## [1] 951111     28
## For airlines.ff object:
class(airlines.ff)
## [1] "ffdf"
dim(airlines.ff)
## [1] 1607    2

9.2.4 Data cleaning and transformation

After inspecting the data, we go through several steps of cleaning and transformation, with the goal of then merging the two datasets. That is, we want to create a new dataset that contains detailed flight information but with additional information on the carriers/airlines. First, we want to rename some of the variables.

# step 1: 
# Rename "Code" variable from airlines.ff 
# to "AIRLINE_ID" and "Description" into "AIRLINE_NM".
names(airlines.ff) <- c("AIRLINE_ID", "AIRLINE_NM")
names(airlines.ff)
## [1] "AIRLINE_ID" "AIRLINE_NM"
str(airlines.ff[1:20,])
## 'data.frame': 20 obs. of  2 variables:
##  $ AIRLINE_ID: int  19031 19032 19033 19034 19035 19036 19037 19038 19039 19040 ...
##  $ AIRLINE_NM: Factor w/ 1607 levels "40-Mile Air: Q5",..: 945 1025 503 721 64 725 1194 99 1395 276 ...

Now we can join the two datasets via the unique airline identifier "AIRLINE_ID". Note that these kinds of operations would usually take up substantially more RAM on the spot, if both original datasets were also fully loaded into RAM. As illustrated by the mem_change() function, this is not the case here. All that is needed is a small chunk of RAM to keep the metadata and mapping-information of the new ff_files object; all the actual data is cached on the hard disk.

# merge of ff_files objects
mem_change(flights.data.ff <- merge.ffdf(flights.ff,
                                         airlines.ff,
                                         by="AIRLINE_ID"))
## 774 kB
# The new object is only 551.2 KB in size
class(flights.data.ff)
## [1] "ffdf"
dim(flights.data.ff)
## [1] 951111     29
names(flights.data.ff)
##  [1] "YEAR"              "MONTH"            
##  [3] "DAY_OF_MONTH"      "DAY_OF_WEEK"      
##  [5] "FL_DATE"           "UNIQUE_CARRIER"   
##  [7] "AIRLINE_ID"        "TAIL_NUM"         
##  [9] "FL_NUM"            "ORIGIN_AIRPORT_ID"
## [11] "ORIGIN"            "ORIGIN_CITY_NAME" 
## [13] "ORIGIN_STATE_NM"   "ORIGIN_WAC"       
## [15] "DEST_AIRPORT_ID"   "DEST"             
## [17] "DEST_CITY_NAME"    "DEST_STATE_NM"    
## [19] "DEST_WAC"          "DEP_TIME"         
## [21] "DEP_DELAY"         "ARR_TIME"         
## [23] "ARR_DELAY"         "CANCELLED"        
## [25] "CANCELLATION_CODE" "DIVERTED"         
## [27] "AIR_TIME"          "DISTANCE"         
## [29] "AIRLINE_NM"

9.2.5 Inspect difference in in-memory operation

In comparison to the ff-approach, performing the merge in memory needs more resources:

## For flights.table:
names(airlines.table) <- c("AIRLINE_ID", "AIRLINE_NM")
names(airlines.table)
## [1] "AIRLINE_ID" "AIRLINE_NM"
str(airlines.table[1:20,])
## 'data.frame': 20 obs. of  2 variables:
##  $ AIRLINE_ID: int  19031 19032 19033 19034 19035 19036 19037 19038 19039 19040 ...
##  $ AIRLINE_NM: chr  "Mackey International Inc.: MAC" "Munz Northern Airlines Inc.: XY" "Cochise Airlines Inc.: COC" "Golden Gate Airlines Inc.: GSA" ...
# check memory usage of merge in RAM 
mem_change(flights.data.table <- merge(flights.table,
                                       airlines.table,
                                       by="AIRLINE_ID"))
## 161 MB
# The new object is already 105.7 MB in size
# A rapid spike in RAM use when processing

9.2.6 Subsetting

Now we want to keep only a subset of the observations (the cancelled flights) and select only specific variables of the overall dataset.

mem_used()
## 2.09 GB
# Subset the ff_files object flights.data.ff:
subs1.ff <- 
     subset.ffdf(flights.data.ff, 
                 CANCELLED == 1, 
                 select = c(FL_DATE,
                            AIRLINE_ID,
                            ORIGIN_CITY_NAME,
                            ORIGIN_STATE_NM,
                            DEST_CITY_NAME,
                            DEST_STATE_NM,
                            CANCELLATION_CODE))

dim(subs1.ff)
## [1] 4529    7
mem_used()
## 2.09 GB

9.2.7 Save/load/export ff files

In order to better organize and easily reload the newly created ff_files object, we can explicitly save it to disk.

# Save a newly created ff_files object to a data file:
# (7 files (one for each column) created in the ffdb directory)
save.ffdf(subs1.ff, overwrite = TRUE) 

If we want to reload a previously saved ff_files object, we do not have to go through the chunking of the raw data file again but can very quickly load the data mapping and metadata into RAM in order to further work with the data (stored on disk).

# Loading previously saved ff_files files:
rm(subs1.ff)
#gc()
load.ffdf("ffdb")
# check the class and structure of the loaded data
class(subs1.ff) 
## [1] "ffdf"
dim(subs1.ff)
## [1] 4529    7
dimnames(subs1.ff)
## [[1]]
## NULL
## 
## [[2]]
## [1] "FL_DATE"           "AIRLINE_ID"       
## [3] "ORIGIN_CITY_NAME"  "ORIGIN_STATE_NM"  
## [5] "DEST_CITY_NAME"    "DEST_STATE_NM"    
## [7] "CANCELLATION_CODE"

If we want to store an ff_files dataset in a format more accessible for other users (such as CSV), we can do so as follows. This last step is also quite common in practice: the initial raw dataset is very large, so we perform all the potentially memory-intensive tasks of preparing the analytic dataset via ff and then store the (often much smaller) analytic dataset in a more accessible CSV file, in order to later read it into RAM and run the more computationally intensive analyses directly in RAM.

#  Export subs1.ff into a CSV file:
write.csv.ffdf(subs1.ff, "subset1.csv")
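
Later on, this much smaller analytic dataset can simply be read back into RAM for the actual analysis. A minimal sketch, assuming the data.table package is installed:

# read the exported analytic dataset back into RAM
library(data.table)
subs1 <- fread("subset1.csv")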

9.3 Big Data preparation tutorial with arrow

We begin by initializing our R session as in the short arrow introduction above.

# SET UP ----------------

# load packages
library(arrow)
library(dplyr)
library(pryr) # for profiling

# fix vars
FLIGHTS_DATA <- "data/flights_sep_oct15.txt"
AIRLINES_DATA <- "data/airline_id.csv"

# import the data
flights <- read_csv_arrow(FLIGHTS_DATA,
                     as_data_frame = FALSE)
airlines <- read_csv_arrow(AIRLINES_DATA,
                     as_data_frame = FALSE)

Note how the data from the CSV files is not actually read into RAM yet. The created objects flights and airlines are not data frames (yet) and occupy hardly any RAM.

class(flights)
## [1] "Table"        "ArrowTabular" "ArrowObject" 
## [4] "R6"
class(airlines)
## [1] "Table"        "ArrowTabular" "ArrowObject" 
## [4] "R6"
object_size(flights)
## 283.62 kB
object_size(airlines)
## 283.62 kB

In analogy to the ff tutorial above, we go through the same data preparation steps. First, we rename the variables in airlines to ensure that the variable names are consistent with the flights data.

# step 1: 
# Rename "Code" variable from airlines.ff to "AIRLINE_ID"
# and "Description" into "AIRLINE_NM".
names(airlines) <- c("AIRLINE_ID", "AIRLINE_NM")
names(airlines)
## [1] "AIRLINE_ID" "AIRLINE_NM"

In a second step, the two datasets are merged/joined. The arrow package follows the dplyr syntax for data preparation tasks. That is, we can directly build on functions such as inner_join(), filter(), and select().

# merge the two datasets via Arrow
flights.data.ar <- inner_join(airlines, flights, by="AIRLINE_ID")
object_size(flights.data.ar)
## 647.74 kB

In a last step, we filter the resulting dataset for cancelled flights and select only some of the available variables. As arrow works with the dplyr back-end, we can directly use the typical dplyr syntax to combine the selection of columns and the filtering of rows.

# Subset the arrow table flights.data.ar:
subs1.ar <- 
        flights.data.ar %>%
        filter(CANCELLED == 1) %>%
        select(FL_DATE,
               AIRLINE_ID,
               ORIGIN_CITY_NAME,
               ORIGIN_STATE_NM,
               DEST_CITY_NAME,
               DEST_STATE_NM,
               CANCELLATION_CODE)
        
object_size(subs1.ar)
## 591.21 kB

Again, this operation hardly affected RAM usage by R. Note, though, that in contrast to the ff-approach, Arrow has actually not yet created the new subset subs1.ar. In fact, it has not even really imported the data or merged the two datasets. This is the effect of the lazy evaluation approach implemented in arrow. To further process the data in subs1.ar with other functions (outside of arrow), we need to actually trigger the evaluation of all the data preparation steps we have just instructed R to do. This is done via collect().

mem_change(subs1.ar.df <- collect(subs1.ar))
## 2.47 MB
class(subs1.ar.df)
## [1] "tbl_df"     "tbl"        "data.frame"
object_size(subs1.ar.df)
## 57.15 kB

Note how in this tutorial the final subset is substantially smaller than the two initial datasets. Hence, in this case it is fine to actually load it into RAM as a data frame. However, this is not a necessary part of the workflow. Instead of calling collect(), you can trigger the computation of all the data preparation steps via compute() and, for example, store the resulting arrow table in a CSV file.

subs1.ar %>% 
        compute() %>% 
        write_csv_arrow(file="data/subs1.ar.csv")

9.4 Wrapping up

  • Typically, the raw/uncleaned data is the critical bottleneck in terms of data volume, as the selection and filtering steps that yield the (much smaller) analytic dataset can only work properly once the data has been cleaned.
  • Out-of-memory strategies are based on the concept of virtual memory and are key to cleaning large amounts of data locally.
  • The ff package provides a high-level R interface to an out-of-memory approach. Most functions in ff and the corresponding ffbase package come with a syntax very similar to the basic R syntax for data cleaning and manipulation.
  • The basic idea behind ff is to store the data in chunked format in an easily accessible way on the hard disk and only keep the metadata of a dataset (e.g., variable names) in an R object in RAM while working on the dataset.
  • The arrow package offers similar functionality based on a slightly different approach called lazy evaluation (only evaluate data manipulation/cleaning tasks once the data is pulled into R). Unlike ff, arrow closely follows the dplyr syntax rather than basic R syntax for data cleaning tasks.

References

Adler, Daniel, Christian Gläser, Oleg Nenadic, Jens Oehlschlägel, Martijn Schuemie, and Walter Zucchini. 2022. Ff: Memory-Efficient Storage of Large Data on Disk and Fast Access Functions. https://CRAN.R-project.org/package=ff.
de Jonge, Edwin, Jan Wijffels, and Jan van der Laan. 2023. Ffbase: Basic Statistical Functions for Package ’Ff’. https://github.com/edwindj/ffbase.
Kane, Michael J., John Emerson, and Stephen Weston. 2013. “Scalable Strategies for Computing with Massive Data.” Journal of Statistical Software 55 (14): 1–19. https://www.jstatsoft.org/article/view/v055i14.
Richardson, Neal, Ian Cook, Nic Crane, Dewey Dunnington, Romain François, Jonathan Keane, Dragoș Moldovan-Grünfeld, Jeroen Ooms, and Apache Arrow. 2022. Arrow: Integration to ’Apache’ ’Arrow’. https://CRAN.R-project.org/package=arrow.
Walkowiak, Simon. 2016. Big Data Analytics with R. Birmingham, UK: Packt Publishing.

  54. Data from the same source is also used in the code examples given in Kane, Emerson, and Weston (2013).↩︎

  55. The full raw data used there can be downloaded here: http://stat-computing.org/dataexpo/2009/the-data.html.↩︎

  56. You can download the original datasets used in these examples from https://github.com/PacktPublishing/Big-Data-Analytics-with-R/tree/master/Chapter%203.↩︎