Practical Spark Tips for Data Scientists

Rahul Agarwal
April 6, 2020 Big Data, Cloud & DevOps

I know — Spark is sometimes frustrating to work with.

Although we can sometimes manage big data with tools like RAPIDS or simple parallelization, there is no way around Spark once you are working with terabytes of data.

Why? Because Spark throws memory errors all too often, and it is only by debugging them on genuinely large datasets that you learn to work with Spark effectively.

This post is about practical Spark and memory management tips for data scientists.


1. Map-Side Joins

Joining Dataframes

The syntax of joins in Spark is pretty similar to pandas:

df3 = df1.join(df2, df1.column == df2.column, how='left')

But I faced a problem: df1 had around 1 billion rows, while df2 had around 100. When I tried the join above, it ran for 20 minutes and then failed with memory-exhausted errors.

I was writing this code on a pretty big cluster, with more than 400 executors, each with more than 4 GB of RAM. I was stumped: I tried repartitioning my data frames using multiple schemes, but nothing seemed to work.

So what should I do? Is Spark unable to handle a mere billion rows? Not really. I just needed a map-side join, called broadcasting in Spark terminology.

from pyspark.sql.functions import broadcast
df3 = df1.join(broadcast(df2), df1.column == df2.column, how='left')

With the simple broadcasting code above, the smaller df2 is shipped to all the nodes, which doesn't take much time or memory. Behind the scenes, a copy of df2 is sent to every partition, and each partition uses its copy to perform the join locally. That means there is no data movement at all for df1, which is far bigger than df2.
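As a related knob: Spark can also broadcast small tables automatically when their estimated size falls below spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB. A minimal sketch of raising that threshold to 100 MB, if you would rather not wrap every small table in broadcast():

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100 MB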


2. Spark Cluster Configurations

Set the parallelism and worker nodes based on your task size

What also made my life difficult when I was starting out with Spark was the way a Spark cluster needs to be configured. Your cluster might need a lot of custom configuration and tuning based on the job you want to run.

Some of the most important configurations and options are as follows:

a. spark.sql.shuffle.partitions and spark.default.parallelism:

spark.sql.shuffle.partitions configures the number of partitions used when shuffling data for joins or aggregations; it defaults to 200. spark.default.parallelism is the default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by the user; its default depends on the cluster manager, but is typically the total number of cores across all executor nodes.

In simple words, these set the degree of parallelism you want to have in your cluster.

If you don't have a lot of data, the default of 200 is fine, but with huge data you might want to increase these numbers. It also depends on the number of executors you have. My cluster was pretty big, with 400 executors, so I set this to 800. A rule of thumb is to use a multiple of the executor count, so that every executor ends up with multiple tasks per stage.

sqlContext.setConf( "spark.sql.shuffle.partitions", 800)
sqlContext.setConf( "spark.default.parallelism", 800)

b. spark.sql.parquet.binaryAsString

I was working with .parquet files in Spark, and most of my data columns were strings. But whenever I loaded the data into Spark, the string columns came back in binary format, and I could not use any string manipulation functions on them. The way I solved this was:

sqlContext.setConf("spark.sql.parquet.binaryAsString","true")

The above configuration makes Spark read those binary columns as strings when loading parquet files. It is now a default setting I apply whenever I work with Spark.
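To make this concrete, here is a minimal sketch of the workflow; the file path and column name are hypothetical:

from pyspark.sql import functions as F

sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
df = sqlContext.read.parquet("/data/events.parquet")   # hypothetical path
# The 'name' column now loads as a string, so string functions work on it.
df = df.withColumn("name_upper", F.upper(F.col("name")))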

c. Yarn Configurations:

There are other configurations that define your cluster and that you might need to tune. These have to be set when the cluster starts and are not as dynamic as the ones above. The two I want to put down here manage memory overhead on the executor nodes, for when an executor's memory use grows beyond its heap allocation (see the sketch after this list):

  • spark.yarn.executor.memoryOverhead: 8192
  • yarn.nodemanager.vmem-check-enabled: False
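Note that these two settings live in different places: spark.yarn.executor.memoryOverhead is a Spark setting passed when the application starts, while yarn.nodemanager.vmem-check-enabled is a YARN property that belongs in yarn-site.xml on the cluster nodes. A minimal sketch of the Spark side, which must be set before the context is created:

from pyspark import SparkConf, SparkContext

# Must be set before the SparkContext starts; it cannot be changed at runtime.
conf = SparkConf().set("spark.yarn.executor.memoryOverhead", "8192")
sc = SparkContext(conf=conf)
# yarn.nodemanager.vmem-check-enabled: false goes into yarn-site.xml,
# not into SparkConf.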

There are many more configurations you might want to tune while setting up your Spark cluster; you can take a look at them in the official docs.


3. Repartitioning

Keeping the workers happy by having them handle an equal amount of data

You might want to repartition your data frame if you suspect it has become skewed across partitions after a series of transformations and joins. The simplest way to do it is:

df = df.repartition(1000)

Sometimes you might also want to repartition by a known scheme, because that scheme will be used by a join or aggregation later on (see the sketch below). You can repartition on multiple columns:

df = df.repartition('cola', 'colb', 'colc', 'cold')
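For example, if you know a join on a key is coming, repartitioning both sides by that key up front can line the data up for it. A minimal sketch; the column name user_id is an assumption for illustration:

# 'user_id' is a hypothetical join key.
df1 = df1.repartition('user_id')
df2 = df2.repartition('user_id')
df3 = df1.join(df2, on='user_id', how='left')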

You can get the number of partitions in a data frame using:

df.rdd.getNumPartitions()

You can also check the distribution of records across partitions using the glom function on the underlying RDD. This helps in understanding the skew that creeps in while working with various transformations.

df.rdd.glom().map(len).collect()
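Building on that, a small sketch for summarizing skew at a glance; it assumes df is any DataFrame, and only the per-partition counts travel back to the driver:

# Per-partition record counts; only the integers are collected to the driver.
sizes = df.rdd.glom().map(len).collect()
print(f"partitions={len(sizes)} min={min(sizes)} "
      f"max={max(sizes)} mean={sum(sizes) / len(sizes):.0f}")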

Conclusion

There are a lot of things we don't know we don't know: the unknown unknowns. It is only through multiple code failures and reading many Stack Overflow threads that we understand what we need.

Here I have tried to summarize a few of the memory and configuration problems I faced while working with Spark, and how to solve them. Spark has many other configuration options I have not covered, but I hope this post gave you some clarity on how to set and use these.
