Skip to content

Commit 370d3ca

Browse files
committed
Merge branch 'feature/update-lineage-smt'
2 parents 73d863a + 93816f4 commit 370d3ca

File tree

2 files changed

+18
-36
lines changed

2 files changed

+18
-36
lines changed
Binary file not shown.

resources/plugins/smt/ol-kafka-connect-ext/src/main/kotlin/io/factorhouse/smt/OpenLineageLifecycleSmt.kt

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,10 @@ class OpenLineageLifecycleSmt<R : ConnectRecord<R>> : Transformation<R> {
2626
private lateinit var runId: UUID
2727
private lateinit var connectorClass: String
2828
private lateinit var topics: List<String>
29-
private lateinit var icebergTables: List<String>
29+
private lateinit var datasetNames: List<String>
3030

3131
// Sink-specific properties
32-
private var s3Bucket: String? = null
33-
private var icebergCatalog: String? = null
34-
private var icebergCatalogUri: String? = null
32+
private var datasetNamespace: String? = null
3533

3634
// State Machine Flags
3735
private var isConfigured: Boolean = false
@@ -50,10 +48,8 @@ class OpenLineageLifecycleSmt<R : ConnectRecord<R>> : Transformation<R> {
5048
private const val CONNECTOR_NAME_CONFIG = "connector.name"
5149
private const val CONNECTOR_CLASS_CONFIG = "connector.class"
5250
private const val TOPICS_CONFIG = "topics"
53-
private const val S3_BUCKET_CONFIG = "s3.bucket.name"
54-
private const val ICEBERG_CATALOG_CONFIG = "iceberg.catalog"
55-
private const val ICEBERG_CATALOG_URI_CONFIG = "iceberg.catalog.uri"
56-
private const val ICEBERG_TABLES_CONFIG = "iceberg.tables"
51+
private const val DATASET_NAMESPACE_CONFIG = "dataset.namespace"
52+
private const val DATASET_NAMES_CONFIG = "dataset.names"
5753

5854
val CONFIG_DEF: ConfigDef =
5955
ConfigDef()
@@ -76,29 +72,17 @@ class OpenLineageLifecycleSmt<R : ConnectRecord<R>> : Transformation<R> {
7672
ConfigDef.Importance.HIGH,
7773
"Comma-separated list of Kafka topics used by the connector.",
7874
).define(
79-
S3_BUCKET_CONFIG,
75+
DATASET_NAMESPACE_CONFIG,
8076
ConfigDef.Type.STRING,
8177
"",
8278
ConfigDef.Importance.MEDIUM,
83-
"The S3 bucket name for S3 sink connectors.",
79+
"The dataset namespace of a sink connector",
8480
).define(
85-
ICEBERG_CATALOG_CONFIG,
81+
DATASET_NAMES_CONFIG,
8682
ConfigDef.Type.STRING,
8783
"",
8884
ConfigDef.Importance.MEDIUM,
89-
"The Iceberg catalog name for Iceberg sink connectors.",
90-
).define(
91-
ICEBERG_CATALOG_URI_CONFIG,
92-
ConfigDef.Type.STRING,
93-
"",
94-
ConfigDef.Importance.MEDIUM,
95-
"The full URI of the Iceberg catalog (e.g., thrift://localhost:9083).",
96-
).define(
97-
ICEBERG_TABLES_CONFIG,
98-
ConfigDef.Type.STRING,
99-
"",
100-
ConfigDef.Importance.MEDIUM,
101-
"Comma-separated list of target Iceberg tables, corresponding to the 'topics' list.",
85+
"Comma-separated list of dataset names corresponding to 'topics' list.",
10286
).define(
10387
KEY_PREFIX + "schema.read",
10488
ConfigDef.Type.BOOLEAN,
@@ -173,10 +157,8 @@ class OpenLineageLifecycleSmt<R : ConnectRecord<R>> : Transformation<R> {
173157
?.split(",")
174158
?.map { it.trim() }
175159
?.filter { it.isNotEmpty() } ?: emptyList()
176-
this.s3Bucket = configs[S3_BUCKET_CONFIG]?.toString()
177-
this.icebergCatalog = configs[ICEBERG_CATALOG_CONFIG]?.toString()
178-
this.icebergCatalogUri = configs[ICEBERG_CATALOG_URI_CONFIG]?.toString()
179-
this.icebergTables = configs[ICEBERG_TABLES_CONFIG]
160+
this.datasetNamespace = configs[DATASET_NAMESPACE_CONFIG]?.toString()
161+
this.datasetNames = configs[DATASET_NAMES_CONFIG]
180162
?.toString()
181163
?.split(",")
182164
?.map { it.trim() }
@@ -344,8 +326,8 @@ class OpenLineageLifecycleSmt<R : ConnectRecord<R>> : Transformation<R> {
344326
return when {
345327
isSource() -> topics.map { topic -> buildKafkaDataset(topic) as OpenLineage.OutputDataset }
346328
isSink("S3Sink") -> {
347-
val bucket = this.s3Bucket ?: "unknown"
348-
val dsFacet = ol.newDatasourceDatasetFacet("s3", URI.create("s3://$bucket"))
329+
val dsNamespace = this.datasetNamespace ?: "unknown"
330+
val dsFacet = ol.newDatasourceDatasetFacet("s3", URI.create(dsNamespace))
349331
val facets =
350332
ol
351333
.newDatasetFacetsBuilder()
@@ -355,26 +337,26 @@ class OpenLineageLifecycleSmt<R : ConnectRecord<R>> : Transformation<R> {
355337
topics.map { topic ->
356338
ol
357339
.newOutputDatasetBuilder()
358-
.namespace("s3://$bucket")
340+
.namespace(dsNamespace)
359341
.name(topic)
360342
.facets(facets)
361343
.build()
362344
}
363345
}
364346
isSink("IcebergSink") -> {
365-
val icebergNamespace = this.icebergCatalogUri ?: "iceberg://${this.icebergCatalog ?: "unknown"}"
366-
val dsFacet = ol.newDatasourceDatasetFacet("iceberg", URI.create(icebergNamespace))
347+
val dsNamespace = this.datasetNamespace ?: "unknown"
348+
val dsFacet = ol.newDatasourceDatasetFacet("iceberg", URI.create(dsNamespace))
367349
val facets =
368350
ol
369351
.newDatasetFacetsBuilder()
370352
.schema(inputTopicSchema)
371353
.dataSource(dsFacet)
372354
.build()
373-
icebergTables.map { table ->
355+
datasetNames.map { ds ->
374356
ol
375357
.newOutputDatasetBuilder()
376-
.namespace(icebergNamespace)
377-
.name(table)
358+
.namespace(dsNamespace)
359+
.name(ds)
378360
.facets(facets)
379361
.build()
380362
}

0 commit comments

Comments
 (0)