17.13. Kafka Streams Integration

The Kafka Data Store can integrate with Kafka Streams applications as both a source and a destination.

To read from or write to a GeoMesa Kafka topic, use the org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder class. This class wraps a regular Kafka StreamsBuilder with convenience methods for mapping feature types to topics and loading serializers.

17.13.1. Streams Data Model

Messages are based on SimpleFeatures and modelled as a simple case class:

// the action to take, the feature attribute values, and any feature-level user data
case class GeoMesaMessage(action: MessageAction, attributes: Seq[AnyRef], userData: Map[String, String])

object MessageAction extends Enumeration {
  type MessageAction = Value
  val Upsert, Delete = Value
}
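
For example, an update can be created with the upsert factory method, which is also used in the write example below; the attribute values here are illustrative and must match the feature type's attributes in order:

// illustrative: an upsert for a feature type with a String and an Int attribute
val message = GeoMesaMessage.upsert(Seq("alice", Int.box(30)))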

17.13.2. Read Example

The following is adapted from the word-count example in Kafka Streams:

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.{KStream, KTable, Materialized}
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder

// these are the parameters normally passed to DataStoreFinder
val params = Map[String, String](
  "kafka.brokers"    -> "localhost:9092",
  "kafka.zookeepers" -> "localhost:2181"
)

val builder = GeoMesaStreamsBuilder(params)

// read in the feature type and turn the attributes into a space-separated string
val textLines: KStream[String, String] =
  builder.stream("my-feature-type")
      .map((k, v) => (k, v.attributes.map(_.toString.replaceAll(" ", "_")).mkString(" ")))

val wordCounts: KTable[String, Long] =
  textLines
      .flatMapValues(textLine => textLine.split(" +"))
      .groupBy((_, word) => word)
      .count()(Materialized.as("counts-store"))

wordCounts.toStream.to("word-count")

val topology = builder.build()
// construct the streams app as normal
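
From there, the topology runs like any other Kafka Streams application; a minimal sketch (the application ID is arbitrary):

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "geomesa-word-count") // any unique app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val streams = new KafkaStreams(topology, props)
streams.start()
sys.addShutdownHook(streams.close())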

17.13.3. Write Example

The following shows how to persist data back to a GeoMesa topic:

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.locationtech.geomesa.kafka.streams.GeoMesaMessage
import org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder

// these are the parameters normally passed to DataStoreFinder
val params = Map[String, String](
  "kafka.brokers"    -> "localhost:9092",
  "kafka.zookeepers" -> "localhost:2181"
)

val builder = GeoMesaStreamsBuilder(params)

// use the wrapped native streams builder to create an input based on csv records
val input: KStream[String, String] =
  builder.wrapped.stream[String, String]("input-csv-topic")

// the columns in the CSV must map, in order, to the attributes of the feature type
val output: KStream[String, GeoMesaMessage] =
  input.mapValues(line => GeoMesaMessage.upsert(line.split(",")))

// write the output to GeoMesa - the feature type must already exist
builder.to("my-feature-type", output)

val topology = builder.build()
// construct the streams app as normal
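
Deletes can be written the same way. A sketch, assuming a delete() factory method on GeoMesaMessage (the Delete counterpart to upsert) and an illustrative convention where input records starting with "DELETE" mark features for removal, keyed by feature ID:

// sketch: emit delete messages for flagged input records
val deletes: KStream[String, GeoMesaMessage] =
  input.filter((_, line) => line.startsWith("DELETE"))
      .mapValues(_ => GeoMesaMessage.delete())

builder.to("my-feature-type", deletes)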

17.13.4. Joins and Topic Partitioning

Warning

Kafka Streams operations that require co-partitioning will usually require a re-partition step when used with data from GeoMesa 3.4.x or earlier.

For feature types created prior to version 3.5.0, GeoMesa uses a custom partitioner for data written to Kafka. When using Kafka Streams operations that require co-partitioning, such as joins, the GeoMesa topic will need to be re-partitioned first, as sketched below.
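
For illustration, a sketch of such a re-partition step, reusing the builder from the read example above: the GeoMesa messages are mapped to plain string values so that the standard string serdes apply, and the repartition operation rewrites the stream through an internal topic using the default partitioner (the lookup topic and join logic are made up):

import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

// illustrative: re-partition the GeoMesa-sourced stream so it is
// co-partitioned with another topic before a stream-table join
val left: KStream[String, String] =
  builder.stream("my-feature-type")
      .mapValues(_.attributes.mkString(","))
      .repartition // writes through an internal topic with the default partitioner

val right: KTable[String, String] =
  builder.wrapped.table[String, String]("lookup-topic")

val joined: KStream[String, String] =
  left.join(right)((feature, lookup) => s"$feature|$lookup")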

Existing feature types can be updated to use the default Kafka partitioner, which allows joins without re-partitioning. Note, however, that after the change, updates for a given feature may go to a different partition than its earlier messages, so older data may be returned when GeoMesa is queried. This will generally resolve itself over time, as Kafka log retention deletes the older data (unless log compaction is enabled for the topic, in which case the stale messages may persist).

To update an existing feature type, add the user data entry geomesa.kafka.partitioning=default. Using the GeoMesa command-line tools, the following command will disable custom partitioning:

geomesa-kafka update-schema --add-user-data "geomesa.kafka.partitioning:default"
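
The same user data entry can also be added programmatically; a minimal sketch using the standard GeoTools DataStore API, reusing the params map from the examples above (the type name is illustrative):

import org.geotools.data.DataStoreFinder
import scala.collection.JavaConverters._

// load the data store, add the user data entry, and update the schema
val ds = DataStoreFinder.getDataStore(params.asJava)
val sft = ds.getSchema("my-feature-type")
sft.getUserData.put("geomesa.kafka.partitioning", "default")
ds.updateSchema("my-feature-type", sft)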