1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.utils.stats
10 import com.typesafe.scalalogging.LazyLogging
11 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
12 import org.locationtech.geomesa.utils.clearspring.StreamSummary
13 
14 import scala.collection.immutable.ListMap
15 
/**
  * TopK stat: tracks the most frequent values of a single attribute, as reported by a
  * backing [[StreamSummary]].
  *
  * @param sft simple feature type
  * @param property property name of the attribute to track
  * @param summary stream summary object holding the per-value counts
  * @tparam T attribute type binding
  */
class TopK[T] private [stats] (
    val sft: SimpleFeatureType,
    val property: String,
    private [stats] val summary: StreamSummary[T]
  ) extends Stat with LazyLogging {

  import TopK.StreamCapacity

  /**
    * Creates a TopK backed by a new stream summary built with capacity `TopK.StreamCapacity`.
    *
    * @param sft simple feature type
    * @param property property name of the attribute to track
    */
  def this(sft: SimpleFeatureType, property: String) = this(sft, property, StreamSummary[T](TopK.StreamCapacity))

  override type S = TopK[T]

  // position of the tracked attribute in the feature type, resolved once up front
  // NOTE(review): presumably -1 when `property` is not part of `sft`, which would only surface
  // as an error on the first observe/unobserve - confirm whether eager validation is wanted
  private val i = sft.indexOf(property)

  /**
    * Gets the k highest-ranked values tracked by the summary.
    *
    * @param k maximum number of entries to return
    * @return (value, count) pairs, in the order produced by the underlying summary
    */
  def topK(k: Int): Iterator[(T, Long)] = summary.topK(k)

  /**
    * @return size of the underlying summary (presumably the number of distinct values tracked)
    */
  def size: Int = summary.size

  /**
    * Offers the tracked attribute value of the feature to the summary. Null values are ignored.
    *
    * @param sf feature to observe
    */
  override def observe(sf: SimpleFeature): Unit = {
    val value = sf.getAttribute(i).asInstanceOf[T]
    if (value != null) {
      summary.offer(value)
    }
  }

  /**
    * Reverses an observation by offering the feature's attribute value with an increment of -1.
    * Null values are ignored.
    *
    * @param sf feature to un-observe
    */
  override def unobserve(sf: SimpleFeature): Unit = {
    val value = sf.getAttribute(i).asInstanceOf[T]
    if (value != null) {
      summary.offer(value, -1)
    }
  }

  /**
    * Merges two stats into a result that is a fresh TopK (default capacity). `this` and then
    * `other` are merged into it in turn.
    *
    * @param other stat to combine with this one
    * @return new merged stat
    */
  override def +(other: TopK[T]): TopK[T] = {
    val merged = new TopK[T](sft, property)
    merged += this
    merged += other
    merged
  }

  /**
    * Merges the counts of another stat into this one, in place.
    *
    * At most `StreamCapacity` entries from `other` are transferred, each offered with its count.
    *
    * @param other stat whose counts are added to this one
    */
  override def +=(other: TopK[T]): Unit =
    other.summary.topK(StreamCapacity).foreach { case (item, count) => summary.offer(item, count) }

  /**
    * Removes all tracked values from the summary.
    */
  override def clear(): Unit = summary.clear()

  override def isEmpty: Boolean = summary.size == 0

  /**
    * Renders the top 10 values as an ordered map from 0-based rank to a
    * `"value"` / `"count"` map.
    */
  override def toJsonObject: Any = {
    val maps = summary.topK(10).zipWithIndex.map { case ((item, count), rank) =>
      (rank, ListMap( "value" -> item, "count" -> count))
    }
    ListMap(maps.toSeq:_*)
  }

  /**
    * Two TopK stats are equivalent when their summaries have the same size and yield the same
    * sequence of (value, count) pairs. The tracked `property` is not compared.
    *
    * @param other stat to compare against
    * @return true if equivalent
    */
  override def isEquivalent(other: Stat): Boolean = other match {
    // T is erased at runtime, so `TopK[T]` would be an unchecked pattern that matches any TopK
    // anyway - match on the wildcard type instead so the check is honest and warning-free
    case s: TopK[_] if summary.size == s.summary.size =>
      s.summary.topK(summary.size).sameElements(summary.topK(summary.size))
    case _ => false
  }
}
82 
/**
  * Shared configuration for [[TopK]] stats.
  */
object TopK {

  /**
    * Capacity passed to the StreamSummary created by TopK's default constructor. It is also the
    * maximum number of entries pulled from another summary when merging.
    */
  val StreamCapacity: Int = 1000
}
Line Stmt Id Pos Tree Symbol Tests Code
32 13590 1236 - 1255 Select org.locationtech.geomesa.utils.stats.TopK.StreamCapacity TopK.StreamCapacity
32 13591 1219 - 1256 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.apply org.locationtech.geomesa.utils.clearspring.StreamSummary.apply[T](TopK.StreamCapacity)
32 13592 1199 - 1257 Apply org.locationtech.geomesa.utils.stats.TopK.<init> TopK.this.<init>(sft, property, org.locationtech.geomesa.utils.clearspring.StreamSummary.apply[T](TopK.StreamCapacity))
32 13593 1199 - 1199 Literal <nosymbol> ()
36 13594 1318 - 1326 Select org.locationtech.geomesa.utils.stats.TopK.property TopK.this.property
36 13595 1306 - 1327 Apply org.geotools.api.feature.simple.SimpleFeatureType.indexOf TopK.this.sft.indexOf(TopK.this.property)
38 13596 1371 - 1386 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.topK TopK.this.summary.topK(k)
39 13597 1405 - 1417 Select org.locationtech.geomesa.utils.clearspring.StreamSummary.size TopK.this.summary.size
42 13598 1503 - 1504 Select org.locationtech.geomesa.utils.stats.TopK.i TopK.this.i
42 13599 1487 - 1521 TypeApply scala.Any.asInstanceOf sf.getAttribute(TopK.this.i).asInstanceOf[T]
43 13600 1530 - 1543 Apply scala.Any.!= value.!=(null)
43 13604 1526 - 1526 Literal <nosymbol> ()
43 13605 1526 - 1526 Block <nosymbol> ()
44 13601 1553 - 1573 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.offer TopK.this.summary.offer(value)
44 13602 1566 - 1566 Literal <nosymbol> ()
44 13603 1553 - 1573 Block <nosymbol> { TopK.this.summary.offer(value); () }
49 13606 1671 - 1672 Select org.locationtech.geomesa.utils.stats.TopK.i TopK.this.i
49 13607 1655 - 1689 TypeApply scala.Any.asInstanceOf sf.getAttribute(TopK.this.i).asInstanceOf[T]
50 13608 1698 - 1711 Apply scala.Any.!= value.!=(null)
50 13612 1694 - 1694 Literal <nosymbol> ()
50 13613 1694 - 1694 Block <nosymbol> ()
51 13609 1721 - 1745 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.offer TopK.this.summary.offer(value, -1)
51 13610 1734 - 1734 Literal <nosymbol> ()
51 13611 1721 - 1745 Block <nosymbol> { TopK.this.summary.offer(value, -1); () }
56 13614 1832 - 1835 Select org.locationtech.geomesa.utils.stats.TopK.sft TopK.this.sft
56 13615 1837 - 1845 Select org.locationtech.geomesa.utils.stats.TopK.property TopK.this.property
56 13616 1820 - 1846 Apply org.locationtech.geomesa.utils.stats.TopK.<init> new org.locationtech.geomesa.utils.stats.TopK[T](TopK.this.sft, TopK.this.property)
57 13617 1851 - 1865 Apply org.locationtech.geomesa.utils.stats.TopK.+= merged.+=(this)
58 13618 1870 - 1885 Apply org.locationtech.geomesa.utils.stats.TopK.+= merged.+=(other)
63 13619 1967 - 1981 Select org.locationtech.geomesa.utils.stats.TopK.StreamCapacity TopK.StreamCapacity
63 13620 2015 - 2041 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.offer TopK.this.summary.offer(item, count)
63 13621 2015 - 2041 Block org.locationtech.geomesa.utils.clearspring.StreamSummary.offer TopK.this.summary.offer(item, count)
63 13622 1948 - 2043 Apply scala.collection.Iterator.foreach other.summary.topK(TopK.StreamCapacity).foreach[Boolean](((x0$1: (T, Long)) => x0$1 match { case (_1: T, _2: Long)(T, Long)((item @ _), (count @ _)) => TopK.this.summary.offer(item, count) }))
65 13623 2076 - 2091 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.clear TopK.this.summary.clear()
67 13624 2127 - 2144 Apply scala.Int.== TopK.this.summary.size.==(0)
70 13625 2211 - 2213 Literal <nosymbol> 10
70 13631 2198 - 2327 Apply scala.collection.Iterator.map TopK.this.summary.topK(10).zipWithIndex.map[(Int, scala.collection.immutable.ListMap[String,Any])](((x0$1: ((T, Long), Int)) => x0$1 match { case (_1: (T, Long), _2: Int)((T, Long), Int)((_1: T, _2: Long)(T, Long)((item @ _), (count @ _)), (rank @ _)) => scala.Tuple2.apply[Int, scala.collection.immutable.ListMap[String,Any]](rank, scala.collection.immutable.ListMap.apply[String, Any](scala.Predef.ArrowAssoc[String]("value").->[T](item), scala.Predef.ArrowAssoc[String]("count").->[Long](count))) }))
71 13626 2286 - 2301 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[String]("value").->[T](item)
71 13627 2303 - 2319 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[String]("count").->[Long](count)
71 13628 2277 - 2320 Apply scala.collection.generic.GenMapFactory.apply scala.collection.immutable.ListMap.apply[String, Any](scala.Predef.ArrowAssoc[String]("value").->[T](item), scala.Predef.ArrowAssoc[String]("count").->[Long](count))
71 13629 2270 - 2321 Apply scala.Tuple2.apply scala.Tuple2.apply[Int, scala.collection.immutable.ListMap[String,Any]](rank, scala.collection.immutable.ListMap.apply[String, Any](scala.Predef.ArrowAssoc[String]("value").->[T](item), scala.Predef.ArrowAssoc[String]("count").->[Long](count)))
71 13630 2270 - 2321 Block scala.Tuple2.apply scala.Tuple2.apply[Int, scala.collection.immutable.ListMap[String,Any]](rank, scala.collection.immutable.ListMap.apply[String, Any](scala.Predef.ArrowAssoc[String]("value").->[T](item), scala.Predef.ArrowAssoc[String]("count").->[Long](count)))
73 13632 2340 - 2350 Select scala.collection.TraversableOnce.toSeq maps.toSeq
73 13633 2332 - 2354 Apply scala.collection.generic.GenMapFactory.apply scala.collection.immutable.ListMap.apply[Int, scala.collection.immutable.ListMap[String,Any]]((maps.toSeq: _*))
77 13634 2465 - 2479 Select org.locationtech.geomesa.utils.clearspring.StreamSummary.size s.summary.size
77 13635 2449 - 2479 Apply scala.Int.== TopK.this.summary.size.==(s.summary.size)
78 13636 2504 - 2516 Select org.locationtech.geomesa.utils.clearspring.StreamSummary.size TopK.this.summary.size
78 13637 2544 - 2556 Select org.locationtech.geomesa.utils.clearspring.StreamSummary.size TopK.this.summary.size
78 13638 2531 - 2557 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.topK TopK.this.summary.topK(TopK.this.summary.size)
78 13639 2489 - 2558 Apply scala.collection.Iterator.sameElements s.summary.topK(TopK.this.summary.size).sameElements(TopK.this.summary.topK(TopK.this.summary.size))
78 13640 2489 - 2558 Block scala.collection.Iterator.sameElements s.summary.topK(TopK.this.summary.size).sameElements(TopK.this.summary.topK(TopK.this.summary.size))
79 13641 2573 - 2578 Literal <nosymbol> false
79 13642 2573 - 2578 Block <nosymbol> false
84 13643 2623 - 2627 Literal <nosymbol> 1000