RDD
. What is RDD?
- RDDs (Resilient Distributed Datasets) are the basic abstraction in Apache Spark that represent the data coming into the system in object format.
- RDDs are used for in-memory computations on large clusters, in a fault tolerant manner.
- RDDs are read-only, partitioned collections of records,
- that are –
- Immutable – RDDs cannot be altered.
- Resilient – If a node holding a partition fails, another node takes over its data.
- If you have a large amount of data that is not necessarily stored in a single system, the data can be distributed across all the nodes, and one subset of the data, called a partition, is processed by a particular task.
- RDDs are very close to input splits in MapReduce.
- RDD is a collection of partitioned data that satisfies these properties.
- Immutable,
- distributed,
- lazily evaluated,
- cacheable
It is a representation of data located on a network which is:
- Immutable – You can operate on the RDD to produce another RDD, but you can't alter it.
- Partitioned / Parallel – The data in an RDD is operated on in parallel. Any operation on an RDD is done using multiple nodes.
- Resilient – If one of the nodes hosting a partition fails, another node takes over its data.
- RDD provides two kinds of operations:
- Transformations and Actions.
- RDD provides the abstraction for distributed computing across nodes in Spark Cluster.
- You can always think of an RDD as a big array which, under the hood, is spread over many computers that are completely abstracted away.
- An RDD is made up of many partitions, each partition possibly on a different computer.
- RDD can hold data of any type from any supported programming language such as Python, Java, or Scala.
- An RDD whose elements are (key, value) tuples is called a Pair RDD.
- Pair RDDs provide extra functionality such as “group by” and joins.
- RDD is generally lazily computed i.e. it is not computed unless an action on it is called.
- RDD is either prepared out of another RDD or it is loaded from a data source.
- In case it is loaded from a data source, it has a binding between the actual data storage and the partitions.
- So, RDD is essentially a pointer to actual data, not data unless it is cached.
- If a machine that holds a partition of RDD dies, the same partition is regenerated using the lineage graph of RDD.
- If there is a certain RDD that you require very frequently, you can cache it so that it is readily available instead of being recomputed every time.
- Please note that the cached RDD will be available only during the lifetime of the application.
- If it is costly to recreate the RDD every time, you may want to persist it to disk.
- RDD can be stored at various data storage (such as HDFS, database etc.) in many formats.
There are primarily two types of RDD:
- Parallelized Collections: created by parallelizing an existing collection in the driver program so that its elements are processed in parallel.
- Hadoop datasets: created from files in HDFS or another storage system, performing a function on each file record.
Generally, RDDs support two types of operations – actions and transformations.
- There are two ways to create an RDD: one, by loading data from a source;
- two, by operating on an existing RDD (a transformation).
- An action causes the computation on an RDD to yield the result.
- (Diagram: relationship between RDD, transformations, actions and value/result.)
- While loading data from a source – When an RDD is prepared by loading data from some source (HDFS, Cassandra, in-memory), the machines which are nearer to the data are assigned for the creation of partitions. These partitions would hold mappings or pointers to the actual data. When we are loading data from memory (for example, by using parallelize), the partitions would hold the actual data instead of pointers or mappings.
- By converting an in-memory array of objects – An in-memory object can be converted to an RDD using parallelize.
- By operating on existing RDD –
- An RDD is immutable.
- We can’t change an existing RDD.
- We can only form a new RDD based on the previous RDD by operating on it.
- When operating on existing RDD, a new RDD is formed. These operations are also called transformations.
- The operation could also result in shuffling – moving data across the nodes.
- Some operations that do not cause shuffling: map, flatMap and filter.
- Examples of the operations that could result in shuffling are groupByKey, repartition, sortByKey, aggregateByKey, reduceByKey, distinct.
- Spark maintains the relationships between RDDs in the form of a DAG (Directed Acyclic Graph).
- When an action such as reduce() or saveAsTextFile() is called, the whole graph is evaluated and the result is returned to the driver or saved to a location such as HDFS, as in the sketch below.
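A minimal PySpark sketch of this flow, assuming sc is an available SparkContext (as in the shell examples later in this section): the map and filter calls only record transformations, and nothing is computed until the reduce action is called.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
nums = sc.parallelize([1, 2, 3, 4, 5])        # RDD from an in-memory collection
squares = nums.map(lambda x: x * x)           # transformation: nothing computed yet
evens = squares.filter(lambda x: x % 2 == 0)  # transformation: still lazy
total = evens.reduce(lambda a, b: a + b)      # action: the DAG is evaluated here
print(total)                                  # 4 + 16 = 20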
As the major logical data unit in Apache Spark, an RDD is a distributed collection of data. It is a read-only data structure: you cannot change the original RDD, but it can always be transformed into a different RDD with the changes applied. The two operations which are supported by RDD are –
- Transformation – It creates a new RDD from the former one. They are executed only on demand.
- Actions – The final outcomes of the RDD computations are returned by actions.
Why is RDD immutable?
Following are the reasons:
– Immutable data is always safe to share across multiple processes as well as multiple threads.
– Since an RDD is immutable, we can recreate the RDD at any time (from the lineage graph).
– If the computation is time-consuming, we can cache the RDD, which results in a performance improvement.
The two most basic types of transformations are map() and filter().
The resultant RDD is always different from its parent RDD. It can be smaller (e.g. filter, count, distinct, sample), bigger (e.g. flatMap(), union(), cartesian()) or the same size (e.g. map).
Now, let’s focus on the question: there are fundamentally two types of transformations:
1. Narrow transformations –
In a narrow transformation, all the elements required to compute the records in a single partition reside in a single partition of the parent RDD. To calculate the result, only a limited subset of partitions is used. Such transformations are the result of map() and filter().
2. Wide transformations –
In a wide transformation, the elements required to compute the records in a single partition may live in many partitions of the parent RDD. The required data may reside in many different partitions of the parent RDD. Such transformations are the result of groupByKey() and reduceByKey(); see the sketch below.
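A short PySpark sketch contrasting the two kinds, assuming sc is an available SparkContext: map and filter are narrow (each output partition depends on a single parent partition), while reduceByKey is wide and triggers a shuffle across partitions.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
words = sc.parallelize(["spark", "rdd", "spark", "scala", "rdd", "spark"], 3)
# Narrow transformations: computed within each partition independently
pairs = words.map(lambda w: (w, 1))
long_pairs = pairs.filter(lambda kv: len(kv[0]) > 3)
# Wide transformation: values for the same key may live in different partitions,
# so the data is shuffled before the per-key counts can be combined
counts = long_pairs.reduceByKey(lambda a, b: a + b)
print(counts.collect())   # e.g. [('spark', 3), ('scala', 1)]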
Explain the RDD properties.
RDD (Resilient Distributed Dataset) is a basic abstraction in Apache Spark.
RDD is an immutable, partitioned collection of elements on the cluster which can be operated in parallel.
Each RDD is characterized by five main properties:
Below properties are used for lineage:
1. List or set of partitions
2. List of dependencies on other (parent) RDD
3. A function to compute each partition
Below properties are used for optimization during execution.
4. Optional preferred location [i.e. block location of an HDFS file] [it’s about data locality]
5. Optional partitioned info [i.e. Hash-Partition for key/value pairs –> determines how the data travels when shuffled]
Examples :
#HadoopRDD :
HadoopRDD provides core functionality for reading data stored in Hadoop (HDFS, HBase, Amazon S3, etc.) using the older MapReduce API (org.apache.hadoop.mapred)
Properties of HadoopRDD :
1. List or Set of partitions: One per HDFS block
2. List of dependencies on parent RDD: None
3. A function to compute each partition: read respective HDFS block
4. Optional Preferred location: HDFS block location
5. Optional partitioned info: None
#FilteredRDD :
Properties of FilteredRDD:
1. List or Set of partitions: No. of partitions same as parent RDD
2. List of dependencies on parent RDD: ‘one-to-one’ as parent (same as parent)
3. A function to compute each partition: compute parent and then filter it
4. Optional Preferred location: None (Ask Parent)
5. Optional partitioned info: None
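The partition-related properties above can be inspected from PySpark; a small sketch, assuming sc is an available SparkContext, showing that a filtered RDD keeps the same number of partitions as its parent:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
parent = sc.parallelize(range(10), 4)          # parent RDD with 4 partitions
filtered = parent.filter(lambda x: x % 2 == 0) # FilteredRDD-style child
print(parent.getNumPartitions())               # 4
print(filtered.getNumPartitions())             # 4 -- same as the parent
print(filtered.glom().collect())               # contents of each partition, e.g. [[0], [2, 4], [6], [8]]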
On the basis of several features, the differences between RDD and DSM (Distributed Shared Memory) are:
i. Read
RDD – The read operation in RDD is either coarse-grained or fine-grained. Coarse-grained meaning we can transform the whole dataset but not an individual element on the dataset. While fine-grained means we can transform individual element on the dataset.
DSM – The read operation in Distributed shared memory is fine-grained.
ii. Write
RDD – The write operation in RDD is coarse-grained.
DSM – The write operation is fine-grained in distributed shared memory systems.
iii. Consistency
RDD – The consistency of RDD is trivial, meaning it is immutable in nature. We cannot alter the content of an RDD, i.e., the content of an RDD is permanent. Hence, the level of consistency is very high.
DSM – The system guarantees that if the programmer follows the rules, the memory will be consistent. Also, the results of memory operations will be predictable.
iv. Fault-Recovery Mechanism
RDD – By using the lineage graph, the lost data can be easily recovered in Spark RDD at any moment. For each transformation, a new RDD is formed. As RDDs are immutable in nature, it is easy to recover.
DSM – Fault tolerance is achieved by a checkpointing technique which allows applications to roll back to a recent checkpoint rather than restarting.
v. Straggler Mitigation
Stragglers, in general, are those that take more time to complete than their peers. This could happen due to many reasons such as load imbalance, I/O blocks, garbage collections, etc.
An issue with stragglers is that when the parallel computation is followed by synchronization, such as reductions, all the parallel tasks are forced to wait for the slow ones.
RDD – It is possible to mitigate stragglers by using backup tasks in RDDs.
DSM – Achieving straggler mitigation is quite difficult.
vi. Behavior if not enough RAM
RDD – If there is not enough space to store the RDD in RAM, the RDD partitions are shifted to disk.
DSM – If the RAM runs out of storage, the performance decreases in this type of system.
. How to create an RDD?
You can create an RDD from an in-memory data or from a data source such as HDFS.
You can load the data from memory using the parallelize method of SparkContext in the following manner, in Python:
myrdd = sc.parallelize([1, 2, 3, 4, 5])
Here myrdd is the variable that represents an RDD created out of an in-memory object. "sc" is the SparkContext, which is readily available if you are running in interactive mode using PySpark. Otherwise, you will have to import the SparkContext and initialize it.
And to create RDD from a file in HDFS, use the following:
linesrdd = sc.textFile("/data/file_hdfs.txt")
This would create linesrdd by loading a file from HDFS. Please note that this relative path will resolve to HDFS only if Spark is configured with HDFS as its default file system (for example, when running on top of YARN). In case you want to load the data from an external HDFS cluster, you might have to specify the protocol and name node:
linesrdd = sc.textFile("hdfs://namenode_host/data/file_hdfs.txt")
In a similar fashion, you can load data from third-party systems.
Spark provides two methods to create RDD:
- By parallelizing a collection in your Driver program. This makes use of SparkContext's parallelize method:
val IntellipaatData = Array(2,4,6,8,10)
val distIntellipaatData = sc.parallelize(IntellipaatData)
- By loading an external dataset from external storage like HDFS, shared file system.
RDDs are of two types:
- Hadoop Datasets – Perform functions on each file record in HDFS (Hadoop Distributed File System) or other types of storage systems
- Parallelized Collections – Existing collections from the driver program, distributed so that their elements run in parallel with one another
There are two ways of creating an RDD in Apache Spark:
- By parallelizing a collection in the Driver program. It makes use of SparkContext's parallelize() method. For instance:
val DataArray = Array(22,24,46,81,101)
val DataRDD = sc.parallelize(DataArray)
- By means of loading an external dataset from some external storage, including HBase, HDFS, and shared file system
. What do you understand by Pair RDD?
- Special operations can be performed on RDDs in Spark using key/value pairs, and such RDDs are referred to as Pair RDDs.
- Pair RDDs allow users to access each key in parallel.
- They have a reduceByKey() method that collects data based on each key and a join() method that combines different RDDs together, based on the elements having the same key.
- Spark provides special operations on RDDs containing key/value pairs.
- These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel.
- For example, pair RDDs have a reduceByKey() method that can aggregate data separately for each key, and a join() method that can merge two RDDs together by grouping elements with the same key.
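A small PySpark sketch of both operations, assuming sc is an available SparkContext; the sales/prices data is made up purely for illustration.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Pair RDDs: each element is a (key, value) tuple
sales = sc.parallelize([("apples", 3), ("oranges", 2), ("apples", 5)])
prices = sc.parallelize([("apples", 1.50), ("oranges", 0.80)])
# reduceByKey aggregates the values separately for each key
totals = sales.reduceByKey(lambda a, b: a + b)
print(totals.collect())   # e.g. [('apples', 8), ('oranges', 2)]
# join merges two pair RDDs by grouping elements with the same key
joined = totals.join(prices)
print(joined.collect())   # e.g. [('apples', (8, 1.5)), ('oranges', (2, 0.8))]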
. What do you know about RDD Persistence?
- Persistence means caching.
- When an RDD is persisted, each node in the cluster stores the partitions (of the RDD) in memory (RAM).
- When there are multiple transformations or actions on an RDD, persistence cuts down the latency by avoiding the repeated cost of loading the data from file storage into memory.
. What do you mean by the default storage level: MEMORY_ONLY?
- The default storage level, MEMORY_ONLY, means the RDD is stored as deserialized Java objects in the JVM.
- If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed.
. How does an RDD persist data?
What are the various levels of persistence in Apache Spark?
There are two methods to persist the data:
persist(), which lets you choose the storage level (memory, disk, or both), and
cache(), which keeps the data in memory using the default storage level.
Different storage level options are available, such as
- MEMORY_ONLY,
- MEMORY_AND_DISK,
- DISK_ONLY and many more.
persist() accepts different storage-level options depending on the task, while cache() always uses the default.
Apache Spark automatically persists the intermediary data from various shuffle operations; however, it is often suggested that users call the persist() method on the RDD in case they plan to reuse it. Spark has various persistence levels to store the RDDs on disk or in memory, or as a combination of both, with different replication levels.
The various storage/persistence levels in Spark are –
- MEMORY_ONLY
- MEMORY_ONLY_SER
- MEMORY_AND_DISK
- MEMORY_AND_DISK_SER, DISK_ONLY
- OFF_HEAP
Although the intermediary data from different shuffle operations automatically persists in Spark, it is recommended to use the persist() method on the RDD if the data is to be reused.
Apache Spark features several persistence levels for storing the RDDs on disk, memory, or a combination of the two with distinct replication levels. These various persistence levels are:
- DISK_ONLY – Stores the RDD partitions only on disk.
- MEMORY_AND_DISK – Stores the RDD as deserialized Java objects in the JVM. In case the RDD isn’t able to fit in memory, the remaining partitions are stored on disk and read from there each time they are required.
- MEMORY_ONLY_SER – Stores the RDD as serialized Java objects, with one byte array per partition.
- MEMORY_AND_DISK_SER – Identical to MEMORY_ONLY_SER except that partitions which do not fit in memory are stored on disk instead of being recomputed on the fly when required.
- MEMORY_ONLY – The default level; it stores the RDD as deserialized Java objects in the JVM. In case the RDD isn’t able to fit in the available memory, some partitions won’t be cached and will be recomputed on the fly every time they are required.
- OFF_HEAP – Works like MEMORY_ONLY_SER but stores the data in off-heap memory.
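A brief PySpark sketch of choosing a storage level, assuming sc is an available SparkContext; the level names above map to pyspark.StorageLevel constants.
from pyspark import SparkContext, StorageLevel
sc = SparkContext.getOrCreate()
lines = sc.parallelize(["a", "b", "c"] * 1000)
lines.persist(StorageLevel.MEMORY_AND_DISK)  # spill to disk if it does not fit in memory
lines.count()                                # first action materializes and persists the RDD
lines.count()                                # served from the persisted copy
lines.unpersist()                            # release the storage when no longer needed
lines.cache()                                # cache() is shorthand for persist() with MEMORY_ONLY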
. What are the memory tuning parameters, and how do you achieve parallelism in Spark?
.What is the difference between cache() and persist() for an RDD?
- cache() uses the default storage level, i.e., MEMORY_ONLY.
- persist() can be provided with any of the possible storage levels.
. What is the role of cache() and persist()?
- Whenever you want to keep an RDD in memory because it will be used multiple times, or because it was created after a lot of complex processing, you can take advantage of cache() or persist().
- You can mark an RDD to be persisted using the persist() or cache() functions on it. The first time it is computed in an action, it will be kept in memory on the nodes.
- When you call persist(), you can specify whether you want to store the RDD on disk, in memory, or both, and, if it is in memory, whether it should be stored in serialized or deserialized format.
- cache() is the same as persist() with the storage level set to MEMORY_ONLY.
With cache(), you use only the default storage level MEMORY_ONLY. With persist(), you can indicate which storage level you want, so cache() is equivalent to calling persist() with the default storage level. Spark has numerous levels of persistence to choose from depending on what our objectives are. The default persist() will store the data in the JVM heap as unserialized objects. When we write data out to disk, that data is always serialized. The different levels of persistence are MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER, DISK_ONLY.
. If there is certain data that we want to use again and again in different transformations, how can we improve the performance?
- RDD can be persisted or cached.
- There are various ways in which it can be persisted: in-memory, on disc etc.
- So, if there is a dataset that needs a good amount of computing to arrive at, you should consider caching it.
- You can cache it to disk if preparing it again is far costlier than just reading it from disk, or if it is very large and would not fit in the RAM.
- You can cache it to memory if it can fit into the memory.
What is lineage graph?
- The RDDs in Spark, depend on one or more other RDDs.
- The representation of dependencies in between RDDs is known as the lineage graph.
- Lineage graph information is used to compute each RDD on demand, so that whenever a part of persistent RDD is lost, the data that is lost can be recovered using the lineage graph information.
- Spark does not support data replication in memory and thus, if any data is lost, it is rebuilt using RDD lineage.
- RDD lineage is the process that reconstructs lost data partitions.
- The best part is that an RDD always remembers how to build itself from other datasets.
- Lineage is the mechanism an RDD uses to reconstruct lost partitions.
- Spark does not replicate the data in memory; if data is lost, the RDD uses lineage to rebuild the lost data.
- Each RDD remembers how it was built from other datasets.
- As you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph.
- It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost.
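You can inspect the lineage that Spark is tracking with toDebugString(); a small sketch, assuming sc is an available SparkContext.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
base = sc.parallelize(range(100), 4)
derived = base.map(lambda x: (x % 10, x)).reduceByKey(lambda a, b: a + b)
# Prints the chain of RDDs (the lineage) that 'derived' was built from,
# which Spark uses to recompute lost partitions on failure
print(derived.toDebugString().decode("utf-8"))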
What is the difference between DAG and Lineage? Please explain with example
To understand the difference better, we will discuss each topic one by one:
Lineage graph
As we know, whenever a series of transformations is performed on an RDD, they are not evaluated immediately, but lazily (lazy evaluation). When a new RDD is created from an existing RDD, the new RDD contains a pointer to the parent RDD. Similarly, all the dependencies between the RDDs are logged in a graph, rather than the actual data. This graph is called the lineage graph.
Now coming to DAG,
Directed Acyclic Graph(DAG)
DAG in Apache Spark is a combination of vertices and edges. The vertices represent the RDDs and the edges represent the operations to be applied to those RDDs. Every edge in the DAG is directed from earlier to later in the sequence. When we call an action, the created DAG is submitted to the DAG Scheduler, which further splits the graph into stages of tasks.
. What happens to RDD when one of the nodes on which it is distributed goes down?
Since Spark knows how to prepare a certain dataset, because it is aware of the various transformations and actions that have led to the dataset, it is able to apply the same transformations and actions to recreate the lost partitions of the node which has gone down.
. When we create an RDD, does it bring the data and load it into the memory?
No.
- An RDD is made up of partitions which are located on multiple machines.
- The partition is only kept in memory if the data is being loaded from memory or the RDD has been cached/persisted into the memory.
- Otherwise, an RDD is just a mapping of partitions to the actual data.
. How to save an RDD?
There are a few methods provided by Spark:
- saveAsTextFile:
- Write the elements of the RDD as a text file (or set of text files) to the provided directory.
- The directory could be in the local filesystem, HDFS or any other file system.
- Each element of the dataset will be converted to text using the toString() method on every element.
- Each element will be appended with the newline character "\n".
- saveAsSequenceFile:
- Write the elements of the dataset as a Hadoop SequenceFile.
- This works only on key-value pair RDDs whose elements implement Hadoop’s Writable interface.
- You can load sequence file using sc.sequenceFile().
- saveAsObjectFile:
- This simply saves data by serializing it using standard Java object serialization.
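A minimal PySpark sketch of these save methods, assuming sc is an available SparkContext; the /tmp output paths are hypothetical and must not already exist.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
nums = sc.parallelize([1, 2, 3, 4, 5])
pairs = sc.parallelize([("a", 1), ("b", 2)])
nums.saveAsTextFile("/tmp/nums_text")        # one text file per partition, toString() + "\n" per element
pairs.saveAsSequenceFile("/tmp/pairs_seq")   # key/value pairs written as a Hadoop SequenceFile
nums.saveAsPickleFile("/tmp/nums_obj")       # PySpark analogue of saveAsObjectFile (which uses Java serialization in Scala)
reloaded = sc.sequenceFile("/tmp/pairs_seq") # load the sequence file back
print(reloaded.collect())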
. What do you understand by SchemaRDD?
- An RDD that consists of row objects (wrappers around basic string or integer arrays) with schema information about the type of data in each column.
- Row objects are just wrappers around arrays of basic types (e.g., integers and strings).
. How would you determine the number of partitions while creating an RDD? What are the functions?
- You can specify the number of partitions while creating an RDD either by using sc.textFile or by using parallelize, as follows:
- val rdd = sc.parallelize(data, 4)
- val data = sc.textFile("path", 4)
What is the difference between RDD, DataFrame and Dataset?
RDD:
- It is the building block of Spark. All DataFrames and Datasets are internally RDDs.
- It is a lazily evaluated, immutable collection of objects.
- RDDs can be easily cached if the same set of data needs to be recomputed.
DataFrame:
- Gives a structured view (rows and columns). It can be thought of as a table in a database.
- Like an RDD, a DataFrame is lazily evaluated.
- It offers huge performance gains due to: a) Custom Memory Management – data is stored in off-heap memory in binary format; there is no garbage collection overhead because of this.
- b) Optimized Execution Plans – query plans are created using the Catalyst optimizer.
- DataFrame limitations: no compile-time safety, i.e., no manipulation of data is possible when the structure is not known.
DataSet: Extension of DataFrame
- DataSet features – Provides the best encoding mechanism and, unlike DataFrames, supports compile-time safety.
A DataFrame is untyped (it throws an exception at runtime in case of any schema mismatch).
A Dataset is typed (it throws an exception at compile time in case of any schema mismatch).
. How to join two DataFrames in Spark?
How to split a single HDFS block into RDD partitions?
When we create an RDD from a file stored in HDFS:
data = context.textFile("/user/dataflair/file-name")
By default, one partition is created for each block, i.e., if we have a file of size 1280 MB (with a 128 MB block size) there will be 10 HDFS blocks, hence the same number of partitions (10) will be created.
If you want to create more partitions than the number of blocks, you can specify that while creating the RDD:
data = context.textFile("/user/dataflair/file-name", 20)
It will create 20 partitions for the file, i.e., for each block 2 partitions will be created.
NOTE: It is often recommended to have more partitions than the number of blocks; it can improve performance.
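You can verify the partition count with getNumPartitions(); a small sketch, assuming sc is an available SparkContext and that /user/dataflair/file-name is the hypothetical HDFS path from the example above (the second argument of textFile is the minimum number of partitions).
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
default_rdd = sc.textFile("/user/dataflair/file-name")
more_parts = sc.textFile("/user/dataflair/file-name", 20)
print(default_rdd.getNumPartitions())   # roughly one partition per HDFS block
print(more_parts.getNumPartitions())    # at least 20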
On what basis can you differentiate RDD, DataFrame, and DataSet?
DataFrame: A DataFrame is used for storing data in tables. It is equivalent to a table in a relational database but with richer optimization. It is a data abstraction and domain-specific language (DSL) applicable to structured and semi-structured data. It is a distributed collection of data in the form of named columns and rows. It has a matrix-like structure whose columns may be of different types (numeric, logical, factor, or character). We can say a data frame has a two-dimensional, array-like structure where each column contains the values of one variable and each row contains one set of values for each column. It combines features of lists and matrices.
RDD is the representation of a set of records: an immutable collection of objects with distributed computing. An RDD is a large collection of data, or an array of references to partitioned objects. Every dataset in an RDD is logically partitioned across many servers so that it can be computed on different nodes of the cluster. RDDs are fault tolerant, i.e., self-recovered/recomputed in case of failure. The dataset could be data loaded externally by the user, which can be in the form of a JSON file, CSV file, text file, or a database via JDBC, with no specific data structure.