
Spark Accumulators Explained: Apache Spark

Last updated on May 22, 2019


Contributed by Prithviraj Bose

Here’s a blog on what you need to know about Spark accumulators. With Apache Spark Certification being a key skill that most IT recruiters hunt for, Spark’s growth and demand in the industry have been exponential since its inception.

What are accumulators?

Accumulators are variables that are used for aggregating information across the executors. For example, this information can pertain to data or API diagnostics, such as how many records are corrupted or how many times a particular library API was called.

To understand why we need accumulators, let’s see a small example.

Here’s an imaginary log of transactions of a chain of stores around the central Kolkata region.

[Image: sample transaction log from the chain of stores]

There are 4 fields:

Field 1 -> City

Field 2 -> Locality

Field 3 -> Category of item sold

Field 4 -> Value of item sold

However, the logs can be corrupted. For example, the second line is a blank line, the fourth line reports some network issues, and the last line shows a sales value of zero (which cannot happen!).

We can use accumulators to analyse the transaction log to find out the number of blank logs (blank lines), the number of times the network failed, any product that does not have a category, or even the number of times zero sales were recorded. The full sample log can be found here.
Accumulators can be used with any operation that is,
1. Commutative -> f(x, y) = f(y, x), and
2. Associative -> f(f(x, y), z) = f(x, f(y, z))
For example, sum and max satisfy the above conditions whereas average does not.
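
To see why average fails, consider merging partial results computed on different partitions. A small Scala illustration (the numbers and variable names are made up for this example):

// Made-up numbers: per-partition sums can be merged, per-partition averages cannot.
val partitionA = Seq(1.0, 2.0)   // pretend these values live on one partition
val partitionB = Seq(3.0)        // and this value on another

// Sum is commutative and associative: merging partial sums in any order gives the true total (6.0).
val totalSum = partitionA.sum + partitionB.sum

// Averaging partial averages loses the counts: (1.5 + 3.0) / 2 = 2.25, but the true average is 2.0.
val avgOfAverages = (partitionA.sum / partitionA.size + partitionB.sum / partitionB.size) / 2
val trueAverage   = (partitionA ++ partitionB).sum / (partitionA ++ partitionB).size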

Why use Spark Accumulators?

Now, why do we need accumulators? Why not just use ordinary variables, as shown in the code below?

[Image: code that counts blank lines using a plain variable]
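
The original code screenshot is not reproduced here, but a minimal Scala sketch of the idea looks like this (the file path and variable names are illustrative, and sc is assumed to be the SparkContext):

// Counting blank lines with a plain driver-side variable (illustrative file path).
val logRdd = sc.textFile("transactions.log")

var blankLines = 0   // a plain variable declared on the driver

logRdd.foreach { line =>
  if (line.trim.isEmpty) blankLines += 1   // increments a copy that is local to each executor
}

println(s"Blank lines: $blankLines")       // prints 0 on the driver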

The problem with the above code is that when the driver prints the variable blankLines, its value will be zero. This is because when Spark ships this code to every executor, the variable becomes local to that executor, and its updated value is not relayed back to the driver. To avoid this problem, we need to make blankLines an accumulator so that all the updates to this variable in every executor are relayed back to the driver.
So the above code should be written as,
[Image: code that counts blank lines using an accumulator]
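
Again, the original screenshot is not reproduced here; a minimal sketch using Spark 2.x's longAccumulator API (the accumulator name is illustrative):

// Counting blank lines with an accumulator instead of a plain variable.
val blankLines = sc.longAccumulator("Blank Lines")

logRdd.foreach { line =>
  if (line.trim.isEmpty) blankLines.add(1)   // each executor's updates are merged back on the driver
}

println(s"Blank lines: ${blankLines.value}") // now reports the true count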

This guarantees that the accumulator blankLines is updated across every executor and the updates are relayed back to the driver.

We can implement other counters for network errors or zero sales value, etc. The full source code along with the implementation of the other counters can be found here.

People familiar with Hadoop Map-Reduce will notice that Spark’s accumulators are similar to Hadoop’s Map-Reduce counters.

Caveats

When using accumulators there are some caveats that we as programmers need to be aware of,

  1. Computations inside transformations are evaluated lazily, so unless an action happens on an RDD, the transformations are not executed. As a result, accumulators used inside functions like map() or filter() won't get updated unless some action happens on the RDD.
  2. Spark guarantees to update accumulators inside actions only once. So even if a task is restarted and the lineage is recomputed, the accumulators will be updated only once.
  3. Spark does not guarantee this for transformations. So if a task is restarted and the lineage is recomputed, the accumulators may be updated more than once, causing undesirable side effects.

To be on the safe side, always use accumulators inside actions ONLY.
The code here shows a simple yet effective example of how to achieve this.
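That linked code is not reproduced here, but a minimal sketch of the idea would look like this (the accumulator name and the "network error" marker string are assumptions for illustration, since the actual log format is shown only in the image above):

val networkErrors = sc.longAccumulator("Network Errors")

// Risky: updating the accumulator inside a transformation (map). The update only happens
// once an action runs, and it may be applied more than once if a task is re-executed.
val passedThrough = logRdd.map { line =>
  if (line.contains("network error")) networkErrors.add(1)
  line
}

// Safer: updating the accumulator inside an action (foreach), where Spark applies each
// task's updates exactly once.
logRdd.foreach { line =>
  if (line.contains("network error")) networkErrors.add(1)
}

println(s"Network errors: ${networkErrors.value}")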
For more information on accumulators, read this.

Got a question for us? Mention it in the comments section and we will get back to you.


Comments
1 Comment
  • Ronak Hingar says:

    https://spark.apache.org/docs/2.3.0/streaming-kafka-0-8-integration.html
    In this documentation a data structure is created which conflicts with the original idea that variables cannot be updated across executors and the driver.

    AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

    directKafkaStream.transformToPair(rdd -> {
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        offsetRanges.set(offsets);
        return rdd;
    }).map(
        ...
    ).foreachRDD(rdd -> {
        for (OffsetRange o : offsetRanges.get()) {
            System.out.println(
                o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
            );
        }
        ...
    });
    System.out.println(Arrays.toString(offsetRanges.get()));

    I have added a print statement after the narrow transformations so that statement gets executed on the driver.
    By your logic, when I print it at the end I should get an empty value, as none of the offsets are relayed back to the driver. But that is not the case.


