System architecture
DataFlow AI is an ETL/ELT data-integration and modernization product built for Polkomtel, the Polish "Plus" telecom operator. It is a polyglot microservices monorepo fronted by a single API gateway, and this page maps the whole system end to end.
Overview
DataFlow AI converts, runs, monitors, and governs data pipelines for a telecom data estate. The platform is organized as a set of independently built services that cooperate over HTTP, Kafka, and a shared PostgreSQL database. There are four runtime families:
- JVM platform: Kotlin + Spring Boot, a Gradle multi-module build under the package root com.polkomtel.dataflow. Seven modules: five deployable services plus two libraries.
- AI services: two Python 3.12 / FastAPI services, copilot and migration-engine.
- Frontend: a React 19 + Vite 7 + TypeScript single-page application.
- Edge tooling: a Go command-line tool (backend/cli/) and a browser extension (browser-extension/).
The architectural style is microservices behind an API gateway. A single reactive Spring Cloud Gateway is the only ingress: it authenticates every request against Keycloak and proxies to six downstream services. Service-to-service calls are synchronous HTTP, Kafka carries connector and change-data-capture (CDC) workloads, and SSE and WebSocket push real-time updates to the UI.
One database, many migration histories
All JVM and AI services point at the same PostgreSQL instance and database (dataflow_metadata). Logical isolation is achieved by per-service Flyway migration histories and table-name prefixes (engine_*, monitor_*), not by separate physical schemas.
Microservices topology
The diagram below shows the deployable units, the single ingress, and the shared infrastructure tier.
┌─────────────────────────────┐
│ Browser SPA (React 19) │
│ served by nginx :3006 │
└──────────────┬──────────────┘
│ HTTPS (Bearer JWT, axios baseURL=/api/v1)
│ + Keycloak OIDC login redirect
▼
┌─────────────────────────────┐
│ API Gateway :8085→8080 │
│ (Spring Cloud Gateway, │
│ reactive WebFlux) │
│ - JWT validation (Keycloak) │
│ - AuthFilter → X-User-* hdrs│
│ - Redis rate limiting │
│ - PII masking / sec headers │
└──┬───┬───┬───┬───┬───┬───────┘
/api/v1/... │ │ │ │ │ │
┌──────────────┬──────────┘ │ │ │ │ └──────────────┐
▼ ▼ ▼ ▼ ▼ ▼ ▼
┌─────────┐ ┌──────────┐ ┌──────────┐ ┌─────────┐ ┌──────────┐ ┌────────────┐
│metadata │ │ pipeline │ │ lineage │ │ monitor │ │ copilot │ │ migration │
│ :8181 │ │ engine │ │ :8083 │ │ :8084 │ │ :8090 │ │ engine │
│ │ │ :8082 │ │ │ │ │ │ (FastAPI)│ │ :8091 │
└────┬────┘ └────┬─────┘ └────┬─────┘ └────┬────┘ └────┬─────┘ └─────┬──────┘
│ │ │ │ │ │
└───────────┴────────────┴────────────┴───────────┴─────────────┘
│
┌────────────┬───────────┼────────────┬──────────────┐
▼ ▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌────────┐ ┌──────────┐ ┌──────────────┐
│PostgreSQL│ │ Kafka │ │ Redis │ │ MinIO │ │ Keycloak │
│ +pgvector│ │+Zookeeper│ │ │ │ (S3) │ │ (OIDC) │
└──────────┘ └──────────┘ └────────┘ └──────────┘ └──────────────┘
│
Prometheus → Grafana (scrape /actuator/prometheus, /metrics)
The gateway's RouteConfig references six downstream targets. Note that copilot-service and migration-service are gateway route names; the compose containers are named copilot and migration-engine, addressed via the COPILOT_SERVICE_URL / MIGRATION_SERVICE_URL environment variables.
Service inventory and ports
Inside the Docker network every JVM service listens on 8080 and the AI services on 8000; only the host-published ports differ. The gateway resolves downstream services by service-name URLs.
| Service | Tech | Container port | Host port | DB / Flyway | Role |
|---|---|---|---|---|---|
| api-gateway | Kotlin / Spring Cloud Gateway (reactive WebFlux) | 8080 | 8085 | none (uses Redis) | Single ingress: routing, JWT validation, rate limiting, PII masking, security headers |
| metadata-service | Kotlin / Spring MVC + JPA | 8080 | 8181 | dataflow_metadata, Flyway metadata (V1–V54) | Catalog, connections, governance, GDPR, quality, data products, MCP server |
| pipeline-engine | Kotlin / Spring MVC + JPA | 8080 | 8082 | shared DB, Flyway engine (V1–V9) | Pipeline DAG compilation/execution, scheduling, self-healing, GDPR erasure |
| lineage-service | Kotlin / Spring MVC + JPA | 8080 | 8083 | shared DB, Flyway disabled | Dataset/column lineage, OpenLineage ingestion, impact analysis |
| monitor-service | Kotlin / Spring MVC + JPA | 8080 | 8084 | shared DB, Flyway monitor (V9–V25) | Alerts, metrics, cost, SLA, freshness, notifications, SSE streams |
| copilot | Python 3.12 / FastAPI | 8000 | 8090 | shared DB (pgvector) | NL-to-pipeline, conversational AI, NL-to-SQL, RAG search, RCA |
| migration-engine | Python 3.12 / FastAPI | 8000 | 8091 | shared DB | Converts legacy ETL (Informatica/Alteryx/SSIS/DataStage) → DataFlow YAML |
| frontend | React 19 / Vite, served by nginx | 80 | 3006 | n/a | Single-page application UI |
Two modules are libraries built into the JVM services rather than deployed as containers: connector-sdk (connector framework plus 21 connector implementations) and common (shared security, models, exception handling). pushdown-sql is a Spring Boot SQL-dialect transpiler used as an injectable service/library; it is not wired into the compose file as a standalone container.
Infrastructure components
| Component | Image | Port(s) | Purpose |
|---|---|---|---|
| PostgreSQL | pgvector/pgvector:pg15 | 5432 | Shared relational store + pgvector embeddings; also hosts the dataflow_keycloak DB |
| Keycloak | quay.io/keycloak/keycloak:24.0 | 8180→8080 | OIDC / SSO identity provider; realm dataflow from realm-export.json |
| Kafka | confluentinc/cp-kafka:7.7.1 | 9092 / 29092 | Event streaming backbone (connectors, CDC/Debezium) |
| Zookeeper | confluentinc/cp-zookeeper:7.7.1 | 2181 | Kafka coordination |
| Redis | redis:7-alpine | 6379 | Gateway rate limiting (sorted-set sliding window), caching, session store |
| MinIO | minio/minio | 9000 / 9001 | S3/GCS-compatible object storage (file connectors, CDR Parquet) |
| Prometheus | prom/prometheus:v2.54.1 | 9090 | Metrics scraping |
| Grafana | grafana/grafana:11.2.2 | 3001→3000 | Observability dashboards |
Request flow
All client traffic enters through the gateway under /api/v1/**. The gateway's RouteConfig declares a RouteLocator bean that maps path prefixes to downstream service URLs, and each route applies a filter chain.
Edge routing
- RequestLoggingFilter: structured access logging.
- SecurityHeadersFilter: CSP and security headers on the response.
- PiiMaskingFilter: masks PII in response bodies.
- RedisRateLimitFilter: per-route Redis sliding-window limit keyed rate_limit:{ip}:{endpoint}, with per-endpoint requestsPerMinute/burstCapacity; on Redis failure with fail-closed: true it returns 503 + Retry-After.
- removeRequestHeader("Cookie") + preserveHostHeader(): drops the Cookie header before proxying and forwards the client's original Host header.
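The sliding-window algorithm behind RedisRateLimitFilter can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not the gateway's Kotlin filter: InMemorySortedSets is a hypothetical stand-in whose method signatures mirror redis-py's zremrangebyscore, zcard and zadd so the example runs without a server, burstCapacity is not modeled, and the 429 status, the X-RateLimit-Remaining header name and the one-second Retry-After value are assumptions.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field


class InMemorySortedSets:
    """Hypothetical stand-in for the three Redis sorted-set commands the limiter uses."""

    def __init__(self) -> None:
        self._sets: dict[str, dict[str, float]] = {}

    def zremrangebyscore(self, name: str, min_score: float, max_score: float) -> int:
        members = self._sets.get(name, {})
        expired = [m for m, score in members.items() if min_score <= score <= max_score]
        for member in expired:
            del members[member]
        return len(expired)

    def zcard(self, name: str) -> int:
        return len(self._sets.get(name, {}))

    def zadd(self, name: str, mapping: dict[str, float]) -> None:
        self._sets.setdefault(name, {}).update(mapping)


@dataclass
class Decision:
    allowed: bool
    status: int  # 200 = forward downstream; anything else is returned to the client
    headers: dict[str, str] = field(default_factory=dict)


class SlidingWindowLimiter:
    def __init__(self, store, requests_per_minute: int, fail_closed: bool = True) -> None:
        self.store = store
        self.limit = requests_per_minute
        self.fail_closed = fail_closed
        self.window = 60.0  # seconds, because the limit is per minute
        self._seq = 0  # keeps members unique when two requests share a timestamp

    def check(self, ip: str, endpoint: str, now: float | None = None) -> Decision:
        now = time.time() if now is None else now
        key = f"rate_limit:{ip}:{endpoint}"
        try:
            # 1. Evict request timestamps that have slid out of the window.
            self.store.zremrangebyscore(key, 0, now - self.window)
            # 2. Count the requests still inside the window.
            count = self.store.zcard(key)
            if count >= self.limit:
                return Decision(False, 429, {"X-RateLimit-Remaining": "0"})
            # 3. Record this request, scored by its timestamp, and let it through.
            self._seq += 1
            self.store.zadd(key, {f"{now}:{self._seq}": now})
            return Decision(True, 200, {"X-RateLimit-Remaining": str(self.limit - count - 1)})
        except ConnectionError:
            if self.fail_closed:
                return Decision(False, 503, {"Retry-After": "1"})  # assumed back-off value
            return Decision(True, 200)  # fail-open: admit traffic while Redis is down
```

Against real Redis, the evict/count/add steps would need to run atomically (typically in a Lua script) so that concurrent gateway instances cannot both take the last slot, and the except clause would have to catch the client library's own connection exception rather than the built-in ConnectionError.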
Routing highlights:
- Standard REST → metadata / pipeline-engine / lineage / monitor.
- A WebSocket route, pipeline-run-websocket, matches /api/v1/runs/*/stream, rewrites the scheme from http to ws, and forwards to pipeline-engine.
- SSE routes cover copilot chat streaming and monitor alerts/metrics (/api/v1/monitor/sse/**).
- /api/v1/notifications/** is deliberately routed to monitor-service: the NotificationInboxController lives there, not in metadata-service.
- The AI services mount their routers under three prefixes (/api/v1/ai/**, /api/v1/copilot/**, /api/copilot/**), so the gateway and frontend can reach them under whichever prefix they are configured to use.
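A simplified model of how ordered path patterns resolve to downstream targets is sketched below. It assumes first-match-wins ordering, treats * as exactly one path segment and a trailing ** as any remainder (Spring's PathPattern matching is richer than this), and uses assumed service-name URLs on the container ports from the service inventory. Apart from pipeline-run-websocket and copilot-service, the route ids are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Route:
    route_id: str
    pattern: str  # '*' = exactly one path segment; a trailing '**' = any remainder
    target: str   # scheme + host:port the request is forwarded to


def matches(pattern: str, path: str) -> bool:
    p_segs = pattern.strip("/").split("/")
    s_segs = path.strip("/").split("/")
    for i, p in enumerate(p_segs):
        if p == "**":
            return True  # zero or more remaining segments (only supported as last segment)
        if i >= len(s_segs) or (p != "*" and p != s_segs[i]):
            return False
    return len(p_segs) == len(s_segs)


# Ordered list: the first matching route wins, so specific routes come first.
# The WebSocket route forwards with a ws:// scheme, i.e. the http -> ws rewrite.
ROUTES = [
    Route("pipeline-run-websocket", "/api/v1/runs/*/stream", "ws://pipeline-engine:8080"),
    Route("monitor-sse", "/api/v1/monitor/sse/**", "http://monitor-service:8080"),
    Route("notifications", "/api/v1/notifications/**", "http://monitor-service:8080"),
    Route("copilot-service", "/api/v1/copilot/**", "http://copilot:8000"),
    Route("pipelines", "/api/v1/pipelines/**", "http://pipeline-engine:8080"),
]


def resolve(path: str, routes: list[Route] = ROUTES) -> str | None:
    """Return the downstream URL for a gateway path, or None when no route matches."""
    for route in routes:
        if matches(route.pattern, path):
            return route.target + path
    return None
```

The notifications entry shows why ordering and explicit prefixes matter: a path that looks like catalog traffic still lands on monitor-service because its route is declared for that prefix.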
Browser → gateway → service → database
Browser ──▶ nginx :3006 load SPA
Browser ──▶ Keycloak :8180 OIDC login, redirect back with code
Browser keycloak-js exchanges code → access/refresh JWT
Browser ──▶ API Gateway :8085 GET/POST /api/v1/... (Authorization: Bearer JWT)
Gateway OAuth2 resource server validates JWT vs Keycloak JWKS
Gateway AuthFilter maps roles → DataFlowRole, injects X-User-* headers
Gateway RedisRateLimitFilter checks rate_limit:{ip}:{endpoint}
Gateway ──▶ downstream service :8080
Service re-validates JWT + reads X-User-* into SecurityContextHolder
Service RBAC permission check → query PostgreSQL → response
Gateway PiiMaskingFilter + SecurityHeadersFilter ──▶ Browser
The frontend uses a single axios instance (frontend/src/api/client.ts) with baseURL=/api/v1. A request interceptor awaits a waitForAuthReady promise so no request fires before the Keycloak token is available, and injects the bearer token and CSRF token. A response interceptor normalizes errors and implements a 401 → silent token refresh → retry-original-request flow.
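The interceptor's 401 handling reduces to a refresh-then-replay-once rule. The Python sketch below models only that control flow; AuthSession, Response and the send/refresh callables are hypothetical stand-ins for the axios instance and keycloak-js, not the client's actual code.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class Response:
    status: int
    body: str = ""


class AuthSession:
    """Hypothetical client: bearer header on every call, 401 -> refresh -> retry once."""

    def __init__(self, access_token: str, refresh: Callable[[], str]) -> None:
        self.access_token = access_token
        self._refresh = refresh

    def request(self, send: Callable[[dict[str, str]], Response]) -> Response:
        resp = send({"Authorization": f"Bearer {self.access_token}"})
        if resp.status != 401:
            return resp
        # The token was probably expired: refresh silently, then replay the original request.
        self.access_token = self._refresh()
        # Replay exactly once; a second 401 is returned to the caller instead of looping.
        return send({"Authorization": f"Bearer {self.access_token}"})
```

Replaying at most once prevents a token that is still rejected after refresh from causing an endless loop; in the SPA that case would presumably end in a new login redirect.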
Data flow and persistence
Shared database, partitioned migrations
All services target the same PostgreSQL instance and database. Logical isolation comes from per-service Flyway migration histories.
| Service | Flyway location | History table | DDL mode |
|---|---|---|---|
| metadata-service | db/migration/metadata (V1–V54) | flyway_schema_history | ddl-auto: validate |
| pipeline-engine | db/migration/engine (V1–V9) | flyway_schema_history_engine | ddl-auto: none |
| monitor-service | db/migration/monitor (V9–V25) | flyway_schema_history_monitor | — |
| lineage-service | Flyway disabled | none (reuses metadata's lineage tables from V4/V48) | ddl-auto: none |
This is a shared-database pattern in which per-service ownership is expressed through table prefixes rather than separate schemas: engine_* tables belong to the engine, monitor_* tables to monitor, and catalog/governance/GDPR tables to metadata. lineage-service is a pure consumer of metadata's lineage tables. Keycloak uses a separate dataflow_keycloak database on the same Postgres instance.
Permissive Flyway settings
The compose Flyway settings are deliberately permissive — BASELINE_ON_MIGRATE, OUT_OF_ORDER, REPAIR_ON_MIGRATE, VALIDATE_ON_MIGRATE: false, BASELINE_VERSION: 20.1 — so a shared database with overlapping legacy histories converges on startup.
Object storage and vectors
- MinIO provides S3/GCS-compatible storage for file connectors and CDR Parquet conversion.
- pgvector backs the copilot's RAG embeddings store. On startup the copilot initializes an asyncpg pool and ensures the embeddings schema exists, degrading to rag_mode: "unavailable" if pgvector is unreachable.
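The retrieval step and its fallback can be illustrated with an in-memory ranking. In the copilot the search runs inside PostgreSQL (pgvector's <=> operator returns cosine distance); this sketch computes cosine similarity in pure Python instead, uses the rag_max_results=5 and similarity_threshold=0.72 values quoted in the copilot request flow, and assumes the threshold applies to cosine similarity. The store format and the "enabled" mode label are hypothetical; only "unavailable" comes from the service.

```python
from __future__ import annotations

import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def retrieve_context(query_vec, store, max_results: int = 5, threshold: float = 0.72):
    """Return (rag_mode, doc_ids). `store` is a hypothetical iterable of (doc_id, vector),
    or None when pgvector could not be reached at startup."""
    if store is None:
        return "unavailable", []  # degrade to plain chat with no retrieved context
    scored = ((cosine_similarity(query_vec, vec), doc_id) for doc_id, vec in store)
    # Keep only sufficiently similar documents, best first, capped at max_results.
    hits = sorted((h for h in scored if h[0] >= threshold), key=lambda h: h[0], reverse=True)
    return "enabled", [doc_id for _, doc_id in hits[:max_results]]
```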
Event streaming
Kafka + Zookeeper form the streaming backbone. Kafka usage is concentrated in:
- connector-sdk: the Kafka connector (KafkaConnector, KafkaOffsetManager, KafkaSchemaRegistry, SerDe), a Pub/Sub connector, and CDC via Debezium (DebeziumCdcConnector, DebeziumCdcManager, DebeziumEventConsumer).
- pipeline-engine: streaming jobs (FlinkJobBuilder) and the StreamingQualityEngine / StreamingQualityController.
JVM services receive KAFKA_BOOTSTRAP_SERVERS: kafka:29092 (internal listener). Kafka carries connector and CDC ingestion workloads rather than acting as an inter-microservice command bus — service-to-service calls remain synchronous HTTP.
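Debezium change events share a standard envelope (op, before, after), so the apply step of a CDC consumer can be sketched without Kafka. apply_change and the id key column are hypothetical; the op codes c, r, u and d, the optional schema/payload wrapper added by the JSON converter, and the null tombstone that follows a delete are Debezium conventions.

```python
from __future__ import annotations

import json


def apply_change(table: dict, raw: str | None, key_field: str = "id") -> None:
    """Apply one Debezium change-event value to an in-memory replica of a table."""
    if raw is None:
        return  # tombstone after a delete; it exists for Kafka log compaction only
    event = json.loads(raw)
    payload = event.get("payload", event)  # unwrap when schemas are embedded
    op = payload["op"]
    if op in ("c", "r", "u"):  # create, snapshot read, update: 'after' is the new row
        row = payload["after"]
        table[row[key_field]] = row
    elif op == "d":  # delete: only 'before' is populated
        table.pop(payload["before"][key_field], None)
    # Other op codes (for example truncate) are ignored in this sketch.
```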
Integration patterns
| Pattern | Where used | Mechanism |
|---|---|---|
| API gateway / single ingress | All client traffic | Spring Cloud Gateway, declarative RouteLocator |
| Token-based auth + header propagation | Gateway → downstream | Keycloak JWT validated once; X-User-* headers injected |
| Defense-in-depth resource server | Every JVM service | Each also runs an OAuth2 resource server against Keycloak |
| Synchronous service-to-service HTTP | Gateway ↔ services | Plain HTTP by service-name URL |
| Event streaming | Connector ingestion, CDC, streaming pipelines | Kafka topics; Debezium for change capture |
| Server-Sent Events (SSE) | Monitor alerts/metrics, notification inbox, copilot chat | text/event-stream; gateway SSE routes; frontend SSEManager |
| WebSocket | Pipeline run status / log streaming | /api/v1/runs/*/stream, gateway http→ws rewrite |
| OpenLineage emission/ingestion | pipeline-engine → lineage-service | OpenLineageEmitter emits RunEvents; OpenLineageEventController ingests |
| Connector SDK / plugin architecture | metadata-service & pipeline-engine | ConnectorSDK interface; ConnectorRegistry via Java ServiceLoader |
| External orchestrator adapters | pipeline-engine | AirflowClient / AirflowDagGenerator, AutomateNowAdapter |
| External compute integration | pipeline-engine | Flink (FlinkClusterManager) and Spark/Dataproc (DataprocJobSubmitter) |
| Pluggable LLM provider abstraction | copilot | LLMProvider ABC + factory.py; LLM_PROVIDER switches anthropic / openrouter / local |
| RAG / vector search | copilot | pgvector embeddings; degrades gracefully when unavailable |
| Model Context Protocol (MCP) server | metadata-service | McpServerController at /api/v1/mcp, /mcp |
| SQL pushdown / transpilation | pushdown-sql | JSqlParser-based transpiler, 8 target dialects |
| Git-backed pipeline version control | pipeline-engine | GitRepositoryManager, PipelineVersionControl, YamlDiffEngine |
| Hash-chained immutable audit log | metadata-service | AuditChainVerifierService, audit-log hash chain (Flyway V27/V29) |
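The hash-chained audit log pattern can be sketched as follows: each entry stores the previous entry's hash, so editing any past record breaks verification from that point on. SHA-256, canonical JSON, the all-zero genesis value and the entry layout are assumptions for illustration; they are not taken from AuditChainVerifierService or the V27/V29 migrations.

```python
from __future__ import annotations

import hashlib
import json

GENESIS = "0" * 64  # assumed prev_hash for the first entry


def entry_hash(prev_hash: str, record: dict) -> str:
    # Canonical JSON (sorted keys, fixed separators) so equal records hash identically.
    body = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()


def append(log: list[dict], record: dict) -> None:
    prev = log[-1]["hash"] if log else GENESIS
    log.append({"record": record, "prev_hash": prev, "hash": entry_hash(prev, record)})


def verify(log: list[dict]) -> int | None:
    """Return the index of the first broken entry, or None if the chain is intact."""
    prev = GENESIS
    for i, entry in enumerate(log):
        if entry["prev_hash"] != prev or entry["hash"] != entry_hash(prev, entry["record"]):
            return i
        prev = entry["hash"]
    return None
```

Recomputing a tampered entry's own hash only moves the break to the next entry, whose stored prev_hash no longer matches; that is what makes the chain tamper-evident.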
Cross-cutting concerns
| Concern | Implementation |
|---|---|
| AuthN | Keycloak 24 OIDC, realm dataflow; keycloak-js in the SPA; JWTs validated at the gateway and again at each resource server |
| AuthZ | Hierarchical RBAC in common/RBACService: DataFlowRole ADMIN(100) > ENGINEER(75) > ANALYST(50) > STEWARD(40) > VIEWER(25); gateway method/role rules (DELETE→ADMIN, write→ADMIN/ENGINEER, GET→any role) |
| Identity propagation | X-User-* headers from gateway → SecurityContextHolder ThreadLocal |
| Rate limiting | Redis sorted-set sliding window per IP+endpoint; fail-closed circuit breaker; X-RateLimit-* headers |
| PII protection | Gateway PiiMaskingFilter on responses; metadata-service DynamicMaskingService; PII classifier; lineage PII-propagation detection |
| Audit | common/AuditInterceptor + AuditPersistence; metadata-service hash-chained immutable audit log with AuditChainVerifierService |
| Observability — metrics | Spring Actuator /actuator/prometheus; monitor-service OpenTelemetry + Prometheus; copilot exposes /metrics; scraped by Prometheus → Grafana |
| Observability — tracing | monitor-service OpenTelemetryConfig + TracingFilter; request-id propagation (X-Request-Id) |
| Health checks | Gateway HealthController with actuator/health/{liveness,readiness,startup}; readiness probes Keycloak .well-known + metadata-service health |
| Error handling | common/GlobalExceptionHandler @ControllerAdvice + typed ApiException hierarchy; SPA react-error-boundary + axios error normalization |
| Real-time | SSE (alerts/metrics/notifications/copilot chat) + WebSocket (pipeline run streaming) |
| Compliance | GDPR + Polish-compliance modules (GdprService, PolishComplianceService, DSAR, retention, breach notification); schedules default to Europe/Warsaw |
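The role hierarchy and the gateway's method rules can be expressed with an ordered enum. The levels come from the table above; treating POST, PUT and PATCH as the write methods and denying every other method are assumptions of this sketch, and has_at_least and gateway_allows are hypothetical names rather than RBACService's API.

```python
from __future__ import annotations

from enum import IntEnum


class DataFlowRole(IntEnum):
    # Levels as listed for common/RBACService; a higher level implies every lower one.
    VIEWER = 25
    STEWARD = 40
    ANALYST = 50
    ENGINEER = 75
    ADMIN = 100


def has_at_least(granted: set[DataFlowRole], required: DataFlowRole) -> bool:
    return any(role >= required for role in granted)


WRITE_METHODS = {"POST", "PUT", "PATCH"}  # assumption: what counts as a write


def gateway_allows(method: str, granted: set[DataFlowRole]) -> bool:
    """DELETE -> ADMIN, writes -> ADMIN/ENGINEER, GET -> any role."""
    m = method.upper()
    if m == "DELETE":
        return has_at_least(granted, DataFlowRole.ADMIN)
    if m in WRITE_METHODS:
        return has_at_least(granted, DataFlowRole.ENGINEER)  # ENGINEER or ADMIN
    if m == "GET":
        return has_at_least(granted, DataFlowRole.VIEWER)  # any of the five roles
    return False  # assumption: deny anything else by default
```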
Key sequence flows
Login and authenticated API request
Browser ──(1) load SPA──▶ nginx :3006
Browser ──(2) OIDC redirect──▶ Keycloak :8180
Keycloak ──(3) login + redirect back with code──▶ Browser
Browser (4) keycloak-js exchanges code → access/refresh JWT; waitForAuthReady resolves
Browser ──(5) GET /api/v1/... (Bearer JWT)──▶ API Gateway :8085
Gateway (6) OAuth2 resource server validates JWT signature/issuer/audience vs JWKS
Gateway (7) AuthFilter maps Keycloak roles → DataFlowRole, injects X-User-* headers
Gateway (8) RedisRateLimitFilter checks/updates rate_limit:{ip}:{endpoint}
Gateway ──(9) proxy to downstream :8080──▶ metadata / pipeline / monitor / ...
Service (10) re-validates JWT + reads X-User-* into SecurityContextHolder
Service (11) RBAC permission check → query PostgreSQL → response
Gateway (12) PiiMaskingFilter + SecurityHeadersFilter ──▶ Browser
On 401: response interceptor silently refreshes the token and retries the request.
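Steps 7 and 10 (header injection at the gateway, header binding in the service) can be sketched as below. The claims sub, preferred_username and realm_access.roles are standard in Keycloak access tokens; the realm-role names, the exact X-User-Id / X-User-Name / X-User-Roles header names, and the use of contextvars as a Python analog of SecurityContextHolder are assumptions. The sketch starts from claims that have already been validated.

```python
from __future__ import annotations

import contextvars

# Hypothetical mapping from Keycloak realm-role names to DataFlowRole names.
KEYCLOAK_TO_DATAFLOW = {
    "dataflow-admin": "ADMIN",
    "dataflow-engineer": "ENGINEER",
    "dataflow-analyst": "ANALYST",
    "dataflow-steward": "STEWARD",
    "dataflow-viewer": "VIEWER",
}


def claims_to_headers(claims: dict) -> dict[str, str]:
    """Gateway side: turn validated JWT claims into X-User-* headers."""
    realm_roles = claims.get("realm_access", {}).get("roles", [])
    # Unmapped Keycloak roles (e.g. offline_access) are dropped.
    roles = sorted({KEYCLOAK_TO_DATAFLOW[r] for r in realm_roles if r in KEYCLOAK_TO_DATAFLOW})
    return {
        "X-User-Id": claims["sub"],
        "X-User-Name": claims.get("preferred_username", ""),
        "X-User-Roles": ",".join(roles),
    }


# Service side: a per-request slot, playing the role SecurityContextHolder plays in the JVM.
current_user: contextvars.ContextVar[dict | None] = contextvars.ContextVar("current_user", default=None)


def bind_user_from_headers(headers: dict[str, str]) -> contextvars.Token:
    roles = [r for r in headers.get("X-User-Roles", "").split(",") if r]
    return current_user.set({"id": headers["X-User-Id"], "roles": roles})
```

Because downstream services trust these headers, a filter of this kind would normally also discard any X-User-* headers supplied by the client before adding its own.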
Pipeline run and real-time status streaming
Frontend ──POST /api/v1/pipelines/{id}/run──▶ Gateway ──▶ pipeline-engine (ExecutionController)
pipeline-engine: PipelineRunner parses YAML DSL → ParameterResolver → PipelineValidator
→ DagBuilder builds ExecutionDAG (cycle/dangling detection)
→ executes tasks in topological order (parallel within DAG levels)
→ ExecutionContext supports cooperative cancellation
For streaming/compute: FlinkJobSubmitter / Spark DataprocJobSubmitter; or AirflowClient
On task failure: SelfHealingService → FailureClassifier → RecoveryStrategies
pipeline-engine: ExecutionEventPublisher / PipelineRunLogPublisher push run events
pipeline-engine: OpenLineageEmitter emits RunEvents ──▶ lineage-service /openlineage/events
Frontend ──WS /api/v1/runs/{runId}/stream──▶ Gateway (http→ws) ──▶ PipelineStatusWebSocket
→ live task-state / log updates streamed to the SPA log viewer
On completion: monitor-service ingests run metrics → cost/SLA/freshness eval → alerts
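The DagBuilder step (reject dangling dependencies and cycles, then group tasks into levels whose members can run in parallel) can be sketched with Kahn's algorithm. build_levels, DagError and the task-to-dependencies dict are hypothetical; the engine itself is Kotlin and works on its parsed YAML DSL.

```python
from __future__ import annotations

from collections import defaultdict


class DagError(ValueError):
    pass


def build_levels(tasks: dict[str, list[str]]) -> list[list[str]]:
    """tasks maps task id -> ids it depends on. Every task in level n depends only on
    tasks in earlier levels, so the tasks within one level can run in parallel."""
    # Dangling check: a dependency that names no declared task.
    for task, deps in tasks.items():
        missing = [d for d in deps if d not in tasks]
        if missing:
            raise DagError(f"task {task!r} depends on unknown task(s) {missing}")

    indegree = {task: len(set(deps)) for task, deps in tasks.items()}
    dependents: dict[str, list[str]] = defaultdict(list)
    for task, deps in tasks.items():
        for dep in set(deps):
            dependents[dep].append(task)

    levels: list[list[str]] = []
    ready = sorted(t for t, n in indegree.items() if n == 0)
    placed = 0
    while ready:
        levels.append(ready)
        placed += len(ready)
        next_ready = []
        for task in ready:
            for child in dependents[task]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)
    # Tasks that never reached in-degree 0 are on, or downstream of, a cycle.
    if placed != len(tasks):
        stuck = sorted(t for t, n in indegree.items() if n > 0)
        raise DagError(f"cycle detected: {stuck} never became runnable")
    return levels
```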
Copilot request (NL-to-pipeline / chat with RAG)
Frontend ──POST /api/v1/copilot/chat (SSE)──▶ Gateway ──▶ copilot :8090
copilot: RAGService queries pgvector for relevant catalog/pipeline context
(rag_max_results=5, similarity_threshold=0.72)
degrades to non-RAG chat if pgvector unavailable
copilot: LLM provider factory selects anthropic | openrouter | local per LLM_PROVIDER
copilot: LLMProvider.stream() → token deltas (StreamChunk) streamed back as SSE events
(X-Copilot-Confidence header set); can produce DataFlow YAML pipeline definitions
Frontend: AICopilotSidebar renders streamed tokens; generated pipeline → Design Studio
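The provider factory and SSE framing can be sketched as follows. LLMProvider, StreamChunk and LLM_PROVIDER are names from the flow above, but their shapes here are assumptions: StreamChunk is reduced to a single delta field, EchoProvider is a hypothetical placeholder registered as local, falling back to local when LLM_PROVIDER is unset is an assumption, and the token/done event names are invented for illustration. The framing itself (an optional event: line and a data: line, terminated by a blank line) follows the Server-Sent Events format.

```python
from __future__ import annotations

import json
import os
from abc import ABC, abstractmethod
from collections.abc import Iterator
from dataclasses import dataclass


@dataclass
class StreamChunk:
    delta: str  # one token delta (hypothetical field name)


class LLMProvider(ABC):
    @abstractmethod
    def stream(self, prompt: str) -> Iterator[StreamChunk]: ...


class EchoProvider(LLMProvider):
    """Hypothetical stand-in for the 'local' provider: echoes the prompt word by word."""

    def stream(self, prompt: str) -> Iterator[StreamChunk]:
        for word in prompt.split():
            yield StreamChunk(word + " ")


# Real anthropic and openrouter providers would be registered here too.
_REGISTRY: dict[str, type[LLMProvider]] = {"local": EchoProvider}


def get_provider(name: str | None = None) -> LLMProvider:
    name = (name or os.environ.get("LLM_PROVIDER", "local")).lower()
    try:
        return _REGISTRY[name]()
    except KeyError:
        raise ValueError(f"unsupported LLM_PROVIDER {name!r}; known: {sorted(_REGISTRY)}") from None


def sse_event(data: dict, event: str | None = None) -> str:
    """Frame one SSE message; json.dumps keeps the payload on a single data: line."""
    lines = [f"event: {event}"] if event else []
    lines.append("data: " + json.dumps(data))
    return "\n".join(lines) + "\n\n"


def chat_sse(prompt: str) -> Iterator[str]:
    for chunk in get_provider().stream(prompt):
        yield sse_event({"delta": chunk.delta}, event="token")
    yield sse_event({"done": True}, event="done")
```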
Connector ingestion and lineage
metadata-service / pipeline-engine load a connector via ConnectorRegistry (ServiceLoader)
Connector.discoverSchema() → catalog tables/columns persisted (metadata-service catalog)
Pipeline execution: Connector.extractData() → DataStream → transform → loadData() to target
Streaming/CDC connectors (Kafka, Debezium) publish/consume change events via Kafka topics
pipeline-engine OpenLineageEmitter → lineage-service builds dataset/column lineage graph
lineage-service: ImpactAnalyzer + PropagationWorker propagate tags/PII across the graph
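Tag propagation over the lineage graph amounts to a downstream graph walk. This breadth-first sketch is not ImpactAnalyzer or PropagationWorker: the column-to-downstream-columns edge map, the column names and the function name are hypothetical, and it propagates unconditionally, whereas a real worker might skip edges whose transformation removes the sensitive value (for example hashing).

```python
from __future__ import annotations

from collections import deque


def propagate_tag(edges: dict[str, list[str]], sources: set[str], tag: str,
                  tags: dict[str, set[str]]) -> set[str]:
    """Walk downstream from the tagged columns, adding `tag` to every reachable column.
    Returns the columns that newly received the tag (the impacted set)."""
    newly_tagged: set[str] = set()
    seen = set(sources)
    queue = deque(sources)
    while queue:
        node = queue.popleft()
        for child in edges.get(node, []):
            if child in seen:
                continue  # also stops the walk on cycles in the lineage graph
            seen.add(child)
            child_tags = tags.setdefault(child, set())
            if tag not in child_tags:
                child_tags.add(tag)
                newly_tagged.add(child)
            queue.append(child)
    return newly_tagged
```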
Architectural observations
A few properties of the system are worth keeping in mind when extending it:
- Shared single database for all JVM and Python services. Isolation is by table prefix and per-service Flyway history tables, not physical schemas. This couples services at the data layer — closer to a distributed monolith over one database than fully autonomous microservices.
- lineage-service has Flyway disabled and depends on metadata-service having already applied its lineage migrations (V4/V48) — a hidden deploy-time ordering dependency.
- Inter-service communication is synchronous HTTP with no service mesh, circuit breakers, or service discovery beyond Docker DNS and env-var URLs.
- Identity is validated twice, at the gateway and again at each resource server. That is strong defense in depth, but the dev-permit-reads flag defaults to true in compose and would permit unauthenticated reads if left enabled.
- Service naming mismatch: gateway routes are named copilot-service / migration-service, while the compose containers are copilot / migration-engine; the two are reconciled only through the COPILOT_SERVICE_URL / MIGRATION_SERVICE_URL env vars.