1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 
10 package org.locationtech.geomesa.convert.redis
11 
12 import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
13 import com.typesafe.config.Config
14 import org.locationtech.geomesa.convert.{EnrichmentCache, EnrichmentCacheFactory}
15 import redis.clients.jedis.util.JedisURIHelper
16 import redis.clients.jedis.{Jedis, JedisPool}
17 
18 import java.io.Closeable
19 import java.net.URI
20 import java.util.concurrent.TimeUnit
21 import scala.util.Try
22 
/**
 * Hands out Jedis connections to callers and can be closed once no more connections are needed.
 *
 * Callers are expected to call `close()` on each connection they obtain; what that does to the
 * underlying socket depends on the implementation backing this builder.
 */
trait RedisConnectionBuilder extends Closeable {

  /**
   * Obtains a connection to Redis.
   *
   * @return a Jedis connection
   */
  def getConn: Jedis
}
26 
/**
 * Enrichment cache that reads values out of Redis hashes, fronted by a Caffeine loading cache.
 *
 * @param jedisPool source of Redis connections; it is closed when this cache is closed
 * @param expiration when greater than zero, locally cached hashes expire this many milliseconds
 *                   after they were written
 * @param localCache only consulted when `expiration` is not positive: if true the local cache is
 *                   built with no expiry or size limit, if false it is built with a zero expiry
 *                   and a maximum size of zero
 */
class RedisEnrichmentCache(jedisPool: RedisConnectionBuilder,
                           expiration: Long = -1,
                           localCache: Boolean) extends EnrichmentCache {
  type KV = java.util.Map[String, String]

  // A positive expiration takes precedence; the localCache flag is only examined otherwise
  private val builder = {
    val base = Caffeine.newBuilder()
    if (expiration > 0) {
      base.expireAfterWrite(expiration, TimeUnit.MILLISECONDS)
    } else if (localCache) {
      base
    } else {
      // zero expiry and zero capacity: the intent is that nothing is retained locally
      base.expireAfterWrite(0, TimeUnit.MILLISECONDS).maximumSize(0)
    }
  }

  private val cache: LoadingCache[String, KV] =
    builder.build[String, KV](new CacheLoader[String, KV] {
      override def load(k: String): KV = readHash(k)
    })

  /**
   * Reads every field of a Redis hash.
   *
   * @param hashKey key of the Redis hash
   * @return the field/value pairs returned by `HGETALL` for the key
   */
  private def readHash(hashKey: String): KV = {
    val connection = jedisPool.getConn
    try {
      connection.hgetAll(hashKey)
    } finally {
      // Note: for a JedisPool this only returns the connection to the pool instead of actually
      // closing it (so it's safe to call close() on the connection)
      connection.close()
    }
  }

  /**
   * Looks up a single field of a Redis hash.
   *
   * @param args `args(0)` is the key of the hash, `args(1)` is the field within that hash
   * @return the value stored under the field, as returned by the underlying map lookup
   */
  override def get(args: Array[String]): Any = {
    val hash = cache.get(args(0))
    hash.get(args(1))
  }

  /** Not implemented. */
  override def put(args: Array[String], value: Any): Unit = ???

  /** Not implemented. */
  override def clear(): Unit = ???

  /** Closes the connection builder supplied to this cache. */
  override def close(): Unit = jedisPool.close()
}
63 
/**
 * Builds [[RedisEnrichmentCache]] instances from configuration.
 *
 * Configuration keys read by this factory:
 *  - `type`: must be `redis` for this factory to apply
 *  - `redis-url`: required; location of the Redis server
 *  - `expiration`: optional; passed to the cache as its expiration, `-1` when absent
 *  - `local-cache`: optional boolean; `true` when absent or when it cannot be read as a boolean
 */
class RedisEnrichmentCacheFactory extends EnrichmentCacheFactory {

  /**
   * @param conf cache configuration
   * @return true if the config declares `type` as `redis`
   */
  override def canProcess(conf: Config): Boolean =
    conf.hasPath("type") && "redis" == conf.getString("type")

  /**
   * Creates a Redis-backed enrichment cache with its own Jedis connection pool.
   *
   * @param conf cache configuration
   * @return the enrichment cache
   */
  override def build(conf: Config): EnrichmentCache = {
    val configuredUrl = conf.getString("redis-url")
    // Values that are already valid Jedis URIs, or that contain no colon at all, are used verbatim;
    // anything else (e.g. host:port) gets the redis:// scheme prepended
    val alreadyUri = Try(new URI(configuredUrl)).toOption.exists(JedisURIHelper.isValid)
    val redisUrl =
      if (alreadyUri || !configuredUrl.contains(":")) configuredUrl else s"redis://$configuredUrl"

    val timeout: Long =
      Some("expiration").filter(p => conf.hasPath(p)).map(p => conf.getLong(p)).getOrElse(-1L)

    val connBuilder: RedisConnectionBuilder = new RedisConnectionBuilder {
      private val jedis = new JedisPool(redisUrl)
      override def getConn: Jedis = jedis.getResource
      override def close(): Unit = jedis.close()
    }

    // any failure reading the flag (missing key, wrong type) falls back to enabling the local cache
    val localCache = Try(conf.getBoolean("local-cache")).toOption.getOrElse(true)

    new RedisEnrichmentCache(connBuilder, timeout, localCache)
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
33 1 1243 - 1257 Apply scala.Long.> RedisEnrichmentCache.this.expiration.>(0)
34 2 1306 - 1316 Select org.locationtech.geomesa.convert.redis.RedisEnrichmentCache.expiration RedisEnrichmentCache.this.expiration
34 3 1318 - 1339 Literal <nosymbol> MILLISECONDS
34 4 1267 - 1340 Apply com.github.benmanes.caffeine.cache.Caffeine.expireAfterWrite com.github.benmanes.caffeine.cache.Caffeine.newBuilder().expireAfterWrite(RedisEnrichmentCache.this.expiration, MILLISECONDS)
34 5 1267 - 1340 Block com.github.benmanes.caffeine.cache.Caffeine.expireAfterWrite com.github.benmanes.caffeine.cache.Caffeine.newBuilder().expireAfterWrite(RedisEnrichmentCache.this.expiration, MILLISECONDS)
36 6 1364 - 1375 Select scala.Boolean.unary_! RedisEnrichmentCache.this.localCache.unary_!
36 11 1360 - 1519 If <nosymbol> if (RedisEnrichmentCache.this.localCache.unary_!) com.github.benmanes.caffeine.cache.Caffeine.newBuilder().expireAfterWrite(0L, MILLISECONDS).maximumSize(0L) else com.github.benmanes.caffeine.cache.Caffeine.newBuilder()
37 7 1387 - 1466 Apply com.github.benmanes.caffeine.cache.Caffeine.maximumSize com.github.benmanes.caffeine.cache.Caffeine.newBuilder().expireAfterWrite(0L, MILLISECONDS).maximumSize(0L)
37 8 1387 - 1466 Block com.github.benmanes.caffeine.cache.Caffeine.maximumSize com.github.benmanes.caffeine.cache.Caffeine.newBuilder().expireAfterWrite(0L, MILLISECONDS).maximumSize(0L)
39 9 1490 - 1511 Apply com.github.benmanes.caffeine.cache.Caffeine.newBuilder com.github.benmanes.caffeine.cache.Caffeine.newBuilder()
39 10 1490 - 1511 Block com.github.benmanes.caffeine.cache.Caffeine.newBuilder com.github.benmanes.caffeine.cache.Caffeine.newBuilder()
45 17 1600 - 1603 Apply org.locationtech.geomesa.convert.redis.RedisEnrichmentCache.$anon.<init> new $anon()
45 18 1579 - 2008 Apply com.github.benmanes.caffeine.cache.Caffeine.build RedisEnrichmentCache.this.builder.build[String, RedisEnrichmentCache.this.KV]({ final class $anon extends Object with com.github.benmanes.caffeine.cache.CacheLoader[String,RedisEnrichmentCache.this.KV] { def <init>(): <$anon: com.github.benmanes.caffeine.cache.CacheLoader[String,RedisEnrichmentCache.this.KV]> = { $anon.super.<init>(); () }; override def load(k: String): RedisEnrichmentCache.this.KV = { val conn: redis.clients.jedis.Jedis = RedisEnrichmentCache.this.jedisPool.getConn; try { conn.hgetAll(k) } finally conn.close() } }; new $anon() })
47 12 1696 - 1713 Select org.locationtech.geomesa.convert.redis.RedisConnectionBuilder.getConn RedisEnrichmentCache.this.jedisPool.getConn
49 13 1742 - 1757 Apply redis.clients.jedis.Jedis.hgetAll conn.hgetAll(k)
49 14 1742 - 1757 Block redis.clients.jedis.Jedis.hgetAll conn.hgetAll(k)
53 15 1965 - 1977 Apply redis.clients.jedis.Jedis.close conn.close()
53 16 1965 - 1977 Block redis.clients.jedis.Jedis.close conn.close()
58 19 2067 - 2074 Apply scala.Array.apply args.apply(0)
58 20 2080 - 2087 Apply scala.Array.apply args.apply(1)
58 21 2057 - 2088 Apply java.util.Map.get RedisEnrichmentCache.this.cache.get(args.apply(0)).get(args.apply(1))
59 22 2149 - 2152 Select scala.Predef.??? scala.Predef.???
60 23 2184 - 2187 Select scala.Predef.??? scala.Predef.???
61 24 2219 - 2236 Apply java.io.Closeable.close RedisEnrichmentCache.this.jedisPool.close()
65 25 2371 - 2377 Literal <nosymbol> "type"
65 26 2382 - 2420 Apply java.lang.String.equals conf.getString("type").equals("redis")
65 27 2358 - 2420 Apply scala.Boolean.&& conf.hasPath("type").&&(conf.getString("type").equals("redis"))
69 28 2515 - 2542 Apply com.typesafe.config.Config.getString conf.getString("redis-url")
70 29 2575 - 2585 Apply java.net.URI.<init> new java.net.URI(u)
70 30 2603 - 2625 Apply redis.clients.jedis.util.JedisURIHelper.isValid redis.clients.jedis.util.JedisURIHelper.isValid(x$1)
70 31 2571 - 2626 Apply scala.Option.exists scala.util.Try.apply[java.net.URI](new java.net.URI(u)).toOption.exists({ ((x$1: java.net.URI) => redis.clients.jedis.util.JedisURIHelper.isValid(x$1)) })
70 36 2549 - 2716 Apply scala.Option.getOrElse scala.Some.apply[String](url).filter(((u: String) => scala.util.Try.apply[java.net.URI](new java.net.URI(u)).toOption.exists({ ((x$1: java.net.URI) => redis.clients.jedis.util.JedisURIHelper.isValid(x$1)) }))).getOrElse[String](if (url.indexOf(":").==(-1)) url else scala.StringContext.apply("redis://", "").s(url))
71 32 2652 - 2674 Apply scala.Int.== url.indexOf(":").==(-1)
71 33 2678 - 2681 Ident org.locationtech.geomesa.convert.redis.RedisEnrichmentCacheFactory.url url
71 34 2691 - 2706 Apply scala.StringContext.s scala.StringContext.apply("redis://", "").s(url)
71 35 2691 - 2706 Block scala.StringContext.s scala.StringContext.apply("redis://", "").s(url)
74 37 2745 - 2771 Apply com.typesafe.config.Config.hasPath conf.hasPath("expiration")
74 38 2773 - 2799 Apply com.typesafe.config.Config.getLong conf.getLong("expiration")
74 39 2773 - 2799 Block com.typesafe.config.Config.getLong conf.getLong("expiration")
74 40 2805 - 2807 Literal <nosymbol> -1L
74 41 2805 - 2807 Block <nosymbol> -1L
75 45 2854 - 2857 Apply org.locationtech.geomesa.convert.redis.RedisEnrichmentCacheFactory.$anon.<init> new $anon()
76 42 2908 - 2931 Apply redis.clients.jedis.JedisPool.<init> new redis.clients.jedis.JedisPool(redisUrl)
77 43 2968 - 2984 Apply redis.clients.jedis.JedisPool.getResource $anon.this.pool.getResource()
78 44 3020 - 3032 Apply redis.clients.jedis.util.Pool.close $anon.this.pool.close()
81 46 3061 - 3112 Apply scala.util.Try.getOrElse scala.util.Try.apply[Boolean](conf.getBoolean("local-cache")).getOrElse[Boolean](true)
82 47 3117 - 3175 Apply org.locationtech.geomesa.convert.redis.RedisEnrichmentCache.<init> new RedisEnrichmentCache(connBuilder, timeout, localCache)