Skip to content

Commit 5380bae

Browse files
committed
update Bulk api to support file as a source
1 parent fc309ff commit 5380bae

File tree

3 files changed

+606
-1
lines changed

3 files changed

+606
-1
lines changed

core/build.sbt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,15 @@ val mockito = Seq(
3030
"org.mockito" %% "mockito-scala" % "1.17.12" % Test
3131
)
3232

33+
// Parquet & Avro
34+
val avro = Seq(
35+
"org.apache.parquet" % "parquet-avro" % "1.15.2",
36+
"org.apache.avro" % "avro" % "1.11.4",
37+
"org.apache.hadoop" % "hadoop-common" % "3.4.2",
38+
)
39+
3340
libraryDependencies ++= akka ++ typesafeConfig ++ http ++
34-
json4s ++ mockito :+ "com.google.code.gson" % "gson" % Versions.gson :+
41+
json4s ++ mockito ++ avro :+ "com.google.code.gson" % "gson" % Versions.gson :+
3542
"com.typesafe.scala-logging" %% "scala-logging" % Versions.scalaLogging :+
3643
"org.scalatest" %% "scalatest" % Versions.scalatest % Test
3744

core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ import akka.actor.ActorSystem
2121
import akka.stream.{FlowShape, Materializer}
2222
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
2323
import app.softnetwork.elastic.client.bulk._
24+
import app.softnetwork.elastic.client.file._
2425
import app.softnetwork.elastic.client.result.{ElasticResult, ElasticSuccess}
26+
27+
import org.apache.hadoop.conf.Configuration
28+
2529
import org.json4s.DefaultFormats
2630
import org.json4s.jackson.JsonMethods.parse
2731

@@ -41,6 +45,142 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
4145
// PUBLIC METHODS
4246
// ========================================================================
4347

48+
/** Bulk from a Parquet or JSON file with automatic detection
49+
*
50+
* @param filePath
51+
* path to the file (.parquet, .json, .jsonl)
52+
* @param indexKey
53+
* JSON key to extract the index
54+
* @param idKey
55+
* JSON key to extract the ID
56+
* @param suffixDateKey
57+
* JSON key to append a date to the index
58+
* @param suffixDatePattern
59+
* date formatting pattern
60+
* @param update
61+
* true for upsert, false for index
62+
* @param delete
63+
* true for delete
64+
* @param parentIdKey
65+
* JSON key for the parent
66+
* @param callbacks
67+
* callbacks for events
68+
* @param bufferSize
69+
* read buffer size
70+
* @param validateJson
71+
* validate each JSON line
72+
* @param skipInvalid
73+
* ignore invalid JSON lines
74+
* @param format
75+
* file format (auto-detection if Unknown)
76+
* @param hadoopConf
77+
* custom Hadoop configuration
78+
* @param bulkOptions
79+
* configuration options
80+
* @return
81+
* Future with the detailed result
82+
*/
83+
def bulkFromFile(
84+
filePath: String,
85+
indexKey: Option[String] = None,
86+
idKey: Option[String] = None,
87+
suffixDateKey: Option[String] = None,
88+
suffixDatePattern: Option[String] = None,
89+
update: Option[Boolean] = None,
90+
delete: Option[Boolean] = None,
91+
parentIdKey: Option[String] = None,
92+
callbacks: BulkCallbacks = BulkCallbacks.default,
93+
bufferSize: Int = 500,
94+
validateJson: Boolean = true,
95+
skipInvalid: Boolean = true,
96+
format: FileFormat = Unknown,
97+
hadoopConf: Option[Configuration] = None
98+
)(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = {
99+
100+
implicit val ec: ExecutionContext = system.dispatcher
101+
implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration)
102+
103+
logger.info(s"📁 Starting bulk from file: $filePath")
104+
105+
val source: Source[String, NotUsed] = if (validateJson) {
106+
FileSourceFactory.fromFileValidated(filePath, bufferSize, skipInvalid, format)
107+
} else {
108+
FileSourceFactory.fromFile(filePath, bufferSize, format)
109+
}
110+
111+
// Use the existing API with the file source
112+
bulkWithResult(
113+
items = source,
114+
toDocument = _, // The document is already in JSON format.
115+
indexKey = indexKey,
116+
idKey = idKey,
117+
suffixDateKey = suffixDateKey,
118+
suffixDatePattern = suffixDatePattern,
119+
update = update,
120+
delete = delete,
121+
parentIdKey = parentIdKey,
122+
callbacks = callbacks
123+
)
124+
}
125+
126+
/** Bulk from a Parquet file specifically
127+
*/
128+
def bulkFromParquet(
129+
filePath: String,
130+
indexKey: Option[String] = None,
131+
idKey: Option[String] = None,
132+
callbacks: BulkCallbacks = BulkCallbacks.default,
133+
bufferSize: Int = 500,
134+
hadoopConf: Option[Configuration] = None
135+
)(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = {
136+
137+
implicit val ec: ExecutionContext = system.dispatcher
138+
implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration)
139+
140+
logger.info(s"📁 Starting bulk from Parquet file: $filePath")
141+
142+
bulkWithResult(
143+
items = ParquetFileSource.fromFile(filePath, bufferSize),
144+
toDocument = _,
145+
indexKey = indexKey,
146+
idKey = idKey,
147+
callbacks = callbacks
148+
)
149+
}
150+
151+
/** Bulk from a specific JSON file
152+
*/
153+
def bulkFromJson(
154+
filePath: String,
155+
indexKey: Option[String] = None,
156+
idKey: Option[String] = None,
157+
callbacks: BulkCallbacks = BulkCallbacks.default,
158+
bufferSize: Int = 500,
159+
validateJson: Boolean = true,
160+
skipInvalid: Boolean = true,
161+
hadoopConf: Option[Configuration] = None
162+
)(implicit bulkOptions: BulkOptions, system: ActorSystem): Future[BulkResult] = {
163+
164+
implicit val ec: ExecutionContext = system.dispatcher
165+
implicit val conf: Configuration = hadoopConf.getOrElse(hadoopConfiguration)
166+
167+
logger.info(s"📁 Starting bulk from JSON file: $filePath")
168+
169+
val source = if (validateJson) {
170+
JsonFileSource.fromFileValidated(filePath, bufferSize, skipInvalid)
171+
} else {
172+
JsonFileSource.fromFile(filePath, bufferSize)
173+
}
174+
175+
bulkWithResult(
176+
items = source,
177+
toDocument = _,
178+
indexKey = indexKey,
179+
idKey = idKey,
180+
callbacks = callbacks
181+
)
182+
}
183+
44184
/** Bulk with detailed results (successes + failures).
45185
*
46186
* This method provides:

0 commit comments

Comments
 (0)