1 /***********************************************************************
2  * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * https://www.apache.org/licenses/LICENSE-2.0
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.accumulo.util
10 
11 import com.typesafe.scalalogging.LazyLogging
12 import org.apache.accumulo.core.client.BatchWriterConfig
13 import org.locationtech.geomesa.accumulo.AccumuloProperties
14 
15 import java.util.concurrent.TimeUnit
16 
/**
 * Factory for Accumulo `BatchWriterConfig` instances.
 *
 * Every setting is resolved the same way: an explicitly supplied argument is used if present,
 * otherwise the matching entry of `AccumuloProperties.BatchWriterProperties` is consulted. If
 * neither produces a value, the corresponding setter is never called and the newly constructed
 * config is left untouched for that setting.
 */
object GeoMesaBatchWriterConfig extends LazyLogging {

  /**
   * Creates a new batch writer config.
   *
   * @param threads max write threads; when empty, `WRITER_THREADS` is read and parsed with `toInt`
   *                (a non-integer property value therefore fails with a `NumberFormatException`)
   * @param memory max memory, in bytes; when empty, `WRITER_MEMORY_BYTES` is read
   * @param latency max latency, in milliseconds; when empty, `WRITER_LATENCY` is read
   * @param timeout write timeout, in milliseconds; when empty, `WRITE_TIMEOUT` is read
   * @return a freshly constructed config with each resolved setting applied
   */
  def apply(threads: Option[Int] = None,
            memory: Option[Long] = None,
            latency: Option[Long] = None,
            timeout: Option[Long] = None): BatchWriterConfig = {
    import AccumuloProperties.BatchWriterProperties

    // declared as defs (not vals) so a property is only read and parsed when its argument is empty
    def threadsFromProperty: Option[Int] = BatchWriterProperties.WRITER_THREADS.option.map(_.toInt)
    def memoryFromProperty: Option[Long] = BatchWriterProperties.WRITER_MEMORY_BYTES.toBytes
    def latencyFromProperty: Option[Long] = BatchWriterProperties.WRITER_LATENCY.toDuration.map(_.toMillis)
    def timeoutFromProperty: Option[Long] = BatchWriterProperties.WRITE_TIMEOUT.toDuration.map(_.toMillis)

    val config = new BatchWriterConfig

    for (maxThreads <- threads.orElse(threadsFromProperty)) {
      logger.trace(s"GeoMesaBatchWriter config: maxWriteThreads set to $maxThreads")
      config.setMaxWriteThreads(maxThreads)
    }

    for (maxBytes <- memory.orElse(memoryFromProperty)) {
      logger.trace(s"GeoMesaBatchWriter config: maxMemory set to $maxBytes bytes")
      config.setMaxMemory(maxBytes)
    }

    for (latencyMillis <- latency.orElse(latencyFromProperty)) {
      logger.trace(s"GeoMesaBatchWriter config: maxLatency set to $latencyMillis millis")
      config.setMaxLatency(latencyMillis, TimeUnit.MILLISECONDS)
    }

    for (timeoutMillis <- timeout.orElse(timeoutFromProperty)) {
      logger.trace(s"GeoMesaBatchWriter config: maxTimeout set to $timeoutMillis millis")
      config.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
    }

    config
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
25 48085 1028 - 1049 Apply org.apache.accumulo.core.client.BatchWriterConfig.<init> new org.apache.accumulo.core.client.BatchWriterConfig()
27 48086 1118 - 1125 Select scala.collection.immutable.StringLike.toInt scala.Predef.augmentString(x$1).toInt
27 48087 1070 - 1126 Apply scala.Option.map org.locationtech.geomesa.accumulo.`package`.AccumuloProperties.BatchWriterProperties.WRITER_THREADS.option.map[Int](((x$1: String) => scala.Predef.augmentString(x$1).toInt))
27 48089 1055 - 1274 Apply scala.Option.foreach threads.orElse[Int](org.locationtech.geomesa.accumulo.`package`.AccumuloProperties.BatchWriterProperties.WRITER_THREADS.option.map[Int](((x$1: String) => scala.Predef.augmentString(x$1).toInt))).foreach[org.apache.accumulo.core.client.BatchWriterConfig](((threads: Int) => { (if (GeoMesaBatchWriterConfig.this.logger.underlying.isTraceEnabled()) GeoMesaBatchWriterConfig.this.logger.underlying.trace("GeoMesaBatchWriter config: maxWriteThreads set to {}", threads.asInstanceOf[AnyRef]) else (): Unit); bwc.setMaxWriteThreads(threads) }))
29 48088 1237 - 1268 Apply org.apache.accumulo.core.client.BatchWriterConfig.setMaxWriteThreads bwc.setMaxWriteThreads(threads)
32 48090 1294 - 1343 Select org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty.toBytes org.locationtech.geomesa.accumulo.`package`.AccumuloProperties.BatchWriterProperties.WRITER_MEMORY_BYTES.toBytes
32 48092 1280 - 1482 Apply scala.Option.foreach memory.orElse[Long](org.locationtech.geomesa.accumulo.`package`.AccumuloProperties.BatchWriterProperties.WRITER_MEMORY_BYTES.toBytes).foreach[org.apache.accumulo.core.client.BatchWriterConfig](((memory: Long) => { (if (GeoMesaBatchWriterConfig.this.logger.underlying.isTraceEnabled()) GeoMesaBatchWriterConfig.this.logger.underlying.trace("GeoMesaBatchWriter config: maxMemory set to {} bytes", memory.asInstanceOf[AnyRef]) else (): Unit); bwc.setMaxMemory(memory) }))
34 48091 1452 - 1476 Apply org.apache.accumulo.core.client.BatchWriterConfig.setMaxMemory bwc.setMaxMemory(memory)
37 48093 1555 - 1565 Select scala.concurrent.duration.Duration.toMillis x$2.toMillis
37 48094 1503 - 1566 Apply scala.Option.map org.locationtech.geomesa.accumulo.`package`.AccumuloProperties.BatchWriterProperties.WRITER_LATENCY.toDuration.map[Long](((x$2: scala.concurrent.duration.Duration) => x$2.toMillis))
37 48096 1488 - 1734 Apply scala.Option.foreach latency.orElse[Long](org.locationtech.geomesa.accumulo.`package`.AccumuloProperties.BatchWriterProperties.WRITER_LATENCY.toDuration.map[Long](((x$2: scala.concurrent.duration.Duration) => x$2.toMillis))).foreach[org.apache.accumulo.core.client.BatchWriterConfig](((latency: Long) => { (if (GeoMesaBatchWriterConfig.this.logger.underlying.isTraceEnabled()) GeoMesaBatchWriterConfig.this.logger.underlying.trace("GeoMesaBatchWriter config: maxLatency set to {} millis", latency.asInstanceOf[AnyRef]) else (): Unit); bwc.setMaxLatency(latency, MILLISECONDS) }))
39 48095 1679 - 1728 Apply org.apache.accumulo.core.client.BatchWriterConfig.setMaxLatency bwc.setMaxLatency(latency, MILLISECONDS)
42 48097 1806 - 1816 Select scala.concurrent.duration.Duration.toMillis x$3.toMillis
42 48098 1755 - 1817 Apply scala.Option.map org.locationtech.geomesa.accumulo.`package`.AccumuloProperties.BatchWriterProperties.WRITE_TIMEOUT.toDuration.map[Long](((x$3: scala.concurrent.duration.Duration) => x$3.toMillis))
42 48100 1740 - 1982 Apply scala.Option.foreach timeout.orElse[Long](org.locationtech.geomesa.accumulo.`package`.AccumuloProperties.BatchWriterProperties.WRITE_TIMEOUT.toDuration.map[Long](((x$3: scala.concurrent.duration.Duration) => x$3.toMillis))).foreach[org.apache.accumulo.core.client.BatchWriterConfig](((timeout: Long) => { (if (GeoMesaBatchWriterConfig.this.logger.underlying.isTraceEnabled()) GeoMesaBatchWriterConfig.this.logger.underlying.trace("GeoMesaBatchWriter config: maxTimeout set to {} millis", timeout.asInstanceOf[AnyRef]) else (): Unit); bwc.setTimeout(timeout, MILLISECONDS) }))
44 48099 1930 - 1976 Apply org.apache.accumulo.core.client.BatchWriterConfig.setTimeout bwc.setTimeout(timeout, MILLISECONDS)