Spark Java Tutorial

Last updated on 12th Oct 2020

About author

Balachandar (Associate Director - Product Development)

A high-level domain expert at top MNCs with 9+ years of experience, he has handled more than 36 projects and shares his knowledge by writing these blogs for us.


The Spark Java API exposes all the Spark features available in the Scala version to Java. To learn the basics of Spark, we recommend reading through the Scala programming guide first; it should be easy to follow even if you don’t know Scala. This guide will show how to use the Spark features described there in Java.

The Spark Java API is defined in the org.apache.spark.api.java package, and includes a JavaSparkContext for initializing Spark and JavaRDD classes, which support the same methods as their Scala counterparts but take Java functions and return Java data and collection types. The main differences have to do with passing functions to RDD operations (e.g. map) and handling RDDs of different types, as discussed next.


Key Differences in the Java API

There are a few key differences between the Java and Scala APIs:

  • Java does not support anonymous or first-class functions, so functions must be implemented by extending the org.apache.spark.api.java.function.Function, Function2, etc. classes.
  • To maintain type safety, the Java API defines specialized Function and RDD classes for key-value pairs and doubles. For example, JavaPairRDD stores key-value pairs.
  • RDD methods like collect() and countByKey() return Java collections types, such as java.util.List and java.util.Map.
  • Key-value pairs, which are simply written as (key, value) in Scala, are represented by the scala.Tuple2 class, and need to be created using new Tuple2<K, V>(key, value).

RDD Classes

Spark defines additional operations on RDDs of key-value pairs and doubles, such as reduceByKey, join, and stdev.

In the Scala API, these methods are automatically added using Scala’s implicit conversions mechanism.

In the Java API, the extra methods are defined in the JavaPairRDD and JavaDoubleRDD classes. RDD methods like map are overloaded by specialized PairFunction and DoubleFunction classes, allowing them to return RDDs of the appropriate types. Common methods like filter and sample are implemented by each specialized RDD class, so filtering a PairRDD returns a new PairRDD, etc (this achieves the “same-result-type” principle used by the Scala collections framework).

Function Classes

The following table lists the function classes used by the Java API. Each class has a single abstract method, call(), that must be implemented.

Class                            Function Type
Function<T, R>                   T => R
DoubleFunction<T>                T => Double
PairFunction<T, K, V>            T => Tuple2<K, V>
FlatMapFunction<T, R>            T => Iterable<R>
DoubleFlatMapFunction<T>         T => Iterable<Double>
PairFlatMapFunction<T, K, V>     T => Iterable<Tuple2<K, V>>
Function2<T1, T2, R>             T1, T2 => R (function of two arguments)
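Spark's function classes are not part of the JDK, but their shape is easy to see in isolation. Below is a minimal sketch using a hypothetical stand-in interface (not Spark's actual class) that mirrors the pattern: a single abstract call() method on a Serializable type, implemented via an anonymous class in pre-lambda Java style.

```java
import java.io.Serializable;

public class FunctionShapeDemo {
    // Hypothetical stand-in mirroring the shape of
    // org.apache.spark.api.java.function.Function<T, R>:
    // one abstract call() method, Serializable so instances could be shipped to executors.
    interface MyFunction<T, R> extends Serializable {
        R call(T t) throws Exception;
    }

    // Implemented the way pre-lambda Java code writes it: an anonymous subclass.
    static final MyFunction<String, Integer> LENGTH = new MyFunction<String, Integer>() {
        public Integer call(String s) {
            return s.length();
        }
    };

    static int apply(String s) {
        try {
            return LENGTH.call(s);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(apply("spark")); // 5
    }
}
```

The same anonymous-class idiom appears throughout the word count example below.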

Storage Levels

RDD storage level constants, such as MEMORY_AND_DISK, are declared in the org.apache.spark.api.java.StorageLevels class. To define your own storage level, you can use StorageLevels.create(…).

Other Features

The Java API supports other Spark features, including accumulators, broadcast variables, and caching.

Example

As an example, we will implement word count using the Java API.

  import org.apache.spark.api.java.*;
  import org.apache.spark.api.java.function.*;

  JavaSparkContext sc = new JavaSparkContext(…);
  JavaRDD<String> lines = sc.textFile("hdfs://…");
  JavaRDD<String> words = lines.flatMap(
    new FlatMapFunction<String, String>() {
      public Iterable<String> call(String s) {
        return Arrays.asList(s.split(" "));
      }
    }
  );

The word count program starts by creating a JavaSparkContext, which accepts the same parameters as its Scala counterpart. JavaSparkContext supports the same data loading methods as the regular SparkContext; here, textFile loads lines from text files stored in HDFS.

To split the lines into words, we use flatMap to split each line on whitespace. flatMap is passed a FlatMapFunction that accepts a string and returns a java.lang.Iterable of strings.
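The body of that call() method is ordinary Java with no Spark dependency; isolated from the RDD machinery, it behaves like this (class name here is illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class SplitDemo {
    // The body of the FlatMapFunction from the tutorial:
    // split a line on spaces and return the words as a List (an Iterable).
    static List<String> split(String s) {
        return Arrays.asList(s.split(" "));
    }

    public static void main(String[] args) {
        System.out.println(split("to be or not to be"));
        // [to, be, or, not, to, be]
    }
}
```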

Here, the FlatMapFunction was created inline; another option is to subclass FlatMapFunction and pass an instance to flatMap:

  class Split extends FlatMapFunction<String, String> {
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  }

  JavaRDD<String> words = lines.flatMap(new Split());

Continuing with the word count example, we map each word to a (word, 1) pair:

  import scala.Tuple2;

  JavaPairRDD<String, Integer> ones = words.map(
    new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    }
  );

Note that map was passed a PairFunction<String, String, Integer> and returned a JavaPairRDD<String, Integer>.

To finish the word count program, we will use reduceByKey to count the occurrences of each word:

  JavaPairRDD<String, Integer> counts = ones.reduceByKey(
    new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    }
  );

Here, reduceByKey is passed a Function2, which implements a function with two arguments. The resulting JavaPairRDD contains (word, count) pairs.
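reduceByKey applies the Function2 pairwise to all values that share a key. The same merge logic can be sketched locally with a plain HashMap (a non-distributed sketch; the class and method names here are illustrative, not Spark API):

```java
import java.util.HashMap;
import java.util.Map;

public class ReduceByKeyDemo {
    // Locally simulate reduceByKey over (word, 1) pairs: folding each new value
    // into the running total plays the role of the Function2 (i1, i2) -> i1 + i2.
    static Map<String, Integer> countWords(String[] words) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String w : words) {
            Integer prev = counts.get(w);
            counts.put(w, prev == null ? 1 : prev + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("to be or not to be".split(" ")));
    }
}
```

In the distributed case Spark performs this merge per partition and then across partitions, which is why the Function2 must be associative.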

In this example, we explicitly showed each intermediate RDD. It is also possible to chain the RDD transformations, so the word count example could also be written as:

      …
    ).map(
      …
    ).reduceByKey(
      …
    );

There is no performance difference between these approaches; the choice is just a matter of style.

Javadoc

We currently provide documentation for the Java API as Scaladoc, in the org.apache.spark.api.java package, because some of the classes are implemented in Scala. The main downside is that the types and function definitions show Scala syntax (for example, def reduce(func: Function2[T, T]): T instead of T reduce(Function2<T, T> func)). We hope to generate documentation with Java-style syntax in the future.

Where to Go from Here

Spark includes several sample programs using the Java API in examples/src/main/java. You can run them by passing the class name to the bin/run-example script included in Spark; for example:

./bin/run-example org.apache.spark.examples.JavaWordCount

Each example program prints usage help when run without any arguments.

I have two datasets:

  • User information (id, email, language, location)
  • Transaction information (transaction-id, product-id, user-id, purchase-amount, item-description)

Given these datasets, I want to find the number of unique locations in which each product has been sold. To do that, I need to join the two datasets together.

Previously I have implemented this solution in Java, with Hive, and with Pig. The Java solution was ~500 lines of code; the Hive and Pig solutions were ~20 lines at most.

The Java Spark Solution

This article is a follow-up to my earlier article on Spark, which shows a Scala Spark solution to the problem. Even though Scala is the native and more popular Spark language, many enterprise-level projects are written in Java, and so it is supported by the Spark stack with its own API.

This article partially repeats what was written in my Scala overview, although I emphasize the differences between the Scala and Java implementations of logically identical code.

As mentioned before, Spark is an open source project built and maintained by a thriving and diverse community of developers. It started in 2009 as a research project in the UC Berkeley RAD Lab. Its aim was to compensate for some Hadoop shortcomings. Spark brings us interactive queries, better performance for iterative algorithms, as well as support for in-memory storage and efficient fault recovery.

It contains a number of different components, such as Spark Core, Spark SQL, Spark Streaming, MLlib, and GraphX. It runs over a variety of cluster managers, including Hadoop YARN, Apache Mesos, and a simple cluster manager included in Spark itself called the Standalone Scheduler. It is used for a diversity of tasks from data exploration through to streaming machine learning algorithms. As a technology stack it has really caught fire in recent years.

Demonstration Data

The tables that will be used for demonstration are called users and transactions.

users

1 matthew@test.com  EN  US

2 matthew@test2.com EN  GB

3 matthew@test3.com FR  FR

and

transactions

1 1 1 300 a jumper

2 1 2 300 a jumper

3 1 2 300 a jumper

4 2 3 100 a rubber chicken

5 1 3 300 a jumper

For this task we have used Spark on a Hadoop YARN cluster. Our code will read and write data from/to HDFS. Before starting work with the code we have to copy the input data to HDFS.

hdfs dfs -mkdir input

hdfs dfs -put ./users.txt input

hdfs dfs -put ./transactions.txt input

Code

All code and data used in this post can be found in my Hadoop examples GitHub repository.

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;

  import org.apache.hadoop.mapred.TextOutputFormat;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.api.java.function.PairFunction;

  import scala.Tuple2;

  // Spark 1.x uses Guava's Optional here; in Spark 2.x+ use org.apache.spark.api.java.Optional
  import com.google.common.base.Optional;

  public class ExampleJob {
      private static JavaSparkContext sc;

      public ExampleJob(JavaSparkContext sc){
          this.sc = sc;
      }

      public static final PairFunction<Tuple2<Integer, Optional<String>>, Integer, String> KEY_VALUE_PAIRER =
      new PairFunction<Tuple2<Integer, Optional<String>>, Integer, String>() {
          public Tuple2<Integer, String> call(
              Tuple2<Integer, Optional<String>> a) throws Exception {
              // a._2.isPresent()
              return new Tuple2<Integer, String>(a._1, a._2.get());
          }
      };

      public static JavaRDD<Tuple2<Integer, Optional<String>>> joinData(JavaPairRDD<Integer, Integer> t, JavaPairRDD<Integer, String> u){
          JavaRDD<Tuple2<Integer, Optional<String>>> leftJoinOutput = t.leftOuterJoin(u).values().distinct();
          return leftJoinOutput;
      }

      public static JavaPairRDD<Integer, String> modifyData(JavaRDD<Tuple2<Integer, Optional<String>>> d){
          return d.mapToPair(KEY_VALUE_PAIRER);
      }

      public static Map<Integer, Object> countData(JavaPairRDD<Integer, String> d){
          Map<Integer, Object> result = d.countByKey();
          return result;
      }

      public static JavaPairRDD<String, String> run(String t, String u){
          JavaRDD<String> transactionInputFile = sc.textFile(t);
          JavaPairRDD<Integer, Integer> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, Integer, Integer>() {
              public Tuple2<Integer, Integer> call(String s) {
                  String[] transactionSplit = s.split("\t");
                  return new Tuple2<Integer, Integer>(Integer.valueOf(transactionSplit[2]), Integer.valueOf(transactionSplit[1]));
              }
          });

          JavaRDD<String> customerInputFile = sc.textFile(u);
          JavaPairRDD<Integer, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, Integer, String>() {
              public Tuple2<Integer, String> call(String s) {
                  String[] customerSplit = s.split("\t");
                  return new Tuple2<Integer, String>(Integer.valueOf(customerSplit[0]), customerSplit[3]);
              }
          });

          Map<Integer, Object> result = countData(modifyData(joinData(transactionPairs, customerPairs)));

          List<Tuple2<String, String>> output = new ArrayList<>();
          for (Entry<Integer, Object> entry : result.entrySet()){
              output.add(new Tuple2<>(entry.getKey().toString(), String.valueOf((long)entry.getValue())));
          }

          JavaPairRDD<String, String> output_rdd = sc.parallelizePairs(output);
          return output_rdd;
      }

      public static void main(String[] args) throws Exception {
          JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJoins").setMaster("local"));
          ExampleJob job = new ExampleJob(sc);
          JavaPairRDD<String, String> output_rdd = job.run(args[0], args[1]);
          output_rdd.saveAsHadoopFile(args[2], String.class, String.class, TextOutputFormat.class);
          sc.close();
      }
  }

This code does exactly the same thing that the corresponding code of the Scala solution does. The sequence of actions is exactly the same, as well as the input and output data on each step.

  • read / transform transactions data
  • read / transform users data
  • left outer join of transactions on users
  • get rid of user_id key from the result of the previous step by applying values()
  • find distinct() values
  • countByKey()
  • transform result to an RDD
  • save result to Hadoop
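The steps above (minus the HDFS I/O) can be sketched in plain Java on the demonstration rows from this article. This is a local, non-distributed sketch; the class and method names are illustrative, not part of Spark.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class JoinSketch {
    // Local sketch of the pipeline: transactions become (user_id, product_id),
    // users become (user_id, location); joining on user_id yields
    // (product_id, location) pairs, of which we keep the distinct ones and
    // then count locations per product (the countByKey() step).
    static Map<Integer, Long> countLocationsPerProduct(int[][] transactions, Map<Integer, String> userLocations) {
        Set<List<Object>> distinctPairs = new HashSet<List<Object>>();
        for (int[] t : transactions) {
            int productId = t[1];
            int userId = t[2];
            String location = userLocations.get(userId); // null plays the role of an absent Optional
            distinctPairs.add(Arrays.<Object>asList(productId, location));
        }
        Map<Integer, Long> counts = new HashMap<Integer, Long>();
        for (List<Object> pair : distinctPairs) {
            Integer productId = (Integer) pair.get(0);
            Long prev = counts.get(productId);
            counts.put(productId, prev == null ? 1L : prev + 1L);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Demonstration rows from this article: (transaction-id, product-id, user-id)
        int[][] transactions = { {1, 1, 1}, {2, 1, 2}, {3, 1, 2}, {4, 2, 3}, {5, 1, 3} };
        Map<Integer, String> users = new HashMap<Integer, String>();
        users.put(1, "US"); users.put(2, "GB"); users.put(3, "FR");
        System.out.println(countLocationsPerProduct(transactions, users)); // {1=3, 2=1}
    }
}
```

The result matches the CountByKey output shown later in this article: product 1 sold in 3 locations, product 2 in 1.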

If this is confusing (it might be), read the Scala version first; it is far more compact.

As with Scala, it is required to define a SparkContext first. Again, it is enough to set an app name and the location of the master node.

  JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));

The resilient distributed dataset (RDD), Spark’s core abstraction for working with data, is named RDD as in Scala. As with any other Spark data-processing algorithm all our work is expressed as either creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result.

Spark’s key/value RDDs are of JavaPairRDD type. Key/value RDDs are commonly used to perform aggregations, such as groupByKey(), and are useful for joins, such as leftOuterJoin(). Explicitly defining key and value elements allows Spark to abstract away a lot of these complex operations (like joins), so they are very useful.

Here is how the input and intermediate data is transformed into a Key/value RDD in Java:

  JavaRDD<String> transactionInputFile = sc.textFile(t);
  JavaPairRDD<Integer, Integer> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, Integer, Integer>() {
      public Tuple2<Integer, Integer> call(String s) {
          String[] transactionSplit = s.split("\t");
          return new Tuple2<Integer, Integer>(Integer.valueOf(transactionSplit[2]), Integer.valueOf(transactionSplit[1]));
      }
  });

and a stand-alone function

  public static final PairFunction<Tuple2<Integer, Optional<String>>, Integer, String> KEY_VALUE_PAIRER =
  new PairFunction<Tuple2<Integer, Optional<String>>, Integer, String>() {
      public Tuple2<Integer, String> call(
          Tuple2<Integer, Optional<String>> a) throws Exception {
          // a._2.isPresent()
          return new Tuple2<Integer, String>(a._1, a._2.get());
      }
  };

Reading input data is done in exactly the same manner as in Scala. Note that the explicit KEY_VALUE_PAIRER transformation is not needed in Scala, but in Java there seems to be no way to skip it.

Spark has added an Optional class for Java (similar to Scala’s Option) to box values and avoid nulls. There is a special function isPresent() in the Optional class that allows you to check whether the value is present, that is, it is not null. Calling get() returns the boxed value.
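Spark's Optional class is not in the JDK, but java.util.Optional supports the same two methods discussed here; a quick illustration using the standard-library class as a stand-in (the class and method names in this sketch are illustrative):

```java
import java.util.Optional;

public class OptionalDemo {
    // java.util.Optional standing in for Spark's Optional class:
    // isPresent() checks whether a value exists, get() unboxes it.
    static String describe(Optional<String> location) {
        return location.isPresent() ? location.get() : "unknown";
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.of("US")));         // US
        System.out.println(describe(Optional.<String>empty()));  // unknown
    }
}
```

In the join above, an absent value would mean a transaction whose user_id matched no user; calling get() without checking isPresent() first would then throw.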

The main code is again more or less a chain of predefined functions.

 

  public static JavaRDD<Tuple2<Integer, Optional<String>>> joinData(JavaPairRDD<Integer, Integer> t, JavaPairRDD<Integer, String> u){
      JavaRDD<Tuple2<Integer, Optional<String>>> leftJoinOutput = t.leftOuterJoin(u).values().distinct();
      return leftJoinOutput;
  }

  public static JavaPairRDD<Integer, String> modifyData(JavaRDD<Tuple2<Integer, Optional<String>>> d){
      return d.mapToPair(KEY_VALUE_PAIRER);
  }

  public static Map<Integer, Object> countData(JavaPairRDD<Integer, String> d){
      Map<Integer, Object> result = d.countByKey();
      return result;
  }

The processData() function from the Scala version was broken into three new functions: joinData(), modifyData(), and countData(). We did this simply to make the code clearer; Java is verbose. All the data transformation steps could have been put into one function that would be similar to processData() from the Scala solution.

The leftOuterJoin() function joins two RDDs on key.

The values() function allows us to omit the key of the key/value RDD, as it is not needed in the operations that follow the join. The distinct() function selects distinct tuples.

And finally countByKey() counts the number of countries where the product was sold.

Running the resulting jar

/usr/bin/spark-submit --class main.java.com.matthewrathbone.sparktest.SparkJoins --master local ./spark-example-1.0-SNAPSHOT-jar-with-dependencies.jar /path/to/transactions.txt /path/to/users.txt /path/to/output_folder

15/12/20 11:49:47 INFO DAGScheduler: Job 2 finished: countByKey at SparkJoins.java:74, took 0.171325 s

CountByKey function Output: {1=3, 2=1}

$ hadoop fs -ls sparkout

Found 9 items

-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/_SUCCESS

-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00000

-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00001

-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00002

-rw-r--r--   1 hadoop hadoop          4 2015-12-20 11:49 sparkout/part-00003

-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00004

-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00005

-rw-r--r--   1 hadoop hadoop          0 2015-12-20 11:49 sparkout/part-00006

-rw-r--r--   1 hadoop hadoop          4 2015-12-20 11:49 sparkout/part-00007

$ hadoop fs -tail sparkout/part-00003

1 3

$ hadoop fs -tail sparkout/part-00007

2 1

Testing

The idea and the set up are exactly the same for Java and Scala.

  public class SparkJavaJoinsTest implements Serializable {
      private static final long serialVersionUID = 1L;
      private transient JavaSparkContext sc;

      @Before
      public void setUp() {
          sc = new JavaSparkContext("local", "SparkJoinsTest");
      }

      @After
      public void tearDown() {
          if (sc != null){
              sc.stop();
          }
      }

      @Test
      public void testExampleJob() {
          ExampleJob job = new ExampleJob(sc);
          JavaPairRDD<String, String> result = job.run("./transactions.txt", "./users.txt");
          List<Tuple2<String, String>> output = result.collect();
          Assert.assertEquals("1", output.get(0)._1);
          Assert.assertEquals("3", output.get(0)._2);
          Assert.assertEquals("2", output.get(1)._1);
          Assert.assertEquals("1", output.get(1)._2);
      }
  }

The test is more or less self-explanatory. As usual, we check the content of the output to validate its operation.


Thoughts

Java is a lot more verbose than Scala, although this is not a Spark-specific criticism.

The Scala and Java Spark APIs have a very similar set of functions. Looking beyond the heaviness of the Java code reveals calling methods in the same order and following the same logical thinking, albeit with more code.

All things considered, if I were using Spark, I’d use Scala. The functional aspects of Spark are designed to feel native to Scala developers, which means it feels a little alien when working in Java (e.g. Optional). That said, if Java is the only option (or you really don’t want to learn Scala), Spark certainly presents a capable API to work with.
