GeoMesa Kafka Quick Start ========================= This tutorial will show you how to: 1. Write custom Java code to produce and consume messages in `Apache Kafka `__ using GeoMesa. 2. Query the data and replay the messages in a Kafka topic to achieve an earlier state. 3. Visualize the changes being made in Kafka with GeoServer. Background ---------- `Apache Kafka `__ is "publish-subscribe messaging rethought as a distributed commit log." In the context of GeoMesa, Kafka is a useful tool for working with streams of geospatial data. Interaction with Kafka in GeoMesa occurs with the KafkaDataStore which implements the GeoTools `DataStore `__ interface. Prerequisites ------------- - basic knowledge of `GeoTools `__, `GeoServer `__, and Kafka, - an instance of Kafka 0.8.2.x, 0.9.0.1, or 0.10.0.1 with (an) appropriate Zookeeper instance(s), - an instance of GeoServer version |geoserver_version| with the GeoMesa Kafka plugin installed, - `Java JDK 8 `__, - `Apache Maven `__ |maven_version|, - a `git `__ client. In order to install the GeoMesa Kafka GeoServer plugin, follow the instructions here: :ref:`install_kafka_geoserver`. Ensure your Kafka and Zookeeper instances are running. You can use Kafka's `quickstart `__ to get Kafka/Zookeeper instances up and running quickly. There are now three versions of Kafka that are supported with GeoMesa, and there are separate tutorials for each build. Download and Build the Tutorial ------------------------------- Pick a reasonable directory on your machine, and run: .. code-block:: bash $ git clone https://github.com/geomesa/geomesa-tutorials.git $ cd geomesa-tutorials To build, run For Kafka 0.8.2.1 .. code-block:: bash $ mvn clean install -pl geomesa-quickstart-kafka/geomesa-quickstart-kafka-08 For Kafka 0.9.0.1 .. code-block:: bash $ mvn clean install -pl geomesa-quickstart-kafka/geomesa-quickstart-kafka-09 For Kafka 0.10.0.1 .. code-block:: bash $ mvn clean install -pl geomesa-quickstart-kafka/geomesa-quickstart-kafka-10 .. note:: Ensure that the version of Kafka and Zookeeper in the root ``pom.xml`` match your environment. Run the Code ------------ On the command-line, run: For Kafka 0.8.2.1 .. code-block:: bash $ java -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-08/target/geomesa-quickstart-kafka-08-$VERSION.jar com.example.geomesa.kafka08.KafkaQuickStart \ > -brokers -zookeepers For Kafka 0.9.0.1 .. code-block:: bash $ java -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-09/target/geomesa-quickstart-kafka-09-$VERSION.jar com.example.geomesa.kafka09.KafkaQuickStart \ > -brokers -zookeepers For Kafka 0.10.0.1 .. code-block:: bash $ java -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-10/target/geomesa-quickstart-kafka-10-$VERSION.jar com.example.geomesa.kafka10.KafkaQuickStart \ > -brokers -zookeepers where you provide the values for the following arguments: - ```` your Kafka broker instances, comma separated. For a local install, this would be ``localhost:9092``. - ```` your Zookeeper nodes, comma separated. For a local install, this would be ``localhost:2181``. The program will create some metadata in Zookeeper and an associated topic in your Kafka instance, and pause execution to let you add the newly created ``KafkaDataStore`` to GeoServer. Once GeoServer has been configured, we'll pick back up with the paused program. Optional command-line arguments for ``KafkaQuickStart`` are: - ``-zkPath ``: used for specifying the Zookeeper path for storing GeoMesa metadata. Defaults to "/geomesa/ds/kafka" and ordinarily does not need to be changed - ``-automated``: omits the pause in execution for configuring GeoServer. The class may also be run using Maven via the ``live-test`` profile. .. code-block:: bash $ mvn -Plive-test exec:exec -Dbrokers= -Dzookeepers= Register the Store in GeoServer ------------------------------- Log into GeoServer using your credentials. Click “Stores” in the left-hand gutter and “Add new Store”. If you do not see the Kafka Data Store listed under Vector Data Sources, ensure the plugin and dependencies are in the right directory and restart GeoServer. Select the ``Kafka (GeoMesa)`` vector data source and enter the following parameters: - Basic Store Info - ``workspace`` this is dependent upon your GeoServer installation - ``data source name`` pick a sensible name, such as, ``geomesa_kafka_quickstart`` - ``description`` pick a sensible description, such as ``GeoMesa Kafka quick start`` - Connection Parameters - ``brokers`` your Kafka broker instances, comma separated. Use the same ones you used on the command line. - ``zookeepers`` your Zookeeper nodes, comma separated. Use the same ones you used on the command line. Leave all other fields empty or with the default value. Click "Save" and GeoServer will search your Kafka instance for any GeoMesa-managed feature types. Publish the Layer ----------------- GeoServer should find the ``KafkaQuickStart`` feature type in the data store and redirect you to the "New Layer" page, presenting the feature type as a layer that can be published. Click on the "Publish" link. You will be taken to the "Edit Layer" page. .. warning:: If you have not yet run the quick start code as described in **Run the Code** above, the feature type will not have been registered and you will not get a "New Layer" page after saving the store. In this case, run the code as described above, click on "Layers" in the left-hand gutter, click on "Add a new resource", and select your data store in the pulldown next to "Add layer from". The link to publish the ``KafkaQuickStart`` feature should appear. You can leave most fields as default. In the Data pane, you'll need to enter values for the bounding boxes. In this case, you can click on the links to compute these values from the data. Click "Save". View the layer -------------- Click on the "Layer Preview" link in the left-hand gutter. If you don't see the quick-start layer on the first page of results, enter the name of the layer you just created into the search box, and press . Once you see your layer, click on the "OpenLayers" link, which will open a new tab. At this point, there are no messages in Kafka so nothing will be shown. Produce Some Data ----------------- Resume the program's execution by inputting in your terminal now that the ``KafkaDataStore`` is registered in GeoServer. The program will create two ``SimpleFeature``\ s and then write a stream of updates to the two ``SimpleFeature``\ s over the course of about a minute. You can refresh the GeoServer layer preview repeatedly to visualize the updates being written to Kafka. What's Happening in GeoServer ----------------------------- The layer preview of GeoServer uses the ``LiveKafkaConsumerFeatureSource`` to show a real time view of the current state of the data stream. Two ``SimpleFeature``\ s are being updated over time in Kafka which is reflected in the GeoServer display. As you refresh the page, you should see two ``SimpleFeature``\ s that start on the left side gradually move to the right side while crossing each other in the middle. As the two ``SimpleFeature``\ s get updated, the older ``SimpleFeature``\ s disappear from the display. .. figure:: _static/geomesa-quickstart-kafka/layer-preview.png :alt: GeoServer view GeoServer view Consumers Explained ------------------- GeoMesa wraps Kafka consumers in two different ways; as a ``LiveKafkaConsumerFeatureSource`` or a ``ReplayKafkaConsumerFeatureSource``. Both of these classes implement GeoTools' `FeatureSource `__ API. The ``LiveKafkaConsumerFeatureSource`` will consume messages as they are being produced and maintain the real time state of SimpleFeatures pertaining to a Kafka topic. The ``ReplayKafkaConsumerFeatureSource`` allows users to specify any range of time in order to obtain the state of SimpleFeatures from any previous moment. View the Consumer Output ------------------------ The program will construct the live and replay consumers and log SimpleFeatures to the console after all the messages are sent to Kafka and therefore after all the updates are made. The live consumer will log the state of the two SimpleFeatures after all updates are finished. The replay consumer will log the state of the two SimpleFeatures five seconds earlier than the last update. The replay consumer will create a new ``SimpleFeatureType`` with an additional attribute ``KafkaLogTime``. By preserving the ``KafkaLogTime`` as an attribute, we can create the state of SimpleFeatures at time *x* by querying for when ``KafkaLogTime`` equals *x*. .. code-block:: bash Consuming with the live consumer... 2 features were written to Kafka Here are the two SimpleFeatures that were obtained with the live consumer: fid:1 | name:James | age:20 | dtg:Mon Dec 14 19:08:23 EST 2015 | geom:POINT (180 90) fid:2 | name:John | age:62 | dtg:Fri Oct 02 09:56:49 EDT 2015 | geom:POINT (180 -90) Consuming with the replay consumer... 2 features were written to Kafka Here are the two SimpleFeatures that were obtained with the replay consumer: fid:2 | name:John | age:52 | dtg:Thu May 21 21:27:19 EDT 2015 | geom:POINT (132 -66) | KafkaLogTime:Tue Jun 09 13:33:47 EDT 2015 fid:1 | name:James | age:59 | dtg:Sat Jan 24 06:26:44 EST 2015 | geom:POINT (132 66) | KafkaLogTime:Tue Jun 09 13:33:47 EDT 2015 For a deeper understanding of what's going on, we recommend exploring the source code. (Optional) Listening for FeatureEvents -------------------------------------- The GeoTools API also includes a mechanism to fire off a `FeatureEvent `__ each time there is an event (typically when the data are changed) in a ``DataStore``. A client may implement a `FeatureListener `__, which has a single method called ``changed()`` that is invoked as each ``FeatureEvent`` is fired. The code in ``KafkaListener`` implements a simple ``FeatureListener`` that prints the messages received. Open up a second terminal window and run (with ``$KAFKA_VERSION`` set to "08", "09", or "10" as appropriate): .. code-block:: bash $ java -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-$KAFKA_VERSION/target/geomesa-quickstart-kafka-$KAFKA_VERSION-${geomesa.version}.jar \ > com.example.geomesa.kafka$KAFKA_VERSION.KafkaListener \ > -brokers -zookeepers and use the same settings for ```` and ````. Then in the first terminal window, re-run the ``KafkaQuickStart`` code as before. The ``KafkaListener`` terminal should produce messages like the following: :: Received FeatureEvent of Type: CHANGED fid:1 | name:Hannah | age:53 | dtg:Sun Dec 13 11:04:40 EST 2015 | geom:POINT (-66 -33) Received FeatureEvent of Type: CHANGED fid:2 | name:Claire | age:77 | dtg:Thu Feb 26 02:06:41 EST 2015 | geom:POINT (-66 33) The ``KafkaListener`` code will run until interrupted. The portion of ``KafkaListener`` that creates and implements the ``FeatureListener`` is: .. code-block:: java // the live consumer must be created before the producer writes features // in order to read streaming data. // i.e. the live consumer will only read data written after its instantiation SimpleFeatureSource consumerFS = consumerDS.getFeatureSource(sftName); consumerFS.addFeatureListener(new FeatureListener() { @Override public void changed(FeatureEvent featureEvent) { System.out.println("Received FeatureEvent of Type: " + featureEvent.getType()); if (featureEvent.getType() == FeatureEvent.Type.CHANGED && featureEvent instanceof KafkaFeatureEvent) { printFeature(((KafkaFeatureEvent) featureEvent).feature()); } if (featureEvent.getType() == FeatureEvent.Type.REMOVED) { System.out.println("Received Delete for filter: " + featureEvent.getFilter()); } } }); Additionally, the ``KafkaQuickStart`` class run above can generate a 'clear' control message at the end of the run if you specify "-Dclear=true" on the commandline. This will generate a Feature removed ``FeatureEvent`` with a ``Filter.INCLUDE``. .. code-block:: bash $ java -Dclear=true -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-$KAFKA_VERSION/target/geomesa-quickstart-kafka-$KAFKA_VERSION-${geomesa.version}.jar \ com.example.geomesa.kafka$KAFKA_VERSION.KafkaQuickStart \ -brokers -zookeepers KafkaDataStore Load Test ------------------------ For those interested in load testing the KafkaDataStore, there is a simple utility with constructs any number of SimpleFeatures, rolls a random latitude, and then have them step left or right. .. code-block:: bash $ java -cp geomesa-quickstart-kafka/geomesa-quickstart-kafka-$KAFKA_VERSION/target/geomesa-quickstart-kafka-$KAFKA_VERSION-${geomesa.version}.jar \ com.example.geomesa.kafka$KAFKA_VERSION.KafkaLoadTester \ -brokers -zookeepers -count The 'count' parameter is optional. Without it, the tool defaults to 1000 SimpleFeatures. Conclusion ---------- Given a stream of geospatial data, GeoMesa's integration with Kafka enables users to maintain a real time state of SimpleFeatures or retrieve any arbitrary state preserved in history. One can additionally process and analyze streams of data by integrating a data processing system like `Storm `__ or `Samza `__. See the :doc:`./geomesa-quickstart-storm` tutorial for more information on using Storm with GeoMesa.