1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.kafka.index
10 
11 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
12 import org.geotools.api.filter.Filter
13 import org.locationtech.geomesa.index.planning.LocalQueryRunner
14 import org.locationtech.geomesa.index.planning.QueryInterceptor.QueryInterceptorFactory
15 import org.locationtech.geomesa.kafka.data.KafkaDataStore
16 import org.locationtech.geomesa.utils.collection.CloseableIterator
17 
18 class KafkaQueryRunner(ds: KafkaDataStore, caches: String => KafkaFeatureCache)
19     extends LocalQueryRunner(Option(ds.config.authProvider)) {
20 
21   override protected def name: String = "Kafka"
22 
23   override protected val interceptors: QueryInterceptorFactory = ds.interceptors
24 
25   override protected def features(sft: SimpleFeatureType, filter: Option[Filter]): CloseableIterator[SimpleFeature] =
26     CloseableIterator(caches(sft.getTypeName).query(filter.getOrElse(Filter.INCLUDE)))
27 }
Line Stmt Id Pos Tree Symbol Tests Code
21 2496 1085 - 1092 Literal <nosymbol> "Kafka"
23 2497 1159 - 1174 Select org.locationtech.geomesa.index.geotools.MetadataBackedDataStore.interceptors KafkaQueryRunner.this.ds.interceptors
26 2498 1323 - 1338 Apply org.geotools.api.feature.simple.SimpleFeatureType.getTypeName sft.getTypeName()
26 2499 1363 - 1377 Select org.geotools.api.filter.Filter.INCLUDE org.geotools.api.filter.Filter.INCLUDE
26 2500 1346 - 1378 Apply scala.Option.getOrElse filter.getOrElse[org.geotools.api.filter.Filter](org.geotools.api.filter.Filter.INCLUDE)
26 2501 1316 - 1379 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.query KafkaQueryRunner.this.caches.apply(sft.getTypeName()).query(filter.getOrElse[org.geotools.api.filter.Filter](org.geotools.api.filter.Filter.INCLUDE))
26 2502 1298 - 1298 TypeApply org.locationtech.geomesa.utils.collection.CloseableIterator.apply$default$2 org.locationtech.geomesa.utils.collection.CloseableIterator.apply$default$2[Nothing]
26 2503 1298 - 1380 Apply org.locationtech.geomesa.utils.collection.CloseableIterator.apply org.locationtech.geomesa.utils.collection.CloseableIterator.apply[org.geotools.api.feature.simple.SimpleFeature](KafkaQueryRunner.this.caches.apply(sft.getTypeName()).query(filter.getOrElse[org.geotools.api.filter.Filter](org.geotools.api.filter.Filter.INCLUDE)), org.locationtech.geomesa.utils.collection.CloseableIterator.apply$default$2[Nothing])