Chapter 9 Big Data Cleaning and Transformation
Data cleaning and transformation typically have to be run on large volumes of raw data before the observations and variables of interest can be filtered, selected, aggregated, and further analyzed. Typical data cleaning tasks involve the following (a brief R sketch after the list illustrates a few of them):
- Normalization/standardization (across entities, categories, observation periods).
- Coding of additional variables (indicators, strings to categorical, etc.).
- Removing/adding covariates.
- Merging/joining datasets.
- Properly defining data types for each variable.
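To make these tasks concrete, here is a minimal sketch on a small, made-up dataset (all variable and file names are hypothetical), using data.table syntax:

# toy example of common cleaning/transformation steps (hypothetical data)
library(data.table)
dt <- data.table(id = c("a", "b", "c"),
                 group = c("treat", "control", "treat"),
                 income = c(50000, 62000, 58000))
# coding of additional variables: string to categorical, indicator variable
dt[, group := as.factor(group)]
dt[, high_income := as.integer(income > 55000)]
# normalization/standardization of a numeric variable
dt[, income_std := (income - mean(income)) / sd(income)]
# removing a covariate again
dt[, high_income := NULL]
# merging/joining with a second (toy) dataset
regions <- data.table(id = c("a", "b", "c"), region = c("N", "S", "N"))
dt <- merge(dt, regions, by = "id")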
All of these steps are very common tasks when working with data for analytics purposes, independent of the size of the dataset. However, as most of the techniques and software developed for such tasks are meant to process data in memory, performing these tasks on large datasets can be challenging. Data cleaning workflows you are perfectly familiar with might slow down substantially or crash due to a lack of memory (RAM), particularly if the data preparation step involves merging/joining two datasets. Other potential bottlenecks are the parsing of large files (CPU) or intensive reading from and writing to the hard disk (mass storage).
In practice, the most critical bottleneck of common data preparation tasks is often a lack of RAM. In the following, we thus explore two strategies that broadly build on the idea of virtual memory (using parts of the hard disk as RAM) and/or lazy evaluation (only loading/processing the part of a dataset really required).
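As a practical first step, it often pays to gauge the size of the raw file relative to the memory already in use before settling on a strategy. A minimal sketch (the file path is a placeholder):

# rough rule of thumb: compare file size on disk with memory already in use
library(fs)    # file_size()
library(pryr)  # mem_used()
csv_path <- "data/flights.csv"   # placeholder path
file_size(csv_path)   # size of the raw file on disk
mem_used()            # RAM currently used by the R session
# if the file is of a similar order of magnitude as the available RAM,
# an out-of-memory or lazy-evaluation approach is advisable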
9.1 Out-of-memory strategies and lazy evaluation: Practical basics
Virtual memory is, simply put, an approach to combining RAM and mass storage in order to cope with a lack of RAM. Modern operating systems come with a virtual memory manager that automatically handles the swapping between RAM and the hard disk when running processes use up too much RAM. However, a virtual memory manager is not specifically designed to perform this task in the context of data analysis. Several strategies have thus been developed that build on the basic idea of virtual memory in the context of data analysis tasks.
- Chunked data files on disk: The data analytics software ‘partitions’ the dataset, and maps and stores the chunks of raw data on disk. What is actually ‘read’ into RAM when importing the data file with this approach is the mapping to the partitions of the actual dataset (the data structure) and some metadata describing the dataset. In R, this approach is implemented in the ff package (Adler et al. 2022) and several packages building on ff. In this approach, the usage of disk space and the linking between RAM and files on disk is very explicit (and clearly visible to the user).
- Memory-mapped files and shared memory: The data analytics software uses segments of virtual memory for the dataset and allows different programs/processes to access it in the same memory segment. Thus, virtual memory is explicitly allocated for one or several specific data analytics tasks. In R, this approach is notably implemented in the bigmemory package (Kane, Emerson, and Weston 2013) and several packages building on bigmemory.
A conceptually related but differently focused approach is the lazy evaluation implemented in Apache Arrow and the corresponding arrow package (Richardson et al. 2022). While Apache Arrow is basically a platform for in-memory columnar data, it is optimized for processing large amounts of data and for working with datasets that do not fit into memory. The way this is done is that instructions on what to do with a dataset are not evaluated step by step on the spot but all together at the point of actually loading the data into R. That is, we can connect to a dataset via arrow, see its variables, etc., and give instructions on which observations to filter out and which columns to select, all before we read the dataset into RAM. In comparison to the strategies outlined above, this approach is usually much faster but might still lead to a situation with a lack of memory.
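To illustrate the idea, the following minimal sketch (with a hypothetical directory of CSV files and hypothetical column names) shows how dplyr verbs on an arrow dataset merely build up a query, which is only evaluated when collect() is called:

# minimal sketch of lazy evaluation with arrow (placeholder path/columns)
library(arrow)
library(dplyr)
# connect to a (hypothetical) directory of CSV files; only metadata is scanned
ds <- open_dataset("data/large_dataset", format = "csv")
# the following verbs only build up a query plan ...
query <- ds %>%
     filter(month == 10) %>%
     select(month, dep_delay)
# ... the data is only read (and filtered) when we call collect()
result <- collect(query)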
In the following subsections we briefly look at how to set up an R session for data preparation purposes with any of these approaches (ff, bigmemory, arrow) and look at some of the conceptual basics behind the approaches.
9.1.1 Chunking data with the ff package
We first install and load the ff and ffbase (de Jonge, Wijffels, and van der Laan 2023) packages, as well as the pryr package. We use the familiar flights.csv dataset.54 For the sake of the example, we only use a fraction of the original dataset.55 On disk, the dataset is about 30MB:
::file_size("data/flights.csv") fs
## 29.5M
However, loading the entire dataset of several GBs would work just fine, using the ff approach.
When importing data via the ff package, we first have to set up a directory where ff can store the partitioned dataset (recall that this is explicitly/visibly done on disk). We call this new directory ff_files.
# SET UP --------------
# install.packages(c("ff", "ffbase"))
# you might have to install the ffbase package directly from GitHub:
# devtools::install_github("edwindj/ffbase", subdir="pkg")
# load packages
library(ff)
library(ffbase)
library(data.table) # for comparison
# create directory for ff chunks, and assign directory to ff
system("mkdir ff_files")
options(fftempdir = "ff_files")
Now we can read in the data with read.table.ffdf. In order to better understand the underlying concept, we also import the data into a common data.table object via fread() and then look at the size of the objects resulting from the two ‘import’ approaches in the R environment with object.size().
# usual in-memory csv import
flights_dt <- fread("data/flights.csv")

# out-of-memory approach
flights <- read.table.ffdf(file="data/flights.csv",
                           sep=",",
                           VERBOSE=TRUE,
                           header=TRUE,
                           next.rows=100000,
                           colClasses=NA)
## read.table.ffdf 1..100000 (100000) csv-read=0.609sec ffdf-write=0.065sec
## read.table.ffdf 100001..200000 (100000) csv-read=0.479sec ffdf-write=0.044sec
## read.table.ffdf 200001..300000 (100000) csv-read=0.446sec ffdf-write=0.046sec
## read.table.ffdf 300001..336776 (36776) csv-read=0.184sec ffdf-write=0.04sec
## csv-read=1.718sec ffdf-write=0.195sec TOTAL=1.913sec
# compare object sizes
object.size(flights) # out-of-memory approach
## 949976 bytes
object.size(flights_dt) # common data.table
## 32569024 bytes
Note that there are two substantial differences to what we have previously seen when using fread(). It takes much longer to import a CSV into the ff_files structure. However, the RAM allocated to it is much smaller. This is exactly what we would expect, keeping in mind what read.table.ffdf() does in comparison to what fread() does. Now we can actually have a look at the data chunks created by ff.
# show the files in the directory keeping the chunks
head(list.files("ff_files"))
## [1] "ffdf42b781703dcfe.ff" "ffdf42b781d18cdf9.ff"
## [3] "ffdf42b781d5105fb.ff" "ffdf42b781d6ccc74.ff"
## [5] "ffdf42b782494a6a1.ff" "ffdf42b782df9670d.ff"
9.1.2 Memory mapping with bigmemory
The bigmemory package handles data in matrices and therefore only accepts data values of identical data type. Before importing data via the bigmemory package, we thus have to ensure that all variables in the raw data can be imported in a common type.
# SET UP ----------------
# load packages
library(bigmemory)
library(biganalytics)
# import the data
<- read.big.matrix("data/flights.csv",
flights type="integer",
header=TRUE,
backingfile="flights.bin",
descriptorfile="flights.desc")
Note that, similar to the ff example, read.big.matrix() creates a local file flights.bin on disk that is linked to the flights object in RAM. From looking at the imported data, we see that various variable values have been discarded. This is because we have forced all variables to be of type "integer" when importing the dataset.
object.size(flights)
## 696 bytes
str(flights)
## Formal class 'big.matrix' [package "bigmemory"] with 1 slot
## ..@ address:<externalptr>
Again, the object representing the dataset in R does not contain the actual data (it does not even take up a KB of memory).
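Because the actual data resides in the backing file on disk, a later R session can re-attach the matrix via the descriptor file without parsing the CSV again. A minimal sketch, assuming flights.bin and flights.desc created above are in the working directory:

# re-attach the file-backed matrix in a new R session
library(bigmemory)
flights <- attach.big.matrix("flights.desc")
dim(flights)        # dimensions are read from the descriptor/backing file
flights[1:3, 1:5]   # only this small slice is pulled into RAM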
9.1.3 Connecting to Apache Arrow
# SET UP ----------------
# load packages
library(arrow)
# import the data
<- read_csv_arrow("data/flights.csv",
flights as_data_frame = FALSE)
Note the as_data_frame = FALSE in the function call. This instructs Arrow to establish a connection to the file and read some of the data (to understand what is in the file), but not to actually import the whole CSV.
summary(flights)
## Length Class Mode
## year 336776 ChunkedArray environment
## month 336776 ChunkedArray environment
## day 336776 ChunkedArray environment
## dep_time 336776 ChunkedArray environment
## sched_dep_time 336776 ChunkedArray environment
## dep_delay 336776 ChunkedArray environment
## arr_time 336776 ChunkedArray environment
## sched_arr_time 336776 ChunkedArray environment
## arr_delay 336776 ChunkedArray environment
## carrier 336776 ChunkedArray environment
## flight 336776 ChunkedArray environment
## tailnum 336776 ChunkedArray environment
## origin 336776 ChunkedArray environment
## dest 336776 ChunkedArray environment
## air_time 336776 ChunkedArray environment
## distance 336776 ChunkedArray environment
## hour 336776 ChunkedArray environment
## minute 336776 ChunkedArray environment
## time_hour 336776 ChunkedArray environment
object.size(flights)
## 488 bytes
Again, we notice that the flights object is much smaller than the actual dataset on disk.
9.2 Big Data preparation tutorial with ff
9.2.1 Set up
The following code and data examples build on Walkowiak (2016), Chapter 3.56 The setup for our analysis script involves loading the ff and ffbase packages, initializing fixed variables to hold the paths to the datasets, and creating and assigning a new local directory ff_files in which the binary flat-file-partitioned chunks of the original datasets will be stored.
## SET UP ------------------------
# create and set directory for ff files
system("mkdir ff_files")
options(fftempdir = "ff_files")
# load packages
library(ff)
library(ffbase)
library(pryr)
# fix vars
<- "data/flights_sep_oct15.txt"
FLIGHTS_DATA <- "data/airline_id.csv" AIRLINES_DATA
9.2.2 Data import
In a first step we read (or ‘upload’) the data into R. This step involves the creation of the binary chunked files as well as the mapping of these files and the metadata. In comparison to the traditional read.csv approach, you will notice two things: on the one hand, the data import takes longer; on the other hand, it uses up much less RAM than with read.csv.
# DATA IMPORT ------------------
# check memory used
mem_used()
## 1.79 GB
# 1. Upload flights_sep_oct15.txt and airline_id.csv files from flat files.
system.time(flights.ff <- read.table.ffdf(file=FLIGHTS_DATA,
sep=",",
VERBOSE=TRUE,
header=TRUE,
next.rows=100000,
colClasses=NA))
## read.table.ffdf 1..100000 (100000) csv-read=0.564sec ffdf-write=0.095sec
## read.table.ffdf 100001..200000 (100000) csv-read=0.603sec ffdf-write=0.072sec
## read.table.ffdf 200001..300000 (100000) csv-read=0.611sec ffdf-write=0.068sec
## read.table.ffdf 300001..400000 (100000) csv-read=0.625sec ffdf-write=0.08sec
## read.table.ffdf 400001..500000 (100000) csv-read=0.626sec ffdf-write=0.072sec
## read.table.ffdf 500001..600000 (100000) csv-read=0.681sec ffdf-write=0.075sec
## read.table.ffdf 600001..700000 (100000) csv-read=0.638sec ffdf-write=0.069sec
## read.table.ffdf 700001..800000 (100000) csv-read=0.6sec ffdf-write=0.081sec
## read.table.ffdf 800001..900000 (100000) csv-read=0.612sec ffdf-write=0.075sec
## read.table.ffdf 900001..951111 (51111) csv-read=0.329sec ffdf-write=0.047sec
## csv-read=5.889sec ffdf-write=0.734sec TOTAL=6.623sec
## user system elapsed
## 5.659 0.750 6.626
system.time(airlines.ff <- read.csv.ffdf(file= AIRLINES_DATA,
VERBOSE=TRUE,
header=TRUE,
next.rows=100000,
colClasses=NA))
## read.table.ffdf 1..1607 (1607) csv-read=0.005sec ffdf-write=0.004sec
## csv-read=0.005sec ffdf-write=0.004sec TOTAL=0.009sec
## user system elapsed
## 0.009 0.001 0.010
# check memory used
mem_used()
## 1.79 GB
For comparison, we import the same data with the standard read.table()/read.csv() approach:
# Using read.table()
system.time(flights.table <- read.table(FLIGHTS_DATA,
sep=",",
header=TRUE))
## user system elapsed
## 5.164 0.684 5.976
system.time(airlines.table <- read.csv(AIRLINES_DATA,
header = TRUE))
## user system elapsed
## 0.002 0.000 0.003
# check the memory used
mem_used()
## 1.93 GB
9.2.3 Inspect imported files
A particularly useful aspect of working with the ff package and the packages building on it is that many of the simple R functions that work on normal data.frames in RAM also work on ffdf objects stored on disk. Hence, without actually having loaded the entire raw data of a large dataset into RAM, we can quickly get an overview of key characteristics, such as the number of observations and the number of variables.
# 2. Inspect the ff_files objects.
## For flights.ff object:
class(flights.ff)
## [1] "ffdf"
dim(flights.ff)
## [1] 951111 28
## For airlines.ff object:
class(airlines.ff)
## [1] "ffdf"
dim(airlines.ff)
## [1] 1607 2
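Other simple inspection functions work in the same way. For example (a small sketch), we can look at the variable names and pull only a tiny slice of the on-disk data into RAM:

# variable names (read from the metadata held in RAM)
names(flights.ff)[1:5]
# pull only a small slice of the on-disk data into RAM as a data.frame
flights.ff[1:3, 1:5]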
9.2.4 Data cleaning and transformation
After inspecting the data, we go through several steps of cleaning and transformation, with the goal of then merging the two datasets. That is, we want to create a new dataset that contains detailed flight information but with additional information on the carriers/airlines. First, we want to rename some of the variables.
# step 1:
# Rename "Code" variable from airlines.ff
# to "AIRLINE_ID" and "Description" into "AIRLINE_NM".
names(airlines.ff) <- c("AIRLINE_ID", "AIRLINE_NM")
names(airlines.ff)
## [1] "AIRLINE_ID" "AIRLINE_NM"
str(airlines.ff[1:20,])
## 'data.frame':    20 obs. of  2 variables:
##  $ AIRLINE_ID: int  19031 19032 19033 19034 19035 19036 19037 19038 19039 19040 ...
##  $ AIRLINE_NM: Factor w/ 1607 levels "40-Mile Air: Q5",..: 945 1025 503 721 64 725 1194 99 1395 276 ...
Now we can join the two datasets via the unique airline identifier "AIRLINE_ID". Note that these kinds of operations would usually take up substantially more RAM on the spot, if both original datasets were also fully loaded into RAM. As illustrated by the mem_change() function, this is not the case here. All that is needed is a small chunk of RAM to keep the metadata and mapping information of the new ff_files object; all the actual data is cached on the hard disk.
# merge of ff_files objects
mem_change(flights.data.ff <- merge.ffdf(flights.ff, airlines.ff,
                                         by="AIRLINE_ID"))
## 774 kB
#The new object is only 551.2 KB in size
class(flights.data.ff)
## [1] "ffdf"
dim(flights.data.ff)
## [1] 951111 29
names(flights.data.ff)
## [1] "YEAR" "MONTH"
## [3] "DAY_OF_MONTH" "DAY_OF_WEEK"
## [5] "FL_DATE" "UNIQUE_CARRIER"
## [7] "AIRLINE_ID" "TAIL_NUM"
## [9] "FL_NUM" "ORIGIN_AIRPORT_ID"
## [11] "ORIGIN" "ORIGIN_CITY_NAME"
## [13] "ORIGIN_STATE_NM" "ORIGIN_WAC"
## [15] "DEST_AIRPORT_ID" "DEST"
## [17] "DEST_CITY_NAME" "DEST_STATE_NM"
## [19] "DEST_WAC" "DEP_TIME"
## [21] "DEP_DELAY" "ARR_TIME"
## [23] "ARR_DELAY" "CANCELLED"
## [25] "CANCELLATION_CODE" "DIVERTED"
## [27] "AIR_TIME" "DISTANCE"
## [29] "AIRLINE_NM"
9.2.5 Inspect difference in in-memory operation
In comparison to the ff approach, performing the merge in memory needs more resources:
##For flights.table:
names(airlines.table) <- c("AIRLINE_ID", "AIRLINE_NM")
names(airlines.table)
## [1] "AIRLINE_ID" "AIRLINE_NM"
str(airlines.table[1:20,])
## 'data.frame':    20 obs. of  2 variables:
##  $ AIRLINE_ID: int  19031 19032 19033 19034 19035 19036 19037 19038 19039 19040 ...
##  $ AIRLINE_NM: chr  "Mackey International Inc.: MAC" "Munz Northern Airlines Inc.: XY" "Cochise Airlines Inc.: COC" "Golden Gate Airlines Inc.: GSA" ...
# check memory usage of merge in RAM
mem_change(flights.data.table <- merge(flights.table, airlines.table,
                                       by="AIRLINE_ID"))
## 161 MB
#The new object is already 105.7 MB in size
#A rapid spike in RAM use when processing
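Once the comparison is done, it can make sense to remove the in-memory copies again and free up the RAM they occupy (a small housekeeping sketch, not part of the original tutorial):

# remove the in-memory copies and trigger garbage collection
rm(flights.table, airlines.table, flights.data.table)
gc()
mem_used()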
9.2.6 Subsetting
Now, we want to filter out some observations as well as select only specific variables for a subset of the overall dataset.
mem_used()
## 2.09 GB
# Subset the ff_files object flights.data.ff:
subs1.ff <- subset.ffdf(flights.data.ff,
                        CANCELLED == 1,
                        select = c(FL_DATE,
                                   AIRLINE_ID,
                                   ORIGIN_CITY_NAME,
                                   ORIGIN_STATE_NM,
                                   DEST_CITY_NAME,
                                   DEST_STATE_NM,
                                   CANCELLATION_CODE))
dim(subs1.ff)
## [1] 4529 7
mem_used()
## 2.09 GB
9.2.7 Save/load/export ff files
In order to better organize and easily reload the newly created ff_files files, we can explicitly save them to disk.
# Save a newly created ff_files object to a data file:
# (7 files (one for each column) created in the ffdb directory)
save.ffdf(subs1.ff, overwrite = TRUE)
If we want to reload a previously saved ff_files object, we do not have to go through the chunking of the raw data file again but can very quickly load the data mapping and metadata into RAM in order to further work with the data (stored on disk).
# Loading previously saved ff_files files:
rm(subs1.ff)
#gc()
load.ffdf("ffdb")
# check the class and structure of the loaded data
class(subs1.ff)
## [1] "ffdf"
dim(subs1.ff)
## [1] 4529 7
dimnames(subs1.ff)
## [[1]]
## NULL
##
## [[2]]
## [1] "FL_DATE" "AIRLINE_ID"
## [3] "ORIGIN_CITY_NAME" "ORIGIN_STATE_NM"
## [5] "DEST_CITY_NAME" "DEST_STATE_NM"
## [7] "CANCELLATION_CODE"
If we want to store an ff_files dataset in a format more accessible for other users (such as CSV), we can do so as follows. This last step is also quite common in practice: the initial raw dataset is very large; thus we perform all the theoretically very memory-intensive tasks of preparing the analytic dataset via ff and then store the (often much smaller) analytic dataset in a more accessible CSV file in order to later read it into RAM and run more computationally intensive analyses directly in RAM.
# Export subs1.ff into a CSV file:
write.csv.ffdf(subs1.ff, "subset1.csv")
9.3 Big Data preparation tutorial with arrow
We begin by initializing our R session as in the short arrow introduction above.
# SET UP ----------------
# load packages
library(arrow)
library(dplyr)
library(pryr) # for profiling
# fix vars
<- "data/flights_sep_oct15.txt"
FLIGHTS_DATA <- "data/airline_id.csv"
AIRLINES_DATA
# import the data
<- read_csv_arrow(FLIGHTS_DATA,
flights as_data_frame = FALSE)
<- read_csv_arrow(AIRLINES_DATA,
airlines as_data_frame = FALSE)
Note how the data from the CSV files is not actually read into RAM yet. The created objects flights and airlines are not data frames (yet) and occupy hardly any RAM.
class(flights)
## [1] "Table" "ArrowTabular" "ArrowObject"
## [4] "R6"
class(airlines)
## [1] "Table" "ArrowTabular" "ArrowObject"
## [4] "R6"
object_size(flights)
## 283.62 kB
object_size(airlines)
## 283.62 kB
In analogy to the ff tutorial above, we go through the same data preparation steps. First, we rename the variables in airlines to ensure that the variable names are consistent with the flights data.
# step 1:
# Rename "Code" variable from airlines.ff to "AIRLINE_ID"
# and "Description" into "AIRLINE_NM".
names(airlines) <- c("AIRLINE_ID", "AIRLINE_NM")
names(airlines)
## [1] "AIRLINE_ID" "AIRLINE_NM"
In a second step, the two datasets are merged/joined. The arrow package follows dplyr syntax regarding data preparation tasks. That is, we can directly build on functions like inner_join().
# merge the two datasets via Arrow
<- inner_join(airlines, flights, by="AIRLINE_ID")
flights.data.ar object_size(flights.data.ar)
## 647.74 kB
In a last step, we filter the resulting dataset for cancelled flights and select only specific variables for a subset of the overall dataset. As Arrow works with the dplyr back-end, we can directly use the typical dplyr syntax to combine the selection of columns and the filtering of rows.
# Subset the arrow table flights.data.ar:
subs1.ar <- flights.data.ar %>%
     filter(CANCELLED == 1) %>%
     select(FL_DATE,
            AIRLINE_ID,
            ORIGIN_CITY_NAME,
            ORIGIN_STATE_NM,
            DEST_CITY_NAME,
            DEST_STATE_NM,
            CANCELLATION_CODE)
object_size(subs1.ar)
## 591.21 kB
Again, this operation hardly affected R's RAM usage. Note, though, that in contrast to the ff approach, Arrow has not actually created the new subset subs1.ar yet. In fact, it has not even really imported the data or merged the two datasets. This is the effect of the lazy evaluation approach implemented in arrow. To further process the data in subs1.ar with other functions (outside of arrow), we need to actually trigger the evaluation of all the data preparation steps we have just instructed R to do. This is done via collect().
mem_change(subs1.ar.df <- collect(subs1.ar))
## 2.47 MB
class(subs1.ar.df)
## [1] "tbl_df" "tbl" "data.frame"
object_size(subs1.ar.df)
## 57.15 kB
Note how, in this tutorial, the final subset is substantially smaller than the initial two datasets. Hence, in this case it is fine to actually load it into RAM as a data frame. However, this is not a necessary part of the workflow. Instead of calling collect(), you can trigger the computation of all the data preparation steps via compute() and, for example, store the resulting arrow table in a CSV file.
subs1.ar %>%
   compute() %>%
   write_csv_arrow(file="data/subs1.ar.csv")
9.4 Wrapping up
- Typically, the raw/uncleaned data is the critical bottleneck in terms of data volume, particularly as the selection and filtering of the overall dataset in the preparation of analytic datasets can only work properly with cleaned data.
- Out-of-memory strategies are based on the concept of virtual memory and are key to cleaning large amounts of data locally.
- The ff package provides a high-level R interface to an out-of-memory approach. Most functions in ff and the corresponding ffbase package come with a syntax very similar to the basic R syntax for data cleaning and manipulation.
- The basic idea behind ff is to store the data in chunked format in an easily accessible way on the hard disk and to only keep the metadata of a dataset (e.g., variable names) in an R object in RAM while working on the dataset.
- The arrow package offers similar functionality based on a slightly different approach called lazy evaluation (data manipulation/cleaning tasks are only evaluated once the data is pulled into R). Unlike ff, arrow closely follows the dplyr syntax rather than basic R syntax for data cleaning tasks.
References

54. Data from the same source is also used in the code examples given in Kane, Emerson, and Weston (2013).
55. The full raw data used there can be downloaded here: http://stat-computing.org/dataexpo/2009/the-data.html.
56. You can download the original datasets used in these examples from https://github.com/PacktPublishing/Big-Data-Analytics-with-R/tree/master/Chapter%203.