1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.fs.storage.common.utils
10 
11 import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
12 import org.apache.hadoop.fs.{FileContext, FileStatus, Path, RemoteIterator}
13 import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty
14 
15 import java.util.concurrent.TimeUnit
16 
17 /**
18   * Caches file statuses to avoid repeated file system operations. Status expires after a
19   * configurable period, by default 10 minutes.
20   */
/**
  * Caches file system metadata to avoid repeated round trips to the underlying file system.
  * Cached entries expire a configurable period after being written (15 minutes by default,
  * per `CacheDurationProperty`).
  */
object PathCache {

  /** System property controlling how long cached entries live before expiring */
  val CacheDurationProperty: SystemProperty = SystemProperty("geomesa.fs.file.cache.duration", "15 minutes")

  // expiry for all three caches, in milliseconds
  private val expiryMillis = CacheDurationProperty.toDuration.get.toMillis

  // common builder configuration shared by the caches below
  private def newBuilder = Caffeine.newBuilder().expireAfterWrite(expiryMillis, TimeUnit.MILLISECONDS)

  // tracks whether a given path exists
  private val pathCache = newBuilder.build[(FileContext, Path), java.lang.Boolean](
    new CacheLoader[(FileContext, Path), java.lang.Boolean]() {
      override def load(key: (FileContext, Path)): java.lang.Boolean = {
        val (fc, path) = key
        fc.util.exists(path)
      }
    }
  )

  // tracks the file status for a given path
  private val statusCache = newBuilder.build[(FileContext, Path), FileStatus](
    new CacheLoader[(FileContext, Path), FileStatus]() {
      override def load(key: (FileContext, Path)): FileStatus = {
        val (fc, path) = key
        fc.getFileStatus(path)
      }
    }
  )

  // tracks the children of a given directory path
  private val listCache = newBuilder.build[(FileContext, Path), Stream[FileStatus]](
    new CacheLoader[(FileContext, Path), Stream[FileStatus]]() {
      override def load(key: (FileContext, Path)): Stream[FileStatus] = {
        val (fc, dir) = key
        RemoteIterator(fc.listStatus(dir)).toStream
      }
    }
  )

  /**
    * Register a path as existing, optionally pre-populating its status and directory listing
    *
    * @param fc file context
    * @param path path
    * @param status file status, if available
    * @param list directory contents, if available
    */
  def register(
      fc: FileContext,
      path: Path,
      status: Option[FileStatus] = None,
      list: Option[Stream[FileStatus]] = None): Unit = {
    val key = fc -> path
    pathCache.put(key, java.lang.Boolean.TRUE)
    status.foreach(s => statusCache.put(key, s))
    list.foreach(children => listCache.put(key, children))
  }

  /**
    * Check to see if a path exists
    *
    * @param fc file context
    * @param path path
    * @param reload reload the file status from the underlying file system before checking
    * @return
    */
  def exists(fc: FileContext, path: Path, reload: Boolean = false): Boolean = {
    if (reload) {
      invalidate(fc, path)
    }
    pathCache.get(fc -> path).booleanValue()
  }

  /**
    * Gets the file status for a path
    *
    * @param fc file context
    * @param path path
    * @param reload reload the file status from the underlying file system before returning
    * @return
    */
  def status(fc: FileContext, path: Path, reload: Boolean = false): FileStatus = {
    if (reload) {
      invalidate(fc, path)
    }
    statusCache.get(fc -> path)
  }

  /**
    * List the children of a path
    *
    * @param fc file context
    * @param dir directory path
    * @param reload reload the directory contents from the underlying file system before returning
    * @return
    */
  def list(fc: FileContext, dir: Path, reload: Boolean = false): Iterator[FileStatus] = {
    if (reload) {
      invalidate(fc, dir)
    }
    listCache.get(fc -> dir).iterator
  }

  /**
    * Invalidate any cached values for the path - they will be re-loaded on next access
    *
    * @param fc file context
    * @param path path
    */
  def invalidate(fc: FileContext, path: Path): Unit = {
    val key = fc -> path
    pathCache.invalidate(key)
    statusCache.invalidate(key)
    listCache.invalidate(key)
  }

  /**
    * Adapts Hadoop's `RemoteIterator` to a standard Scala `Iterator`
    */
  object RemoteIterator {
    def apply[T](iter: RemoteIterator[T]): Iterator[T] = new Iterator[T] {
      override def hasNext: Boolean = iter.hasNext
      override def next(): T = iter.next()
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
23 63642 999 - 1061 Apply org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty.apply org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty.apply("geomesa.fs.file.cache.duration", "15 minutes")
25 63643 1088 - 1133 Select scala.concurrent.duration.Duration.toMillis PathCache.this.CacheDurationProperty.toDuration.get.toMillis
29 63644 1247 - 1255 Select org.locationtech.geomesa.fs.storage.common.utils.PathCache.duration PathCache.this.duration
29 63645 1257 - 1278 Literal <nosymbol> MILLISECONDS
29 63650 1208 - 1466 Apply com.github.benmanes.caffeine.cache.Caffeine.build com.github.benmanes.caffeine.cache.Caffeine.newBuilder().expireAfterWrite(PathCache.this.duration, MILLISECONDS).build[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path), Boolean]({ final class $anon extends Object with com.github.benmanes.caffeine.cache.CacheLoader[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path),Boolean] { def <init>(): <$anon: com.github.benmanes.caffeine.cache.CacheLoader[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path),Boolean]> = { $anon.super.<init>(); () }; override def load(key: (org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path)): Boolean = scala.Predef.boolean2Boolean(key._1.util().exists(key._2)) }; new $anon() })
30 63649 1293 - 1296 Apply org.locationtech.geomesa.fs.storage.common.utils.PathCache.$anon.<init> new $anon()
31 63646 1445 - 1451 Select scala.Tuple2._2 key._2
31 63647 1426 - 1452 Apply org.apache.hadoop.fs.FileContext.Util.exists key._1.util().exists(key._2)
31 63648 1426 - 1452 ApplyImplicitView scala.Predef.boolean2Boolean scala.Predef.boolean2Boolean(key._1.util().exists(key._2))
37 63651 1577 - 1585 Select org.locationtech.geomesa.fs.storage.common.utils.PathCache.duration PathCache.this.duration
37 63652 1587 - 1608 Literal <nosymbol> MILLISECONDS
37 63656 1538 - 1784 Apply com.github.benmanes.caffeine.cache.Caffeine.build com.github.benmanes.caffeine.cache.Caffeine.newBuilder().expireAfterWrite(PathCache.this.duration, MILLISECONDS).build[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path), org.apache.hadoop.fs.FileStatus]({ final class $anon extends Object with com.github.benmanes.caffeine.cache.CacheLoader[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path),org.apache.hadoop.fs.FileStatus] { def <init>(): <$anon: com.github.benmanes.caffeine.cache.CacheLoader[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path),org.apache.hadoop.fs.FileStatus]> = { $anon.super.<init>(); () }; override def load(key: (org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path)): org.apache.hadoop.fs.FileStatus = key._1.getFileStatus(key._2) }; new $anon() })
38 63655 1623 - 1626 Apply org.locationtech.geomesa.fs.storage.common.utils.PathCache.$anon.<init> new $anon()
39 63653 1763 - 1769 Select scala.Tuple2._2 key._2
39 63654 1742 - 1770 Apply org.apache.hadoop.fs.FileContext.getFileStatus key._1.getFileStatus(key._2)
45 63657 1898 - 1906 Select org.locationtech.geomesa.fs.storage.common.utils.PathCache.duration PathCache.this.duration
45 63658 1908 - 1929 Literal <nosymbol> MILLISECONDS
45 63663 1859 - 2153 Apply com.github.benmanes.caffeine.cache.Caffeine.build com.github.benmanes.caffeine.cache.Caffeine.newBuilder().expireAfterWrite(PathCache.this.duration, MILLISECONDS).build[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path), Stream[org.apache.hadoop.fs.FileStatus]]({ final class $anon extends Object with com.github.benmanes.caffeine.cache.CacheLoader[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path),Stream[org.apache.hadoop.fs.FileStatus]] { def <init>(): <$anon: com.github.benmanes.caffeine.cache.CacheLoader[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path),Stream[org.apache.hadoop.fs.FileStatus]]> = { $anon.super.<init>(); () }; override def load(key: (org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path)): Stream[org.apache.hadoop.fs.FileStatus] = PathCache.this.RemoteIterator.apply[org.apache.hadoop.fs.FileStatus](key._1.listStatus(key._2)).toStream }; new $anon() })
46 63662 1944 - 1947 Apply org.locationtech.geomesa.fs.storage.common.utils.PathCache.$anon.<init> new $anon()
48 63659 2122 - 2128 Select scala.Tuple2._2 key._2
48 63660 2104 - 2129 Apply org.apache.hadoop.fs.FileContext.listStatus key._1.listStatus(key._2)
48 63661 2089 - 2139 Select scala.collection.Iterator.toStream PathCache.this.RemoteIterator.apply[org.apache.hadoop.fs.FileStatus](key._1.listStatus(key._2)).toStream
65 63664 2532 - 2542 Apply scala.Tuple2.apply scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path)
65 63665 2544 - 2566 Select java.lang.Boolean.TRUE java.lang.Boolean.TRUE
65 63666 2518 - 2567 Apply com.github.benmanes.caffeine.cache.Cache.put PathCache.this.pathCache.put(scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path), java.lang.Boolean.TRUE)
66 63667 2603 - 2613 Apply scala.Tuple2.apply scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path)
66 63668 2587 - 2617 Apply com.github.benmanes.caffeine.cache.Cache.put PathCache.this.statusCache.put(scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path), x$1)
66 63669 2572 - 2618 Apply scala.Option.foreach status.foreach[Unit](((x$1: org.apache.hadoop.fs.FileStatus) => PathCache.this.statusCache.put(scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path), x$1)))
67 63670 2650 - 2660 Apply scala.Tuple2.apply scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path)
67 63671 2636 - 2664 Apply com.github.benmanes.caffeine.cache.Cache.put PathCache.this.listCache.put(scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path), x$2)
67 63672 2623 - 2665 Apply scala.Option.foreach list.foreach[Unit](((x$2: Stream[org.apache.hadoop.fs.FileStatus]) => PathCache.this.listCache.put(scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path), x$2)))
79 63675 2967 - 2967 Literal <nosymbol> ()
79 63676 2967 - 2967 Block <nosymbol> ()
80 63673 2987 - 3007 Apply org.locationtech.geomesa.fs.storage.common.utils.PathCache.invalidate PathCache.this.invalidate(fc, path)
80 63674 2987 - 3007 Block org.locationtech.geomesa.fs.storage.common.utils.PathCache.invalidate PathCache.this.invalidate(fc, path)
82 63677 3018 - 3058 Apply java.lang.Boolean.booleanValue PathCache.this.pathCache.get(scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path)).booleanValue()
93 63680 3274 - 3274 Literal <nosymbol> ()
93 63681 3274 - 3274 Block <nosymbol> ()
94 63678 3294 - 3314 Apply org.locationtech.geomesa.fs.storage.common.utils.PathCache.invalidate PathCache.this.invalidate(fc, path)
94 63679 3294 - 3314 Block org.locationtech.geomesa.fs.storage.common.utils.PathCache.invalidate PathCache.this.invalidate(fc, path)
96 63682 3341 - 3351 Apply scala.Tuple2.apply scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path)
96 63683 3325 - 3352 Apply com.github.benmanes.caffeine.cache.LoadingCache.get PathCache.this.statusCache.get(scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path))
107 63686 3580 - 3580 Literal <nosymbol> ()
107 63687 3580 - 3580 Block <nosymbol> ()
108 63684 3600 - 3619 Apply org.locationtech.geomesa.fs.storage.common.utils.PathCache.invalidate PathCache.this.invalidate(fc, dir)
108 63685 3600 - 3619 Block org.locationtech.geomesa.fs.storage.common.utils.PathCache.invalidate PathCache.this.invalidate(fc, dir)
110 63688 3644 - 3653 Apply scala.Tuple2.apply scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, dir)
110 63689 3630 - 3663 Select scala.collection.immutable.Stream.iterator PathCache.this.listCache.get(scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, dir)).iterator
120 63690 3890 - 3899 Select org.locationtech.geomesa.fs.storage.common.utils.PathCache.pathCache PathCache.this.pathCache
120 63691 3901 - 3912 Select org.locationtech.geomesa.fs.storage.common.utils.PathCache.statusCache PathCache.this.statusCache
120 63692 3914 - 3923 Select org.locationtech.geomesa.fs.storage.common.utils.PathCache.listCache PathCache.this.listCache
120 63693 3946 - 3956 Apply scala.Tuple2.apply scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path)
120 63694 3933 - 3957 Apply com.github.benmanes.caffeine.cache.Cache.invalidate x$3.invalidate(scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path))
120 63695 3886 - 3958 Apply scala.collection.IterableLike.foreach scala.collection.Seq.apply[com.github.benmanes.caffeine.cache.LoadingCache[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path), _ >: Stream[org.apache.hadoop.fs.FileStatus] with org.apache.hadoop.fs.FileStatus with Boolean <: java.io.Serializable]](PathCache.this.pathCache, PathCache.this.statusCache, PathCache.this.listCache).foreach[Unit](((x$3: com.github.benmanes.caffeine.cache.LoadingCache[(org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path), _ >: scala.collection.immutable.Stream[org.apache.hadoop.fs.FileStatus] with org.apache.hadoop.fs.FileStatus with Boolean <: java.io.Serializable]) => x$3.invalidate(scala.Tuple2.apply[org.apache.hadoop.fs.FileContext, org.apache.hadoop.fs.Path](fc, path))))
123 63698 4043 - 4046 Apply org.locationtech.geomesa.fs.storage.common.utils.PathCache.RemoteIterator.$anon.<init> new $anon()
124 63696 4099 - 4111 Apply org.apache.hadoop.fs.RemoteIterator.hasNext iter.hasNext()
125 63697 4143 - 4152 Apply org.apache.hadoop.fs.RemoteIterator.next iter.next()