
Scala Fold operation Explained!

Fold takes data in one form and returns a result in another. Fold on a list takes an initial value and a function of two arguments that is applied to every element of the list.
In the first iteration the initial value (the zero/neutral value) is passed as the first argument of the function, while the current element of the list is passed as the second argument. In each subsequent iteration the result of the previous iteration is passed as the first argument and the current element as the second, and the process continues until the list is exhausted.
Below is an example of finding the minimum of a list of elements in Scala.

We have passed the value at position 0 of the list as the initial value. (Note that the parameter names min and max in the code below are just the names chosen for the lambda; they really hold the accumulated result so far and the current element.)

In the first iteration
the value at position 0, i.e. 2, is passed as the first argument of the function, i.e. assigned to min
the current element being iterated over, i.e. 2, is passed as the second argument, i.e. assigned to max

In the second iteration
the result of the first iteration, i.e. 2, is passed as the first argument of the function, i.e. assigned to min
the current element being iterated over, i.e. 1, is passed as the second argument, i.e. assigned to max

and the story goes on ….. 🙂

package example

/**
 * @author amith2
 */
object ScalaFoldMIn {
  def main(args: Array[String]): Unit = {
    val scalaData = List(2, 1, 3, 5)
    // Seed the fold with the first element of the list; `min` carries the
    // accumulated result so far and `max` the current element
    println(scalaData.fold(scalaData(0))((min, max) =>
      if (min < max) min
      else max
    ))
  }
}
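The threading of arguments described above is not specific to Scala. As a minimal sketch (plain Java 8 streams, not part of the post's code), the same minimum-finding fold can be written with Stream.reduce, whose (identity, accumulator) form behaves like fold on a sequential stream:

```java
import java.util.Arrays;
import java.util.List;

public class FoldMinSketch {
    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(2, 1, 3, 5);
        // Seed with the first element, mirroring scalaData.fold(scalaData(0));
        // `acc` is the accumulated minimum so far, `cur` the current element
        int min = data.stream()
                      .reduce(data.get(0), (acc, cur) -> acc < cur ? acc : cur);
        System.out.println(min); // prints 1
    }
}
```

The calls here follow the same trace as above: (2, 2) gives 2, then (2, 1) gives 1, and 1 survives the remaining comparisons.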

Understanding Apache Spark’s Fold operation

As described on the Apache Spark website, the definition of fold states:

“Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”.”

Thus fold first folds the elements of each partition, and then folds the per-partition results together;
the outcome is therefore partition dependent.

Let’s examine the code below:

JavaRDD<String> jrdd = jsc.parallelize(Arrays.asList("FirstFile"));
String resultfold = jrdd.fold("0", new Function2<String, String, String>() {

    public String call(String arg0, String arg1) throws Exception {
        System.out.println(" the arg0 .. >> " + arg0);
        System.out.println(" the arg1 .. >> " + arg1);
        return arg0 + "," + arg1;
    }
});

which gives the output
0,0,0,FirstFile

arg0 is initially assigned the zero value 0 for a partition
arg1 is assigned the value FirstFile
resulting in 0,FirstFile –> the aggregation for that partition

On a machine with more than one core, when Spark is given more than one thread, the above RDD will have more than one partition. Since an empty partition folds down to just the zero value, we get the below values when its result is combined:
arg0 is 0
arg1 is 0
result is 0,0

Once each partition has been aggregated, the results of all the partitions are aggregated using the same function and the same supplied zero value, resulting in
0,0,0,FirstFile

The above result may vary between executions, depending on which partition’s result gets combined first.
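The per-partition behaviour described above can be simulated without Spark at all. Below is a minimal plain-Java sketch; the fold helper is hypothetical, written only to mirror how RDD.fold reuses the zero value in every partition and once more in the final combine:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

public class PartitionFoldSketch {
    // Hypothetical helper mimicking RDD.fold: fold each partition starting
    // from the zero value, then fold the per-partition results together,
    // again starting from the zero value
    static String fold(List<List<String>> partitions, String zero, BinaryOperator<String> f) {
        String combined = zero;
        for (List<String> partition : partitions) {
            String partial = zero;                 // zero is re-used in every partition
            for (String elem : partition) {
                partial = f.apply(partial, elem);
            }
            combined = f.apply(combined, partial); // and once more while combining
        }
        return combined;
    }

    public static void main(String[] args) {
        BinaryOperator<String> concat = (a, b) -> a + "," + b;
        // One empty partition plus one holding "FirstFile", as in the
        // more-than-one-thread case above
        List<List<String>> partitions = Arrays.asList(
                Collections.<String>emptyList(),
                Arrays.asList("FirstFile"));
        System.out.println(fold(partitions, "0", concat)); // 0,0,0,FirstFile
    }
}
```

The empty partition contributes 0,0 and the non-empty one 0,FirstFile, reproducing the 0,0,0,FirstFile output seen above.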

Let’s repartition the above RDD and set the partition count explicitly to 1:

JavaRDD<String> jdpar = jrdd.repartition(1);
String resultfold = jdpar.fold("0", new Function2<String, String, String>() {

    public String call(String arg0, String arg1) throws Exception {
        System.out.println(" the arg0 .. >> " + arg0);
        System.out.println(" the arg1 .. >> " + arg1);
        return arg0 + "," + arg1;
    }
});

Since there is a single partition holding the value FirstFile:

1st iteration (folding the single partition)
arg0: 0
arg1: FirstFile

2nd iteration (combining the partition results)
arg0: 0
arg1: 0,FirstFile
result: 0,0,FirstFile
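A practical consequence of the zero value being applied once per partition plus once in the final combine: it should be a true neutral element for the function (an empty string for concatenation, 0 for addition). The "0" seed used above is not neutral for string concatenation, which is exactly why the output changed with the partition count. A minimal plain-Java sketch, where foldSum is a hypothetical helper mirroring fold's per-partition behaviour for integer addition:

```java
import java.util.Arrays;
import java.util.List;

public class NeutralZeroSketch {
    // Hypothetical helper: sum each partition starting from `zero`,
    // then sum the per-partition results, again starting from `zero`
    static int foldSum(List<List<Integer>> partitions, int zero) {
        int combined = zero;
        for (List<Integer> partition : partitions) {
            int partial = zero;
            for (int n : partition) partial += n;
            combined += partial;
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4);
        // Neutral zero (0): the true sum regardless of partitioning
        System.out.println(foldSum(Arrays.asList(data), 0));                           // 10
        System.out.println(foldSum(Arrays.asList(data.subList(0, 2),
                                                 data.subList(2, 4)), 0));             // 10
        // Non-neutral "zero" (1): the answer now depends on the partition count
        System.out.println(foldSum(Arrays.asList(data), 1));                           // 12
        System.out.println(foldSum(Arrays.asList(data.subList(0, 2),
                                                 data.subList(2, 4)), 1));             // 13
    }
}
```

With a neutral zero the result is stable; with a non-neutral one the seed is counted once per partition plus once more, so repartitioning changes the answer, just as it did for the string example above.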