Hands-on session: Data analysis with Stratosphere
This session will introduce attendees to the Scala language and
then walk through example data analysis jobs using the Scala interface
of the Stratosphere platform. Attendees will have the opportunity
to develop Stratosphere programs with the help of the Stratosphere
team and get familiar with the basic operations of Big Data analysis
using Stratosphere.
### Detailed Schedule
**15:30 - 16:30 Development Environment Setup**
Attendees will be provided with a virtual machine that includes all
the software needed in the hands-on session. To expedite
the installation process, attendees are advised to have already
installed the latest version of
VirtualBox. USB keys with
the virtual machines and the VirtualBox software will also be
available.
**16:00 - 17:00 Part I: Introduction to the Scala Programming Language**
This session will cover the basic aspects of
the Scala language and will
serve as a basis for all the sessions to follow.
**17:00 - 18:30 Part II: Scalable data analysis with Stratosphere**
### Hands-On setup
If you don't want to use the provided VM, you can check out the hands-on exercise code from
[https://github.com/stratosphere/stratosphere-summit](https://github.com/stratosphere/stratosphere-summit).
It comes as a Maven project that you can import into Eclipse or IntelliJ. Alternatively, you can use any text
editor and compile and run the examples with the provided scripts (for example `./run-task1.sh`).
## Part II: Stratosphere programming in Scala
## Assignment: Information Retrieval using Stratosphere
### Overview
In this programming assignment, you are going to compute
[tf-idf](http://en.wikipedia.org/wiki/Tf%E2%80%93idf) using
Scala. We will show you how to write Stratosphere Operators in Scala in the
respective subtasks.
The following figure gives an overview of what we are going to do in each task:
The following figure gives some details on what each task performs:
### Task 1: Document Frequency
You are first going to calculate the document frequency. That is, in how many
documents each distinct word occurs. So if a document contains a word three
times, it is counted only once for this document. But if another document
contains the word as well, the overall frequency is two.
Besides this, you need to do some data cleansing on the input data. That is,
accept only terms (words) that are alphanumeric. There is also a list of
stopwords (provided by us) which should be rejected.
To achieve this, you need to implement a Mapper and a Reducer. The input file is
a regular text file that contains a line for each document. The schema of the
input file looks like this: `docid, document contents`. The Mapper is
called with the documents, so each call of the user function gets one line
(i.e. one document).
In Scala a lot of things can be accomplished by stringing together operations
on collections.
(See [here](http://docs.scala-lang.org/overviews/collections/introduction.html)
for a very good introduction to the Scala collections library.)
So in this case you would first split the line into the document id and the
actual document text. Then you should split the document text into words; this
gives you a collection of strings on which you can then apply operations to
arrive at your final result. The final result should be a collection of tuples
of `(word, 1)` where every word is represented only once, even if it occurs
several times in the document.
Keep in mind that you can transform any collection into a set using
`toSet`, thereby eliminating duplicates. A set is also a collection which
can be returned from a map operation.
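Putting these pieces together, the body of the Mapper could look roughly like the following sketch. The helper name, the `Util.STOP_WORDS` constant, and the exact splitting rules are assumptions for illustration; adapt them to the provided skeleton code.

```scala
// Sketch of the document-frequency logic: one input line is one document.
def extractDistinctWords(line: String): Iterable[(String, Int)] = {
  // Split "docid, document contents" at the first comma.
  val Array(_, contents) = line.split(",", 2)

  contents
    .toLowerCase
    .split("""\W+""")                                         // split into words
    .filter(w => w.nonEmpty && w.forall(_.isLetterOrDigit))   // keep only alphanumeric terms
    .filterNot(Util.STOP_WORDS.contains)                      // drop stopwords (assumed constant name)
    .toSet                                                    // count each word once per document
    .map(word => (word, 1))
}
```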
Use the provided `main()` method to test your code.
Task #1 is quite similar to the classical WordCount example, which is something
like the "Hello World" of Big Data.
### Task 2: Term Frequency
Implement a second Mapper that also reads in the documents from the same file.
This time, the output tuples shall look like this `(docid, word, frequency)`.
The code required for this is very similar to the code for Task 1. This time,
though, you have to accumulate a count for the words in some sort of
Map structure. The output of this operation should be a collection of tuples
of `(docId, word, count)`.
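One way to accumulate the counts is `groupBy`, as in the following sketch (again, names and splitting rules are illustrative assumptions):

```scala
// Sketch of the term-frequency logic: emit (docId, word, count) per document.
def countTerms(line: String): Iterable[(String, String, Int)] = {
  val Array(docId, contents) = line.split(",", 2)

  contents
    .toLowerCase
    .split("""\W+""")
    .filter(w => w.nonEmpty && w.forall(_.isLetterOrDigit))
    .filterNot(Util.STOP_WORDS.contains)
    .groupBy(identity)                                        // Map[word, occurrences]
    .map { case (word, occurrences) => (docId, word, occurrences.length) }
}
```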
### Task 3: Join
This task uses a new operator: Join. It has two inputs, namely the outputs from
the previous tasks. We often refer to them as the left input and the right
input.
The user code gets two inputs, namely a record from the left and a record from
the right. So the join operation looks like this:
```scala
val tfIdf = documentFrequencies
  .join(termFrequencies)
  .where { ... }
  .isEqualTo { ... }
  .map { (left, right) =>
    ...
  }
```
Here, `left` is a tuple of `(word, freq)` and `right` is a tuple of
`(docId, word, freq)`. Keep in mind that you can neatly extract values from
tuples using:
```scala
val (word, freq) = left
```
The following pseudo code describes what your operator implementation must do
to compute the tf-idf.
```
join((word, df), (docid, word, tf)) {
  tf_idf = tf * log(Util.NUM_DOCUMENTS / df)
  return (docid, word, tf_idf)
}
```
The output from the join should be a tuple of `(docId, word, tf-idf)`.
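Filled in, the join might look roughly like the sketch below. The key selectors and field positions are assumptions based on the tuple layouts above; check them against the skeleton code.

```scala
// Sketch: join document frequencies with term frequencies and compute tf-idf.
val tfIdf = documentFrequencies
  .join(termFrequencies)
  .where { case (word, _) => word }            // key of the left input (assumed field order)
  .isEqualTo { case (_, word, _) => word }     // key of the right input (assumed field order)
  .map { (left, right) =>
    val (word, df) = left
    val (docId, _, tf) = right
    (docId, word, tf * math.log(Util.NUM_DOCUMENTS.toDouble / df))
  }
```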
#### Preparation
In this task we are going to use a custom data type, `WeightVector`. This
stores a document id and an Iterator of tuples of `(word, tf-idf)`. Using
a reducer you should collect all the tuples `(docId, word, tf-idf)` and
create a single WeightVector that contains them all.
#### Term Weights per Document
This reduce task takes the output of the join and groups it by the document ids
(`docid`). Write the document id and the terms including their weight into the
`WeightVector`.
Note that Iterators have a `buffered` method that returns a `BufferedIterator`.
This `BufferedIterator` has a `head` member that can be used to peek at
the first element in the iterator. You can use this to retrieve the `docId`
(which is the same for all tuples). Then you can use methods on the buffered
iterator to arrive at the collection of `(word, tf-idf)` tuples.
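A possible shape for that reduce function is sketched below. The tuple types and the `WeightVector` constructor signature are assumptions; use whatever the provided class actually offers.

```scala
// Sketch of the reducer: all tuples in the group share the same docId.
def buildWeightVector(tuples: Iterator[(String, String, Double)]): WeightVector = {
  val buffered = tuples.buffered
  val docId = buffered.head._1                  // peek without consuming the first element
  val weights = buffered.map { case (_, word, tfIdf) => (word, tfIdf) }
  new WeightVector(docId, weights)              // constructor signature is an assumption
}
```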
### Congratulations!