1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.accumulo.iterators
10 
11 import org.apache.accumulo.core.data.{Range => aRange, _}
12 import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator}
13 import org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.BatchScanCallback
14 import org.locationtech.geomesa.index.iterators.AggregatingScan
15 import org.locationtech.geomesa.index.iterators.AggregatingScan.{AggregateCallback, RowValue}
16 
/**
 * Aggregating iterator - only works on kryo-encoded features.
 *
 * Wraps a source iterator and, on every `seek` or `next`, runs the `aggregate` method inherited
 * from [[AggregatingScan]]. The bytes captured from that run are exposed as the top value, and the
 * top key is the key of the last source entry read through `nextData`.
 *
 * @tparam T type of the aggregation result
 */
abstract class BaseAggregatingIterator[T <: AggregatingScan.Result]
    extends SortedKeyValueIterator[Key, Value] with AggregatingScan[T] {

  import scala.collection.JavaConverters._

  /** Underlying iterator that supplies the raw key/value entries, set in `init` */
  var source: SortedKeyValueIterator[Key, Value] = _

  // key of the last source entry read by nextData, or null when there is no top entry
  protected var topKey: Key = _
  // holder for the most recent aggregated bytes, nulled out when there is no top entry
  private var topValue: Value = new Value()
  // range passed to the most recent seek, used to bound reads from the source
  private var currentRange: aRange = _
  // true when the current source entry has been consumed but the source has not been advanced yet
  private var needToAdvance = false

  /**
   * Stores the source iterator and initializes the aggregating scan with the iterator options.
   *
   * @param src source iterator to read entries from
   * @param options iterator options, passed to the scan as an immutable scala map
   * @param env iterator environment (not used here)
   */
  override def init(src: SortedKeyValueIterator[Key, Value],
                    options: java.util.Map[String, String],
                    env: IteratorEnvironment): Unit = {
    this.source = src
    super.init(options.asScala.toMap)
  }

  override def hasTop: Boolean = topKey != null
  override def getTopKey: Key = topKey
  override def getTopValue: Value = topValue

  /**
   * Seeks the source iterator to the given range and computes the first aggregated result.
   *
   * @param range range to scan, also used to stop reading once the source moves past its end key
   * @param columnFamilies column families, passed through to the source
   * @param inclusive column family inclusion flag, passed through to the source
   */
  override def seek(range: aRange, columnFamilies: java.util.Collection[ByteSequence], inclusive: Boolean): Unit = {
    currentRange = range
    source.seek(range, columnFamilies, inclusive)
    // the source was just repositioned, so any pending advance from a previous read is obsolete
    needToAdvance = false
    findTop()
  }

  override def next(): Unit = findTop()

  /**
   * Runs one aggregation and publishes its output as the top value. If the aggregation produced
   * no bytes, both the top key and top value are cleared, which makes `hasTop` return false.
   */
  private def findTop(): Unit = {
    val result = aggregate(new BatchScanCallback()).result
    if (result == null) {
      topKey = null // hasTop will be false
      topValue = null
    } else {
      if (topValue == null) {
        // only re-create topValue if it was nulled out
        topValue = new Value()
      }
      topValue.set(result)
    }
  }

  /**
   * Checks whether the source has another entry inside the current range, first advancing the
   * source if the previous entry has already been consumed by `nextData`.
   *
   * @return true if the source has a top entry that is not past the end of the current range
   */
  override protected def hasNextData: Boolean = {
    if (needToAdvance) {
      source.next() // advance the source iterator, this may invalidate the top key/value we've already read
      needToAdvance = false
    }
    source.hasTop && !currentRange.afterEndKey(source.getTopKey)
  }

  /**
   * Reads the current source entry and records its key as the top key. The source is not advanced
   * here; the advance is deferred until the next call to `hasNextData`.
   *
   * @return the row and value bytes of the current source entry
   */
  override protected def nextData(): RowValue = {
    needToAdvance = true
    topKey = source.getTopKey
    val value = source.getTopValue.get()
    // read the row once, so that the bytes and the length come from the same Text
    // instead of from two separate getRow calls
    val row = topKey.getRow
    RowValue(row.getBytes, 0, row.getLength, value, 0, value.length)
  }

  /**
   * Deep copies are not supported by this iterator.
   *
   * @param env iterator environment (not used)
   * @return never returns normally, always throws `NotImplementedError`
   */
  override def deepCopy(env: IteratorEnvironment): SortedKeyValueIterator[Key, Value] =
    throw new NotImplementedError()
}
84 
object BaseAggregatingIterator {

  /**
   * Callback that remembers the bytes of the last batch it was given, so that they can be read
   * back through `result` once the aggregation call has returned.
   */
  private class BatchScanCallback extends AggregateCallback {

    // bytes from the most recent batch, null until a batch has been delivered
    private var captured: Array[Byte] = _

    /**
     * Stores the batch.
     *
     * @param bytes the batch bytes
     * @return always false
     */
    override def batch(bytes: Array[Byte]): Boolean = {
      captured = bytes
      // we want to stop scanning and return the batch
      false
    }

    /**
     * Ignores partial results; the by-name argument is never evaluated.
     *
     * @param bytes partial result bytes (unused)
     * @return always true
     */
    // we always keep scanning and rely on client connections to stop the scan
    override def partial(bytes: => Array[Byte]): Boolean = true

    /** Bytes from the last batch received, or null if no batch was delivered */
    def result: Array[Byte] = captured
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
28 272 1290 - 1301 Apply org.apache.accumulo.core.data.Value.<init> new org.apache.accumulo.core.data.Value()
30 273 1371 - 1376 Literal <nosymbol> false
35 274 1559 - 1576 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.source_= this.source_=(src)
36 275 1608 - 1608 TypeApply scala.Predef.$conforms scala.Predef.$conforms[(String, String)]
36 276 1592 - 1613 ApplyToImplicitArgs scala.collection.TraversableOnce.toMap scala.collection.JavaConverters.mapAsScalaMapConverter[String, String](options).asScala.toMap[String, String](scala.Predef.$conforms[(String, String)])
36 277 1581 - 1614 Apply org.locationtech.geomesa.index.iterators.AggregatingScan.init BaseAggregatingIterator.super.init(scala.collection.JavaConverters.mapAsScalaMapConverter[String, String](options).asScala.toMap[String, String](scala.Predef.$conforms[(String, String)]))
39 278 1653 - 1667 Apply java.lang.Object.!= BaseAggregatingIterator.this.topKey.!=(null)
40 279 1700 - 1706 Select org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.topKey BaseAggregatingIterator.this.topKey
41 280 1743 - 1751 Select org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.topValue BaseAggregatingIterator.this.topValue
44 281 1874 - 1894 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.currentRange_= BaseAggregatingIterator.this.currentRange_=(range)
45 282 1899 - 1944 Apply org.apache.accumulo.core.iterators.SortedKeyValueIterator.seek BaseAggregatingIterator.this.source.seek(range, columnFamilies, inclusive)
46 283 1949 - 1970 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.needToAdvance_= BaseAggregatingIterator.this.needToAdvance_=(false)
47 284 1975 - 1984 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.findTop BaseAggregatingIterator.this.findTop()
50 285 2020 - 2029 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.findTop BaseAggregatingIterator.this.findTop()
53 286 2092 - 2115 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.BatchScanCallback.<init> new org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.BatchScanCallback()
53 287 2082 - 2123 Select org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.BatchScanCallback.result BaseAggregatingIterator.this.aggregate[org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.BatchScanCallback](new org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.BatchScanCallback()).result
54 288 2132 - 2146 Apply java.lang.Object.== result.==(null)
54 291 2148 - 2221 Block <nosymbol> { BaseAggregatingIterator.this.topKey_=(null); BaseAggregatingIterator.this.topValue_=(null) }
55 289 2156 - 2169 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.topKey_= BaseAggregatingIterator.this.topKey_=(null)
56 290 2200 - 2215 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.topValue_= BaseAggregatingIterator.this.topValue_=(null)
57 299 2227 - 2386 Block <nosymbol> { if (BaseAggregatingIterator.this.topValue.==(null)) BaseAggregatingIterator.this.topValue_=(new org.apache.accumulo.core.data.Value()) else (); BaseAggregatingIterator.this.topValue.set(result) }
58 292 2239 - 2255 Apply java.lang.Object.== BaseAggregatingIterator.this.topValue.==(null)
58 296 2235 - 2235 Literal <nosymbol> ()
58 297 2235 - 2235 Block <nosymbol> ()
60 293 2334 - 2345 Apply org.apache.accumulo.core.data.Value.<init> new org.apache.accumulo.core.data.Value()
60 294 2323 - 2345 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.topValue_= BaseAggregatingIterator.this.topValue_=(new org.apache.accumulo.core.data.Value())
60 295 2323 - 2345 Block org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.topValue_= BaseAggregatingIterator.this.topValue_=(new org.apache.accumulo.core.data.Value())
62 298 2360 - 2380 Apply org.apache.accumulo.core.data.Value.set BaseAggregatingIterator.this.topValue.set(result)
67 300 2450 - 2463 Select org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.needToAdvance BaseAggregatingIterator.this.needToAdvance
67 303 2465 - 2609 Block <nosymbol> { BaseAggregatingIterator.this.source.next(); BaseAggregatingIterator.this.needToAdvance_=(false) }
67 304 2446 - 2446 Literal <nosymbol> ()
67 305 2446 - 2446 Block <nosymbol> ()
68 301 2473 - 2486 Apply org.apache.accumulo.core.iterators.SortedKeyValueIterator.next BaseAggregatingIterator.this.source.next()
69 302 2582 - 2603 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.needToAdvance_= BaseAggregatingIterator.this.needToAdvance_=(false)
71 306 2657 - 2673 Apply org.apache.accumulo.core.iterators.SortedKeyValueIterator.getTopKey BaseAggregatingIterator.this.source.getTopKey()
71 307 2631 - 2674 Select scala.Boolean.unary_! BaseAggregatingIterator.this.currentRange.afterEndKey(BaseAggregatingIterator.this.source.getTopKey()).unary_!
71 308 2614 - 2674 Apply scala.Boolean.&& BaseAggregatingIterator.this.source.hasTop().&&(BaseAggregatingIterator.this.currentRange.afterEndKey(BaseAggregatingIterator.this.source.getTopKey()).unary_!)
75 309 2734 - 2754 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.needToAdvance_= BaseAggregatingIterator.this.needToAdvance_=(true)
76 310 2768 - 2784 Apply org.apache.accumulo.core.iterators.SortedKeyValueIterator.getTopKey BaseAggregatingIterator.this.source.getTopKey()
76 311 2759 - 2784 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.topKey_= BaseAggregatingIterator.this.topKey_=(BaseAggregatingIterator.this.source.getTopKey())
77 312 2801 - 2825 Apply org.apache.accumulo.core.data.Value.get BaseAggregatingIterator.this.source.getTopValue().get()
78 313 2839 - 2861 Apply org.apache.hadoop.io.Text.getBytes BaseAggregatingIterator.this.topKey.getRow().getBytes()
78 314 2863 - 2864 Literal <nosymbol> 0
78 315 2866 - 2889 Apply org.apache.hadoop.io.Text.getLength BaseAggregatingIterator.this.topKey.getRow().getLength()
78 316 2898 - 2899 Literal <nosymbol> 0
78 317 2901 - 2913 Select scala.Array.length value.length
78 318 2830 - 2914 Apply org.locationtech.geomesa.index.iterators.AggregatingScan.RowValue.apply org.locationtech.geomesa.index.iterators.AggregatingScan.RowValue.apply(BaseAggregatingIterator.this.topKey.getRow().getBytes(), 0, BaseAggregatingIterator.this.topKey.getRow().getLength(), value, 0, value.length)
82 319 3012 - 3043 Throw <nosymbol> throw new scala.NotImplementedError()
92 320 3246 - 3264 Apply org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.BatchScanCallback.bytes_= this.bytes_=(bytes)
93 321 3271 - 3276 Literal <nosymbol> false
97 322 3471 - 3475 Literal <nosymbol> true
99 323 3507 - 3512 Select org.locationtech.geomesa.accumulo.iterators.BaseAggregatingIterator.BatchScanCallback.bytes BatchScanCallback.this.bytes