1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.utils.iterators
10 
11 import com.typesafe.scalalogging.StrictLogging
12 import org.geotools.api.data.{DataStore, Query, Transaction}
13 import org.geotools.api.feature.simple.SimpleFeature
14 import org.geotools.api.filter.Filter
15 import org.geotools.api.filter.sort.SortOrder
16 import org.geotools.factory.CommonFactoryFinder
17 import org.geotools.feature.simple.SimpleFeatureImpl
18 import org.geotools.temporal.`object`.{DefaultInstant, DefaultPeriod, DefaultPosition}
19 import org.geotools.util.factory.Hints
20 import org.locationtech.geomesa.utils.collection.{CloseableIterator, SelfClosingIterator}
21 
22 import java.util.Date
23 import java.util.concurrent.{Executors, LinkedBlockingQueue}
24 import scala.concurrent.duration.Duration
25 import scala.util.control.NonFatal
26 
/**
 * Query over a time frame and return the features in sorted order, delayed based on the date of each feature
 * to simulate the original ingestion stream
 *
 * Features are loaded by a single background thread, one query window at a time, into a bounded
 * read-ahead queue. `hasNext` blocks until a feature or the end of the stream is available, and `next`
 * sleeps the calling thread until the feature is due for replay. Errors in the background query are
 * logged and end the stream early; they are not re-thrown to the consumer.
 *
 * The iterator must be closed to stop the background thread. Closing discards any features that are
 * still buffered in the read-ahead queue.
 *
 * @param ds data store
 * @param typeName simple feature type name
 * @param interval interval to query
 * @param dtg date attribute to sort by
 * @param filter additional filter predicate, if any
 * @param transforms query transforms, if any
 * @param window length of a single query window, used to chunk up the total features
 * @param rate multiplier for the rate of returning features, applied to the original delay between features
 *             (the original delay is divided by this value)
 * @param live project dates to current time
 * @param readAhead size of the read-ahead queue used for holding features before returning them
 * @throws IllegalArgumentException if the interval is not ordered, if no date field can be determined,
 *                                  or if the query window is not a positive duration
 */
class PlaybackIterator(
    ds: DataStore,
    typeName: String,
    interval: (Date, Date),
    dtg: Option[String] = None,
    filter: Option[Filter] = None,
    transforms: Array[String] = null,
    window: Option[Duration] = None,
    rate: Float = 10f,
    live: Boolean = false,
    readAhead: Int = 10000
  ) extends CloseableIterator[SimpleFeature] with StrictLogging {

  import PlaybackIterator.ff
  import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

  require(interval._2.after(interval._1), s"Interval is not ordered correctly: ${interval._1}/${interval._2}")

  private val sft = ds.getSchema(typeName)
  private val dtgName = dtg.orElse(sft.getDtgField).getOrElse {
    throw new IllegalArgumentException("Schema does not have a default date field")
  }
  // the date attribute is needed for sorting and timing, so append it to the transforms if it was left out
  private val tdefs = transforms match {
    case null => null
    case t if t.indexOf(dtgName) == -1 => t :+ dtgName
    case t => t
  }
  // index of the date attribute in the returned features, which follow the transforms if there are any
  private val dtgIndex = tdefs match {
    case null => sft.indexOf(dtgName)
    case t => t.indexOf(dtgName)
  }
  require(dtgIndex != -1, "Invalid date field")
  private val dtgProp = ff.property(dtgName)
  private val sort = ff.sort(dtgName, SortOrder.ASCENDING)

  // without an explicit window, a single window covers the whole interval
  private val windowMillis = window.map(_.toMillis).getOrElse(interval._2.getTime - interval._1.getTime + 1)
  // a zero or negative window would never advance, and the query thread would loop forever
  require(windowMillis > 0, s"Query window must be a positive duration: $windowMillis ms")

  // wall-clock time at which the first feature was returned, or -1 if replay has not started
  private var start: Long = -1
  // date of the first feature returned, used as the origin for computing delays
  private var eventStart: Long = -1

  private val features = new LinkedBlockingQueue[SimpleFeature](readAhead)
  // feature taken off the queue by hasNext but not yet returned by next
  private var staged: SimpleFeature = _
  // true once the end-of-stream marker has been taken off the queue
  private var exhausted = false
  // set by close() and read by the query thread, hence volatile
  @volatile
  private var closed = false

  private val executor = Executors.newSingleThreadExecutor()
  executor.submit(new QueryRunnable())

  override def hasNext: Boolean = {
    if (staged != null) {
      true
    } else if (exhausted) {
      false
    } else {
      val head = features.take()
      if (PlaybackIterator.terminal.eq(head)) {
        // remember the end of the stream instead of re-queueing the marker, so that repeated calls
        // stay idempotent without ever blocking on a full queue
        exhausted = true
        false
      } else {
        staged = head
        true
      }
    }
  }

  override def next(): SimpleFeature = {
    // honor the iterator contract when next is called without hasNext, or past the end of the stream
    if (!hasNext) {
      throw new NoSuchElementException("Playback iterator is exhausted")
    }
    val feature = staged
    staged = null
    val featureTime = feature.getAttribute(dtgIndex).asInstanceOf[Date].getTime
    if (start == -1L) {
      // emit the first feature as soon as it's available, and set the clock to start timing from here
      logger.debug("Starting replay clock")
      start = System.currentTimeMillis()
      eventStart = featureTime
    }
    // wall-clock time at which this feature is due: the original offset from the first feature, scaled by rate
    val featureRelativeTime = start + ((featureTime - eventStart) / rate).toLong
    val sleep = featureRelativeTime - System.currentTimeMillis()
    if (sleep > 0) {
      Thread.sleep(sleep)
    }
    if (live) {
      feature.setAttribute(dtgIndex, new Date(featureRelativeTime))
    }
    feature
  }

  override def close(): Unit = {
    closed = true
    executor.shutdownNow()
    // drop buffered features so that a producer blocked on a full queue is released, and make sure the
    // end-of-stream marker is queued so that a later call to hasNext cannot block forever
    features.clear()
    while (!features.offer(PlaybackIterator.terminal)) {
      features.clear()
    }
  }

  /**
   * Background task that runs one sorted query per time window and copies the results into the
   * read-ahead queue, followed by the end-of-stream marker
   */
  private class QueryRunnable extends Runnable {
    override def run(): Unit = {
      try {
        var from = interval._1
        var to = new Date(from.getTime + windowMillis)
        var loop = true

        while (loop && !closed && !Thread.currentThread().isInterrupted) {
          if (interval._2.before(to)) {
            // this query will finish the last window
            to = interval._2
            loop = false
          }

          logger.debug(s"Running query window $from to $to")

          // NOTE(review): DURING is presumably exclusive of its end points, in which case features dated
          // exactly on a window boundary would be skipped - confirm against the filter semantics
          val during = {
            val period = new DefaultPeriod(
              new DefaultInstant(new DefaultPosition(from)),
              new DefaultInstant(new DefaultPosition(to))
            )
            ff.during(dtgProp, ff.literal(period))
          }
          val query = new Query(typeName, filter.map(ff.and(_, during)).getOrElse(during), tdefs: _*)
          query.setSortBy(sort)
          // prevent ContentDataStore from sorting on disk
          query.getHints.put(Hints.MAX_MEMORY_SORT, java.lang.Integer.MAX_VALUE)

          var count = 0L

          val reader = ds.getFeatureReader(query, Transaction.AUTO_COMMIT)
          try {
            // populate the queue - this will block if we get too far ahead
            while (!closed && reader.hasNext) {
              features.put(reader.next())
              count += 1
            }
          } finally {
            // always release the reader, including when the copy is interrupted or fails part way
            reader.close()
          }

          logger.debug(s"Returned $count features from query window $from to $to")

          // increment time window
          from = to
          to = new Date(from.getTime + windowMillis)
        }
      } catch {
        case _: InterruptedException =>
          // not matched by NonFatal - raised when close() interrupts a blocked put
          logger.debug("Playback query interrupted")
          Thread.currentThread().interrupt()

        case NonFatal(e) => logger.error("Error querying playback:", e)
      } finally {
        if (closed) {
          // nothing is draining the queue any more, so never block here - close() queues its own marker
          features.offer(PlaybackIterator.terminal)
        } else {
          features.put(PlaybackIterator.terminal)
        }
      }
    }
  }
}
176 
/**
 * Shared constants used by the playback iterator
 */
object PlaybackIterator {

  // filter factory used to build the property, sort and temporal filter expressions
  private val ff = CommonFactoryFinder.getFilterFactory()

  // sentinel placed on the read-ahead queue to mark the end of the stream - it is only ever
  // compared by reference, so it carries no values, type or id
  private val terminal: SimpleFeature = new SimpleFeatureImpl(null, null, null, false, null)
}
Line Stmt Id Pos Tree Symbol Tests Code
58 9327 2516 - 2527 Select scala.Tuple2._1 PlaybackIterator.this.interval._1
58 9328 2498 - 2528 Apply java.util.Date.after PlaybackIterator.this.interval._2.after(PlaybackIterator.this.interval._1)
58 9329 2532 - 2568 Literal <nosymbol> "Interval is not ordered correctly: "
58 9330 2581 - 2583 Literal <nosymbol> "/"
58 9331 2596 - 2597 Literal <nosymbol> ""
58 9332 2569 - 2580 Select scala.Tuple2._1 PlaybackIterator.this.interval._1
58 9333 2584 - 2595 Select scala.Tuple2._2 PlaybackIterator.this.interval._2
58 9334 2530 - 2597 Apply scala.StringContext.s scala.StringContext.apply("Interval is not ordered correctly: ", "/", "").s(PlaybackIterator.this.interval._1, PlaybackIterator.this.interval._2)
58 9335 2490 - 2598 Apply scala.Predef.require scala.Predef.require(PlaybackIterator.this.interval._2.after(PlaybackIterator.this.interval._1), scala.StringContext.apply("Interval is not ordered correctly: ", "/", "").s(PlaybackIterator.this.interval._1, PlaybackIterator.this.interval._2))
60 9336 2633 - 2641 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.typeName PlaybackIterator.this.typeName
60 9337 2620 - 2642 Apply org.geotools.api.data.DataStore.getSchema PlaybackIterator.this.ds.getSchema(PlaybackIterator.this.typeName)
61 9338 2678 - 2681 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.sft PlaybackIterator.this.sft
61 9339 2678 - 2693 Select org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType.getDtgField org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType(PlaybackIterator.this.sft).getDtgField
61 9341 2667 - 2794 Apply scala.Option.getOrElse PlaybackIterator.this.dtg.orElse[String](org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType(PlaybackIterator.this.sft).getDtgField).getOrElse[String](throw new scala.`package`.IllegalArgumentException("Schema does not have a default date field"))
62 9340 2711 - 2790 Throw <nosymbol> throw new scala.`package`.IllegalArgumentException("Schema does not have a default date field")
64 9342 2817 - 2827 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.transforms PlaybackIterator.this.transforms
65 9343 2853 - 2857 Literal <nosymbol> null
65 9344 2853 - 2857 Block <nosymbol> null
66 9345 2872 - 2896 Apply scala.Int.== scala.Predef.refArrayOps[String](t).indexOf[String](PlaybackIterator.this.dtgName).==(-1)
66 9346 2905 - 2912 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.dtgName PlaybackIterator.this.dtgName
66 9347 2900 - 2912 ApplyToImplicitArgs scala.collection.mutable.ArrayOps.:+ scala.Predef.refArrayOps[String](t).:+[String](PlaybackIterator.this.dtgName)((ClassTag.apply[String](classOf[java.lang.String]): scala.reflect.ClassTag[String]))
66 9348 2900 - 2912 Block scala.collection.mutable.ArrayOps.:+ scala.Predef.refArrayOps[String](t).:+[String](PlaybackIterator.this.dtgName)((ClassTag.apply[String](classOf[java.lang.String]): scala.reflect.ClassTag[String]))
67 9349 2927 - 2928 Ident org.locationtech.geomesa.utils.iterators.PlaybackIterator.t t
69 9350 2958 - 2963 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.tdefs PlaybackIterator.this.tdefs
70 9351 3001 - 3008 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.dtgName PlaybackIterator.this.dtgName
70 9352 2989 - 3009 Apply org.geotools.api.feature.simple.SimpleFeatureType.indexOf PlaybackIterator.this.sft.indexOf(PlaybackIterator.this.dtgName)
70 9353 2989 - 3009 Block org.geotools.api.feature.simple.SimpleFeatureType.indexOf PlaybackIterator.this.sft.indexOf(PlaybackIterator.this.dtgName)
71 9354 3034 - 3041 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.dtgName PlaybackIterator.this.dtgName
71 9355 3024 - 3042 Apply scala.collection.GenSeqLike.indexOf scala.Predef.refArrayOps[String](t).indexOf[String](PlaybackIterator.this.dtgName)
71 9356 3024 - 3042 Block scala.collection.GenSeqLike.indexOf scala.Predef.refArrayOps[String](t).indexOf[String](PlaybackIterator.this.dtgName)
73 9357 3057 - 3071 Apply scala.Int.!= PlaybackIterator.this.dtgIndex.!=(-1)
73 9358 3073 - 3093 Literal <nosymbol> "Invalid date field"
73 9359 3049 - 3094 Apply scala.Predef.require scala.Predef.require(PlaybackIterator.this.dtgIndex.!=(-1), "Invalid date field")
74 9360 3131 - 3138 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.dtgName PlaybackIterator.this.dtgName
74 9361 3119 - 3139 Apply org.geotools.api.filter.FilterFactory.property PlaybackIterator.ff.property(PlaybackIterator.this.dtgName)
75 9362 3169 - 3176 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.dtgName PlaybackIterator.this.dtgName
75 9363 3178 - 3197 Select org.geotools.api.filter.sort.SortOrder.ASCENDING org.geotools.api.filter.sort.SortOrder.ASCENDING
75 9364 3161 - 3198 Apply org.geotools.api.filter.FilterFactory.sort PlaybackIterator.ff.sort(PlaybackIterator.this.dtgName, org.geotools.api.filter.sort.SortOrder.ASCENDING)
77 9365 3240 - 3250 Select scala.concurrent.duration.Duration.toMillis x$1.toMillis
77 9366 3262 - 3307 Apply scala.Long.+ PlaybackIterator.this.interval._2.getTime().-(PlaybackIterator.this.interval._1.getTime()).+(1)
77 9367 3229 - 3308 Apply scala.Option.getOrElse PlaybackIterator.this.window.map[Long](((x$1: scala.concurrent.duration.Duration) => x$1.toMillis)).getOrElse[Long](PlaybackIterator.this.interval._2.getTime().-(PlaybackIterator.this.interval._1.getTime()).+(1))
79 9368 3338 - 3340 Literal <nosymbol> -1L
80 9369 3374 - 3376 Literal <nosymbol> -1L
82 9370 3442 - 3451 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.readAhead PlaybackIterator.this.readAhead
82 9371 3403 - 3452 Apply java.util.concurrent.LinkedBlockingQueue.<init> new java.util.concurrent.LinkedBlockingQueue[org.geotools.api.feature.simple.SimpleFeature](PlaybackIterator.this.readAhead)
85 9372 3519 - 3554 Apply java.util.concurrent.Executors.newSingleThreadExecutor java.util.concurrent.Executors.newSingleThreadExecutor()
86 9373 3573 - 3592 Apply org.locationtech.geomesa.utils.iterators.PlaybackIterator.QueryRunnable.<init> new PlaybackIterator.this.QueryRunnable()
86 9374 3557 - 3593 Apply java.util.concurrent.ExecutorService.submit PlaybackIterator.this.executor.submit(new PlaybackIterator.this.QueryRunnable())
89 9375 3639 - 3653 Apply java.lang.Object.!= PlaybackIterator.this.staged.!=(null)
90 9376 3663 - 3667 Literal <nosymbol> true
90 9377 3663 - 3667 Block <nosymbol> true
91 9389 3679 - 3931 Block <nosymbol> { PlaybackIterator.this.staged_=(PlaybackIterator.this.features.take()); if (PlaybackIterator.terminal.eq(PlaybackIterator.this.staged).unary_!) true else { PlaybackIterator.this.features.put(PlaybackIterator.this.staged); PlaybackIterator.this.staged_=(null); false } }
92 9378 3696 - 3711 Apply java.util.concurrent.LinkedBlockingQueue.take PlaybackIterator.this.features.take()
92 9379 3687 - 3711 Apply org.locationtech.geomesa.utils.iterators.PlaybackIterator.staged_= PlaybackIterator.this.staged_=(PlaybackIterator.this.features.take())
93 9380 3752 - 3758 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.staged PlaybackIterator.this.staged
93 9381 3722 - 3759 Select scala.Boolean.unary_! PlaybackIterator.terminal.eq(PlaybackIterator.this.staged).unary_!
94 9382 3771 - 3775 Literal <nosymbol> true
94 9383 3771 - 3775 Block <nosymbol> true
95 9388 3789 - 3925 Block <nosymbol> { PlaybackIterator.this.features.put(PlaybackIterator.this.staged); PlaybackIterator.this.staged_=(null); false }
96 9384 3812 - 3818 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.staged PlaybackIterator.this.staged
96 9385 3799 - 3819 Apply java.util.concurrent.LinkedBlockingQueue.put PlaybackIterator.this.features.put(PlaybackIterator.this.staged)
97 9386 3890 - 3903 Apply org.locationtech.geomesa.utils.iterators.PlaybackIterator.staged_= PlaybackIterator.this.staged_=(null)
98 9387 3912 - 3917 Literal <nosymbol> false
104 9390 3996 - 4002 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.staged PlaybackIterator.this.staged
105 9391 4007 - 4020 Apply org.locationtech.geomesa.utils.iterators.PlaybackIterator.staged_= PlaybackIterator.this.staged_=(null)
106 9392 4043 - 4100 Apply java.util.Date.getTime feature.getAttribute(PlaybackIterator.this.dtgIndex).asInstanceOf[java.util.Date].getTime()
107 9393 4109 - 4121 Apply scala.Long.== PlaybackIterator.this.start.==(-1L)
107 9397 4123 - 4349 Block <nosymbol> { (if (PlaybackIterator.this.logger.underlying.isDebugEnabled()) PlaybackIterator.this.logger.underlying.debug("Starting replay clock") else (): Unit); PlaybackIterator.this.start_=(java.lang.System.currentTimeMillis()); PlaybackIterator.this.eventStart_=(featureTime) }
107 9398 4105 - 4105 Literal <nosymbol> ()
107 9399 4105 - 4105 Block <nosymbol> ()
110 9394 4286 - 4312 Apply java.lang.System.currentTimeMillis java.lang.System.currentTimeMillis()
110 9395 4278 - 4312 Apply org.locationtech.geomesa.utils.iterators.PlaybackIterator.start_= PlaybackIterator.this.start_=(java.lang.System.currentTimeMillis())
111 9396 4319 - 4343 Apply org.locationtech.geomesa.utils.iterators.PlaybackIterator.eventStart_= PlaybackIterator.this.eventStart_=(featureTime)
113 9400 4404 - 4414 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.eventStart PlaybackIterator.this.eventStart
113 9401 4418 - 4422 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.rate PlaybackIterator.this.rate
113 9402 4388 - 4430 Select scala.Float.toLong featureTime.-(PlaybackIterator.this.eventStart)./(PlaybackIterator.this.rate).toLong
113 9403 4380 - 4430 Apply scala.Long.+ PlaybackIterator.this.start.+(featureTime.-(PlaybackIterator.this.eventStart)./(PlaybackIterator.this.rate).toLong)
114 9404 4469 - 4495 Apply java.lang.System.currentTimeMillis java.lang.System.currentTimeMillis()
114 9405 4447 - 4495 Apply scala.Long.- featureRelativeTime.-(java.lang.System.currentTimeMillis())
115 9406 4504 - 4513 Apply scala.Long.> sleep.>(0)
115 9409 4500 - 4500 Literal <nosymbol> ()
115 9410 4500 - 4500 Block <nosymbol> ()
116 9407 4523 - 4542 Apply java.lang.Thread.sleep java.lang.Thread.sleep(sleep)
116 9408 4523 - 4542 Block java.lang.Thread.sleep java.lang.Thread.sleep(sleep)
118 9411 4557 - 4561 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.live PlaybackIterator.this.live
118 9416 4553 - 4553 Literal <nosymbol> ()
118 9417 4553 - 4553 Block <nosymbol> ()
119 9412 4592 - 4600 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.dtgIndex PlaybackIterator.this.dtgIndex
119 9413 4602 - 4631 Apply java.util.Date.<init> new java.util.Date(featureRelativeTime)
119 9414 4571 - 4632 Apply org.geotools.api.feature.simple.SimpleFeature.setAttribute feature.setAttribute(PlaybackIterator.this.dtgIndex, new java.util.Date(featureRelativeTime))
119 9415 4571 - 4632 Block org.geotools.api.feature.simple.SimpleFeature.setAttribute feature.setAttribute(PlaybackIterator.this.dtgIndex, new java.util.Date(featureRelativeTime))
124 9418 4687 - 4709 Apply java.util.concurrent.ExecutorService.shutdownNow PlaybackIterator.this.executor.shutdownNow()
124 9419 4707 - 4707 Literal <nosymbol> ()
128 9463 4813 - 6206 Block <nosymbol> { var from: java.util.Date = PlaybackIterator.this.interval._1; var to: java.util.Date = new java.util.Date(from.getTime().+(PlaybackIterator.this.windowMillis)); var loop: Boolean = true; while$1(){ if (loop.&&(java.lang.Thread.currentThread().isInterrupted().unary_!)) { { if (PlaybackIterator.this.interval._2.before(to)) { to = PlaybackIterator.this.interval._2; loop = false } else (); (if (PlaybackIterator.this.logger.underlying.isDebugEnabled()) PlaybackIterator.this.logger.underlying.debug("Running query window {} to {}", (scala.Array.apply[AnyRef]((from: AnyRef), (to: AnyRef))((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*)) else (): Unit); val during: org.geotools.api.filter.temporal.During = { val period: org.geotools.temporal.object.DefaultPeriod = new org.geotools.temporal.`object`.DefaultPeriod(new org.geotools.temporal.`object`.DefaultInstant(new org.geotools.temporal.`object`.DefaultPosition(from)), new org.geotools.temporal.`object`.DefaultInstant(new org.geotools.temporal.`object`.DefaultPosition(to))); PlaybackIterator.ff.during(PlaybackIterator.this.dtgProp, PlaybackIterator.ff.literal(period)) }; val query: org.geotools.api.data.Query = new org.geotools.api.data.Query(PlaybackIterator.this.typeName, PlaybackIterator.this.filter.map[org.geotools.api.filter.And](((x$2: org.geotools.api.filter.Filter) => PlaybackIterator.ff.and(x$2, during))).getOrElse[org.geotools.api.filter.Filter](during), (PlaybackIterator.this.tdefs: _*)); query.setSortBy(PlaybackIterator.this.sort); query.getHints().put(org.geotools.util.factory.Hints.MAX_MEMORY_SORT, 2147483647); var count: Long = 0L; org.locationtech.geomesa.utils.collection.SelfClosingIterator.apply[org.geotools.api.feature.simple.SimpleFeature, org.geotools.api.feature.simple.SimpleFeatureType](PlaybackIterator.this.ds.getFeatureReader(query, org.geotools.api.data.Transaction.AUTO_COMMIT)).foreach[Unit](((f: org.geotools.api.feature.simple.SimpleFeature) => { 
PlaybackIterator.this.features.put(f); count = count.+(1) })); (if (PlaybackIterator.this.logger.underlying.isDebugEnabled()) PlaybackIterator.this.logger.underlying.debug("Returned {} features from query window {} to {}", count.asInstanceOf[AnyRef], (from: AnyRef), (to: AnyRef)) else (): Unit); from = to; to = new java.util.Date(from.getTime().+(PlaybackIterator.this.windowMillis)) }; while$1() } else () } }
129 9420 4824 - 4835 Select scala.Tuple2._1 PlaybackIterator.this.interval._1
130 9421 4877 - 4889 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.windowMillis PlaybackIterator.this.windowMillis
130 9422 4862 - 4889 Apply scala.Long.+ from.getTime().+(PlaybackIterator.this.windowMillis)
130 9423 4853 - 4890 Apply java.util.Date.<init> new java.util.Date(from.getTime().+(PlaybackIterator.this.windowMillis))
131 9424 4910 - 4914 Literal <nosymbol> true
133 9425 4939 - 4976 Select scala.Boolean.unary_! java.lang.Thread.currentThread().isInterrupted().unary_!
133 9426 4931 - 4976 Apply scala.Boolean.&& loop.&&(java.lang.Thread.currentThread().isInterrupted().unary_!)
133 9459 4978 - 4978 Apply org.locationtech.geomesa.utils.iterators.PlaybackIterator.QueryRunnable.while$1 while$1()
133 9460 4978 - 6206 Block <nosymbol> { { if (PlaybackIterator.this.interval._2.before(to)) { to = PlaybackIterator.this.interval._2; loop = false } else (); (if (PlaybackIterator.this.logger.underlying.isDebugEnabled()) PlaybackIterator.this.logger.underlying.debug("Running query window {} to {}", (scala.Array.apply[AnyRef]((from: AnyRef), (to: AnyRef))((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*)) else (): Unit); val during: org.geotools.api.filter.temporal.During = { val period: org.geotools.temporal.object.DefaultPeriod = new org.geotools.temporal.`object`.DefaultPeriod(new org.geotools.temporal.`object`.DefaultInstant(new org.geotools.temporal.`object`.DefaultPosition(from)), new org.geotools.temporal.`object`.DefaultInstant(new org.geotools.temporal.`object`.DefaultPosition(to))); PlaybackIterator.ff.during(PlaybackIterator.this.dtgProp, PlaybackIterator.ff.literal(period)) }; val query: org.geotools.api.data.Query = new org.geotools.api.data.Query(PlaybackIterator.this.typeName, PlaybackIterator.this.filter.map[org.geotools.api.filter.And](((x$2: org.geotools.api.filter.Filter) => PlaybackIterator.ff.and(x$2, during))).getOrElse[org.geotools.api.filter.Filter](during), (PlaybackIterator.this.tdefs: _*)); query.setSortBy(PlaybackIterator.this.sort); query.getHints().put(org.geotools.util.factory.Hints.MAX_MEMORY_SORT, 2147483647); var count: Long = 0L; org.locationtech.geomesa.utils.collection.SelfClosingIterator.apply[org.geotools.api.feature.simple.SimpleFeature, org.geotools.api.feature.simple.SimpleFeatureType](PlaybackIterator.this.ds.getFeatureReader(query, org.geotools.api.data.Transaction.AUTO_COMMIT)).foreach[Unit](((f: org.geotools.api.feature.simple.SimpleFeature) => { PlaybackIterator.this.features.put(f); count = count.+(1) })); (if (PlaybackIterator.this.logger.underlying.isDebugEnabled()) PlaybackIterator.this.logger.underlying.debug("Returned {} features from query window {} to {}", count.asInstanceOf[AnyRef], (from: AnyRef), (to: 
AnyRef)) else (): Unit); from = to; to = new java.util.Date(from.getTime().+(PlaybackIterator.this.windowMillis)) }; while$1() }
133 9461 4924 - 4924 Literal <nosymbol> ()
133 9462 4924 - 4924 Block <nosymbol> ()
134 9427 4994 - 5016 Apply java.util.Date.before PlaybackIterator.this.interval._2.before(to)
134 9430 5018 - 5139 Block <nosymbol> { to = PlaybackIterator.this.interval._2; loop = false }
134 9431 4990 - 4990 Literal <nosymbol> ()
134 9432 4990 - 4990 Block <nosymbol> ()
136 9428 5091 - 5102 Select scala.Tuple2._2 PlaybackIterator.this.interval._2
137 9429 5122 - 5127 Literal <nosymbol> false
143 9437 5253 - 5404 Apply org.geotools.temporal.object.DefaultPeriod.<init> new org.geotools.temporal.`object`.DefaultPeriod(new org.geotools.temporal.`object`.DefaultInstant(new org.geotools.temporal.`object`.DefaultPosition(from)), new org.geotools.temporal.`object`.DefaultInstant(new org.geotools.temporal.`object`.DefaultPosition(to)))
144 9433 5305 - 5330 Apply org.geotools.temporal.object.DefaultPosition.<init> new org.geotools.temporal.`object`.DefaultPosition(from)
144 9434 5286 - 5331 Apply org.geotools.temporal.object.DefaultInstant.<init> new org.geotools.temporal.`object`.DefaultInstant(new org.geotools.temporal.`object`.DefaultPosition(from))
145 9435 5366 - 5389 Apply org.geotools.temporal.object.DefaultPosition.<init> new org.geotools.temporal.`object`.DefaultPosition(to)
145 9436 5347 - 5390 Apply org.geotools.temporal.object.DefaultInstant.<init> new org.geotools.temporal.`object`.DefaultInstant(new org.geotools.temporal.`object`.DefaultPosition(to))
147 9438 5427 - 5434 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.dtgProp PlaybackIterator.this.dtgProp
147 9439 5436 - 5454 Apply org.geotools.api.filter.FilterFactory.literal PlaybackIterator.ff.literal(period)
147 9440 5417 - 5455 Apply org.geotools.api.filter.FilterFactory.during PlaybackIterator.ff.during(PlaybackIterator.this.dtgProp, PlaybackIterator.ff.literal(period))
149 9441 5500 - 5508 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.typeName PlaybackIterator.this.typeName
149 9442 5510 - 5557 Apply scala.Option.getOrElse PlaybackIterator.this.filter.map[org.geotools.api.filter.And](((x$2: org.geotools.api.filter.Filter) => PlaybackIterator.ff.and(x$2, during))).getOrElse[org.geotools.api.filter.Filter](during)
149 9443 5559 - 5564 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.tdefs PlaybackIterator.this.tdefs
149 9444 5490 - 5569 Apply org.geotools.api.data.Query.<init> new org.geotools.api.data.Query(PlaybackIterator.this.typeName, PlaybackIterator.this.filter.map[org.geotools.api.filter.And](((x$2: org.geotools.api.filter.Filter) => PlaybackIterator.ff.and(x$2, during))).getOrElse[org.geotools.api.filter.Filter](during), (PlaybackIterator.this.tdefs: _*))
150 9445 5596 - 5600 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.sort PlaybackIterator.this.sort
150 9446 5580 - 5601 Apply org.geotools.api.data.Query.setSortBy query.setSortBy(PlaybackIterator.this.sort)
152 9447 5690 - 5711 Select org.geotools.util.factory.Hints.MAX_MEMORY_SORT org.geotools.util.factory.Hints.MAX_MEMORY_SORT
152 9448 5713 - 5740 Literal <nosymbol> 2147483647
152 9449 5671 - 5741 Apply java.awt.RenderingHints.put query.getHints().put(org.geotools.util.factory.Hints.MAX_MEMORY_SORT, 2147483647)
154 9450 5765 - 5767 Literal <nosymbol> 0L
157 9451 5900 - 5923 Select org.geotools.api.data.Transaction.AUTO_COMMIT org.geotools.api.data.Transaction.AUTO_COMMIT
157 9452 5873 - 5924 Apply org.geotools.api.data.DataStore.getFeatureReader PlaybackIterator.this.ds.getFeatureReader(query, org.geotools.api.data.Transaction.AUTO_COMMIT)
157 9455 5853 - 6003 Apply scala.collection.Iterator.foreach org.locationtech.geomesa.utils.collection.SelfClosingIterator.apply[org.geotools.api.feature.simple.SimpleFeature, org.geotools.api.feature.simple.SimpleFeatureType](PlaybackIterator.this.ds.getFeatureReader(query, org.geotools.api.data.Transaction.AUTO_COMMIT)).foreach[Unit](((f: org.geotools.api.feature.simple.SimpleFeature) => { PlaybackIterator.this.features.put(f); count = count.+(1) }))
158 9453 5953 - 5968 Apply java.util.concurrent.LinkedBlockingQueue.put PlaybackIterator.this.features.put(f)
159 9454 5981 - 5991 Apply scala.Long.+ count.+(1)
166 9456 6183 - 6195 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.windowMillis PlaybackIterator.this.windowMillis
166 9457 6168 - 6195 Apply scala.Long.+ from.getTime().+(PlaybackIterator.this.windowMillis)
166 9458 6159 - 6196 Apply java.util.Date.<init> new java.util.Date(from.getTime().+(PlaybackIterator.this.windowMillis))
169 9464 6251 - 6294 Typed <nosymbol> (if (PlaybackIterator.this.logger.underlying.isErrorEnabled()) PlaybackIterator.this.logger.underlying.error("Error querying playback:", e) else (): Unit)
171 9465 6334 - 6359 Select org.locationtech.geomesa.utils.iterators.PlaybackIterator.terminal PlaybackIterator.terminal
171 9466 6321 - 6360 Apply java.util.concurrent.LinkedBlockingQueue.put PlaybackIterator.this.features.put(PlaybackIterator.terminal)
171 9467 6321 - 6360 Block java.util.concurrent.LinkedBlockingQueue.put PlaybackIterator.this.features.put(PlaybackIterator.terminal)
178 9468 6427 - 6463 Apply org.geotools.factory.CommonFactoryFinder.getFilterFactory org.geotools.factory.CommonFactoryFinder.getFilterFactory()
179 9469 6489 - 6541 Apply org.geotools.feature.simple.SimpleFeatureImpl.<init> new org.geotools.feature.simple.SimpleFeatureImpl(null, null, null, false, null)