1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 /**
10   * Adapted from:
11   *
12   * stream-lib
13   * Copyright (C) 2011 Clearspring Technologies, Inc.
14   * Copyright 2016 AddThis
15   *
16   * This product includes software developed by AddThis.
17   *
18   * This product also includes code adapted from:
19   *
20   * Apache Solr (http://lucene.apache.org/solr/)
21   * Copyright 2014 The Apache Software Foundation
22   *
23   * Apache Mahout (http://mahout.apache.org/)
24   * Copyright 2014 The Apache Software Foundation
25   *
26   */
27 
28 package org.locationtech.geomesa.utils.clearspring
29 
30 import com.clearspring.analytics.hash.MurmurHash
31 import com.clearspring.analytics.stream.cardinality.ICardinality
32 
33 /**
34   * Scala implementation of HyperLogLog (HLL) algorithm from this paper:
35   * <p/>
36   * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
37   * <p/>
38   * HLL is an improved version of LogLog that is capable of estimating
39   * the cardinality of a set with accuracy = 1.04/sqrt(m) where
40   * m = math.pow(2, b).  So we can control accuracy vs space usage by increasing
41   * or decreasing b.
42   * <p/>
43   * The main benefit of using HLL over LL is that it only requires 64%
44   * of the space that LL does to get the same accuracy.
45   * <p/>
46   * This implementation implements a single counter.  If a large (millions)
47   * number of counters are required you may want to refer to:
48   * <p/>
49   * http://dsiutils.dsi.unimi.it/
50   * <p/>
51   * It has a more complex implementation of HLL that supports multiple counters
52   * in a single object, drastically reducing the java overhead from creating
53   * a large number of objects.
54   * <p/>
55   * This implementation leveraged a javascript implementation that Yammer has
56   * been working on:
57   * <p/>
58   * https://github.com/yammer/probablyjs
59   * <p>
60   * Note that this implementation does not include the long range correction function
61   * defined in the original paper.  Empirical evidence shows that the correction
62   * function causes more harm than good.
63   * </p>
64   * <p/>
65   * <p>
66   * Users have different motivations to use different types of hashing functions.
67   * Rather than try to keep up with all available hash functions and to remove
68   * the concern of causing future binary incompatibilities this class allows clients
69   * to offer the value in hashed int or long form.  This way clients are free
70   * to change their hash function on their own time line.  We recommend using Google's
71   * Guava Murmur3_128 implementation as it provides good performance and speed when
72   * high precision is required.  In our tests the 32bit MurmurHash function included
73   * in this project is faster and produces better results than the 32 bit murmur3
74   * implementation google provides.
75   * </p>
76   */
/**
  * HyperLogLog cardinality estimator backed by a [[RegisterSet]].
  *
  * The log2m parameter defines the accuracy of the counter.  The larger the log2m the
  * better the accuracy:
  * <p/>
  * accuracy = 1.04/sqrt(math.pow(2, log2m))
  * <p/>
  * Instances are created through the companion object's `apply` methods; the constructor is private.
  * <p/>
  * NOTE(review): `log2m == 0` is accepted by the range check below, but the register index
  * computation in `offerHashed` then shifts by the full bit width of the hash, which the JVM
  * treats as a shift by zero - so the index is the hash itself rather than 0. Confirm whether
  * zero is really meant to be supported.
  *
  * @param log2m - the number of bits to use as the basis for the HLL instance, in the range [0, 30]
  * @param registerSet the registers holding the per-bucket maximum ranks
  * @throws IllegalArgumentException if log2m is outside the range [0, 30]
  */
class HyperLogLog private (val log2m: Int, val registerSet: RegisterSet) extends ICardinality {

  require(log2m >= 0 && log2m <= 30, s"log2m argument $log2m is outside the acceptable range [0, 30]")

  // alpha_m * m^2 from the paper, pre-computed once since it only depends on log2m
  private val alphaMM: Double = HyperLogLog.getAlphaMM(log2m, 1 << log2m)

  /**
    * Offer a value that has already been hashed to 64 bits.
    *
    * @param hashedValue the 64-bit hash of the value
    * @return the result of `registerSet.updateIfGreater` for the selected register
    */
  override def offerHashed(hashedValue: Long): Boolean = {
    // j is the register index, taken from the top log2m bits of the hash,
    // so it falls in [0, 2^log2m)
    val j = (hashedValue >>> (java.lang.Long.SIZE - log2m)).toInt
    // r is the rank: leading zeros of the remaining bits, plus one. Note that `+` binds tighter
    // than `|`, so the OR-ed guard is ((1 << (log2m - 1)) + 1); this is kept as-is so that
    // register contents stay consistent with previously computed values
    val r = java.lang.Long.numberOfLeadingZeros((hashedValue << log2m) | (1 << (log2m - 1)) + 1) + 1
    registerSet.updateIfGreater(j, r)
  }

  /**
    * Offer a value that has already been hashed to 32 bits.
    *
    * @param hashedValue the 32-bit hash of the value
    * @return the result of `registerSet.updateIfGreater` for the selected register
    */
  override def offerHashed(hashedValue: Int): Boolean = {
    // j is the register index, taken from the top log2m bits of the hash,
    // so it falls in [0, 2^log2m)
    val j = hashedValue >>> (Integer.SIZE - log2m)
    // same rank computation as the 64-bit variant, on a 32-bit word
    val r = Integer.numberOfLeadingZeros((hashedValue << log2m) | (1 << (log2m - 1)) + 1) + 1
    registerSet.updateIfGreater(j, r)
  }

  /**
    * Hash the object with `MurmurHash.hash` and offer the resulting hash.
    *
    * @param o the value to add
    * @return the result of offering the hashed value
    */
  override def offer(o: Any): Boolean = offerHashed(MurmurHash.hash(o))

  /**
    * Estimate the number of distinct values offered so far.
    *
    * Uses the raw HyperLogLog estimate, switching to linear counting for small cardinalities
    * (raw estimate at most 2.5 * m) when at least one register is still empty. The long range
    * correction from the paper is not applied.
    *
    * @return the estimated cardinality, rounded to the nearest long
    */
  override def cardinality: Long = {
    val m = registerSet.count
    var registerSum = 0d
    var zeros = 0.0

    var j = 0
    while (j < m) {
      val value = registerSet.get(j)
      // 2^-value, computed in floating point: an Int shift (1 << value) goes negative at
      // value == 31 and wraps for anything larger
      registerSum += java.lang.Math.scalb(1.0d, -value)
      if (value == 0) {
        zeros += 1
      }
      j += 1
    }

    val estimate = alphaMM * (1 / registerSum)

    // Small range correction. Linear counting is only defined when there are empty registers:
    // with zeros == 0 it evaluates m * log(m / 0) = Infinity, which rounds to Long.MaxValue,
    // so in that case fall back to the raw estimate as the paper specifies
    if (zeros > 0 && estimate <= (5.0 / 2.0) * m) {
      math.round(HyperLogLog.linearCounting(m, zeros))
    } else {
      math.round(estimate)
    }
  }

  /**
    * @return `registerSet.size * 4` - presumably the size in bytes of the backing int words
    *         (TODO confirm against RegisterSet)
    */
  override def sizeof(): Int = registerSet.size * 4

  /**
    * Add all the elements of the other set to this set.
    * <p/>
    * This operation does not imply a loss of precision.
    *
    * @param other A compatible HyperLogLog instance (same register set size)
    * @throws IllegalArgumentException if other is not compatible
    */
  def +=(other: HyperLogLog): Unit = {
    require(registerSet.size == other.registerSet.size, "Cannot merge estimators of different sizes")
    registerSet.merge(other.registerSet)
  }

  /**
    * Merge this estimator with the given estimators into a new instance, which is built on
    * a fresh register set with the same log2m and register count as this one.
    *
    * @param estimators the estimators to merge; each must be a HyperLogLog of the same size
    * @return a new HyperLogLog containing the merged registers
    * @throws IllegalArgumentException if an estimator is not a HyperLogLog, or has a different size
    */
  override def merge(estimators: ICardinality*): HyperLogLog = {
    val merged = new HyperLogLog(log2m, new RegisterSet(this.registerSet.count)())
    merged += this
    estimators.foreach {
      case h: HyperLogLog => merged += h
      case e => throw new IllegalArgumentException(s"Cannot merge estimators of different class: $e")
    }
    merged
  }

  /**
    * Serialization is not supported by this implementation.
    *
    * @throws NotImplementedError always
    */
  override def getBytes: Array[Byte] = throw new NotImplementedError
}
160 
/**
  * Factory methods and estimator constants for the HyperLogLog class.
  */
object HyperLogLog {

  /**
    * Create an estimator with `1 << log2m` registers and default register contents.
    *
    * @param log2m the number of bits to use as the basis for the HLL instance
    * @return a new HyperLogLog
    */
  def apply(log2m: Int): HyperLogLog = {
    val registers = new RegisterSet(1 << log2m)()
    new HyperLogLog(log2m, registers)
  }

  /**
    * Create an estimator with `1 << log2m` registers, handing the given array to the register set.
    *
    * @param log2m the number of bits to use as the basis for the HLL instance
    * @param register values passed through to the RegisterSet constructor
    * @return a new HyperLogLog
    */
  def apply(log2m: Int, register: Array[Int]): HyperLogLog = {
    val registers = new RegisterSet(1 << log2m)(register)
    new HyperLogLog(log2m, registers)
  }

  /**
    * Compute the alpha_m * m * m constant used by the raw estimate.
    *
    * @param p the log2m value
    * @param m the number of registers
    * @return alpha * m * m
    */
  private def getAlphaMM(p: Int, m: Int): Double = {
    // See the paper: fixed constants for p = 4, 5, 6 and a closed-form expression otherwise
    val alpha =
      if (p == 4) {
        0.673
      } else if (p == 5) {
        0.697
      } else if (p == 6) {
        0.709
      } else {
        0.7213 / (1 + 1.079 / m)
      }
    alpha * m * m
  }

  /**
    * Linear counting estimate, m * ln(m / V).
    *
    * @param m the number of registers
    * @param V the number of registers that are still zero
    * @return the linear counting estimate
    */
  private def linearCounting(m: Int, V: Double): Double = {
    val ratio = m / V
    m * math.log(ratio)
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
87 2979 3521 - 3522 Literal <nosymbol> 0
87 2980 3526 - 3537 Apply scala.Int.<= HyperLogLog.this.log2m.<=(30)
87 2981 3512 - 3537 Apply scala.Boolean.&& HyperLogLog.this.log2m.>=(0).&&(HyperLogLog.this.log2m.<=(30))
87 2982 3541 - 3557 Literal <nosymbol> "log2m argument "
87 2983 3562 - 3603 Literal <nosymbol> " is outside the acceptable range [0, 30]"
87 2984 3557 - 3562 Select org.locationtech.geomesa.utils.clearspring.HyperLogLog.log2m HyperLogLog.this.log2m
87 2985 3539 - 3603 Apply scala.StringContext.s scala.StringContext.apply("log2m argument ", " is outside the acceptable range [0, 30]").s(HyperLogLog.this.log2m)
87 2986 3504 - 3604 Apply scala.Predef.require scala.Predef.require(HyperLogLog.this.log2m.>=(0).&&(HyperLogLog.this.log2m.<=(30)), scala.StringContext.apply("log2m argument ", " is outside the acceptable range [0, 30]").s(HyperLogLog.this.log2m))
89 2987 3661 - 3666 Select org.locationtech.geomesa.utils.clearspring.HyperLogLog.log2m HyperLogLog.this.log2m
89 2988 3668 - 3669 Literal <nosymbol> 1
89 2989 3673 - 3678 Select org.locationtech.geomesa.utils.clearspring.HyperLogLog.log2m HyperLogLog.this.log2m
89 2990 3668 - 3678 Apply scala.Int.<< 1.<<(HyperLogLog.this.log2m)
89 2991 3638 - 3679 Apply org.locationtech.geomesa.utils.clearspring.HyperLogLog.getAlphaMM HyperLogLog.getAlphaMM(HyperLogLog.this.log2m, 1.<<(HyperLogLog.this.log2m))
94 2992 3882 - 3901 Literal <nosymbol> 64
94 2993 3904 - 3909 Select org.locationtech.geomesa.utils.clearspring.HyperLogLog.log2m HyperLogLog.this.log2m
94 2994 3882 - 3909 Apply scala.Int.- 64.-(HyperLogLog.this.log2m)
94 2995 3864 - 3929 TypeApply scala.Any.asInstanceOf hashedValue.>>>(64.-(HyperLogLog.this.log2m)).asInstanceOf[Int]
95 2996 3942 - 4030 Apply scala.Int.+ java.lang.Long.numberOfLeadingZeros(hashedValue.<<(HyperLogLog.this.log2m).|(1.<<(HyperLogLog.this.log2m.-(1)).+(1))).+(1)
96 2997 4035 - 4068 Apply org.locationtech.geomesa.utils.clearspring.RegisterSet.updateIfGreater HyperLogLog.this.registerSet.updateIfGreater(j, r)
102 2998 4273 - 4285 Literal <nosymbol> 32
102 2999 4288 - 4293 Select org.locationtech.geomesa.utils.clearspring.HyperLogLog.log2m HyperLogLog.this.log2m
102 3000 4273 - 4293 Apply scala.Int.- 32.-(HyperLogLog.this.log2m)
102 3001 4256 - 4294 Apply scala.Int.>>> hashedValue.>>>(32.-(HyperLogLog.this.log2m))
103 3002 4307 - 4388 Apply scala.Int.+ java.lang.Integer.numberOfLeadingZeros(hashedValue.<<(HyperLogLog.this.log2m).|(1.<<(HyperLogLog.this.log2m.-(1)).+(1))).+(1)
104 3003 4393 - 4426 Apply org.locationtech.geomesa.utils.clearspring.RegisterSet.updateIfGreater HyperLogLog.this.registerSet.updateIfGreater(j, r)
107 3004 4484 - 4502 Apply com.clearspring.analytics.hash.MurmurHash.hash com.clearspring.analytics.hash.MurmurHash.hash(o)
107 3005 4472 - 4503 Apply org.locationtech.geomesa.utils.clearspring.HyperLogLog.offerHashed HyperLogLog.this.offerHashed(com.clearspring.analytics.hash.MurmurHash.hash(o))
110 3006 4564 - 4566 Literal <nosymbol> 0.0
111 3007 4583 - 4586 Literal <nosymbol> 0.0
113 3008 4600 - 4601 Literal <nosymbol> 0
114 3009 4617 - 4634 Select org.locationtech.geomesa.utils.clearspring.RegisterSet.count HyperLogLog.this.registerSet.count
114 3010 4613 - 4634 Apply scala.Int.< j.<(HyperLogLog.this.registerSet.count)
114 3022 4636 - 4636 Apply org.locationtech.geomesa.utils.clearspring.HyperLogLog.while$1 while$1()
114 3023 4636 - 4784 Block <nosymbol> { { val value: Int = HyperLogLog.this.registerSet.get(j); registerSum = registerSum.+(1.0./(1.<<(value))); if (value.==(0)) zeros = zeros.+(1) else (); j = j.+(1) }; while$1() }
114 3024 4606 - 4606 Literal <nosymbol> ()
114 3025 4606 - 4606 Block <nosymbol> ()
115 3011 4656 - 4674 Apply org.locationtech.geomesa.utils.clearspring.RegisterSet.get HyperLogLog.this.registerSet.get(j)
116 3012 4696 - 4699 Literal <nosymbol> 1.0
116 3013 4703 - 4713 Apply scala.Int.<< 1.<<(value)
116 3014 4696 - 4714 Apply scala.Double./ 1.0./(1.<<(value))
116 3015 4681 - 4714 Apply scala.Double.+ registerSum.+(1.0./(1.<<(value)))
117 3016 4725 - 4735 Apply scala.Int.== value.==(0)
117 3019 4721 - 4721 Literal <nosymbol> ()
117 3020 4721 - 4721 Block <nosymbol> ()
118 3017 4747 - 4757 Apply scala.Double.+ zeros.+(1)
118 3018 4747 - 4757 Assign <nosymbol> zeros = zeros.+(1)
120 3021 4772 - 4778 Apply scala.Int.+ j.+(1)
123 3026 4816 - 4831 Apply scala.Int./ 1./(registerSum)
123 3027 4805 - 4832 Apply scala.Double.* HyperLogLog.this.alphaMM.*(1./(registerSum))
125 3028 4855 - 4864 Literal <nosymbol> 2.5
125 3029 4868 - 4885 Select org.locationtech.geomesa.utils.clearspring.RegisterSet.count HyperLogLog.this.registerSet.count
125 3030 4854 - 4885 Apply scala.Double.* 2.5.*(HyperLogLog.this.registerSet.count)
125 3031 4842 - 4885 Apply scala.Double.<= estimate.<=(2.5.*(HyperLogLog.this.registerSet.count))
127 3032 4963 - 4980 Select org.locationtech.geomesa.utils.clearspring.RegisterSet.count HyperLogLog.this.registerSet.count
127 3033 4936 - 4988 Apply org.locationtech.geomesa.utils.clearspring.HyperLogLog.linearCounting HyperLogLog.linearCounting(HyperLogLog.this.registerSet.count, zeros)
127 3034 4925 - 4989 Apply scala.math.round scala.math.`package`.round(HyperLogLog.linearCounting(HyperLogLog.this.registerSet.count, zeros))
127 3035 4925 - 4989 Block scala.math.round scala.math.`package`.round(HyperLogLog.linearCounting(HyperLogLog.this.registerSet.count, zeros))
129 3036 5009 - 5029 Apply scala.math.round scala.math.`package`.round(estimate)
129 3037 5009 - 5029 Block scala.math.round scala.math.`package`.round(estimate)
133 3038 5072 - 5092 Apply scala.Int.* HyperLogLog.this.registerSet.size.*(4)
144 3039 5441 - 5463 Select org.locationtech.geomesa.utils.clearspring.RegisterSet.size other.registerSet.size
144 3040 5421 - 5463 Apply scala.Int.== HyperLogLog.this.registerSet.size.==(other.registerSet.size)
144 3041 5465 - 5509 Literal <nosymbol> "Cannot merge estimators of different sizes"
144 3042 5413 - 5510 Apply scala.Predef.require scala.Predef.require(HyperLogLog.this.registerSet.size.==(other.registerSet.size), "Cannot merge estimators of different sizes")
145 3043 5533 - 5550 Select org.locationtech.geomesa.utils.clearspring.HyperLogLog.registerSet other.registerSet
145 3044 5515 - 5551 Apply org.locationtech.geomesa.utils.clearspring.RegisterSet.merge HyperLogLog.this.registerSet.merge(other.registerSet)
149 3045 5655 - 5660 Select org.locationtech.geomesa.utils.clearspring.HyperLogLog.log2m HyperLogLog.this.log2m
149 3046 5678 - 5700 Select org.locationtech.geomesa.utils.clearspring.RegisterSet.count this.registerSet.count
149 3047 5662 - 5662 Apply org.locationtech.geomesa.utils.clearspring.RegisterSet.<init>$default$2 clearspring.this.RegisterSet.<init>$default$2(x$1)
149 3048 5662 - 5703 Apply org.locationtech.geomesa.utils.clearspring.RegisterSet.<init> new RegisterSet(x$1)(x$2)
149 3049 5639 - 5704 Apply org.locationtech.geomesa.utils.clearspring.HyperLogLog.<init> new HyperLogLog(HyperLogLog.this.log2m, { <artifact> val x$1: Int = this.registerSet.count; <artifact> val x$2: Array[Int] @scala.reflect.internal.annotations.uncheckedBounds = clearspring.this.RegisterSet.<init>$default$2(x$1); new RegisterSet(x$1)(x$2) })
150 3050 5709 - 5723 Apply org.locationtech.geomesa.utils.clearspring.HyperLogLog.+= merged.+=(this)
151 3055 5728 - 5897 Apply scala.collection.IterableLike.foreach estimators.foreach[Unit](((x0$1: com.clearspring.analytics.stream.cardinality.ICardinality) => x0$1 match { case (h @ (_: org.locationtech.geomesa.utils.clearspring.HyperLogLog)) => merged.+=(h) case (e @ _) => throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Cannot merge estimators of different class: ", "").s(e)) }))
152 3051 5778 - 5789 Apply org.locationtech.geomesa.utils.clearspring.HyperLogLog.+= merged.+=(h)
152 3052 5778 - 5789 Block org.locationtech.geomesa.utils.clearspring.HyperLogLog.+= merged.+=(h)
153 3053 5806 - 5891 Throw <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Cannot merge estimators of different class: ", "").s(e))
153 3054 5806 - 5891 Block <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Cannot merge estimators of different class: ", "").s(e))
158 3056 5953 - 5982 Throw <nosymbol> throw new scala.NotImplementedError()
163 3057 6086 - 6096 Apply scala.Int.<< 1.<<(log2m)
163 3058 6070 - 6070 Apply org.locationtech.geomesa.utils.clearspring.RegisterSet.<init>$default$2 clearspring.this.RegisterSet.<init>$default$2(x$1)
163 3059 6070 - 6099 Apply org.locationtech.geomesa.utils.clearspring.RegisterSet.<init> new RegisterSet(x$1)(x$2)
163 3060 6047 - 6100 Apply org.locationtech.geomesa.utils.clearspring.HyperLogLog.<init> new HyperLogLog(log2m, { <artifact> val x$1: Int = 1.<<(log2m); <artifact> val x$2: Array[Int] @scala.reflect.internal.annotations.uncheckedBounds = clearspring.this.RegisterSet.<init>$default$2(x$1); new RegisterSet(x$1)(x$2) })
166 3061 6190 - 6227 Apply org.locationtech.geomesa.utils.clearspring.RegisterSet.<init> new RegisterSet(1.<<(log2m))(register)
166 3062 6167 - 6228 Apply org.locationtech.geomesa.utils.clearspring.HyperLogLog.<init> new HyperLogLog(log2m, new RegisterSet(1.<<(log2m))(register))
171 3063 6335 - 6348 Apply scala.Double.* 0.673.*(m).*(m)
171 3064 6335 - 6348 Block scala.Double.* 0.673.*(m).*(m)
172 3065 6365 - 6378 Apply scala.Double.* 0.697.*(m).*(m)
172 3066 6365 - 6378 Block scala.Double.* 0.697.*(m).*(m)
173 3067 6395 - 6408 Apply scala.Double.* 0.709.*(m).*(m)
173 3068 6395 - 6408 Block scala.Double.* 0.709.*(m).*(m)
174 3069 6425 - 6459 Apply scala.Double.* 0.7213./(1.+(1.079./(m))).*(m).*(m)
174 3070 6425 - 6459 Block scala.Double.* 0.7213./(1.+(1.079./(m))).*(m).*(m)
178 3071 6542 - 6547 Apply scala.Int./ m./(V)
178 3072 6533 - 6548 Apply scala.math.log scala.math.`package`.log(m./(V))
178 3073 6529 - 6548 Apply scala.Int.* m.*(scala.math.`package`.log(m./(V)))