SPARK STAGE
SPARK EXECUTOR
SPARK DAG
SPARK CLUSTER MANAGER
——————————————————————————————————————————————————————————
SPARK STAGE
- A stage is a step in the physical execution plan.
- It is the physical unit of execution: a set of parallel tasks, one task per partition.
- In other words, each job is divided into smaller sets of tasks, and each such set is a stage.
- Stages in Spark are of two types: ShuffleMapStage and ResultStage.
- Stages depend on each other, much like the map and reduce stages in MapReduce.
- A stage can have many dependent parent stages, but it can only work on the partitions of a single RDD.
- The boundary of a stage is marked by shuffle dependencies.
- Submitting a stage triggers the execution of its series of dependent parent stages.
- Every stage carries a firstJobId, the id of the job that first submitted the stage.
Types of Spark Stages
Stages in Apache Spark fall into two categories:
1. ShuffleMapStage in Spark
2. ResultStage in Spark
1. ShuffleMapStage in Spark
ShuffleMapStage is an intermediate stage in the physical execution of the DAG.
- It produces data for one or more following stages.
- In Adaptive Query Planning / Adaptive Scheduling, it can also be the final stage in a job and can be submitted independently as a Spark job.
- During execution, a ShuffleMapStage saves map output files, which reduce tasks later fetch.
- The ShuffleMapStage is considered ready when all of its map outputs are available.
- Output locations can sometimes be missing, which means the corresponding partitions have either not been calculated yet or have been lost.
- Stages track how many shuffle map outputs are available using the outputLocs and _numAvailableOutputs internal registries.
- A ShuffleMapStage serves as input for the following stages in the DAG of stages; it is the map side of a shuffle dependency.
- A ShuffleMapStage can contain multiple pipelined operations, such as map and filter, executed before the shuffle operation.
- A single ShuffleMapStage can be shared among different jobs.
2. ResultStage in Spark
- A ResultStage is created by running a function on an RDD as part of a Spark action in a user program.
- It is considered the final stage in a job.
- It applies that function to one or many partitions of the target RDD and thereby computes the result of the action (a minimal sketch of both stage types follows below).
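To make the two stage types concrete, here is a minimal hedged sketch (the file path and names are made up for the example): the reduceByKey shuffle splits this word-count job into a ShuffleMapStage (textFile, flatMap, map) and a ResultStage triggered by the collect action.

import org.apache.spark.{SparkConf, SparkContext}

object StageExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-example").setMaster("local[*]"))

    val counts = sc.textFile("input.txt")      // narrow: read (illustrative path)
      .flatMap(_.split("\\s+"))                // narrow: pipelined into the same stage
      .map(word => (word, 1))                  // narrow: still part of the ShuffleMapStage
      .reduceByKey(_ + _)                      // wide: shuffle, a new stage begins here

    counts.collect()                           // action: triggers the ResultStage
    sc.stop()
  }
}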
Getting StageInfo For Most Recent Attempt
The latestInfo method returns the StageInfo for the most recent attempt:
latestInfo: StageInfo
Stage Contract
Stage is a private[scheduler] abstract class with one abstract method:
abstract class Stage {
def findMissingPartitions(): Seq[Int]
}
Method To Create New Apache Spark Stage
makeNewStageAttempt is the basic method used to create a new attempt for a stage:
makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit
- It creates a new TaskMetrics and, with the help of the RDD's SparkContext, registers the internal accumulators (using the same RDD that was defined when the stage was created).
- It sets latestInfo to a new StageInfo built from the stage's nextAttemptId, numPartitionsToCompute, and taskLocalityPreferences.
- It then increments the nextAttemptId counter.
- Note that this method is used only when DAGScheduler submits missing tasks for a stage.
Spark Executor
In Apache Spark, the distributed agent responsible for executing tasks is called the Spark Executor.
What is Spark Executor
- Executors are worker processes in charge of running the individual tasks of a given Spark job.
- They are launched at the start of a Spark application and typically run for its entire lifetime.
- As soon as they have run a task, they send the results to the driver.
- Executors also provide in-memory storage for Spark RDDs that are cached by user programs, through the Block Manager.
- Running for the complete lifespan of an application implies static allocation of executors; dynamic allocation can also be used.
- Executors send metrics and heartbeats via the Heartbeat Sender Thread.
- We can have as many executors as there are data nodes, and as many cores as the cluster can provide; a minimal configuration sketch follows this list.
- An executor is described by its id, hostname, environment (as SparkEnv), and classpath.
- Executor backends exclusively manage executors in Spark.
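As a rough sketch of how executor count, cores, and memory are typically set (the values below are arbitrary assumptions, not recommendations), the standard configuration keys are spark.executor.instances, spark.executor.cores, and spark.executor.memory:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: executor sizing via standard configuration keys.
val conf = new SparkConf()
  .setAppName("executor-sizing-sketch")
  .set("spark.executor.instances", "4")   // assumed value: e.g. one executor per data node
  .set("spark.executor.cores", "4")       // assumed value: cores per executor
  .set("spark.executor.memory", "8g")     // assumed value: heap per executor
val sc = new SparkContext(conf)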
Conditions to Create Spark Executor
An Executor is created in Spark when:
- CoarseGrainedExecutorBackend receives a RegisteredExecutor message (Spark Standalone and YARN only).
- Mesos's MesosExecutorBackend is registered.
- A LocalEndpoint is created for local mode.
Creating Spark Executor Instance
A Spark Executor instance is created from the following:
- The executor ID.
- SparkEnv, which gives access to the local MetricsSystem and BlockManager, as well as the local serializer.
- The executor's hostname.
- A collection of user-defined JARs to add to the tasks' classpath (empty by default).
- A flag indicating whether it runs in local or cluster mode (disabled by default, i.e. cluster mode is preferred).
When creation succeeds, an INFO message appears in the logs:
INFO Executor: Starting executor ID [executorId] on host [executorHostname]
Heartbeater — Heartbeat Sender Thread
heartbeater is a daemon ScheduledThreadPoolExecutor with a single thread.
The thread pool is called driver-heartbeater.
Launching Task — launchTask Method
launchTask executes the input serializedTask concurrently:
launchTask(
  context: ExecutorBackend,
  taskId: Long,
  attemptNumber: Int,
  taskName: String,
  serializedTask: ByteBuffer): Unit
- Internally, launchTask creates a TaskRunner and registers it in the runningTasks internal registry under its taskId.
- Afterwards, it executes the TaskRunner on the "Executor task launch worker" thread pool.
“Executor Task Launch Worker” Thread Pool — threadPool Property
- Tasks are launched on threadPool, a daemon cached thread pool whose threads are named after their task launch worker id.
- threadPool is created when the Spark Executor is created, and it is shut down when the executor stops; the idea is sketched below.
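To make the term concrete, here is a hedged sketch (not Spark's actual code) of what a daemon cached thread pool with named threads looks like in plain JVM concurrency terms; the thread-name prefix is illustrative.

import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Sketch only: a cached thread pool whose threads are daemons and carry a
// "task-launch-worker-N" style name, similar in spirit to the executor's pool.
val counter = new AtomicInteger(0)
val factory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, s"task-launch-worker-${counter.getAndIncrement()}")
    t.setDaemon(true)   // daemon: does not keep the JVM alive on its own
    t
  }
}
val threadPool: ExecutorService = Executors.newCachedThreadPool(factory)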
SPARK DAG
- A DAG (Directed Acyclic Graph) in Apache Spark is a set of vertices and edges, where the vertices represent RDDs and the edges represent the operations to be applied to those RDDs.
- In a Spark DAG, every edge is directed from earlier to later in the sequence.
- When an action is called, the DAG that has been built is submitted to the DAG Scheduler, which further splits the graph into stages of tasks.
What is DAG in Apache Spark?
- A DAG is a finite directed graph with no directed cycles.
- It has finitely many vertices and edges, where each edge is directed from one vertex to another.
- The vertices form a sequence such that every edge is directed from earlier to later in the sequence.
- It is a strict generalization of the MapReduce model.
- DAG execution allows better global optimization than systems like MapReduce.
- The value of the DAG becomes clearer in more complex jobs.
- The Spark DAG visualization allows the user to dive into a stage and expand the details of any stage.
- In the stage view, the details of all RDDs belonging to that stage are shown.
- The scheduler splits the RDD graph into stages based on the transformations applied.
- Each stage is comprised of tasks, based on the partitions of the RDD, which perform the same computation in parallel.
Here, "graph" refers to the structure of the computation; "directed" means every edge points one way, from an earlier RDD to a later one, and "acyclic" means no path ever loops back.
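A small hedged sketch of how this graph is built (it assumes an existing SparkContext sc, and the values are arbitrary): the transformations below only record vertices and edges of the operator graph; nothing executes until the count() action submits the DAG to the DAG Scheduler.

// Sketch: transformations build the DAG lazily; the action triggers execution.
val numbers = sc.parallelize(1 to 100)      // assumes an existing SparkContext `sc`
val doubled = numbers.map(_ * 2)            // recorded in the DAG, not executed yet
val evens   = doubled.filter(_ % 4 == 0)    // still just graph-building
val n = evens.count()                       // action: the DAG is submitted to the DAG Scheduler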
Explain Directed Acyclic Graph in Spark.
What is the function of Directed Acyclic Graph in Spark?
- In mathematical terms, a Directed Acyclic Graph is a directed graph with no cycles.
- The DAG is a graph that holds the set of all operations applied to the RDD.
- When any action is called on the RDD, Spark creates the DAG and submits it to the DAG scheduler.
- Only after the DAG is built does Spark create the query optimization plan.
- The DAG scheduler divides operators into stages of tasks.
- A stage is comprised of tasks based on partitions of the input data.
- The DAG scheduler pipelines operators together.
- Fault tolerance is achieved in Spark using the Directed Acyclic Graph.
- Query optimization is possible in Spark through the DAG, which gives better performance.
Need of Directed Acyclic Graph in Spark
The limitations of Hadoop MapReduce were a key reason to introduce the DAG in Spark. Computation in MapReduce proceeds in three steps:
- The data is read from HDFS.
- Map and Reduce operations are applied.
- The computed result is written back to HDFS.
- Each MapReduce job is independent of the others, and Hadoop has no knowledge of which MapReduce job will come next.
- For iterative computations, it is often unnecessary to write and re-read the intermediate result between two MapReduce jobs.
- In such cases, memory and stable storage (HDFS disk) are wasted.
- In multi-step workflows, each job is blocked from starting until the previous job has completed.
- As a result, complex computations can take a long time even with small data volumes.
In Spark, by contrast, a DAG of consecutive computation stages is formed.
In this way, the execution plan is optimized, e.g. to minimize shuffling data around. In MapReduce, this is done manually by tuning each MapReduce step.
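As a hedged illustration of the difference (the file path and logic are made up, and sc is an existing SparkContext), a multi-action Spark program can keep an intermediate RDD in memory and reuse it, instead of writing it to HDFS and reading it back between separate MapReduce jobs:

// Sketch: the intermediate RDD stays in memory and is reused by two actions,
// instead of being written to and re-read from HDFS as in chained MapReduce jobs.
val words = sc.textFile("data.txt")          // illustrative input path
  .map(_.trim.toLowerCase)
  .cache()                                   // keep the intermediate result in memory

val startsWithA = words.filter(_.startsWith("a")).count()
val longWords   = words.filter(_.length > 10).count()   // reuses the cached RDD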
How DAG works in Spark?
- The interpreter is the first layer: Spark uses a Scala interpreter, with some modifications, to interpret your code.
- Spark creates an operator graph as you enter your code in the Spark console.
- When an action is called on a Spark RDD, Spark submits the operator graph to the DAG Scheduler.
- The DAG Scheduler divides the operators into stages of tasks.
- A stage contains tasks based on the partitions of the input data.
- The DAG scheduler pipelines operators together; for example, consecutive map operators are scheduled in a single stage.
- The stages are passed on to the Task Scheduler.
- The Task Scheduler launches the tasks through the cluster manager.
- The dependencies between stages are unknown to the task scheduler.
- The workers execute the tasks on the worker nodes.
At a higher level, we can apply two types of RDD transformations:
- narrow transformations (e.g. map(), filter()), and
- wide transformations (e.g. reduceByKey()).
- Narrow transformations do not require shuffling data across partitions, so consecutive narrow transformations are grouped into a single stage, while wide transformations shuffle the data.
- Hence, wide transformations result in stage boundaries.
- Each RDD maintains a pointer to one or more parents, along with metadata about the type of relationship it has with each parent.
- For example, if we call val b = a.map() on an RDD, RDD b keeps a reference to its parent RDD a; that is the RDD lineage (a small sketch of inspecting such a lineage follows this list).
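As a hedged sketch (the RDD contents are arbitrary and sc is an existing SparkContext), toDebugString prints an RDD's lineage, and the indentation in its output reflects the shuffle boundary introduced by the wide transformation:

// Sketch: inspecting RDD lineage; reduceByKey introduces a shuffle dependency,
// which shows up as a stage boundary in the debug output.
val a = sc.parallelize(Seq("spark", "dag", "spark"))
val b = a.map(word => (word, 1))           // narrow: b keeps a reference to its parent a
val c = b.reduceByKey(_ + _)               // wide: shuffle dependency on b
println(c.toDebugString)                   // prints the lineage of c back to a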
How DAG functions in Spark?
To recap the flow: when an action is called on a Spark RDD, Spark presents the lineage graph to the DAG Scheduler. The DAG Scheduler divides the operators into stages of tasks, where each stage contains tasks based on the partitions of the input data, and pipelines operators together. The stages are dispatched to the task scheduler, which launches tasks through the cluster manager without knowing the dependencies between stages, and the workers execute the tasks on the worker nodes.
- A Directed Acyclic Graph (DAG) is a graph data structure whose edges are directional and which contains no loops or cycles.
- People use DAGs all the time; think, for example, of the dependencies between the steps of getting ready for the office.

DAG is a way of representing dependencies between objects.
It is widely used in computing. Examples include:
- Build tools such as Apache Ant, Apache Maven, make, and sbt
- Task dependencies in project management, e.g. Microsoft Project
- The data model of Git
How to attain fault tolerance in Spark?
Is Apache Spark fault tolerant? If yes, how?
The basic premise of fault tolerance in Apache Spark is that all Spark RDDs are immutable. Spark remembers the dependencies between every RDD involved in the operations through the lineage graph recorded in the DAG, and in the event of any failure it refers to the lineage graph to re-apply the same operations and complete the tasks.
There are two types of failures: worker failure and driver failure. If a worker fails, the executors on that worker node are killed, along with the data in their memory. Using the lineage graph, those tasks are completed on other worker nodes. The data is also replicated to other worker nodes to achieve fault tolerance. There are two cases:
1. Data received and replicated – the data has been received from the source and replicated across worker nodes. In case of a failure, the replica makes it possible to carry on without loss.
2. Data received but not yet replicated – the data has been received from the source but is still buffered for replication. In case of a failure, the data has to be retrieved from the source again.
For stream inputs based on receivers, fault tolerance depends on the type of receiver:
- Reliable receiver – once the data is received and replicated, an acknowledgment is sent to the source. If the receiver fails, the source never receives the acknowledgment; when the receiver is restarted, the source resends the data, so fault tolerance is preserved.
- Unreliable receiver – the received data is not acknowledged to the source. If a failure occurs, the source does not know whether the data was received, and it will not resend it, so there is data loss.
To overcome this data-loss scenario, Write Ahead Logging (WAL) was introduced in Apache Spark 1.2. With WAL enabled, the intention of the operation is first written to a log file, so that if the driver fails and is restarted, the operations recorded in the log can be re-applied to the data. For streaming sources such as Kafka or Flume, receivers receive the data and store it in the executors' memory; with WAL enabled, the received data is also written to the log files.
WAL can be enabled as follows (a short sketch comes after these steps):
- Set the checkpoint directory, using streamingContext.checkpoint(path).
- Enable WAL logging, by setting spark.streaming.receiver.writeAheadLog.enable to true.
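A minimal hedged sketch of both steps together (the application name, batch interval, and checkpoint path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: enabling the write-ahead log for receiver-based streaming input.
val conf = new SparkConf()
  .setAppName("wal-sketch")
  .setMaster("local[2]")                                           // at least 2 threads: one for the receiver
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")    // turn WAL on
val ssc = new StreamingContext(conf, Seconds(10))                  // placeholder batch interval
ssc.checkpoint("hdfs:///checkpoints/wal-sketch")                   // placeholder checkpoint directory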
How to Achieve Fault Tolerance through DAG?
- An RDD is split into partitions, and each node operates on a partition at any point in time; a series of Scala functions executes on each partition of the RDD.
- These operations compose together, and the Spark execution engine views them as a DAG (Directed Acyclic Graph).
- Suppose a node crashes in the middle of an operation, say O3, which depends on operation O2, which in turn depends on O1.
- The cluster manager finds out that the node is dead and assigns another node to continue processing.
- That node operates on the same partition of the RDD, re-executing the series of operations it has to perform (O1 -> O2 -> O3), so there is no data loss.
Working of DAG Optimizer in Spark
- The DAG in Apache Spark is optimized by rearranging and combining operators wherever possible.
- For example, suppose we submit a Spark job that has a map() operation followed by a filter() operation.
- The DAG Optimizer can rearrange the order of these operators, since filtering first reduces the number of records that undergo the map operation (see the sketch after this list).
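To see why this reordering pays off, here is a hedged sketch in plain RDD code (not the optimizer itself, and it assumes an existing SparkContext sc): when the filter predicate does not depend on the map's output, applying the filter first produces the same result while mapping far fewer records.

// Sketch: both pipelines compute the squares of the even numbers,
// but the second one maps only the records that survive the filter.
val nums = sc.parallelize(1 to 1000000)

val mapThenFilter = nums.map(n => n * n).filter(n => n % 2 == 0)  // maps every record
val filterThenMap = nums.filter(n => n % 2 == 0).map(n => n * n)  // maps half as many records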
Advantages of DAG in Spark
There are multiple advantages of the Spark DAG; let's discuss them one by one:
- A lost RDD can be recovered using the Directed Acyclic Graph.
- MapReduce has just two levels, map and reduce, whereas a DAG can have any number of levels, so it is more flexible for executing SQL-like queries.
- The DAG helps to achieve fault tolerance, so lost data can be recovered.
- It can do better global optimization than a system like Hadoop MapReduce.
- DAG in Apache Spark is an alternative to MapReduce.
- It is a programming style used in distributed systems.