
Backend platform services

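The DataFlow AI platform tier is a Gradle multi-module build of Kotlin + Spring Boot services under the package root com.polkomtel.dataflow. This page documents all eight modules: five deployable services (api-gateway, metadata-service, pipeline-engine, lineage-service, monitor-service) and three library/utility modules (common, connector-sdk, pushdown-sql).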


Module overview

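Each Flyway-managed service keeps its own Flyway history table: flyway_schema_history for metadata-service, flyway_schema_history_engine for pipeline-engine, and flyway_schema_history_monitor for monitor-service. This lets a shared database host several migration sets without collision.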

| Module | Port | Type | DB / Flyway | Purpose |
| --- | --- | --- | --- | --- |
| api-gateway | 8080 | Spring Cloud Gateway (reactive WebFlux) | none (Redis) | Routing, JWT validation, rate limiting, PII masking |
| metadata-service | 8081 | Spring MVC (JPA) | own DB, Flyway metadata (V1–V54) | Catalog, governance, GDPR, quality, pipeline metadata |
| pipeline-engine | 8082 | Spring MVC (JPA) | own DB, Flyway engine (V1–V9) | Pipeline execution, scheduling, self-healing |
| pushdown-sql | 8083 | Spring Boot (no DB/security) | none | SQL dialect transpilation library/service |
| lineage-service | 8084 | Spring MVC (JPA) | reads metadata DB, Flyway disabled | Data lineage tracking and graph |
| monitor-service | 8080 (SERVER_PORT) | Spring MVC (JPA) | own DB, Flyway monitor (V9–V25) | Monitoring, alerts, cost, SLA, notifications |
| connector-sdk | n/a | Library module | none | Connector framework + 21 connector implementations |
| common | n/a | Library module | none | Shared security, models, exception handling |

Services

api-gateway

The gateway is the single entry point for the platform. It is a reactive Spring Cloud Gateway application (GatewayApplication.kt, reactive web type) that validates Keycloak JWTs, propagates identity to downstream services via headers, and applies rate limiting, security headers, PII masking, and request logging.

Port: 8080 (mapped to host port 8085).

Key classes

| Class | Role |
| --- | --- |
| config/RouteConfig.kt | Declarative RouteLocator bean routing /api/v1/** paths to six downstream services. Each route applies request logging, security headers, PII masking, Redis rate limiting, removeRequestHeader("Cookie"), and preserveHostHeader(). Includes the WebSocket route pipeline-run-websocket (http→ws rewrite) and SSE routes for copilot chat and monitor alerts/metrics. |
| config/SecurityConfig.kt | @EnableWebFluxSecurity. CSRF, HTTP Basic, and form login disabled; OAuth2 resource server with JWT. Method/role rules: DELETE→ROLE_ADMIN; POST/PUT/PATCH→ROLE_ADMIN/ROLE_ENGINEER; GET→any role. The dev-permit-reads flag permits unauthenticated GETs and copilot/search POSTs in dev. |
| config/ReactiveKeycloakJwtConverter.kt | Converts Keycloak JWT claims to Spring authorities. |
| filter/AuthFilter.kt | GlobalFilter (order -90). Extracts JWT claims, maps Keycloak roles to a DataFlowRole via RBACService, and injects X-User-Id, X-User-Email, X-User-Display-Name, X-User-Role, X-Workspace-Id, X-User-Groups. |
| filter/RedisRateLimitFilter.kt | Redis sorted-set sliding-window limiter keyed rate_limit:{ip}:{endpoint}. On Redis failure with fail-closed:true it returns 503 + Retry-After; emits X-RateLimit-* headers. |
| filter/PiiMaskingFilter.kt, SecurityHeadersFilter.kt, RequestLoggingFilter.kt, CorsConfig.kt | Response body PII masking, security headers, structured access logging, CORS. |
| controller/HealthController.kt | /actuator/health/{liveness,readiness,startup} and /api/v1/health. Readiness probes Keycloak .well-known/openid-configuration and metadata-service /actuator/health. |
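The sliding-window behaviour of RedisRateLimitFilter can be sketched in plain Kotlin. This is a minimal in-memory model, not the filter itself. A per-key timestamp queue stands in for the Redis sorted set, and the comments name the Redis command each step corresponds to. The limit, the window length, the Decision type, and the 429 status for an exceeded limit are assumptions. Only the key layout and the fail-closed 503 come from the gateway documentation.

```kotlin
// Result of one limiter check; `remaining` would feed an X-RateLimit-Remaining-style header.
data class Decision(val allowed: Boolean, val remaining: Int, val retryAfterMillis: Long)

// In-memory stand-in for the Redis sorted set: one timestamp queue per key.
// Assumes a single, non-decreasing clock, so each queue stays sorted like a ZSET by score.
class SlidingWindowLimiter(private val limit: Int, private val windowMillis: Long) {
    private val hits = HashMap<String, ArrayDeque<Long>>()

    // Key layout from the docs: rate_limit:{ip}:{endpoint}
    fun key(ip: String, endpoint: String) = "rate_limit:$ip:$endpoint"

    fun tryAcquire(ip: String, endpoint: String, nowMillis: Long): Decision {
        val window = hits.getOrPut(key(ip, endpoint)) { ArrayDeque() }
        // Like ZREMRANGEBYSCORE key -inf (now - window): drop hits that have left the window.
        while (window.isNotEmpty() && window.first() <= nowMillis - windowMillis) {
            window.removeFirst()
        }
        // Like ZCARD key: count the hits still inside the window.
        if (window.size >= limit) {
            // The oldest hit expires at first + window, which is the earliest useful retry time.
            return Decision(false, 0, window.first() + windowMillis - nowMillis)
        }
        // Like ZADD key now now: record this request.
        window.addLast(nowMillis)
        return Decision(true, limit - window.size, 0)
    }
}

// Status mapping: 429 for an exceeded limit is assumed; 503 on store failure models fail-closed:true.
fun httpStatus(result: Result<Decision>, failClosed: Boolean): Int =
    result.fold(
        onSuccess = { if (it.allowed) 200 else 429 },
        onFailure = { if (failClosed) 503 else 200 }
    )
```

A production limiter has to run the trim, count, and add steps atomically (for example in a Lua script or a MULTI/EXEC block). Otherwise concurrent gateway replicas can each pass the count check and overshoot the limit.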

REST surface: the gateway proxies traffic. Its only endpoints of its own are the HealthController routes (/actuator/health/{liveness,readiness,startup} and /api/v1/health). The OAuth2 issuer URI and JWK set URI come from KEYCLOAK_ISSUER_URI and KEYCLOAK_JWK_SET_URI. Downstream service URLs and CORS origins are set through environment variables.
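The method/role rules in SecurityConfig.kt reduce to a small decision function. The sketch below models only that decision in plain Kotlin, not the Spring Security DSL. The Method enum, the function names, and the copilot path prefix are hypothetical. "Any role" for GET is interpreted here as "at least one granted authority".

```kotlin
enum class Method { GET, POST, PUT, PATCH, DELETE }

// Documented rules: DELETE -> ROLE_ADMIN; POST/PUT/PATCH -> ROLE_ADMIN or ROLE_ENGINEER;
// GET -> any role (null here means "any authority will do").
fun requiredAuthorities(method: Method): Set<String>? = when (method) {
    Method.DELETE -> setOf("ROLE_ADMIN")
    Method.POST, Method.PUT, Method.PATCH -> setOf("ROLE_ADMIN", "ROLE_ENGINEER")
    Method.GET -> null
}

// Hypothetical path prefixes for the dev-only POST exemption (copilot path is assumed).
val devPermittedPostPrefixes = listOf("/api/v1/copilot", "/api/v1/search")

fun isAllowed(method: Method, path: String, authorities: Set<String>?, devPermitReads: Boolean): Boolean {
    // dev-permit-reads: unauthenticated GETs and copilot/search POSTs pass in dev.
    if (devPermitReads) {
        if (method == Method.GET) return true
        if (method == Method.POST && devPermittedPostPrefixes.any { path.startsWith(it) }) return true
    }
    // authorities == null models a request without a valid JWT.
    if (authorities == null) return false
    val required = requiredAuthorities(method) ?: return authorities.isNotEmpty()
    return authorities.any { it in required }
}
```

In the real gateway the same decision is spread across the pathMatchers/hasAnyRole rules of the security filter chain. Keeping it as one pure function like this is mainly useful for unit-testing the rule table.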


common

common is a shared library used by every JVM service. It provides security primitives, domain models, and exception handling — no Spring application of its own.

Key classes

  • security/RBACService.kt — hierarchical RBAC. Enum DataFlowRole: ADMIN(100) > ENGINEER(75) > ANALYST(50) > STEWARD(40) > VIEWER(25). Enum Permission with a requiredRole (e.g. CREATE_PIPELINE→ENGINEER, DELETE_PIPELINE→ADMIN, USE_COPILOT→ANALYST). mapKeycloakRolesToDataFlowRole() maps AD groups (PLK-BI-Admins), lowercase names, dataflow-*, and Keycloak realm-native roles (org_admin, data_engineer, business_analyst). Helpers: hasPermission, canAccessWorkspace, canModifyPipeline, canDeletePipeline, canRunPipeline.
  • security/JwtAuthenticator.kt — decodes JWTs, enforces explicit aud audience validation (dataflow-api default), returns AuthenticatedPrincipal.
  • security/KeycloakJwtConverter.kt — extracts principal name + authorities from a JWT (servlet variant).
  • security/SecurityContext.kt: SecurityContext data class (userId, email, role, workspaceId, groups) with isAdmin / isEngineerOrAbove / isAnalystOrAbove; SecurityContextHolder ThreadLocal populated from gateway headers in downstream services.
  • security/SecurityConfig.kt, SecurityInterceptorConfig.kt — servlet security config and interceptor wiring.
  • security/AuditInterceptor.kt, AuditPersistence.kt — request-level audit event capture and persistence.
  • exception/ApiException.kt, GlobalExceptionHandler.kt — typed exceptions (NotFoundException, ForbiddenException) plus a @ControllerAdvice global handler.
  • model/ — shared domain models: Pipeline, PipelineRun, Connection, User, Workspace, Alert, AuditEvent, enums.kt.
  • config/JacksonConfig.kt — Jackson configuration.
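The role hierarchy and the header-based SecurityContext can be illustrated with a self-contained sketch. It follows the documented role levels and permissions. Several details are assumptions rather than the behaviour of RBACService.mapKeycloakRolesToDataFlowRole():

  • the alias table in mapKeycloakRoles;
  • the header value formats (an upper-case role name and comma-separated groups);
  • returning null when no role matches.

```kotlin
// Documented hierarchy: a higher level implies every lower role.
enum class DataFlowRole(val level: Int) {
    ADMIN(100), ENGINEER(75), ANALYST(50), STEWARD(40), VIEWER(25);
    fun atLeast(other: DataFlowRole) = level >= other.level
}

// A few of the documented permissions with their minimum role.
enum class Permission(val requiredRole: DataFlowRole) {
    CREATE_PIPELINE(DataFlowRole.ENGINEER),
    DELETE_PIPELINE(DataFlowRole.ADMIN),
    USE_COPILOT(DataFlowRole.ANALYST)
}

fun hasPermission(role: DataFlowRole, permission: Permission) = role.atLeast(permission.requiredRole)

// Assumed alias table; the real mapping in RBACService may differ.
val roleAliases = mapOf(
    "plk-bi-admins" to DataFlowRole.ADMIN,
    "org_admin" to DataFlowRole.ADMIN,
    "data_engineer" to DataFlowRole.ENGINEER,
    "business_analyst" to DataFlowRole.ANALYST
)

// Normalizes each Keycloak role (lower case, optional dataflow- prefix) and keeps the highest match.
fun mapKeycloakRoles(roles: Collection<String>): DataFlowRole? =
    roles.mapNotNull { raw ->
        val normalized = raw.lowercase().removePrefix("dataflow-")
        roleAliases[normalized] ?: DataFlowRole.values().firstOrNull { it.name.lowercase() == normalized }
    }.maxByOrNull { it.level }

data class SecurityContext(
    val userId: String,
    val email: String?,
    val role: DataFlowRole,
    val workspaceId: String?,
    val groups: List<String>
) {
    val isAdmin get() = role == DataFlowRole.ADMIN
    val isEngineerOrAbove get() = role.atLeast(DataFlowRole.ENGINEER)
    val isAnalystOrAbove get() = role.atLeast(DataFlowRole.ANALYST)

    companion object {
        // Header names match those injected by the gateway's AuthFilter. Real HTTP header
        // lookup is case-insensitive; this plain Map lookup is not.
        fun fromHeaders(headers: Map<String, String>): SecurityContext? {
            val userId = headers["X-User-Id"] ?: return null
            val role = headers["X-User-Role"]
                ?.let { r -> DataFlowRole.values().firstOrNull { it.name == r.uppercase() } }
                ?: return null
            return SecurityContext(
                userId = userId,
                email = headers["X-User-Email"],
                role = role,
                workspaceId = headers["X-Workspace-Id"],
                groups = headers["X-User-Groups"]?.split(",")?.map { it.trim() }?.filter { it.isNotEmpty() }
                    ?: emptyList()
            )
        }
    }
}
```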

metadata-service

metadata-service is the largest module and serves as the central catalog and governance backbone. It manages connections, pipeline metadata, the data catalog, glossary, schema registry, classification, governance workflows, GDPR and Polish compliance, data quality, data products and marketplace, tags, the MCP server, and the AI registry. It is annotated with @EnableScheduling and @EnableCaching and scans sub-packages including catalog, governance, gdpr, search, policy, quality, domain, compliance, aiRegistry, workflow, tags, and mcp.

Port: 8081. JPA ddl-auto: validate; Hikari pool 20 max / 5 idle; Jackson SNAKE_CASE.

REST surface (controllers by base path)

| Area | Base path(s) |
| --- | --- |
| Connections / connectors | /api/v1/connections, /api/v1/connectors, /api/v1/marketplace |
| Pipelines / templates | /api/v1/pipelines, /api/v1/templates, /api/v1/runbooks |
| Catalog / quality / profiling | /api/v1/catalog, /api/v1/quality, /api/v1/cde (critical data elements) |
| Governance | /api/v1/governance and /access-reviews, /contracts, /endorsements, /reviews, /schema-changes, /tags |
| GDPR / compliance | /api/v1/gdpr, /api/v1/dsar, /api/v1/compliance, /api/v1/compliance/dpia, /api/v1/admin/abac/policy |
| Data products / versions / domains | /api/v1/data-products, /api/v1/data-versions, /api/v1/domains, /api/v1/contracts |
| Audit | /api/v1/audit-log, /api/v1/admin/audit |
| Masking | /api/v1/masking |
| Incidents / RCA | /api/v1/incidents |
| CDR | /api/v1/cdr (telecom Call Detail Record metrics) |
| Classification | /api/v1/classification/rules |
| Workflows | /api/v1/workflows |
| MCP | /api/v1/mcp, /mcp |
| AI registry | /api/v1/admin/ai-registry |
| Search | /api/v1/search (global search over a materialized view) |
| Admin | /api/v1/admin/users, /api/v1/admin/workspaces, /api/v1/admin/settings |
| File upload | /api/v1/data |

Notable services include PolicyEngine, ApprovalWorkflow, AccessReviewCycleService, GdprService, PolishComplianceService, RetentionPolicyEnforcer, DynamicMaskingService, IncidentRcaService, DataClassificationService, WorkflowEngine, and AuditChainVerifierService (the hash-chained audit log).
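AuditChainVerifierService verifies a hash-chained audit log. The sketch below illustrates the general technique. Each record stores the previous record's hash plus a SHA-256 over its own sequence number, previous hash, and payload, so changing any row breaks verification from that row onward. The record shape, the hash input format, and the all-zero genesis value are assumptions; the real V27/V29 schema may hash different columns.

```kotlin
import java.security.MessageDigest

// Hypothetical audit row; the real AuditEvent columns may differ.
data class AuditRecord(val seq: Long, val payload: String, val prevHash: String, val hash: String)

// Assumed genesis value for the first record's prevHash.
val GENESIS = "0".repeat(64)

fun sha256Hex(input: String): String =
    MessageDigest.getInstance("SHA-256")
        .digest(input.toByteArray(Charsets.UTF_8))
        .joinToString("") { "%02x".format(it) }

// Each hash covers the previous hash, so editing one row invalidates every later link.
fun chainHash(seq: Long, payload: String, prevHash: String) = sha256Hex("$seq|$prevHash|$payload")

fun appendRecord(chain: List<AuditRecord>, payload: String): List<AuditRecord> {
    val prev = chain.lastOrNull()?.hash ?: GENESIS
    val seq = (chain.lastOrNull()?.seq ?: 0L) + 1
    return chain + AuditRecord(seq, payload, prev, chainHash(seq, payload, prev))
}

// Returns the seq of the first record whose link or hash does not verify, or null if the chain is intact.
fun firstBrokenLink(chain: List<AuditRecord>): Long? {
    var expectedPrev = GENESIS
    for (r in chain) {
        if (r.prevHash != expectedPrev || r.hash != chainHash(r.seq, r.payload, r.prevHash)) return r.seq
        expectedPrev = r.hash
    }
    return null
}
```

A hash chain only detects tampering. Someone with write access could rewrite the whole tail consistently, so such designs usually also anchor the latest hash somewhere the writer cannot modify.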

Data model

Roughly 80 JPA entities live under metadata/entity/ and its sub-packages. Core (V1): workspaces, users, connections, pipelines, pipeline_versions, pipeline_runs, pipeline_tasks. Catalog (V9): catalog_databases/schemas/tables/columns/tags, relationships, usages. GDPR (V1): gdpr_dsar_requests, gdpr_consent_records, gdpr_erasure_reports, gdpr_audit_trail. The lineage entities ColumnLineageEntity, DatasetLineageEntity, OpenLineageEventEntity, and ManualLineageEdgeEntity are owned here and shared with lineage-service.

Flyway migrations (db/migration/metadata, V1–V54)

| Version | Content |
| --- | --- |
| V1–V5 | Core schema, pipelines, monitoring, lineage, audit |
| V9–V14 | Catalog, glossary, schema registry, classification, governance, workflow |
| V16–V21 | Data contracts, dynamic masking, data version control, data marketplace, Polish compliance (V20_1), GDPR audit durability |
| V27 / V29 | Audit log hash chain |
| V33 / V35 | Connector marketplace catalog, global search materialized view |
| V44–V54 | Regulatory frameworks, incident RCA (V47), OpenLineage + manual edges (V48), workflow engine (V52), AI registry (V53), tag propagation + erasure runs (V54) |

A dev seed lives at db/dev/V6__seed_data.sql, and a sample pipeline template at resources/pipeline-templates/subscriber-360.yaml.


pipeline-engine

pipeline-engine executes pipelines defined in a YAML DSL, schedules runs, manages DAG execution, integrates external orchestrators (Airflow, AutomateNow) and compute engines (Flink, Spark/Dataproc), performs self-healing, orchestrates GDPR erasure, quarantines bad data, and provides Git-based pipeline version control. Application class: PipelineEngineApplication.kt.

Port: 8082. JPA ddl-auto: none; Flyway db/migration/engine, history flyway_schema_history_engine; schedule timezone defaults to Europe/Warsaw.
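Because schedules default to Europe/Warsaw, their wall-clock times shift against UTC twice a year. The hypothetical nextDailyRun helper below is not the CronScheduler API. It uses java.time to show one consequence: a 02:30 run on the spring-forward Sunday does not exist in local time and is moved to 03:30 CEST.

```kotlin
import java.time.LocalTime
import java.time.ZoneId
import java.time.ZonedDateTime

val WARSAW: ZoneId = ZoneId.of("Europe/Warsaw")

// Hypothetical helper: the next daily fire time at wall-clock `at` in `zone`, strictly after `now`.
fun nextDailyRun(at: LocalTime, now: ZonedDateTime, zone: ZoneId = WARSAW): ZonedDateTime {
    val localNow = now.withZoneSameInstant(zone)
    // ZonedDateTime.of moves a time that falls in a DST gap forward by the gap length,
    // and uses the earlier offset for a time that occurs twice in the autumn overlap.
    val today = ZonedDateTime.of(localNow.toLocalDate(), at, zone)
    if (today.isAfter(localNow)) return today
    return ZonedDateTime.of(localNow.toLocalDate().plusDays(1), at, zone)
}
```

Conversely, on the autumn fall-back day the 02:00–03:00 hour occurs twice. A scheduler has to decide explicitly whether a run in that hour fires once or twice.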

Key classes

  • Execution: execution/PipelineRunner.kt orchestrates the run (parse YAML → resolve params → validate → build DAG → execute tasks in topological order, parallel within DAG levels on a fixed thread pool → aggregate results), with cooperative cancellation via ExecutionContext. Supported by TaskExecutor.kt and ExecutionContext.kt.
  • DAG: dag/DagBuilder.kt, dag/ExecutionDAG.kt, dag/DagNode.kt, with cycle and dangling-dependency detection.
  • DSL: dsl/PipelineYamlParser.kt, dsl/PipelineYamlSchema.kt.
  • Validation: validation/PipelineValidator.kt, validation/ParameterResolver.kt (built-in vars, env vars, runtime params).
  • Service layer: service/PipelineCompiler.kt, ExecutionPlanner.kt, TaskScheduler.kt, PipelineRunLogPublisher.kt.
  • Scheduling: scheduler/CronScheduler.kt, BackfillManager.kt, DependencyChain.kt, RetryPolicy.kt.
  • Orchestrators: orchestrator/AirflowClient.kt, AirflowDagGenerator.kt, AutomateNowAdapter.kt, WebhookCallbackService.kt.
  • Compute engines: flink/ (FlinkClusterManager, FlinkJobBuilder/Submitter, FlinkCheckpointManager, FlinkSqlBridge) and spark/ (SparkClusterManager, SparkJobBuilder, DataprocJobSubmitter, SparkSqlBridge).
  • Self-healing: healing/SelfHealingService.kt, FailureClassifier.kt, RecoveryStrategies.kt.
  • Git: git/GitRepositoryManager.kt, PipelineVersionControl.kt, YamlDiffEngine.kt, GitWebhookHandler.kt.
  • Erasure (GDPR): erasure/ErasureOrchestrator.kt, ErasureController.
  • Quality: quality/DataQuarantineService.kt, streaming/StreamingQualityEngine.kt.
  • Real-time: realtime/PipelineStatusWebSocket.kt, ExecutionEventPublisher.kt.
  • Lineage: lineage/OpenLineageEmitter.kt.
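The DAG steps in PipelineRunner and DagBuilder can be sketched in two parts:

  • a level-by-level topological sort (Kahn's algorithm) that rejects cycles and dangling dependencies;
  • per-level execution on a fixed thread pool.

The task ids, the Map-of-dependencies input, DagException, and the cancellation flag are illustrative assumptions. The real ExecutionDAG, DagNode, and ExecutionContext types are not reproduced here.

```kotlin
import java.util.Collections
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

class DagException(message: String) : Exception(message)

// Kahn's algorithm, one level at a time: a task lands in the first level after all of its
// dependencies, so tasks within one level can run in parallel.
fun buildLevels(deps: Map<String, List<String>>): List<List<String>> {
    // Dangling dependency: a task refers to an id that is not defined in the pipeline.
    for ((task, upstream) in deps) {
        val missing = upstream.filterNot { it in deps }
        if (missing.isNotEmpty()) throw DagException("Task '$task' depends on unknown task(s) $missing")
    }
    val pending = deps.mapValues { it.value.toMutableSet() }.toMutableMap()
    val levels = mutableListOf<List<String>>()
    while (pending.isNotEmpty()) {
        val ready = pending.filterValues { it.isEmpty() }.keys.sorted()
        // Tasks remain but none is ready: the remainder contains (or depends on) a cycle.
        if (ready.isEmpty()) throw DagException("Cycle detected among ${pending.keys.sorted()}")
        levels.add(ready)
        ready.forEach { pending.remove(it) }
        pending.values.forEach { it.removeAll(ready) }
    }
    return levels
}

// Runs levels in order on a fixed pool; tasks inside a level run concurrently.
// `cancelled` models cooperative cancellation: it is checked before each level and each task.
fun runLevels(levels: List<List<String>>, threads: Int, cancelled: AtomicBoolean, task: (String) -> Unit): List<String> {
    val completed = Collections.synchronizedList(mutableListOf<String>())
    val pool = Executors.newFixedThreadPool(threads)
    try {
        for (level in levels) {
            if (cancelled.get()) break
            val futures = level.map { name ->
                pool.submit(Runnable {
                    if (!cancelled.get()) {
                        task(name)
                        completed.add(name)
                    }
                })
            }
            // Wait for the whole level; get() rethrows a task failure as ExecutionException.
            futures.forEach { it.get() }
        }
    } finally {
        pool.shutdown()
    }
    return completed.toList()
}
```

Waiting for a whole level is simple but leaves threads idle when one task in a level is slow. A finer-grained scheduler would release each task as soon as its own dependencies finish.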

REST surface

| Controller | Base path | Key operations |
| --- | --- | --- |
| ExecutionController | /api/v1 | POST /pipelines/{id}/run, GET /runs/{runId}, POST /runs/{runId}/cancel, POST /runs/{runId}/status |
| SchedulerController | /api/v1/scheduler | schedules CRUD + pause/resume, /next-runs, backfill CRUD, dependencies CRUD |
| OrchestratorController | /api/v1/orchestrator | /trigger, /callback, Airflow dags/sync/trigger/status, webhooks |
| GitController | /api/v1/git | repos, history, diff, rollback, branches, merge, webhooks |
| HealingController | /api/v1/pipelines/{id} | /healing-history, /healing-summary, /healing-policy |
| QuarantineController | /api/v1/quarantine | list/get, approve/reject/edit-approve, bulk ops, auto-release rules |
| ErasureController | /api/v1/dsar | POST /{dsarId}/erasure/orchestrate, erasure run status |
| StreamingQualityController | /api/v1/monitor/streaming | per-pipeline quality, rules, reset, SQL |
| PipelineReportController | /api/v1/pipelines/{id}/report | pipeline run reports |

Data model & migrations (db/migration/engine, V1–V9)

Entities are prefixed engine_*: engine_schedules, engine_backfill_jobs/partitions, engine_execution_runs (with a task_states JSONB column), engine_retry_states, engine_circuit_breakers, engine_dead_letter_entries, engine_running_pipelines, engine_webhook_registrations, engine_healing_attempts/policies. Non-prefixed: data_quarantine, quarantine_auto_release_rules, erasure_runs, erasure_steps. Migrations: V1 scheduling, V3 dependency chain, V4 execution runs, V5 webhook + audit, V6 retry/alert, V7 quarantine, V8 healing attempts, V9 erasure orchestration.


lineage-service

lineage-service tracks data lineage — dataset and column-level lineage, impact analysis, OpenLineage event ingestion, lineage graph build and search, BI-tool lineage (Tableau/Looker/PowerBI), manual lineage authoring, and tag propagation. It consumes OpenLineage RunEvents emitted by pipeline-engine. Application class: LineageServiceApplication.kt.

Port: 8084. JPA ddl-auto: none; flyway.enabled: false — it relies on metadata-service's schema (the lineage tables are created by metadata migrations V4/V48).

Deploy-time ordering dependency

Because lineage-service has Flyway disabled, it must not start until metadata-service has applied its lineage migrations. This ordering is implicit and not enforced by the schema itself.

Key classes

  • service/LineageService.kt — the primary service: getDatasetLineage, getColumnLineage, recordEvent/recordEvents, getUpstream/getDownstream, getSubgraph, getFullLineageGraph, getStats. Uses LineageGraphBuilder first, falling back to an in-memory ConcurrentHashMap store plus LineagePersistence.
  • service/LineageGraphBuilder.kt — builds the lineage graph from events.
  • service/ImpactAnalyzer.kt — downstream impact analysis.
  • service/ColumnLineageExtractor.kt — extracts column-level lineage.
  • service/LineageSearchService.kt — dataset/column search, hot datasets, PII propagation detection.
  • service/connectors/BiLineageConnector, TableauLineageConnector, LookerLineageConnector, PowerBiLineageConnector.
  • propagation/PropagationWorker.kt, TagPropagationLogEntity/Repository.kt — tag propagation across lineage.
  • repository/JpaLineageRepositories.kt: DatasetLineageRepository and ColumnLineageRepository, reusing metadata-service entities; supports findLineageAsOf (time-travel) and PII filtering.
  • model/LineageEvent.kt — OpenLineage-style RunEvent, LineageGraph, LineageNode, LineageJobNode, SchemaField.
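Downstream impact analysis and upstream lookup are graph traversals over lineage edges. The minimal breadth-first sketch below assumes a hypothetical dataset-level LineageEdge(source, target) rather than the real DatasetLineageEntity. It returns every reachable dataset with its hop distance and tolerates cycles.

```kotlin
// Hypothetical edge shape: `source` feeds `target`.
data class LineageEdge(val source: String, val target: String)

// Breadth-first walk over downstream edges; returns each affected dataset with its hop distance.
fun downstreamImpact(edges: List<LineageEdge>, start: String, maxDepth: Int = Int.MAX_VALUE): Map<String, Int> {
    val children = edges.groupBy({ it.source }, { it.target })
    val depthOf = mutableMapOf(start to 0)
    val impact = linkedMapOf<String, Int>()
    val queue = ArrayDeque(listOf(start))
    while (queue.isNotEmpty()) {
        val node = queue.removeFirst()
        val depth = depthOf.getValue(node)
        if (depth == maxDepth) continue
        for (child in children[node].orEmpty()) {
            // The visited check keeps cycles from looping forever.
            if (child !in depthOf) {
                depthOf[child] = depth + 1
                impact[child] = depth + 1
                queue.addLast(child)
            }
        }
    }
    return impact
}

// Upstream lookup is the same walk over reversed edges.
fun upstreamOf(edges: List<LineageEdge>, start: String, maxDepth: Int = Int.MAX_VALUE) =
    downstreamImpact(edges.map { LineageEdge(it.target, it.source) }, start, maxDepth)
```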

REST surface

| Controller | Base path | Key operations |
| --- | --- | --- |
| LineageController | /api/v1/lineage | GET /datasets/{id}, GET /columns/{id}, GET /impact/{id}, POST /events, GET /graph/{datasetId}, GET /upstream/{datasetId}, GET /downstream/{datasetId}, GET /search, GET /stats, GET /subgraph/{datasetId}, GET /hot-datasets, GET /pii/{datasetId}/{columnName} |
| OpenLineageEventController | /api/v1/lineage/openlineage | POST /events, GET /events |
| BiLineageController | /api/v1/lineage/bi | POST /ingest, POST /{tool}/ingest |
| LineageAuthoringController | /api/v1/lineage/manual-edges | GET, POST, PATCH /{id}, DELETE /{id} |
| PropagationController | /api/v1/lineage/propagation | POST /run, GET /inherited/{datasetId} |
| ExportController | /api/v1/lineage/export | lineage export |

monitor-service

monitor-service handles pipeline and system monitoring: alerting, cost tracking, SLA/SLO burn-rate, data freshness, schema-change monitoring, scheduled reports, notification routing and inbox, anomaly detection, playbooks, and real-time SSE streams. It is annotated with @EnableScheduling and instrumented with OpenTelemetry and Prometheus. Application class: MonitorServiceApplication.kt (scans the monitor and common packages).

Port: set by SERVER_PORT (default 8080, mapped to host port 8084). Flyway db/migration/monitor with history table flyway_schema_history_monitor; Prometheus export enabled; Jackson SNAKE_CASE.

Key classes

  • Alerts: service/AlertService.kt, AlertDefinitionService.kt, alert/AlertDefinition.kt, alert/NotificationService.kt.
  • Metrics: service/MetricsService.kt, SystemMetricsCollector.kt, KubernetesMetricsService.kt, KeycloakMetricsService.kt, ClusterMetricsHistoryService.kt, PerformanceMetricsService.kt.
  • Cost: service/CostMetricsService.kt, CostHistoryService.kt, CostAnomalyDetector.kt, PipelineCostService.kt, with config/GcpCostProperties.kt.
  • SLA / freshness / anomaly: service/SlaBurnRateService.kt, FreshnessTracker.kt, AnomalyBaselineService.kt.
  • Schema change: service/SchemaChangeMonitor.kt.
  • Reports: service/ReportGenerationService.kt, PipelineRunLogService.kt.
  • Notification: notification/NotificationRouter.kt, ChannelAdapters.kt, NotificationInboxController.kt, NotificationChannelService.kt.
  • Playbooks: playbook/PlaybookEngine.kt.
  • Real-time: realtime/SseAlertController.kt, AlertEventPublisher.kt, MetricsStreamService.kt.
  • Config: OpenTelemetryConfig.kt, MetricsExporterConfig.kt, filter/TracingFilter.kt.
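SlaBurnRateService computes SLO burn rates. The sketch below uses the common SRE definition: the observed error ratio divided by the error budget (1 − SLO target). It also includes a multi-window paging check with the 14.4 threshold that the Google SRE Workbook suggests for 1-hour and 5-minute windows on a 30-day SLO. Whether monitor-service uses the same formula and thresholds is an assumption.

```kotlin
// Burn rate 1.0 spends the error budget exactly over the SLO window; above 1.0 it runs out early.
fun burnRate(badEvents: Long, totalEvents: Long, sloTarget: Double): Double {
    require(sloTarget > 0.0 && sloTarget < 1.0) { "sloTarget must be in (0, 1)" }
    if (totalEvents == 0L) return 0.0
    val errorRatio = badEvents.toDouble() / totalEvents
    return errorRatio / (1.0 - sloTarget)
}

// Multi-window check: page only when both the long and the short window burn faster than the threshold,
// so a brief spike (short window only) or an already-recovered incident (long window only) does not page.
fun shouldPage(longWindowBurn: Double, shortWindowBurn: Double, threshold: Double = 14.4) =
    longWindowBurn > threshold && shortWindowBurn > threshold

// Budget left, valid only when the counts cover the entire SLO window.
fun budgetRemaining(badEvents: Long, totalEvents: Long, sloTarget: Double): Double =
    1.0 - burnRate(badEvents, totalEvents, sloTarget)
```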

REST surface

| Controller | Base path | Key operations |
| --- | --- | --- |
| AlertController | /api/v1/monitor/alerts | list/get/create/delete, ack/resolve, bulk ops, events, history, summary, purge |
| AlertDefinitionController | /api/v1/monitor | definitions CRUD + toggle/mute, notification channels CRUD + test |
| DashboardController | /api/v1/monitor | /dashboard, /pipelines/{id}/metrics, /connectors/health, /system/health, POST /pipelines/run |
| CostController family | /api/v1/monitor/costs | summary, by-pipeline, by-workspace, anomalies, trend, budgets CRUD, rates |
| FreshnessController | /api/v1/monitor/freshness | rules CRUD, history, evaluate |
| SlaBurnRateController | /api/v1/monitor/sla | pipelines, definitions CRUD |
| PerformanceMetricsController | /api/v1/monitor/metrics/performance | percentiles, throughput, compare, summary |
| SchemaChangeController | /api/v1/monitor | schema-changes ack/resolve, schema-monitor register/scan, snapshots |
| AnomalyBaselineController | /api/v1/monitor/anomalies | baselines, detect, recompute |
| ReportController | /api/v1/reports | schedules CRUD + toggle/send-now, preview, types |
| NotificationInboxController | /api/v1/notifications | list, unread-count, mark-read, GET /stream (SSE) |
| PlaybookController | /api/v1/monitor/playbooks | list/create/delete, match |
| SseAlertController | /api/v1/monitor/sse | /alerts, /metrics (SSE text/event-stream) |
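AnomalyBaselineService and the /api/v1/monitor/anomalies endpoints (baselines, detect, recompute) point to a compute-baseline-then-detect flow. One common way to implement that flow is a mean and standard-deviation baseline with a z-score threshold, sketched below. Both the method and the default of 3 standard deviations are assumptions, not a description of the service's actual algorithm.

```kotlin
import kotlin.math.abs
import kotlin.math.sqrt

// Hypothetical baseline for one metric: mean and sample standard deviation of recent values.
data class Baseline(val mean: Double, val stdDev: Double, val samples: Int)

fun computeBaseline(values: List<Double>): Baseline {
    require(values.size >= 2) { "need at least two samples" }
    val mean = values.average()
    // Sample variance (divides by n - 1).
    val variance = values.sumOf { (it - mean) * (it - mean) } / (values.size - 1)
    return Baseline(mean, sqrt(variance), values.size)
}

// Flags a value whose distance from the mean exceeds zThreshold standard deviations.
fun isAnomalous(value: Double, baseline: Baseline, zThreshold: Double = 3.0): Boolean {
    // A perfectly flat history has no spread, so any change from it counts as anomalous.
    if (baseline.stdDev == 0.0) return value != baseline.mean
    return abs(value - baseline.mean) / baseline.stdDev > zThreshold
}
```

Metrics with strong daily or weekly seasonality, which pipeline costs and run times often have, usually need a separate baseline per time-of-day or day-of-week bucket for this check to stay useful.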

Data model & migrations (db/migration/monitor, V9–V25)

Entities include monitor_alert_definitions/alerts/alert_events/alert_silences, data_freshness_rules/checks, metric_baselines, monitor_connector_health, monitor_pipeline_metrics/runs, pipeline_cost_records/budgets/anomalies, pipeline_run_logs, monitor_sla_burn_rate_snapshots, monitor_slo_definitions, notifications_inbox, notification_routes, playbook_routes, and monitor_schema_change_events/snapshots. Migration highlights: V9 legacy alerts, V11 freshness, V12 baselines, V13 schema-change monitoring, V14 report schedules, V15 pipeline cost, V16 SLA burn rate, V18 cluster metrics history, V19 quarantine, V22 run logs, V24 playbook routes, V25 notification routing/inbox. A resources/prometheus/alert-rules.yml ships with the service.


Library modules

connector-sdk

connector-sdk is the connector framework library — no Spring application, packaged as a JAR consumed by metadata-service and pipeline-engine. It defines the connector contract and ships 21 connector implementations. It has no application.yml, only META-INF ServiceLoader registration.

Key classes

  • ConnectorSDK.kt — the core AutoCloseable interface: discoverSchema(config), extractData(config): DataStream, loadData(...), validateConnection/testConnection, capabilities(), plus connectorType / connectorName / displayName.
  • registry/ConnectorRegistry.kt — singleton registry: register, getConnector, listConnectors; discoverConnectors() via Java ServiceLoader. Paired with ConnectorAutoConfiguration.kt.
  • Base classes: base/JDBCConnectorBase.kt, base/FileConnectorBase.kt, base/NativeConnectorBase.kt.
  • Model: ConnectorConfig, ConnectorCapability (enum: SCHEMA_DISCOVERY, READ, WRITE, COLUMN_PROJECTION, PREDICATE_PUSHDOWN, SCHEMA_EVOLUTION, STREAMING, PARTITIONING, COMPRESSION, CLOUD_STORAGE, BATCH_WRITE), Schema, Table, Column, DataStream, DataBatch, DataRecord, EventStream, StorageType, ConnectorException.
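The registry pattern described for ConnectorRegistry (manual register plus ServiceLoader discovery) can be sketched with the JDK's java.util.ServiceLoader. The Connector interface here is a hypothetical, trimmed-down stand-in for ConnectorSDK, and the capability enum is a subset of the documented one. discoverConnectors finds only implementations listed in a META-INF/services file named after the interface's fully qualified name.

```kotlin
import java.util.ServiceLoader
import java.util.concurrent.ConcurrentHashMap

// Subset of the documented capability enum.
enum class ConnectorCapability { SCHEMA_DISCOVERY, READ, WRITE, PREDICATE_PUSHDOWN, STREAMING }

// Hypothetical trimmed-down contract; the real ConnectorSDK interface has more methods.
interface Connector : AutoCloseable {
    val connectorType: String
    val displayName: String
    fun capabilities(): Set<ConnectorCapability>
    fun testConnection(config: Map<String, String>): Boolean
    override fun close() {}
}

object ConnectorRegistry {
    private val connectors = ConcurrentHashMap<String, Connector>()

    // First registration for a connectorType wins; a duplicate type is rejected.
    fun register(connector: Connector) {
        check(connectors.putIfAbsent(connector.connectorType, connector) == null) {
            "Connector '${connector.connectorType}' already registered"
        }
    }

    fun getConnector(type: String): Connector? = connectors[type]

    fun listConnectors(): List<String> = connectors.keys.sorted()

    // Instantiates every provider listed in META-INF/services/<Connector FQCN>; returns how many were new.
    fun discoverConnectors(loader: ClassLoader? = Thread.currentThread().contextClassLoader): Int {
        var added = 0
        for (c in ServiceLoader.load(Connector::class.java, loader)) {
            if (connectors.putIfAbsent(c.connectorType, c) == null) added++
        }
        return added
    }
}
```

ServiceLoader needs each provider class to have a public no-argument constructor, which is one reason connector configuration is passed to methods such as testConnection(config) rather than to the constructor.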

Connector implementations (impl/, 21)

| Category | Connectors |
| --- | --- |
| Databases (JDBC) | postgresql, mysql, mssql, oracle, sybase, teradata, hana (SAP HANA) |
| Cloud warehouses | bigquery, databricks, snowflake |
| Files | csv, json, xml, excel, parquet (with FileFormatDetector, StorageAdapter) |
| Streaming / messaging | kafka (offset manager, schema registry, SerDe), pubsub, eventhubs |
| SaaS / ERP | salesforce, servicenow, sap (SapErpConnector, SapODataParser), rest |
| CDC | DebeziumCdcConnector, DebeziumCdcManager, MysqlBinlogCDC, MssqlChangeDataCapture, MongodbChangeStream |
| Telecom CDR | CdrConnector, Asn1Decoder, CdrBinaryDecoder, CdrParquetConverter, with Tap3Template, RapTemplate, NrtrdeTemplate roaming formats |
| NoSQL | mongodb (connector, aggregation, change stream, schema mapper) |

pushdown-sql

pushdown-sql is a SQL dialect transpilation service — it translates ANSI/canonical SQL into target-database dialects so query logic can be pushed down to source or target databases. PushdownSqlApplication.kt is a Spring Boot app that excludes DataSource, JPA, and Security autoconfiguration — it is a pure compute service.

Port: 8083. No database, no security, no JPA. Actuator exposes health/info/metrics. No REST controller — it is exposed as an injectable service/library to other modules.

Key classes

  • transpiler/SqlTranspiler.kt: a @Service whose transpile(sql, targetDialect): TranspileResult parses SQL with JSqlParser (CCJSqlParserUtil), validates syntax, delegates to the dialect, and collects warnings (unsupported LATERAL join, ARRAY type, over-length identifiers). Also provides translateFunction, translateDataType, and getAvailableDialects.
  • transpiler/dialect/Dialect.kt — the dialect interface: name, transpile, identifier quoting, capability flags (supportsWindowFunctions, supportsCTE, supportsLateralJoin, supportsArrayType), maxIdentifierLength, translateFunction, translateDataType, translateLimitOffset/paginationSql, dualTable, castExpression.
  • Dialect implementations (8): PostgresDialect, MssqlDialect, OracleDialect, SnowflakeDialect, DatabricksDialect, HanaDialect, SybaseDialect, TeradataDialect.
  • Visitors: visitor/SqlAstVisitor.kt, FunctionMapper.kt, DataTypeMapper.kt.
  • TranspileResult — success flag, original/transpiled SQL, target dialect, warnings, error.
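Identifier quoting, pagination, and identifier-length limits are typical of the dialect differences the Dialect interface abstracts. The sketch below is a hypothetical two-dialect subset, not the real Dialect.kt:

  • PostgreSQL uses LIMIT/OFFSET and has a default identifier limit of 63 bytes.
  • SQL Server uses OFFSET ... FETCH NEXT, which requires an ORDER BY, and allows 128-character identifiers.

The regex ORDER BY check is deliberately naive; SqlTranspiler works on a JSqlParser AST instead.

```kotlin
// Hypothetical trimmed-down dialect contract modelled on the documented Dialect.kt members.
interface SqlDialect {
    val name: String
    val maxIdentifierLength: Int
    fun quoteIdentifier(id: String): String
    fun paginationSql(baseSql: String, limit: Int, offset: Int): String
}

object Postgres : SqlDialect {
    override val name = "postgresql"
    override val maxIdentifierLength = 63 // bytes by default; compared as characters below
    override fun quoteIdentifier(id: String) = "\"" + id.replace("\"", "\"\"") + "\""
    override fun paginationSql(baseSql: String, limit: Int, offset: Int) = "$baseSql LIMIT $limit OFFSET $offset"
}

object SqlServer : SqlDialect {
    override val name = "mssql"
    override val maxIdentifierLength = 128
    override fun quoteIdentifier(id: String) = "[" + id.replace("]", "]]") + "]"

    // OFFSET/FETCH is only valid after ORDER BY; ORDER BY (SELECT NULL) is the usual placeholder.
    override fun paginationSql(baseSql: String, limit: Int, offset: Int): String {
        val hasOrderBy = Regex("(?i)\\border\\s+by\\b").containsMatchIn(baseSql)
        val ordered = if (hasOrderBy) baseSql else "$baseSql ORDER BY (SELECT NULL)"
        return "$ordered OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY"
    }
}

// Mirrors the documented over-length-identifier warning.
fun identifierWarnings(identifiers: List<String>, dialect: SqlDialect): List<String> =
    identifiers.filter { it.length > dialect.maxIdentifierLength }
        .map { "Identifier '$it' exceeds the ${dialect.name} limit of ${dialect.maxIdentifierLength} characters" }
```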

Cross-cutting notes

  • Identity propagation — the gateway validates the Keycloak JWT once; downstream Spring MVC services trust gateway-injected X-User-* headers (read into SecurityContextHolder from common). Each service also independently configures an OAuth2 resource server as defense-in-depth.
  • Database strategy — metadata-service owns the catalog/governance schema; lineage-service shares that database with Flyway disabled; pipeline-engine and monitor-service maintain their own Flyway-managed migration sets with per-service history tables.
  • Real-time — SSE for monitor alerts/metrics and the notification inbox; WebSocket for pipeline run status streaming.
  • Telecom specifics — a CDR connector with ASN.1 / TAP3 / RAP / NRTRDE roaming-record templates, a CDR metrics controller in metadata-service, GDPR and Polish-compliance modules, and schedules defaulting to Europe/Warsaw.
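The SSE streams listed under Real-time use the text/event-stream wire format defined in the WHATWG HTML standard. Spring normally writes these frames for you (for example through SseEmitter). The sketch below only shows what a frame looks like on the wire. The event name and JSON payload in the usage are hypothetical.

```kotlin
// Serializes one Server-Sent Events frame in the text/event-stream format.
fun sseFrame(data: String, event: String? = null, id: String? = null): String = buildString {
    if (id != null) append("id: ").append(id).append('\n')
    if (event != null) append("event: ").append(event).append('\n')
    // Multi-line payloads need one "data:" line per line; the browser rejoins them with '\n'.
    for (line in data.lines()) append("data: ").append(line).append('\n')
    // A blank line terminates the event.
    append('\n')
}

// A comment line, commonly sent periodically so proxies do not close an idle stream.
fun sseKeepAlive(): String = ": keep-alive\n\n"
```

For example, sseFrame("{\"alert_id\":1}", event = "alert", id = "42") yields the lines `id: 42`, `event: alert`, and `data: {"alert_id":1}`, followed by a blank line. The id lets a reconnecting EventSource send Last-Event-ID so the server can resume the stream.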