1 /***********************************************************************
2  * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * https://www.apache.org/licenses/LICENSE-2.0
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.hbase.utils
10 
11 import org.apache.hadoop.conf.Configuration
12 import org.apache.hadoop.fs.Path
13 import org.apache.hadoop.hbase.client.{Admin, HBaseAdmin}
14 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
15 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
16 import org.apache.hadoop.hbase.regionserver.BloomType
17 import org.apache.hadoop.hbase.{Coprocessor, NamespaceDescriptor, TableName}
18 
19 import java.lang.reflect.InvocationTargetException
20 import scala.util.Try
21 
/**
  * Reflection wrapper for method signature differences in the HBase API.
  *
  * Every HBase method used here is resolved by name and parameter types and then invoked
  * reflectively. The lookups are lazy, so a missing method is only reported (as a
  * `NoSuchMethodException`) when the operation that needs it is called.
  */
object HBaseVersions {

  /**
   * Create a new table. If the table's namespace is neither the default nor the system
   * namespace and cannot be looked up, the namespace is created first.
   *
   * @param admin admin connection to hbase
   * @param name table name
   * @param colFamilies column families
   * @param bloom bloom filter
   * @param compression compression
   * @param encoding data block encoding
   * @param inMemory in-memory flag applied to each column family, if defined
   * @param coprocessor coprocessor class and optional jar path
   * @param splits initial table splits (empty for no splits)
   */
  def createTableAsync(
    admin: Admin,
    name: TableName,
    colFamilies: Seq[Array[Byte]],
    bloom: Option[BloomType],
    compression: Option[Algorithm],
    encoding: Option[DataBlockEncoding],
    inMemory: Option[Boolean],
    coprocessor: Option[(String, Option[Path])],
    splits: Seq[Array[Byte]]): Unit = {

    val namespace = name.getNamespaceAsString
    val builtIn =
      Seq(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR, NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)

    if (namespace != null && !builtIn.contains(namespace)) {
      // a failed lookup is treated the same as a missing namespace
      val existing = Try(admin.getNamespaceDescriptor(namespace)).toOption.flatMap(Option(_))
      if (existing.isEmpty) {
        admin.createNamespace(NamespaceDescriptor.create(namespace).build())
      }
    }

    val descriptor = hTableDescriptorClass.getConstructor(classOf[TableName]).newInstance(name).asInstanceOf[AnyRef]

    // the constructor does not depend on the family, so it is resolved once
    val newColumn = hColumnDescriptorClass.getConstructor(classOf[Array[Byte]])

    colFamilies.foreach { family =>
      val column = newColumn.newInstance(family)
      bloom.foreach(_setBloomFilterType(column, _))
      compression.foreach(_setCompressionType(column, _))
      encoding.foreach(_setDataBlockEncoding(column, _))
      inMemory.foreach(_setInMemory(column, _))
      _addFamily(descriptor, column)
    }

    coprocessor.foreach { case (clas, path) =>
      _addCoprocessor(descriptor, clas, path.orNull, Coprocessor.PRIORITY_USER, null)
    }

    // the reflected method is handed null rather than an empty array when there are no splits
    _createTableAsync(admin, descriptor, if (splits.isEmpty) { null } else { splits.toArray })
  }

  /**
   * Disable a table asynchronously
   *
   * @param admin admin hbase connection
   * @param table table to disable
   */
  def disableTableAsync(admin: Admin, table: TableName): Unit = _disableTableAsync(admin, table)

  /**
   * Checks whether HBase is available, and throws an exception if not
   *
   * @param conf HBase configuration
   */
  def checkAvailable(conf: Configuration): Unit = _available(conf)

  private lazy val hTableDescriptorClass = Class.forName("org.apache.hadoop.hbase.HTableDescriptor")

  private lazy val hColumnDescriptorClass = Class.forName("org.apache.hadoop.hbase.HColumnDescriptor")

  private lazy val _setBloomFilterType: (Any, BloomType) => Unit =
    findMethod(hColumnDescriptorClass, "setBloomFilterType", classOf[BloomType])

  private lazy val _setCompressionType: (Any, Algorithm) => Unit =
    findMethod(hColumnDescriptorClass, "setCompressionType", classOf[Algorithm])

  private lazy val _setDataBlockEncoding: (Any, DataBlockEncoding) => Unit =
    findMethod(hColumnDescriptorClass, "setDataBlockEncoding", classOf[DataBlockEncoding])

  private lazy val _setInMemory: (Any, Boolean) => Unit =
    findMethod(hColumnDescriptorClass, "setInMemory", classOf[Boolean])

  // HBase 1.3 signature: public HTableDescriptor addFamily(final HColumnDescriptor family)
  // CDH 5.12 signature: public void addFamily(final HColumnDescriptor family)
  private lazy val _addFamily: (Any, Any) => Unit =
    findMethod(hTableDescriptorClass, "addFamily", hColumnDescriptorClass)

  private lazy val _disableTableAsync: (Admin, TableName) => Unit =
    findMethod(classOf[Admin], "disableTableAsync", classOf[TableName])

  // HBase 1.3 signature:
  // public HTableDescriptor addCoprocessor(String className, Path jarFilePath, int priority, final Map<String, String> kvs)
  // CDH 5.12 signature:
  // public void addCoprocessor(String className, Path jarFilePath, int priority, final Map<String, String> kvs)
  private lazy val _addCoprocessor: (Any, String, Path, Int, java.util.Map[String, String]) => Unit = {
    val signature: Seq[Class[_]] =
      Seq(classOf[String], classOf[Path], classOf[Int], classOf[java.util.Map[String, String]])
    val method = resolve(hTableDescriptorClass, "addCoprocessor")(_ == signature)
    (descriptor, className, jarFilePath, priority, kvs) =>
      invoke(method, descriptor, className, jarFilePath, Int.box(priority), kvs)
  }

  // the first parameter only has to accept an HTableDescriptor, the second must be byte[][]
  private lazy val _createTableAsync: (Admin, AnyRef, Array[Array[Byte]]) => Unit = {
    val method = resolve(classOf[Admin], "createTableAsync") {
      case Seq(desc, keys) => desc.isAssignableFrom(hTableDescriptorClass) && keys == classOf[Array[Array[Byte]]]
      case _ => false
    }
    (admin, descriptor, splits) => invoke(method, admin, descriptor, splits)
  }

  // the method is looked up under either name and invoked without a receiver
  private lazy val _available: Configuration => Unit = {
    val method = resolve(classOf[HBaseAdmin], "available", "checkHBaseAvailable")(_ == Seq(classOf[Configuration]))
    conf => invoke(method, null, conf)
  }

  /**
   * Look up a public single-argument method and wrap it as a function of (receiver, argument).
   *
   * @param clas class declaring or inheriting the method
   * @param name method name
   * @param param exact type of the method's only parameter
   * @return function that invokes the method, rethrowing whatever the method itself throws
   * @throws NoSuchMethodException if no method with that name and parameter type exists
   */
  private def findMethod(clas: Class[_], name: String, param: Class[_]): (Any, Any) => Unit = {
    val method = resolve(clas, name)(_ == Seq(param))
    (obj, p) => invoke(method, obj, p.asInstanceOf[AnyRef])
  }

  /**
   * Find a public method by name and parameter types. Name and signature are checked together,
   * so the result does not depend on the order in which overloads are reflected.
   *
   * @param clas class to search
   * @param name method name, also used in error messages
   * @param aliases alternative names for the same method
   * @param accepts predicate on the candidate's parameter types
   * @return the first method with a matching name whose parameter types are accepted
   * @throws NoSuchMethodException if no method has a matching name, or none has accepted parameters
   */
  private def resolve(
      clas: Class[_],
      name: String,
      aliases: String*)(accepts: Seq[Class[_]] => Boolean): java.lang.reflect.Method = {
    val names = name +: aliases
    val label = s"${clas.getSimpleName}.$name method"
    val candidates = clas.getMethods.filter(m => names.contains(m.getName))
    if (candidates.isEmpty) {
      throw new NoSuchMethodException(s"Couldn't find $label")
    }
    candidates.find(m => accepts(m.getParameterTypes.toSeq)).getOrElse {
      throw new NoSuchMethodException(
        s"Couldn't find $label with correct parameters: ${candidates.mkString(", ")}")
    }
  }

  /**
   * Invoke a reflected method and discard its result. An exception thrown by the method is
   * rethrown directly instead of the reflective wrapper.
   *
   * @param method method to invoke
   * @param target receiver, or null when no receiver is needed
   * @param args arguments, in declaration order
   */
  private def invoke(method: java.lang.reflect.Method, target: Any, args: AnyRef*): Unit = {
    try {
      method.invoke(target, args: _*)
      ()
    } catch {
      // fall back to the wrapper if there is no cause, rather than throwing a null
      case e: InvocationTargetException => throw Option(e.getCause).getOrElse(e)
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
50 1790 1837 - 1841 Literal <nosymbol> null
50 1804 1804 - 1804 Literal <nosymbol> ()
50 1805 1804 - 1804 Block <nosymbol> ()
51 1791 1882 - 1928 Select org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR
51 1792 1853 - 1928 Apply java.lang.Object.!= name.getNamespaceAsString().!=(org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR)
52 1793 1969 - 2014 Select org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR
52 1794 1940 - 2014 Apply java.lang.Object.!= name.getNamespaceAsString().!=(org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)
52 1800 1808 - 2118 Apply scala.Boolean.&& name.getNamespaceAsString().!=(null).&&(name.getNamespaceAsString().!=(org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR)).&&(name.getNamespaceAsString().!=(org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)).&&(scala.util.Try.apply[Option[org.apache.hadoop.hbase.NamespaceDescriptor]](scala.Option.apply[org.apache.hadoop.hbase.NamespaceDescriptor](admin.getNamespaceDescriptor(name.getNamespaceAsString()))).getOrElse[Option[org.apache.hadoop.hbase.NamespaceDescriptor]](scala.None).isEmpty)
53 1795 2066 - 2091 Apply org.apache.hadoop.hbase.TableName.getNamespaceAsString name.getNamespaceAsString()
53 1796 2037 - 2092 Apply org.apache.hadoop.hbase.client.Admin.getNamespaceDescriptor admin.getNamespaceDescriptor(name.getNamespaceAsString())
53 1797 2030 - 2093 Apply scala.Option.apply scala.Option.apply[org.apache.hadoop.hbase.NamespaceDescriptor](admin.getNamespaceDescriptor(name.getNamespaceAsString()))
53 1798 2105 - 2109 Select scala.None scala.None
53 1799 2026 - 2118 Select scala.Option.isEmpty scala.util.Try.apply[Option[org.apache.hadoop.hbase.NamespaceDescriptor]](scala.Option.apply[org.apache.hadoop.hbase.NamespaceDescriptor](admin.getNamespaceDescriptor(name.getNamespaceAsString()))).getOrElse[Option[org.apache.hadoop.hbase.NamespaceDescriptor]](scala.None).isEmpty
54 1801 2150 - 2211 Apply org.apache.hadoop.hbase.NamespaceDescriptor.Builder.build org.apache.hadoop.hbase.NamespaceDescriptor.create(name.getNamespaceAsString()).build()
54 1802 2128 - 2212 Apply org.apache.hadoop.hbase.client.Admin.createNamespace admin.createNamespace(org.apache.hadoop.hbase.NamespaceDescriptor.create(name.getNamespaceAsString()).build())
54 1803 2128 - 2212 Block org.apache.hadoop.hbase.client.Admin.createNamespace admin.createNamespace(org.apache.hadoop.hbase.NamespaceDescriptor.create(name.getNamespaceAsString()).build())
57 1806 2278 - 2296 Literal <nosymbol> classOf[org.apache.hadoop.hbase.TableName]
57 1807 2241 - 2336 TypeApply scala.Any.asInstanceOf HBaseVersions.this.hTableDescriptorClass.getConstructor(classOf[org.apache.hadoop.hbase.TableName]).newInstance(name).asInstanceOf[AnyRef]
59 1818 2342 - 2720 Apply scala.collection.IterableLike.foreach colFamilies.foreach[Unit](((k: Array[Byte]) => { val column: Any = HBaseVersions.this.hColumnDescriptorClass.getConstructor(classOf[[B]).newInstance(k); bloom.foreach[Unit](((x$1: org.apache.hadoop.hbase.regionserver.BloomType) => HBaseVersions.this._setBloomFilterType.apply(column, x$1))); compression.foreach[Unit](((x$2: org.apache.hadoop.hbase.io.compress.Compression.Algorithm) => HBaseVersions.this._setCompressionType.apply(column, x$2))); encoding.foreach[Unit](((x$3: org.apache.hadoop.hbase.io.encoding.DataBlockEncoding) => HBaseVersions.this._setDataBlockEncoding.apply(column, x$3))); inMemory.foreach[Unit](((x$4: Boolean) => HBaseVersions.this._setInMemory.apply(column, x$4))); HBaseVersions.this._addFamily.apply(descriptor, column) }))
60 1808 2388 - 2462 Apply java.lang.reflect.Constructor.newInstance HBaseVersions.this.hColumnDescriptorClass.getConstructor(classOf[[B]).newInstance(k)
61 1809 2483 - 2513 Apply scala.Function2.apply HBaseVersions.this._setBloomFilterType.apply(column, x$1)
61 1810 2469 - 2514 Apply scala.Option.foreach bloom.foreach[Unit](((x$1: org.apache.hadoop.hbase.regionserver.BloomType) => HBaseVersions.this._setBloomFilterType.apply(column, x$1)))
62 1811 2541 - 2571 Apply scala.Function2.apply HBaseVersions.this._setCompressionType.apply(column, x$2)
62 1812 2521 - 2572 Apply scala.Option.foreach compression.foreach[Unit](((x$2: org.apache.hadoop.hbase.io.compress.Compression.Algorithm) => HBaseVersions.this._setCompressionType.apply(column, x$2)))
63 1813 2596 - 2628 Apply scala.Function2.apply HBaseVersions.this._setDataBlockEncoding.apply(column, x$3)
63 1814 2579 - 2629 Apply scala.Option.foreach encoding.foreach[Unit](((x$3: org.apache.hadoop.hbase.io.encoding.DataBlockEncoding) => HBaseVersions.this._setDataBlockEncoding.apply(column, x$3)))
64 1815 2653 - 2676 Apply scala.Function2.apply HBaseVersions.this._setInMemory.apply(column, x$4)
64 1816 2636 - 2677 Apply scala.Option.foreach inMemory.foreach[Unit](((x$4: Boolean) => HBaseVersions.this._setInMemory.apply(column, x$4)))
65 1817 2684 - 2714 Apply scala.Function2.apply HBaseVersions.this._addFamily.apply(descriptor, column)
68 1825 2726 - 2860 Apply scala.Option.foreach coprocessor.foreach[Unit](((x0$1: (String, Option[org.apache.hadoop.fs.Path])) => x0$1 match { case (_1: String, _2: Option[org.apache.hadoop.fs.Path])(String, Option[org.apache.hadoop.fs.Path])((clas @ _), (path @ _)) => HBaseVersions.this._addCoprocessor.apply(descriptor, clas, path.orNull[org.apache.hadoop.fs.Path](scala.Predef.$conforms[Null]), 1073741823, null) }))
69 1819 2814 - 2814 TypeApply scala.Predef.$conforms scala.Predef.$conforms[Null]
69 1820 2809 - 2820 ApplyToImplicitArgs scala.Option.orNull path.orNull[org.apache.hadoop.fs.Path](scala.Predef.$conforms[Null])
69 1821 2822 - 2847 Literal <nosymbol> 1073741823
69 1822 2849 - 2853 Literal <nosymbol> null
69 1823 2775 - 2854 Apply scala.Function5.apply HBaseVersions.this._addCoprocessor.apply(descriptor, clas, path.orNull[org.apache.hadoop.fs.Path](scala.Predef.$conforms[Null]), 1073741823, null)
69 1824 2775 - 2854 Block scala.Function5.apply HBaseVersions.this._addCoprocessor.apply(descriptor, clas, path.orNull[org.apache.hadoop.fs.Path](scala.Predef.$conforms[Null]), 1073741823, null)
72 1826 2907 - 2921 Select scala.collection.SeqLike.isEmpty splits.isEmpty
72 1827 2925 - 2929 Literal <nosymbol> null
72 1828 2925 - 2929 Block <nosymbol> null
72 1829 2939 - 2953 ApplyToImplicitArgs scala.collection.TraversableOnce.toArray splits.toArray[Array[Byte]]((ClassTag.apply[Array[Byte]](scala.runtime.ScalaRunTime.arrayClass(classOf[scala.Byte])): scala.reflect.ClassTag[Array[Byte]]))
72 1830 2939 - 2953 Block scala.collection.TraversableOnce.toArray splits.toArray[Array[Byte]]((ClassTag.apply[Array[Byte]](scala.runtime.ScalaRunTime.arrayClass(classOf[scala.Byte])): scala.reflect.ClassTag[Array[Byte]]))
72 1831 2866 - 2956 Apply scala.Function3.apply HBaseVersions.this._createTableAsync.apply(admin, descriptor, if (splits.isEmpty) null else splits.toArray[Array[Byte]]((ClassTag.apply[Array[Byte]](scala.runtime.ScalaRunTime.arrayClass(classOf[scala.Byte])): scala.reflect.ClassTag[Array[Byte]])))
81 1832 3155 - 3187 Apply scala.Function2.apply HBaseVersions.this._disableTableAsync.apply(admin, table)
88 1833 3364 - 3380 Apply scala.Function1.apply HBaseVersions.this._available.apply(conf)
173 1834 7619 - 7634 Apply java.lang.Class.getMethods clas.getMethods()
173 1835 7640 - 7657 Apply java.lang.Object.== x$5.getName().==(name)
173 1837 7619 - 7767 Apply scala.Option.getOrElse scala.Predef.refArrayOps[java.lang.reflect.Method](clas.getMethods()).find(((x$5: java.lang.reflect.Method) => x$5.getName().==(name))).getOrElse[java.lang.reflect.Method](throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find ", ".", " method").s(clas.getSimpleName(), name)))
174 1836 7677 - 7761 Throw <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find ", ".", " method").s(clas.getSimpleName(), name))
176 1838 7793 - 7817 Apply java.lang.reflect.Method.getParameterTypes method.getParameterTypes()
177 1839 7851 - 7852 Literal <nosymbol> 1
177 1840 7856 - 7884 Apply java.lang.Object.== scala.Predef.refArrayOps[Class[_]](parameterTypes).head.==(param)
177 1841 7826 - 7884 Apply scala.Boolean.&& parameterTypes.length.==(1).&&(scala.Predef.refArrayOps[Class[_]](parameterTypes).head.==(param))
178 1848 7894 - 8056 Function org.locationtech.geomesa.hbase.utils.HBaseVersions.$anonfun ((obj: Any, p: Any) => try { method.invoke(obj, p.asInstanceOf[AnyRef]); () } catch { case (e @ (_: java.lang.reflect.InvocationTargetException)) => throw e.getCause() })
179 1842 7941 - 7963 TypeApply scala.Any.asInstanceOf p.asInstanceOf[AnyRef]
179 1843 7922 - 7964 Apply java.lang.reflect.Method.invoke method.invoke(obj, p.asInstanceOf[AnyRef])
179 1844 7935 - 7935 Literal <nosymbol> ()
179 1845 7922 - 7964 Block <nosymbol> { method.invoke(obj, p.asInstanceOf[AnyRef]); () }
180 1846 8022 - 8038 Throw <nosymbol> throw e.getCause()
180 1847 8022 - 8038 Block <nosymbol> throw e.getCause()
184 1849 8076 - 8202 Throw <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find ", ".", " method with correct parameters: ", "").s(clas.getSimpleName(), name, method))
184 1850 8076 - 8202 Block <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find ", ".", " method with correct parameters: ", "").s(clas.getSimpleName(), name, method))