1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.convert
10 
11 import com.codahale.metrics.Counter
12 import com.typesafe.scalalogging.LazyLogging
13 import org.locationtech.geomesa.convert.EvaluationContext.{EvaluationError, FieldAccessor}
14 import org.locationtech.geomesa.convert2.Field
15 import org.locationtech.geomesa.convert2.metrics.ConverterMetrics
16 
17 import scala.util.control.NonFatal
18 
19 /**
20  * Holds the state associated with a conversion attempt. Evaluation contexts are not thread safe.
21  */
22 trait EvaluationContext {
23 
24   /**
25     * The current line being processed.
26     *
27     * This may be an actual line (e.g. a csv row), or a logical line (e.g. an avro record)
28     *
29     * @return
30     */
31   var line: Long = 0
32 
33   /**
34     * Enrichment caches
35     *
36     * @return
37     */
38   def cache: Map[String, EnrichmentCache]
39 
40   /**
41     * Metrics registry, accessible for tracking any custom values
42     *
43     * @return
44     */
45   def metrics: ConverterMetrics
46 
47   /**
48     * Counter for tracking successes
49     *
50     * @return
51     */
52   def success: com.codahale.metrics.Counter
53 
54   /**
55     * Counter for tracking failures
56     *
57     * @return
58     */
59   def failure: com.codahale.metrics.Counter
60 
61   /**
62    * Access to any errors that have occurred - note that errors will generally only be kept if the converter
63    * error mode is set to `ReturnErrors`
64    *
65    * @return
66    */
67   def errors: java.util.Queue[EvaluationError]
68 
69   /**
70    * Gets a references to a field's value
71    *
72    * @param name field name
73    * @return
74    */
75   def accessor(name: String): FieldAccessor
76 
77   /**
78    * Evaluate all values using the given arguments. The returned array may be mutated on subsequent calls to
79    * `evaluate`, so shouldn't be kept long-term
80    *
81    * @param args single row of input
82    */
83   def evaluate(args: Array[AnyRef]): Either[EvaluationError, Array[AnyRef]]
84 }
85 
86 object EvaluationContext extends LazyLogging {
87 
88   val InputFilePathKey = "inputFilePath"
89 
90   /**
91     * Creates a new, empty evaluation context
92     *
93     * @return
94     */
95   def empty: EvaluationContext = {
96     val metrics = ConverterMetrics.empty
97     val success = metrics.counter("success")
98     val failures = metrics.counter("failure")
99     new StatefulEvaluationContext(Array.empty, Map.empty, Map.empty, metrics, success, failures)
100   }
101 
102   /**
103    * Creates a new evaluation context with the given state
104    *
105    * @param fields converter fields, in topological dependency order
106    * @param globalValues global values
107    * @param caches enrichment caches
108    * @param metrics metrics
109    * @param success success counter
110    * @param failure failure counter
111    * @return
112    */
113   def apply(
114       fields: Seq[Field],
115       globalValues: Map[String, _ <: AnyRef],
116       caches: Map[String, EnrichmentCache],
117       metrics: ConverterMetrics,
118       success: Counter,
119       failure: Counter): EvaluationContext = {
120     new StatefulEvaluationContext(fields.toArray, globalValues, caches, metrics, success, failure)
121   }
122 
123   /**
124     * Gets a global parameter map containing the input file path
125     *
126     * @param file input file path
127     * @return
128     */
129   def inputFileParam(file: String): Map[String, AnyRef] = Map(InputFilePathKey -> file)
130 
131   /**
132    * Trait for reading a field from an evaluation context
133    */
134   sealed trait FieldAccessor {
135     def apply(): AnyRef
136   }
137 
138   case object NullFieldAccessor extends FieldAccessor {
139     override def apply(): AnyRef = null
140   }
141 
142   class FieldValueAccessor(values: Array[AnyRef], i: Int) extends FieldAccessor {
143     override def apply(): AnyRef = values(i)
144   }
145 
146   class GlobalFieldAccessor(value: AnyRef) extends FieldAccessor {
147     override def apply(): AnyRef = value
148   }
149 
150   /**
151    * Marker trait for resources that are dependent on the evaluation context state
152    *
153    * @tparam T type
154    */
155   trait ContextDependent[T <: ContextDependent[T]] {
156 
157     /**
158      * Return a copy of the instance tied to the given evaluation context.
159      *
160      * If the instance does not use the evaluation context, it should return itself instead of a copy
161      *
162      * @param ec evaluation context
163      * @return
164      */
165     def withContext(ec: EvaluationContext): T
166   }
167 
168   /**
169    * Evaluation error
170    *
171    * @param field field name that had an error
172    * @param line line number of the input being evaluated
173    * @param e error
174    */
175   case class EvaluationError(field: String, line: Long, e: Throwable)
176 
177   /**
178     * Evaluation context accessors
179     *
180     * @param ec context
181     */
182   implicit class RichEvaluationContext(val ec: EvaluationContext) extends AnyVal {
183     def getInputFilePath: Option[String] = Option(ec.accessor(InputFilePathKey).apply().asInstanceOf[String])
184   }
185 
186   /**
187     * Evaluation context implementation
188     *
189     * @param fields fields to evaluate, in topological dependency order
190     * @param globalValues global variable name/values
191     * @param cache enrichment caches
192     * @param metrics metrics
193     */
194   class StatefulEvaluationContext(
195       fields: Array[Field],
196       globalValues: Map[String, _ <: AnyRef],
197       val cache: Map[String, EnrichmentCache],
198       val metrics: ConverterMetrics,
199       val success: Counter,
200       val failure: Counter,
201       val errors: java.util.Queue[EvaluationError] = new java.util.ArrayDeque[EvaluationError]()
202     ) extends EvaluationContext {
203 
204     // holder for results from evaluating each row
205     private val values = Array.ofDim[AnyRef](fields.length)
206     // temp array for holding the arguments for a field
207     private val fieldArray = Array.ofDim[AnyRef](1)
208     // copy the transforms and tie them to the context
209     // note: the class isn't fully instantiated yet, but this statement is last in the initialization
210     private val transforms = fields.map(_.transforms.map(_.withContext(this)))
211 
212     override def accessor(name: String): FieldAccessor = {
213       val i = fields.indexWhere(_.name == name)
214       if (i >= 0) { new FieldValueAccessor(values, i) } else {
215         globalValues.get(name) match {
216           case Some(value) => new GlobalFieldAccessor(value)
217           case None => NullFieldAccessor
218         }
219       }
220     }
221 
222     override def evaluate(args: Array[AnyRef]): Either[EvaluationError, Array[AnyRef]] = {
223       var i = 0
224       // note: since fields are in topological order we don't need to clear them
225       while (i < values.length) {
226         values(i) = try {
227           val fieldArgs = fields(i).fieldArg match {
228             case None => args
229             case Some(f) => fieldArray(0) = f.apply(args); fieldArray
230           }
231           transforms(i) match {
232             case Some(t) => t.apply(fieldArgs)
233             case None    => fieldArgs(0)
234           }
235         } catch {
236           case NonFatal(e) => return Left(EvaluationError(fields(i).name, line, e))
237         }
238         i += 1
239       }
240       Right(values)
241     }
242   }
243 }
Line Stmt Id Pos Tree Symbol Tests Code
31 223 1151 - 1152 Literal <nosymbol> 0L
88 224 2351 - 2366 Literal <nosymbol> "inputFilePath"
96 225 2500 - 2522 Select org.locationtech.geomesa.convert2.metrics.ConverterMetrics.empty org.locationtech.geomesa.convert2.metrics.ConverterMetrics.empty
97 226 2541 - 2567 Apply org.locationtech.geomesa.convert2.metrics.ConverterMetrics.counter metrics.counter("success")
98 227 2587 - 2613 Apply org.locationtech.geomesa.convert2.metrics.ConverterMetrics.counter metrics.counter("failure")
99 228 2648 - 2659 ApplyToImplicitArgs scala.Array.empty scala.Array.empty[org.locationtech.geomesa.convert2.Field]((ClassTag.apply[org.locationtech.geomesa.convert2.Field](classOf[org.locationtech.geomesa.convert2.package$$Field]): scala.reflect.ClassTag[org.locationtech.geomesa.convert2.Field]))
99 229 2661 - 2670 TypeApply scala.collection.immutable.Map.empty scala.Predef.Map.empty[String, Nothing]
99 230 2672 - 2681 TypeApply scala.collection.immutable.Map.empty scala.Predef.Map.empty[String, Nothing]
99 231 2618 - 2710 Apply org.locationtech.geomesa.convert.EvaluationContext.StatefulEvaluationContext.<init> new EvaluationContext.this.StatefulEvaluationContext(scala.Array.empty[org.locationtech.geomesa.convert2.Field]((ClassTag.apply[org.locationtech.geomesa.convert2.Field](classOf[org.locationtech.geomesa.convert2.package$$Field]): scala.reflect.ClassTag[org.locationtech.geomesa.convert2.Field])), scala.Predef.Map.empty[String, Nothing], scala.Predef.Map.empty[String, Nothing], metrics, success, failures, EvaluationContext.this.StatefulEvaluationContext.<init>$default$7)
120 232 3317 - 3331 ApplyToImplicitArgs scala.collection.TraversableOnce.toArray fields.toArray[org.locationtech.geomesa.convert2.Field]((ClassTag.apply[org.locationtech.geomesa.convert2.Field](classOf[org.locationtech.geomesa.convert2.package$$Field]): scala.reflect.ClassTag[org.locationtech.geomesa.convert2.Field]))
120 233 3287 - 3381 Apply org.locationtech.geomesa.convert.EvaluationContext.StatefulEvaluationContext.<init> new EvaluationContext.this.StatefulEvaluationContext(fields.toArray[org.locationtech.geomesa.convert2.Field]((ClassTag.apply[org.locationtech.geomesa.convert2.Field](classOf[org.locationtech.geomesa.convert2.package$$Field]): scala.reflect.ClassTag[org.locationtech.geomesa.convert2.Field])), globalValues, caches, metrics, success, failure, EvaluationContext.this.StatefulEvaluationContext.<init>$default$7)
129 234 3581 - 3605 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[String](EvaluationContext.this.InputFilePathKey).->[String](file)
129 235 3577 - 3606 Apply scala.collection.generic.GenMapFactory.apply scala.Predef.Map.apply[String, String](scala.Predef.ArrowAssoc[String](EvaluationContext.this.InputFilePathKey).->[String](file))
139 236 3829 - 3833 Literal <nosymbol> null
143 237 3963 - 3964 Select org.locationtech.geomesa.convert.EvaluationContext.FieldValueAccessor.i FieldValueAccessor.this.i
143 238 3956 - 3965 Apply scala.Array.apply FieldValueAccessor.this.values.apply(FieldValueAccessor.this.i)
147 239 4073 - 4078 Select org.locationtech.geomesa.convert.EvaluationContext.GlobalFieldAccessor.value GlobalFieldAccessor.this.value
183 240 5025 - 5041 Select org.locationtech.geomesa.convert.EvaluationContext.InputFilePathKey EvaluationContext.this.InputFilePathKey
183 241 5013 - 5071 TypeApply scala.Any.asInstanceOf RichEvaluationContext.this.ec.accessor(EvaluationContext.this.InputFilePathKey).apply().asInstanceOf[String]
183 242 5006 - 5072 Apply scala.Option.apply scala.Option.apply[String](RichEvaluationContext.this.ec.accessor(EvaluationContext.this.InputFilePathKey).apply().asInstanceOf[String])
205 243 5806 - 5819 Select scala.Array.length StatefulEvaluationContext.this.fields.length
205 244 5786 - 5820 ApplyToImplicitArgs scala.Array.ofDim scala.Array.ofDim[AnyRef](StatefulEvaluationContext.this.fields.length)((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef]))
207 245 5926 - 5927 Literal <nosymbol> 1
207 246 5906 - 5928 ApplyToImplicitArgs scala.Array.ofDim scala.Array.ofDim[AnyRef](1)((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef]))
210 247 6115 - 6121 Select org.locationtech.geomesa.convert.EvaluationContext.StatefulEvaluationContext.fields StatefulEvaluationContext.this.fields
210 248 6143 - 6162 Apply org.locationtech.geomesa.convert.EvaluationContext.ContextDependent.withContext x$2.withContext(this)
210 249 6126 - 6163 Apply scala.Option.map x$1.transforms.map[org.locationtech.geomesa.convert2.transforms.Expression](((x$2: org.locationtech.geomesa.convert2.transforms.Expression) => x$2.withContext(this)))
210 250 6125 - 6125 ApplyToImplicitArgs scala.Array.canBuildFrom scala.this.Array.canBuildFrom[Option[org.locationtech.geomesa.convert2.transforms.Expression]]((ClassTag.apply[Option[org.locationtech.geomesa.convert2.transforms.Expression]](classOf[scala.Option]): scala.reflect.ClassTag[Option[org.locationtech.geomesa.convert2.transforms.Expression]]))
210 251 6115 - 6164 ApplyToImplicitArgs scala.collection.TraversableLike.map scala.Predef.refArrayOps[org.locationtech.geomesa.convert2.Field](StatefulEvaluationContext.this.fields).map[Option[org.locationtech.geomesa.convert2.transforms.Expression], Array[Option[org.locationtech.geomesa.convert2.transforms.Expression]]](((x$1: org.locationtech.geomesa.convert2.Field) => x$1.transforms.map[org.locationtech.geomesa.convert2.transforms.Expression](((x$2: org.locationtech.geomesa.convert2.transforms.Expression) => x$2.withContext(this)))))(scala.this.Array.canBuildFrom[Option[org.locationtech.geomesa.convert2.transforms.Expression]]((ClassTag.apply[Option[org.locationtech.geomesa.convert2.transforms.Expression]](classOf[scala.Option]): scala.reflect.ClassTag[Option[org.locationtech.geomesa.convert2.transforms.Expression]])))
213 252 6239 - 6245 Select org.locationtech.geomesa.convert.EvaluationContext.StatefulEvaluationContext.fields StatefulEvaluationContext.this.fields
213 253 6257 - 6271 Apply java.lang.Object.== x$3.name.==(name)
213 254 6239 - 6272 Apply scala.collection.GenSeqLike.indexWhere scala.Predef.refArrayOps[org.locationtech.geomesa.convert2.Field](StatefulEvaluationContext.this.fields).indexWhere(((x$3: org.locationtech.geomesa.convert2.Field) => x$3.name.==(name)))
214 255 6283 - 6289 Apply scala.Int.>= i.>=(0)
214 256 6316 - 6322 Select org.locationtech.geomesa.convert.EvaluationContext.StatefulEvaluationContext.values StatefulEvaluationContext.this.values
214 257 6293 - 6326 Apply org.locationtech.geomesa.convert.EvaluationContext.FieldValueAccessor.<init> new EvaluationContext.this.FieldValueAccessor(StatefulEvaluationContext.this.values, i)
214 258 6293 - 6326 Block org.locationtech.geomesa.convert.EvaluationContext.FieldValueAccessor.<init> new EvaluationContext.this.FieldValueAccessor(StatefulEvaluationContext.this.values, i)
215 259 6344 - 6366 Apply scala.collection.MapLike.get StatefulEvaluationContext.this.globalValues.get(name)
215 264 6344 - 6486 Match <nosymbol> StatefulEvaluationContext.this.globalValues.get(name) match { case (value: AnyRef)Some[AnyRef]((value @ _)) => new EvaluationContext.this.GlobalFieldAccessor(value) case scala.None => EvaluationContext.this.NullFieldAccessor }
216 260 6405 - 6435 Apply org.locationtech.geomesa.convert.EvaluationContext.GlobalFieldAccessor.<init> new EvaluationContext.this.GlobalFieldAccessor(value)
216 261 6405 - 6435 Block org.locationtech.geomesa.convert.EvaluationContext.GlobalFieldAccessor.<init> new EvaluationContext.this.GlobalFieldAccessor(value)
217 262 6459 - 6476 Select org.locationtech.geomesa.convert.EvaluationContext.NullFieldAccessor EvaluationContext.this.NullFieldAccessor
217 263 6459 - 6476 Block org.locationtech.geomesa.convert.EvaluationContext.NullFieldAccessor EvaluationContext.this.NullFieldAccessor
223 265 6607 - 6608 Literal <nosymbol> 0
225 266 6707 - 6720 Select scala.Array.length StatefulEvaluationContext.this.values.length
225 267 6703 - 6720 Apply scala.Int.< i.<(StatefulEvaluationContext.this.values.length)
225 288 6722 - 6722 Apply org.locationtech.geomesa.convert.EvaluationContext.StatefulEvaluationContext.while$1 while$1()
225 289 6722 - 7181 Block <nosymbol> { { StatefulEvaluationContext.this.values.update(i, try { val fieldArgs: Array[AnyRef] = StatefulEvaluationContext.this.fields.apply(i).fieldArg match { case scala.None => args case (value: Array[AnyRef] => AnyRef)Some[Array[AnyRef] => AnyRef]((f @ _)) => { StatefulEvaluationContext.this.fieldArray.update(0, f.apply(args)); StatefulEvaluationContext.this.fieldArray } }; StatefulEvaluationContext.this.transforms.apply(i) match { case (value: org.locationtech.geomesa.convert2.transforms.Expression)Some[org.locationtech.geomesa.convert2.transforms.Expression]((t @ _)) => t.apply(fieldArgs) case scala.None => fieldArgs.apply(0) } } catch { case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => return scala.`package`.Left.apply[org.locationtech.geomesa.convert.EvaluationContext.EvaluationError, Nothing](EvaluationContext.this.EvaluationError.apply(StatefulEvaluationContext.this.fields.apply(i).name, StatefulEvaluationContext.this.line, e)) }); i = i.+(1) }; while$1() }
225 290 6696 - 6696 Literal <nosymbol> ()
225 291 6696 - 6696 Block <nosymbol> ()
226 280 6760 - 7046 Block <nosymbol> { val fieldArgs: Array[AnyRef] = StatefulEvaluationContext.this.fields.apply(i).fieldArg match { case scala.None => args case (value: Array[AnyRef] => AnyRef)Some[Array[AnyRef] => AnyRef]((f @ _)) => { StatefulEvaluationContext.this.fieldArray.update(0, f.apply(args)); StatefulEvaluationContext.this.fieldArray } }; StatefulEvaluationContext.this.transforms.apply(i) match { case (value: org.locationtech.geomesa.convert2.transforms.Expression)Some[org.locationtech.geomesa.convert2.transforms.Expression]((t @ _)) => t.apply(fieldArgs) case scala.None => fieldArgs.apply(0) } }
226 286 6732 - 7158 Apply scala.Array.update StatefulEvaluationContext.this.values.update(i, try { val fieldArgs: Array[AnyRef] = StatefulEvaluationContext.this.fields.apply(i).fieldArg match { case scala.None => args case (value: Array[AnyRef] => AnyRef)Some[Array[AnyRef] => AnyRef]((f @ _)) => { StatefulEvaluationContext.this.fieldArray.update(0, f.apply(args)); StatefulEvaluationContext.this.fieldArray } }; StatefulEvaluationContext.this.transforms.apply(i) match { case (value: org.locationtech.geomesa.convert2.transforms.Expression)Some[org.locationtech.geomesa.convert2.transforms.Expression]((t @ _)) => t.apply(fieldArgs) case scala.None => fieldArgs.apply(0) } } catch { case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => return scala.`package`.Left.apply[org.locationtech.geomesa.convert.EvaluationContext.EvaluationError, Nothing](EvaluationContext.this.EvaluationError.apply(StatefulEvaluationContext.this.fields.apply(i).name, StatefulEvaluationContext.this.line, e)) })
227 268 6776 - 6794 Select org.locationtech.geomesa.convert2.Field.fieldArg StatefulEvaluationContext.this.fields.apply(i).fieldArg
228 269 6828 - 6832 Ident org.locationtech.geomesa.convert.EvaluationContext.StatefulEvaluationContext.args args
229 270 6872 - 6873 Literal <nosymbol> 0
229 271 6877 - 6890 Apply scala.Function1.apply f.apply(args)
229 272 6861 - 6890 Apply scala.Array.update StatefulEvaluationContext.this.fieldArray.update(0, f.apply(args))
229 273 6892 - 6902 Select org.locationtech.geomesa.convert.EvaluationContext.StatefulEvaluationContext.fieldArray StatefulEvaluationContext.this.fieldArray
229 274 6858 - 6902 Block <nosymbol> { StatefulEvaluationContext.this.fieldArray.update(0, f.apply(args)); StatefulEvaluationContext.this.fieldArray }
231 275 6925 - 6938 Apply scala.Array.apply StatefulEvaluationContext.this.transforms.apply(i)
232 276 6975 - 6993 Apply org.locationtech.geomesa.convert2.transforms.Expression.apply t.apply(fieldArgs)
232 277 6975 - 6993 Block org.locationtech.geomesa.convert2.transforms.Expression.apply t.apply(fieldArgs)
233 278 7022 - 7034 Apply scala.Array.apply fieldArgs.apply(0)
233 279 7022 - 7034 Block scala.Array.apply fieldArgs.apply(0)
236 281 7123 - 7137 Select org.locationtech.geomesa.convert2.Field.name StatefulEvaluationContext.this.fields.apply(i).name
236 282 7139 - 7143 Select org.locationtech.geomesa.convert.EvaluationContext.line StatefulEvaluationContext.this.line
236 283 7107 - 7147 Apply org.locationtech.geomesa.convert.EvaluationContext.EvaluationError.apply EvaluationContext.this.EvaluationError.apply(StatefulEvaluationContext.this.fields.apply(i).name, StatefulEvaluationContext.this.line, e)
236 284 7102 - 7148 Apply scala.util.Left.apply scala.`package`.Left.apply[org.locationtech.geomesa.convert.EvaluationContext.EvaluationError, Nothing](EvaluationContext.this.EvaluationError.apply(StatefulEvaluationContext.this.fields.apply(i).name, StatefulEvaluationContext.this.line, e))
236 285 7095 - 7148 Return org.locationtech.geomesa.convert.EvaluationContext.StatefulEvaluationContext.evaluate return scala.`package`.Left.apply[org.locationtech.geomesa.convert.EvaluationContext.EvaluationError, Nothing](EvaluationContext.this.EvaluationError.apply(StatefulEvaluationContext.this.fields.apply(i).name, StatefulEvaluationContext.this.line, e))
238 287 7167 - 7173 Apply scala.Int.+ i.+(1)
240 292 7194 - 7200 Select org.locationtech.geomesa.convert.EvaluationContext.StatefulEvaluationContext.values StatefulEvaluationContext.this.values
240 293 7188 - 7201 Apply scala.util.Right.apply scala.`package`.Right.apply[Nothing, Array[AnyRef]](StatefulEvaluationContext.this.values)