
Connector SDK

The Connector SDK is the framework every DataFlow AI connector is built on. It lives in the connector-sdk library module under backend/platform/connector-sdk/, defines a single connector contract (ConnectorSDK.kt), provides three base classes for the common connector shapes, ships 21 production connector implementations — including a telecom-native CDR connector — and discovers them at runtime through a ConnectorRegistry. This page is the reference for the connector model and a step-by-step guide to writing your own.


The connector model

A connector links a pipeline step to an external system: it discovers schema, extracts data, and loads data. All connectors implement the ConnectorSDK interface, regardless of whether they speak JDBC, a REST API, a cloud-storage protocol, a binary file format, or a streaming protocol.

The package layout under com.polkomtel.dataflow.connector is:

| Package | Contents |
|---|---|
| connector (root) | ConnectorSDK interface, ConnectorOperation, ConnectionTestResult |
| connector.model | ConnectorConfig, Schema, DataStream, DataRecord, DataBatch, LoadResult, ConnectorCapability, StorageType, ConnectorException, EventStream |
| connector.base | JDBCConnectorBase, NativeConnectorBase, FileConnectorBase |
| connector.registry | ConnectorRegistry, ConnectorAutoConfiguration |
| connector.impl.* | The 21 connector implementations, one sub-package each |

The ConnectorSDK interface

ConnectorSDK extends AutoCloseable. Most members carry default implementations, which the signature listing below omits, so a connector overrides only what it actually supports.

import java.sql.ResultSet

interface ConnectorSDK : AutoCloseable {

    /** Unique identifier for this connector type (e.g. "postgresql", "csv"). */
    val connectorType: String get() = javaClass.simpleName

    /** Connector instance identifier — defaults to connectorType. */
    fun connectorId(): String = connectorType

    /** Human-readable name for the UI. */
    fun connectorName(): String = connectorType
    val displayName: String get() = connectorType

    /** Discover the schema of the data source. */
    fun discoverSchema(config: ConnectorConfig): Schema

    /** Extract data from the source as a DataStream. */
    fun extractData(config: ConnectorConfig): DataStream

    /** Load data into the target destination. */
    fun loadData(data: DataStream, config: ConnectorConfig): LoadResult

    /** Validate the connector can reach the configured source/target. */
    fun validateConnection(config: ConnectorConfig): Boolean

    /** Capabilities supported by this connector. */
    fun capabilities(): Set<ConnectorCapability>

    /** Operations supported by this connector. */
    fun supportedOperations(): Set<ConnectorOperation>

    /** Test the connection and return detailed results. */
    fun testConnection(config: ConnectorConfig): ConnectionTestResult

    /** CDC event stream — throws UnsupportedOperationException unless overridden. */
    fun getCDCEvents(config: ConnectorConfig): EventStream

    /** Push-down SQL — throws UnsupportedOperationException unless overridden. */
    fun pushDownSQL(sql: String, dialect: String, config: ConnectorConfig): ResultSet

    /** Release resources. */
    override fun close()
}

The two methods that are not implemented by default — discoverSchema and extractData — are the minimum a connector must provide. loadData defaults to writing zero records, getCDCEvents and pushDownSQL default to throwing UnsupportedOperationException, and testConnection defaults to timing a validateConnection call.

Default-rich contract

The interface is deliberately default-heavy. A read-only file connector can implement just discoverSchema and extractData; the framework supplies sensible no-op or exception-throwing behaviour for everything else. Declare what you support through capabilities() and supportedOperations() so the platform never calls a method you have not implemented.
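
To illustrate the default-heavy pattern, here is a minimal sketch built on simplified, hypothetical stand-ins (MiniConnector, MiniSchema, MiniLoadResult, InMemoryCsvConnector) rather than the real SDK types. Only schema discovery and extraction are abstract; loading, CDC, and close fall back to defaults modelled on the behaviour described above.

```kotlin
// Hypothetical, simplified stand-ins for the SDK model types.
data class MiniSchema(val fields: List<String>)
data class MiniLoadResult(val recordsWritten: Long = 0)

// A cut-down contract mirroring the shape of ConnectorSDK:
// only discoverSchema and extractData are abstract.
interface MiniConnector : AutoCloseable {
    fun discoverSchema(path: String): MiniSchema
    fun extractData(path: String): Sequence<Map<String, String>>

    // Default: write nothing and report zero records.
    fun loadData(rows: Sequence<Map<String, String>>, path: String): MiniLoadResult =
        MiniLoadResult(recordsWritten = 0)

    // Default: CDC is unsupported unless a connector overrides it.
    fun getCDCEvents(path: String): Sequence<String> =
        throw UnsupportedOperationException("CDC not supported by ${javaClass.simpleName}")

    // Default: nothing to release.
    override fun close() {}
}

// A read-only connector supplies only the two abstract methods.
class InMemoryCsvConnector(private val text: String) : MiniConnector {
    private fun nonBlankLines() = text.lineSequence().filter { it.isNotBlank() }

    override fun discoverSchema(path: String) =
        MiniSchema(nonBlankLines().first().split(","))

    override fun extractData(path: String): Sequence<Map<String, String>> {
        val header = discoverSchema(path).fields
        return nonBlankLines().drop(1).map { line -> header.zip(line.split(",")).toMap() }
    }
}
```

Calling an optional verb on such a connector fails loudly with UnsupportedOperationException, which is why the capability and operation declarations matter.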

Method responsibilities

| Method | Responsibility |
|---|---|
| discoverSchema | Inspect the source and return a Schema of fields, types, and metadata |
| extractData | Return a lazy, closeable DataStream of DataRecords read from the source |
| loadData | Write a DataStream (or Iterable<DataRecord>) to the target, returning a LoadResult |
| validateConnection | Cheap reachability check; backs testConnection by default |
| testConnection | Detailed connectivity probe; backs POST /connections/{id}/test |
| getCDCEvents | Return an EventStream of change events (CDC connectors only) |
| pushDownSQL | Execute dialect-specific SQL at the source for push-down optimisation |
| close | Release pooled connections, HTTP clients, and other resources |

Capabilities and operations

A connector advertises what it can do through two enums. The platform consults them before routing work to a connector.

ConnectorCapability

enum class ConnectorCapability {
    SCHEMA_DISCOVERY,    // Can discover schema from the source
    READ,                // Can extract/read data
    WRITE,               // Can load/write data
    COLUMN_PROJECTION,   // Can read a subset of columns
    PREDICATE_PUSHDOWN,  // Can filter at the source
    SCHEMA_EVOLUTION,    // Supports schema changes
    STREAMING,           // Supports streaming/incremental reads
    PARTITIONING,        // Supports partitioned data
    COMPRESSION,         // Supports compression
    CLOUD_STORAGE,       // Supports reading from cloud storage
    BATCH_WRITE,         // Supports batch writing
    ENCODING_DETECTION,  // Supports encoding detection
    MULTI_TABLE          // Multiple sheets/tables in one source
}

ConnectorOperation

enum class ConnectorOperation {
    EXTRACT,
    LOAD,
    PUSHDOWN_SQL,
    SCHEMA_DISCOVERY,
    CDC,
    STREAMING,
    BULK_COPY,
    HEARTBEAT
}

A capability describes what data shapes and features a connector handles; an operation describes which verbs it supports. Two of the three base classes pre-declare an operation set: JDBCConnectorBase declares EXTRACT, LOAD, PUSHDOWN_SQL, and SCHEMA_DISCOVERY; NativeConnectorBase declares EXTRACT, LOAD, and SCHEMA_DISCOVERY.
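
As a hedged illustration of consulting these declarations before routing work, the sketch below checks supportedOperations() before invoking an optional verb instead of calling it and catching the exception. The OperationAware interface, runCdcIfSupported function, and the two demo classes are hypothetical; the platform's actual routing code is not shown on this page.

```kotlin
// Copy of the documented enum, so the example is self-contained.
enum class ConnectorOperation { EXTRACT, LOAD, PUSHDOWN_SQL, SCHEMA_DISCOVERY, CDC, STREAMING, BULK_COPY, HEARTBEAT }

// Hypothetical minimal view of a connector: its declaration plus one optional verb.
interface OperationAware {
    fun supportedOperations(): Set<ConnectorOperation>
    fun getCDCEvents(): List<String> =
        throw UnsupportedOperationException("CDC not implemented")
}

// Gate on the declaration; never call a verb the connector did not declare.
fun runCdcIfSupported(connector: OperationAware): List<String>? =
    if (ConnectorOperation.CDC in connector.supportedOperations()) connector.getCDCEvents()
    else null

// Declares the same set as NativeConnectorBase.
class NativeLike : OperationAware {
    override fun supportedOperations() =
        setOf(ConnectorOperation.EXTRACT, ConnectorOperation.LOAD, ConnectorOperation.SCHEMA_DISCOVERY)
}

// A CDC-capable connector declares CDC and overrides the verb.
class CdcLike : OperationAware {
    override fun supportedOperations() = setOf(ConnectorOperation.EXTRACT, ConnectorOperation.CDC)
    override fun getCDCEvents() = listOf("insert:42")
}
```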


The configuration schema

Every connector method takes a ConnectorConfig — a single data class carrying connection details, format options, and extraction/loading parameters.

data class ConnectorConfig(
    val path: String = "",                    // file path, gs:// / s3:// / abfss:// URI
    val properties: Map<String, String> = emptyMap(), // format-specific options
    val columnProjection: List<String>? = null,
    val filterPredicate: String? = null,
    val maxRecords: Long = -1,                 // -1 = unlimited
    val batchSize: Int = 10_000,
    val schemaSampleSize: Int = 100,
    val storageType: StorageType? = null,
    // ---- JDBC / database properties ----
    val host: String = "",
    val port: Int = 0,
    val database: String = "",
    val username: String = "",
    val password: String = "",
    val schema: String = "",
    val query: String = "",
    val table: String = "",
    val fetchSize: Int = 1000,
    val timeoutSeconds: Int = 30,
    val ssl: Boolean = false,
    val connectionId: String = ""
)

Configuration fields

| Field | Default | Purpose |
|---|---|---|
| path | "" | Path or URI for file / cloud-storage connectors |
| properties | empty map | Free-form, connector-specific options |
| columnProjection | null | Project a subset of columns when supported |
| filterPredicate | null | Source-side filter for predicate push-down |
| maxRecords | -1 | Cap on rows extracted; -1 means unlimited |
| batchSize | 10000 | Write batch size |
| schemaSampleSize | 100 | Rows sampled for schema inference |
| host / port / database | "" / 0 / "" | JDBC connection coordinates |
| username / password | "" | Connection credentials |
| schema / table / query | "" | What to read or write |
| fetchSize | 1000 | JDBC cursor fetch size |
| timeoutSeconds | 30 | Connection timeout, in seconds |

Helper accessors

ConnectorConfig provides typed accessors so connectors avoid string parsing:

config.propertyString("delimiter", ",")     // String with default
config.propertyInt("maxRows", 1000)         // Int with default
config.propertyBoolean("skipMalformed", true)
config.jdbcUrl("jdbc:postgresql://")        // prefix + host:port/database
config.effectiveQuery()                     // configured query or SELECT * FROM table
config.qualifiedTableName()                 // schema.table when schema is set
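
One plausible way these accessors could be written is sketched below on a hypothetical, cut-down MiniConfig that keeps only the fields they read. The bodies are assumptions inferred from the comments above (for example, that effectiveQuery() uses the schema-qualified table name and that malformed numbers fall back to the default); ConnectorConfig.kt remains the authority on the real behaviour.

```kotlin
// Hypothetical cut-down config: only the fields the helpers read.
data class MiniConfig(
    val properties: Map<String, String> = emptyMap(),
    val host: String = "",
    val port: Int = 0,
    val database: String = "",
    val schema: String = "",
    val table: String = "",
    val query: String = ""
) {
    fun propertyString(key: String, default: String): String = properties[key] ?: default

    // Falls back to the default when the key is absent or not a valid Int.
    fun propertyInt(key: String, default: Int): Int = properties[key]?.toIntOrNull() ?: default

    // Accepts "true"/"false" case-insensitively; anything else yields the default.
    fun propertyBoolean(key: String, default: Boolean): Boolean =
        properties[key]?.lowercase()?.toBooleanStrictOrNull() ?: default

    // prefix + host:port/database
    fun jdbcUrl(prefix: String): String = "$prefix$host:$port/$database"

    // schema.table when a schema is set, otherwise just the table.
    fun qualifiedTableName(): String = if (schema.isNotBlank()) "$schema.$table" else table

    // The configured query wins; otherwise read the whole table.
    fun effectiveQuery(): String = query.ifBlank { "SELECT * FROM ${qualifiedTableName()}" }
}
```

Keeping the parsing in one place means a malformed option degrades to its default instead of throwing deep inside a connector.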

LoadResult

loadData returns a LoadResult describing the outcome:

data class LoadResult(
    val recordsWritten: Long = 0,
    val recordsFailed: Long = 0,
    val bytesWritten: Long = 0,
    val duration: Duration = Duration.ZERO,
    val outputPaths: List<String> = emptyList(),
    val warnings: List<String> = emptyList(),
    val success: Boolean = true,
    val rowsRejected: Long = 0,
    val errors: List<String> = emptyList(),
    val targetTable: String = ""
)

ConnectionTestResult, returned by testConnection, is smaller:

data class ConnectionTestResult(
    val success: Boolean,
    val latencyMs: Long = 0,
    val serverVersion: String? = null,
    val message: String? = null
)
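
The interface's default testConnection is described as timing a validateConnection call. A sketch of that default follows, using a local copy of ConnectionTestResult and a hypothetical Probe interface whose validateConnection takes no config for brevity. Reporting an exception as a failed result, and the message texts, are assumptions rather than documented behaviour.

```kotlin
data class ConnectionTestResult(
    val success: Boolean,
    val latencyMs: Long = 0,
    val serverVersion: String? = null,
    val message: String? = null
)

interface Probe {
    fun validateConnection(): Boolean

    // Sketch of the default: time validateConnection() and report the outcome.
    // Assumption: a thrown exception becomes a failed result instead of propagating.
    fun testConnection(): ConnectionTestResult {
        val start = System.nanoTime()
        return try {
            val ok = validateConnection()
            ConnectionTestResult(
                success = ok,
                latencyMs = (System.nanoTime() - start) / 1_000_000,
                message = if (ok) "Connection OK" else "validateConnection returned false"
            )
        } catch (e: Exception) {
            ConnectionTestResult(
                success = false,
                latencyMs = (System.nanoTime() - start) / 1_000_000,
                message = e.message ?: e.javaClass.simpleName
            )
        }
    }
}
```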

The base classes

Rather than implement ConnectorSDK from scratch, extend one of three base classes that match the common connector shapes.

| Base class | Extend when the source is… | Provides |
|---|---|---|
| JDBCConnectorBase | Any SQL database reachable over JDBC | HikariCP pooling, metadata-driven schema discovery, batch insert load, push-down SQL |
| NativeConnectorBase | A REST API, message queue, or other non-JDBC system | connect / disconnect / isConnected lifecycle hooks |
| FileConnectorBase | A file format (CSV, JSON, XML, Excel, Parquet, CDR) | Storage abstraction over local FS, GCS, S3, and Azure Blob |

JDBCConnectorBase

A JDBC connector supplies only three things — the driver class, the URL prefix, and the default port — and inherits a full connector:

abstract class JDBCConnectorBase : ConnectorSDK {
    abstract fun driverClassName(): String
    abstract fun jdbcPrefix(): String
    abstract fun defaultPort(): Int
    // discoverSchema, extractData, loadData, pushDownSQL, testConnection are inherited
}

It builds a HikariDataSource with maximumPoolSize = 10, minimumIdle = 2, idleTimeout = 600000 ms (10 minutes), and maxLifetime = 1800000 ms (30 minutes), copying every entry of config.properties in as a data-source property. discoverSchema walks DatabaseMetaData for TABLE and VIEW objects, mapping java.sql.Types codes to the DataFlow DataType enum. extractData opens a streaming ResultSet with the configured fetchSize; loadData runs batched INSERT statements inside a transaction and rolls back on failure.
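
The type-mapping step can be sketched as a single when expression over java.sql.Types, the JDK's constants class for the DATA_TYPE column that DatabaseMetaData.getColumns reports. The DataType values here are hypothetical placeholders, because the members of the DataFlow DataType enum are not listed on this page.

```kotlin
import java.sql.Types

// Hypothetical stand-in for the DataFlow DataType enum.
enum class DataType { STRING, INTEGER, LONG, DECIMAL, DOUBLE, BOOLEAN, DATE, TIMESTAMP, BINARY, UNKNOWN }

// Map a java.sql.Types code (DatabaseMetaData.getColumns, DATA_TYPE column).
fun mapSqlType(sqlType: Int): DataType = when (sqlType) {
    Types.CHAR, Types.VARCHAR, Types.LONGVARCHAR,
    Types.NCHAR, Types.NVARCHAR, Types.LONGNVARCHAR, Types.CLOB, Types.NCLOB -> DataType.STRING
    Types.TINYINT, Types.SMALLINT, Types.INTEGER -> DataType.INTEGER
    Types.BIGINT -> DataType.LONG
    Types.NUMERIC, Types.DECIMAL -> DataType.DECIMAL
    Types.REAL, Types.FLOAT, Types.DOUBLE -> DataType.DOUBLE
    Types.BIT, Types.BOOLEAN -> DataType.BOOLEAN
    Types.DATE -> DataType.DATE
    Types.TIMESTAMP, Types.TIMESTAMP_WITH_TIMEZONE -> DataType.TIMESTAMP
    Types.BINARY, Types.VARBINARY, Types.LONGVARBINARY, Types.BLOB -> DataType.BINARY
    // Vendor-specific codes (Types.OTHER and friends) need per-driver handling.
    else -> DataType.UNKNOWN
}
```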

NativeConnectorBase

Native connectors implement an explicit connection lifecycle plus the do* work methods:

abstract class NativeConnectorBase : ConnectorSDK {
    abstract fun connect(config: ConnectorConfig)
    abstract fun disconnect()
    abstract fun isConnected(): Boolean

    abstract fun doDiscoverSchema(config: ConnectorConfig): Schema
    abstract fun doExtractData(config: ConnectorConfig): DataStream
    abstract fun doLoadData(data: DataStream, config: ConnectorConfig): LoadResult
}

The base class wraps each public method so connect/disconnect always bracket the work, and testConnection measures connect latency.
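
A minimal sketch of that bracketing, assuming a plain try/finally around each do* call. LifecycleBase, RecordingConnector, and the List<String> schema are hypothetical simplifications. For extractData, a lazy stream would need the disconnect deferred until the stream is closed; this page does not say how the real base class handles that, so the sketch covers only an eager call.

```kotlin
// Hypothetical, simplified lifecycle base class.
abstract class LifecycleBase {
    abstract fun connect(target: String)
    abstract fun disconnect()
    abstract fun isConnected(): Boolean
    protected abstract fun doDiscoverSchema(target: String): List<String>

    // Public entry point: connect, do the work, always disconnect.
    fun discoverSchema(target: String): List<String> {
        connect(target)
        return try {
            doDiscoverSchema(target)
        } finally {
            // Runs on success and on failure, so an error never leaks a connection.
            if (isConnected()) disconnect()
        }
    }

    // Measure connect latency, as described for testConnection.
    fun testConnectionMillis(target: String): Long {
        val start = System.nanoTime()
        connect(target)
        val elapsed = (System.nanoTime() - start) / 1_000_000
        disconnect()
        return elapsed
    }
}

// A fake connector that records lifecycle calls.
class RecordingConnector(private val fail: Boolean = false) : LifecycleBase() {
    val events = mutableListOf<String>()
    private var connected = false
    override fun connect(target: String) { connected = true; events += "connect" }
    override fun disconnect() { connected = false; events += "disconnect" }
    override fun isConnected() = connected
    override fun doDiscoverSchema(target: String): List<String> {
        events += "discover"
        if (fail) error("endpoint unavailable")
        return listOf("id", "name")
    }
}
```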

FileConnectorBase

File connectors implement format-specific inferSchema, readRecords, and writeRecords. The base class supplies a StorageAdapter so the same connector reads from a local path, gs://, s3://, or abfss:// URI transparently.
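
The scheme-based dispatch behind such a storage abstraction can be sketched as follows. The Backend enum, ResolvedPath class, and resolveBackend function are hypothetical; the real StorageAdapter and StorageType APIs are not shown on this page. Only the scheme prefixes come from the text above.

```kotlin
// Hypothetical backend identifiers (the real StorageType enum may differ).
enum class Backend { LOCAL, GCS, S3, AZURE_BLOB }

data class ResolvedPath(val backend: Backend, val bucket: String?, val key: String)

// Pick a backend from the URI scheme; anything without a known scheme is a local path.
fun resolveBackend(path: String): ResolvedPath {
    val scheme = path.substringBefore("://", missingDelimiterValue = "")
    val backend = when (scheme.lowercase()) {
        "gs" -> Backend.GCS
        "s3" -> Backend.S3
        "abfss" -> Backend.AZURE_BLOB
        else -> return ResolvedPath(Backend.LOCAL, null, path)
    }
    // Split "authority/object/key". For abfss the authority is
    // container@account.dfs.core.windows.net rather than a bare bucket name.
    val rest = path.substringAfter("://")
    return ResolvedPath(backend, rest.substringBefore('/'), rest.substringAfter('/', ""))
}
```

Because the format-specific code only ever sees an opened stream, a connector such as the CDR reader stays unaware of where its bytes come from.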


The 21 built-in connectors

The SDK ships 21 connector implementations under connector.impl.*. Most are auto-discovered via META-INF/services; DebeziumCdcConnector is registered through Spring DI because it needs dependency injection.

CategoryConnectors
Databases (JDBC)PostgreSQL, MySQL, MSSQL, Oracle, Sybase, Teradata, SAP HANA
Cloud warehousesBigQuery, Databricks, Snowflake
NoSQLMongoDB
File formatsCSV, JSON, XML, Excel, Parquet
Streaming & messagingKafka, Pub/Sub, Event Hubs
SaaS / ERPSalesforce, ServiceNow, SAP, REST API
TelecomCDR (ASN.1)

Cross-cutting change-data-capture (CDC) support is provided through Debezium for the MySQL binlog, MSSQL change data capture tables, and MongoDB change streams.

EventHubs and Pub/Sub

The eventhubs and pubsub connector classes exist under connector.impl.* but are temporarily commented out of META-INF/services while a porting task completes — they re-register automatically once that work lands.

The telecom CDR connector

The cdr/ sub-package holds a connector built specifically for Polkomtel's telecom Call Detail Records. CdrConnector extends FileConnectorBase, has connectorType = "cdr-asn1", displayName = "Telecom CDR (ASN.1)", and declares the capabilities SCHEMA_DISCOVERY, READ, STREAMING, and CLOUD_STORAGE. It is read-only — writeRecords throws, directing callers to the CdrParquetConverter for Parquet output.

The decoding pipeline is:

Binary CDR file
  → StorageAdapter.openForRead()     (local / GCS / S3 / Azure)
  → Asn1Decoder.decodeStream()       (BER/DER TLV parsing)
  → CdrBinaryDecoder.decodeStream()  (template-driven field extraction)
  → CdrConnector.cdrRecordToDataRecord
  → DataStream                       (lazy, closeable sequence)
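
To make the Asn1Decoder stage concrete, here is a minimal BER TLV reader in the spirit of that step. It is an illustrative sketch, not the connector's decoder: it handles single- and multi-byte tags and short- and long-form definite lengths, and deliberately rejects the indefinite-length form (0x80), which a production BER decoder must also support. Tlv, readTlv, and readAll are hypothetical names.

```kotlin
// One decoded TLV element. `constructed` means the value holds nested TLVs.
data class Tlv(val tagClass: Int, val constructed: Boolean, val tagNumber: Int, val value: ByteArray)

// Reads one TLV starting at `pos`; returns the element and the offset just past it.
fun readTlv(buf: ByteArray, pos: Int = 0): Pair<Tlv, Int> {
    var i = pos
    val first = buf[i++].toInt() and 0xFF
    val tagClass = first ushr 6                  // 0 universal, 1 application, 2 context, 3 private
    val constructed = (first and 0x20) != 0
    var tagNumber = first and 0x1F
    if (tagNumber == 0x1F) {                     // high tag number: base-128, MSB = "more bytes"
        tagNumber = 0
        do {
            val b = buf[i++].toInt() and 0xFF
            tagNumber = (tagNumber shl 7) or (b and 0x7F)
        } while ((b and 0x80) != 0)
    }
    val lenByte = buf[i++].toInt() and 0xFF
    val length = when {
        lenByte < 0x80 -> lenByte                // short form
        lenByte == 0x80 -> throw IllegalArgumentException("indefinite length not supported in this sketch")
        else -> {                                // long form: low 7 bits = number of length bytes
            var len = 0
            repeat(lenByte and 0x7F) { len = (len shl 8) or (buf[i++].toInt() and 0xFF) }
            len
        }
    }
    require(i + length <= buf.size) { "truncated TLV" }
    return Tlv(tagClass, constructed, tagNumber, buf.copyOfRange(i, i + length)) to i + length
}

// Walks every TLV at one nesting level (e.g. the children of a constructed element).
fun readAll(buf: ByteArray): List<Tlv> {
    val out = mutableListOf<Tlv>()
    var pos = 0
    while (pos < buf.size) {
        val (tlv, next) = readTlv(buf, pos)
        out += tlv
        pos = next
    }
    return out
}
```

Recursing with readAll into each constructed value yields the tree that a template-driven stage such as CdrBinaryDecoder can then walk by tag.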

It ships three roaming-record templates, validated by a RoamingTemplateValidator:

| Template | Telecom format |
|---|---|
| Tap3Template | TAP3 (Transferred Account Procedure), the standard for billing roaming usage between operators |
| RapTemplate | RAP (Returned Account Procedure), used to return rejected TAP records |
| NrtrdeTemplate | NRTRDE (Near Real Time Roaming Data Exchange), for fast fraud detection on roaming usage |

CDR connector configuration is supplied through properties:

| Property | Default | Description |
|---|---|---|
| templateType | auto | CDR template type (e.g. CS_VOICE_MO), or auto to detect it |
| templateVersion | 3GPP-R15 | Template version |
| batchSize | 10000 | Processing batch size |
| skipMalformed | true | Skip malformed records instead of failing |
| filePattern | *.cdr | File glob for directory scanning |
| sourceSwitch | (derived) | Source switch ID; derived from the file name when not set |

Supported file extensions are .cdr, .asn1, .dat, and .bin. When templateType=auto, the connector reads the first ASN.1 structure, matches it against known templates, and resets the stream so no record is consumed.
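
The detect-then-rewind step can be sketched with the JDK's BufferedInputStream mark/reset. This detector only peeks at the leading identifier byte and looks it up in a caller-supplied table; that table and the detectTemplate name are hypothetical, and the real connector matches on the first complete ASN.1 structure rather than a single byte.

```kotlin
import java.io.BufferedInputStream

// Peek at the start of the stream, pick a template, then rewind so no bytes are consumed.
fun detectTemplate(
    input: BufferedInputStream,
    templatesByFirstTag: Map<Int, String>,   // hypothetical: first identifier byte -> template name
    peekLimit: Int = 64                      // bytes that may be read before reset() stays valid
): String? {
    input.mark(peekLimit)                    // remember the current position
    return try {
        val first = input.read()
        if (first < 0) null else templatesByFirstTag[first]   // -1 means an empty stream
    } finally {
        input.reset()                        // rewind to the mark, even on failure
    }
}
```

The decoder that runs next then sees the stream from its first byte, exactly as if detection had never happened.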

Telecom-native decoding

TAP3, RAP, and NRTRDE are not generic file formats — they are the specific binary roaming-record formats used by mobile operators. The CDR connector decodes them directly with ASN.1, so Polkomtel can ingest roaming usage and run fraud checks without a separate decoding stage.


The connector lifecycle

A connector instance follows the same lifecycle every time the pipeline engine uses it.

| Phase | What happens |
|---|---|
| Discovery | At startup, ConnectorAutoConfiguration merges the ServiceLoader and Spring-DI discovery paths into the ConnectorRegistry |
| Lookup | The engine calls registry.getConnector(connectorId); the registry invokes the stored factory to produce an instance |
| Test | testConnection(config) runs when a user clicks Test in the connection wizard or calls POST /connections/{id}/test |
| Schema | discoverSchema(config) is called during pipeline design to populate field lists |
| Run | extractData (sources) or loadData (targets) is invoked during pipeline execution |
| Close | close() releases pools, HTTP clients, and other resources |

The DataStream returned by extractData is lazy and carries its own Closeables: the engine drains it and then closes it, which in turn closes the underlying ResultSet, file handle, or HTTP response.
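
A small sketch of a lazy, self-closing stream in that shape. LazyStream is a hypothetical stand-in, not the SDK's DataStream: it pairs a Sequence with the Closeables that back it, so a caller that drains it inside use {} also releases the underlying resources.

```kotlin
import java.io.Closeable

// Hypothetical stand-in: a lazily evaluated sequence plus the resources behind it.
class LazyStream<T>(
    private val records: Sequence<T>,
    private val resources: List<Closeable>
) : Closeable, Iterable<T> {
    private var closed = false

    override fun iterator(): Iterator<T> {
        check(!closed) { "stream already closed" }
        return records.iterator()
    }

    // Close every resource; keep going if one fails, then rethrow the first failure.
    override fun close() {
        if (closed) return
        closed = true
        var failure: Exception? = null
        for (r in resources) {
            try {
                r.close()
            } catch (e: Exception) {
                if (failure == null) failure = e else failure.addSuppressed(e)
            }
        }
        failure?.let { throw it }
    }
}
```

Draining inside use mirrors the drain-then-close order described above, and the close still runs if the consumer throws halfway through.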


Building a custom connector

The pipeline engine supports pluggable connectors. The example below adds a generic REST API connector by extending NativeConnectorBase.

Step 1 — Create the connector class

package com.polkomtel.dataflow.connector.impl.myrest

import com.polkomtel.dataflow.connector.base.NativeConnectorBase
import com.polkomtel.dataflow.connector.model.*

class MyRestApiConnector : NativeConnectorBase() {

    override val connectorType = "my-rest-api"
    override val displayName = "My REST API"

    private var client: java.net.http.HttpClient? = null
    private var baseUrl: String = ""

    override fun connect(config: ConnectorConfig) {
        baseUrl = config.host.ifBlank {
            throw ConnectorException("Base URL (host) is required")
        }
        client = java.net.http.HttpClient.newBuilder()
            .connectTimeout(java.time.Duration.ofSeconds(config.timeoutSeconds.toLong()))
            .build()
    }

    override fun disconnect() { client = null }

    override fun isConnected(): Boolean = client != null

    override fun capabilities(): Set<ConnectorCapability> = setOf(
        ConnectorCapability.SCHEMA_DISCOVERY,
        ConnectorCapability.READ,
        ConnectorCapability.WRITE
    )

    override fun doDiscoverSchema(config: ConnectorConfig): Schema {
        // Sample the endpoint response and infer fields
        return Schema(fields = emptyList())
    }

    override fun doExtractData(config: ConnectorConfig): DataStream {
        // GET config.table (the endpoint path) and map JSON to DataRecords
        return DataStream.empty(Schema(fields = emptyList()))
    }

    override fun doLoadData(data: DataStream, config: ConnectorConfig): LoadResult {
        // POST records to the endpoint
        return LoadResult(recordsWritten = 0)
    }
}

Step 2 — Register the connector

For a plain connector with a public no-arg constructor, add its fully qualified class name to the ServiceLoader manifest:

# backend/platform/connector-sdk/src/main/resources/
# META-INF/services/com.polkomtel.dataflow.connector.ConnectorSDK
com.polkomtel.dataflow.connector.impl.myrest.MyRestApiConnector

ConnectorRegistry.discoverConnectors() picks it up at startup via java.util.ServiceLoader.

If the connector needs dependency injection, annotate it @Component instead — ConnectorAutoConfiguration registers every Spring-managed ConnectorSDK bean using the bean itself as the factory result. This is the path DebeziumCdcConnector uses.

Step 3 — Programmatic registration

You can also register a connector by hand against the singleton registry:

val registry = ConnectorRegistry.getInstance()
registry.register("my-rest-api") { MyRestApiConnector() }

// Use it
val connector = registry.getConnector("my-rest-api")
val schema = connector.discoverSchema(config)

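The registry stores a factory (() -> ConnectorSDK), so with a factory like the one above each getConnector call yields a fresh instance. This matters because connectors hold per-connection state such as a Hikari pool. Spring-managed connectors are the exception: as described in Step 2, their factory returns the bean itself, so every lookup gets the same instance.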
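
A minimal sketch of a factory-based registry, assuming a ConcurrentHashMap keyed by connector id. MiniRegistry, Connector, and DemoConnector are hypothetical stand-ins for ConnectorRegistry and ConnectorSDK; the point is that storing () -> Connector rather than an instance gives each caller its own object.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for ConnectorSDK.
interface Connector : AutoCloseable { val connectorType: String }

class MiniRegistry {
    private val factories = ConcurrentHashMap<String, () -> Connector>()

    // A later registration for the same id replaces the earlier one.
    fun register(id: String, factory: () -> Connector) { factories[id] = factory }

    // Invoke the stored factory on every lookup.
    fun getConnector(id: String): Connector =
        factories[id]?.invoke() ?: throw IllegalArgumentException("Unknown connector: $id")

    fun registeredIds(): Set<String> = factories.keys.toSortedSet()
}

// Each instance would hold its own per-connection state (a pool, a client, ...).
class DemoConnector : Connector {
    override val connectorType = "my-rest-api"
    override fun close() {}
}
```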


Testing a connector

Connector tests live alongside their implementation and run with the standard Gradle test task.

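package com.polkomtel.dataflow.connector.impl.myrest

import com.polkomtel.dataflow.connector.model.ConnectorCapability
import com.polkomtel.dataflow.connector.model.ConnectorConfig
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

class MyRestApiConnectorTest {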

    @Test
    fun `test connection succeeds with a valid base URL`() {
        val connector = MyRestApiConnector()
        val result = connector.testConnection(
            ConnectorConfig(host = "https://jsonplaceholder.typicode.com")
        )
        assertTrue(result.success)
    }

    @Test
    fun `capabilities advertise read and write`() {
        val caps = MyRestApiConnector().capabilities()
        assertTrue(ConnectorCapability.READ in caps)
        assertTrue(ConnectorCapability.WRITE in caps)
    }
}

Run the SDK module's tests:

./gradlew :platform:connector-sdk:test

| Test target | Command |
|---|---|
| Connector SDK unit tests | ./gradlew :platform:connector-sdk:test |
| All platform unit tests | ./gradlew test |
| Integration tests (Docker required) | ./gradlew integrationTest |

Test against real systems

A connector test that mocks the JDBC driver or HTTP client proves nothing about connectivity. Point testConnection and discoverSchema tests at a real database container, a public sandbox API, or a local file fixture so the test exercises the actual decode and connection path.


Packaging

The SDK is a Gradle library module. A connector you add to connector-sdk ships as part of that module's JAR; no separate plugin packaging is needed.

| Concern | How it is handled |
|---|---|
| Build | ./gradlew :platform:connector-sdk:build produces the module JAR |
| JDBC drivers | Declared runtimeOnly so they are on the runtime classpath but not the compile classpath |
| Dependency versions | Pinned in the Gradle version catalog gradle/libs.versions.toml |
| Connection pooling | HikariCP 6.2.1, shared by all JDBC connectors via JDBCConnectorBase |
| Discovery manifest | META-INF/services/com.polkomtel.dataflow.connector.ConnectorSDK |

Because JDBC drivers are declared runtimeOnly, adding a database connector means adding one catalog entry plus one runtimeOnly(libs....) line — the driver is resolved at runtime and never leaks into compile-time API. The full set of driver coordinates, connection strings, and HikariCP tuning lives in the Connector SDK Reference.
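
For illustration, the dependency side of that change in a Kotlin DSL build script might look like the fragment below. The file being build.gradle.kts, the catalog alias postgresql, and the catalog entries in the comment are assumptions for the example, not entries copied from the project's catalog; org.postgresql:postgresql is the public coordinate of the PostgreSQL JDBC driver.

```kotlin
// backend/platform/connector-sdk/build.gradle.kts (assumed Kotlin DSL)
//
// Hypothetical matching entries in gradle/libs.versions.toml:
//   [versions]
//   postgresql = "<pinned version>"
//   [libraries]
//   postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql" }
dependencies {
    // On the runtime classpath only; connector code never compiles against the driver.
    runtimeOnly(libs.postgresql)
}
```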


Reference summary

| Topic | Where it lives |
|---|---|
| Connector contract | connector/ConnectorSDK.kt |
| Capabilities / operations | connector/model/ConnectorCapability.kt, connector/ConnectorOperation.kt |
| Configuration | connector/model/ConnectorConfig.kt (ConnectorConfig, LoadResult) |
| Base classes | connector/base/ (JDBCConnectorBase, NativeConnectorBase, FileConnectorBase) |
| Registry | connector/registry/ (ConnectorRegistry, ConnectorAutoConfiguration) |
| Built-in connectors | connector/impl/* (21 implementations) |
| Telecom CDR | connector/impl/cdr/ (CdrConnector, Asn1Decoder, TAP3/RAP/NRTRDE templates) |
| Discovery manifest | META-INF/services/com.polkomtel.dataflow.connector.ConnectorSDK |