Architecture
Backend platform services
The DataFlow AI platform tier is a Gradle multi-module build of Kotlin + Spring Boot services under the package root com.polkomtel.dataflow. This page documents every module — five deployable services and three library/utility modules.
Module overview
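Each Flyway-managed service (metadata-service, pipeline-engine, monitor-service) records its migrations in its own history table (flyway_schema_history, flyway_schema_history_engine, flyway_schema_history_monitor), so a shared database can host multiple migration sets without collision.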
| Module | Port | Type | DB / Flyway | Purpose |
|---|---|---|---|---|
| api-gateway | 8080 | Spring Cloud Gateway (reactive WebFlux) | none (Redis) | Routing, JWT validation, rate limiting, PII masking |
| metadata-service | 8081 | Spring MVC (JPA) | own DB, Flyway metadata (V1–V54) | Catalog, governance, GDPR, quality, pipeline metadata |
| pipeline-engine | 8082 | Spring MVC (JPA) | own DB, Flyway engine (V1–V9) | Pipeline execution, scheduling, self-healing |
| pushdown-sql | 8083 | Spring Boot (no DB/security) | none | SQL dialect transpilation library/service |
| lineage-service | 8084 | Spring MVC (JPA) | reads metadata DB, Flyway disabled | Data lineage tracking & graph |
| monitor-service | 8080 (SERVER_PORT) | Spring MVC (JPA) | own DB, Flyway monitor (V9–V25) | Monitoring, alerts, cost, SLA, notifications |
| connector-sdk | n/a | Library module | none | Connector framework + 21 connector implementations |
| common | n/a | Library module | none | Shared security, models, exception handling |
Services
api-gateway
The gateway is the single entry point for the platform. It is a reactive Spring Cloud Gateway application (GatewayApplication.kt, reactive web type) that validates Keycloak JWTs, propagates identity to downstream services via headers, and applies rate limiting, security headers, PII masking, and request logging.
Port: 8080 (host 8085).
Key classes
| Class | Role |
|---|---|
| config/RouteConfig.kt | Declarative RouteLocator bean routing /api/v1/** paths to six downstream services. Each route applies request logging, security headers, PII masking, a Redis rate limit, removeRequestHeader("Cookie"), and preserveHostHeader(). Includes the WebSocket route pipeline-run-websocket (http→ws rewrite) and SSE routes for copilot chat and monitor alerts/metrics. |
| config/SecurityConfig.kt | @EnableWebFluxSecurity. CSRF, HTTP Basic, and form login are disabled; OAuth2 resource server with JWT. Method/role rules: DELETE→ROLE_ADMIN; POST/PUT/PATCH→ROLE_ADMIN or ROLE_ENGINEER; GET→any role. The dev-permit-reads flag permits unauthenticated GETs and copilot/search POSTs in dev. |
| config/ReactiveKeycloakJwtConverter.kt | Converts Keycloak JWT claims to Spring authorities. |
| filter/AuthFilter.kt | GlobalFilter (order -90). Extracts JWT claims, maps Keycloak roles to a DataFlowRole via RBACService, and injects X-User-Id, X-User-Email, X-User-Display-Name, X-User-Role, X-Workspace-Id, and X-User-Groups. |
| filter/RedisRateLimitFilter.kt | Redis sorted-set sliding-window limiter keyed rate_limit:{ip}:{endpoint}. If Redis fails and fail-closed:true is set, it returns 503 with a Retry-After header; it also emits X-RateLimit-* headers. |
| filter/PiiMaskingFilter.kt, SecurityHeadersFilter.kt, RequestLoggingFilter.kt, CorsConfig.kt | Response body PII masking, security headers, structured access logging, CORS. |
| controller/HealthController.kt | /actuator/health/{liveness,readiness,startup} and /api/v1/health. Readiness probes Keycloak .well-known/openid-configuration and metadata-service /actuator/health. |
REST surface: the gateway proxies traffic; the only endpoints it serves itself are the HealthController health routes. The OAuth2 issuer and JWK set URIs come from KEYCLOAK_ISSUER_URI / KEYCLOAK_JWK_SET_URI; downstream service URLs and CORS origins are configured through environment variables.
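The sliding-window algorithm behind RedisRateLimitFilter can be illustrated without a Redis dependency. The block below is a minimal in-memory sketch, not the filter's code: a TreeMap per key stands in for the Redis sorted set (score = request timestamp), and the names RateLimitStore, InMemoryStore, SlidingWindowLimiter, and Decision are hypothetical, as are the 429 status for an exhausted window, the X-RateLimit-Limit / X-RateLimit-Remaining header names, and the Retry-After value. Trimming old entries plays the role of ZREMRANGEBYSCORE, counting plays ZCARD, and recording the request plays ZADD.

```kotlin
import java.util.TreeMap

// Hypothetical abstraction over the sorted set keyed rate_limit:{ip}:{endpoint}.
interface RateLimitStore {
    /** Drops entries older than [windowStart] and returns how many remain. */
    fun trimAndCount(key: String, windowStart: Long): Int

    /** Records one request at [timestampMillis]. */
    fun add(key: String, timestampMillis: Long)
}

// In-memory stand-in: per key, a sorted map from timestamp to request count.
class InMemoryStore : RateLimitStore {
    private val sets = HashMap<String, TreeMap<Long, Int>>()

    override fun trimAndCount(key: String, windowStart: Long): Int {
        val set = sets.getOrPut(key) { TreeMap() }
        set.headMap(windowStart).clear() // removes timestamps strictly below windowStart
        return set.values.sum()
    }

    override fun add(key: String, timestampMillis: Long) {
        sets.getOrPut(key) { TreeMap() }.merge(timestampMillis, 1) { a, b -> a + b }
    }
}

data class Decision(val status: Int, val headers: Map<String, String>)

class SlidingWindowLimiter(
    private val store: RateLimitStore,
    private val limit: Int,
    private val windowMillis: Long,
    private val failClosed: Boolean,
) {
    fun decide(ip: String, endpoint: String, nowMillis: Long): Decision {
        val key = "rate_limit:$ip:$endpoint"
        return try {
            val used = store.trimAndCount(key, nowMillis - windowMillis)
            if (used >= limit) {
                Decision(429, mapOf("X-RateLimit-Limit" to "$limit", "X-RateLimit-Remaining" to "0"))
            } else {
                store.add(key, nowMillis)
                Decision(200, mapOf("X-RateLimit-Limit" to "$limit", "X-RateLimit-Remaining" to "${limit - used - 1}"))
            }
        } catch (e: RuntimeException) {
            // Store unreachable: fail closed (503 + Retry-After) or fail open (let the request through).
            if (failClosed) Decision(503, mapOf("Retry-After" to "${windowMillis / 1000}"))
            else Decision(200, emptyMap())
        }
    }
}
```

Against real Redis the trim, count, and add steps are usually run as one Lua script so concurrent requests cannot both slip under the limit; the fail-closed branch trades availability for protection while Redis is unreachable.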
common
common is a shared library used by every JVM service. It provides security primitives, domain models, and exception handling — no Spring application of its own.
Key classes
- security/RBACService.kt: hierarchical RBAC. Enum DataFlowRole: ADMIN(100) > ENGINEER(75) > ANALYST(50) > STEWARD(40) > VIEWER(25). Enum Permission with a requiredRole (e.g. CREATE_PIPELINE→ENGINEER, DELETE_PIPELINE→ADMIN, USE_COPILOT→ANALYST). mapKeycloakRolesToDataFlowRole() maps AD groups (PLK-BI-Admins), lowercase names, dataflow-* names, and Keycloak realm-native roles (org_admin, data_engineer, business_analyst). Helpers: hasPermission, canAccessWorkspace, canModifyPipeline, canDeletePipeline, canRunPipeline.
- security/JwtAuthenticator.kt: decodes JWTs, enforces explicit aud (audience) validation (dataflow-api by default), and returns an AuthenticatedPrincipal.
- security/KeycloakJwtConverter.kt: extracts the principal name and authorities from a JWT (servlet variant).
- security/SecurityContext.kt: SecurityContext data class (userId, email, role, workspaceId, groups) with isAdmin / isEngineerOrAbove / isAnalystOrAbove; SecurityContextHolder is a ThreadLocal populated from gateway headers in downstream services.
- security/SecurityConfig.kt, SecurityInterceptorConfig.kt: servlet security configuration and interceptor wiring.
- security/AuditInterceptor.kt, AuditPersistence.kt: request-level audit event capture and persistence.
- exception/ApiException.kt, GlobalExceptionHandler.kt: typed exceptions (NotFoundException, ForbiddenException) plus a @ControllerAdvice global handler.
- model/: shared domain models (Pipeline, PipelineRun, Connection, User, Workspace, Alert, AuditEvent, enums.kt).
- config/JacksonConfig.kt: Jackson configuration.
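Because the roles carry numeric levels, the hierarchy and permission checks reduce to integer comparisons. A minimal sketch with simplified signatures: the role levels and the three requiredRole pairings come from the description above, while the alias table, the dataflow- prefix stripping, and the VIEWER fallback are hypothetical and may not match what RBACService.mapKeycloakRolesToDataFlowRole() actually does.

```kotlin
// Role levels as documented for RBACService; a higher level implies every lower role.
enum class DataFlowRole(val level: Int) {
    ADMIN(100), ENGINEER(75), ANALYST(50), STEWARD(40), VIEWER(25);

    fun atLeast(other: DataFlowRole): Boolean = level >= other.level
}

// Subset of permissions; these three requiredRole pairings come from the docs.
enum class Permission(val requiredRole: DataFlowRole) {
    CREATE_PIPELINE(DataFlowRole.ENGINEER),
    DELETE_PIPELINE(DataFlowRole.ADMIN),
    USE_COPILOT(DataFlowRole.ANALYST)
}

fun hasPermission(role: DataFlowRole, permission: Permission): Boolean =
    role.atLeast(permission.requiredRole)

// Hypothetical alias table; only the example names are taken from the docs.
val roleAliases: Map<String, DataFlowRole> = mapOf(
    "plk-bi-admins" to DataFlowRole.ADMIN,
    "org_admin" to DataFlowRole.ADMIN,
    "data_engineer" to DataFlowRole.ENGINEER,
    "business_analyst" to DataFlowRole.ANALYST
)

/** Resolves every recognised role name and keeps the most privileged (VIEWER if none match). */
fun mapKeycloakRolesToDataFlowRole(roles: Collection<String>): DataFlowRole =
    roles.mapNotNull { raw ->
        val name = raw.trim().lowercase().removePrefix("dataflow-")
        roleAliases[name] ?: DataFlowRole.values().firstOrNull { it.name.lowercase() == name }
    }.maxByOrNull { it.level } ?: DataFlowRole.VIEWER
```

Keeping the highest-level match gives a user in several groups their most privileged role, which is one reasonable policy; the real service may resolve conflicts differently.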
metadata-service
metadata-service is the largest module and the central catalog and governance backbone. It manages connections, pipeline metadata, the data catalog, glossary, schema registry, classification, governance workflows, GDPR and Polish compliance, data quality, data products and the marketplace, tags, the MCP server, and the AI registry. It is annotated with @EnableScheduling and @EnableCaching and scans sub-packages including catalog, governance, gdpr, search, policy, quality, domain, compliance, aiRegistry, workflow, tags, and mcp.
Port: 8081. JPA ddl-auto: validate; Hikari pool 20 max / 5 idle; Jackson SNAKE_CASE.
REST surface (controllers by base path)
| Area | Base path(s) |
|---|---|
| Connections / connectors | /api/v1/connections, /api/v1/connectors, /api/v1/marketplace |
| Pipelines / templates | /api/v1/pipelines, /api/v1/templates, /api/v1/runbooks |
| Catalog / quality / profiling | /api/v1/catalog, /api/v1/quality, /api/v1/cde (critical data elements) |
| Governance | /api/v1/governance and /access-reviews, /contracts, /endorsements, /reviews, /schema-changes, /tags |
| GDPR / compliance | /api/v1/gdpr, /api/v1/dsar, /api/v1/compliance, /api/v1/compliance/dpia, /api/v1/admin/abac/policy |
| Data products / versions / domains | /api/v1/data-products, /api/v1/data-versions, /api/v1/domains, /api/v1/contracts |
| Audit | /api/v1/audit-log, /api/v1/admin/audit |
| Masking | /api/v1/masking |
| Incidents / RCA | /api/v1/incidents |
| CDR | /api/v1/cdr (telecom Call Detail Record metrics) |
| Classification | /api/v1/classification/rules |
| Workflows | /api/v1/workflows |
| MCP | /api/v1/mcp, /mcp |
| AI registry | /api/v1/admin/ai-registry |
| Search | /api/v1/search (global search over a materialized view) |
| Admin | /api/v1/admin/users, /api/v1/admin/workspaces, /api/v1/admin/settings |
| File upload | /api/v1/data |
Notable services include PolicyEngine, ApprovalWorkflow, AccessReviewCycleService, GdprService, PolishComplianceService, RetentionPolicyEnforcer, DynamicMaskingService, IncidentRcaService, DataClassificationService, WorkflowEngine, and AuditChainVerifierService, which verifies the hash-chained audit log.
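A hash-chained audit log makes tampering detectable: each entry stores its predecessor's hash, and its own hash covers that link, so altering an entry is detected unless every later entry is rewritten too. The sketch below is illustrative only; SHA-256, the hash input layout (sequence|prevHash|payload), the all-zero genesis hash, and the names AuditEntry, entryHash, appendEntry, and verifyChain are assumptions, not necessarily what the V27/V29 migrations or AuditChainVerifierService use.

```kotlin
import java.security.MessageDigest

data class AuditEntry(val sequence: Long, val payload: String, val prevHash: String, val hash: String)

// Assumed anchor for the first entry's prevHash.
val GENESIS_HASH: String = "0".repeat(64)

fun sha256Hex(input: String): String =
    MessageDigest.getInstance("SHA-256")
        .digest(input.toByteArray(Charsets.UTF_8))
        .joinToString("") { "%02x".format(it) }

// Assumed hash input: the entry's own fields plus the link to its predecessor.
fun entryHash(sequence: Long, prevHash: String, payload: String): String =
    sha256Hex("$sequence|$prevHash|$payload")

/** Appends a new entry linked to the current tail of the chain. */
fun MutableList<AuditEntry>.appendEntry(payload: String): AuditEntry {
    val prev = lastOrNull()?.hash ?: GENESIS_HASH
    val seq = size.toLong()
    return AuditEntry(seq, payload, prev, entryHash(seq, prev, payload)).also { add(it) }
}

/** Returns the index of the first entry whose link or hash fails to verify, or -1 if intact. */
fun verifyChain(entries: List<AuditEntry>): Int {
    var expectedPrev = GENESIS_HASH
    for ((i, e) in entries.withIndex()) {
        if (e.prevHash != expectedPrev || e.hash != entryHash(e.sequence, e.prevHash, e.payload)) return i
        expectedPrev = e.hash
    }
    return -1
}
```

Recomputing a forged entry's hash only moves the failure to the next entry, whose stored prevHash no longer matches.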
Data model
Roughly 80 JPA entities live under metadata/entity/ and its sub-packages. Core (V1): workspaces, users, connections, pipelines, pipeline_versions, pipeline_runs, pipeline_tasks. Catalog (V9): catalog_databases/schemas/tables/columns/tags, relationships, usages. GDPR (V1): gdpr_dsar_requests, gdpr_consent_records, gdpr_erasure_reports, gdpr_audit_trail. The lineage entities ColumnLineageEntity, DatasetLineageEntity, OpenLineageEventEntity, and ManualLineageEdgeEntity are owned here and shared with lineage-service.
Flyway migrations (db/migration/metadata, V1–V54)
| Version | Content |
|---|---|
| V1–V5 | Core schema, pipelines, monitoring, lineage, audit |
| V9–V14 | Catalog, glossary, schema registry, classification, governance, workflow |
| V16–V21 | Data contracts, dynamic masking, data version control, data marketplace, Polish compliance (V20_1), GDPR audit durability |
| V27 / V29 | Audit log hash chain |
| V33 / V35 | Connector marketplace catalog, global search materialized view |
| V44–V54 | Regulatory frameworks, incident RCA (V47), OpenLineage + manual edges (V48), workflow engine (V52), AI registry (V53), tag propagation + erasure runs (V54) |
A dev seed lives at db/dev/V6__seed_data.sql, and a sample pipeline template at resources/pipeline-templates/subscriber-360.yaml.
pipeline-engine
pipeline-engine executes pipelines defined in a YAML DSL, schedules runs, manages DAG execution, integrates external orchestrators (Airflow, AutomateNow) and compute engines (Flink, Spark/Dataproc), performs self-healing, orchestrates GDPR erasure, quarantines bad data, and provides Git-based pipeline version control. Application class: PipelineEngineApplication.kt.
Port: 8082. JPA ddl-auto: none; Flyway db/migration/engine, history flyway_schema_history_engine; schedule timezone defaults to Europe/Warsaw.
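Because schedules default to Europe/Warsaw, a fixed local run time maps to different UTC instants on either side of the CET/CEST switch. A minimal java.time sketch, assuming a simple daily schedule rather than full cron parsing; nextDailyRun is a hypothetical helper, not CronScheduler's API.

```kotlin
import java.time.Instant
import java.time.LocalTime
import java.time.ZoneId

// Default schedule zone documented for pipeline-engine.
val WARSAW: ZoneId = ZoneId.of("Europe/Warsaw")

/**
 * Next instant strictly after [after] at local wall-clock time [at] in [zone].
 * A local time that falls in the spring-forward gap is shifted later by the
 * length of the gap (java.time's standard resolution for gaps).
 */
fun nextDailyRun(after: Instant, at: LocalTime, zone: ZoneId = WARSAW): Instant {
    val today = after.atZone(zone).toLocalDate()
    val candidate = today.atTime(at).atZone(zone).toInstant()
    return if (candidate.isAfter(after)) candidate
    else today.plusDays(1).atTime(at).atZone(zone).toInstant()
}
```

A 02:00 Warsaw schedule therefore fires at 01:00 UTC in winter and 00:00 UTC in summer, which matters when comparing run times with UTC-based logs or external orchestrators.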
Key classes
- Execution: execution/PipelineRunner.kt orchestrates the run: parse YAML → resolve params → validate → build DAG → execute tasks in topological order (in parallel within each DAG level, on a fixed thread pool) → aggregate results, with cooperative cancellation via ExecutionContext. Supported by TaskExecutor.kt and ExecutionContext.kt.
- DAG: dag/DagBuilder.kt, dag/ExecutionDAG.kt, dag/DagNode.kt, with cycle and dangling-dependency detection.
- DSL: dsl/PipelineYamlParser.kt, dsl/PipelineYamlSchema.kt.
- Validation: validation/PipelineValidator.kt, validation/ParameterResolver.kt (built-in vars, env vars, runtime params).
- Service layer: service/PipelineCompiler.kt, ExecutionPlanner.kt, TaskScheduler.kt, PipelineRunLogPublisher.kt.
- Scheduling: scheduler/CronScheduler.kt, BackfillManager.kt, DependencyChain.kt, RetryPolicy.kt.
- Orchestrators: orchestrator/AirflowClient.kt, AirflowDagGenerator.kt, AutomateNowAdapter.kt, WebhookCallbackService.kt.
- Compute engines: flink/ (FlinkClusterManager, FlinkJobBuilder/Submitter, FlinkCheckpointManager, FlinkSqlBridge) and spark/ (SparkClusterManager, SparkJobBuilder, DataprocJobSubmitter, SparkSqlBridge).
- Self-healing: healing/SelfHealingService.kt, FailureClassifier.kt, RecoveryStrategies.kt.
- Git: git/GitRepositoryManager.kt, PipelineVersionControl.kt, YamlDiffEngine.kt, GitWebhookHandler.kt.
- Erasure (GDPR): erasure/ErasureOrchestrator.kt, ErasureController.
- Quality: quality/DataQuarantineService.kt, streaming/StreamingQualityEngine.kt.
- Real-time: realtime/PipelineStatusWebSocket.kt, ExecutionEventPublisher.kt.
- Lineage: lineage/OpenLineageEmitter.kt.
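The DAG and execution steps can be sketched in plain Kotlin. This is a minimal illustration under stated assumptions, not DagBuilder or PipelineRunner: a pipeline is reduced to a map from task name to its dependency names, levels are computed with Kahn's algorithm (tasks whose dependencies are all satisfied form the next level), and cancellation is an AtomicBoolean checked before each level and each task. DagException, buildLevels, and runLevels are hypothetical names.

```kotlin
import java.util.Collections
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

class DagException(message: String) : IllegalArgumentException(message)

/** Groups tasks into levels (Kahn's algorithm); each task appears after all of its dependencies. */
fun buildLevels(deps: Map<String, Set<String>>): List<List<String>> {
    // Dangling dependency: a task names a dependency that is not defined.
    for ((task, ds) in deps) for (d in ds) {
        if (d !in deps) throw DagException("task '$task' depends on unknown task '$d'")
    }
    val remaining = deps.mapValues { it.value.toMutableSet() }.toMutableMap()
    val levels = mutableListOf<List<String>>()
    while (remaining.isNotEmpty()) {
        val ready = remaining.filterValues { it.isEmpty() }.keys.sorted()
        // Tasks remain but none is ready: the remainder contains a cycle.
        if (ready.isEmpty()) throw DagException("cycle among ${remaining.keys.sorted()}")
        levels.add(ready)
        val done = ready.toSet()
        done.forEach { remaining.remove(it) }
        remaining.values.forEach { it.removeAll(done) }
    }
    return levels
}

/** Runs each level in parallel on a fixed pool, finishing one level before starting the next. */
fun runLevels(
    levels: List<List<String>>,
    cancelled: AtomicBoolean,
    threads: Int = 4,
    task: (String) -> Unit,
): List<String> {
    val pool = Executors.newFixedThreadPool(threads)
    val completed = Collections.synchronizedList(mutableListOf<String>())
    try {
        for (level in levels) {
            if (cancelled.get()) break // cooperative check: no new level starts after cancellation
            val futures = level.map { name ->
                pool.submit(Runnable {
                    if (!cancelled.get()) {
                        task(name)
                        completed.add(name)
                    }
                })
            }
            futures.forEach { it.get() } // wait for the whole level; rethrows task failures
        }
    } finally {
        pool.shutdown()
    }
    return completed.toList()
}
```

Cancellation here is cooperative in the usual sense: tasks already running finish, and nothing new is scheduled once the flag is set.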
REST surface
| Controller | Base path | Key operations |
|---|---|---|
| ExecutionController | /api/v1 | POST /pipelines/{id}/run, GET /runs/{runId}, POST /runs/{runId}/cancel, POST /runs/{runId}/status |
| SchedulerController | /api/v1/scheduler | schedules CRUD + pause/resume, /next-runs, backfill CRUD, dependencies CRUD |
| OrchestratorController | /api/v1/orchestrator | /trigger, /callback, Airflow dags/sync/trigger/status, webhooks |
| GitController | /api/v1/git | repos, history, diff, rollback, branches, merge, webhooks |
| HealingController | /api/v1/pipelines | /{id}/healing-history, /healing-summary, /healing-policy |
| QuarantineController | /api/v1/quarantine | list/get, approve/reject/edit-approve, bulk ops, auto-release rules |
| ErasureController | /api/v1/dsar | POST /{dsarId}/erasure/orchestrate, erasure run status |
| StreamingQualityController | /api/v1/monitor/streaming | per-pipeline quality, rules, reset, SQL |
| PipelineReportController | /api/v1/pipelines/{id}/report | pipeline run reports |
Data model & migrations (db/migration/engine, V1–V9)
Entities are prefixed engine_*: engine_schedules, engine_backfill_jobs/partitions, engine_execution_runs (with a task_states JSONB column), engine_retry_states, engine_circuit_breakers, engine_dead_letter_entries, engine_running_pipelines, engine_webhook_registrations, engine_healing_attempts/policies. Non-prefixed: data_quarantine, quarantine_auto_release_rules, erasure_runs, erasure_steps. Migrations: V1 scheduling, V3 dependency chain, V4 execution runs, V5 webhook + audit, V6 retry/alert, V7 quarantine, V8 healing attempts, V9 erasure orchestration.
lineage-service
lineage-service tracks data lineage — dataset and column-level lineage, impact analysis, OpenLineage event ingestion, lineage graph build and search, BI-tool lineage (Tableau/Looker/PowerBI), manual lineage authoring, and tag propagation. It consumes OpenLineage RunEvents emitted by pipeline-engine. Application class: LineageServiceApplication.kt.
Port: 8084. JPA ddl-auto: none; flyway.enabled: false — it relies on metadata-service's schema (the lineage tables are created by metadata migrations V4/V48).
Deploy-time ordering dependency
Because lineage-service has Flyway disabled, it must not start until metadata-service has applied its lineage migrations. This ordering is implicit and not enforced by the schema itself.
Key classes
- service/LineageService.kt: the primary service, exposing getDatasetLineage, getColumnLineage, recordEvent/recordEvents, getUpstream/getDownstream, getSubgraph, getFullLineageGraph, and getStats. Uses LineageGraphBuilder first, falling back to an in-memory ConcurrentHashMap store plus LineagePersistence.
- service/LineageGraphBuilder.kt: builds the lineage graph from events.
- service/ImpactAnalyzer.kt: downstream impact analysis.
- service/ColumnLineageExtractor.kt: extracts column-level lineage.
- service/LineageSearchService.kt: dataset/column search, hot datasets, PII propagation detection.
- service/connectors/: BiLineageConnector, TableauLineageConnector, LookerLineageConnector, PowerBiLineageConnector.
- propagation/PropagationWorker.kt, TagPropagationLogEntity/Repository.kt: tag propagation across lineage.
- repository/JpaLineageRepositories.kt: DatasetLineageRepository and ColumnLineageRepository, reusing metadata-service entities; supports findLineageAsOf (time-travel) and PII filtering.
- model/LineageEvent.kt: OpenLineage-style RunEvent, LineageGraph, LineageNode, LineageJobNode, SchemaField.
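Upstream/downstream queries and impact analysis are graph traversals. A minimal sketch, assuming dataset-to-dataset edges held in ConcurrentHashMap adjacency sets (loosely mirroring the in-memory fallback store); InMemoryLineageGraph, recordEdge, and the maxDepth parameter are hypothetical, and column-level lineage and persistence are left out.

```kotlin
import java.util.concurrent.ConcurrentHashMap

/** Hypothetical dataset-level lineage store: an edge means data flows from source to target. */
class InMemoryLineageGraph {
    private val downstream = ConcurrentHashMap<String, MutableSet<String>>()
    private val upstream = ConcurrentHashMap<String, MutableSet<String>>()

    fun recordEdge(source: String, target: String) {
        downstream.computeIfAbsent(source) { ConcurrentHashMap.newKeySet<String>() }.add(target)
        upstream.computeIfAbsent(target) { ConcurrentHashMap.newKeySet<String>() }.add(source)
    }

    /** Datasets affected by a change to [datasetId], with their hop distance. */
    fun getDownstream(datasetId: String, maxDepth: Int = Int.MAX_VALUE): Map<String, Int> =
        walk(downstream, datasetId, maxDepth)

    /** Datasets that [datasetId] is derived from, with their hop distance. */
    fun getUpstream(datasetId: String, maxDepth: Int = Int.MAX_VALUE): Map<String, Int> =
        walk(upstream, datasetId, maxDepth)

    // Breadth-first search; the seen set stops cycles from looping forever.
    private fun walk(edges: Map<String, Set<String>>, start: String, maxDepth: Int): Map<String, Int> {
        val hops = LinkedHashMap<String, Int>()
        val seen = hashSetOf(start)
        val queue = ArrayDeque<Pair<String, Int>>()
        queue.addLast(start to 0)
        while (queue.isNotEmpty()) {
            val (node, depth) = queue.removeFirst()
            if (depth >= maxDepth) continue
            for (next in edges[node].orEmpty()) {
                if (seen.add(next)) {
                    hops[next] = depth + 1
                    queue.addLast(next to depth + 1)
                }
            }
        }
        return hops
    }
}
```

Breadth-first order means each dataset is reported at its shortest hop distance, which is usually what an impact report wants to rank by.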
REST surface
| Controller | Base path | Key operations |
|---|---|---|
| LineageController | /api/v1/lineage | GET /datasets/{id}, GET /columns/{id}, GET /impact/{id}, POST /events, GET /graph/{datasetId}, GET /upstream/{datasetId}, GET /downstream/{datasetId}, GET /search, GET /stats, GET /subgraph/{datasetId}, GET /hot-datasets, GET /pii/{datasetId}/{columnName} |
| OpenLineageEventController | /api/v1/lineage/openlineage | POST /events, GET /events |
| BiLineageController | /api/v1/lineage/bi | POST /ingest, POST /{tool}/ingest |
| LineageAuthoringController | /api/v1/lineage/manual-edges | GET, POST, PATCH /{id}, DELETE /{id} |
| PropagationController | /api/v1/lineage/propagation | POST /run, GET /inherited/{datasetId} |
| ExportController | /api/v1/lineage/export | lineage export |
monitor-service
monitor-service handles pipeline and system monitoring: alerting, cost tracking, SLA/SLO burn-rate tracking, data freshness, schema-change monitoring, scheduled reports, notification routing and the notification inbox, anomaly detection, playbooks, and real-time SSE streams. It is annotated with @EnableScheduling and instrumented with OpenTelemetry and Prometheus. Application class: MonitorServiceApplication.kt (scans the monitor and common packages).
Port: SERVER_PORT (default 8080, host 8084). Flyway db/migration/monitor, history flyway_schema_history_monitor; Prometheus export enabled; Jackson SNAKE_CASE.
Key classes
- Alerts: service/AlertService.kt, AlertDefinitionService.kt, alert/AlertDefinition.kt, alert/NotificationService.kt.
- Metrics: service/MetricsService.kt, SystemMetricsCollector.kt, KubernetesMetricsService.kt, KeycloakMetricsService.kt, ClusterMetricsHistoryService.kt, PerformanceMetricsService.kt.
- Cost: service/CostMetricsService.kt, CostHistoryService.kt, CostAnomalyDetector.kt, PipelineCostService.kt, with config/GcpCostProperties.kt.
- SLA / freshness / anomaly: service/SlaBurnRateService.kt, FreshnessTracker.kt, AnomalyBaselineService.kt.
- Schema change: service/SchemaChangeMonitor.kt.
- Reports: service/ReportGenerationService.kt, PipelineRunLogService.kt.
- Notification: notification/NotificationRouter.kt, ChannelAdapters.kt, NotificationInboxController.kt, NotificationChannelService.kt.
- Playbooks: playbook/PlaybookEngine.kt.
- Real-time: realtime/SseAlertController.kt, AlertEventPublisher.kt, MetricsStreamService.kt.
- Config: OpenTelemetryConfig.kt, MetricsExporterConfig.kt, filter/TracingFilter.kt.
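SLA burn rate compares how fast the error budget is being consumed with the rate the SLO allows: a burn rate of 1 spends exactly the whole budget over the SLO period. A minimal sketch, assuming event-count SLIs; burnRate, Window, and shouldPage are hypothetical names, and the 14.4 threshold over a paired long and short window is the multiwindow rule popularized by the Google SRE Workbook, not a documented SlaBurnRateService setting.

```kotlin
/**
 * Burn rate = observed error ratio / allowed error ratio (1 - SLO target).
 * 1.0 spends the error budget exactly over the SLO period; higher values spend it faster.
 */
fun burnRate(badEvents: Long, totalEvents: Long, sloTarget: Double): Double {
    require(sloTarget >= 0.0 && sloTarget < 1.0) { "SLO target must be in [0, 1)" }
    if (totalEvents == 0L) return 0.0 // no traffic, no budget spent
    val errorRatio = badEvents.toDouble() / totalEvents
    return errorRatio / (1.0 - sloTarget)
}

/** Event counts observed over one alerting window. */
data class Window(val bad: Long, val total: Long)

/**
 * Multiwindow rule: page only when both the long window (e.g. 1 h) and the short
 * window (e.g. 5 min) exceed [threshold]. The long window filters brief spikes;
 * the short window lets the alert clear soon after the problem stops.
 */
fun shouldPage(longWindow: Window, shortWindow: Window, sloTarget: Double, threshold: Double = 14.4): Boolean =
    burnRate(longWindow.bad, longWindow.total, sloTarget) >= threshold &&
        burnRate(shortWindow.bad, shortWindow.total, sloTarget) >= threshold
```

With a 30-day SLO period, sustaining a burn rate of 14.4 for one hour consumes 2% of the monthly error budget, which is why that value is a common paging threshold.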
REST surface
| Controller | Base path | Key operations |
|---|---|---|
| AlertController | /api/v1/monitor/alerts | list/get/create/delete, ack/resolve, bulk ops, events, history, summary, purge |
| AlertDefinitionController | /api/v1/monitor | definitions CRUD + toggle/mute, notification channels CRUD + test |
| DashboardController | /api/v1/monitor | /dashboard, /pipelines/{id}/metrics, /connectors/health, /system/health, POST /pipelines/run |
| CostController family | /api/v1/monitor/costs | summary, by-pipeline, by-workspace, anomalies, trend, budgets CRUD, rates |
| FreshnessController | /api/v1/monitor/freshness | rules CRUD, history, evaluate |
| SlaBurnRateController | /api/v1/monitor/sla | pipelines, definitions CRUD |
| PerformanceMetricsController | /api/v1/monitor/metrics/performance | percentiles, throughput, compare, summary |
| SchemaChangeController | /api/v1/monitor | schema-changes ack/resolve, schema-monitor register/scan, snapshots |
| AnomalyBaselineController | /api/v1/monitor/anomalies | baselines, detect, recompute |
| ReportController | /api/v1/reports | schedules CRUD + toggle/send-now, preview, types |
| NotificationInboxController | /api/v1/notifications | list, unread-count, mark-read, GET /stream (SSE) |
| PlaybookController | /api/v1/monitor/playbooks | list/create/delete, match |
| SseAlertController | /api/v1/monitor/sse | /alerts, /metrics (SSE text/event-stream) |
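Baseline-driven anomaly detection (metric_baselines, /detect, /recompute) is commonly implemented as a z-score test, and the sketch below assumes that approach: a baseline is the mean and population standard deviation of recent samples, and a value is flagged when it lies more than k = 3 standard deviations from the mean. MetricBaseline, computeBaseline, isAnomalous, and the default k are hypothetical; AnomalyBaselineService may use a different statistic.

```kotlin
import kotlin.math.abs
import kotlin.math.sqrt

data class MetricBaseline(val mean: Double, val stdDev: Double, val samples: Int)

/** Recomputes a baseline (mean and population standard deviation) from history. */
fun computeBaseline(history: List<Double>): MetricBaseline {
    require(history.isNotEmpty()) { "need at least one sample" }
    val mean = history.average()
    val variance = history.sumOf { (it - mean) * (it - mean) } / history.size
    return MetricBaseline(mean, sqrt(variance), history.size)
}

/** Flags [value] when it lies more than [k] standard deviations from the baseline mean. */
fun isAnomalous(value: Double, baseline: MetricBaseline, k: Double = 3.0): Boolean {
    // A perfectly flat history has no spread, so any deviation counts as anomalous.
    if (baseline.stdDev == 0.0) return value != baseline.mean
    return abs(value - baseline.mean) / baseline.stdDev > k
}
```

A z-score assumes roughly stable, unimodal data; strongly seasonal metrics such as daily cost usually need per-hour or per-weekday baselines instead.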
Data model & migrations (db/migration/monitor, V9–V25)
Entities include monitor_alert_definitions/alerts/alert_events/alert_silences, data_freshness_rules/checks, metric_baselines, monitor_connector_health, monitor_pipeline_metrics/runs, pipeline_cost_records/budgets/anomalies, pipeline_run_logs, monitor_sla_burn_rate_snapshots, monitor_slo_definitions, notifications_inbox, notification_routes, playbook_routes, and monitor_schema_change_events/snapshots. Migration highlights: V9 legacy alerts, V11 freshness, V12 baselines, V13 schema-change monitoring, V14 report schedules, V15 pipeline cost, V16 SLA burn rate, V18 cluster metrics history, V19 quarantine, V22 run logs, V24 playbook routes, V25 notification routing/inbox. A resources/prometheus/alert-rules.yml ships with the service.
Library modules
connector-sdk
connector-sdk is the connector framework library — no Spring application, packaged as a JAR consumed by metadata-service and pipeline-engine. It defines the connector contract and ships 21 connector implementations. It has no application.yml, only META-INF ServiceLoader registration.
Key classes
- ConnectorSDK.kt: the core AutoCloseable interface, with discoverSchema(config), extractData(config): DataStream, loadData(...), validateConnection/testConnection, and capabilities(), plus connectorType/connectorName/displayName.
- registry/ConnectorRegistry.kt: singleton registry with register, getConnector, and listConnectors; discoverConnectors() uses Java ServiceLoader. Paired with ConnectorAutoConfiguration.kt.
- Base classes: base/JDBCConnectorBase.kt, base/FileConnectorBase.kt, base/NativeConnectorBase.kt.
- Model: ConnectorConfig, ConnectorCapability (enum: SCHEMA_DISCOVERY, READ, WRITE, COLUMN_PROJECTION, PREDICATE_PUSHDOWN, SCHEMA_EVOLUTION, STREAMING, PARTITIONING, COMPRESSION, CLOUD_STORAGE, BATCH_WRITE), Schema, Table, Column, DataStream, DataBatch, DataRecord, EventStream, StorageType, ConnectorException.
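The contract-plus-registry pattern can be shown in miniature. A sketch under assumptions: Connector is a pared-down stand-in for ConnectorSDK (only a few members, simplified signatures), the capability enum is a subset of ConnectorCapability, and the registry is keyed by connectorType. ServiceLoader.load is the standard JDK mechanism: it reads META-INF/services files named after the service type's fully qualified name, and each listed provider needs a public no-argument constructor.

```kotlin
import java.util.ServiceLoader
import java.util.concurrent.ConcurrentHashMap

// Subset of the documented capabilities.
enum class ConnectorCapability { SCHEMA_DISCOVERY, READ, WRITE, PREDICATE_PUSHDOWN, STREAMING }

// Pared-down stand-in for the ConnectorSDK contract; signatures are assumptions.
interface Connector : AutoCloseable {
    val connectorType: String
    val displayName: String
    fun capabilities(): Set<ConnectorCapability>
    fun testConnection(config: Map<String, String>): Boolean
    override fun close() {}
}

// A Kotlin object gives the singleton; the map is safe for concurrent registration.
object ConnectorRegistry {
    private val connectors = ConcurrentHashMap<String, Connector>()

    fun register(connector: Connector) {
        connectors[connector.connectorType] = connector
    }

    fun getConnector(type: String): Connector =
        connectors[type] ?: throw IllegalArgumentException("No connector registered for '$type'")

    fun listConnectors(): List<String> = connectors.keys.sorted()

    /** Registers every provider found on the classpath via META-INF/services; returns how many. */
    fun discoverConnectors(loader: ClassLoader? = Thread.currentThread().contextClassLoader): Int {
        var count = 0
        for (connector in ServiceLoader.load(Connector::class.java, loader)) {
            register(connector)
            count++
        }
        return count
    }
}
```

ServiceLoader discovery keeps the SDK free of compile-time references to its implementations, so a connector JAR only needs to ship its provider-configuration file to be picked up.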
Connector implementations (impl/, 21)
| Category | Connectors |
|---|---|
| Databases (JDBC) | postgresql, mysql, mssql, oracle, sybase, teradata, hana (SAP HANA) |
| Cloud warehouses | bigquery, databricks, snowflake |
| Files | csv, json, xml, excel, parquet (with FileFormatDetector, StorageAdapter) |
| Streaming / messaging | kafka (offset manager, schema registry, SerDe), pubsub, eventhubs |
| SaaS / ERP | salesforce, servicenow, sap (SapErpConnector, SapODataParser), rest |
| CDC | DebeziumCdcConnector, DebeziumCdcManager, MysqlBinlogCDC, MssqlChangeDataCapture, MongodbChangeStream |
| Telecom CDR | CdrConnector, Asn1Decoder, CdrBinaryDecoder, CdrParquetConverter, with Tap3Template, RapTemplate, NrtrdeTemplate roaming formats |
| NoSQL | mongodb (connector, aggregation, change stream, schema mapper) |
pushdown-sql
pushdown-sql is a SQL dialect transpilation service — it translates ANSI/canonical SQL into target-database dialects so query logic can be pushed down to source or target databases. PushdownSqlApplication.kt is a Spring Boot app that excludes DataSource, JPA, and Security autoconfiguration — it is a pure compute service.
Port: 8083. No database, no security, no JPA. Actuator exposes health/info/metrics. No REST controller — it is exposed as an injectable service/library to other modules.
Key classes
- transpiler/SqlTranspiler.kt: a @Service whose transpile(sql, targetDialect): TranspileResult parses SQL with JSqlParser (CCJSqlParserUtil), validates syntax, delegates to the dialect, and collects warnings (unsupported LATERAL join, ARRAY type, over-length identifiers). Also provides translateFunction, translateDataType, and getAvailableDialects.
- transpiler/dialect/Dialect.kt: the dialect interface, covering name, transpile, identifier quoting, capability flags (supportsWindowFunctions, supportsCTE, supportsLateralJoin, supportsArrayType), maxIdentifierLength, translateFunction, translateDataType, translateLimitOffset/paginationSql, dualTable, and castExpression.
- Dialect implementations (8): PostgresDialect, MssqlDialect, OracleDialect, SnowflakeDialect, DatabricksDialect, HanaDialect, SybaseDialect, TeradataDialect.
- Visitors: visitor/SqlAstVisitor.kt, FunctionMapper.kt, DataTypeMapper.kt.
- TranspileResult: success flag, original/transpiled SQL, target dialect, warnings, error.
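Pagination, quoting, and identifier limits show what the Dialect interface abstracts. A minimal sketch with a reduced interface: PostgresLike and MssqlLike are hypothetical stand-ins, not the PostgresDialect and MssqlDialect classes, and they rewrite strings rather than a JSqlParser AST. The target syntax is standard: PostgreSQL uses LIMIT/OFFSET and double-quoted identifiers (63-byte default limit), while SQL Server uses OFFSET ... FETCH NEXT, which requires an ORDER BY, and bracket quoting with a 128-character limit.

```kotlin
// Reduced stand-in for transpiler/dialect/Dialect.kt; member names echo the docs, signatures are assumed.
interface Dialect {
    val name: String
    val maxIdentifierLength: Int
    fun quoteIdentifier(id: String): String
    fun paginationSql(sql: String, limit: Int, offset: Int): String
}

object PostgresLike : Dialect {
    override val name = "postgresql"
    override val maxIdentifierLength = 63 // default NAMEDATALEN (64) minus 1, in bytes
    // Double quotes; an embedded quote is escaped by doubling it.
    override fun quoteIdentifier(id: String) = "\"" + id.replace("\"", "\"\"") + "\""
    override fun paginationSql(sql: String, limit: Int, offset: Int) = "$sql LIMIT $limit OFFSET $offset"
}

object MssqlLike : Dialect {
    private val orderBy = Regex("(?i)\\border\\s+by\\b")
    override val name = "mssql"
    override val maxIdentifierLength = 128
    // Square brackets; an embedded ] is escaped as ]].
    override fun quoteIdentifier(id: String) = "[" + id.replace("]", "]]") + "]"
    // OFFSET ... FETCH needs an ORDER BY, so a neutral one is appended when missing.
    override fun paginationSql(sql: String, limit: Int, offset: Int): String {
        val ordered = if (orderBy.containsMatchIn(sql)) sql else "$sql ORDER BY (SELECT NULL)"
        return "$ordered OFFSET $offset ROWS FETCH NEXT $limit ROWS ONLY"
    }
}

/** Over-length identifier warnings; compares characters, which equals bytes only for ASCII names. */
fun identifierWarnings(ids: List<String>, dialect: Dialect): List<String> =
    ids.filter { it.length > dialect.maxIdentifierLength }
        .map { "identifier '$it' exceeds ${dialect.name} limit of ${dialect.maxIdentifierLength}" }
```

The regex check is only a heuristic (an ORDER BY inside a subquery would fool it), which is one reason the real transpiler works on a parsed AST instead.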
Cross-cutting notes
- Identity propagation: the gateway validates the Keycloak JWT once; downstream Spring MVC services trust the gateway-injected X-User-* headers (read into SecurityContextHolder from common). Each service also independently configures an OAuth2 resource server as defense in depth.
- Database strategy: metadata-service owns the catalog/governance schema; lineage-service shares that database with Flyway disabled; pipeline-engine and monitor-service maintain their own Flyway-managed migration sets with per-service history tables.
- Real-time: SSE for monitor alerts/metrics and the notification inbox; WebSocket for pipeline run status streaming.
- Telecom specifics: a CDR connector with ASN.1 / TAP3 / RAP / NRTRDE roaming-record templates, a CDR metrics controller in metadata-service, GDPR and Polish-compliance modules, and schedules defaulting to Europe/Warsaw.
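The identity-propagation contract can be sketched as a pair of functions: the gateway turns already-validated claims into X-User-* headers, and a downstream service rebuilds a SecurityContext from them into a ThreadLocal holder. The SecurityContext fields follow common's data class, but these are simplified stand-ins; the comma-separated X-User-Groups encoding, the choice of mandatory headers, and the function names are assumptions. Real HTTP header lookup is case-insensitive, which a plain Map is not.

```kotlin
// Simplified stand-in for common's SecurityContext (same fields, role kept as a String).
data class SecurityContext(
    val userId: String,
    val email: String?,
    val role: String,
    val workspaceId: String?,
    val groups: List<String>,
)

// Gateway side (AuthFilter-like): claims are already validated; emit headers for downstream services.
fun toIdentityHeaders(ctx: SecurityContext): Map<String, String> =
    mutableMapOf<String, String>().apply {
        put("X-User-Id", ctx.userId)
        put("X-User-Role", ctx.role)
        ctx.email?.let { put("X-User-Email", it) }
        ctx.workspaceId?.let { put("X-Workspace-Id", it) }
        if (ctx.groups.isNotEmpty()) put("X-User-Groups", ctx.groups.joinToString(","))
    }

// Downstream side: rebuild the context; a request missing mandatory headers yields null.
fun fromIdentityHeaders(headers: Map<String, String>): SecurityContext? {
    val userId = headers["X-User-Id"] ?: return null
    val role = headers["X-User-Role"] ?: return null
    return SecurityContext(
        userId = userId,
        email = headers["X-User-Email"],
        role = role,
        workspaceId = headers["X-Workspace-Id"],
        groups = headers["X-User-Groups"]?.split(",")?.map { it.trim() }?.filter { it.isNotEmpty() }.orEmpty(),
    )
}

// Per-thread holder, as in common's SecurityContextHolder.
object SecurityContextHolder {
    private val current = ThreadLocal<SecurityContext?>()
    fun set(ctx: SecurityContext?) = current.set(ctx)
    fun get(): SecurityContext? = current.get()
    fun clear() = current.remove()
}
```

Because servlet containers reuse threads, the holder should be cleared in a finally block at the end of each request; otherwise one user's context can leak into the next request served by that thread.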