1 /***********************************************************************
2  * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * https://www.apache.org/licenses/LICENSE-2.0
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.arrow.io.reader
10 
11 import org.geotools.api.feature.simple.SimpleFeatureType
12 import org.geotools.api.filter.Filter
13 import org.locationtech.geomesa.arrow.features.ArrowSimpleFeature
14 import org.locationtech.geomesa.arrow.io.SimpleFeatureArrowFileReader
15 import org.locationtech.geomesa.arrow.vector.{ArrowDictionary, SimpleFeatureVector}
16 import org.locationtech.geomesa.utils.collection.CloseableIterator
17 import org.locationtech.geomesa.utils.io.CloseWithLogging
18 
19 import java.io.InputStream
20 import java.util.concurrent.atomic.AtomicBoolean
21 
22 /**
23  * Streaming reader that will re-read the input stream on each call to features()
24  *
25  * @param is input stream function
26  */
27 class MultiStreamSimpleFeatureArrowFileReader(is: () => InputStream) extends SimpleFeatureArrowFileReader {
28 
29   private val reader = new StreamingSimpleFeatureArrowFileReader(is())
30   private val createNewReader = new AtomicBoolean(false)
31 
32   override def sft: SimpleFeatureType = reader.sft
33   override def dictionaries: Map[String, ArrowDictionary] = reader.dictionaries
34   override def vectors: Seq[SimpleFeatureVector] = reader.vectors
35   override def features(filter: Filter): CloseableIterator[ArrowSimpleFeature] = {
36     // we can use the original reader for the first query, subsequent queries we have to create a new one
37     val newReader =
38       if (createNewReader.compareAndSet(false, true)) { None } else {
39         Some(new StreamingSimpleFeatureArrowFileReader(is()))
40       }
41     val iter = newReader.getOrElse(reader).features(filter)
42     new CloseableIterator[ArrowSimpleFeature] {
43       override def hasNext: Boolean = iter.hasNext
44       override def next(): ArrowSimpleFeature = iter.next()
45       override def close(): Unit = CloseWithLogging(Seq(iter) ++ newReader)
46     }
47   }
48 
49   override def close(): Unit = reader.close()
50 }
Line Stmt Id Pos Tree Symbol Tests Code
29 21530 1338 - 1342 Apply scala.Function0.apply MultiStreamSimpleFeatureArrowFileReader.this.is.apply()
29 21531 1296 - 1343 Apply org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader.<init> new StreamingSimpleFeatureArrowFileReader(MultiStreamSimpleFeatureArrowFileReader.this.is.apply())
30 21532 1376 - 1400 Apply java.util.concurrent.atomic.AtomicBoolean.<init> new java.util.concurrent.atomic.AtomicBoolean(false)
32 21533 1442 - 1452 Select org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader.sft MultiStreamSimpleFeatureArrowFileReader.this.reader.sft
33 21534 1513 - 1532 Select org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader.dictionaries MultiStreamSimpleFeatureArrowFileReader.this.reader.dictionaries
34 21535 1584 - 1598 Select org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader.vectors MultiStreamSimpleFeatureArrowFileReader.this.reader.vectors
38 21536 1818 - 1860 Apply java.util.concurrent.atomic.AtomicBoolean.compareAndSet MultiStreamSimpleFeatureArrowFileReader.this.createNewReader.compareAndSet(false, true)
38 21537 1864 - 1868 Select scala.None scala.None
38 21538 1864 - 1868 Block scala.None scala.None
39 21539 1933 - 1937 Apply scala.Function0.apply MultiStreamSimpleFeatureArrowFileReader.this.is.apply()
39 21540 1891 - 1938 Apply org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader.<init> new StreamingSimpleFeatureArrowFileReader(MultiStreamSimpleFeatureArrowFileReader.this.is.apply())
39 21541 1886 - 1939 Apply scala.Some.apply scala.Some.apply[org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader](new StreamingSimpleFeatureArrowFileReader(MultiStreamSimpleFeatureArrowFileReader.this.is.apply()))
39 21542 1886 - 1939 Block scala.Some.apply scala.Some.apply[org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader](new StreamingSimpleFeatureArrowFileReader(MultiStreamSimpleFeatureArrowFileReader.this.is.apply()))
41 21543 1963 - 2007 Apply org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader.features newReader.getOrElse[org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader](MultiStreamSimpleFeatureArrowFileReader.this.reader).features(filter)
42 21552 2012 - 2015 Apply org.locationtech.geomesa.arrow.io.reader.MultiStreamSimpleFeatureArrowFileReader.$anon.<init> new $anon()
43 21544 2094 - 2106 Select scala.collection.Iterator.hasNext iter.hasNext
44 21545 2155 - 2166 Apply scala.collection.Iterator.next iter.next()
45 21546 2232 - 2241 ApplyImplicitView scala.Option.option2Iterable scala.this.Option.option2Iterable[org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader](newReader)
45 21547 2229 - 2229 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[java.io.Closeable]
45 21548 2219 - 2241 ApplyToImplicitArgs scala.collection.TraversableLike.++ scala.collection.Seq.apply[org.locationtech.geomesa.utils.collection.CloseableIterator[org.locationtech.geomesa.arrow.features.ArrowSimpleFeature]](iter).++[java.io.Closeable, Seq[java.io.Closeable]](scala.this.Option.option2Iterable[org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader](newReader))(collection.this.Seq.canBuildFrom[java.io.Closeable])
45 21549 2218 - 2218 Select org.locationtech.geomesa.utils.io.IsCloseableImplicits.iterableIsCloseable io.this.IsCloseable.iterableIsCloseable
45 21550 2202 - 2242 ApplyToImplicitArgs org.locationtech.geomesa.utils.io.CloseWithLogging.apply org.locationtech.geomesa.utils.io.`package`.CloseWithLogging.apply[Seq[java.io.Closeable]](scala.collection.Seq.apply[org.locationtech.geomesa.utils.collection.CloseableIterator[org.locationtech.geomesa.arrow.features.ArrowSimpleFeature]](iter).++[java.io.Closeable, Seq[java.io.Closeable]](scala.this.Option.option2Iterable[org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader](newReader))(collection.this.Seq.canBuildFrom[java.io.Closeable]))(io.this.IsCloseable.iterableIsCloseable)
45 21551 2218 - 2218 Literal <nosymbol> ()
49 21553 2285 - 2299 Apply org.locationtech.geomesa.arrow.io.reader.StreamingSimpleFeatureArrowFileReader.close MultiStreamSimpleFeatureArrowFileReader.this.reader.close()