Need training for Insurance Analytics? Browse courses developed by industry thought leaders and Experfy in Harvard Innovation Lab.
Allstate Corporation, the second largest insurance company in United States, founded in 1931, recently launched a Machine Learning recruitment challenge in partnership with Kaggle. Allstate’s objective was to predict the cost, and hence the severity, of claims.
The competition organizers provide the competitors with more than 300,000 examples with masked and anonymous data consisting of more than 100 categorical and numerical attributes, thus being compliant with confidentiality constraints.
The Spark/Scala script explained in this post obtains the training and test input datasets from local or Amazon’s AWS S3 environment and trains a Random Forest model over it. The objective is to demonstrate the use of Spark 2.0 Machine Learning pipelines with Scala language, AWS S3 integration and some general good practices for building Machine Learning models. In order to keep this main objective, more sophisticated techniques (such as a thorough exploratory data analysis and feature engineering) are intentionally omitted.
Why Spark?
Since almost all personal computers nowadays have many Gigabytes of RAM (and it is in an accelerated growing) and powerful CPUs and GPUs, many real-world machine learning problems can be solved with a single computer and frameworks such as ScikitLearn, with no need of a distributed system. Sometimes, though, data grows and keeps growing. Who hasn’t heard the term “Big Data”? When big data is involved, a non-distributed solution may solve the problem for a short time, but afterwards such solution needs to be reviewed and may reqire a significantly different approach. This is where Spark comes in.
Spark started as a research project at UC Berkeley in the AMPLab, a research group that focuses on big data analytics. Since then, it became an Apache project and has delivered many new releases, reaching a consistent maturity with a wide range of functionalities. Most of all, Spark can perform data processing over some Gigabytes or hundreds of Petabytes with basically the same programming code, only requiring a proper cluster of machines in the background (check this link). In some very specific cases the developer may need to tune the process by changing granularity of data distribution and other related aspects, but in general there are plenty of providers that automate all this cluster configuration for the developer. For instance, the script describe here used AWS Elastic MapReduce (EMR), which plays exactly this role.
Why Scala?
Scala is a beautiful and very well-devised programming language, with a strong scientific background from professor Martin Odersky’s research team at Ecole Polytechnique Fédérale de Lausanne.
In more technical terms, Scala was created with a strong functional paradigm, but also fully compatible with the imperative object-oriented paradigm from JVM platform, taking advantage of all JVM’s decades of evolution and maturity. In summary, everything one does in Java can be done in Scala and much more with a much shorter and cleaner code.
It isn’t a surprise that Spark is built precisely over Scala, although it also provides programming interfaces for Python, R and, naturally, Java.
The challenge
Allstate and Kaggle challenged competitors to build a Machine Learning solution that predicts cost/severity of insurance claims. As in any learning process, the effectiveness of the solution depends on the availability of credible data, which in this case was data from 188,318 insurance claims with a set of attributes and the expected outcome, the cost. This represented the training dataset. An additional dataset with 125,546 observations, with the same attributes but the outcome represented the test dataset, using which the trained model was to be run in order to produce the solution to the given problem.
Both datasets contain 116 categorical columns, with values as “A”, “B”, “C”, with no explicit meaning, and 14 numerical columns, with values in a range from 0.0 to 1.0 (normalized). It’s clearly a masked data-set, which ensures full confidentiality and still allows any Machine Learning algorithm to learn from it. In many real-world projects, particularly those that hire data scientists around the globe, often to work remotely, the approach is exactly the same of providing the consultant with an anonymous and masked data, whereas in other projects the data scientist may be required to obtain the data, clean it, organize it, normalize it and so on.
The following section describes in detail a solution for this competition implemented with Apache Spark 2.0 and Scala.
The solution
Although not so labored in terms of Machine Learning techniques, the script that follows provides many important learnings for building ML applications with Apache Spark 2.0, Scala, SBT and finally running it. Some learnings are detailed as follows:
- A sophisticated command line interface is provided by scopt, through which the runtime can be configured with specific named parameters. It is detailed in the section Running the Script Locally. You must add this to your
build.sbt
file:
libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0"
- And your script code will include something like this:
val parser = new OptionParser[Params]("AllstateClaimsSeverityRandomForestRegressor") {
head("AllstateClaimsSeverityRandomForestRegressor", "1.0")
opt[String]("s3AccessKey").required().action((x, c) =>
c.copy(s3AccessKey = x)).text("The access key is for S3")
opt[String]("s3SecretKey").required().action((x, c) =>
c.copy(s3SecretKey = x)).text("The secret key is for S3")
...
parser.parse(args, Params()) match {
case Some(params) =>
process(params)
case None =>
throw new IllegalArgumentException("One or more parameters are invalid or missing")
}
- In order for SBT to package a jar file containing this and other third-part libraries, you need to use the command
sbt assembly
instead ofsbt package
. For such, it is needed to use sbt-assembly and configure your project accordingly by creating a fileproject/assembly.sbt
with the following content:
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
- The method
process
is called with a case class instance which encapsulates the parameters provided at the command line.
case class Params(s3AccessKey: String = "", s3SecretKey: String = "",
trainInput: String = "", testInput: String = "",
outputFile: String = "",
algoNumTrees: Seq[Int] = Seq(3),
algoMaxDepth: Seq[Int] = Seq(4),
algoMaxBins: Seq[Int] = Seq(32),
numFolds: Int = 10,
trainSample: Double = 1.0,
testSample: Double = 1.0)
def process(params: Params) {
...
- SparkSession.builder is used for building a Spark session. It was introduced in Spark 2.0 and is recommended to be used in place of the old SparkConf and SparkContext. This link provides a good description of this new strategy and the equivalence with the old one.
val sparkSession = SparkSession.builder.
appName("AllstateClaimsSeverityRandomForestRegressor")
.getOrCreate()
- The access to S3 is configured with s3a support, which compared to the predecessor s3n improves the support to large files (no more 5GB limit) and provides higher performance. For more information on this, check this, this and this links.
sparkSession.conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sparkSession.conf.set("spark.hadoop.fs.s3a.access.key", params.s3AccessKey)
sparkSession.conf.set("spark.hadoop.fs.s3a.secret.key", params.s3SecretKey)
- Besides using the new sparkSession.read.csv method, the reading process also includes important settings: It is set to read the header of the CSV file, which is directly applied to the columns’ names of the dataframe created; and inferSchema property is set to true. Without the inferSchema configuration, the float values would be considered as strings which would later cause the VectorAssembler to raise an ugly error:
java.lang.IllegalArgumentException: Data type StringType is not supported
. Finally, both raw dataframes are cached since they are again used later in the code for fitting the StringIndexer transformations and it wouldn’t be good to read the CSV files from the filesystem or S3 once again.
val trainInput = sparkSession.read
.option("header", "true")
.option("inferSchema", "true")
.csv(params.trainInput)
.cache
val testInput = sparkSession.read
.option("header", "true")
.option("inferSchema", "true")
.csv(params.testInput)
.cache
- The column “loss” is renamed to “label”. For some reason, even after using the setLabelCol on the regression model, it still looks for a column called “label”, raising an ugly error:
org.apache.spark.sql.AnalysisException: cannot resolve 'label' given input
columns
. It may be hardcoded somewhere in Spark’s source code. - The content of train.csv is split into training and validation data, 70% and 30%, respectively. The content of “test.csv” is reserved for building the final CSV file for submission on Kaggle. Both original dataframes are sampled according to command line parameters, which is particularly useful for running fast executions in your local machine.
val data = trainInput.withColumnRenamed("loss", "label")
.sample(false, params.trainSample)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, validationData) = (splits(0), splits(1))
trainingData.cache
validationData.cache
val testData = testInput.sample(false, params.testSample).cache
- By using a custom function isCateg the column names are filtered and a StringIndexer is created for each categorical column, aimed at creating a new numerical column according to the custom function categNewCol. Note: It is a weak feature engineering, since it is wrong for a learning model to assume that the categories have an order among them (one is greater or less than the other). Whenever categories are confirmed to be unordered, it is better to use some other technique such as OneHotEncoder, which yields a different new column for each category holding a boolean (0/1) value.
def isCateg(c: String): Boolean = c.startsWith("cat")
def categNewCol(c: String): String = if (isCateg(c)) s"idx_${c}" else c
val stringIndexerStages = trainingData.columns.filter(isCateg)
.map(c => new StringIndexer()
.setInputCol(c)
.setOutputCol(categNewCol(c))
.fit(trainInput.select(c).union(testInput.select(c))))
- There are some very important aspects to be considered when building a feature transformation such as StringIndexer or OneHotEncoder. Such transformations need to be fitted before being included in the pipeline and the fit process needs to be done over a dataset that contains all possible categories. For instance, if you fit a StringIndexer over the training dataset and afterwards, when the pipeline is used to predict an outcome over another dataset (validation, test, etc.), it faces some unseen category, then it will fail and raise the error:
org.apache.spark.SparkException: Failed to execute user defined
function($anonfun$4: (string) => double) ... Caused by:
org.apache.spark.SparkException: Unseen label: XYZ ... at
org.apache.spark.ml.feature.StringIndexerModel
. This is the reason why this script fits the StringIndexer transformations over a union of original data fromtrain.csv
andtest.csv
, bypassing the sampling and split parts. - After the sequence of StringIndexer transformations, the next transformation in the pipeline is the VectorAssembler, which groups a set of columns into a new “features” column to be considered by the regression model. The filter for only feature columns is performed with the custom function onlyFeatureCols. Additionally, the custom function removeTooManyCategs is used to filter out some few columns which contain a number of distinct categories much higher than the supported by the default parameter maxBins (for RandomForest). In a seriously competitive scenario, it would be better to perform some exploratory analysis to understand these features, their impact on the outcome variable and which feature engineering techniques could be applied.
def removeTooManyCategs(c: String): Boolean = !(c matches "cat(109$|110$|112$|113$|116$)")
def onlyFeatureCols(c: String): Boolean = !(c matches "id|label")
val featureCols = trainingData.columns
.filter(removeTooManyCategs)
.filter(onlyFeatureCols)
.map(categNewCol)
val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")
- The very last stage in the pipeline is the regression model, which in this script is a RandomForestRegressor.
val algo = new RandomForestRegressor().setFeaturesCol("features").setLabelCol("label")
val pipeline = new Pipeline().setStages((stringIndexerStages :+ assembler) :+ algo)
- It is interesting to run the pipeline a set of times with different hyperparameters for the transformations and the learning algorithm in order to find the combination that best fits the data (see Hyperparameter optimization). It is also important to evaluate each combination against a separated slice of the data (see K-fold Cross Validation). For accomplishing such objectives, a CrossValidator is used in conjunction with a ParamGridBuilder (more documentation on (this link)[http://spark.apache.org/docs/latest/ml-tuning.html]) queueing executions with distinct combinations of hyperparameters according to which was parametrized in the command line.
val paramGrid = new ParamGridBuilder()
.addGrid(algo.numTrees, params.algoNumTrees)
.addGrid(algo.maxDepth, params.algoMaxDepth)
.addGrid(algo.maxBins, params.algoMaxBins)
.build()
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(params.numFolds)
val cvModel = cv.fit(trainingData)
- Note: As observed by this post the Random Forest model is much faster than GBT on Spark. I experienced an execution about 20 times slower with GBT compared to Random Forest with equivalent hyperparameters.
- With an instance of CrossValidatorModel already trained, it is time for evaluating the model over the whole training and the validation datasets. From the result of predictions it is possible to easily obtain evaluation metrics with RegressionMetrics. Additionally, the instance of the best model can be obtained, providing thus access to some other interesting attributes, such as featureImportances.
val trainPredictionsAndLabels = cvModel.transform(trainingData).select("label", "prediction")
.map { case Row(label: Double, prediction: Double) => (label, prediction) }.rdd
val validPredictionsAndLabels = cvModel.transform(validationData).select("label", "prediction")
.map { case Row(label: Double, prediction: Double) => (label, prediction) }.rdd
val trainRegressionMetrics = new RegressionMetrics(trainPredictionsAndLabels)
val validRegressionMetrics = new RegressionMetrics(validPredictionsAndLabels)
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val featureImportances = bestModel.stages.last.asInstanceOf[RandomForestRegressionModel].featureImportances.toArray
- Finally, the model can be used to predict the answer for the test dataset and save a csv file ready to be submitted on Kaggle! Again, Spark 2.0 simplifies the process. The function
coalesce
gathers all partitions into 1 only, thus saving a single output file (not many).
cvModel.transform(testData)
.select("id", "prediction")
.withColumnRenamed("prediction", "loss")
.coalesce(1)
.write.format("csv")
.option("header", "true")
.save(params.outputFile)
Running the script locally
Assuming you have your local environment all set up with Java 8 or higher, Scala 2.11.x and Spark 2.0, you can run the script with the following command structure:
spark-submit --class com.adornes.spark.kaggle.AllstateClaimsSeverityRandomForestRegressor the_jar_file.jar --s3AccessKey YOUR_AWS_ACCESS_KEY_HERE --s3SecretKey YOUR_AWS_SECRET_KEY_HERE --trainInput "file:///path/to/the/train.csv" --testInput "file:///path/to/the/test.csv" --outputFile "file:///path/to/any/name/for/submission.csv" --algoNumTrees 3 --algoMaxDepth 3 --algoMaxBins 32 --numFolds 5 --trainSample 0.01 --testSample 0.01
As previously mentioned, scopt is the tool that enables the nice names for parameters at command line. If you type something wrong, it will output the sample usage as follows:
AllstateClaimsSeverityRandomForestRegressor 1.0
Usage: AllstateClaimsSeverityRandomForestRegressor [options]
--s3AccessKey <value> The access key for S3
--s3SecretKey <value> The secret key for S3
--trainInput <file> Path to file/directory for training data
--testInput <file> Path to file/directory for test data
--outputFile <file> Path to output file
--algoNumTrees <n1[,n2,n3...]>
One or more options for number of trees for RandomForest model. Default: 3
--algoMaxDepth <n1[,n2,n3...]>
One or more values for depth limit
--algoMaxBins <n1[,n2,n3...]>
One or more values for depth limit
--numFolds <value> Number of folds for K-fold Cross Validation
--trainSample <value> Sample fraction from 0.0 to 1.0 for train data
--testSample <value> Sample fraction from 0.0 to 1.0 for test data
Running the script on AWS Elastic MapReduce (EMR)
EMR plays the role of abstracting most of the background setup for a cluster with Spark/Hadoop ecosystems. You can actually build as many clusters as you want (and can afford). By the way, the cost for EC2 instances used with EMR is considerably reduced (it is detailed here).
Although considerably abstracting the cluster configuration, EMR allows the user to customize almost any of the background details through the advanced options of the steps of creating a cluster. For instance, for this Spark script, you’ll need to customize the Java version, according to this link. Besides that, everything is created using the options provided. So, going step by step, log in to your AWS console, in the Services tab look for EMR, select to create a cluster, choose Go to advanced options on the top of the screen and fill the options as follows:
- Vendor – Leave it as Amazon
- Release – Choose emr-5.1.0. Select Hadoop and Spark. I’d also recommend you to select Zeppelin (for working with notebooks) and Ganglia (for detailed monitoring of your cluster)
- Edit software settings (optional) – Ensure the option Enter configuration is selected and copy here the configurations of the aforementioned link
- Add steps – You don’t need to do it at this moment. I prefer to do it later, after your cluster is started and ready for processing stuff. Click Next for Hardware settings
- Hardware – You can leave it as default (and can also resize it later) but maybe 2 core instances can be increased to 4 or more. Don’t forget that your choice will have costs. Click Next for General Cluster Settings.
- Cluster name – Give some name to your cluster. Feel free to leave all other options with the default values. Click Next for Security.
- EC2 Key Pair – It is useful if want to log into your EC2 instances via ssh. You can either create a Key Pair or choose some existent if you already have one. Leave the remaining options with the default values and click on Create Cluster.
Now you’ll have an overview of your cluster’s basic data, including the state of your instances. When they indicate to be ready for processing steps, go to the Steps tab, click on Add step and fill the options as follows:
- Step type – Select Spark application
- Application location – Navigate through your S3 buckets and select the jar file there. You’ll need to have already uploaded it to S3.
- Spark-submit options – Type here
--class com.adornes.spark.kaggle.AllstateClaimsSeverityRandomForestRegressor
indicating the class that holds the code that you want to run. - Arguments – Here you type the rest of the command arguments as demonstrated before, but this time indicating S3 paths as follows:
--s3AccessKey YOUR_AWS_ACCESS_KEY_HERE --s3SecretKey YOUR_AWS_SECRET_KEY_HERE
--trainInput "s3:/path/to/the/train.csv" --testInput "s3:/path/to/the/test.csv"
--outputFile "s3:/path/to/any/name/for/submission.csv"
--algoNumTrees 20,40,60 --algoMaxDepth 5,7,9 --algoMaxBins 32 --numFolds 10
--trainSample 1.0 --testSample 1.0
That’s it! In the list of steps you will see your step running and will also have access to system logs. Detailed logs will be saved to the path defined in your cluster configuration. Additionally, EMR allows the user to clone both steps and clusters, being thus not required to type everything again.
Submission on Kaggle
As mentioned along the explanations, many improvements could/should be done in terms of exploratory data analysis, feature engineering, evaluating other models (starting by the simplest ones, as Linear Regression) and then decreasing the predictions error.
For being over-simplistic, this model achieved a Mean Absolute Error (MAE) of 1286 in the public leaderboard, far from the top positions.
The submission file and the detailed metrics of the model evaluation can be found under the output
directory in the Github repository.
Conclusion
Apache Spark 2.0 is indeed a powerful framework for building Machine Learning models, transformations pipeline, evaluations and everything else with a highly-scalable end product. Scala, in turn, is also a powerful programming language and the natural choice for developing Spark applications.
Allstate’s challenge is a project with a very interesting objective and very aligned with real-world Machine Learning problems, thus turning the concepts discussed on this article applicable to many other problems.
The full source code can be found at this Github repository.
Many technical and conceptual ideas discussed here are open. Suggestions and corrections are highly appreciated.