18.8. Data Management¶
18.8.1. Kafka Topic Name¶
Each SimpleFeatureType (or schema) will be written to a unique Kafka topic. By default, the topic will be
named based on the
kafka.zk.path data store parameter and the SimpleFeatureType name, by appending
the two together and replacing any
/ characters with
-. For example, with the default zookeeper path
geomesa/ds/kafka), a SimpleFeatureType name of ‘foo’ would result in the topic
If desired, the topic name can be set to an arbitrary value by setting the user data key
SimpleFeatureType sft = ....; sft.getUserData().put("geomesa.kafka.topic", "myTopicName");
For more information on how to set schema options, see Setting Schema Options.
18.8.2. Kafka Topic Configuration¶
The Kafka topic for a given SimpleFeatureType will be created when calling
createSchema (if it doesn’t already
exist). GeoMesa exposes a few configuration options through data store parameters. Additional options can
be configured by setting the user data key
kafka.topic.config before calling
SimpleFeatureType sft = ....; sft.getUserData().put("kafka.topic.config", "cleanup.policy=compact\nretention.ms=86400000");
The value should be in standard Java properties format. For a list of available configurations, refer to the Kafka documentation. For more information on how to set schema options, see Setting Schema Options.
Parallelism in Kafka is achieved through the use of multiple topic partitions. Each partition can only be read
by a single Kafka consumer. The number of consumers can be controlled through the
store parameter; however, this will have no effect if there is only a single topic partition. To create more than
one partition, use the
kafka.topic.partitions data store parameter.
Replication in Kafka ensures that data is not lost. To enable replication, use the
data store parameter.
18.8.3. Kafka Topic Compaction¶
Kafka has various options for preventing data from growing unbounded. The simplest is to set a size or time-based retention policy. This will cause older messages to be deleted when the topic reaches a certain threshold.
Starting with GeoMesa 2.1.0, the Kafka data store supports Kafka log compaction. This allows for the topic size to be managed, while preserving the latest state for each feature. When combined with Initial Load (Replay), the persistent state of a system can be maintained through restarts and down-time. Note that when using log compaction, it is important to send explicit deletes for each feature, otherwise the feature will never be compacted out from the log, and the log size will start to grow unbounded.
If upgrading from a version of GeoMesa prior to 2.1.0, the topic should be run for a while using a size or time-based retention policy before enabling compaction, as messages written with older versions of GeoMesa will never be compacted out.
18.8.4. Integration with Other Systems¶
The Kafka data store is easy to integrate with by consuming the Kafka topic. The messages are a change log of
updates. Message keys consist of the simple feature ID, as UTF-8 bytes. Message bodies are serialized simple
features, or null to indicate deletion. The internal serialization version is set as a message header under the
"v", when using Kafka 0.11.x or newer.
By default, message bodies are serialized with a custom Kryo serializer. For Java/Scala clients, the
org.locationtech.geomesa.features.kryo.KryoFeatureSerializer class may be used to decode messages, available
geomesa-feature-kryo module through Maven. Alternatively, producers can be configured to send
Avro-encoded messages through the
kafka.serialization.type data store parameter. Avro libraries exist in many
languages, and Avro messages follow a defined schema that allows for cross-platform parsing.
If you are using the Confluent platform to manage Kafka, please see Confluent Integration.