1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.metrics.core
10 
11 import com.codahale.metrics.{MetricRegistry, ScheduledReporter}
12 import com.typesafe.config.{Config, ConfigFactory}
13 import org.locationtech.geomesa.utils.classpath.ServiceLoader
14 import pureconfig.error.{CannotConvert, ConfigReaderFailures}
15 import pureconfig.{ConfigCursor, ConfigObjectCursor, ConfigReader, ConfigSource}
16 
17 import java.util.concurrent.TimeUnit
18 import scala.concurrent.duration.Duration
19 import scala.util.control.NonFatal
20 
/**
  * SPI extension point for creating metrics reporters at runtime.
  *
  * Implementations are looked up through the service loader by the companion object, which asks
  * each one in turn and uses the first reporter that is returned.
  */
trait ReporterFactory {

  /**
    * Attempt to create a reporter for the given configuration
    *
    * @param conf reporter configuration
    * @param registry registry that the reporter will report on
    * @param rates time unit read from the `rate-units` (or `units`) config key
    * @param durations time unit read from the `duration-units` (or `units`) config key
    * @return the reporter, or None - presumably when the config is not meant for this factory
    */
  def apply(
      conf: Config,
      registry: MetricRegistry,
      rates: TimeUnit,
      durations: TimeUnit): Option[ScheduledReporter]
}
27 
object ReporterFactory {

  // values used when the provided config does not specify them: seconds for units, and a 60 second interval
  private val defaults = ConfigFactory.parseString("""{units:seconds,interval:"60 seconds"}""")

  // factories discovered through the service loader, loaded on first use
  private lazy val factories = ServiceLoader.load[ReporterFactory]()

  private implicit val reader: ConfigReader[ReporterConfig] = ReporterReader

  /**
    * Load a reporter from the available factories
    *
    * The rate units, duration units and reporting interval are read from the config, layered over the
    * defaults. Each factory is then offered the original config (without the defaults), and the first
    * reporter returned is used. The reporter is started only if the interval is positive.
    *
    * @param config config
    * @param registry registry
    * @return the reporter created by the first factory that accepts the config
    * @throws IllegalArgumentException if no factory returns a reporter for the config
    */
  def apply(config: Config, registry: MetricRegistry): ScheduledReporter = {
    // note: loadOrThrow will throw if the units or interval can not be read
    val ReporterConfig(rates, durations, interval) =
      ConfigSource.fromConfig(config.withFallback(defaults)).loadOrThrow[ReporterConfig]
    // the stream is lazy, so factories after the first match are not invoked
    val reporter = factories.toStream.flatMap(_.apply(config, registry, rates, durations)).headOption.getOrElse {
      throw new IllegalArgumentException(
        s"Could not load reporter factory using provided config:\n${config.root().render()}")
    }
    if (interval > 0) {
      reporter.start(interval, TimeUnit.MILLISECONDS)
    }
    reporter
  }

  /**
    * Common reporter settings
    *
    * @param rates time unit read from `rate-units`, falling back to `units`
    * @param durations time unit read from `duration-units`, falling back to `units`
    * @param interval reporting interval in milliseconds, or -1 if no interval is configured
    */
  case class ReporterConfig(rates: TimeUnit, durations: TimeUnit, interval: Long)

  /**
    * Reads the common reporter settings out of a config object
    */
  object ReporterReader extends ConfigReader[ReporterConfig] {

    override def from(cur: ConfigCursor): Either[ConfigReaderFailures, ReporterConfig] = {
      for {
        obj      <- cur.asObjectCursor
        rate     <- timeUnit(obj, "rate-units", "units")
        duration <- timeUnit(obj, "duration-units", "units")
        interval <- durationMillis(obj.atKeyOrUndefined("interval"))
      } yield {
        ReporterConfig(rate, duration, interval)
      }
    }

    /**
      * Read a time unit from the primary key, or from the fallback key if the primary can't be read.
      *
      * Note that a primary key that is present but invalid will still fall back, if the fallback key
      * is valid. If neither key can be read, the failure for the primary key is returned.
      *
      * @param cur cursor for the reporter config object
      * @param key primary key
      * @param fallback key to try if the primary key is missing or invalid
      * @return the time unit, or the failure for the primary key
      */
    private def timeUnit(cur: ConfigObjectCursor, key: String, fallback: String): Either[ConfigReaderFailures, TimeUnit] = {
      readTimeUnit(cur, key).left.flatMap { failures =>
        // use the fallback if it's a right, otherwise keep the failure from the primary key
        readTimeUnit(cur, fallback).left.map(_ => failures)
      }
    }

    /**
      * Read a single key as a time unit, matching the enum names case-insensitively
      *
      * @param cur cursor for the reporter config object
      * @param key key to read
      * @return the time unit, or a failure attributed to the key that was read
      */
    private def readTimeUnit(cur: ConfigObjectCursor, key: String): Either[ConfigReaderFailures, TimeUnit] = {
      for {
        keyCur <- cur.atKey(key)
        name   <- keyCur.asString
        // report the failure against the key's own cursor, so that the error points at the
        // offending key and value instead of at the whole enclosing object
        unit   <- TimeUnit.values().find(_.toString.equalsIgnoreCase(name)).toRight[ConfigReaderFailures](
          ConfigReaderFailures(keyCur.failureFor(CannotConvert(name, "TimeUnit", "Does not match a TimeUnit")))
        )
      } yield {
        unit
      }
    }

    /**
      * Read a duration string (e.g. "60 seconds") as milliseconds
      *
      * @param cur cursor for the interval value, which may be undefined
      * @return the duration in milliseconds, -1 if the value is undefined, or a failure if the
      *         value can not be parsed or converted to milliseconds
      */
    private def durationMillis(cur: ConfigCursor): Either[ConfigReaderFailures, Long] = {
      if (cur.isUndefined) { Right(-1L) } else {
        cur.asString.flatMap { d =>
          try { Right(Duration(d).toMillis) } catch {
            case NonFatal(e) => cur.failed(CannotConvert(d, "Duration", e.getMessage))
          }
        }
      }
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
30 53156 1201 - 1271 Apply com.typesafe.config.ConfigFactory.parseString com.typesafe.config.ConfigFactory.parseString("{units:seconds,interval:\"60 seconds\"}")
34 53157 1405 - 1419 Select org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterReader ReporterFactory.this.ReporterReader
44 53158 1663 - 1663 Select scala.Tuple3._1 x$1._1
44 53159 1670 - 1670 Select scala.Tuple3._2 x$1._2
44 53160 1681 - 1681 Select scala.Tuple3._3 x$1._3
46 53161 1828 - 1871 Apply org.locationtech.geomesa.metrics.core.ReporterFactory.apply x$2.apply(config, registry, rates, durations)
46 53162 1828 - 1871 ApplyImplicitView scala.Option.option2Iterable scala.this.Option.option2Iterable[com.codahale.metrics.ScheduledReporter](x$2.apply(config, registry, rates, durations))
46 53163 1827 - 1827 TypeApply scala.collection.immutable.Stream.canBuildFrom immutable.this.Stream.canBuildFrom[com.codahale.metrics.ScheduledReporter]
46 53165 1801 - 2037 Apply scala.Option.getOrElse ReporterFactory.this.factories.toStream.flatMap[com.codahale.metrics.ScheduledReporter, scala.collection.immutable.Stream[com.codahale.metrics.ScheduledReporter]](((x$2: org.locationtech.geomesa.metrics.core.ReporterFactory) => scala.this.Option.option2Iterable[com.codahale.metrics.ScheduledReporter](x$2.apply(config, registry, rates, durations))))(immutable.this.Stream.canBuildFrom[com.codahale.metrics.ScheduledReporter]).headOption.getOrElse[com.codahale.metrics.ScheduledReporter](throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Could not load reporter factory using provided config:\\n", "").s(config.root().render())))
47 53164 1902 - 2031 Throw <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Could not load reporter factory using provided config:\\n", "").s(config.root().render()))
50 53166 2046 - 2058 Apply scala.Long.> interval.>(0)
50 53169 2042 - 2042 Literal <nosymbol> ()
50 53170 2042 - 2042 Block <nosymbol> ()
51 53167 2068 - 2115 Apply com.codahale.metrics.ScheduledReporter.start reporter.start(interval, MILLISECONDS)
51 53168 2068 - 2115 Block com.codahale.metrics.ScheduledReporter.start reporter.start(interval, MILLISECONDS)
61 53180 2383 - 2703 Apply scala.util.Either.RightProjection.flatMap cur.asObjectCursor.right.flatMap[pureconfig.error.ConfigReaderFailures, org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig](((obj: pureconfig.ConfigObjectCursor) => ReporterReader.this.timeUnit(obj, "rate-units", "units").right.flatMap[pureconfig.error.ConfigReaderFailures, org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig](((rate: java.util.concurrent.TimeUnit) => ReporterReader.this.timeUnit(obj, "duration-units", "units").right.flatMap[pureconfig.error.ConfigReaderFailures, org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig](((duration: java.util.concurrent.TimeUnit) => ReporterReader.this.durationMillis(obj.atKeyOrUndefined("interval")).right.map[org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig](((interval: Long) => ReporterFactory.this.ReporterConfig.apply(rate, duration, interval)))))))))
62 53171 2468 - 2480 Literal <nosymbol> "rate-units"
62 53172 2482 - 2489 Literal <nosymbol> "units"
62 53179 2442 - 2703 Apply scala.util.Either.RightProjection.flatMap ReporterReader.this.timeUnit(obj, "rate-units", "units").right.flatMap[pureconfig.error.ConfigReaderFailures, org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig](((rate: java.util.concurrent.TimeUnit) => ReporterReader.this.timeUnit(obj, "duration-units", "units").right.flatMap[pureconfig.error.ConfigReaderFailures, org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig](((duration: java.util.concurrent.TimeUnit) => ReporterReader.this.durationMillis(obj.atKeyOrUndefined("interval")).right.map[org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig](((interval: Long) => ReporterFactory.this.ReporterConfig.apply(rate, duration, interval)))))))
63 53173 2531 - 2547 Literal <nosymbol> "duration-units"
63 53174 2549 - 2556 Literal <nosymbol> "units"
63 53178 2505 - 2703 Apply scala.util.Either.RightProjection.flatMap ReporterReader.this.timeUnit(obj, "duration-units", "units").right.flatMap[pureconfig.error.ConfigReaderFailures, org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig](((duration: java.util.concurrent.TimeUnit) => ReporterReader.this.durationMillis(obj.atKeyOrUndefined("interval")).right.map[org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig](((interval: Long) => ReporterFactory.this.ReporterConfig.apply(rate, duration, interval)))))
64 53175 2599 - 2631 Apply pureconfig.ConfigObjectCursor.atKeyOrUndefined obj.atKeyOrUndefined("interval")
64 53177 2572 - 2703 Apply scala.util.Either.RightProjection.map ReporterReader.this.durationMillis(obj.atKeyOrUndefined("interval")).right.map[org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig](((interval: Long) => ReporterFactory.this.ReporterConfig.apply(rate, duration, interval)))
66 53176 2663 - 2703 Apply org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterConfig.apply ReporterFactory.this.ReporterConfig.apply(rate, duration, interval)
71 53181 2893 - 2903 Select pureconfig.ConfigCursor.asString x$3.asString
71 53193 2864 - 3183 Apply scala.util.Either.RightProjection.flatMap cur.atKey(key).right.flatMap[pureconfig.error.ConfigReaderFailures, String](((x$3: pureconfig.ConfigCursor) => x$3.asString)).right.flatMap[pureconfig.error.ConfigReaderFailures, java.util.concurrent.TimeUnit](((unit: String) => scala.Predef.refArrayOps[java.util.concurrent.TimeUnit](java.util.concurrent.TimeUnit.values()).find(((x$4: java.util.concurrent.TimeUnit) => x$4.toString().equalsIgnoreCase(unit))).toRight[pureconfig.error.ConfigReaderFailures](pureconfig.error.ConfigReaderFailures.apply(cur.failureFor(pureconfig.error.CannotConvert.apply(cur.valueOpt.map[String](((x$5: com.typesafe.config.ConfigObject) => x$5.toString())).orNull[String](scala.Predef.$conforms[Null]), "TimeUnit", "Does not match a TimeUnit"))))))
72 53182 2937 - 2954 Apply java.util.concurrent.TimeUnit.values java.util.concurrent.TimeUnit.values()
72 53183 2960 - 2993 Apply java.lang.String.equalsIgnoreCase x$4.toString().equalsIgnoreCase(unit)
72 53192 2937 - 3175 Apply scala.Option.toRight scala.Predef.refArrayOps[java.util.concurrent.TimeUnit](java.util.concurrent.TimeUnit.values()).find(((x$4: java.util.concurrent.TimeUnit) => x$4.toString().equalsIgnoreCase(unit))).toRight[pureconfig.error.ConfigReaderFailures](pureconfig.error.ConfigReaderFailures.apply(cur.failureFor(pureconfig.error.CannotConvert.apply(cur.valueOpt.map[String](((x$5: com.typesafe.config.ConfigObject) => x$5.toString())).orNull[String](scala.Predef.$conforms[Null]), "TimeUnit", "Does not match a TimeUnit"))))
73 53184 3103 - 3113 Apply java.lang.Object.toString x$5.toString()
73 53185 3115 - 3115 TypeApply scala.Predef.$conforms scala.Predef.$conforms[Null]
73 53186 3086 - 3121 ApplyToImplicitArgs scala.Option.orNull cur.valueOpt.map[String](((x$5: com.typesafe.config.ConfigObject) => x$5.toString())).orNull[String](scala.Predef.$conforms[Null])
73 53187 3123 - 3133 Literal <nosymbol> "TimeUnit"
73 53188 3135 - 3162 Literal <nosymbol> "Does not match a TimeUnit"
73 53189 3072 - 3163 Apply pureconfig.error.CannotConvert.apply pureconfig.error.CannotConvert.apply(cur.valueOpt.map[String](((x$5: com.typesafe.config.ConfigObject) => x$5.toString())).orNull[String](scala.Predef.$conforms[Null]), "TimeUnit", "Does not match a TimeUnit")
73 53190 3057 - 3164 Apply pureconfig.ConfigCursor.failureFor cur.failureFor(pureconfig.error.CannotConvert.apply(cur.valueOpt.map[String](((x$5: com.typesafe.config.ConfigObject) => x$5.toString())).orNull[String](scala.Predef.$conforms[Null]), "TimeUnit", "Does not match a TimeUnit"))
73 53191 3036 - 3165 Apply pureconfig.error.ConfigReaderFailures.apply pureconfig.error.ConfigReaderFailures.apply(cur.failureFor(pureconfig.error.CannotConvert.apply(cur.valueOpt.map[String](((x$5: com.typesafe.config.ConfigObject) => x$5.toString())).orNull[String](scala.Predef.$conforms[Null]), "TimeUnit", "Does not match a TimeUnit")))
76 53194 3213 - 3229 Apply java.lang.Object.== fallback.==(null)
76 53195 3194 - 3229 Apply scala.Boolean.|| primary.isRight.||(fallback.==(null))
76 53196 3233 - 3240 Ident org.locationtech.geomesa.metrics.core.ReporterFactory.ReporterReader.primary primary
78 53197 3363 - 3367 Literal <nosymbol> null
78 53198 3339 - 3395 Apply scala.util.Either.LeftProjection.flatMap ReporterReader.this.timeUnit(cur, fallback, null).left.flatMap[pureconfig.error.ConfigReaderFailures, java.util.concurrent.TimeUnit](((x$6: pureconfig.error.ConfigReaderFailures) => primary))
78 53199 3339 - 3395 Block scala.util.Either.LeftProjection.flatMap ReporterReader.this.timeUnit(cur, fallback, null).left.flatMap[pureconfig.error.ConfigReaderFailures, java.util.concurrent.TimeUnit](((x$6: pureconfig.error.ConfigReaderFailures) => primary))
83 53200 3511 - 3526 Select pureconfig.ConfigCursor.isUndefined cur.isUndefined
83 53201 3530 - 3540 Apply scala.util.Right.apply scala.`package`.Right.apply[Nothing, Long](-1L)
83 53202 3530 - 3540 Block scala.util.Right.apply scala.`package`.Right.apply[Nothing, Long](-1L)
84 53214 3558 - 3788 Apply scala.util.Either.RightProjection.flatMap cur.asString.right.flatMap[pureconfig.error.ConfigReaderFailures, Long](((d: String) => try { scala.`package`.Right.apply[Nothing, Long](scala.concurrent.duration.Duration.apply(d).toMillis) } catch { case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => cur.failed[Nothing](pureconfig.error.CannotConvert.apply(cur.valueOpt.map[String](((x$7: com.typesafe.config.ConfigValue) => x$7.toString())).orNull[String](scala.Predef.$conforms[Null]), "Duration", e.getMessage())) }))
84 53215 3558 - 3788 Block scala.util.Either.RightProjection.flatMap cur.asString.right.flatMap[pureconfig.error.ConfigReaderFailures, Long](((d: String) => try { scala.`package`.Right.apply[Nothing, Long](scala.concurrent.duration.Duration.apply(d).toMillis) } catch { case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => cur.failed[Nothing](pureconfig.error.CannotConvert.apply(cur.valueOpt.map[String](((x$7: com.typesafe.config.ConfigValue) => x$7.toString())).orNull[String](scala.Predef.$conforms[Null]), "Duration", e.getMessage())) }))
85 53203 3614 - 3634 Select scala.concurrent.duration.Duration.toMillis scala.concurrent.duration.Duration.apply(d).toMillis
85 53204 3608 - 3635 Apply scala.util.Right.apply scala.`package`.Right.apply[Nothing, Long](scala.concurrent.duration.Duration.apply(d).toMillis)
85 53205 3608 - 3635 Block scala.util.Right.apply scala.`package`.Right.apply[Nothing, Long](scala.concurrent.duration.Duration.apply(d).toMillis)
86 53206 3720 - 3730 Apply java.lang.Object.toString x$7.toString()
86 53207 3732 - 3732 TypeApply scala.Predef.$conforms scala.Predef.$conforms[Null]
86 53208 3703 - 3738 ApplyToImplicitArgs scala.Option.orNull cur.valueOpt.map[String](((x$7: com.typesafe.config.ConfigValue) => x$7.toString())).orNull[String](scala.Predef.$conforms[Null])
86 53209 3740 - 3750 Literal <nosymbol> "Duration"
86 53210 3752 - 3764 Apply java.lang.Throwable.getMessage e.getMessage()
86 53211 3689 - 3765 Apply pureconfig.error.CannotConvert.apply pureconfig.error.CannotConvert.apply(cur.valueOpt.map[String](((x$7: com.typesafe.config.ConfigValue) => x$7.toString())).orNull[String](scala.Predef.$conforms[Null]), "Duration", e.getMessage())
86 53212 3678 - 3766 Apply pureconfig.ConfigCursor.failed cur.failed[Nothing](pureconfig.error.CannotConvert.apply(cur.valueOpt.map[String](((x$7: com.typesafe.config.ConfigValue) => x$7.toString())).orNull[String](scala.Predef.$conforms[Null]), "Duration", e.getMessage()))
86 53213 3678 - 3766 Block pureconfig.ConfigCursor.failed cur.failed[Nothing](pureconfig.error.CannotConvert.apply(cur.valueOpt.map[String](((x$7: com.typesafe.config.ConfigValue) => x$7.toString())).orNull[String](scala.Predef.$conforms[Null]), "Duration", e.getMessage()))