CODING QUESTIONS
Code sample to read data from a text file?
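A minimal PySpark sketch, assuming a SparkContext named sc is already available (as in the rest of this section) and using a placeholder HDFS path:

# Read a text file into an RDD; each element of the RDD is one line of the file.
rdd = sc.textFile("hdfs:///path/to/myfile.txt")  # placeholder path

# Pull a few lines back to the driver to inspect them.
for line in rdd.take(5):
    print(line)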
Code sample to read data from MySQL?
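A hedged sketch using Spark's JDBC data source, assuming a SparkSession named spark, a reachable MySQL server, and the MySQL Connector/J jar on the classpath; the host, database, table, and credentials below are placeholders:

# Read a MySQL table into a Spark DataFrame over JDBC.
df = (spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")  # placeholder host and database
      .option("dbtable", "mytable")                    # placeholder table
      .option("user", "myuser")                        # placeholder credentials
      .option("password", "mypassword")
      .option("driver", "com.mysql.cj.jdbc.Driver")    # older connectors use com.mysql.jdbc.Driver
      .load())

df.show(5)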
Say I have a huge list of numbers in an RDD (say myrdd), and I wrote the following code to compute the average:
def myAvg(x, y):
    return (x + y) / 2.0

avg = myrdd.reduce(myAvg)
What is wrong with it and how would you correct it?
A function passed to reduce() must be commutative and associative, and this pairwise average is not associative, so the result depends on the order in which elements are combined. I would simply sum the values and then divide by the count.
# Note: this sum(x, y) shadows Python's built-in sum(), which is fine for this example.
def sum(x, y):
    return x + y

total = myrdd.reduce(sum)
avg = total / myrdd.count()
The only problem with the above code is that the total might become very big and overflow. So I would rather divide each number by the count first and then sum, in the following way.
cnt = myrdd.count()

def divideByCnt(x):
    return x / cnt

myrdd1 = myrdd.map(divideByCnt)
avg = myrdd1.reduce(sum)
The problem with the above code is that it uses two jobs: one for the count and another for the sum. We can do it in a single pass as follows:
myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10])
sumcount_rdd = myrdd.map(lambda n: (n, 1))
(total, counts) = sumcount_rdd.reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
avg = total / counts
Again, this might cause a numeric overflow because we are summing a huge number of values. We could instead carry running averages and counts, and compute the combined average from the averages and counts of the two parts being reduced.
myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10])
sumcount_rdd = myrdd.map(lambda n: (n, 1))

def avg(A, B):
    # A and B are (average, count) pairs; combine them into a single pair.
    R = 1.0 * B[1] / A[1]
    Ri = 1.0 / (1 + R)
    av = A[0] * Ri + B[0] * R * Ri
    return (av, B[1] + A[1])

(av, counts) = sumcount_rdd.reduce(avg)
print(av)
If you have two parts with averages and counts (a1, c1) and (a2, c2), the overall average is:
(total1 + total2) / (count1 + count2) = (a1*c1 + a2*c2) / (c1 + c2)
If we set R = c2/c1, this can be rewritten as a1/(1+R) + a2*R/(1+R).
If we further set Ri = 1/(1+R), we can write it as a1*Ri + a2*R*Ri.
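As a quick sanity check of this merge formula, here is a plain Python calculation on two made-up (average, count) parts:

# Two example parts: (average, count)
A = (2.0, 3)   # e.g. the values 1, 2, 3
B = (4.0, 1)   # e.g. the value 4

R = 1.0 * B[1] / A[1]              # R = c2/c1 = 1/3
Ri = 1.0 / (1 + R)                 # Ri = 3/4
merged = A[0] * Ri + B[0] * R * Ri
print(merged)                      # 2.5, which matches (1 + 2 + 3 + 4) / 4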
Say I have a huge list of numbers in a file in HDFS. Each line has one number, and I want to compute the square root of the sum of squares of these numbers. How would you do it?
# We would first load the file as an RDD from HDFS on Spark
numsAsText = sc.textFile("hdfs:///user/student/sgiri/mynumbersfile.txt")
# Define the function to compute the square of each number
def toSqInt(str):
    v = int(str)
    return v * v
# Run the function on the Spark RDD as a transformation
nums = numsAsText.map(toSqInt)

# Run the summation as a reduce action
# (sum here is the two-argument sum(x, y) defined earlier, not the built-in)
total = nums.reduce(sum)

# Finally compute the square root, for which we need to import math
import math
print(math.sqrt(total))
Is the following approach correct? Is sqrtOfSumOfSq a valid reducer?
import math

numsAsText = sc.textFile("hdfs:///user/student/sgiri/mynumbersfile.txt")

def toInt(str):
    return int(str)

nums = numsAsText.map(toInt)

def sqrtOfSumOfSq(x, y):
    return math.sqrt(x * x + y * y)

total = nums.reduce(sqrtOfSumOfSq)
print(total)
Yes. The approach is correct and sqrtOfSumOfSq is a valid reducer: it is commutative, and it is associative because sqrt(sqrt(x^2 + y^2)^2 + z^2) = sqrt(x^2 + y^2 + z^2), so the result does not depend on how the elements are grouped.
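A quick way to convince yourself, in plain Python with made-up values 3, 4 and 12:

import math

def sqrtOfSumOfSq(x, y):
    return math.sqrt(x * x + y * y)

# Grouping left-first and right-first gives the same result (13.0),
# illustrating that the reducer is associative.
print(sqrtOfSumOfSq(sqrtOfSumOfSq(3, 4), 12))
print(sqrtOfSumOfSq(3, sqrtOfSumOfSq(4, 12)))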
In a very large text file, you want to check whether a particular keyword exists. How would you do this using Spark?
lines = sc.textFile("hdfs:///user/student/sgiri/bigtextfile.txt")

def isFound(line):
    if line.find("mykeyword") > -1:
        return 1
    return 0

foundBits = lines.map(isFound)
# sum here is the two-argument sum(x, y) defined earlier
total = foundBits.reduce(sum)
if total > 0:
    print("FOUND")
else:
    print("NOT FOUND")
. Can you improve the performance of the code in the previous answer?
Yes. The search does not stop even after the keyword has been found; the map code keeps executing on all the nodes, which is very inefficient. We could use an accumulator to report whether the word has been found and then cancel the job. Something along these lines:
import threading
from time import sleep

result = "Not Set"
lock = threading.Lock()
accum = sc.accumulator(0)

def map_func(line):
    # introduce a delay to emulate slowness
    sleep(1)
    if line.find("Adventures") > -1:
        accum.add(1)
        return 1
    return 0

def start_job():
    global result
    try:
        sc.setJobGroup("job_to_cancel", "some description")
        lines = sc.textFile("hdfs:///user/student/sgiri/wordcount/input/big.txt")
        result = lines.map(map_func)
        result.take(1)
    except Exception as e:
        result = "Cancelled"
    lock.release()

def stop_job():
    while accum.value < 3:
        sleep(1)
    sc.cancelJobGroup("job_to_cancel")

supress = lock.acquire()
threading.Thread(target=start_job).start()
threading.Thread(target=stop_job).start()
supress = lock.acquire()
Could you compare the pros and cons of the two approaches above: squaring in map() and summing in reduce, versus using the sqrtOfSumOfSq reducer?
The sqrtOfSumOfSq approach does the squaring and the square root inside the reduce action, while the other approach squares in map() and only sums in reduce. The map-and-sum approach will be faster because the sqrtOfSumOfSq reducer is heavier: it calls math.sqrt(), and the reducer is executed approximately n-1 times across the RDD. The only downside of the map-and-sum approach is a greater chance of numeric overflow, because it sums the squares of the values.
If you have to compute the total count of each of the unique words on Spark, how would you go about it?
# This will load bigtextfile.txt as an RDD in Spark
lines = sc.textFile("hdfs://hadoop1.knowbigdata.com/user/student/sgiri/bigtextfile.txt")

# Define a function that breaks each line into words
def toWords(line):
    return line.split()

# Run the toWords function on each element of the RDD as a flatMap transformation.
# We use flatMap instead of map because our function returns multiple values.
words = lines.flatMap(toWords)

# Convert each word into a (key, value) pair. Here the key is the word itself and the value is 1.
def toTuple(word):
    return (word, 1)

wordsTuple = words.map(toTuple)

# Now we can easily aggregate the counts per word with reduceByKey().
def sum(x, y):
    return x + y

counts = wordsTuple.reduceByKey(sum)

# Now, collect and print the results
print(counts.collect())
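The same word count can also be written more compactly with lambdas; this is just a condensed sketch of the step-by-step version above:

counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda x, y: x + y))
print(counts.collect())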