SPARK-6

Transformations 


What does the reduce action do?

The reduce action reduces an RDD to a single value by recursively applying the supplied function to the elements of the RDD until only one value is left. The supplied function must be commutative and associative, so that the order in which arguments are combined does not change the result.

For example, applying a "sum" reduce function to an RDD containing 1, 2, 3, 4 repeatedly combines pairs of elements until the single value 10 remains.
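A minimal sketch of this in the Scala shell (assuming sc is the usual SparkContext):

val nums = sc.parallelize(List(1, 2, 3, 4))
// reduce() keeps combining pairs of elements with the given function until one value is left
nums.reduce((x, y) => x + y)

Output :
Int = 10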


Difference between map() and flatMap()?

  • The map() transformation takes in a function and applies it to each element in the RDD with the result of the function being the new value of each element in the resulting RDD.
  • Sometimes we want to produce multiple output elements for each input element. The operation to do this is called flatMap(). As with map(), the function we provide to flatMap() is called individually for each element in our input RDD. Instead of returning a single element, we return an iterator with our return values (see the short contrast below).

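A short contrast in the Scala shell (a sketch, assuming sc is available): map() returns exactly one output element per input element, while flatMap() can return zero or more and flattens the results.

val lines = sc.parallelize(List("hello world", "hi"))
val mapped = lines.map(line => line.split(" "))         // RDD[Array[String]]: one array per line
val flatMapped = lines.flatMap(line => line.split(" ")) // RDD[String]: all words flattened into one RDD
mapped.count()       // 2
flatMapped.count()   // 3
flatMapped.collect()

Output :
Array[String] = Array(hello, world, hi)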

Explain about the different types of transformations on DStreams?

  • Stateless Transformations - Processing of a batch does not depend on the output of previous batches. Examples: map(), reduceByKey(), filter().
  • Stateful Transformations - Processing of a batch depends on the intermediate results of previous batches. Examples: transformations that depend on sliding windows (see the sketch below).
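A hedged sketch of both kinds on a DStream, assuming a StreamingContext built from the shell's sc and a hypothetical socket source on localhost:9999 (the names lines, pairs, etc. are illustrative only):

import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)
val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))
// Stateless: each 10-second batch is counted independently
val perBatchCounts = pairs.reduceByKey(_ + _)
// Stateful: counts over a 60-second sliding window, sliding every 20 seconds
val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(20))
perBatchCounts.print()
windowedCounts.print()
ssc.start()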

What is a Transformation in Spark?

Spark provides two distinct kinds of operations on RDDs, called transformations and actions. Transformations follow lazy evaluation and the data is only materialized when an action is called. Each transformation creates/returns a new RDD. Examples of transformations: map, flatMap, groupByKey, reduceByKey, filter, cogroup, join, sortByKey, union, distinct and sample are common Spark transformations.

What is an Action in Spark?

Actions are RDD operations whose values are returned back to the Spark driver program, which kicks off a job to execute on a cluster. A transformation's output is the input of actions. reduce, collect, takeSample, take, first, saveAsTextFile, saveAsSequenceFile, countByKey and foreach are common actions in Apache Spark.

Explain the filter transformation.
filter() transformation in Apache Spark takes a function as input.
It returns an RDD that only has the elements that pass the condition given in the input function.
Example:

val rdd1 = sc.parallelize(List(10,20,40,60))
val rdd2 = rdd1.filter(x => x != 10)
println(rdd2.collect().mkString(","))
Output

20,40,60
It returns a new dataset formed by selecting those elements of the source on which the function returns true, i.e. only the elements that satisfy a predicate. The predicate is a function that accepts a parameter and returns a Boolean value, either true or false. It keeps only the elements that pass the condition and filters out those that don't, so the new RDD will be the set of elements for which the function returns true.

———————-
What is the command to start and stop Spark in the interactive shell?
Command to start the interactive shell in Scala:
First go to the Spark directory, then run bin/spark-shell:

hdadmin@ubuntu:~$ cd spark-1.6.1-bin-hadoop2.6/
hdadmin@ubuntu:~/spark-1.6.1-bin-hadoop2.6$ bin/spark-shell
——————————————————————————————————————————
Command to stop the interactive shell in Scala:
scala> Press (Ctrl+D)
One can see the following message:
scala> Stopping spark context.

Explain sortByKey() operation
> sortByKey() is a transformation.
> It returns an RDD sorted by Key.
> Sorting can be done in (1) Ascending OR (2) Descending OR (3) custom sorting
From :
http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#212_SortByKey
They will work with any key type K that has an implicit Ordering[K] in scope. Ordering objects already exist for all of the standard primitive types. Users can also define their own orderings for custom types, or to override the default ordering. The implicit ordering that is in the closest scope will be used.

When called on a dataset of (K, V) pairs where K is Ordered, it returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified by the ascending argument.

Example :

val rdd1 = sc.parallelize(Seq(("India",91),("USA",1),("Brazil",55),("Greece",30),("China",86),("Sweden",46),("Turkey",90),("Nepal",977)))
val rdd2 = rdd1.sortByKey()
rdd2.collect()

Output:
Array[(String, Int)] = Array((Brazil,55), (China,86), (Greece,30), (India,91), (Nepal,977), (Sweden,46), (Turkey,90), (USA,1))

val rdd1 = sc.parallelize(Seq(("India",91),("USA",1),("Brazil",55),("Greece",30),("China",86),("Sweden",46),("Turkey",90),("Nepal",977)))
val rdd2 = rdd1.sortByKey(false)
rdd2.collect()

Output:
Array[(String, Int)] = Array((USA,1), (Turkey,90), (Sweden,46), (Nepal,977), (India,91), (Greece,30), (China,86), (Brazil,55))

One more point about sortByKey(): its result is range-partitioned.

Using the Partitioner concept we can avoid shuffling data across the network. This matters when performing operations like sortByKey(), join(), cogroup(), etc. Different operations use different partitioners (hash-based, range-based, or a custom partitioner).
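To illustrate the custom-ordering point above, a small sketch (the assumption here is an implicit Ordering[String] defined in local scope, which overrides the default alphabetical order and sorts keys by length):

implicit val byLength: Ordering[String] = Ordering.by((s: String) => s.length)
val rdd = sc.parallelize(Seq(("Brazil", 55), ("USA", 1), ("India", 91)))
rdd.sortByKey().collect()

Output :
Array[(String, Int)] = Array((USA,1), (India,91), (Brazil,55))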
—————————-
distinct() transformation

If one wants only the unique elements of an RDD, one can use d1.distinct(), where d1 is an RDD.
Example

val d1 = sc.parallelize(List("c","c","p","m","t"))
val result = d1.distinct()
result.foreach{println}
Output:
p
t
m
c
distinct() is an expensive operation, as it requires shuffling all the data over the network to ensure that we receive only one copy of each element.
union() transformation

It is the simplest set operation.
rdd1.union(rdd2) outputs an RDD that contains the data from both sources.
If duplicates are present in the input RDDs, the output of the union() transformation will also contain duplicates, which can be fixed using distinct().
Example

val u1 = sc.parallelize(List("c","c","p","m","t"))
val u2 = sc.parallelize(List("c","m","k"))
val result = u1.union(u2)
result.foreach{println}
Output:
c
c
p
m
t
c
m
k

intersection() transformation

intersection(anotherrdd) returns the elements which are present in both RDDs.
intersection(anotherrdd) removes all duplicates, including duplicates within a single RDD.
Example

val is1 = sc.parallelize(List("c","c","p","m","t"))
val is2 = sc.parallelize(List("c","m","k"))
val result = is1.intersection(is2)
result.foreach{println}
Output :
m
c

subtract() transformation

subtract(anotherrdd)
It returns an RDD that has only the values present in the first RDD and not in the second RDD.
Example

val s1 = sc.parallelize(List("c","c","p","m","t"))
val s2 = sc.parallelize(List("c","m","k"))
val result = s1.subtract(s2)
result.foreach{println}
Output:
t
p

————-
Explain foreach() operation in apache spark
foreach() operation is an action.
> It does not return any value.
> It executes the input function on each element of an RDD.
It executes the function on each item in the RDD. It is useful for side effects such as writing to a database or publishing to web services. The function is executed for each data item purely for its side effect; its return value is discarded.

Example:

val mydata = Array(1,2,3,4,5,6,7,8,9,10)
val rdd1 = sc.parallelize(mydata)
rdd1.foreach{x=>println(x)}

OR

rdd1.foreach{println}

Output:
1
2
3
4
5
6
7
8
9
10

What is the difference between groupByKey vs reduceByKey in Spark?
Which of groupByKey and reduceByKey is transformation and which is action?
While processing RDD which is better groupByKey or reduceByKey?

Both groupByKey() and reduceByKey() are transformations (each returns a new RDD); neither is an action. reduceByKey() is generally preferred, because it combines values locally before the shuffle.

On applying groupByKey() on a dataset of (K, V) pairs, the data is shuffled according to the key K into another RDD. In this transformation, a lot of unnecessary data is transferred over the network.

Spark provides the facility to spill data to disk when more data is shuffled onto a single executor machine than can fit in memory.

Example:

val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
group.foreach(println)
On applying reduceByKey on a dataset of (K, V) pairs, before the data is shuffled, the pairs on the same machine with the same key are combined.

Example:

val words = Array("one","two","two","four","five","six","six","eight","nine","ten")
val data = spark.sparkContext.parallelize(words).map(w => (w,1)).reduceByKey(_+_)
data.foreach(println)
What is Map transformation operation in Apache Spark?
What is the need for the Map transformation?
What processing can be done in the Map in Spark explain with example
Map is a transformation applied to each element in an RDD, and it provides a new RDD as a result.
In the Map transformation, user-defined business logic is applied to every element in the RDD.
It is similar to flatMap, but unlike flatMap, which can produce 0, 1 or many outputs per input element, map produces exactly one output per input element.
A Map operation transforms an RDD of length N into another RDD of length N.

A——->a
B——->b
C——->c
Map Operation

Map transformation will not shuffle data from one partition to many. It will keep the operation narrow.
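For example (a minimal sketch in the Scala shell, assuming sc is available):

val data = sc.parallelize(List(1, 2, 3, 4, 5))
// the user-defined logic is applied to every element; exactly one output per input
val doubled = data.map(x => x * 2)
doubled.collect()

Output :
Array[Int] = Array(2, 4, 6, 8, 10)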

What is FlatMap transformation operation in Apache Spark?
What is the need for the FlatMap when we already have Map Operation?
What processing can be done in the FlatMap in Spark explain with example?

FlatMap is a transformation operation in Apache Spark that creates an RDD from an existing RDD. It takes one element from an RDD and can produce 0, 1 or many outputs based on business logic. It is similar to the Map operation, but Map produces one-to-one output. If we perform a Map operation on an RDD of length N, the output RDD will also be of length N, whereas for a FlatMap operation the output RDD can have a different length depending on the business logic.

Map Operation           FlatMap Operation
X ------> A             x ------> a
Y ------> B             y ------> b, c
Z ------> C             z ------> d, e, f

We can also say that flatMap transforms an RDD of length N into a collection of N collections, and then flattens them into a single RDD of results.

In the example below, the data1 RDD, which is the output of a Map operation, has the same number of elements as the data RDD, but the data2 RDD does not have the same number of elements; data2 is the flattened output.
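The example referred to above is not shown in the original; a minimal sketch along those lines (the names data, data1 and data2 follow the text):

val data = sc.parallelize(List("one two", "three", "four five six"))
val data1 = data.map(line => line.split(" "))      // same number of elements as data (3)
val data2 = data.flatMap(line => line.split(" "))  // flattened, different number of elements (6)
data1.count()
data2.count()

Output :
Long = 3
Long = 6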
—-
Explain fold() operation in Spark.
fold() is an action. It is a wide operation (i.e. it shuffles data across multiple partitions and outputs a single value).
It takes a function as input which has two parameters of the same type and outputs a single value of the input type.
It is similar to reduce, but has one more argument, a 'zero value' (an initial value), which is used in the initial call on each partition.

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”. The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.

zeroValue: The initial value for the accumulated result of each partition for the op operator, and also the initial value for the combine results from different partitions for the op operator – this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
Op: an operator used to both accumulate results within a partition and combine results from different partitions

Example :

val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
rdd1.fold(5)(_+_)

Output :
Int = 35

val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1.fold(5)(_+_)

Output :
Int = 25

val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
rdd1.fold(3)(_+_)

Output :
Int = 27

Note that the result of fold() depends on the number of partitions, because the zero value is applied once per partition and once more when the partition results are combined (e.g. with 3 partitions and zero value 5: 15 + 3*5 + 5 = 35).

Explain the API createOrReplaceTempView().

It is a basic Dataset function.
It is under org.apache.spark.sql.

def createOrReplaceTempView(viewName: String): Unit
Creates a temporary view using the given name.
The lifetime of this temporary view is tied to the SparkSession that was used to create the Dataset.
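A small sketch (assuming a SparkSession named spark and a hypothetical input path):

val df = spark.read.json("/path/to/people.json")   // hypothetical input file
df.createOrReplaceTempView("people")
val adults = spark.sql("SELECT name FROM people WHERE age > 18")
adults.show()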

Explain values() operation in Apache Spark.

values() is a transformation.
It returns an RDD of values only.
val rdd1 = sc.parallelize(Seq((2,4),(3,6),(4,8),(5,10),(6,12),(7,14),(8,16),(9,18),(10,20)))
val rdd2 = rdd1.values
rdd2.collect
Output:
Array[Int] = Array(4, 6, 8, 10, 12, 14, 16, 18, 20)

Example2 : Values are duplicate in data set

val rdd1 = sc.parallelize(Seq((2,4),(3,6),(4,8),(2,6),(4,12),(5,10),(5,40),(10,40)))
val rdd2 = rdd1.keys
rdd2.collect
val rdd3 = rdd1.values
rdd3.collect
Output:

Array[Int] = Array(2, 3, 4, 2, 4, 5, 5, 10)
Array[Int] = Array(4, 6, 8, 6, 12, 10, 40, 40)

Explain keys() operation in Apache spark.
keys() is a transformation.
It returns an RDD of keys.
val rdd1 = sc.parallelize(Seq((2,4),(3,6),(4,8),(5,10),(6,12),(7,14),(8,16),(9,18),(10,20)))
val rdd2 = rdd1.keys
rdd2.collect
Output:

Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10)
Example 2 : (Keys are repeated – duplicate keys are present in data set)

val rdd1 = sc.parallelize(Seq((2,4),(3,6),(4,8),(2,6),(4,12),(5,10),(5,40),(10,40)))
val rdd2 = rdd1.keys
rdd2.collect
Output:
Array[Int] = Array(2, 3, 4, 2, 4, 5, 5, 10)
Explain textFile Vs wholeTextFile in Spark
Both are methods of org.apache.spark.SparkContext.

textFile() :

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
For example, sc.textFile("/home/hdadmin/wc-data.txt") creates an RDD in which each individual line is an element.

wholeTextFiles() :

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI.
Rather than creating a basic RDD, wholeTextFiles() returns a pair RDD.
For example, if you have a few files in a directory, then using the wholeTextFiles() method
creates a pair RDD with the filename (with path) as the key
and the whole file content as the value (a String).
val myfilerdd = sc.wholeTextFiles("/home/hdadmin/MyFiles")
val keyrdd = myfilerdd.keys
keyrdd.collect
val filerdd = myfilerdd.values
filerdd.collect

Output :
Array[String] = Array(
file:/home/hdadmin/MyFiles/JavaSparkPi.java,
file:/home/hdadmin/MyFiles/sumnumber.txt,
file:/home/hdadmin/MyFiles/JavaHdfsLR.java,
file:/home/hdadmin/MyFiles/JavaPageRank.java,
file:/home/hdadmin/MyFiles/JavaLogQuery.java,
file:/home/hdadmin/MyFiles/wc-data.txt,
file:/home/hdadmin/MyFiles/nosum.txt)

Array[String] =
Array(“/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the “License”); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an “AS IS” BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and

—-
Explain cogroup() operation
It’s a transformation.
> It’s in package org.apache.spark.rdd.PairRDDFunctions

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

For each key k in this or other1 or other2 or other3, return a resulting RDD that contains a tuple with the list of values for that key in this, other1, other2 and other3.

Example:

val myrdd1 = sc.parallelize(List((1,"spark"),(2,"HDFS"),(3,"Hive"),(4,"Flink"),(6,"HBase")))
val myrdd2 = sc.parallelize(List((4,"RealTime"),(5,"Kafka"),(6,"NOSQL"),(1,"stream"),(1,"MLlib")))
val result = myrdd1.cogroup(myrdd2)
result.collect

Output :
Array[(Int, (Iterable[String], Iterable[String]))] =
Array((4,(CompactBuffer(Flink),CompactBuffer(RealTime))),
(1,(CompactBuffer(spark),CompactBuffer(stream, MLlib))),
(6,(CompactBuffer(HBase),CompactBuffer(NOSQL))),
(3,(CompactBuffer(Hive),CompactBuffer())),
(5,(CompactBuffer(),CompactBuffer(Kafka))),
(2,(CompactBuffer(HDFS),CompactBuffer())))

Explain pipe() operation in Apache Spark
It is a transformation.

def pipe(command: String): RDD[String]
Return an RDD created by piping elements to a forked external process.

In general, Spark programs are written in Scala, Java or Python. However, if that is not enough and one wants to pipe (inject) data through code written in another language such as R, Spark provides a general mechanism in the form of the pipe() method.
Spark provides the pipe() method on RDDs.
With Spark's pipe() method, one can write a transformation of an RDD that reads each element of the RDD from standard input as a String.
It can write the results as Strings to standard output.
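A minimal sketch (the assumption here is that the external command grep is available on every worker node):

val rdd = sc.parallelize(List("hello", "hi", "spark", "hadoop"))
// each element is written to the process's stdin; the process's stdout lines become the new RDD
val piped = rdd.pipe("grep -i h")
piped.collect()

Output :
Array[String] = Array(hello, hi, hadoop)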

Explain coalesce() operation.
It is a transformation.
> It is defined on org.apache.spark.rdd.RDD
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

Return a new RDD that is reduced into numPartitions partitions.

This results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead, each of the 100 new partitions will claim 10 of the current partitions.

However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

Note: With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large. Calling coalesce(1000, shuffle = true) will result in 1000 partitions with the data distributed using a hash partitioner.

From :
http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#214_Coalesce

It changes the number of partitions in which the data is stored. It combines the original partitions into a new, smaller number of partitions, so it reduces the number of partitions. It is an optimized version of repartition that moves data only if you are decreasing the number of RDD partitions. It helps run operations more efficiently after filtering a large dataset.

Example :

val myrdd1 = sc.parallelize(1 to 1000, 15)
myrdd1.partitions.length
val myrdd2 = myrdd1.coalesce(5,false)
myrdd2.partitions.length

Output :
Int = 15
Int = 5

Explain the repartition() operation
repartition() is a transformation.
> This function changes the number of partitions to the value given in the parameter numPartitions (numPartitions: Int).
> It is defined on org.apache.spark.rdd.RDD
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
Return a new RDD that has exactly numPartitions partitions.
Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.
If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.

From :
http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/
Repartition will reshuffle the data in your RDD to produce the final number of partitions you request. It may reduce or increase the number of partitions and shuffles data all over the network.

Example :

val rdd1 = sc.parallelize(1 to 100, 3)
rdd1.getNumPartitions
val rdd2 = rdd1.repartition(6)
rdd2.getNumPartitions
Output :
Int = 3
Int = 6

Explain fullOuterJoin() operation in Apache Spark.
It is a transformation.
> It’s in package org.apache.spark.rdd.PairRDDFunctions

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]

Perform a full outer join of this and other.
For each element (k, v) in this, the resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in other,
or the pair (k, (Some(v), None)) if no elements in other have key k.
Similarly, for each element (k, w) in other, the resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for v in this,
or the pair (k, (None, Some(w))) if no elements in this have key k.
Hash-partitions the resulting RDD using the existing partitioner/ parallelism level.

Example :

val frdd1 = sc.parallelize(Seq(("Spark",35),("Hive",23),("Spark",45),("HBase",89)))
val frdd2 = sc.parallelize(Seq(("Spark",74),("Flume",12),("Hive",14),("Kafka",25)))
val fullouterjoinrdd = frdd1.fullOuterJoin(frdd2)
fullouterjoinrdd.collect
Output :
Array[(String, (Option[Int], Option[Int]))] = Array((Spark,(Some(35),Some(74))), (Spark,(Some(45),Some(74))), (Kafka,(None,Some(25))), (Flume,(None,Some(12))), (Hive,(Some(23),Some(14))), (HBase,(Some(89),None)))

Explain leftOuterJoin() and rightOuterJoin() operation.
Both leftOuterJoin() and rightOuterJoin() are transformations.
> Both in package org.apache.spark.rdd.PairRDDFunctions

leftOuterJoin() :

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

Perform a left outer join of this and other. For each element (k, v) in this, the resulting RDD will either contain all pairs (k, (v, Some(w))) for w in other, or the pair (k, (v, None)) if no elements in other have key k. Hash-partitions the output using the existing partitioner/parallelism level.

leftOuterJoin() performs a join between two RDDs in which all keys from the first (left) RDD are retained.

Example :

val rdd1 = sc.parallelize(Seq(("m",55),("m",56),("e",57),("e",58),("s",59),("s",54)))
val rdd2 = sc.parallelize(Seq(("m",60),("m",65),("s",61),("s",62),("h",63),("h",64)))
val leftjoinrdd = rdd1.leftOuterJoin(rdd2)
leftjoinrdd.collect
Output :
Array[(String, (Int, Option[Int]))] = Array((s,(59,Some(61))), (s,(59,Some(62))), (s,(54,Some(61))), (s,(54,Some(62))), (e,(57,None)), (e,(58,None)), (m,(55,Some(60))), (m,(55,Some(65))), (m,(56,Some(60))), (m,(56,Some(65))))

rightOuterJoin():
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

Perform a right outer join of this and other. For each element (k, w) in other, the resulting RDD will either contain all pairs (k, (Some(v), w)) for v in this, or the pair (k, (None, w)) if no elements in this have key k. Hash-partitions the resulting RDD using the existing partitioner/parallelism level.

It performs a join between two RDDs in which all keys from the other (right) RDD are retained.

Example:

val rdd1 = sc.parallelize(Seq(("m",55),("m",56),("e",57),("e",58),("s",59),("s",54)))
val rdd2 = sc.parallelize(Seq(("m",60),("m",65),("s",61),("s",62),("h",63),("h",64)))
val rightjoinrdd = rdd1.rightOuterJoin(rdd2)
rightjoinrdd.collect

Output :
Array[(String, (Option[Int], Int))] = Array((s,(Some(59),61)), (s,(Some(59),62)), (s,(Some(54),61)), (s,(Some(54),62)), (h,(None,63)), (h,(None,64)), (m,(Some(55),60)), (m,(Some(55),65)), (m,(Some(56),60)), (m,(Some(56),65)))

Explain Spark join() operation
join() is a transformation.
> It's in package org.apache.spark.rdd.PairRDDFunctions

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

Return an RDD containing all pairs of elements with matching keys in this and other.
Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in this and (k, v2) is in other. Performs a hash join across the cluster.

From :
http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#213_Join

It is joining two datasets. When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

Example1:

val rdd1 = sc.parallelize(Seq(("m",55),("m",56),("e",57),("e",58),("s",59),("s",54)))
val rdd2 = sc.parallelize(Seq(("m",60),("m",65),("s",61),("s",62),("h",63),("h",64)))
val joinrdd = rdd1.join(rdd2)
joinrdd.collect
Output:

Array[(String, (Int, Int))] = Array((m,(55,60)), (m,(55,65)), (m,(56,60)), (m,(56,65)), (s,(59,61)), (s,(59,62)), (s,(54,61)), (s,(54,62)))

Example2:

val myrdd1 = sc.parallelize(Seq((1,2),(3,4),(3,6)))
val myrdd2 = sc.parallelize(Seq((3,9)))
val myjoinedrdd = myrdd1.join(myrdd2)
myjoinedrdd.collect
Output:
Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))

Explain the top() and takeOrdered() operation.
Both top() and takeOrdered() are actions.
Both return the n elements of the RDD based on the default ordering, or based on a custom ordering provided by the user.
def top(num: Int)(implicit ord: Ordering[T]): Array[T]

Returns the top k (largest) elements from this RDD as defined by the specified implicit Ordering[T] and maintains the ordering. This does the opposite of takeOrdered.

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

Returns the first k (smallest) elements from this RDD as defined by the specified implicit Ordering[T] and maintains the ordering. This does the opposite of top.

Example :

val myrdd1 = sc.parallelize(List(5,7,9,13,51,89))
myrdd1.top(3)
myrdd1.takeOrdered(3)
myrdd1.top(3)
Output :

Array[Int] = Array(89, 51, 13)
Array[Int] = Array(5, 7, 9)
Array[Int] = Array(89, 51, 13)

Explain first() operation
It is an action.
> It returns the first element of the RDD.

Example :

val rdd1 = sc.textFile("/home/hdadmin/wc-data.txt")
rdd1.count
rdd1.first
Output :
Long: 20
String : DataFlair is the leading technology training provider
Explain sum(), max(), min() operation in Apache Spark
sum() :

> It adds up the values in an RDD.
> It is in the package org.apache.spark.rdd.DoubleRDDFunctions.
> Its return type is Double.

Example:

val rdd1 = sc.parallelize(1 to 20)
rdd1.sum

Output:
Double = 210.0
max() :

> It returns the maximum value from the RDD elements, as defined by the implicit ordering (element order).
> It is in the package org.apache.spark.rdd.RDD.

Example:

val rdd1 = sc.parallelize(List(1,5,9,0,23,56,99,87))
rdd1.max

Output:
Int = 99

min() :

> It returns the minimum value from the RDD elements, as defined by the implicit ordering (element order).
> It is in the package org.apache.spark.rdd.RDD.

Example:

val rdd1 = sc.parallelize(List(1,5,9,0,23,56,99,87))
rdd1.min

Output:
Int = 0

Explain countByValue() operation in Apache Spark RDD.
It is an action.
It returns the count of each unique value in the RDD as a local Map (returned to the driver program) of (value, count) pairs.
Care must be taken when using this API, since it returns the result to the driver program, so it is suitable only for small data sets.
Example:

val rdd1 = sc.parallelize(Seq(("HR",5),("RD",4),("ADMIN",5),("SALES",4),("SER",6),("MAN",8)))
rdd1.countByValue
Output:
scala.collection.Map[(String, Int),Long] = Map((HR,5) -> 1, (RD,4) -> 1, (SALES,4) -> 1, (ADMIN,5) -> 1, (MAN,8) -> 1, (SER,6) -> 1)

val rdd2 = sc.parallelize{Seq(10,4,3,3)}
rdd2.countByValue
Output:
scala.collection.Map[Int,Long] = Map(4 -> 1, 3 -> 2, 10 -> 1)
Explain the lookup() operation in Spark
It is an action
> It returns the list of values in the RDD for the given key.
val rdd1 = sc.parallelize(Seq(("Spark",78),("Hive",95),("spark",15),("HBase",25),("spark",39),("BigData",78),("spark",49)))
rdd1.lookup("spark")
rdd1.lookup("Hive")
rdd1.lookup("BigData")
Output:
Seq[Int] = WrappedArray(15, 39, 49)
Seq[Int] = WrappedArray(95)
Seq[Int] = WrappedArray(78)
Explain Spark countByKey() operation
It is an action operation
> Returns (key, noofkeycount) pairs.

From :
http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#38_CountByKey

It counts the number of elements for each distinct key in an RDD of (key, value) tuples and returns the result to the driver as a Map of (key, count) pairs.

val rdd1 = sc.parallelize(Seq(("Spark",78),("Hive",95),("spark",15),("HBase",25),("spark",39),("BigData",78),("spark",49)))
rdd1.countByKey

Output:
scala.collection.Map[String,Long] = Map(Hive -> 1, BigData -> 1, HBase -> 1, spark -> 3, Spark -> 1)

Explain Spark saveAsTextFile() operation
It writes the content of an RDD to a text file, i.e. it saves the RDD as a set of text files in the given directory path, using the string representation of each element.
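For example (the output path below is illustrative only):

val rdd = sc.parallelize(List(1, 2, 3, 4))
rdd.saveAsTextFile("/home/hdadmin/output-dir")   // writes one part-* file per partition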
Explain reduceByKey() Spark operation
reduceByKey() is a transformation which operates on a pair RDD (an RDD of key/value pairs).
> A pair RDD contains tuples, hence we need to pass a function that operates on the tuple values rather than on individual elements.
> It merges the values with the same key using an associative reduce function.
> It is a wide operation, because data shuffling may happen across multiple partitions.
> It merges data locally before sending data across partitions, to optimize data shuffling.
> It takes as input a function which has two parameters of the same type (values associated with the same key) and outputs a single value of the input type.
> It has three overloaded forms:

reduceByKey(function)
reduceByKey(function, numberOfPartitions)
reduceByKey(partitioner, function)

From :
http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#210_ReduceByKey
It uses an associative reduce function to merge the values of each key. It can be used only with RDDs of key-value pairs. It is a wide operation which shuffles data from multiple partitions and creates another RDD. It merges data locally using the associative function, for optimized data shuffling. The result of the combination (e.g. a sum) is of the same type as the values, and the operation used when combining results from different partitions is the same as the operation used when combining values inside a partition.

Example :

val rdd1 = sc.parallelize(Seq((5,10),(5,15),(4,8),(4,12),(5,20),(10,50)))
val rdd2 = rdd1.reduceByKey((x,y) => x+y)

OR

val rdd2 = rdd1.reduceByKey(_ + _)

rdd2.collect()
Output:
Array[(Int, Int)] = Array((4,20),(10,50),(5,45))
Explain the operation reduce() in Spark
reduce() is an action. It is a wide operation (i.e. it shuffles data across multiple partitions and outputs a single value).
> It takes as input a function which has two parameters of the same type and outputs a single value of the input type,
> i.e. it combines the elements of the RDD together.

Example 1 :
val rdd1 = sc.parallelize(1 to 100)
val rdd2 = rdd1.reduce((x,y) => x+y)

OR

val rdd2 = rdd1.reduce(_ + _)
Output :
rdd2: Int = 5050

Example 2:
val rdd1 = sc.parallelize(1 to 5)
val rdd2 = rdd1.reduce(_*_)
Output :
rdd2: Int = 120
Explain the collect() action in Spark RDD
collect() returns all the elements from an RDD.
> Since collect() returns all the elements from an RDD, care must be taken when using it: all of your data must fit on a single machine.
> Hence it is recommended to use collect() for development purposes or in unit testing.

Ex.
val rdd1 = sc.parallelize(List(10,20,30,40))
rdd1.collect()
Output :
Array[Int] = Array(10,20,30,40)
Explain Spark map() transformation

> map() transformation takes a function as input and apply that function to each element in the RDD.
> Output of the function will be a new element (value) for each input element.
Ex.
val rdd1 = sc.parallelize(List(10,20,30,40))
val rdd2 = rdd1.map(x=>x*x)
println(rdd2.collect().mkString(","))
> map()'s return type need not be the same as its input type,
> i.e. the input type may be String but the output type may be Int.

Ex.
val m1 = sc.parallelize(List(1,2,3,4))
val m2 = m1.map(z => z*0.5)
println(m2.collect().mkString(","))

Output :
0.5,1.0,1.5,2.0
Explain the flatMap() transformation in Apache Spark
When one wants to produce multiple output elements (values) for each input element, flatMap() is used.
As with map(), flatMap() also takes a function as input.
The output of the function is a list of elements through which we can iterate (i.e. the function can return 0 or more elements for each input element).
A simple use of flatMap() is splitting up an input line (string) into words.
Example

val fm1 = sc.parallelize(List("Good Morning", "Data Flair", "Spark Batch"))
val fm2 = fm1.flatMap(y => y.split(" "))
fm2.foreach{println}
Output is as follows:

Good
Morning
Data
Flair
Spark
Batch
———————–

Actions

 


 Explain about transformations and actions in the context of RDDs.

  • Transformations are functions executed on demand, to produce a new RDD.
    • All transformations are followed by actions.
    • Map, flatMap, groupByKey, reduceByKey, filter, co-group, join, sortByKey, Union, distinct, sample,intersection are common spark transformations.
    • map() – applies the function passed to it on each element of RDD resulting in a new RDD.
    • filter() – creates a new RDD by picking the elements from the current RDD which pass the function argument.
  • Actions are the results of RDD computations or transformations.
    • After an action is performed, the data from RDD moves back to the local machine.
    • Some examples of actions include reduce, collect, first, and take.
  • Transformations follow lazy evaluation and temporarily hold the data until an action is called.
  • Each transformation generates/returns a new RDD.
  • reduce() is an action that applies the function passed to it repeatedly until only one value is left.
  • take(n) is an action that returns the first n values from the RDD to the local node.

What is Action in Spark?

  • Actions are RDD operations whose values are returned back to the Spark driver program, which kicks off a job to execute on a cluster.
  • A transformation's output is the input of actions.
  • reduce, collect, takeSample, take, first, saveAsTextfile, saveAsSequenceFile, countByKey, foreach are common actions in Apache Spark.
  • An action brings the data from the RDD back to the local machine.
  • Execution of an action triggers all the previously created transformations. Examples of actions are:
  • reduce() – applies the function passed to it repeatedly until only one value is left. The function should take two arguments and return one value.
  • take(n) – returns the first n values from the RDD back to the local node.


In a given spark program, how will you identify whether a given operation is Transformation or Action ?

One can identify the operation based on the return type –

  • The operation is an action if the return type is something other than an RDD.
  • The operation is a transformation if the return type is an RDD (see the short example after this list).
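A quick check in the Scala shell:

val rdd = sc.parallelize(List(1, 2, 3))
val squared = rdd.map(x => x * x)   // returns an RDD, so map() is a transformation
val total = squared.count()         // returns a Long, so count() is an action

Output :
total: Long = 3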

What is the role of coalesce() and repartition()?

  • Both coalesce and repartition are used to change the number of partitions in an RDD, but coalesce avoids a full shuffle.
  • If you go from 1000 partitions to 100 partitions, there will not be a shuffle; instead, each of the 100 new partitions will claim 10 of the current partitions, and this does not require a shuffle.
  • Repartition performs a coalesce with a shuffle. Repartition results in the specified number of partitions with the data distributed using a hash partitioner.

What is the role of cache() and persist()?

  • Whenever you want to keep an RDD in memory, because the RDD will be used multiple times or because it was produced after a lot of complex processing, you can take advantage of cache() or persist().
  • You can mark an RDD to be persisted by calling persist() or cache() on it.
  • The first time it is computed in an action, it will be kept in memory on the nodes.
  • When you call persist(), you can specify whether you want to store the RDD on disk, in memory, or both, and if it is in memory, whether it should be stored in serialized or deserialized form.
  • cache() is just like persist(), but with the storage level fixed to memory only.

What is the difference between persist() and cache()?

  • persist() allows the user to specify the storage level, while
  • cache() uses the default storage level (memory only). A minimal sketch of both follows.
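A minimal sketch of cache() versus persist() with an explicit storage level (the input path is reused from the earlier textFile example and is illustrative only):

import org.apache.spark.storage.StorageLevel
val rdd = sc.textFile("/home/hdadmin/wc-data.txt")
val words = rdd.flatMap(_.split(" "))
words.cache()                                    // same as persist(StorageLevel.MEMORY_ONLY)
// or choose the storage level explicitly instead of cache():
// words.persist(StorageLevel.MEMORY_AND_DISK_SER)
words.count()   // first action computes and materializes the RDD
words.count()   // reuses the persisted data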

What is reduce() action?

  • It takes a function that operates on two elements of the type in your RDD and returns a new element of the same type.
  • A simple example of such a function is +, which we can use to sum our RDD. With reduce(), we can easily sum the elements of our RDD, count the number of elements, and perform other types of aggregations.