Connector SDK
The Connector SDK is the framework every DataFlow AI connector is built on. It lives in the connector-sdk library module under backend/platform/connector-sdk/, defines a single connector contract (ConnectorSDK.kt), provides three base classes for the common connector shapes, ships 21 production connector implementations — including a telecom-native CDR connector — and discovers them at runtime through a ConnectorRegistry. This page is the reference for the connector model and a step-by-step guide to writing your own.
The connector model
A connector links a pipeline step to an external system: it discovers schema, extracts data, and loads data. All connectors implement the ConnectorSDK interface, regardless of whether they speak JDBC, a REST API, a cloud-storage protocol, a binary file format, or a streaming protocol.
The package layout under com.polkomtel.dataflow.connector is:
| Package | Contents |
|---|---|
| connector (root) | ConnectorSDK interface, ConnectorOperation, ConnectionTestResult |
| connector.model | ConnectorConfig, Schema, DataStream, DataRecord, DataBatch, LoadResult, ConnectorCapability, StorageType, ConnectorException, EventStream |
| connector.base | JDBCConnectorBase, NativeConnectorBase, FileConnectorBase |
| connector.registry | ConnectorRegistry, ConnectorAutoConfiguration |
| connector.impl.* | The 21 connector implementations, one sub-package each |
The ConnectorSDK interface
ConnectorSDK extends AutoCloseable. Most members carry default implementations, so a connector only overrides what it actually supports.
```kotlin
import java.sql.ResultSet

// Abbreviated listing: only the identifier accessors show their default bodies here.
// In ConnectorSDK.kt every member except discoverSchema and extractData has a default.
interface ConnectorSDK : AutoCloseable {
    /** Unique identifier for this connector type (e.g. "postgresql", "csv"). */
    val connectorType: String get() = javaClass.simpleName

    /** Connector instance identifier; defaults to connectorType. */
    fun connectorId(): String = connectorType

    /** Human-readable name for the UI. */
    fun connectorName(): String = connectorType
    val displayName: String get() = connectorType

    /** Discover the schema of the data source. */
    fun discoverSchema(config: ConnectorConfig): Schema

    /** Extract data from the source as a DataStream. */
    fun extractData(config: ConnectorConfig): DataStream

    /** Load data into the target destination. */
    fun loadData(data: DataStream, config: ConnectorConfig): LoadResult

    /** Validate the connector can reach the configured source/target. */
    fun validateConnection(config: ConnectorConfig): Boolean

    /** Capabilities supported by this connector. */
    fun capabilities(): Set<ConnectorCapability>

    /** Operations supported by this connector. */
    fun supportedOperations(): Set<ConnectorOperation>

    /** Test the connection and return detailed results. */
    fun testConnection(config: ConnectorConfig): ConnectionTestResult

    /** CDC event stream; throws UnsupportedOperationException unless overridden. */
    fun getCDCEvents(config: ConnectorConfig): EventStream

    /** Push-down SQL; throws UnsupportedOperationException unless overridden. */
    fun pushDownSQL(sql: String, dialect: String, config: ConnectorConfig): ResultSet

    /** Release resources. */
    override fun close()
}
```
The two methods that are not implemented by default — discoverSchema and extractData — are the minimum a connector must provide. loadData defaults to writing zero records, getCDCEvents and pushDownSQL default to throwing UnsupportedOperationException, and testConnection defaults to timing a validateConnection call.
Default-rich contract
The interface is deliberately default-heavy. A read-only file connector can implement just discoverSchema and extractData; the framework supplies sensible no-op or exception-throwing behaviour for everything else. Declare what you support through capabilities() and supportedOperations() so the platform never calls a method you have not implemented.
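The default-heavy shape can be illustrated with a small, self-contained mirror of the contract. This is a sketch, not ConnectorSDK.kt: `MiniConnector`, `MiniSchema`, `MiniLoadResult`, `MiniTestResult`, and `InMemoryCsvConnector` are hypothetical names, the config is reduced to a plain path string, and the `validateConnection` default of `true` is an assumption. The other defaults follow the behaviour described above.

```kotlin
// Hypothetical stand-ins for the SDK model types, reduced to what this sketch needs.
data class MiniSchema(val fields: List<String>)
data class MiniLoadResult(val recordsWritten: Long = 0, val success: Boolean = true)
data class MiniTestResult(val success: Boolean, val latencyMs: Long = 0, val message: String? = null)

interface MiniConnector : AutoCloseable {
    // The only abstract members: every connector must implement these two.
    fun discoverSchema(path: String): MiniSchema
    fun extractData(path: String): Sequence<Map<String, Any?>>

    // Default: write nothing and report zero records.
    fun loadData(rows: Sequence<Map<String, Any?>>, path: String): MiniLoadResult =
        MiniLoadResult(recordsWritten = 0)

    // Assumed default for this sketch; the real default is not documented here.
    fun validateConnection(path: String): Boolean = true

    // Default: CDC is unsupported unless a connector overrides this.
    fun getCDCEvents(path: String): Sequence<Any> =
        throw UnsupportedOperationException("${javaClass.simpleName} does not support CDC")

    // Default: time validateConnection() and report failures instead of throwing.
    fun testConnection(path: String): MiniTestResult {
        val start = System.nanoTime()
        return try {
            val ok = validateConnection(path)
            MiniTestResult(success = ok, latencyMs = (System.nanoTime() - start) / 1_000_000)
        } catch (e: Exception) {
            MiniTestResult(false, (System.nanoTime() - start) / 1_000_000, e.message)
        }
    }

    override fun close() {}
}

// A read-only connector that overrides only the two required members.
class InMemoryCsvConnector(private val lines: List<String>) : MiniConnector {
    private val header: List<String> get() = lines.first().split(",")

    override fun discoverSchema(path: String) = MiniSchema(header)

    override fun extractData(path: String): Sequence<Map<String, Any?>> =
        lines.asSequence().drop(1).map { line -> header.zip(line.split(",")).toMap() }
}
```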
Method responsibilities
| Method | Responsibility |
|---|---|
| discoverSchema | Inspect the source and return a Schema of fields, types, and metadata |
| extractData | Return a lazy, closeable DataStream of DataRecords read from the source |
| loadData | Write a DataStream (or Iterable<DataRecord>) to the target, returning a LoadResult |
| validateConnection | Cheap reachability check; backs testConnection by default |
| testConnection | Detailed connectivity probe; backs POST /connections/{id}/test |
| getCDCEvents | Return an EventStream of change events (CDC connectors only) |
| pushDownSQL | Execute dialect-specific SQL at the source for push-down optimisation |
| close | Release pooled connections, HTTP clients, and other resources |
Capabilities and operations
A connector advertises what it can do through two enums. The platform consults them before routing work to a connector.
ConnectorCapability
```kotlin
enum class ConnectorCapability {
    SCHEMA_DISCOVERY,   // Can discover schema from the source
    READ,               // Can extract/read data
    WRITE,              // Can load/write data
    COLUMN_PROJECTION,  // Can read a subset of columns
    PREDICATE_PUSHDOWN, // Can filter at the source
    SCHEMA_EVOLUTION,   // Supports schema changes
    STREAMING,          // Supports streaming/incremental reads
    PARTITIONING,       // Supports partitioned data
    COMPRESSION,        // Supports compression
    CLOUD_STORAGE,      // Supports reading from cloud storage
    BATCH_WRITE,        // Supports batch writing
    ENCODING_DETECTION, // Supports encoding detection
    MULTI_TABLE         // Multiple sheets/tables in one source
}
```
ConnectorOperation
```kotlin
enum class ConnectorOperation {
    EXTRACT,
    LOAD,
    PUSHDOWN_SQL,
    SCHEMA_DISCOVERY,
    CDC,
    STREAMING,
    BULK_COPY,
    HEARTBEAT
}
```
A capability describes what a connector can handle (reads, writes, push-down, partitioned or compressed data, cloud storage); an operation describes which verbs the engine may invoke on it. Two of the base classes pre-declare a sensible operation set: JDBCConnectorBase declares EXTRACT, LOAD, PUSHDOWN_SQL, and SCHEMA_DISCOVERY; NativeConnectorBase declares EXTRACT, LOAD, and SCHEMA_DISCOVERY.
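One way a caller might honour these declarations is to check the operation set before dispatching, so an unsupported verb fails with a clear message rather than reaching a throwing default. The sketch below uses a local copy of `ConnectorOperation`; the `Advertises` interface, the `runIfSupported` helper, and `NativeLike` are hypothetical and are not part of the SDK.

```kotlin
// Local copy of the operation enum so the sketch compiles on its own.
enum class ConnectorOperation { EXTRACT, LOAD, PUSHDOWN_SQL, SCHEMA_DISCOVERY, CDC, STREAMING, BULK_COPY, HEARTBEAT }

// Hypothetical: the minimal surface a router needs from a connector.
interface Advertises {
    val connectorType: String
    fun supportedOperations(): Set<ConnectorOperation>
}

// Hypothetical guard: refuse undeclared operations before invoking the connector.
fun <T> Advertises.runIfSupported(op: ConnectorOperation, action: () -> T): T {
    require(op in supportedOperations()) {
        "Connector '$connectorType' does not support $op (supports ${supportedOperations().sorted()})"
    }
    return action()
}

// Mirrors the operation set documented for NativeConnectorBase.
class NativeLike(override val connectorType: String) : Advertises {
    override fun supportedOperations() =
        setOf(ConnectorOperation.EXTRACT, ConnectorOperation.LOAD, ConnectorOperation.SCHEMA_DISCOVERY)
}
```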
The configuration schema
Every connector method takes a ConnectorConfig — a single data class carrying connection details, format options, and extraction/loading parameters.
```kotlin
data class ConnectorConfig(
    val path: String = "",                             // file path, gs:// / s3:// / abfss:// URI
    val properties: Map<String, String> = emptyMap(), // format-specific options
    val columnProjection: List<String>? = null,
    val filterPredicate: String? = null,
    val maxRecords: Long = -1,                         // -1 = unlimited
    val batchSize: Int = 10_000,
    val schemaSampleSize: Int = 100,
    val storageType: StorageType? = null,

    // ---- JDBC / database properties ----
    val host: String = "",
    val port: Int = 0,
    val database: String = "",
    val username: String = "",
    val password: String = "",
    val schema: String = "",
    val query: String = "",
    val table: String = "",
    val fetchSize: Int = 1000,
    val timeoutSeconds: Int = 30,
    val ssl: Boolean = false,
    val connectionId: String = ""
)
```
Configuration fields
| Field | Default | Purpose |
|---|---|---|
| path | "" | Path or URI for file / cloud-storage connectors |
| properties | empty map | Free-form, connector-specific options |
| columnProjection | null | Project a subset of columns when supported |
| filterPredicate | null | Source-side filter for predicate push-down |
| maxRecords | -1 | Cap rows extracted; -1 means unlimited |
| batchSize | 10000 | Write batch size |
| schemaSampleSize | 100 | Rows sampled for schema inference |
| host / port / database | "" / 0 / "" | JDBC connection coordinates |
| username / password | "" | Connection credentials |
| schema / table / query | "" | What to read or write |
| fetchSize | 1000 | JDBC cursor fetch size |
| timeoutSeconds | 30 | Connection timeout, in seconds |
Helper accessors
ConnectorConfig provides typed accessors so connectors avoid string parsing:
```kotlin
config.propertyString("delimiter", ",")        // String with default
config.propertyInt("maxRows", 1000)            // Int with default
config.propertyBoolean("skipMalformed", true)  // Boolean with default
config.jdbcUrl("jdbc:postgresql://")           // prefix + host:port/database
config.effectiveQuery()                        // configured query, or SELECT * FROM table
config.qualifiedTableName()                    // schema.table when schema is set
```
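To make the helper semantics concrete, here is a hypothetical re-implementation on a trimmed copy of the config (`ConfigSketch`). It follows the comments above, but two behaviours are assumptions rather than documented facts: unparsable property values fall back to the default, and `effectiveQuery()` builds its fallback from the schema-qualified table name.

```kotlin
// Hypothetical, trimmed-down config: only the fields the helpers below read.
data class ConfigSketch(
    val host: String = "",
    val port: Int = 0,
    val database: String = "",
    val schema: String = "",
    val table: String = "",
    val query: String = "",
    val properties: Map<String, String> = emptyMap(),
) {
    fun propertyString(key: String, default: String): String = properties[key] ?: default

    // Assumption: a value that does not parse as Int falls back to the default.
    fun propertyInt(key: String, default: Int): Int = properties[key]?.toIntOrNull() ?: default

    // Assumption: "true"/"false" in any case; anything else falls back to the default.
    fun propertyBoolean(key: String, default: Boolean): Boolean =
        properties[key]?.lowercase()?.toBooleanStrictOrNull() ?: default

    // prefix + host:port/database, e.g. jdbc:postgresql://db:5432/dw
    fun jdbcUrl(prefix: String): String = "$prefix$host:$port/$database"

    fun qualifiedTableName(): String = if (schema.isNotBlank()) "$schema.$table" else table

    // Assumption: the fallback query uses the schema-qualified table name.
    fun effectiveQuery(): String = query.ifBlank { "SELECT * FROM ${qualifiedTableName()}" }
}
```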
LoadResult
loadData returns a LoadResult describing the outcome:
```kotlin
data class LoadResult(
    val recordsWritten: Long = 0,
    val recordsFailed: Long = 0,
    val bytesWritten: Long = 0,
    val duration: Duration = Duration.ZERO,
    val outputPaths: List<String> = emptyList(),
    val warnings: List<String> = emptyList(),
    val success: Boolean = true,
    val rowsRejected: Long = 0,
    val errors: List<String> = emptyList(),
    val targetTable: String = ""
)
```
ConnectionTestResult, returned by testConnection, is smaller:
```kotlin
data class ConnectionTestResult(
    val success: Boolean,
    val latencyMs: Long = 0,
    val serverVersion: String? = null,
    val message: String? = null
)
```
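Since loadData writes in batches of `batchSize`, a caller that loads data in several chunks may want a single summary. The sketch below folds per-batch results into one `LoadResult`; it uses a local copy of the data class shown above, `combine` is a hypothetical helper, and summing durations assumes the batches ran sequentially.

```kotlin
import java.time.Duration

// Local copy of the LoadResult shape so the sketch compiles on its own.
data class LoadResult(
    val recordsWritten: Long = 0,
    val recordsFailed: Long = 0,
    val bytesWritten: Long = 0,
    val duration: Duration = Duration.ZERO,
    val outputPaths: List<String> = emptyList(),
    val warnings: List<String> = emptyList(),
    val success: Boolean = true,
    val rowsRejected: Long = 0,
    val errors: List<String> = emptyList(),
    val targetTable: String = ""
)

// Hypothetical helper: counters are summed, lists concatenated, and the load
// succeeds only if every batch succeeded. Durations add up (sequential batches).
fun List<LoadResult>.combine(targetTable: String): LoadResult = LoadResult(
    recordsWritten = sumOf { it.recordsWritten },
    recordsFailed = sumOf { it.recordsFailed },
    bytesWritten = sumOf { it.bytesWritten },
    duration = fold(Duration.ZERO) { acc, r -> acc.plus(r.duration) },
    outputPaths = flatMap { it.outputPaths },
    warnings = flatMap { it.warnings },
    success = all { it.success },
    rowsRejected = sumOf { it.rowsRejected },
    errors = flatMap { it.errors },
    targetTable = targetTable
)
```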
The base classes
Rather than implement ConnectorSDK from scratch, extend one of three base classes that match the common connector shapes.
| Base class | Extend when the source is… | Provides |
|---|---|---|
| JDBCConnectorBase | Any SQL database reachable over JDBC | HikariCP pooling, metadata-driven schema discovery, batch insert load, push-down SQL |
| NativeConnectorBase | A REST API, message queue, or other non-JDBC system | connect / disconnect / isConnected lifecycle hooks |
| FileConnectorBase | A file format (CSV, JSON, XML, Excel, Parquet, CDR) | Storage abstraction over local FS, GCS, S3, and Azure Blob |
JDBCConnectorBase
A JDBC connector supplies only three things — the driver class, the URL prefix, and the default port — and inherits a full connector:
```kotlin
abstract class JDBCConnectorBase : ConnectorSDK {
    abstract fun driverClassName(): String
    abstract fun jdbcPrefix(): String
    abstract fun defaultPort(): Int
    // discoverSchema, extractData, loadData, pushDownSQL, testConnection are inherited
}
```
It builds a HikariDataSource with maximumPoolSize = 10, minimumIdle = 2, idleTimeout = 600000 ms (10 minutes), and maxLifetime = 1800000 ms (30 minutes), copying every entry of config.properties into the pool as a data-source property. discoverSchema walks DatabaseMetaData for TABLE and VIEW objects, mapping java.sql.Types codes to the DataFlow DataType enum. extractData opens a streaming ResultSet with the configured fetchSize; loadData runs batched INSERT statements inside a transaction and rolls back on failure.
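The `java.sql.Types` mapping step in discoverSchema might look roughly like this. `DatabaseMetaData.getColumns` reports each column's type code in its `DATA_TYPE` column; the `DataType` enum here is a hypothetical stand-in whose members may not match the real DataFlow enum, and sending unmapped codes to `UNKNOWN` is an assumption.

```kotlin
import java.sql.Types

// Hypothetical stand-in for the DataFlow DataType enum.
enum class DataType { STRING, INTEGER, LONG, DOUBLE, DECIMAL, BOOLEAN, DATE, TIMESTAMP, BINARY, UNKNOWN }

// Maps a java.sql.Types code (e.g. the DATA_TYPE column of DatabaseMetaData.getColumns) to a DataType.
fun dataTypeFor(sqlType: Int): DataType = when (sqlType) {
    Types.CHAR, Types.VARCHAR, Types.LONGVARCHAR,
    Types.NCHAR, Types.NVARCHAR, Types.LONGNVARCHAR, Types.CLOB -> DataType.STRING
    Types.TINYINT, Types.SMALLINT, Types.INTEGER -> DataType.INTEGER
    Types.BIGINT -> DataType.LONG
    Types.REAL, Types.FLOAT, Types.DOUBLE -> DataType.DOUBLE
    Types.NUMERIC, Types.DECIMAL -> DataType.DECIMAL
    Types.BIT, Types.BOOLEAN -> DataType.BOOLEAN
    Types.DATE -> DataType.DATE
    Types.TIMESTAMP, Types.TIMESTAMP_WITH_TIMEZONE -> DataType.TIMESTAMP
    Types.BINARY, Types.VARBINARY, Types.LONGVARBINARY, Types.BLOB -> DataType.BINARY
    else -> DataType.UNKNOWN
}
```

Keeping the mapping as a pure function of the type code makes it testable without a live database.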
NativeConnectorBase
Native connectors implement an explicit connection lifecycle plus the do* work methods:
```kotlin
abstract class NativeConnectorBase : ConnectorSDK {
    abstract fun connect(config: ConnectorConfig)
    abstract fun disconnect()
    abstract fun isConnected(): Boolean
    abstract fun doDiscoverSchema(config: ConnectorConfig): Schema
    abstract fun doExtractData(config: ConnectorConfig): DataStream
    abstract fun doLoadData(data: DataStream, config: ConnectorConfig): LoadResult
}
```
The base class wraps each public method so connect/disconnect always bracket the work, and testConnection measures connect latency.
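The bracketing can be sketched as a template method: the public entry point calls `connect`, delegates to a `do*` method, and disconnects in a `finally` block. `NativeBaseSketch`, `doExtract`, and `testConnectionMillis` are hypothetical names. The sketch returns an eager `List` to keep the bracket simple; with a lazy `DataStream`, the disconnect would have to be deferred until the stream itself is closed.

```kotlin
// Hypothetical, trimmed-down model of the NativeConnectorBase template method.
abstract class NativeBaseSketch : AutoCloseable {
    abstract fun connect(target: String)
    abstract fun disconnect()
    abstract fun isConnected(): Boolean
    protected abstract fun doExtract(target: String): List<String>

    // Public entry point: connect, do the work, and always disconnect, even if the work throws.
    fun extractData(target: String): List<String> {
        connect(target)
        return try {
            doExtract(target)
        } finally {
            if (isConnected()) disconnect()
        }
    }

    // Measures connect() latency only, then tears the connection down again.
    fun testConnectionMillis(target: String): Long {
        val start = System.nanoTime()
        return try {
            connect(target)
            (System.nanoTime() - start) / 1_000_000
        } finally {
            if (isConnected()) disconnect()
        }
    }

    override fun close() {
        if (isConnected()) disconnect()
    }
}
```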
FileConnectorBase
File connectors implement format-specific inferSchema, readRecords, and writeRecords. The base class supplies a StorageAdapter so the same connector reads from a local path, gs://, s3://, or abfss:// URI transparently.
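A StorageAdapter has to pick a backend from the path. A minimal sketch of that dispatch, keyed on the URI scheme, follows; `StorageBackend`, `backendFor`, and `authorityAndKey` are hypothetical, and treating any path without a recognised scheme as local is an assumption.

```kotlin
// Hypothetical: the backends a StorageAdapter is described as covering.
enum class StorageBackend { LOCAL, GCS, S3, AZURE_BLOB }

// Picks a backend from the URI scheme (case-insensitive); anything else is treated as local.
fun backendFor(path: String): StorageBackend =
    when (path.substringBefore("://", missingDelimiterValue = "").lowercase()) {
        "gs" -> StorageBackend.GCS
        "s3" -> StorageBackend.S3
        "abfss" -> StorageBackend.AZURE_BLOB
        else -> StorageBackend.LOCAL
    }

// Splits a cloud URI into (authority, object path); local paths return (null, path).
// For abfss the authority is container@account.dfs.core.windows.net.
fun authorityAndKey(path: String): Pair<String?, String> {
    if (backendFor(path) == StorageBackend.LOCAL) return null to path
    val rest = path.substringAfter("://")
    return rest.substringBefore('/') to rest.substringAfter('/', missingDelimiterValue = "")
}
```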
The 21 built-in connectors
The SDK ships 21 connector implementations under connector.impl.*. Most are auto-discovered through META-INF/services; DebeziumCdcConnector is instead registered as a Spring bean, because it depends on injected collaborators that java.util.ServiceLoader cannot supply.
| Category | Connectors |
|---|---|
| Databases (JDBC) | PostgreSQL, MySQL, MSSQL, Oracle, Sybase, Teradata, SAP HANA |
| Cloud warehouses | BigQuery, Databricks, Snowflake |
| NoSQL | MongoDB |
| File formats | CSV, JSON, XML, Excel, Parquet |
| Streaming & messaging | Kafka, Pub/Sub, Event Hubs |
| SaaS / ERP | Salesforce, ServiceNow, SAP, REST API |
| Telecom | CDR (ASN.1) |
Cross-cutting change-data-capture (CDC) support is provided through Debezium for MySQL (binlog), SQL Server (its built-in CDC change tables), and MongoDB (change streams).
EventHubs and Pub/Sub
The eventhubs and pubsub connector classes exist under connector.impl.* but are temporarily commented out of META-INF/services while a porting task completes — they re-register automatically once that work lands.
The telecom CDR connector
The cdr/ sub-package holds a connector built specifically for Polkomtel's telecom Call Detail Records. CdrConnector extends FileConnectorBase, has connectorType = "cdr-asn1", displayName = "Telecom CDR (ASN.1)", and declares the capabilities SCHEMA_DISCOVERY, READ, STREAMING, and CLOUD_STORAGE. It is read-only — writeRecords throws, directing callers to the CdrParquetConverter for Parquet output.
The decoding pipeline is:
```text
Binary CDR file
  → StorageAdapter.openForRead()          (local / GCS / S3 / Azure)
  → Asn1Decoder.decodeStream()            (BER/DER TLV parsing)
  → CdrBinaryDecoder.decodeStream()       (template-driven field extraction)
  → CdrConnector.cdrRecordToDataRecord
  → DataStream                            (lazy, closeable sequence)
```
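The BER/DER TLV parsing step reads an identifier octet (class, constructed flag, tag number) and a length before each value, as specified in ITU-T X.690. The following self-contained reader illustrates that step only; it is not the Asn1Decoder source, and `TlvHeader` and `readTlvHeader` are hypothetical names.

```kotlin
import java.io.EOFException
import java.io.InputStream

// One decoded BER identifier + length header. length == -1 means indefinite length.
data class TlvHeader(val tagClass: Int, val constructed: Boolean, val tagNumber: Long, val length: Long)

private fun InputStream.readOrEof(): Int {
    val b = read()
    if (b < 0) throw EOFException("Truncated BER header")
    return b
}

// Reads an identifier and length, leaving the stream positioned at the value.
// Returns null at a clean end of stream.
fun readTlvHeader(input: InputStream): TlvHeader? {
    val first = input.read()
    if (first < 0) return null
    val tagClass = first ushr 6                   // 0 universal, 1 application, 2 context, 3 private
    val constructed = (first and 0x20) != 0
    var tagNumber = (first and 0x1F).toLong()
    if (tagNumber == 0x1FL) {                     // high-tag-number form: base-128, bit 8 = "more"
        tagNumber = 0
        do {
            val b = input.readOrEof()
            tagNumber = (tagNumber shl 7) or (b and 0x7F).toLong()
        } while ((b and 0x80) != 0)
    }
    val firstLen = input.readOrEof()
    val length = when {
        firstLen < 0x80 -> firstLen.toLong()      // short form
        firstLen == 0x80 -> -1L                   // indefinite form (BER only; not allowed in DER)
        firstLen == 0xFF -> error("Reserved length octet 0xFF")
        else -> {                                 // long form: next (firstLen and 0x7F) octets, big-endian
            val count = firstLen and 0x7F
            require(count <= 7) { "Length of $count octets is not supported" }
            var len = 0L
            repeat(count) { len = (len shl 8) or input.readOrEof().toLong() }
            len
        }
    }
    return TlvHeader(tagClass, constructed, tagNumber, length)
}
```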
It ships three roaming-record templates, validated by a RoamingTemplateValidator:
| Template | Telecom format |
|---|---|
| Tap3Template | TAP3 (Transferred Account Procedure), the standard for billing roaming usage between operators |
| RapTemplate | RAP (Returned Account Procedure), used to return rejected TAP records |
| NrtrdeTemplate | NRTRDE (Near Real Time Roaming Data Exchange), for fast fraud detection on roaming usage |
CDR connector configuration is supplied through properties:
| Property | Default | Description |
|---|---|---|
templateType | auto | CDR template type (e.g. CS_VOICE_MO, or auto to detect) |
templateVersion | 3GPP-R15 | Template version |
batchSize | 10000 | Processing batch size |
skipMalformed | true | Skip malformed records instead of failing |
filePattern | *.cdr | File glob for directory scanning |
sourceSwitch | (derived) | Source switch ID; otherwise derived from the file name |
Supported file extensions are .cdr, .asn1, .dat, and .bin. When templateType=auto, the connector reads the first ASN.1 structure, matches it against known templates, and resets the stream so no record is consumed.
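The peek-and-reset idea can be sketched with `BufferedInputStream.mark` and `reset`. `detectTemplate` is a hypothetical helper, the matcher lambda stands in for the real template-matching logic, and peeking a fixed 64 bytes (instead of exactly one complete ASN.1 structure) is a simplifying assumption.

```kotlin
import java.io.BufferedInputStream

// Peeks at up to `peekBytes` bytes, asks the matcher for a template name, then resets
// the stream so the decoder still starts at byte 0 and no record is consumed.
fun detectTemplate(
    input: BufferedInputStream,
    matcher: (ByteArray) -> String?,
    peekBytes: Int = 64,
): String? {
    input.mark(peekBytes)                 // mark stays valid while <= peekBytes are read
    return try {
        matcher(input.readNBytes(peekBytes))
    } finally {
        input.reset()                     // rewind to the mark, even if the matcher throws
    }
}
```

In use, a toy matcher that inspects only the first identifier octet is enough to show the stream is left untouched: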
Telecom-native decoding
TAP3, RAP, and NRTRDE are not generic file formats — they are the specific binary roaming-record formats used by mobile operators. The CDR connector decodes them directly with ASN.1, so Polkomtel can ingest roaming usage and run fraud checks without a separate decoding stage.
The connector lifecycle
A connector instance follows the same lifecycle every time the pipeline engine uses it.
| Phase | What happens |
|---|---|
| Discovery | At startup, ConnectorAutoConfiguration merges the ServiceLoader and Spring-DI discovery paths into the ConnectorRegistry |
| Lookup | The engine calls registry.getConnector(connectorId); the registry invokes the stored factory to produce an instance |
| Test | testConnection(config) runs when a user clicks Test in the connection wizard or calls POST /connections/{id}/test |
| Schema | discoverSchema(config) is called during pipeline design to populate field lists |
| Run | extractData (sources) or loadData (targets) is invoked during pipeline execution |
| Close | close() releases pools, HTTP clients, and other resources |
The DataStream returned by extractData is lazy and carries its own Closeables: the engine drains it and then closes it, which in turn closes the underlying ResultSet, file handle, or HTTP response.
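A minimal sketch of such a lazy, closeable stream: records come from a Kotlin `Sequence`, so nothing is read until the caller iterates, and `close()` releases the attached resources in reverse order of acquisition. `RecordStream` is a hypothetical class, not the SDK's `DataStream`.

```kotlin
// Hypothetical lazy, closeable record stream (not the SDK's DataStream).
class RecordStream<T>(
    private val records: Sequence<T>,
    private val resources: List<AutoCloseable> = emptyList(),
) : AutoCloseable {
    private var closed = false

    // Pulls records on demand from the underlying sequence.
    fun forEachRecord(action: (T) -> Unit) {
        check(!closed) { "Stream already closed" }
        records.forEach(action)
    }

    // Closes resources newest-first (e.g. ResultSet, then Statement, then Connection),
    // keeping the first failure and attaching later ones as suppressed exceptions.
    override fun close() {
        if (closed) return
        closed = true
        var failure: Throwable? = null
        for (resource in resources.asReversed()) {
            try {
                resource.close()
            } catch (t: Throwable) {
                val first = failure
                if (first == null) failure = t else first.addSuppressed(t)
            }
        }
        failure?.let { throw it }
    }
}
```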
Building a custom connector
The pipeline engine supports pluggable connectors. The example below adds a generic REST API connector by extending NativeConnectorBase.
Step 1 — Create the connector class
```kotlin
package com.polkomtel.dataflow.connector.impl.myrest

import com.polkomtel.dataflow.connector.base.NativeConnectorBase
import com.polkomtel.dataflow.connector.model.*
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration

class MyRestApiConnector : NativeConnectorBase() {
    override val connectorType = "my-rest-api"
    override val displayName = "My REST API"

    private var client: HttpClient? = null
    private var baseUrl: String = ""

    override fun connect(config: ConnectorConfig) {
        // The base URL travels in the generic `host` field of ConnectorConfig.
        baseUrl = config.host.ifBlank {
            throw ConnectorException("Base URL (host) is required")
        }
        val timeout = Duration.ofSeconds(config.timeoutSeconds.toLong())
        val http = HttpClient.newBuilder().connectTimeout(timeout).build()
        // Probe the endpoint so that testConnection(), which times connect(), reflects
        // real reachability: send() throws if the host cannot be reached.
        val probe = HttpRequest.newBuilder(URI.create(baseUrl))
            .method("HEAD", HttpRequest.BodyPublishers.noBody())
            .timeout(timeout)
            .build()
        http.send(probe, HttpResponse.BodyHandlers.discarding())
        client = http
    }

    override fun disconnect() { client = null }

    override fun isConnected(): Boolean = client != null

    override fun capabilities(): Set<ConnectorCapability> = setOf(
        ConnectorCapability.SCHEMA_DISCOVERY,
        ConnectorCapability.READ,
        ConnectorCapability.WRITE
    )

    override fun doDiscoverSchema(config: ConnectorConfig): Schema {
        // TODO: sample the endpoint response and infer fields
        return Schema(fields = emptyList())
    }

    override fun doExtractData(config: ConnectorConfig): DataStream {
        // TODO: GET config.table (the endpoint path) and map the JSON to DataRecords
        return DataStream.empty(Schema(fields = emptyList()))
    }

    override fun doLoadData(data: DataStream, config: ConnectorConfig): LoadResult {
        // TODO: POST records to the endpoint
        return LoadResult(recordsWritten = 0)
    }
}
```
Step 2 — Register the connector
For a plain connector with a public no-arg constructor, add its fully qualified class name to the ServiceLoader manifest:
```text
# backend/platform/connector-sdk/src/main/resources/
# META-INF/services/com.polkomtel.dataflow.connector.ConnectorSDK
com.polkomtel.dataflow.connector.impl.myrest.MyRestApiConnector
```
ConnectorRegistry.discoverConnectors() picks it up at startup via java.util.ServiceLoader.
If the connector needs dependency injection, annotate it @Component instead. ConnectorAutoConfiguration registers every Spring-managed ConnectorSDK bean with a factory that returns the bean itself, so, for the default singleton scope, every lookup of that connector yields the same instance. This is the path DebeziumCdcConnector uses.
Step 3 — Programmatic registration
You can also register a connector by hand against the singleton registry:
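```kotlin
val registry = ConnectorRegistry.getInstance()
registry.register("my-rest-api") { MyRestApiConnector() }

// Use it. Each lookup yields a fresh instance, so close it when done.
val config = ConnectorConfig(host = "https://api.example.com") // placeholder base URL
registry.getConnector("my-rest-api").use { connector ->
    val schema = connector.discoverSchema(config)
}
```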
The registry stores a factory (() -> ConnectorSDK), so each getConnector call yields a fresh instance — important because connectors hold per-connection state such as a Hikari pool.
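The factory-per-lookup behaviour can be sketched with a map of `() -> T` lambdas. `FactoryRegistry` and its methods are hypothetical; the real ConnectorRegistry API may differ.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical factory-based registry in the spirit of ConnectorRegistry.
class FactoryRegistry<T : AutoCloseable> {
    private val factories = ConcurrentHashMap<String, () -> T>()

    fun register(id: String, factory: () -> T) {
        factories[id] = factory
    }

    // Each call runs the factory, so callers never share per-connection state.
    fun create(id: String): T =
        factories[id]?.invoke()
            ?: throw IllegalArgumentException("No connector registered as '$id'; known: ${factories.keys.sorted()}")

    fun registeredIds(): Set<String> = factories.keys.toSortedSet()
}
```

Registering `{ existingBean }` instead would hand back the same object on every lookup, which matches the Spring-DI path described in Step 2.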
Testing a connector
Connector tests live alongside their implementation and run with the standard Gradle test task.
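```kotlin
package com.polkomtel.dataflow.connector.impl.myrest

import com.polkomtel.dataflow.connector.model.ConnectorCapability
import com.polkomtel.dataflow.connector.model.ConnectorConfig
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

class MyRestApiConnectorTest {
    @Test
    fun `test connection succeeds with a valid base URL`() {
        // Calls a real public sandbox API, so this test needs network access.
        val connector = MyRestApiConnector()
        val result = connector.testConnection(
            ConnectorConfig(host = "https://jsonplaceholder.typicode.com")
        )
        assertTrue(result.success)
    }

    @Test
    fun `capabilities advertise read and write`() {
        val caps = MyRestApiConnector().capabilities()
        assertTrue(ConnectorCapability.READ in caps)
        assertTrue(ConnectorCapability.WRITE in caps)
    }
}
```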
Run the SDK module's tests:
```shell
./gradlew :platform:connector-sdk:test
```
| Test target | Command |
|---|---|
| Connector SDK unit tests | ./gradlew :platform:connector-sdk:test |
| All platform unit tests | ./gradlew test |
| Integration tests (Docker required) | ./gradlew integrationTest |
Test against real systems
A connector test that mocks the JDBC driver or HTTP client proves nothing about connectivity. Point testConnection and discoverSchema tests at a real database container, a public sandbox API, or a local file fixture so the test exercises the actual decode and connection path.
Packaging
The SDK is a Gradle library module. A connector you add to connector-sdk ships as part of that module's JAR; no separate plugin packaging is needed.
| Concern | How it is handled |
|---|---|
| Build | ./gradlew :platform:connector-sdk:build produces the module JAR |
| JDBC drivers | Declared runtimeOnly so they are on the runtime classpath but not the compile classpath |
| Dependency versions | Pinned in the Gradle version catalog gradle/libs.versions.toml |
| Connection pooling | HikariCP 6.2.1, shared by all JDBC connectors via JDBCConnectorBase |
| Discovery manifest | META-INF/services/com.polkomtel.dataflow.connector.ConnectorSDK |
Because JDBC drivers are declared runtimeOnly, adding a database connector means adding one catalog entry plus one runtimeOnly(libs....) line — the driver is resolved at runtime and never leaks into compile-time API. The full set of driver coordinates, connection strings, and HikariCP tuning lives in the Connector SDK Reference.
Reference summary
| Topic | Where it lives |
|---|---|
| Connector contract | connector/ConnectorSDK.kt |
| Capabilities / operations | connector/model/ConnectorCapability.kt, connector/ConnectorOperation.kt |
| Configuration | connector/model/ConnectorConfig.kt (ConnectorConfig, LoadResult) |
| Base classes | connector/base/ (JDBCConnectorBase, NativeConnectorBase, FileConnectorBase) |
| Registry | connector/registry/ (ConnectorRegistry, ConnectorAutoConfiguration) |
| Built-in connectors | connector/impl/* — 21 implementations |
| Telecom CDR | connector/impl/cdr/ — CdrConnector, Asn1Decoder, TAP3/RAP/NRTRDE templates |
| Discovery manifest | META-INF/services/com.polkomtel.dataflow.connector.ConnectorSDK |