1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.utils.stats
10 import com.typesafe.scalalogging.LazyLogging
11 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
12 import org.locationtech.geomesa.utils.clearspring.StreamSummary
13 
14 import scala.collection.immutable.ListMap
15 
/**
  * TopK stat: tracks per-value counts for a single attribute using a [[StreamSummary]].
  *
  * Null attribute values are skipped by both `observe` and `unobserve`.
  *
  * @param sft simple feature type
  * @param property property name of the attribute to track
  * @param summary stream summary object holding the tracked values and their counts
  * @tparam T attribute type binding
  */
class TopK[T] private [stats] (
    val sft: SimpleFeatureType,
    val property: String,
    private [stats] val summary: StreamSummary[T]
  ) extends Stat with LazyLogging {

  import TopK.StreamCapacity

  /**
    * Creates an empty stat backed by a new summary created with `TopK.StreamCapacity`.
    *
    * @param sft simple feature type
    * @param property property name of the attribute to track
    */
  def this(sft: SimpleFeatureType, property: String) = this(sft, property, StreamSummary[T](TopK.StreamCapacity))

  override type S = TopK[T]

  // attribute index of `property`, resolved once so observe/unobserve can read by position
  // NOTE(review): presumably -1 if `property` is not in `sft` - confirm callers validate the name
  private val i = sft.indexOf(property)

  /**
    * Returns the top `k` entries of the underlying summary as (value, count) pairs.
    *
    * @param k maximum number of entries to return
    * @return iterator over (value, count) pairs, as produced by `StreamSummary.topK`
    */
  def topK(k: Int): Iterator[(T, Long)] = summary.topK(k)

  /** Number of entries currently held in the underlying summary. */
  def size: Int = summary.size

  override def observe(sf: SimpleFeature): Unit = {
    val value = sf.getAttribute(i).asInstanceOf[T]
    if (value != null) {
      summary.offer(value)
    }
  }

  override def unobserve(sf: SimpleFeature): Unit = {
    val value = sf.getAttribute(i).asInstanceOf[T]
    if (value != null) {
      // a count of -1 backs out one prior observation of this value
      summary.offer(value, -1)
    }
  }

  /**
    * Combines two stats into a new one with a default-capacity summary; neither input is modified.
    */
  override def +(other: TopK[T]): TopK[T] = {
    val merged = new TopK[T](sft, property)
    merged += this
    merged += other
    merged
  }

  /**
    * Merges up to `StreamCapacity` of the other stat's top entries into this one, adding their counts.
    *
    * The other stat's entries are snapshotted before any are offered. That way `x += x` does not
    * iterate a summary while it is being modified, in case `topK` is backed by the live structure.
    */
  override def +=(other: TopK[T]): Unit = {
    val entries = other.summary.topK(StreamCapacity).toList
    entries.foreach { case (item, count) => summary.offer(item, count) }
  }

  override def clear(): Unit = summary.clear()

  override def isEmpty: Boolean = summary.size == 0

  /**
    * Renders up to the 10 highest-ranked entries as an ordered map of
    * 0-based rank -> ListMap("value" -> item, "count" -> count).
    */
  override def toJsonObject: Any = {
    val maps = summary.topK(10).zipWithIndex.map { case ((item, count), rank) =>
      (rank, ListMap( "value" -> item, "count" -> count))
    }
    ListMap(maps.toSeq:_*)
  }

  /**
    * Two TopK stats are equivalent when their summaries hold the same number of entries and
    * yield identical (value, count) pairs in the same order.
    */
  override def isEquivalent(other: Stat): Boolean = other match {
    // T is erased at runtime, so the type argument can't be checked - mark that explicitly
    // instead of relying on an unchecked (warning-producing) type pattern
    case s: TopK[T @unchecked] if summary.size == s.summary.size =>
      s.summary.topK(summary.size).sameElements(summary.topK(summary.size))
    case _ => false
  }
}
82 
object TopK {

  /**
    * Capacity used when the auxiliary `TopK` constructor creates a new stream summary.
    * It is also the number of top entries read from another stat when merging.
    */
  val StreamCapacity: Int = 1000
}
Line Stmt Id Pos Tree Symbol Tests Code
32 15735 1236 - 1255 Select org.locationtech.geomesa.utils.stats.TopK.StreamCapacity TopK.StreamCapacity
32 15736 1219 - 1256 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.apply org.locationtech.geomesa.utils.clearspring.StreamSummary.apply[T](TopK.StreamCapacity)
32 15737 1199 - 1257 Apply org.locationtech.geomesa.utils.stats.TopK.<init> TopK.this.<init>(sft, property, org.locationtech.geomesa.utils.clearspring.StreamSummary.apply[T](TopK.StreamCapacity))
32 15738 1199 - 1199 Literal <nosymbol> ()
36 15739 1318 - 1326 Select org.locationtech.geomesa.utils.stats.TopK.property TopK.this.property
36 15740 1306 - 1327 Apply org.geotools.api.feature.simple.SimpleFeatureType.indexOf TopK.this.sft.indexOf(TopK.this.property)
38 15741 1371 - 1386 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.topK TopK.this.summary.topK(k)
39 15742 1405 - 1417 Select org.locationtech.geomesa.utils.clearspring.StreamSummary.size TopK.this.summary.size
42 15743 1503 - 1504 Select org.locationtech.geomesa.utils.stats.TopK.i TopK.this.i
42 15744 1487 - 1521 TypeApply scala.Any.asInstanceOf sf.getAttribute(TopK.this.i).asInstanceOf[T]
43 15745 1530 - 1543 Apply scala.Any.!= value.!=(null)
43 15749 1526 - 1526 Literal <nosymbol> ()
43 15750 1526 - 1526 Block <nosymbol> ()
44 15746 1553 - 1573 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.offer TopK.this.summary.offer(value)
44 15747 1566 - 1566 Literal <nosymbol> ()
44 15748 1553 - 1573 Block <nosymbol> { TopK.this.summary.offer(value); () }
49 15751 1671 - 1672 Select org.locationtech.geomesa.utils.stats.TopK.i TopK.this.i
49 15752 1655 - 1689 TypeApply scala.Any.asInstanceOf sf.getAttribute(TopK.this.i).asInstanceOf[T]
50 15753 1698 - 1711 Apply scala.Any.!= value.!=(null)
50 15757 1694 - 1694 Literal <nosymbol> ()
50 15758 1694 - 1694 Block <nosymbol> ()
51 15754 1721 - 1745 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.offer TopK.this.summary.offer(value, -1)
51 15755 1734 - 1734 Literal <nosymbol> ()
51 15756 1721 - 1745 Block <nosymbol> { TopK.this.summary.offer(value, -1); () }
56 15759 1832 - 1835 Select org.locationtech.geomesa.utils.stats.TopK.sft TopK.this.sft
56 15760 1837 - 1845 Select org.locationtech.geomesa.utils.stats.TopK.property TopK.this.property
56 15761 1820 - 1846 Apply org.locationtech.geomesa.utils.stats.TopK.<init> new org.locationtech.geomesa.utils.stats.TopK[T](TopK.this.sft, TopK.this.property)
57 15762 1851 - 1865 Apply org.locationtech.geomesa.utils.stats.TopK.+= merged.+=(this)
58 15763 1870 - 1885 Apply org.locationtech.geomesa.utils.stats.TopK.+= merged.+=(other)
63 15764 1967 - 1981 Select org.locationtech.geomesa.utils.stats.TopK.StreamCapacity TopK.StreamCapacity
63 15765 2015 - 2041 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.offer TopK.this.summary.offer(item, count)
63 15766 2015 - 2041 Block org.locationtech.geomesa.utils.clearspring.StreamSummary.offer TopK.this.summary.offer(item, count)
63 15767 1948 - 2043 Apply scala.collection.Iterator.foreach other.summary.topK(TopK.StreamCapacity).foreach[Boolean](((x0$1: (T, Long)) => x0$1 match { case (_1: T, _2: Long)(T, Long)((item @ _), (count @ _)) => TopK.this.summary.offer(item, count) }))
65 15768 2076 - 2091 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.clear TopK.this.summary.clear()
67 15769 2127 - 2144 Apply scala.Int.== TopK.this.summary.size.==(0)
70 15770 2211 - 2213 Literal <nosymbol> 10
70 15776 2198 - 2327 Apply scala.collection.Iterator.map TopK.this.summary.topK(10).zipWithIndex.map[(Int, scala.collection.immutable.ListMap[String,Any])](((x0$1: ((T, Long), Int)) => x0$1 match { case (_1: (T, Long), _2: Int)((T, Long), Int)((_1: T, _2: Long)(T, Long)((item @ _), (count @ _)), (rank @ _)) => scala.Tuple2.apply[Int, scala.collection.immutable.ListMap[String,Any]](rank, scala.collection.immutable.ListMap.apply[String, Any](scala.Predef.ArrowAssoc[String]("value").->[T](item), scala.Predef.ArrowAssoc[String]("count").->[Long](count))) }))
71 15771 2286 - 2301 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[String]("value").->[T](item)
71 15772 2303 - 2319 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[String]("count").->[Long](count)
71 15773 2277 - 2320 Apply scala.collection.generic.GenMapFactory.apply scala.collection.immutable.ListMap.apply[String, Any](scala.Predef.ArrowAssoc[String]("value").->[T](item), scala.Predef.ArrowAssoc[String]("count").->[Long](count))
71 15774 2270 - 2321 Apply scala.Tuple2.apply scala.Tuple2.apply[Int, scala.collection.immutable.ListMap[String,Any]](rank, scala.collection.immutable.ListMap.apply[String, Any](scala.Predef.ArrowAssoc[String]("value").->[T](item), scala.Predef.ArrowAssoc[String]("count").->[Long](count)))
71 15775 2270 - 2321 Block scala.Tuple2.apply scala.Tuple2.apply[Int, scala.collection.immutable.ListMap[String,Any]](rank, scala.collection.immutable.ListMap.apply[String, Any](scala.Predef.ArrowAssoc[String]("value").->[T](item), scala.Predef.ArrowAssoc[String]("count").->[Long](count)))
73 15777 2340 - 2350 Select scala.collection.TraversableOnce.toSeq maps.toSeq
73 15778 2332 - 2354 Apply scala.collection.generic.GenMapFactory.apply scala.collection.immutable.ListMap.apply[Int, scala.collection.immutable.ListMap[String,Any]]((maps.toSeq: _*))
77 15779 2465 - 2479 Select org.locationtech.geomesa.utils.clearspring.StreamSummary.size s.summary.size
77 15780 2449 - 2479 Apply scala.Int.== TopK.this.summary.size.==(s.summary.size)
78 15781 2504 - 2516 Select org.locationtech.geomesa.utils.clearspring.StreamSummary.size TopK.this.summary.size
78 15782 2544 - 2556 Select org.locationtech.geomesa.utils.clearspring.StreamSummary.size TopK.this.summary.size
78 15783 2531 - 2557 Apply org.locationtech.geomesa.utils.clearspring.StreamSummary.topK TopK.this.summary.topK(TopK.this.summary.size)
78 15784 2489 - 2558 Apply scala.collection.Iterator.sameElements s.summary.topK(TopK.this.summary.size).sameElements(TopK.this.summary.topK(TopK.this.summary.size))
78 15785 2489 - 2558 Block scala.collection.Iterator.sameElements s.summary.topK(TopK.this.summary.size).sameElements(TopK.this.summary.topK(TopK.this.summary.size))
79 15786 2573 - 2578 Literal <nosymbol> false
79 15787 2573 - 2578 Block <nosymbol> false
84 15788 2623 - 2627 Literal <nosymbol> 1000