1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.fs.storage
10 
11 import org.apache.parquet.filter2.predicate.FilterPredicate
12 import org.geotools.api.feature.simple.SimpleFeatureType
13 import org.geotools.api.filter.Filter
14 import org.geotools.feature.simple.SimpleFeatureTypeBuilder
15 import org.locationtech.geomesa.filter.FilterHelper
16 import org.locationtech.geomesa.utils.geotools.Transform.{ExpressionTransform, PropertyTransform, RenameTransform, Transforms}
17 
package object parquet {

  /**
    * Schema to read and schema to return.
    *
    * If we have to return a different feature than we read, we need to apply a secondary transform.
    * Otherwise, we can just do the transform on read and skip the secondary transform.
    *
    * @param read read schema, includes any extra fields needed to evaluate the residual filter
    * @param transform secondary transform to apply after reading, as a pair of
    *                  (transform definitions, schema to return), or None if features can be
    *                  returned exactly as read
    */
  case class ReadSchema(read: SimpleFeatureType, transform: Option[(String, SimpleFeatureType)])

  /**
    * Filter to read files
    *
    * @param parquet parquet filter that we can push down to the file format
    * @param residual residual geotools filter that we have to apply after read
    */
  case class ReadFilter(parquet: Option[FilterPredicate], residual: Option[Filter])

  object ReadSchema {

    import org.locationtech.geomesa.filter.RichTransform.RichTransform

    /**
      * Calculates the read schema
      *
      * The read schema contains the attributes referenced by the transform (in transform order),
      * followed by any attributes that are referenced only by the filter. The query transform is
      * kept as a secondary transform whenever the features as read would not match the requested
      * transform schema, i.e. when the transform computes or renames attributes, or when extra
      * filter-only attributes had to be read.
      *
      * @param sft simple feature type
      * @param filter query filter
      * @param transform query transform, as (transform definitions, transform schema)
      * @return the schema to read, plus the secondary transform to apply after read, if any
      */
    def apply(
        sft: SimpleFeatureType,
        filter: Option[Filter],
        transform: Option[(String, SimpleFeatureType)]): ReadSchema = {
      transform match {
        case None => ReadSchema(sft, None)
        case Some((tdefs, _)) =>
          val definitions = Transforms(sft, tdefs)
          // only plain property selections can be satisfied directly by the read schema. renames
          // also need a secondary transform: the read schema below is built from the original
          // descriptors, so without it features would be returned under the original attribute
          // names (and a renamed copy of an already-selected attribute would be collapsed by the
          // `distinct` below and lost)
          val secondary = definitions.exists {
            case _: PropertyTransform   => false
            case _: RenameTransform     => true
            case _: ExpressionTransform => true
          }
          // attributes needed to evaluate the transform, in first-reference order
          val transformCols = definitions.flatMap(_.properties).distinct
          // attributes needed only to evaluate the filter after read - de-duplicated so that the
          // schema builder never receives the same attribute twice
          val filterCols = filter match {
            case None => Seq.empty
            case Some(f) => FilterHelper.propertyNames(f, sft).distinct.filterNot(transformCols.contains)
          }

          val projectedSft = {
            val builder = new SimpleFeatureTypeBuilder()
            builder.setName(sft.getName)
            transformCols.foreach(a => builder.add(sft.getDescriptor(a)))
            filterCols.foreach(a => builder.add(sft.getDescriptor(a)))
            builder.buildFeatureType()
          }
          // carry over the original schema's user data onto the projected schema
          projectedSft.getUserData.putAll(sft.getUserData)

          // reading extra filter-only columns also means the read feature differs from the
          // requested one, so the secondary transform is required in that case too
          ReadSchema(projectedSft, if (secondary || filterCols.nonEmpty) { transform } else { None })
      }
    }
  }

  object ReadFilter {

    /**
      * Create a read filter
      *
      * A missing filter or `Filter.INCLUDE` requires no filtering at all; anything else is split
      * by the `FilterConverter` into a parquet push-down predicate and a residual geotools filter.
      *
      * @param sft simple feature type
      * @param filter query filter
      * @return the parquet predicate to push down and the residual filter to apply after read
      */
    def apply(sft: SimpleFeatureType, filter: Option[Filter]): ReadFilter = {
      val (parquet, residual) = filter match {
        case None | Some(Filter.INCLUDE) => (None, None)
        case Some(f) => FilterConverter.convert(sft, f)
      }
      ReadFilter(parquet, residual)
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
56 65259 2199 - 2203 Select scala.None scala.None
56 65260 2183 - 2204 Apply org.locationtech.geomesa.fs.storage.parquet.ReadSchema.apply `package`.this.ReadSchema.apply(sft, scala.None)
56 65261 2183 - 2204 Block org.locationtech.geomesa.fs.storage.parquet.ReadSchema.apply `package`.this.ReadSchema.apply(sft, scala.None)
57 65296 2235 - 3240 Block <nosymbol> { val definitions: Seq[org.locationtech.geomesa.utils.geotools.Transform] = org.locationtech.geomesa.utils.geotools.Transform.Transforms.apply(sft, tdefs); val secondary: Boolean = definitions.exists(((x0$1: org.locationtech.geomesa.utils.geotools.Transform) => x0$1 match { case (_: org.locationtech.geomesa.utils.geotools.Transform.PropertyTransform) => false case (_: org.locationtech.geomesa.utils.geotools.Transform.RenameTransform) => false case (_: org.locationtech.geomesa.utils.geotools.Transform.ExpressionTransform) => true })); val transformCols: Seq[String] = definitions.flatMap[String, Seq[String]](((x$1: org.locationtech.geomesa.utils.geotools.Transform) => org.locationtech.geomesa.filter.RichTransform.RichTransform(x$1).properties))(collection.this.Seq.canBuildFrom[String]).distinct; val filterCols: Seq[String] = filter match { case scala.None => scala.collection.Seq.empty[Nothing] case (value: org.geotools.api.filter.Filter)Some[org.geotools.api.filter.Filter]((f @ _)) => org.locationtech.geomesa.filter.FilterHelper.propertyNames(f, sft).filterNot({ ((elem: Any) => transformCols.contains[Any](elem)) }) }; val projectedSft: org.geotools.api.feature.simple.SimpleFeatureType = { val builder: org.geotools.feature.simple.SimpleFeatureTypeBuilder = new org.geotools.feature.simple.SimpleFeatureTypeBuilder(); builder.setName(sft.getName()); transformCols.foreach[Unit](((a: String) => builder.add(sft.getDescriptor(a)))); filterCols.foreach[Unit](((a: String) => builder.add(sft.getDescriptor(a)))); builder.buildFeatureType() }; projectedSft.getUserData().putAll(sft.getUserData()); `package`.this.ReadSchema.apply(projectedSft, if (secondary.||(filterCols.nonEmpty)) transform else scala.None) }
58 65262 2266 - 2288 Apply org.locationtech.geomesa.utils.geotools.Transform.Transforms.apply org.locationtech.geomesa.utils.geotools.Transform.Transforms.apply(sft, tdefs)
59 65269 2315 - 2493 Apply scala.collection.IterableLike.exists definitions.exists(((x0$1: org.locationtech.geomesa.utils.geotools.Transform) => x0$1 match { case (_: org.locationtech.geomesa.utils.geotools.Transform.PropertyTransform) => false case (_: org.locationtech.geomesa.utils.geotools.Transform.RenameTransform) => false case (_: org.locationtech.geomesa.utils.geotools.Transform.ExpressionTransform) => true }))
60 65263 2379 - 2384 Literal <nosymbol> false
60 65264 2379 - 2384 Block <nosymbol> false
61 65265 2428 - 2433 Literal <nosymbol> false
61 65266 2428 - 2433 Block <nosymbol> false
62 65267 2477 - 2481 Literal <nosymbol> true
62 65268 2477 - 2481 Block <nosymbol> true
64 65270 2544 - 2556 Select org.locationtech.geomesa.filter.RichTransform.RichTransform.properties org.locationtech.geomesa.filter.RichTransform.RichTransform(x$1).properties
64 65271 2543 - 2543 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[String]
64 65272 2524 - 2566 Select scala.collection.SeqLike.distinct definitions.flatMap[String, Seq[String]](((x$1: org.locationtech.geomesa.utils.geotools.Transform) => org.locationtech.geomesa.filter.RichTransform.RichTransform(x$1).properties))(collection.this.Seq.canBuildFrom[String]).distinct
66 65273 2634 - 2643 TypeApply scala.collection.generic.GenericCompanion.empty scala.collection.Seq.empty[Nothing]
66 65274 2634 - 2643 Block scala.collection.generic.GenericCompanion.empty scala.collection.Seq.empty[Nothing]
67 65275 2717 - 2739 Apply scala.collection.SeqLike.contains transformCols.contains[Any](elem)
67 65276 2672 - 2740 Apply scala.collection.TraversableLike.filterNot org.locationtech.geomesa.filter.FilterHelper.propertyNames(f, sft).filterNot({ ((elem: Any) => transformCols.contains[Any](elem)) })
67 65277 2672 - 2740 Block scala.collection.TraversableLike.filterNot org.locationtech.geomesa.filter.FilterHelper.propertyNames(f, sft).filterNot({ ((elem: Any) => transformCols.contains[Any](elem)) })
71 65278 2811 - 2841 Apply org.geotools.feature.simple.SimpleFeatureTypeBuilder.<init> new org.geotools.feature.simple.SimpleFeatureTypeBuilder()
72 65279 2870 - 2881 Apply org.geotools.api.feature.type.PropertyType.getName sft.getName()
72 65280 2854 - 2882 Apply org.geotools.feature.simple.SimpleFeatureTypeBuilder.setName builder.setName(sft.getName())
73 65281 2934 - 2954 Apply org.geotools.api.feature.simple.SimpleFeatureType.getDescriptor sft.getDescriptor(a)
73 65282 2922 - 2955 Apply org.geotools.feature.simple.SimpleFeatureTypeBuilder.add builder.add(sft.getDescriptor(a))
73 65283 2895 - 2956 Apply scala.collection.IterableLike.foreach transformCols.foreach[Unit](((a: String) => builder.add(sft.getDescriptor(a))))
74 65284 3005 - 3025 Apply org.geotools.api.feature.simple.SimpleFeatureType.getDescriptor sft.getDescriptor(a)
74 65285 2993 - 3026 Apply org.geotools.feature.simple.SimpleFeatureTypeBuilder.add builder.add(sft.getDescriptor(a))
74 65286 2969 - 3027 Apply scala.collection.IterableLike.foreach filterCols.foreach[Unit](((a: String) => builder.add(sft.getDescriptor(a))))
75 65287 3040 - 3066 Apply org.geotools.feature.simple.SimpleFeatureTypeBuilder.buildFeatureType builder.buildFeatureType()
77 65288 3121 - 3136 Apply org.geotools.api.feature.type.PropertyType.getUserData sft.getUserData()
77 65289 3089 - 3137 Apply java.util.Map.putAll projectedSft.getUserData().putAll(sft.getUserData())
79 65290 3191 - 3210 Select scala.collection.TraversableOnce.nonEmpty filterCols.nonEmpty
79 65291 3178 - 3210 Apply scala.Boolean.|| secondary.||(filterCols.nonEmpty)
79 65292 3214 - 3223 Ident org.locationtech.geomesa.fs.storage.parquet.ReadSchema.transform transform
79 65293 3233 - 3237 Select scala.None scala.None
79 65294 3233 - 3237 Block scala.None scala.None
79 65295 3149 - 3240 Apply org.locationtech.geomesa.fs.storage.parquet.ReadSchema.apply `package`.this.ReadSchema.apply(projectedSft, if (secondary.||(filterCols.nonEmpty)) transform else scala.None)
94 65297 3516 - 3516 Select scala.Tuple2._1 x$2._1
94 65298 3525 - 3525 Select scala.Tuple2._2 x$2._2
98 65299 3679 - 3708 Apply org.locationtech.geomesa.fs.storage.parquet.ReadFilter.apply `package`.this.ReadFilter.apply(parquet, residual)