11.5. Data Producers

A GeoMesa Kafka data store in producer mode takes features and persists them to Kafka topics. First, create the data store. For example:

String brokers = ...
String zookeepers = ...
String zkPath = ...

// build parameters map
Map<String, Serializable> params = new HashMap<>();
params.put("brokers", brokers);
params.put("zookeepers", zookeepers);
params.put("isProducer", Boolean.TRUE);

// optional
params.put("zkPath", zkPath);

// create the data store
KafkaDataStoreFactory factory = new KafkaDataStoreFactory();
DataStore producerDs = factory.createDataStore(params);

Next, create the schema. Each data store can have one or many schemas. For example:

SimpleFeatureType sft = ...
SimpleFeatureType streamingSFT = KafkaDataStoreHelper.createStreamingSFT(sft, zkPath);
producerDs.createSchema(streamingSFT);

The call to KafkaDataStoreHelper.createSchema creates a copy of the sft with the required hint added. In this case the hint is the name of the Kafka topic. The zkPath parameter is uses to make the Kafka topic name unique to the zkPath used by the KafkaDataStore so that the same SimpleFeatureType can be used by multiple KafkaDataStores where each data store has a different zkPath. Specifically, the resulting Kafka topic’s name will be zkPath-sftName where the forward slashes in the zkPath are replaced by -. e.g. creating a schema with a zkPath of /geomesa/ds/kafka with an sft called example_sft will create a Kafka topic called geomesa-ds-kafka-example_sft.

The createSchema method will throw an exception if the given SimpleFeatureType does not contain the required hint, i.e., if it was not created by the KafkaDataStoreHelper.

Now, you can create or update simple features:

// the name of the simple feature type -  will be the same as sft.getTypeName();
String typeName = streamingSFT.getTypeName();

FeatureWriter<SimpleFeatureType, SimpleFeature> fw =
        producerDs.getFeatureWriter(typeName, null, Transaction.AUTO_COMMIT);
SimpleFeature sf = fw.next();
// set properties on sf
fw.write();

Delete simple features:

SimpleFeatureStore producerStore = (SimpleFeatureStore) producerDs.getFeatureSource(typeName);
FilterFactory2 ff = CommonFactoryFinder.getFilterFactory2();

String id = ...
producerStore.removeFeatures(ff.id(ff.featureId(id)));

And, clear (delete all) features:

producerStore.removeFeatures(Filter.INCLUDE);

Each operation that creates, modifies, deletes, or clears simple features results in a message being sent to the Kafka topic.