1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.kafka.tools.`export`
10 
11 import com.beust.jcommander.{ParameterException, Parameters}
12 import org.geotools.api.data.{FeatureEvent, FeatureListener, Query}
13 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
14 import org.geotools.api.filter.Filter
15 import org.locationtech.geomesa.features.TransformSimpleFeature
16 import org.locationtech.geomesa.features.exporters.FeatureExporter
17 import org.locationtech.geomesa.kafka.data.KafkaDataStore
18 import org.locationtech.geomesa.kafka.tools.ConsumerDataStoreParams
19 import org.locationtech.geomesa.kafka.tools.KafkaDataStoreCommand.KafkaDistributedCommand
20 import org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand._
21 import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent.KafkaFeatureChanged
22 import org.locationtech.geomesa.tools.export.ExportCommand
23 import org.locationtech.geomesa.tools.export.ExportCommand.ExportParams
24 import org.locationtech.geomesa.tools.{Command, RequiredTypeNameParam}
25 import org.locationtech.geomesa.utils.geotools.Transform.Transforms
26 
27 import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
28 import scala.util.control.NonFatal
29 
30 class KafkaExportCommand extends ExportCommand[KafkaDataStore] with KafkaDistributedCommand {
31 
32   import org.locationtech.geomesa.index.conf.QueryHints.RichHints
33 
34   override val params = new KafkaExportParameters()
35 
36   private val queue: BlockingQueue[SimpleFeature] = new LinkedBlockingQueue[SimpleFeature]
37 
38   override protected def export(
39       ds: KafkaDataStore,
40       query: Query,
41       exporter: FeatureExporter,
42       writeEmptyFiles: Boolean): Option[Long] = {
43     val sft = ds.getSchema(params.featureName)
44     if (sft == null) {
45       throw new ParameterException(s"Type ${params.featureName} does not exist in ${ds.config.catalog}")
46     }
47 
48     val filter = Option(query.getFilter).filter(_ != Filter.INCLUDE)
49     val transform = query.getHints.getTransform
50 
51     val listener = new ExportFeatureListener(sft, filter, transform, queue)
52 
53     Command.user.info(s"Exporting from kafka topic '${sft.getUserData.get(KafkaDataStore.TopicKey)}' " +
54         "- use `ctrl-c` to stop")
55 
56     val features: Iterator[SimpleFeature] = new Iterator[SimpleFeature] {
57 
58       private var current: SimpleFeature = _
59 
60       override def hasNext: Boolean = {
61         if (current == null) {
62           current = queue.poll(100, TimeUnit.MILLISECONDS)
63         }
64         current != null
65       }
66 
67       override def next(): SimpleFeature = {
68         val res = current
69         current = null
70         res
71       }
72     }
73 
74     val fs = ds.getFeatureSource(query.getTypeName)
75     fs.addFeatureListener(listener)
76 
77     try {
78       query.getHints.getMaxFeatures match {
79         case None    => exportContinuously(query.getHints.getReturnSft, exporter, features, writeEmptyFiles)
80         case Some(m) => exportWithMax(query.getHints.getReturnSft, exporter, features, writeEmptyFiles, m)
81       }
82     } catch {
83       case NonFatal(e) =>
84         throw new RuntimeException("Could not execute export query. Please ensure that all arguments are correct", e)
85     } finally {
86       fs.removeFeatureListener(listener)
87     }
88   }
89 
90   private def exportContinuously(
91       sft: SimpleFeatureType,
92       exporter: FeatureExporter,
93       features: Iterator[SimpleFeature],
94       writeEmptyFiles: Boolean): Option[Long] = {
95     // try to close the exporter when user cancels to finish off whatever the export was
96     sys.addShutdownHook(exporter.close())
97     var count = 0L
98     var started = if (writeEmptyFiles) { exporter.start(sft); true } else { false }
99     while (true) {
100       // hasNext may return false one time, and then true the next if more data is read from kafka
101       if (features.hasNext) {
102         if (!started) {
103           exporter.start(sft)
104           started = true
105         }
106         exporter.export(features).foreach(count += _)
107       } else {
108         Thread.sleep(1000)
109       }
110     }
111     Some(count)
112   }
113 
114   private def exportWithMax(
115       sft: SimpleFeatureType,
116       exporter: FeatureExporter,
117       features: Iterator[SimpleFeature],
118       writeEmptyFiles: Boolean,
119       max: Int): Option[Long] = {
120     var count = 0L
121     var started = if (writeEmptyFiles) { exporter.start(sft); true } else { false }
122     while (count < max) {
123       // hasNext may return false one time, and then true the next if more data is read from kafka
124       if (features.hasNext) {
125         if (!started) {
126           exporter.start(sft)
127           started = true
128         }
129         // note: side effect in map - do count here in case exporter doesn't report counts
130         val batch = features.take(max - count.toInt).map { f => count += 1; f }
131         exporter.export(batch)
132       } else {
133         Thread.sleep(1000)
134       }
135     }
136     Some(count)
137   }
138 }
139 
140 object KafkaExportCommand {
141 
142   @Parameters(commandDescription = "Export features from a GeoMesa Kafka topic")
143   class KafkaExportParameters extends ConsumerDataStoreParams with RequiredTypeNameParam with ExportParams
144 
145   class ExportFeatureListener(sft: SimpleFeatureType,
146                               filter: Option[Filter],
147                               transform: Option[(String, SimpleFeatureType)],
148                               queue: BlockingQueue[SimpleFeature]) extends FeatureListener {
149 
150     private val attributes = transform.map { case (tdefs, tsft) =>
151       (tsft, Transforms(sft, tdefs).toArray)
152     }
153 
154     override def changed(event: FeatureEvent): Unit = {
155       event match {
156         case e: KafkaFeatureChanged => added(e.feature)
157         case _ => // no-op
158       }
159     }
160 
161     def added(sf: SimpleFeature): Unit = {
162       if (filter.forall(_.evaluate(sf))) {
163         queue.put(attributes.map { case (tsft, a) => new TransformSimpleFeature(tsft, a, sf) }.getOrElse(sf))
164       }
165     }
166   }
167 }
Line Stmt Id Pos Tree Symbol Tests Code
34 159 1830 - 1857 Apply org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.KafkaExportParameters.<init> new org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.KafkaExportParameters()
36 160 1911 - 1949 Apply java.util.concurrent.LinkedBlockingQueue.<init> new java.util.concurrent.LinkedBlockingQueue[org.geotools.api.feature.simple.SimpleFeature]()
43 161 2140 - 2158 Select org.locationtech.geomesa.tools.RequiredTypeNameParam.featureName KafkaExportCommand.this.params.featureName
43 162 2127 - 2159 Apply org.locationtech.geomesa.kafka.data.KafkaDataStore.getSchema ds.getSchema(KafkaExportCommand.this.params.featureName)
44 163 2168 - 2179 Apply java.lang.Object.== sft.==(null)
44 166 2164 - 2164 Literal <nosymbol> ()
44 167 2164 - 2164 Block <nosymbol> ()
45 164 2189 - 2287 Throw <nosymbol> throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Type ", " does not exist in ", "").s(KafkaExportCommand.this.params.featureName, ds.config.catalog))
45 165 2189 - 2287 Block <nosymbol> throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Type ", " does not exist in ", "").s(KafkaExportCommand.this.params.featureName, ds.config.catalog))
48 168 2319 - 2334 Apply org.geotools.api.data.Query.getFilter query.getFilter()
48 169 2348 - 2362 Select org.geotools.api.filter.Filter.INCLUDE org.geotools.api.filter.Filter.INCLUDE
48 170 2343 - 2362 Apply java.lang.Object.!= x$1.!=(org.geotools.api.filter.Filter.INCLUDE)
48 171 2312 - 2363 Apply scala.Option.filter scala.Option.apply[org.geotools.api.filter.Filter](query.getFilter()).filter(((x$1: org.geotools.api.filter.Filter) => x$1.!=(org.geotools.api.filter.Filter.INCLUDE)))
49 172 2384 - 2398 Apply org.geotools.api.data.Query.getHints query.getHints()
49 173 2384 - 2411 Select org.locationtech.geomesa.index.conf.QueryHints.RichHints.getTransform org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getTransform
51 174 2482 - 2487 Select org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.queue KafkaExportCommand.this.queue
51 175 2432 - 2488 Apply org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.ExportFeatureListener.<init> new org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.ExportFeatureListener(sft, filter, transform, KafkaExportCommand.this.queue)
53 176 2512 - 2627 Apply java.lang.String.+ scala.StringContext.apply("Exporting from kafka topic \'", "\' ").s(sft.getUserData().get(org.locationtech.geomesa.kafka.data.KafkaDataStore.TopicKey)).+("- use `ctrl-c` to stop")
53 177 2494 - 2628 Apply org.slf4j.Logger.info org.locationtech.geomesa.tools.`package`.Command.user.info(scala.StringContext.apply("Exporting from kafka topic \'", "\' ").s(sft.getUserData().get(org.locationtech.geomesa.kafka.data.KafkaDataStore.TopicKey)).+("- use `ctrl-c` to stop"))
56 187 2674 - 2677 Apply org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.$anon.<init> new $anon()
61 178 2803 - 2818 Apply java.lang.Object.== $anon.this.current.==(null)
61 182 2799 - 2799 Literal <nosymbol> ()
61 183 2799 - 2799 Block <nosymbol> ()
62 179 2842 - 2880 Apply java.util.concurrent.BlockingQueue.poll KafkaExportCommand.this.queue.poll(100L, MILLISECONDS)
62 180 2832 - 2880 Apply org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.$anon.current_= $anon.this.current_=(KafkaExportCommand.this.queue.poll(100L, MILLISECONDS))
62 181 2832 - 2880 Block org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.$anon.current_= $anon.this.current_=(KafkaExportCommand.this.queue.poll(100L, MILLISECONDS))
64 184 2899 - 2914 Apply java.lang.Object.!= $anon.this.current.!=(null)
68 185 2987 - 2994 Select org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.$anon.current $anon.this.current
69 186 3003 - 3017 Apply org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.$anon.current_= $anon.this.current_=(null)
74 188 3078 - 3095 Apply org.geotools.api.data.Query.getTypeName query.getTypeName()
74 189 3058 - 3096 Apply org.locationtech.geomesa.kafka.data.KafkaDataStore.getFeatureSource ds.getFeatureSource(query.getTypeName())
75 190 3101 - 3132 Apply org.geotools.api.data.FeatureSource.addFeatureListener fs.addFeatureListener(listener)
78 191 3150 - 3164 Apply org.geotools.api.data.Query.getHints query.getHints()
78 192 3150 - 3179 Select org.locationtech.geomesa.index.conf.QueryHints.RichHints.getMaxFeatures org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getMaxFeatures
78 201 3150 - 3411 Match <nosymbol> org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getMaxFeatures match { case scala.None => KafkaExportCommand.this.exportContinuously(org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getReturnSft, exporter, features, writeEmptyFiles) case (value: Int)Some[Int]((m @ _)) => KafkaExportCommand.this.exportWithMax(org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getReturnSft, exporter, features, writeEmptyFiles, m) }
79 193 3231 - 3245 Apply org.geotools.api.data.Query.getHints query.getHints()
79 194 3231 - 3258 Select org.locationtech.geomesa.index.conf.QueryHints.RichHints.getReturnSft org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getReturnSft
79 195 3212 - 3296 Apply org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.exportContinuously KafkaExportCommand.this.exportContinuously(org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getReturnSft, exporter, features, writeEmptyFiles)
79 196 3212 - 3296 Block org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.exportContinuously KafkaExportCommand.this.exportContinuously(org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getReturnSft, exporter, features, writeEmptyFiles)
80 197 3335 - 3349 Apply org.geotools.api.data.Query.getHints query.getHints()
80 198 3335 - 3362 Select org.locationtech.geomesa.index.conf.QueryHints.RichHints.getReturnSft org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getReturnSft
80 199 3321 - 3403 Apply org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.exportWithMax KafkaExportCommand.this.exportWithMax(org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getReturnSft, exporter, features, writeEmptyFiles, m)
80 200 3321 - 3403 Block org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.exportWithMax KafkaExportCommand.this.exportWithMax(org.locationtech.geomesa.index.conf.QueryHints.RichHints(query.getHints()).getReturnSft, exporter, features, writeEmptyFiles, m)
84 202 3460 - 3569 Throw <nosymbol> throw new scala.`package`.RuntimeException("Could not execute export query. Please ensure that all arguments are correct", e)
84 203 3460 - 3569 Block <nosymbol> throw new scala.`package`.RuntimeException("Could not execute export query. Please ensure that all arguments are correct", e)
86 204 3592 - 3626 Apply org.geotools.api.data.FeatureSource.removeFeatureListener fs.removeFeatureListener(listener)
86 205 3592 - 3626 Block org.geotools.api.data.FeatureSource.removeFeatureListener fs.removeFeatureListener(listener)
96 206 3939 - 3955 Apply java.io.Closeable.close exporter.close()
96 207 3919 - 3956 Apply scala.sys.addShutdownHook scala.sys.`package`.addShutdownHook(exporter.close())
97 208 3973 - 3975 Literal <nosymbol> 0L
98 209 4017 - 4036 Apply org.locationtech.geomesa.features.exporters.FeatureExporter.start exporter.start(sft)
98 210 4038 - 4042 Literal <nosymbol> true
98 211 4015 - 4044 Block <nosymbol> { exporter.start(sft); true }
98 212 4052 - 4057 Literal <nosymbol> false
98 213 4052 - 4057 Block <nosymbol> false
99 214 4071 - 4075 Literal <nosymbol> true
99 228 4184 - 4400 Block <nosymbol> { if (features.hasNext) { if (started.unary_!) { exporter.start(sft); started = true } else (); exporter.export(features).foreach[Unit](((x$2: Long) => count = count.+(x$2))) } else java.lang.Thread.sleep(1000L); while$1() }
99 229 4064 - 4064 Literal <nosymbol> ()
99 230 4064 - 4064 Block <nosymbol> ()
101 215 4188 - 4204 Select scala.collection.Iterator.hasNext features.hasNext
101 224 4206 - 4358 Block <nosymbol> { if (started.unary_!) { exporter.start(sft); started = true } else (); exporter.export(features).foreach[Unit](((x$2: Long) => count = count.+(x$2))) }
101 227 4184 - 4184 Apply org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.while$1 while$1()
102 216 4220 - 4228 Select scala.Boolean.unary_! started.unary_!
102 219 4230 - 4296 Block <nosymbol> { exporter.start(sft); started = true }
102 220 4216 - 4216 Literal <nosymbol> ()
102 221 4216 - 4216 Block <nosymbol> ()
103 217 4242 - 4261 Apply org.locationtech.geomesa.features.exporters.FeatureExporter.start exporter.start(sft)
104 218 4282 - 4286 Literal <nosymbol> true
106 222 4339 - 4349 Apply scala.Long.+ count.+(x$2)
106 223 4305 - 4350 Apply scala.Option.foreach exporter.export(features).foreach[Unit](((x$2: Long) => count = count.+(x$2)))
108 225 4374 - 4392 Apply java.lang.Thread.sleep java.lang.Thread.sleep(1000L)
108 226 4374 - 4392 Block java.lang.Thread.sleep java.lang.Thread.sleep(1000L)
111 231 4411 - 4422 Apply scala.Some.apply scala.Some.apply[Long](count)
120 232 4643 - 4645 Literal <nosymbol> 0L
121 233 4687 - 4706 Apply org.locationtech.geomesa.features.exporters.FeatureExporter.start exporter.start(sft)
121 234 4708 - 4712 Literal <nosymbol> true
121 235 4685 - 4714 Block <nosymbol> { exporter.start(sft); true }
121 236 4722 - 4727 Literal <nosymbol> false
121 237 4722 - 4727 Block <nosymbol> false
122 238 4741 - 4752 Apply scala.Long.< count.<(max)
122 255 4861 - 5225 Block <nosymbol> { if (features.hasNext) { if (started.unary_!) { exporter.start(sft); started = true } else (); val batch: Iterator[org.geotools.api.feature.simple.SimpleFeature] = features.take(max.-(count.toInt)).map[org.geotools.api.feature.simple.SimpleFeature](((f: org.geotools.api.feature.simple.SimpleFeature) => { count = count.+(1); f })); exporter.export(batch) } else java.lang.Thread.sleep(1000L); while$2() }
122 256 4734 - 4734 Literal <nosymbol> ()
122 257 4734 - 4734 Block <nosymbol> ()
124 239 4865 - 4881 Select scala.collection.Iterator.hasNext features.hasNext
124 251 4883 - 5183 Block <nosymbol> { if (started.unary_!) { exporter.start(sft); started = true } else (); val batch: Iterator[org.geotools.api.feature.simple.SimpleFeature] = features.take(max.-(count.toInt)).map[org.geotools.api.feature.simple.SimpleFeature](((f: org.geotools.api.feature.simple.SimpleFeature) => { count = count.+(1); f })); exporter.export(batch) }
124 254 4861 - 4861 Apply org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.while$2 while$2()
125 240 4897 - 4905 Select scala.Boolean.unary_! started.unary_!
125 243 4907 - 4973 Block <nosymbol> { exporter.start(sft); started = true }
125 244 4893 - 4893 Literal <nosymbol> ()
125 245 4893 - 4893 Block <nosymbol> ()
126 241 4919 - 4938 Apply org.locationtech.geomesa.features.exporters.FeatureExporter.start exporter.start(sft)
127 242 4959 - 4963 Literal <nosymbol> true
130 246 5105 - 5116 Select scala.Long.toInt count.toInt
130 247 5099 - 5116 Apply scala.Int.- max.-(count.toInt)
130 248 5129 - 5139 Apply scala.Long.+ count.+(1)
130 249 5085 - 5144 Apply scala.collection.Iterator.map features.take(max.-(count.toInt)).map[org.geotools.api.feature.simple.SimpleFeature](((f: org.geotools.api.feature.simple.SimpleFeature) => { count = count.+(1); f }))
131 250 5153 - 5175 Apply org.locationtech.geomesa.features.exporters.FeatureExporter.export exporter.export(batch)
133 252 5199 - 5217 Apply java.lang.Thread.sleep java.lang.Thread.sleep(1000L)
133 253 5199 - 5217 Block java.lang.Thread.sleep java.lang.Thread.sleep(1000L)
136 258 5236 - 5247 Apply scala.Some.apply scala.Some.apply[Long](count)
150 263 5782 - 5870 Apply scala.Option.map ExportFeatureListener.this.transform.map[(org.geotools.api.feature.simple.SimpleFeatureType, Array[org.locationtech.geomesa.utils.geotools.Transform])](((x0$1: (String, org.geotools.api.feature.simple.SimpleFeatureType)) => x0$1 match { case (_1: String, _2: org.geotools.api.feature.simple.SimpleFeatureType)(String, org.geotools.api.feature.simple.SimpleFeatureType)((tdefs @ _), (tsft @ _)) => scala.Tuple2.apply[org.geotools.api.feature.simple.SimpleFeatureType, Array[org.locationtech.geomesa.utils.geotools.Transform]](tsft, org.locationtech.geomesa.utils.geotools.Transform.Transforms.apply(ExportFeatureListener.this.sft, tdefs).toArray[org.locationtech.geomesa.utils.geotools.Transform]((ClassTag.apply[org.locationtech.geomesa.utils.geotools.Transform](classOf[org.locationtech.geomesa.utils.geotools.Transform]): scala.reflect.ClassTag[org.locationtech.geomesa.utils.geotools.Transform]))) }))
151 259 5844 - 5847 Select org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.ExportFeatureListener.sft ExportFeatureListener.this.sft
151 260 5833 - 5863 ApplyToImplicitArgs scala.collection.TraversableOnce.toArray org.locationtech.geomesa.utils.geotools.Transform.Transforms.apply(ExportFeatureListener.this.sft, tdefs).toArray[org.locationtech.geomesa.utils.geotools.Transform]((ClassTag.apply[org.locationtech.geomesa.utils.geotools.Transform](classOf[org.locationtech.geomesa.utils.geotools.Transform]): scala.reflect.ClassTag[org.locationtech.geomesa.utils.geotools.Transform]))
151 261 5826 - 5864 Apply scala.Tuple2.apply scala.Tuple2.apply[org.geotools.api.feature.simple.SimpleFeatureType, Array[org.locationtech.geomesa.utils.geotools.Transform]](tsft, org.locationtech.geomesa.utils.geotools.Transform.Transforms.apply(ExportFeatureListener.this.sft, tdefs).toArray[org.locationtech.geomesa.utils.geotools.Transform]((ClassTag.apply[org.locationtech.geomesa.utils.geotools.Transform](classOf[org.locationtech.geomesa.utils.geotools.Transform]): scala.reflect.ClassTag[org.locationtech.geomesa.utils.geotools.Transform])))
151 262 5826 - 5864 Block scala.Tuple2.apply scala.Tuple2.apply[org.geotools.api.feature.simple.SimpleFeatureType, Array[org.locationtech.geomesa.utils.geotools.Transform]](tsft, org.locationtech.geomesa.utils.geotools.Transform.Transforms.apply(ExportFeatureListener.this.sft, tdefs).toArray[org.locationtech.geomesa.utils.geotools.Transform]((ClassTag.apply[org.locationtech.geomesa.utils.geotools.Transform](classOf[org.locationtech.geomesa.utils.geotools.Transform]): scala.reflect.ClassTag[org.locationtech.geomesa.utils.geotools.Transform])))
156 264 5993 - 6002 Select org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent.KafkaFeatureChanged.feature e.feature
156 265 5987 - 6003 Apply org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.ExportFeatureListener.added ExportFeatureListener.this.added(e.feature)
156 266 5987 - 6003 Block org.locationtech.geomesa.kafka.tools.export.KafkaExportCommand.ExportFeatureListener.added ExportFeatureListener.this.added(e.feature)
157 267 6019 - 6021 Literal <nosymbol> ()
157 268 6019 - 6021 Block <nosymbol> ()
162 269 6113 - 6127 Apply org.geotools.api.filter.Filter.evaluate x$3.evaluate(sf)
162 270 6099 - 6128 Apply scala.Option.forall ExportFeatureListener.this.filter.forall(((x$3: org.geotools.api.filter.Filter) => x$3.evaluate(sf)))
162 274 6095 - 6095 Literal <nosymbol> ()
162 275 6095 - 6095 Block <nosymbol> ()
163 271 6150 - 6240 Apply scala.Option.getOrElse ExportFeatureListener.this.attributes.map[org.locationtech.geomesa.features.TransformSimpleFeature](((x0$1: (org.geotools.api.feature.simple.SimpleFeatureType, Array[org.locationtech.geomesa.utils.geotools.Transform])) => x0$1 match { case (_1: org.geotools.api.feature.simple.SimpleFeatureType, _2: Array[org.locationtech.geomesa.utils.geotools.Transform])(org.geotools.api.feature.simple.SimpleFeatureType, Array[org.locationtech.geomesa.utils.geotools.Transform])((tsft @ _), (a @ _)) => new org.locationtech.geomesa.features.TransformSimpleFeature(tsft, a, sf) })).getOrElse[org.geotools.api.feature.simple.SimpleFeature](sf)
163 272 6140 - 6241 Apply java.util.concurrent.BlockingQueue.put ExportFeatureListener.this.queue.put(ExportFeatureListener.this.attributes.map[org.locationtech.geomesa.features.TransformSimpleFeature](((x0$1: (org.geotools.api.feature.simple.SimpleFeatureType, Array[org.locationtech.geomesa.utils.geotools.Transform])) => x0$1 match { case (_1: org.geotools.api.feature.simple.SimpleFeatureType, _2: Array[org.locationtech.geomesa.utils.geotools.Transform])(org.geotools.api.feature.simple.SimpleFeatureType, Array[org.locationtech.geomesa.utils.geotools.Transform])((tsft @ _), (a @ _)) => new org.locationtech.geomesa.features.TransformSimpleFeature(tsft, a, sf) })).getOrElse[org.geotools.api.feature.simple.SimpleFeature](sf))
163 273 6140 - 6241 Block java.util.concurrent.BlockingQueue.put ExportFeatureListener.this.queue.put(ExportFeatureListener.this.attributes.map[org.locationtech.geomesa.features.TransformSimpleFeature](((x0$1: (org.geotools.api.feature.simple.SimpleFeatureType, Array[org.locationtech.geomesa.utils.geotools.Transform])) => x0$1 match { case (_1: org.geotools.api.feature.simple.SimpleFeatureType, _2: Array[org.locationtech.geomesa.utils.geotools.Transform])(org.geotools.api.feature.simple.SimpleFeatureType, Array[org.locationtech.geomesa.utils.geotools.Transform])((tsft @ _), (a @ _)) => new org.locationtech.geomesa.features.TransformSimpleFeature(tsft, a, sf) })).getOrElse[org.geotools.api.feature.simple.SimpleFeature](sf))