1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.kafka.streams
10 
11 import org.locationtech.geomesa.kafka.streams.MessageAction.MessageAction
12 
/**
 * Value type for the Kafka records exchanged with a GeoMesa data store: it pairs the action being
 * performed with the contents of the simple feature that the action applies to
 *
 * @param action what the message does (see `MessageAction`)
 * @param attributes attribute values of the simple feature carried by this message
 * @param userData user data of the simple feature carried by this message, empty unless specified
 */
case class GeoMesaMessage(action: MessageAction, attributes: Seq[AnyRef], userData: Map[String, String] = Map.empty) {

  /**
   * Exposes the attribute values to Java callers
   *
   * @return the attributes, converted with the standard Scala-to-Java collection converters
   */
  def asJava(): java.util.List[AnyRef] = {
    // the converters are only needed here, so keep the import scoped to this method
    import scala.collection.JavaConverters._
    attributes.asJava
  }
}
26 
object GeoMesaMessage {

  import scala.collection.JavaConverters._

  /**
   * Create an upsert message, without any user data
   *
   * @param attributes feature attribute values
   * @return a message with the `Upsert` action wrapping the attributes
   */
  def upsert(attributes: Seq[AnyRef]): GeoMesaMessage = GeoMesaMessage(MessageAction.Upsert, attributes)

  /**
   * Create an upsert message, without any user data, from a Java list
   *
   * The attribute values are copied out of the list, so later modifications to the list are not
   * reflected in the returned message
   *
   * @param attributes feature attribute values
   * @return a message with the `Upsert` action wrapping a copy of the attributes
   */
  def upsert(attributes: java.util.List[AnyRef]): GeoMesaMessage = {
    // use toList instead of toSeq: on Scala 2.12, toSeq on the converted buffer returns the wrapper
    // around the live Java list, which would let the caller mutate the message after the fact
    upsert(attributes.asScala.toList)
  }

  /**
   * Create an upsert message
   *
   * @param attributes feature attribute values
   * @param userData feature user data
   * @return a message with the `Upsert` action wrapping the attributes and user data
   */
  def upsert(attributes: Seq[AnyRef], userData: Map[String, String]): GeoMesaMessage =
    GeoMesaMessage(MessageAction.Upsert, attributes, userData)

  /**
   * Create an upsert message from Java collections
   *
   * Both the attribute values and the user data are copied, so later modifications to the arguments
   * are not reflected in the returned message
   *
   * @param attributes feature attribute values
   * @param userData feature user data
   * @return a message with the `Upsert` action wrapping copies of the attributes and user data
   */
  def upsert(attributes: java.util.List[AnyRef], userData: java.util.Map[String, String]): GeoMesaMessage = {
    // toList snapshots the attributes (see the single-argument Java overload); toMap already copies
    upsert(attributes.asScala.toList, userData.asScala.toMap)
  }

  /**
   * Create a delete message
   *
   * @return a message with the `Delete` action, no attributes and the default (empty) user data
   */
  def delete(): GeoMesaMessage = GeoMesaMessage(MessageAction.Delete, Seq.empty)
}
Line Stmt Id Pos Tree Symbol Tests Code
24 2504 1081 - 1091 Select org.locationtech.geomesa.kafka.streams.GeoMesaMessage.attributes GeoMesaMessage.this.attributes
24 2505 1081 - 1098 Select scala.collection.convert.Decorators.AsJava.asJava scala.collection.JavaConverters.seqAsJavaListConverter[AnyRef](GeoMesaMessage.this.attributes).asJava
37 2506 1350 - 1370 Select org.locationtech.geomesa.kafka.streams.MessageAction.Upsert MessageAction.Upsert
37 2507 1335 - 1383 Apply org.locationtech.geomesa.kafka.streams.GeoMesaMessage.apply GeoMesaMessage.apply(MessageAction.Upsert, attributes, GeoMesaMessage.apply$default$3)
45 2508 1567 - 1591 Select scala.collection.SeqLike.toSeq scala.collection.JavaConverters.asScalaBufferConverter[AnyRef](attributes).asScala.toSeq
45 2509 1560 - 1592 Apply org.locationtech.geomesa.kafka.streams.GeoMesaMessage.upsert GeoMesaMessage.this.upsert(scala.collection.JavaConverters.asScalaBufferConverter[AnyRef](attributes).asScala.toSeq)
55 2510 1847 - 1867 Select org.locationtech.geomesa.kafka.streams.MessageAction.Upsert MessageAction.Upsert
55 2511 1832 - 1890 Apply org.locationtech.geomesa.kafka.streams.GeoMesaMessage.apply GeoMesaMessage.apply(MessageAction.Upsert, attributes, userData)
64 2512 2119 - 2143 Select scala.collection.SeqLike.toSeq scala.collection.JavaConverters.asScalaBufferConverter[AnyRef](attributes).asScala.toSeq
64 2513 2162 - 2162 TypeApply scala.Predef.$conforms scala.Predef.$conforms[(String, String)]
64 2514 2145 - 2167 ApplyToImplicitArgs scala.collection.TraversableOnce.toMap scala.collection.JavaConverters.mapAsScalaMapConverter[String, String](userData).asScala.toMap[String, String](scala.Predef.$conforms[(String, String)])
64 2515 2112 - 2168 Apply org.locationtech.geomesa.kafka.streams.GeoMesaMessage.upsert GeoMesaMessage.this.upsert(scala.collection.JavaConverters.asScalaBufferConverter[AnyRef](attributes).asScala.toSeq, scala.collection.JavaConverters.mapAsScalaMapConverter[String, String](userData).asScala.toMap[String, String](scala.Predef.$conforms[(String, String)]))
71 2516 2277 - 2297 Select org.locationtech.geomesa.kafka.streams.MessageAction.Delete MessageAction.Delete
71 2517 2299 - 2308 TypeApply scala.collection.generic.GenericCompanion.empty scala.collection.Seq.empty[Nothing]
71 2518 2262 - 2309 Apply org.locationtech.geomesa.kafka.streams.GeoMesaMessage.apply GeoMesaMessage.apply(MessageAction.Delete, scala.collection.Seq.empty[Nothing], GeoMesaMessage.apply$default$3)