Sourcing Data from S3 with Drake

Sourcing Data from S3 with Drake

drake is a package for orchestrating R workflows. Suppose I have some data in S3 that I want to pull into R through a drake plan. In this post I’ll use the S3 object’s ETag to make drake only re-download the data if it’s changed.

This covers the scenario in which the object name in S3 stays the same. If I had, say, data being uploaded each day with an object name suffixed with the date, then I wouldn’t bother checking for any changes.

Connecting to S3

Both the aws.s3 package and the PAWS package will connect to S3 from R. I’ve used both of these packages, and there’s nothing wrong with them, but I always find myself going back to wrapping AWS CLI commands. I’m not saying this is the best way to use AWS from within R, but it works, although I haven’t tested this on anything other than Linux.

By this point I’ve run aws configure in a terminal to make sure that I can actually connect to AWS. I’ve also created an S3 bucket.

There are two ways to connect to S3 from the AWS CLI. s3 commands are more high-level than s3api commands, but I’ll need to use both here.

Uploading some data

I’ll start by uploading some CSV data to my bucket using an s3 command, so that I have something to source in my drake plan. What I really like about the s3 commands is that I don’t have to mess around with any multi-part uploads, as the AWS CLI takes care of all that complexity for me.

I’ll create a function that forms and executes the command. My command needs to be of the form aws s3 cp $SOURCE $TARGET. The $SOURCE or $TARGET variables can be either local files or objects on S3, with objects prefixed with “s3://$BUCKET”. My function will take a data frame and, using the name of that data frame, determine the path of the object on S3. A more sophisticated function would be more flexible about how I’m storing the data, but this will do for my demonstration.

Note the use of shQuote here, a base function that quotes a string to be passed to a shell.

upload_data_to_s3_bucket_as_csv <- function(data, bucket) {
  object_name <- paste0(deparse(substitute(data)), ".csv")
  temp_file <- tempfile()
  # delete this temp file afterwards, even if this function errors
  on.exit(unlink(temp_file)) 
  readr::write_csv(data, temp_file)
  quoted_file_path <- shQuote(temp_file)
  quoted_object_path <- shQuote(glue::glue("s3://{bucket}/{object_name}"))
  system(glue::glue("aws s3 cp {quoted_file_path} {quoted_object_path}"))
}

Getting object metadata

The ETag is a hash that changes when the object changes1. It’s a short string like “de3b6f4731f18de03e51a5fea8102c93”. No matter how big an object is, the ETag stays the same size, and is quick to retrieve. This means that we can check the ETag every time a drake plan is made without spending too much time, and only re-download the actual data if drake detects a change in this value.

I need to use a lower-level s3api command here. The head-object command retrieves object metadata. I convert that metadata from JSON, extract the ETag, and remove the stray quotation marks around it.

get_etag <- function(object, bucket) {
  response <- system(
    glue::glue("aws s3api head-object --bucket {bucket} --key {object}"),
    intern = TRUE
  )
  raw_etag <- jsonlite::fromJSON(response)$ETag
  gsub("\"", "", raw_etag)
}

Downloading from S3

I’ll once again use an s3 command to download data from an S3. This function is very similar to the upload function, with the source and target reversed.

download_and_parse_csv_from_s3_bucket <- function(object, bucket) {
  temp_file <- tempfile()
  # delete this temp file afterwards, even if this function errors
  on.exit(unlink(temp_file)) 
  quoted_file_path <- shQuote(temp_file)
  quoted_object_path <- shQuote(glue::glue("s3://{bucket}/{object}"))
  system(glue::glue("aws s3 cp {quoted_object_path} {quoted_file_path}"))
  readr::read_csv(temp_file)
}

Generating some random data

I’ll need some data to upload to my bucket and then retrieve. Here’s my go-to function for generating a data frame of random bits, adapated from this StackOverflow answer:

generate_random_data <- function(nrow = 1000, ncol = 10) {
  data.frame(replicate(ncol, sample(0:1, nrow, rep = TRUE)))
}

Now I’ll upload some random data to my bucket. I’ve created a bucket “ocelittle”, which is the unofficial name of ocelot kittens. This has nothing to do with AWS; I just needed a unique name for the bucket.

some_random_data <- generate_random_data()
upload_data_to_s3_bucket_as_csv(some_random_data, bucket = "ocelittle")
get_etag("some_random_data.csv", bucket = "ocelittle")
#> [1] "104b796e58ef578339253f1f04673388"

Method 1: A separate target for the ETag

There are two equally valid ways to structure the drake plan to check the ETag. They’re effectively equivalent, but there’s some slight variation in how the targets are displayed when I run drake::vis_drake_graph.

In this first method, I’ll create a separate target for the ETag so that it appears in my drake plan visualisations, as in the plot at the top of this page. Pay close attention to the conditions for each trigger:

s3_plan <- drake::drake_plan(
  etag = target(
    get_etag("some_random_data.csv", "ocelittle"),
    trigger = trigger(condition = TRUE)
  ),
  data = target(
    download_and_parse_csv_from_s3_bucket("some_random_data.csv", "ocelittle"),
    trigger = trigger(change = etag)
  )
)

The condition for the etag target is TRUE, which means that this target will always run when I make the drake plan. The data target only runs when the value of the etag target has changed. When I make this plan for the first time, both targets are executed:

drake::make(s3_plan)
#>  target etag
#>  target data
#> Parsed with column specification:
#> cols(
#>   X1 = col_double(),
#>   X2 = col_double(),
#>   X3 = col_double(),
#>   X4 = col_double(),
#>   X5 = col_double(),
#>   X6 = col_double(),
#>   X7 = col_double(),
#>   X8 = col_double(),
#>   X9 = col_double(),
#>   X10 = col_double()
#> )

When I run the plan a second time, the etag target runs, as expected. But as the object’s ETag hasn’t changed, drake doesn’t execute the data target.

drake::make(s3_plan)
#>  target etag

Now I’ll generate some new random data, and overwrite the previous CSV:

some_random_data <- generate_random_data()
upload_data_to_s3_bucket_as_csv(some_random_data, bucket = "ocelittle")
get_etag("some_random_data.csv", bucket = "ocelittle")
#> [1] "5b978808807d201824757e7b703d8910"

drake detects the change and re-downloads the data:

drake::make(s3_plan)
#>  target etag
#>  target data
#> Parsed with column specification:
#> cols(
#>   X1 = col_double(),
#>   X2 = col_double(),
#>   X3 = col_double(),
#>   X4 = col_double(),
#>   X5 = col_double(),
#>   X6 = col_double(),
#>   X7 = col_double(),
#>   X8 = col_double(),
#>   X9 = col_double(),
#>   X10 = col_double()
#> )

Method 2: Embedding the ETag in the data target

Rather than having a separate target for the etag, I can use put the get_etag function directly into the change condition for the data download target. This won’t show the ETag when I run drake::drake_vis_graph.

First, I’ll clean the drake cache:

drake::clean()

The change trigger accepts any R expression, so it accepts the get_etag function. This will run every time the plan is made.

s3_plan_2 <- drake::drake_plan(
  data = target(
    download_and_parse_csv_from_s3_bucket("some_random_data.csv", "ocelittle"),
    trigger = trigger(change = get_etag("some_random_data.csv", "ocelittle"))
  )
)
drake::make(s3_plan_2)
#>  target data
#> Parsed with column specification:
#> cols(
#>   X1 = col_double(),
#>   X2 = col_double(),
#>   X3 = col_double(),
#>   X4 = col_double(),
#>   X5 = col_double(),
#>   X6 = col_double(),
#>   X7 = col_double(),
#>   X8 = col_double(),
#>   X9 = col_double(),
#>   X10 = col_double()
#> )
drake::make(s3_plan_2)
#>  All targets are already up to date.

And now, just to check, I’ll upload some new data and make sure that drake downloads it:

some_random_data <- generate_random_data()
upload_data_to_s3_bucket_as_csv(some_random_data, bucket = "ocelittle")
get_etag("some_random_data.csv", bucket = "ocelittle")
#> [1] "afce4300c7ea473c81b5a9f0f9712af3"
drake::make(s3_plan_2)
#>  target data
#> Parsed with column specification:
#> cols(
#>   X1 = col_double(),
#>   X2 = col_double(),
#>   X3 = col_double(),
#>   X4 = col_double(),
#>   X5 = col_double(),
#>   X6 = col_double(),
#>   X7 = col_double(),
#>   X8 = col_double(),
#>   X9 = col_double(),
#>   X10 = col_double()
#> )

Once again, drake detects the change and re-downloads the data.


devtools::session_info()
#> ─ Session info ───────────────────────────────────────────────────────────────
#>  setting  value                       
#>  version  R version 4.0.0 (2020-04-24)
#>  os       Ubuntu 20.04.1 LTS          
#>  system   x86_64, linux-gnu           
#>  ui       X11                         
#>  language en_AU:en                    
#>  collate  en_AU.UTF-8                 
#>  ctype    en_AU.UTF-8                 
#>  tz       Australia/Melbourne         
#>  date     2020-08-23                  
#> 
#> ─ Packages ───────────────────────────────────────────────────────────────────
#>  package     * version    date       lib source                            
#>  assertthat    0.2.1      2019-03-21 [1] CRAN (R 4.0.0)                    
#>  backports     1.1.8      2020-06-17 [1] CRAN (R 4.0.0)                    
#>  base64url     1.4        2018-05-14 [1] CRAN (R 4.0.0)                    
#>  callr         3.4.3      2020-03-28 [1] CRAN (R 4.0.0)                    
#>  cli           2.0.2      2020-02-28 [1] CRAN (R 4.0.0)                    
#>  crayon        1.3.4      2017-09-16 [1] CRAN (R 4.0.0)                    
#>  desc          1.2.0      2018-05-01 [1] CRAN (R 4.0.0)                    
#>  devtools      2.3.0      2020-04-10 [1] CRAN (R 4.0.0)                    
#>  digest        0.6.25     2020-02-23 [1] CRAN (R 4.0.0)                    
#>  downlit       0.0.0.9000 2020-07-25 [1] Github (r-lib/downlit@ed969d0)    
#>  drake       * 7.12.4     2020-07-30 [1] Github (ropensci/drake@20cd701)   
#>  ellipsis      0.3.1      2020-05-15 [1] CRAN (R 4.0.0)                    
#>  evaluate      0.14       2019-05-28 [1] CRAN (R 4.0.0)                    
#>  fansi         0.4.1      2020-01-08 [1] CRAN (R 4.0.0)                    
#>  filelock      1.0.2      2018-10-05 [1] CRAN (R 4.0.0)                    
#>  fs            1.5.0      2020-07-31 [1] CRAN (R 4.0.0)                    
#>  glue          1.4.1      2020-05-13 [1] CRAN (R 4.0.0)                    
#>  here          0.1        2017-05-28 [1] CRAN (R 4.0.0)                    
#>  hms           0.5.3      2020-01-08 [1] CRAN (R 4.0.0)                    
#>  htmltools     0.5.0      2020-06-16 [1] CRAN (R 4.0.0)                    
#>  hugodown      0.0.0.9000 2020-08-13 [1] Github (r-lib/hugodown@2af491d)   
#>  igraph        1.2.5      2020-03-19 [1] CRAN (R 4.0.0)                    
#>  jsonlite      1.7.0      2020-06-25 [1] CRAN (R 4.0.0)                    
#>  knitr         1.29       2020-06-23 [1] CRAN (R 4.0.0)                    
#>  lattice       0.20-41    2020-04-02 [4] CRAN (R 4.0.0)                    
#>  lifecycle     0.2.0      2020-03-06 [1] CRAN (R 4.0.0)                    
#>  magrittr      1.5        2014-11-22 [1] CRAN (R 4.0.0)                    
#>  Matrix        1.2-18     2019-11-27 [4] CRAN (R 4.0.0)                    
#>  memoise       1.1.0.9000 2020-05-09 [1] Github (hadley/memoise@4aefd9f)   
#>  pillar        1.4.6      2020-07-10 [1] CRAN (R 4.0.0)                    
#>  pkgbuild      1.1.0      2020-07-13 [1] CRAN (R 4.0.0)                    
#>  pkgconfig     2.0.3      2019-09-22 [1] CRAN (R 4.0.0)                    
#>  pkgload       1.1.0      2020-05-29 [1] CRAN (R 4.0.0)                    
#>  prettyunits   1.1.1      2020-01-24 [1] CRAN (R 4.0.0)                    
#>  processx      3.4.3      2020-07-05 [1] CRAN (R 4.0.0)                    
#>  progress      1.2.2      2019-05-16 [1] CRAN (R 4.0.0)                    
#>  ps            1.3.4      2020-08-11 [1] CRAN (R 4.0.0)                    
#>  purrr         0.3.4      2020-04-17 [1] CRAN (R 4.0.0)                    
#>  R6            2.4.1      2019-11-12 [1] CRAN (R 4.0.0)                    
#>  Rcpp          1.0.5      2020-07-06 [1] CRAN (R 4.0.0)                    
#>  readr         1.3.1      2018-12-21 [1] CRAN (R 4.0.0)                    
#>  remotes       2.1.1      2020-02-15 [1] CRAN (R 4.0.0)                    
#>  reticulate    1.16       2020-05-27 [1] CRAN (R 4.0.0)                    
#>  rlang         0.4.7      2020-07-09 [1] CRAN (R 4.0.0)                    
#>  rmarkdown     2.3.3      2020-08-13 [1] Github (rstudio/rmarkdown@204aa41)
#>  rprojroot     1.3-2      2018-01-03 [1] CRAN (R 4.0.0)                    
#>  sessioninfo   1.1.1      2018-11-05 [1] CRAN (R 4.0.0)                    
#>  storr         1.2.1      2018-10-18 [1] CRAN (R 4.0.0)                    
#>  stringi       1.4.6      2020-02-17 [1] CRAN (R 4.0.0)                    
#>  stringr       1.4.0      2019-02-10 [1] CRAN (R 4.0.0)                    
#>  testthat      2.3.2      2020-03-02 [1] CRAN (R 4.0.0)                    
#>  tibble        3.0.3      2020-07-10 [1] CRAN (R 4.0.0)                    
#>  tidyselect    1.1.0      2020-05-11 [1] CRAN (R 4.0.0)                    
#>  txtq          0.2.3      2020-06-23 [1] CRAN (R 4.0.0)                    
#>  usethis       1.6.1      2020-04-29 [1] CRAN (R 4.0.0)                    
#>  vctrs         0.3.2      2020-07-15 [1] CRAN (R 4.0.0)                    
#>  withr         2.2.0      2020-04-20 [1] CRAN (R 4.0.0)                    
#>  xfun          0.16       2020-07-24 [1] CRAN (R 4.0.0)                    
#>  yaml          2.2.1      2020-02-01 [1] CRAN (R 4.0.0)                    
#> 
#> [1] /home/mdneuzerling/R/x86_64-pc-linux-gnu-library/4.0
#> [2] /usr/local/lib/R/site-library
#> [3] /usr/lib/R/site-library
#> [4] /usr/lib/R/library

  1. The ETag may or may not be an MD5 hash of the obejct data. ↩︎