GeoMesa Kafka Quick Start

This tutorial will show you how to:

  1. Write custom Java code to produce and consume messages in Apache Kafka using GeoMesa.
  2. Query the data and replay the messages in a Kafka topic to achieve an earlier state.
  3. Visualize the changes being made in Kafka with GeoServer.

Background

Apache Kafka is “publish-subscribe messaging rethought as a distributed commit log.”

In the context of GeoMesa, Kafka is a useful tool for working with streams of geospatial data. Interaction with Kafka in GeoMesa occurs with the KafkaDataStore which implements the GeoTools DataStore interface.

Prerequisites

  • basic knowledge of GeoTools, GeoServer, and Kafka,
  • an instance of Kafka 0.8.2.x, 0.9.0.1, or 0.10.0.1 with (an) appropriate Zookeeper instance(s),
  • an instance of GeoServer version 2.9.1 with the GeoMesa Kafka plugin installed,
  • Java JDK 8,
  • Apache Maven 3.2.2 or better,
  • a git client.

In order to install the GeoMesa Kafka GeoServer plugin, follow the instructions here: Installing GeoMesa Kafka in GeoServer.

Ensure your Kafka and Zookeeper instances are running. You can use Kafka’s quickstart to get Kafka/Zookeeper instances up and running quickly.

There are now three versions of Kafka that are supported with GeoMesa, and there are separate tutorials for each build.

Download and Build the Tutorial

Pick a reasonable directory on your machine, and run:

$ git clone https://github.com/geomesa/geomesa-tutorials.git
$ cd geomesa-tutorials

To build, run

For Kafka 0.8.2.1

$ mvn clean install -pl geomesa-quickstart-kafka/geomesa-quickstart-kafka-08

For Kafka 0.9.0.1

$ mvn clean install -pl geomesa-quickstart-kafka/geomesa-quickstart-kafka-09

For Kafka 0.10.0.1

$ mvn clean install -pl geomesa-quickstart-kafka/geomesa-quickstart-kafka-10

Note

Ensure that the version of Kafka and Zookeeper in the root pom.xml match your environment.

Run the Code

On the command-line, run:

For Kafka 0.8.2.1

$ java -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-08/target/geomesa-quickstart-kafka-08-$VERSION.jar com.example.geomesa.kafka08.KafkaQuickStart \
> -brokers <brokers> -zookeepers <zookeepers>

For Kafka 0.9.0.1

$ java -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-09/target/geomesa-quickstart-kafka-09-$VERSION.jar com.example.geomesa.kafka09.KafkaQuickStart \
> -brokers <brokers> -zookeepers <zookeepers>

For Kafka 0.10.0.1

$ java -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-10/target/geomesa-quickstart-kafka-10-$VERSION.jar com.example.geomesa.kafka10.KafkaQuickStart \
> -brokers <brokers> -zookeepers <zookeepers>

where you provide the values for the following arguments:

  • <brokers> your Kafka broker instances, comma separated. For a local install, this would be localhost:9092.
  • <zookeepers> your Zookeeper nodes, comma separated. For a local install, this would be localhost:2181.

The program will create some metadata in Zookeeper and an associated topic in your Kafka instance, and pause execution to let you add the newly created KafkaDataStore to GeoServer. Once GeoServer has been configured, we’ll pick back up with the paused program.

Optional command-line arguments for KafkaQuickStart are:

  • -zkPath <zkpath>: used for specifying the Zookeeper path for storing GeoMesa metadata. Defaults to “/geomesa/ds/kafka” and ordinarily does not need to be changed
  • -automated: omits the pause in execution for configuring GeoServer.

The class may also be run using Maven via the live-test profile.

$ mvn -Plive-test exec:exec -Dbrokers=<brokers> -Dzookeepers=<zookeepers>

Register the Store in GeoServer

Log into GeoServer using your credentials. Click “Stores” in the left-hand gutter and “Add new Store”. If you do not see the Kafka Data Store listed under Vector Data Sources, ensure the plugin and dependencies are in the right directory and restart GeoServer.

Select the Kafka (GeoMesa) vector data source and enter the following parameters:

  • Basic Store Info
  • workspace this is dependent upon your GeoServer installation
  • data source name pick a sensible name, such as, geomesa_kafka_quickstart
  • description pick a sensible description, such as GeoMesa Kafka quick start
  • Connection Parameters
  • brokers your Kafka broker instances, comma separated. Use the same ones you used on the command line.
  • zookeepers your Zookeeper nodes, comma separated. Use the same ones you used on the command line.

Leave all other fields empty or with the default value.

Click “Save” and GeoServer will search your Kafka instance for any GeoMesa-managed feature types.

Publish the Layer

GeoServer should find the KafkaQuickStart feature type in the data store and redirect you to the “New Layer” page, presenting the feature type as a layer that can be published. Click on the “Publish” link. You will be taken to the “Edit Layer” page.

Warning

If you have not yet run the quick start code as described in Run the Code above, the feature type will not have been registered and you will not get a “New Layer” page after saving the store. In this case, run the code as described above, click on “Layers” in the left-hand gutter, click on “Add a new resource”, and select your data store in the pulldown next to “Add layer from”. The link to publish the KafkaQuickStart feature should appear.

You can leave most fields as default. In the Data pane, you’ll need to enter values for the bounding boxes. In this case, you can click on the links to compute these values from the data. Click “Save”.

View the layer

Click on the “Layer Preview” link in the left-hand gutter. If you don’t see the quick-start layer on the first page of results, enter the name of the layer you just created into the search box, and press <Enter>.

Once you see your layer, click on the “OpenLayers” link, which will open a new tab. At this point, there are no messages in Kafka so nothing will be shown.

Produce Some Data

Resume the program’s execution by inputting <Enter> in your terminal now that the KafkaDataStore is registered in GeoServer. The program will create two SimpleFeatures and then write a stream of updates to the two SimpleFeatures over the course of about a minute.

You can refresh the GeoServer layer preview repeatedly to visualize the updates being written to Kafka.

What’s Happening in GeoServer

The layer preview of GeoServer uses the LiveKafkaConsumerFeatureSource to show a real time view of the current state of the data stream. Two SimpleFeatures are being updated over time in Kafka which is reflected in the GeoServer display.

As you refresh the page, you should see two SimpleFeatures that start on the left side gradually move to the right side while crossing each other in the middle. As the two SimpleFeatures get updated, the older SimpleFeatures disappear from the display.

GeoServer view

GeoServer view

Consumers Explained

GeoMesa wraps Kafka consumers in two different ways; as a LiveKafkaConsumerFeatureSource or a ReplayKafkaConsumerFeatureSource. Both of these classes implement GeoTools’ FeatureSource API.

The LiveKafkaConsumerFeatureSource will consume messages as they are being produced and maintain the real time state of SimpleFeatures pertaining to a Kafka topic.

The ReplayKafkaConsumerFeatureSource allows users to specify any range of time in order to obtain the state of SimpleFeatures from any previous moment.

View the Consumer Output

The program will construct the live and replay consumers and log SimpleFeatures to the console after all the messages are sent to Kafka and therefore after all the updates are made.

The live consumer will log the state of the two SimpleFeatures after all updates are finished. The replay consumer will log the state of the two SimpleFeatures five seconds earlier than the last update. The replay consumer will create a new SimpleFeatureType with an additional attribute KafkaLogTime. By preserving the KafkaLogTime as an attribute, we can create the state of SimpleFeatures at time x by querying for when KafkaLogTime equals x.

Consuming with the live consumer...
2 features were written to Kafka
Here are the two SimpleFeatures that were obtained with the live consumer:
fid:1 | name:James | age:20 | dtg:Mon Dec 14 19:08:23 EST 2015 | geom:POINT (180 90)
fid:2 | name:John | age:62 | dtg:Fri Oct 02 09:56:49 EDT 2015 | geom:POINT (180 -90)

Consuming with the replay consumer...
2 features were written to Kafka
Here are the two SimpleFeatures that were obtained with the replay consumer:
fid:2 | name:John | age:52 | dtg:Thu May 21 21:27:19 EDT 2015 | geom:POINT (132 -66) | KafkaLogTime:Tue Jun 09 13:33:47 EDT 2015
fid:1 | name:James | age:59 | dtg:Sat Jan 24 06:26:44 EST 2015 | geom:POINT (132 66) | KafkaLogTime:Tue Jun 09 13:33:47 EDT 2015

For a deeper understanding of what’s going on, we recommend exploring the source code.

(Optional) Listening for FeatureEvents

The GeoTools API also includes a mechanism to fire off a FeatureEvent each time there is an event (typically when the data are changed) in a DataStore. A client may implement a FeatureListener, which has a single method called changed() that is invoked as each FeatureEvent is fired.

The code in KafkaListener implements a simple FeatureListener that prints the messages received. Open up a second terminal window and run (with $KAFKA_VERSION set to “08”, “09”, or “10” as appropriate):

$ java -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-$KAFKA_VERSION/target/geomesa-quickstart-kafka-$KAFKA_VERSION-${geomesa.version}.jar \
> com.example.geomesa.kafka$KAFKA_VERSION.KafkaListener \
> -brokers <brokers> -zookeepers <zookeepers>

and use the same settings for <brokers> and <zookeepers>. Then in the first terminal window, re-run the KafkaQuickStart code as before. The KafkaListener terminal should produce messages like the following:

Received FeatureEvent of Type: CHANGED
fid:1 | name:Hannah | age:53 | dtg:Sun Dec 13 11:04:40 EST 2015 | geom:POINT (-66 -33)
Received FeatureEvent of Type: CHANGED
fid:2 | name:Claire | age:77 | dtg:Thu Feb 26 02:06:41 EST 2015 | geom:POINT (-66 33)

The KafkaListener code will run until interrupted.

The portion of KafkaListener that creates and implements the FeatureListener is:

// the live consumer must be created before the producer writes features
// in order to read streaming data.
// i.e. the live consumer will only read data written after its instantiation
SimpleFeatureSource consumerFS = consumerDS.getFeatureSource(sftName);

consumerFS.addFeatureListener(new FeatureListener() {
    @Override
    public void changed(FeatureEvent featureEvent) {
        System.out.println("Received FeatureEvent of Type: " + featureEvent.getType());

        if (featureEvent.getType() == FeatureEvent.Type.CHANGED &&
                featureEvent instanceof KafkaFeatureEvent) {
            printFeature(((KafkaFeatureEvent) featureEvent).feature());
        }

        if (featureEvent.getType() == FeatureEvent.Type.REMOVED) {
            System.out.println("Received Delete for filter: " + featureEvent.getFilter());
        }
    }
});

Additionally, the KafkaQuickStart class run above can generate a ‘clear’ control message at the end of the run if you specify “-Dclear=true” on the commandline. This will generate a Feature removed FeatureEvent with a Filter.INCLUDE.

$ java -Dclear=true -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-$KAFKA_VERSION/target/geomesa-quickstart-kafka-$KAFKA_VERSION-${geomesa.version}.jar \
  com.example.geomesa.kafka$KAFKA_VERSION.KafkaQuickStart \
  -brokers <brokers> -zookeepers <zookeepers>

KafkaDataStore Load Test

For those interested in load testing the KafkaDataStore, there is a simple utility with constructs any number of SimpleFeatures, rolls a random latitude, and then have them step left or right.

$ java -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-$KAFKA_VERSION/target/geomesa-quickstart-kafka-$KAFKA_VERSION-${geomesa.version}.jar \
  com.example.geomesa.kafka$KAFKA_VERSION.KafkaLoadTester \
  -brokers <brokers> -zookeepers <zookeepers> -count <count>

The ‘count’ parameter is optional. Without it, the tool defaults to 1000 SimpleFeatures.

Conclusion

Given a stream of geospatial data, GeoMesa’s integration with Kafka enables users to maintain a real time state of SimpleFeatures or retrieve any arbitrary state preserved in history. One can additionally process and analyze streams of data by integrating a data processing system like Storm or Samza. See the GeoMesa Storm Quick Start tutorial for more information on using Storm with GeoMesa.