Stratosphere version 0.5 available

31 May 2014

We are happy to announce a new major Stratosphere release, version 0.5. This release adds many new features and improves the interoperability, stability, and performance of the system. The major theme of the release is the completely new Java API that makes it easy to write powerful distributed programs.

The release can be downloaded from the [Stratosphere website] (http://stratosphere.eu/downloads/) and from [GitHub] (https://github.com/stratosphere/stratosphere/releases/tag/release-0.5). All components are available as Apache Maven dependencies, making it simple to include Stratosphere in other projects. The website provides extensive documentation of the system and the new features.

Shortlist of new Features

Below is a short list of the most important additions to the Stratosphere system.

New Java API

This release introduces a completely new data set-centric Java API. This programming model significantly eases the development of Stratosphere programs, supports flexible use of regular Java classes as data types, and adds many new built-in operators to simplify the writing of powerful programs. The result are programs that need less code, are more readable, interoperate better with existing code, and execute faster.

Take a look at the examples to get a feel for the API.

General API Improvements

Broadcast Variables: Publish a data set to all instances of another operator. This is handy if the your operator depends on the result of a computation, e.g., filter all values smaller than the average.

Distributed Cache: Make (local and HDFS) files locally available on each machine processing a task.

Iteration Termination Improvements Iterative algorithms can now terminate based on intermediate data sets, not only through aggregated statistics.

Collection data sources and sinks: Speed-up the development and testing of Stratosphere programs by reading data from regular Java collections and inserting back into them.

JDBC data sources and sinks: Read data from and write data to relational databases using a JDBC driver.

Hadoop input format and output format support: Read and write data with any Hadoop input or output format.

Support for Avro encoded data: Read data that has been materialized using Avro.

Deflate Files: Stratosphere now transparently reads .deflate compressed files.

Runtime and Optimizer Improvements

DAG Runtime Streaming: Detection and resolution of streaming data flow deadlocks in the data flow optimizer.

Intermediate results across iteration boundaries: Intermediate results computed outside iterative parts can be used inside iterative parts of the program.

Stability fixes: Various stability fixes in both optimizer and runtime.

Setup & Tooling

Improved YARN support: Many improvements based on user-feedback: Packaging, Permissions, Error handling.

Java 8 compatibility

Contributors

In total, 26 people have contributed to Stratosphere since the last release. Thank you for making this project possible!

  • Alexander Alexandrov
  • Jesus Camacho
  • Ufuk Celebi
  • Mikhail Erofeev
  • Stephan Ewen
  • Alexandr Ferodov
  • Filip Haase
  • Jonathan Hasenberg
  • Markus Holzemer
  • Fabian Hueske
  • Vasia Kalavri
  • Aljoscha Krettek
  • Rajika Kumarasiri
  • Sebastian Kunert
  • Aaron Lam
  • Robert Metzger
  • Faisal Moeen
  • Martin Neumann
  • Mingliang Qi
  • Till Rohrmann
  • Chesnay Schepler
  • Vyachislav Soludev
  • Tuan Trieu
  • Artem Tsikiridis
  • Timo Walther
  • Robert Waury

Stratosphere is going Apache

The Stratosphere project has been accepted to the Apache Incubator and will continue its work under the umbrella of the Apache Software Foundation. Due to a name conflict, we are switching the name of the project. We will make future releases of Stratosphere through the Apache foundation under a new name.

Stratosphere version 0.5 available

Stratosphere accepted as Apache Incubator Project

16 Apr 2014

We are happy to announce that Stratosphere has been accepted as a project for the Apache Incubator. The proposal has been accepted by the Incubator PMC members earlier this week. The Apache Incubator is the first step in the process of giving a project to the Apache Software Foundation. While under incubation, the project will move to the Apache infrastructure and adopt the community-driven development principles of the Apache Foundation. Projects can graduate from incubation to become top-level projects if they show activity, a healthy community dynamic, and releases.

We are glad to have Alan Gates as champion on board, as well as a set of great mentors, including Sean Owen, Ted Dunning, Owen O’Malley, Henry Saputra, and Ashutosh Chauhan. We are confident that we will make this a great open source effort.

Stratosphere accepted as Apache Incubator Project

Stratosphere got accepted for Google Summer of Code 2014

24 Feb 2014

Students: Apply now for exciting summer projects in the Big Data / Analytics field

We are pleased to announce that Stratosphere got accepted to Google Summer of Code 2014 as a mentoring organization. This means that we will host a bunch of students to conduct projects within Stratosphere over the summer. Read more on the GSoC manual for students and the official FAQ. Students can improve their coding skills, learn to work with open-source projects, improve their CV and get a nice paycheck from Google.

If you are an interested student, check out our idea list in the wiki. It contains different projects with varying ranges of difficulty and requirement profiles. Students can also suggest their own projects.

We welcome students to sign up at our developer mailing list to discuss their ideas. Applying students can use our wiki (create a new page) to create a project proposal. We are happy to have a look at it.

Stratosphere got accepted for Google Summer of Code 2014

Accessing Data Stored in MongoDB with Stratosphere

28 Jan 2014

We recently merged a pull request that allows you to use any existing Hadoop InputFormat with Stratosphere. So you can now (in the 0.5-SNAPSHOT and upwards versions) define a Hadoop-based data source:

HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));

We describe in the following article how to access data stored in MongoDB with Stratosphere. This allows users to join data from multiple sources (e.g. MonogDB and HDFS) or perform machine learning with the documents stored in MongoDB.

The approach here is to use the MongoInputFormat that was developed for Apache Hadoop but now also runs with Stratosphere.

JobConf conf = new JobConf();
conf.set("mongo.input.uri","mongodb://localhost:27017/enron_mail.messages");
HadoopDataSource src = new HadoopDataSource(new MongoInputFormat(), conf, "Read from Mongodb", new WritableWrapperConverter());

Example Program

The example program reads data from the enron dataset that contains about 500k internal e-mails. The data is stored in MongoDB and the Stratosphere program counts the number of e-mails per day.

The complete code of this sample program is available on GitHub.

Prepare MongoDB and the Data

  • Install MongoDB
  • Download the enron dataset from their website.
  • Unpack and load it

bash bunzip2 enron_mongo.tar.bz2 tar xvf enron_mongo.tar mongorestore dump/enron_mail/messages.bson

We used Robomongo to visually examine the dataset stored in MongoDB.

Build MongoInputFormat

MongoDB offers an InputFormat for Hadoop on their GitHub page. The code is not available in any Maven repository, so we have to build the jar file on our own.

  • Check out the repository
git clone https://github.com/mongodb/mongo-hadoop.git
cd mongo-hadoop
  • Set the appropriate Hadoop version in the build.sbt, we used 1.1.
hadoopRelease in ThisBuild := "1.1"
  • Build the input format
./sbt package

The jar-file is now located in core/target.

The Stratosphere Program

Now we have everything prepared to run the Stratosphere program. I only ran it on my local computer, out of Eclipse. To do that, check out the code …

git clone https://github.com/stratosphere/stratosphere-mongodb-example.git

… and import it as a Maven project into your Eclipse. You have to manually add the previously built mongo-hadoop jar-file as a dependency. You can now press the “Run” button and see how Stratosphere executes the little program. It was running for about 8 seconds on the 1.5 GB dataset.

The result (located in /tmp/enronCountByDay) now looks like this.

11,Fri Sep 26 10:00:00 CEST 1997
154,Tue Jun 29 10:56:00 CEST 1999
292,Tue Aug 10 12:11:00 CEST 1999
185,Thu Aug 12 18:35:00 CEST 1999
26,Fri Mar 19 12:33:00 CET 1999

There is one thing left I want to point out here. MongoDB represents objects stored in the database as JSON-documents. Since Stratosphere’s standard types do not support JSON documents, I was using the WritableWrapper here. This wrapper allows to use any Hadoop datatype with Stratosphere.

The following code example shows how the JSON-documents are accessed in Stratosphere.

public void map(Record record, Collector<Record> out) throws Exception {
	Writable valWr = record.getField(1, WritableWrapper.class).value();
	BSONWritable value = (BSONWritable) valWr;
	Object headers = value.getDoc().get("headers");
	BasicDBObject headerOb = (BasicDBObject) headers;
	String date = (String) headerOb.get("Date");
	// further date processing
}

Please use the comments if you have questions or if you want to showcase your own MongoDB-Stratosphere integration.

Written by Robert Metzger (@rmetzger_).

Accessing Data Stored in MongoDB with Stratosphere

Optimizer Plan Visualization Tool

26 Jan 2014

Stratosphere’s hybrid approach combines MapReduce and MPP database techniques. One central part of this approach is to have a separation between the programming (API) and the way programs are executed (execution plans). The compiler/optimizer decides the details concerning caching or when to partition/broadcast with a holistic view of the program. The same program may actually be executed differently in different scenarios (input data of different sizes, different number of machines).

If you want to know how exactly the system executes your program, you can find it out in two ways:

  1. The browser-based webclient UI, which takes programs packaged into JARs and draws the execution plan as a visual data flow (check out the documentation for details).

  2. For programs using the Local- or [Remote Executor] (http://stratosphere.eu/docs/0.4/program_execution/remote_executor.html), you can get the optimizer plan using the method LocalExecutor.optimizerPlanAsJSON(plan). The resulting JSON string describes the execution strategies chosen by the optimizer. Naturally, you do not want to parse that yourself, especially for longer programs.

The builds 0.5-SNAPSHOT and later come with a tool that visualizes the JSON string. It is a standalone version of the webclient’s visualization, packed as an html document tools/planVisualizer.html.

If you open it in a browser (for example chromium-browser tools/planVisualizer.html) it shows a text area where you can paste the JSON string and it renders that string as a dataflow plan (assuming it was a valid JSON string and plan). The pictures below show how that looks for the included sample program that uses delta iterations to compute the connected components of a graph.

Optimizer Plan Visualization Tool

Stratosphere 0.4 Released

13 Jan 2014

We are pleased to announce that version 0.4 of the Stratosphere system has been released.

Our team has been working hard during the last few months to create an improved and stable Stratosphere version. The new version comes with many new features, usability and performance improvements in all levels, including a new Scala API for the concise specification of programs, a Pregel-like API, support for Yarn clusters, and major performance improvements. The system features now first-class support for iterative programs and thus covers traditional analytical use cases as well as data mining and graph processing use cases with great performance.

In the course of the transition from v0.2 to v0.4 of the system, we have changed pre-existing APIs based on valuable user feedback. This means that, in the interest of easier programming, we have broken backwards compatibility and existing jobs must be adapted, as described in the migration guide.

This article will guide you through the feature list of the new release.

Scala Programming Interface

The new Stratosphere version comes with a new programming API in Scala that supports very fluent and efficient programs that can be expressed with very few lines of code. The API uses Scala’s native type system (no special boxed data types) and supports grouping and joining on types beyond key/value pairs. We use code analysis and code generation to transform Scala’s data model to the Stratosphere runtime. Stratosphere Scala programs are optimized before execution by Stratosphere’s optimizer just like Stratosphere Java programs.

Learn more about the Scala API at the Scala Programming Guide

Iterations

Stratosphere v0.4 introduces deep support for iterative algorithms, required by a large class of advanced analysis algorithms. In contrast to most other systems, “looping over the data” is done inside the system’s runtime, rather than in the client. Individual iterations (supersteps) can be as fast as sub-second times. Loop-invariant data is automatically cached in memory.

We support a special form of iterations called “delta iterations” that selectively modify only some elements of intermediate solution in each iteration. These are applicable to a variety of applications, e.g., use cases of Apache Giraph. We have observed speedups of 70x when using delta iterations instead of regular iterations.

Read more about the new iteration feature in the documentation

Hadoop YARN Support

YARN (Yet Another Resource Negotiator) is the major new feature of the recently announced Hadoop 2.2. It allows to share existing clusters with different runtimes. So you can run MapReduce alongside Storm and others. With the 0.4 release, Stratosphere supports YARN. Follow our guide on how to start a Stratosphere YARN session.

Improved Scripting Language Meteor

The high-level language Meteor now natively serializes JSON trees for greater performance and offers additional operators and file formats. We greatly empowered the user to write crispier scripts by adding second-order functions, multi-output operators, and other syntactical sugar. For developers of Meteor packages, the API is much more comprehensive and allows to define custom data types that can be easily embedded in JSON trees through ad-hoc byte code generation.

Spargel: Pregel Inspired Graph Processing

Spargel is a vertex-centric API similar to the interface proposed in Google’s Pregel paper and implemented in Apache Giraph. Spargel is implemented in 500 lines of code (including comments) on top of Stratosphere’s delta iterations feature. This confirms the flexibility of Stratosphere’s architecture.

Web Frontend

Using the new web frontend, you can monitor the progress of Stratosphere jobs. For finished jobs, the frontend shows a breakdown of the execution times for each operator. The webclient also visualizes the execution strategies chosen by the optimizer.

Accumulators

Stratosphere’s accumulators allow program developers to compute simple statistics, such as counts, sums, min/max values, or histograms, as a side effect of the processing functions. An example application would be to count the total number of records/tuples processed by a function. Stratosphere will not launch additional tasks (reducers), but will compute the number “on the fly” as a side-product of the functions application to the data. The concept is similar to Hadoop’s counters, but supports more types of aggregation.

Refactored APIs

Based on valuable user feedback, we refactored the Java programming interface to make it more intuitive and easier to use. The basic concepts are still the same, however the naming of most interfaces changed and the structure of the code was adapted. When updating to the 0.4 release you will need to adapt your jobs and dependencies. A previous blog post has a guide to the necessary changes to adapt programs to Stratosphere 0.4.

Local Debugging

You can now test and debug Stratosphere jobs locally. The LocalExecutor allows to execute Stratosphere Jobs from IDE’s. The same code that runs on clusters also runs in a single JVM multi-threaded. The mode supports full debugging capabilities known from regular applications (placing breakpoints and stepping through the program’s functions). An advanced mode supports simulating fully distributed operation locally.

Miscellaneous

  • The configuration of Stratosphere has been changed to YAML
  • HBase support
  • JDBC Input format
  • Improved Windows Compatibility: Batch-files to start Stratosphere on Windows and all unit tests passing on Windows.
  • Stratosphere is available in Maven Central and Sonatype Snapshot Repository
  • Improved build system that supports different Hadoop versions using Maven profiles
  • Maven Archetypes for Stratosphere Jobs.
  • Stability and Usability improvements with many bug fixes.

Download and get started with Stratosphere v0.4

There are several options for getting started with Stratosphere.

Tell us what you think!

Are you using, or planning to use Stratosphere? Sign up in our mailing list and drop us a line.

Have you found a bug? Post an issue on GitHub.

Follow us on Twitter and GitHub to stay in touch with the latest news!

Stratosphere 0.4 Released

Stratosphere Version 0.4 Migration Guide

12 Jan 2014

This guide is intended to help users of previous Stratosphere versions to migrate their programs to the new API of v0.4.

Version 0.4-rc1, 0.4 and all newer versions have the new API. If you want to have the most recent version before the code change, please set the version to 0.4-alpha.3-SNAPSHOT. (Note that the 0.4-alpha versions are only available in the snapshot repository).

Maven Dependencies

Since we also reorganized the Maven project structure, existing programs need to update the Maven dependencies to stratosphere-java (and stratosphere-clients, for examples and executors).

The typical set of Maven dependencies for Stratosphere Java programs is:

       <groupId>eu.stratosphere</groupId>
-      <artifactId>pact-common</artifactId>
-      <version>0.4-SNAPSHOT</version>
+      <artifactId>stratosphere-java</artifactId>
+      <version>0.4</version>

-      <artifactId>pact-clients</artifactId>
-      <version>0.4-SNAPSHOT</version>
+      <artifactId>stratosphere-clients</artifactId>
+      <version>0.4</version>

Renamed classes

We renamed many of the most commonly used classes to make their names more intuitive:

Old Name (before 0.4) New Name (0.4 and after)
Contract Operator
MatchContract JoinOperator
[Map, Reduce, ...]Stub [Map, Reduce, ...]Function
MatchStub JoinFunction
Pact[Integer, Double, ...] IntValue, DoubleValue, ...
PactRecord Record
PlanAssembler Program
PlanAssemblerDescription ProgramDescription
RecordOutputFormat CsvOutputFormat

Package names have been adapted as well. For a complete overview of the renamings, have a look at issue #257 on GitHub.

We suggest for Eclipse user adjust the programs as follows: Delete all old Stratosphere imports, then rename the the classes (PactRecord to Record and so on). Finally, use the “Organize Imports” function (CTRL+SHIFT+O) to choose the right imports. The names should be unique so always pick the classes that are in the eu.stratosphere package.

Please contact us in the comments below, on the mailing list or on GitHub if you have any issues migrating to the latest Stratosphere release.

Stratosphere Version 0.4 Migration Guide

Stratosphere wins award at Humboldt Innovation Competition "Big Data: Research meets Startups"

13 Dec 2013

Stratosphere won the second place in the competition organized by Humboldt Innovation on "Big Data: Research meets Startups," where several research projects were evaluated by a panel of experts from the Berlin startup ecosystem. The award includes a monetary prize of 10,000 euros.

We are extremely excited about this award, as it further showcases the relevance of the Stratosphere platform and Big Data technology in general for the technology startup world.

Stratosphere wins award at Humboldt Innovation Competition "Big Data: Research meets Startups"

Paper "“All Roads Lead to Rome:” Optimistic Recovery for Distributed Iterative Data Processing" accepted at CIKM 2013

21 Oct 2013

Our paper ““All Roads Lead to Rome:” Optimistic Recovery for Distributed Iterative Data Processing” authored by Sebastian Schelter, Kostas Tzoumas, Stephan Ewen and Volker Markl has been accepted accepted at the ACM International Conference on Information and Knowledge Management (CIKM 2013) in San Francisco.

Abstract

Executing data-parallel iterative algorithms on large datasets is crucial for many advanced analytical applications in the fields of data mining and machine learning. Current systems for executing iterative tasks in large clusters typically achieve fault tolerance through rollback recovery. The principle behind this pessimistic approach is to periodically checkpoint the algorithm state. Upon failure, the system restores a consistent state from a previously written checkpoint and resumes execution from that point.

We propose an optimistic recovery mechanism using algorithmic compensations. Our method leverages the robust, self-correcting nature of a large class of fixpoint algorithms used in data mining and machine learning, which converge to the correct solution from various intermediate consistent states. In the case of a failure, we apply a user-defined compensate function that algorithmically creates such a consistent state, instead of rolling back to a previous checkpointed state. Our optimistic recovery does not checkpoint any state and hence achieves optimal failure-free performance with respect to the overhead necessary for guaranteeing fault tolerance. We illustrate the applicability of this approach for three wide classes of problems. Furthermore, we show how to implement the proposed optimistic recovery mechanism in a data flow system. Similar to the Combine operator in MapReduce, our proposed functionality is optional and can be applied to increase performance without changing the semantics of programs. In an experimental evaluation on large datasets, we show that our proposed approach provides optimal failure-free performance. In the absence of failures our optimistic scheme is able to outperform a pessimistic approach by a factor of two to five. In presence of failures, our approach provides fast recovery and outperforms pessimistic approaches in the majority of cases.

Download the paper [PDF]

Paper "“All Roads Lead to Rome:” Optimistic Recovery for Distributed Iterative Data Processing" accepted at CIKM 2013
  • Previous
  • Page: 1 of 2