Troubleshooting R mapper script on Amazon Elastic MapReduce - Results not as expected

I am trying to use Amazon Elastic Map Reduce to run a series of simulations of several million cases. This is an Rscript streaming job with no reducer. I am using the Identity Reducer in my EMR call --reducer org.apache.hadoop.mapred.lib.IdentityReducer.

The script file works fine when tested and run locally from the command line on a Linux box when passing one line of string manually echo "1,2443,2442,1,5" | ./mapper.R and I get the one line of results that I am expecting. However, when I tested my simulation using about 10,000 cases (lines) from the input file on EMR, I only got output for a dozen lines or so out of 10k input lines. I've tried several times and I cannot figure out why. The Hadoop job runs fine without any errors. It seems like input lines are being skipped, or perhaps something is happening with the Identity reducer. The results are correct for the cases where there is output.

My input file is a csv with the following data format, a series of five integers separated by commas:

1,2443,2442,1,5
2,2743,4712,99,8
3,2443,861,282,3177
etc...

Here is my R script for mapper.R

#! /usr/bin/env Rscript

# Define Functions
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
# function to read in the relevant data from needed data files
get.data <- function(casename) {
    list <- lapply(casename, function(x) {
        read.csv(file = paste("./inputdata/",x, ".csv", sep = ""),
                 header = TRUE,
        stringsAsFactors = FALSE)})
    return(data.frame(list))
}

con <- file("stdin")            
line <- readLines(con, n = 1, warn = FALSE) 
line <- trimWhiteSpace(line)
values <- unlist(strsplit(line, ","))
lv <- length(values)
cases <- as.numeric(values[2:lv])
simid <- paste("sim", values[1], ":", sep = "")
l <- length(cases)                      # for indexing

## create a vector for the case names
names.vector <- paste("case", cases, sep = ".")

## read in metadata and necessary data columns using get.data function
metadata <- read.csv(file = "./inputdata/metadata.csv", header = TRUE,
                     stringsAsFactors = FALSE)
d <- cbind(metadata[,1:3], get.data(names.vector))

## Calculations that use df d and produce a string called 'output' 
## in the form of "id: value1 value2 value3 ..." to be used at a 
## later time for agregation.

cat(output, "\n")
close(con)

The (generalized) EMR call for this simulation is:

ruby elastic-mapreduce --create --stream --input s3n://bucket/project/input.txt --output s3n://bucket/project/output --mapper s3n://bucket/project/mapper.R --reducer org.apache.hadoop.mapred.lib.IdentityReducer --cache-archive s3n://bucket/project/inputdata.tar.gz#inputdata --name Simulation --num-instances 2

If anyone has any insights as to why I might be experiencing these issues, I am open to suggestions, as well as any changes/optimization to the R script.

My other option is to turn the script into a function and run a parallelized apply using R multicore packages, but I haven't tried it yet. I'd like to get this working on EMR. I used JD Long's and Pete Skomoroch's R/EMR examples as a basis for creating the script.

5
задан wahalulu 14 December 2010 в 20:17
поделиться