
RDD using Spark : The Building Block of Apache Spark

Last updated on Nov 26, 2019
Ravi Kiran
Tech Enthusiast working as a Research Analyst at Edureka. Curious about learning more about Data Science and Big-Data Hadoop.


Spark: the word itself is enough to generate a spark in every Hadoop engineer’s mind. It is an in-memory processing engine that is lightning-fast for cluster computing. Compared to MapReduce, which shares data between steps over the network and disk, in-memory data sharing can be 10-100x faster, and all of this is possible because of RDDs (Resilient Distributed Datasets). The key points we focus on today in this RDD using Spark article are:

  • Need for RDDs
  • What are RDDs?
  • Features of RDD
  • Creation of RDD using Spark
  • Operations performed on RDDs
  • RDD using Spark: Pokemon Use Case

Need for RDDs?

Why do we need RDD?-RDD using Spark

The world is evolving with Artificial Intelligence and Data Science because of advances in Machine Learning. Algorithms based on Regression, Clustering, and Classification run as distributed, iterative computations that reuse and share data among multiple computing units.

 

Traditional MapReduce needed stable, distributed intermediate storage such as HDFS to share data between steps. Every iteration wrote its results out and read them back in, which meant repeated computation, data replication and data serialization, and made the process a lot slower. Finding a solution was never easy.

Traditional HDFS-RDD using Spark

This is where RDDs (Resilient Distributed Datasets) come into the picture.

RDDs are easy to use and effortless to create: data is imported from data sources and dropped into RDDs, and operations are then applied to process it. An RDD is a read-only, distributed collection of records that Spark keeps in memory where it can, and most importantly, it is fault-tolerant.

In memory computation of RDD-RDD using Spark

 

If any partition of an RDD is lost, Spark regenerates just that partition by replaying the transformations recorded in its lineage, rather than processing all the data from scratch. In real-world scenarios this makes a big difference when data is lost or a node goes down.
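The lineage that Spark would replay can be inspected directly. The following is a minimal sketch, assuming a local Spark session and a made-up collection of numbers (none of these names come from the article's data set):

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("lineage-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical data: the numbers 1 to 10 spread over 2 partitions.
val numbers = sc.parallelize(1 to 10, 2)
val doubledAboveFive = numbers.map(_ * 2).filter(_ > 5)

// toDebugString lists the chain of parent RDDs, which is what Spark
// replays to rebuild a lost partition of doubledAboveFive.
val lineage = doubledAboveFive.toDebugString
println(lineage)
```

The printed text lists the filter and map steps on top of the original parallelized collection, which is exactly the recipe needed to recompute a partition.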

 

What are RDDs?

An RDD (Resilient Distributed Dataset) is the fundamental data structure in Spark. The term Resilient refers to its ability to regenerate data automatically, restoring it to its original state, when an unexpected failure causes data loss.

 

                             Data rolled back-RDD using Spark

The data written into an RDD is partitioned and stored across multiple executor nodes. If an executor node fails at run time, Spark recomputes the lost partitions on another node from the RDD’s lineage (or reads a replica, if the RDD was persisted with a replicated storage level). This is why RDDs are considered a more advanced data structure than traditional ones. RDDs can store structured, unstructured and semi-structured data.

RDD Multiple Partitions-RDD using Spark

Let’s move ahead with our RDD using Spark blog and learn about the unique features of RDDs which give them an edge over other types of data structures.

 

Features of RDD

Features of RDD-RDD using Spark

 

  • In-Memory (RAM) Computations: Keeping intermediate data in memory instead of writing it to disk between steps makes data processing faster and more efficient, which improves the overall performance of the system.
  • Lazy Evaluation: Transformations applied to an RDD do not produce output immediately. Spark only records them and runs them when an action asks for a result.
  • Persistence: An RDD can be kept in memory or on disk (with cache() or persist()) so that later operations reuse it instead of recomputing it.
  • Coarse-Grained Operations: Transformations such as map, filter or groupBy apply to every element of the data set, rather than to individual records.
  • Fault Tolerant: If data is lost, the system can rebuild it by replaying the logged transformations.
  • Immutability: Data cannot be changed once it is in an RDD. If you need to modify an existing RDD, you must create a new RDD by applying transformation functions to the current or a preceding RDD.
  • Partitioning: The partition is the basic unit of parallelism in a Spark RDD. By default, the number of partitions depends on your data source. You can also choose the number of partitions yourself, for example by passing a partition count when creating the RDD, or control how keys are placed with a custom partitioner.
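Lazy evaluation, persistence and partitioning can be observed with a small experiment. This is only a sketch under stated assumptions: a local Spark session, a made-up list of five numbers, and an accumulator (named calls here purely for illustration) that counts how often the map function runs. Accumulators updated inside transformations can over-count if tasks are retried, so the numbers below are what a clean local run would show.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("features-sketch").getOrCreate()
val sc = spark.sparkContext

// Counts how many times the function passed to map is invoked.
val calls = sc.longAccumulator("map calls")

// Hypothetical data: 5 numbers, explicitly split into 2 partitions.
val nums = sc.parallelize(Seq(1, 2, 3, 4, 5), 2)
val doubled = nums.map { x => calls.add(1); x * 2 }.cache()

val callsBeforeAction = calls.sum        // lazy: the map has not run yet
val total = doubled.reduce(_ + _)        // first action computes and caches the RDD
val callsAfterFirst = calls.sum
val howMany = doubled.count()            // second action can reuse the cached partitions
val callsAfterSecond = calls.sum
val partitions = doubled.getNumPartitions

println(s"before=$callsBeforeAction first=$callsAfterFirst second=$callsAfterSecond partitions=$partitions")
```

Without the cache() call, the second action would run the map function again for every element.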

Creation of RDD using Spark

RDDs can be created in three ways:

Methods of creating RDD-RDD using Spark

  1. Reading data from parallelized collections
val PCRDD = spark.sparkContext.parallelize(Array("Mon","Tue","Wed","Thu","Fri","Sat"),2)
// collect() returns a local Array, so print it directly
val resultRDD = PCRDD.collect()
resultRDD.foreach(println)
  2. Applying transformation on previous RDDs
val words = spark.sparkContext.parallelize(Seq("Spark","is","a","very","powerful","language"))
// pair each word with its first character
val wordpair = words.map(w => (w.charAt(0),w))
wordpair.collect().foreach(println)
  3. Reading data from external storage or file paths like HDFS or HBase
// sparkContext.textFile returns an RDD[String]; spark.read.textFile would return a Dataset[String]
val Sparkfile = spark.sparkContext.textFile("/user/edureka_566977/spark/spark.txt")
Sparkfile.collect()

Operations performed on RDDs:

There are mainly two types of operations which are performed on RDDs, namely:

  • Transformations 
  • Actions

Transformations and actions-RDD using Spark

Transformations: The operations we apply on an RDD to filter, access and modify the data in the parent RDD and generate a successive RDD are called transformations. The new RDD keeps a reference to its parent RDD, which records the dependency between them.

Transformations are lazily evaluated. In other words, the operations applied on the RDD you are working with are logged but not executed. The system returns a result or throws an exception only after an Action is triggered.

We can divide transformations into two types as below:

  • Narrow Transformations
  • Wide Transformations

Narrow Transformations: In a narrow transformation, each partition of the new RDD is computed from a single partition of a parent RDD, so no data has to move between partitions. Examples of narrow transformations are:

  • map()
  • filter()
  • flatMap()
  • mapPartitions()
  • union()

Wide Transformations: In a wide transformation, a partition of the new RDD may need data from multiple partitions of the parent RDD, so Spark has to shuffle data across the cluster. Examples of wide transformations are:

  • reduceByKey()
  • groupByKey()
  • sortByKey()
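Whether a transformation is narrow or wide can be checked from the dependencies Spark records on the resulting RDD. The sketch below assumes a local Spark session and a tiny made-up pair RDD. In it, map() and union() report narrow dependencies, while reduceByKey() reports a shuffle dependency.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency}
import org.apache.spark.sql.SparkSession

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("dependency-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical key-value data in 2 partitions.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

val mapped  = pairs.map { case (k, v) => (k, v * 10) } // no data movement
val unioned = pairs.union(mapped)                       // partitions are simply concatenated
val reduced = pairs.reduceByKey(_ + _)                  // values for a key must be brought together

val mapIsNarrow   = mapped.dependencies.forall(_.isInstanceOf[NarrowDependency[_]])
val unionIsNarrow = unioned.dependencies.forall(_.isInstanceOf[NarrowDependency[_]])
val reduceIsWide  = reduced.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])

println(s"map narrow: $mapIsNarrow, union narrow: $unionIsNarrow, reduceByKey wide: $reduceIsWide")
```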

Actions: Actions instruct Apache Spark to run the computation and pass the result or an exception back to the driver program. A few of the actions are:

  • collect()
  • count()
  • take()
  • first()
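As a quick illustration of these four actions, here is a minimal sketch on a made-up three-element RDD, assuming a local Spark session. Each call triggers a job and returns a plain Scala value to the driver.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("actions-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical data kept in a single partition.
val scores = sc.parallelize(Seq(30, 10, 20), 1)

val all      = scores.collect() // every element, as a local Array
val n        = scores.count()   // number of elements, as a Long
val firstTwo = scores.take(2)   // the first 2 elements, as a local Array
val head     = scores.first()   // the first element only

println(s"all=${all.mkString(",")} n=$n firstTwo=${firstTwo.mkString(",")} head=$head")
```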

Let us practically apply the operations on RDDs:

IPL Teams-RDD using Spark

IPL (Indian Premier League) is a cricket tournament with its hype at a peak level. So, let’s get our hands on an IPL data set today and put RDDs to work using Spark.

  • Firstly, let’s download the IPL match data as a CSV file. Once downloaded, it looks like an Excel file with rows and columns.

Matches.csv-RDD with Spark

In the next step, we fire up Spark and load the matches.csv file from its location. In my case, the CSV file location is “/user/edureka_566977/test/matches.csv”.

Fire up the Spark-RDD using Spark

 

Now let us start with the Transformation part first:

Transformation-RDD using Spark

  • map():

We use the map transformation to apply a specific operation to every element of an RDD. Here we create an RDD named CKfile to hold the lines of our CSV file. We then create another RDD called states, which holds the city of each match (the third column, index 2).

spark2-shell
val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
CKfile.collect.foreach(println)
val states = CKfile.map(_.split(",")(2))
states.collect().foreach(println)

 

Map Transformation in spark- RDD using Spark

 

  • filter(): 

The name of the filter transformation describes its use: it keeps only the elements of an RDD that satisfy a given condition. Here we apply filter to get the records of the IPL matches of the year 2017 and store them in the fil RDD. Note that contains("2017") keeps any line with 2017 anywhere in it, not only in the season column.

val fil = CKfile.filter(line => line.contains("2017"))
fil.collect().foreach(println)
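A substring test like the one above matches 2017 wherever it appears in a line. The sketch below contrasts it with a test on one specific column. It assumes a local Spark session, and both the sample rows and the position of the season column (index 1) are made up for illustration, so adjust the index to the real file layout.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("filter-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical rows: id, season, city, free-text note.
val rows = sc.parallelize(Seq(
  "1,2017,Hyderabad,opening match",
  "2,2016,Mumbai,regular match",
  "3,2016,Pune,replayed in 2017"
))

// Matches "2017" anywhere in the line, including the note column.
val bySubstring = rows.filter(_.contains("2017"))

// Matches only when the assumed season column (index 1) equals "2017".
val bySeason = rows.filter(_.split(",")(1) == "2017")

println(s"substring matches: ${bySubstring.count()}, season matches: ${bySeason.count()}")
```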

 

Filter operation using Spark-RDD using Spark

 

  • flatMap():

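flatMap is a transformation that maps each element of an RDD to zero or more elements and flattens the results into a new RDD. It is similar to the map transformation, except that map always produces exactly one output element per input element. Here we apply flatMap to split each 2017 record around the text Hyderabad. Because the line ends with collect(), filRDD holds the resulting pieces as a local array on the driver rather than as an RDD.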


val filRDD = fil.flatMap(line => line.split("Hyderabad")).collect()

 

Flat Map Transformation in Spark-RDD using Spark
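The difference between map and flatMap is easiest to see on a tiny example. This is a minimal sketch with two made-up sentences, assuming a local Spark session:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("flatmap-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical input: two lines of text in one partition.
val lines = sc.parallelize(Seq("Spark is fast", "RDDs are lazy"), 1)

// map: one output element per line, each an Array of words.
val mapped = lines.map(_.split(" "))

// flatMap: the arrays are flattened, giving one output element per word.
val flat = lines.flatMap(_.split(" "))

println(s"map produced ${mapped.count()} elements, flatMap produced ${flat.count()}")
flat.collect().foreach(println)
```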

 

  • partitions: 

All the data we write into an RDD is split into a certain number of partitions. partitions is not a transformation but a property of the RDD, and we use it here to find the number of partitions the data is actually split into.

val fil = CKfile.filter(line => line.contains("2017"))
fil.partitions.size

 

Partition Transformation Output-RDD using Spark

     

  • mapPartitions(): 

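mapPartitions works like map, but the function is called once per partition with an iterator over that partition’s elements, rather than once per element. We use mapPartitions here to count the rows in each partition of our fil RDD. Adding these counts gives the number of rows in fil.

val fil = CKfile.filter(line => line.contains("2016"))
// emit one value per partition: the number of rows it holds
fil.mapPartitions(iter => Iterator(iter.size)).collect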

 

MapPartitions Transformation Output-RDD using Spark
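To make the per-partition behaviour concrete, here is a minimal sketch on a made-up range of ten numbers split into two partitions, assuming a local Spark session. The function runs once per partition and emits that partition's row count.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("mappartitions-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical data: 1 to 10 in exactly 2 partitions.
val nums = sc.parallelize(1 to 10, 2)

// One output element per partition: the number of elements it holds.
val perPartition = nums.mapPartitions(iter => Iterator(iter.size)).collect()

// Summing the per-partition counts gives the total row count.
val total = perPartition.sum

println(s"rows per partition: ${perPartition.mkString(", ")}; total: $total")
```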

  • reduceByKey(): 

We use reduceByKey() on key-value pairs. We use this transformation on our CSV file to find the players with the most Man of the Match awards.

val ManOfTheMatch = CKfile.map(_.split(",")(13))
val MOTMcount = ManOfTheMatch.map(WINcount => (WINcount,1))
// sum the counts per player, swap to (count, player) and sort by count, highest first
val ManOTH = MOTMcount.reduceByKey((x,y) => x+y).map(tup => (tup._2,tup._1)).sortByKey(false)
ManOTH.take(10).foreach(println)
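The same count-and-rank pattern can be written without swapping the tuple by sorting on the count with sortBy. The sketch below assumes a local Spark session and uses made-up placeholder names (player_a and so on), not values from matches.csv.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("reducebykey-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical award winners, one entry per match.
val awards = sc.parallelize(Seq(
  "player_a", "player_b", "player_a", "player_c", "player_a", "player_b"
))

// Pair each name with 1 and add up the ones per name.
val counts = awards.map(name => (name, 1)).reduceByKey(_ + _)

// Sort by the count (second tuple field), highest first.
val ranked = counts.sortBy(_._2, ascending = false)
val top = ranked.first()

ranked.collect().foreach(println)
```

Keeping the pairs as (name, count) means the result prints in the order a reader expects, at the cost of naming the sort field explicitly.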

 

ReduceBy Transformation Output-RDD using Spark

 

  • union():

The name explains it all: we use the union transformation to combine two RDDs into one. Here we create two RDDs, fil and fil2. The fil RDD contains the records of the 2017 IPL matches and the fil2 RDD contains the 2016 IPL match records.

val fil = CKfile.filter(line => line.contains("2017"))
val fil2 = CKfile.filter(line => line.contains("2016"))
val uninRDD = fil.union(fil2)

Union Transformation in spark-RDD in Spark

 

Let us start with the Action part where we show actual output:

Action-RDD using Spark

  • collect():

collect is the action we use to bring all the contents of an RDD back to the driver, for example to display them.

val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
CKfile.collect.foreach(println)

 

Collect Action Using Spark-RDD using Spark

 

  • count():

count is an action that we use to count the number of records present in an RDD. Here we use it to count the total number of records in our matches.csv file.

val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
CKfile.count()

 

Count Action-RDD using Spark

 

  • take():

take is an action similar to collect, the only difference being that it returns just the number of rows the user asks for. Here we apply the following code to print the ten cities with the most matches.

val states = CKfile.map(_.split(",")(2))
val Scount = states.map(city => (city,1))
val statecountm = Scount.reduceByKey((x,y) => x+y).map(tup => (tup._2,tup._1)).sortByKey(false)
statecountm.collect().foreach(println)
statecountm.take(10).foreach(println)

 

Take Action output-RDD using Spark

 

  • first(): 

first() is an action similar to collect() and take(). It returns the topmost record of an RDD. Here we use first() to find the city in which the maximum number of matches was played, and we get Mumbai as the output.

val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
val states = CKfile.map(_.split(",")(2))
val Scount = states.map(Scount => (Scount,1))
Scount.reduceByKey((x,y)=> x+y).collect.foreach(println)
val statecountm = Scount.reduceByKey((x,y)=> x+y).map(tup => (tup._2,tup._1)).sortByKey(false)
statecountm.first()

 

First Action Output-RDD using Spark

 

To make learning RDD using Spark even more interesting, I have come up with a fun use case.

RDD using Spark: Pokemon Use Case

Pokemon-RDD using Spark

  • Firstly, let us download a Pokemon.csv file and load it into the spark-shell as we did with the Matches.csv file.
val PokemonDataRDD1 = sc.textFile("/user/edureka_566977/PokemonFile/PokemonData.csv")
PokemonDataRDD1.collect().foreach(println)

Loading CSV-RDD using Spark

Pokemon come in a large variety. Let us find a few of them.

  • Removing the header from the Pokemon.csv file

We do not need the header row (the schema) of the Pokemon.csv file for our analysis. Hence, we remove it.

val Head = PokemonDataRDD1.first()
val NoHeader = PokemonDataRDD1.filter(line => !line.equals(Head))
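Another common way to drop a header, shown here only as an alternative sketch, is to skip the first line of the first partition with mapPartitionsWithIndex. It avoids the extra first() job and does not compare every line against the header. The sketch assumes a local Spark session and uses made-up rows whose column layout is hypothetical. It also relies on the header being the first line of partition 0, which holds for a file read from its beginning.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("header-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical file contents: a header line followed by two records.
val raw = sc.parallelize(Seq(
  "id,name,type",
  "1,pokemon_a,Grass",
  "2,pokemon_b,Fire"
), 2)

// Drop the first element of partition 0 only; other partitions pass through unchanged.
val withoutHeader = raw.mapPartitionsWithIndex { (index, iter) =>
  if (index == 0) iter.drop(1) else iter
}

withoutHeader.collect().foreach(println)
```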

  • Finding the number of partitions our pokemon.csv is distributed into.
println("No.ofpartitions="+NoHeader.partitions.size)

partition of pokemon.csv-RDD using Spark

  • Water Pokemon

Finding the Water pokemon

val WaterRDD = PokemonDataRDD1.filter(line => line.contains("Water"))
WaterRDD.collect().foreach(println)

Water Pokemon-RDD using Spark

  • Fire Pokemon

Finding the Fire pokemon

val FireRDD = PokemonDataRDD1.filter(line => line.contains("Fire"))
FireRDD.collect().foreach(println)

Fire Pokemon-RDD using Spark

     

  • We can also find the population of each type of pokemon using the count function
WaterRDD.count()
FireRDD.count()
    

    Population count-RDD using Spark

      • Since I like the game of defensive strategy, let us find the maximum defence value among the pokemon.
      val defenceList = NoHeader.map{x => x.split(',')}.map{x => (x(6).toDouble)}
      println("Highest_Defence : "+defenceList.max())
      

      maximum Defence - RDD using Spark

      • We know the maximum defence strength value, but we don’t know which pokemon it belongs to. So, let us find that pokemon.
      val defWithPokemonName = NoHeader.map{x => x.split(',')}.map{x => (x(6).toDouble,x(1))}
      val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered(1)(Ordering[Double].reverse.on(_._1))
      MaxDefencePokemon.foreach(println)
      

      GroupBy to find max defence pokemon name-RDD usin Spark
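When only one top record is needed, the maximum can also be found with a single reduce that keeps the larger of two (defence, name) pairs, which avoids grouping all names by defence value first. This is a sketch under assumptions: a local Spark session and made-up values that are not taken from PokemonData.csv. Unlike the groupByKey version, it returns just one name if several pokemon share the maximum.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; inside spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("max-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical (defence, name) pairs.
val defWithName = sc.parallelize(Seq(
  (49.0, "pokemon_a"),
  (230.0, "pokemon_b"),
  (43.0, "pokemon_c")
))

// Keep whichever pair has the higher defence value.
val strongest = defWithName.reduce((a, b) => if (a._1 >= b._1) a else b)

// The same idea with the comparison flipped gives the weakest.
val weakest = defWithName.reduce((a, b) => if (a._1 <= b._1) a else b)

println(s"strongest: $strongest, weakest: $weakest")
```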

      • Now let us find the lowest Defence values, sorted in ascending order
      val minDefencePokemon = defenceList.distinct.sortBy(x => x.toDouble,true,1)
      minDefencePokemon.take(5).foreach(println)
      

      Pokemon with least defence-RDD using Spark

       

      • Now let us see the pokemon with the least defence.
      val PokemonDataRDD2 = sc.textFile("/user/edureka_566977/PokemonFile/PokemonData.csv")
      val Head2 = PokemonDataRDD2.first()
      // filter against Head2, the header read from this RDD
      val NoHeader2 = PokemonDataRDD2.filter(line => !line.equals(Head2))
      val defWithPokemonName2 = NoHeader2.map{x => x.split(',')}.map{x => (x(6).toDouble,x(1))}
      val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered(1)(Ordering[Double].on(_._1))
      MinDefencePokemon2.foreach(println)
      

       

      least defensive strategy-RDD using Spark

So, with this, we come to the end of this RDD using Spark article. I hope it shed a little light on RDDs, their features and the various types of operations that can be performed on them.

This article is based on the Apache Spark and Scala Certification Training, which is designed to prepare you for the Cloudera Hadoop and Spark Developer Certification Exam (CCA175). You will get in-depth knowledge of Apache Spark and the Spark Ecosystem, which includes Spark RDD, Spark SQL, Spark MLlib and Spark Streaming. You will also get comprehensive knowledge of the Scala programming language, HDFS, Sqoop, Flume, Spark GraphX and messaging systems such as Kafka.

       
