1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.accumulo.iterators
10 
11 import com.typesafe.scalalogging.LazyLogging
12 import org.apache.accumulo.core.client.IteratorSetting
13 import org.apache.accumulo.core.client.admin.TableOperations
14 import org.apache.accumulo.core.data.{Key, Value}
15 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope
16 import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator}
17 import org.geotools.api.feature.simple.SimpleFeatureType
18 import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex
19 import org.locationtech.geomesa.index.filters.{AgeOffFilter, DtgAgeOffFilter}
20 import org.locationtech.geomesa.utils.conf.FeatureExpiration
21 import org.locationtech.geomesa.utils.conf.FeatureExpiration.FeatureTimeExpiration
22 
23 import java.util.concurrent.TimeUnit
24 import scala.concurrent.duration.Duration
25 import scala.util.control.NonFatal
26 
/**
  * Accumulo iterator that ages off data based on the dtg value stored in the SimpleFeature.
  *
  * The age-off decision itself is made by the mixed-in `DtgAgeOffFilter`; this class wires that
  * filter into the iterator lifecycle (`init`, `accept`, `deepCopy`).
  */
class DtgAgeOffIterator extends AgeOffIterator with DtgAgeOffFilter {

  /**
    * Initializes the iterator, then the dtg filter state, from the iterator options
    *
    * @param source source iterator
    * @param options iterator options, as configured on the table
    * @param env iterator environment
    */
  override def init(
      source: SortedKeyValueIterator[Key, Value],
      options: java.util.Map[String, String],
      env: IteratorEnvironment): Unit = {
    import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

    import scala.collection.JavaConverters._

    // iterator-level initialization comes first
    super.init(source, options, env)

    // the filter-level initialization takes an immutable scala map
    val config = options.asScala.toMap
    try { super.init(config) } catch {
      // fallback for old configuration - use the dtg index declared by the feature type
      case _: NoSuchElementException => dtgIndex = sft.getDtgIndex.get
    }
  }

  /**
    * Accept or reject an entry, based only on the bytes of its value
    *
    * @param k entry key (not examined here)
    * @param v entry value, holding the serialized feature
    * @return result of the dtg filter check for the value
    */
  override def accept(k: Key, v: Value): Boolean = {
    val bytes = v.get()
    // the row arguments are placeholders (null/-1), only the full value array is handed to the filter
    accept(null, -1, -1, bytes, 0, bytes.length, -1)
  }

  /**
    * Creates a copy of this iterator that carries over the already-initialized filter state
    *
    * @param env iterator environment
    * @return the copied iterator
    */
  override def deepCopy(env: IteratorEnvironment): SortedKeyValueIterator[Key, Value] = {
    val duplicate = super.deepCopy(env).asInstanceOf[DtgAgeOffIterator]
    // the state is copied by reference - note that this includes the reusable feature instance
    duplicate.dtgIndex = dtgIndex
    duplicate.reusableSf = reusableSf
    duplicate.index = index
    duplicate.sft = sft
    duplicate
  }
}
61 
/**
  * Helpers for configuring, inspecting, attaching and removing the dtg age-off iterator on tables
  */
object DtgAgeOffIterator extends LazyLogging {

  /** Name under which the iterator is registered on a table */
  val Name: String = "dtg-age-off"

  /**
    * Builds the iterator setting for a dtg age-off iterator
    *
    * @param sft simple feature type
    * @param index feature index the iterator will run against
    * @param expiry age after which features expire
    * @param dtgField date attribute to age off by, if specified
    * @param priority iterator priority
    * @return iterator setting, populated with the options from `DtgAgeOffFilter.configure`
    */
  def configure(sft: SimpleFeatureType,
                index: GeoMesaFeatureIndex[_, _],
                expiry: Duration,
                dtgField: Option[String],
                priority: Int = 5): IteratorSetting = {
    val is = new IteratorSetting(priority, Name, classOf[DtgAgeOffIterator])
    DtgAgeOffFilter.configure(sft, index, expiry, dtgField).foreach { case (k, v) => is.addOption(k, v) }
    is
  }

  /**
    * Reads back the feature expiration from the age-off iterator configured on a table, if any
    *
    * @param tableOps table operations
    * @param sft simple feature type
    * @param table table name
    * @return the configured expiration, or None if there is no iterator or its settings can't be
    *         converted (in which case the error is logged)
    */
  def expiry(tableOps: TableOperations, sft: SimpleFeatureType, table: String): Option[FeatureExpiration] = {
    import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

    try {
      list(tableOps, table).map { is =>
        val options = is.getOptions
        // old configurations may not have the dtg option - mirror the fallback used by
        // DtgAgeOffIterator.init and use the dtg index declared by the feature type
        val dtgIndex =
          Option(options.get(DtgAgeOffFilter.Configuration.DtgOpt)).map(_.toInt).orElse(sft.getDtgIndex).getOrElse {
            throw new IllegalArgumentException(
              s"Iterator '$Name' on table '$table' does not specify a date attribute, and the feature type has none")
          }
        val attribute = sft.getDescriptor(dtgIndex).getLocalName
        val millis = java.time.Duration.parse(options.get(AgeOffFilter.Configuration.ExpiryOpt)).toMillis
        FeatureTimeExpiration(attribute, sft.indexOf(attribute), Duration(millis, TimeUnit.MILLISECONDS))
      }
    } catch {
      case NonFatal(e) => logger.error("Error converting iterator settings to FeatureExpiration:", e); None
    }
  }

  /**
    * Looks up the age-off iterator setting on a table
    *
    * @param tableOps table operations
    * @param table table name
    * @return the setting from the first iterator scope that has one, if any
    */
  def list(tableOps: TableOperations, table: String): Option[IteratorSetting] = {
    import org.locationtech.geomesa.utils.conversions.ScalaImplicits.RichIterator
    // evaluated lazily, so scopes are only queried until the first match
    IteratorScope.values.iterator.flatMap(scope => Option(tableOps.getIteratorSetting(table, Name, scope))).headOption
  }

  /**
    * Attaches the age-off iterator to every table of an index
    *
    * @param tableOps table operations
    * @param sft simple feature type
    * @param index feature index whose tables will be configured
    * @param expiry age after which features expire
    * @param dtg date attribute to age off by (wrapped with `Option`, so may be null)
    */
  def set(
      tableOps: TableOperations,
      sft: SimpleFeatureType,
      index: GeoMesaFeatureIndex[_, _],
      expiry: Duration,
      dtg: String): Unit = {
    index.getTableNames().foreach { table =>
      tableOps.attachIterator(table, configure(sft, index, expiry, Option(dtg))) // all scopes
    }
  }

  /**
    * Removes the age-off iterator from all scopes of a table, if it is configured on any of them
    *
    * @param tableOps table operations
    * @param table table name
    */
  def clear(tableOps: TableOperations, table: String): Unit = {
    if (list(tableOps, table).isDefined) {
      tableOps.removeIterator(table, Name, java.util.EnumSet.allOf(classOf[IteratorScope]))
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
39 46686 1817 - 1849 Apply org.locationtech.geomesa.accumulo.iterators.AgeOffIterator.init DtgAgeOffIterator.super.init(source, options, env)
41 46687 1893 - 1893 TypeApply scala.Predef.$conforms scala.Predef.$conforms[(String, String)]
41 46688 1877 - 1898 ApplyToImplicitArgs scala.collection.TraversableOnce.toMap scala.collection.JavaConverters.mapAsScalaMapConverter[String, String](options).asScala.toMap[String, String](scala.Predef.$conforms[(String, String)])
41 46689 1866 - 1899 Apply org.locationtech.geomesa.index.filters.DtgAgeOffFilter.init DtgAgeOffIterator.super.init(scala.collection.JavaConverters.mapAsScalaMapConverter[String, String](options).asScala.toMap[String, String](scala.Predef.$conforms[(String, String)]))
41 46690 1866 - 1899 Block org.locationtech.geomesa.index.filters.DtgAgeOffFilter.init DtgAgeOffIterator.super.init(scala.collection.JavaConverters.mapAsScalaMapConverter[String, String](options).asScala.toMap[String, String](scala.Predef.$conforms[(String, String)]))
43 46691 1965 - 1968 Select org.locationtech.geomesa.index.filters.DtgAgeOffFilter.sft DtgAgeOffIterator.this.sft
43 46692 1965 - 1984 Select scala.Option.get org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType(DtgAgeOffIterator.this.sft).getDtgIndex.get
43 46693 1954 - 1984 Apply org.locationtech.geomesa.index.filters.DtgAgeOffFilter.dtgIndex_= DtgAgeOffIterator.this.dtgIndex_=(org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType(DtgAgeOffIterator.this.sft).getDtgIndex.get)
43 46694 1954 - 1984 Block org.locationtech.geomesa.index.filters.DtgAgeOffFilter.dtgIndex_= DtgAgeOffIterator.this.dtgIndex_=(org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType(DtgAgeOffIterator.this.sft).getDtgIndex.get)
48 46695 2099 - 2106 Apply org.apache.accumulo.core.data.Value.get v.get()
49 46696 2118 - 2122 Literal <nosymbol> null
49 46697 2124 - 2126 Literal <nosymbol> -1
49 46698 2128 - 2130 Literal <nosymbol> -1
49 46699 2139 - 2140 Literal <nosymbol> 0
49 46700 2142 - 2154 Select scala.Array.length value.length
49 46701 2156 - 2158 Literal <nosymbol> -1L
49 46702 2111 - 2159 Apply org.locationtech.geomesa.index.filters.DtgAgeOffFilter.accept DtgAgeOffIterator.this.accept(null, -1, -1, value, 0, value.length, -1L)
53 46703 2270 - 2321 TypeApply scala.Any.asInstanceOf DtgAgeOffIterator.super.deepCopy(env).asInstanceOf[org.locationtech.geomesa.accumulo.iterators.DtgAgeOffIterator]
54 46704 2337 - 2340 Select org.locationtech.geomesa.index.filters.DtgAgeOffFilter.sft DtgAgeOffIterator.this.sft
54 46705 2326 - 2340 Apply org.locationtech.geomesa.index.filters.DtgAgeOffFilter.sft_= copy.sft_=(DtgAgeOffIterator.this.sft)
55 46706 2358 - 2363 Select org.locationtech.geomesa.index.filters.DtgAgeOffFilter.index DtgAgeOffIterator.this.index
55 46707 2345 - 2363 Apply org.locationtech.geomesa.index.filters.DtgAgeOffFilter.index_= copy.index_=(DtgAgeOffIterator.this.index)
56 46708 2386 - 2396 Select org.locationtech.geomesa.index.filters.DtgAgeOffFilter.reusableSf DtgAgeOffIterator.this.reusableSf
56 46709 2368 - 2396 Apply org.locationtech.geomesa.index.filters.DtgAgeOffFilter.reusableSf_= copy.reusableSf_=(DtgAgeOffIterator.this.reusableSf)
57 46710 2417 - 2425 Select org.locationtech.geomesa.index.filters.DtgAgeOffFilter.dtgIndex DtgAgeOffIterator.this.dtgIndex
57 46711 2401 - 2425 Apply org.locationtech.geomesa.index.filters.DtgAgeOffFilter.dtgIndex_= copy.dtgIndex_=(DtgAgeOffIterator.this.dtgIndex)
64 46712 2503 - 2516 Literal <nosymbol> "dtg-age-off"
71 46713 2783 - 2787 Select org.locationtech.geomesa.accumulo.iterators.DtgAgeOffIterator.Name DtgAgeOffIterator.this.Name
71 46714 2789 - 2815 Literal <nosymbol> classOf[org.locationtech.geomesa.accumulo.iterators.DtgAgeOffIterator]
71 46715 2753 - 2816 Apply org.apache.accumulo.core.client.IteratorSetting.<init> new org.apache.accumulo.core.client.IteratorSetting(priority, DtgAgeOffIterator.this.Name, classOf[org.locationtech.geomesa.accumulo.iterators.DtgAgeOffIterator])
72 46716 2902 - 2920 Apply org.apache.accumulo.core.client.IteratorSetting.addOption is.addOption(k, v)
72 46717 2902 - 2920 Block org.apache.accumulo.core.client.IteratorSetting.addOption is.addOption(k, v)
72 46718 2821 - 2922 Apply scala.collection.IterableLike.foreach org.locationtech.geomesa.index.filters.DtgAgeOffFilter.configure(sft, index, expiry, dtgField).foreach[Unit](((x0$1: (String, String)) => x0$1 match { case (_1: String, _2: String)(String, String)((k @ _), (v @ _)) => is.addOption(k, v) }))
78 46724 3061 - 3438 Apply scala.Option.map DtgAgeOffIterator.this.list(tableOps, table).map[org.locationtech.geomesa.utils.conf.FeatureExpiration.FeatureTimeExpiration](((is: org.apache.accumulo.core.client.IteratorSetting) => { val attribute: String = sft.getDescriptor(scala.Predef.augmentString(is.getOptions().get(org.locationtech.geomesa.index.filters.DtgAgeOffFilter.Configuration.DtgOpt)).toInt).getLocalName(); val expiry: Long = java.time.Duration.parse(is.getOptions().get(org.locationtech.geomesa.index.filters.AgeOffFilter.Configuration.ExpiryOpt)).toMillis(); org.locationtech.geomesa.utils.conf.FeatureExpiration.FeatureTimeExpiration.apply(attribute, sft.indexOf(attribute), scala.concurrent.duration.Duration.apply(expiry, MILLISECONDS)) }))
78 46725 3061 - 3438 Block scala.Option.map DtgAgeOffIterator.this.list(tableOps, table).map[org.locationtech.geomesa.utils.conf.FeatureExpiration.FeatureTimeExpiration](((is: org.apache.accumulo.core.client.IteratorSetting) => { val attribute: String = sft.getDescriptor(scala.Predef.augmentString(is.getOptions().get(org.locationtech.geomesa.index.filters.DtgAgeOffFilter.Configuration.DtgOpt)).toInt).getLocalName(); val expiry: Long = java.time.Duration.parse(is.getOptions().get(org.locationtech.geomesa.index.filters.AgeOffFilter.Configuration.ExpiryOpt)).toMillis(); org.locationtech.geomesa.utils.conf.FeatureExpiration.FeatureTimeExpiration.apply(attribute, sft.indexOf(attribute), scala.concurrent.duration.Duration.apply(expiry, MILLISECONDS)) }))
79 46719 3119 - 3212 Apply org.geotools.api.feature.type.AttributeDescriptor.getLocalName sft.getDescriptor(scala.Predef.augmentString(is.getOptions().get(org.locationtech.geomesa.index.filters.DtgAgeOffFilter.Configuration.DtgOpt)).toInt).getLocalName()
80 46720 3234 - 3324 Apply java.time.Duration.toMillis java.time.Duration.parse(is.getOptions().get(org.locationtech.geomesa.index.filters.AgeOffFilter.Configuration.ExpiryOpt)).toMillis()
81 46721 3366 - 3388 Apply org.geotools.api.feature.simple.SimpleFeatureType.indexOf sft.indexOf(attribute)
81 46722 3390 - 3429 Apply scala.concurrent.duration.Duration.apply scala.concurrent.duration.Duration.apply(expiry, MILLISECONDS)
81 46723 3333 - 3430 Apply org.locationtech.geomesa.utils.conf.FeatureExpiration.FeatureTimeExpiration.apply org.locationtech.geomesa.utils.conf.FeatureExpiration.FeatureTimeExpiration.apply(attribute, sft.indexOf(attribute), scala.concurrent.duration.Duration.apply(expiry, MILLISECONDS))
84 46726 3556 - 3560 Select scala.None scala.None
84 46727 3476 - 3560 Block <nosymbol> { (if (DtgAgeOffIterator.this.logger.underlying.isErrorEnabled()) DtgAgeOffIterator.this.logger.underlying.error("Error converting iterator settings to FeatureExpiration:", e) else (): Unit); scala.None }
90 46728 3740 - 3760 Apply org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope.values org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope.values()
90 46729 3829 - 3833 Select org.locationtech.geomesa.accumulo.iterators.DtgAgeOffIterator.Name DtgAgeOffIterator.this.Name
90 46730 3794 - 3841 Apply org.apache.accumulo.core.client.admin.TableOperations.getIteratorSetting tableOps.getIteratorSetting(table, DtgAgeOffIterator.this.Name, scope)
90 46731 3787 - 3842 Apply scala.Option.apply scala.Option.apply[org.apache.accumulo.core.client.IteratorSetting](tableOps.getIteratorSetting(table, DtgAgeOffIterator.this.Name, scope))
90 46732 3787 - 3842 ApplyImplicitView scala.Option.option2Iterable scala.this.Option.option2Iterable[org.apache.accumulo.core.client.IteratorSetting](scala.Option.apply[org.apache.accumulo.core.client.IteratorSetting](tableOps.getIteratorSetting(table, DtgAgeOffIterator.this.Name, scope)))
90 46733 3740 - 3843 Apply scala.collection.Iterator.flatMap scala.Predef.refArrayOps[org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope](org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope.values()).iterator.flatMap[org.apache.accumulo.core.client.IteratorSetting](((scope: org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope) => scala.this.Option.option2Iterable[org.apache.accumulo.core.client.IteratorSetting](scala.Option.apply[org.apache.accumulo.core.client.IteratorSetting](tableOps.getIteratorSetting(table, DtgAgeOffIterator.this.Name, scope)))))
90 46734 3740 - 3854 Select org.locationtech.geomesa.utils.conversions.ScalaImplicits.RichIterator.headOption org.locationtech.geomesa.utils.conversions.ScalaImplicits.RichIterator[org.apache.accumulo.core.client.IteratorSetting](scala.Predef.refArrayOps[org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope](org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope.values()).iterator.flatMap[org.apache.accumulo.core.client.IteratorSetting](((scope: org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope) => scala.this.Option.option2Iterable[org.apache.accumulo.core.client.IteratorSetting](scala.Option.apply[org.apache.accumulo.core.client.IteratorSetting](tableOps.getIteratorSetting(table, DtgAgeOffIterator.this.Name, scope)))))).headOption
99 46738 4031 - 4172 Apply scala.collection.IterableLike.foreach index.getTableNames(index.getTableNames$default$1).foreach[Unit](((table: String) => tableOps.attachIterator(table, DtgAgeOffIterator.this.configure(sft, index, expiry, scala.Option.apply[String](dtg), DtgAgeOffIterator.this.configure$default$5))))
100 46735 4139 - 4150 Apply scala.Option.apply scala.Option.apply[String](dtg)
100 46736 4109 - 4151 Apply org.locationtech.geomesa.accumulo.iterators.DtgAgeOffIterator.configure DtgAgeOffIterator.this.configure(sft, index, expiry, scala.Option.apply[String](dtg), DtgAgeOffIterator.this.configure$default$5)
100 46737 4078 - 4152 Apply org.apache.accumulo.core.client.admin.TableOperations.attachIterator tableOps.attachIterator(table, DtgAgeOffIterator.this.configure(sft, index, expiry, scala.Option.apply[String](dtg), DtgAgeOffIterator.this.configure$default$5))
105 46739 4250 - 4270 Apply org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope.values org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope.values()
105 46740 4287 - 4342 Apply java.lang.Object.!= tableOps.getIteratorSetting(table, DtgAgeOffIterator.this.Name, scope).!=(null)
105 46741 4250 - 4343 Apply scala.collection.IndexedSeqOptimized.exists scala.Predef.refArrayOps[org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope](org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope.values()).exists(((scope: org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope) => tableOps.getIteratorSetting(table, DtgAgeOffIterator.this.Name, scope).!=(null)))
105 46746 4246 - 4246 Literal <nosymbol> ()
105 46747 4246 - 4246 Block <nosymbol> ()
106 46742 4384 - 4388 Select org.locationtech.geomesa.accumulo.iterators.DtgAgeOffIterator.Name DtgAgeOffIterator.this.Name
106 46743 4390 - 4437 Apply java.util.EnumSet.allOf java.util.EnumSet.allOf[org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope](classOf[org.apache.accumulo.core.iterators.IteratorUtil$IteratorScope])
106 46744 4353 - 4438 Apply org.apache.accumulo.core.client.admin.TableOperations.removeIterator tableOps.removeIterator(table, DtgAgeOffIterator.this.Name, java.util.EnumSet.allOf[org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope](classOf[org.apache.accumulo.core.iterators.IteratorUtil$IteratorScope]))
106 46745 4353 - 4438 Block org.apache.accumulo.core.client.admin.TableOperations.removeIterator tableOps.removeIterator(table, DtgAgeOffIterator.this.Name, java.util.EnumSet.allOf[org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope](classOf[org.apache.accumulo.core.iterators.IteratorUtil$IteratorScope]))