System architecture

DataFlow AI is an ETL/ELT data-integration and modernization product built for Polkomtel, the Polish "Plus" telecom operator. It is a polyglot microservices monorepo fronted by a single API gateway, and this page maps the whole system end to end.


Overview

DataFlow AI converts, runs, monitors, and governs data pipelines for a telecom data estate. The platform is organized as a set of independently built services that cooperate over HTTP, Kafka, and a shared PostgreSQL database. There are four runtime families:

  • JVM platform — Kotlin + Spring Boot, a Gradle multi-module build under the package root com.polkomtel.dataflow. Seven modules: five deployable services plus two libraries.
  • AI services — two Python 3.12 / FastAPI services: copilot and migration-engine.
  • Frontend — a React 19 + Vite 7 + TypeScript single-page application.
  • Edge tooling — a Go command-line tool (backend/cli/) and a browser extension (browser-extension/).

The architectural style is microservices behind an API gateway. A single reactive Spring Cloud Gateway is the only ingress; it authenticates every request against Keycloak and proxies to six downstream services. Service-to-service calls are synchronous HTTP; Kafka carries connector and change-data-capture (CDC) workloads; SSE and WebSocket push real-time updates to the UI.

One database, many migration histories

All JVM and AI services point at the same PostgreSQL instance and database (dataflow_metadata). Logical isolation is achieved by per-service Flyway migration histories and table-name prefixes (engine_*, monitor_*), not by separate physical schemas.


Microservices topology

The diagram below shows the deployable units, the single ingress, and the shared infrastructure tier.

                          ┌─────────────────────────────┐
                          │   Browser SPA (React 19)     │
                          │   served by nginx :3006      │
                          └──────────────┬──────────────┘
                                         │ HTTPS (Bearer JWT, axios baseURL=/api/v1)
                                         │ + Keycloak OIDC login redirect
                                         ▼
                          ┌─────────────────────────────┐
                          │   API Gateway :8085→8080     │
                          │  (Spring Cloud Gateway,      │
                          │   reactive WebFlux)          │
                          │  - JWT validation (Keycloak) │
                          │  - AuthFilter → X-User-* hdrs│
                          │  - Redis rate limiting       │
                          │  - PII masking / sec headers │
                          └──┬───┬───┬───┬───┬───┬───────┘
            /api/v1/...      │   │   │   │   │   │
   ┌──────────────┬──────────┘   │   │   │   │   └──────────────┐
   ▼              ▼              ▼   ▼   ▼   ▼                  ▼
┌─────────┐ ┌──────────┐ ┌──────────┐ ┌─────────┐ ┌──────────┐ ┌────────────┐
│metadata │ │ pipeline │ │ lineage  │ │ monitor │ │ copilot  │ │ migration  │
│ :8181   │ │ engine   │ │ :8083    │ │ :8084   │ │ :8090    │ │ engine     │
│         │ │ :8082    │ │          │ │         │ │ (FastAPI)│ │ :8091      │
└────┬────┘ └────┬─────┘ └────┬─────┘ └────┬────┘ └────┬─────┘ └─────┬──────┘
     │           │            │            │           │             │
     └───────────┴────────────┴──┬─────────┴───────────┴─────────────┘
                                 │
        ┌────────────┬───────────┼────────────┬──────────────┐
        ▼            ▼           ▼            ▼              ▼
  ┌──────────┐ ┌──────────┐ ┌────────┐ ┌──────────┐ ┌──────────────┐
  │PostgreSQL│ │  Kafka   │ │ Redis  │ │  MinIO   │ │  Keycloak    │
  │ +pgvector│ │+Zookeeper│ │        │ │  (S3)    │ │  (OIDC)      │
  └──────────┘ └──────────┘ └────────┘ └──────────┘ └──────────────┘

                Prometheus → Grafana (scrape /actuator/prometheus, /metrics)

The gateway's RouteConfig references six downstream targets. Note that copilot-service and migration-service are gateway route names; the compose containers are named copilot and migration-engine, addressed via the COPILOT_SERVICE_URL / MIGRATION_SERVICE_URL environment variables.


Service inventory and ports

Inside the Docker network every JVM service listens on 8080 and the AI services on 8000; only the host-published ports differ. The gateway resolves downstream services by service-name URLs.

| Service | Tech | Container port | Host port | DB / Flyway | Role |
|---|---|---|---|---|---|
| api-gateway | Kotlin / Spring Cloud Gateway (reactive WebFlux) | 8080 | 8085 | none (uses Redis) | Single ingress: routing, JWT validation, rate limiting, PII masking, security headers |
| metadata-service | Kotlin / Spring MVC + JPA | 8080 | 8181 | dataflow_metadata, Flyway metadata (V1–V54) | Catalog, connections, governance, GDPR, quality, data products, MCP server |
| pipeline-engine | Kotlin / Spring MVC + JPA | 8080 | 8082 | shared DB, Flyway engine (V1–V9) | Pipeline DAG compilation/execution, scheduling, self-healing, GDPR erasure |
| lineage-service | Kotlin / Spring MVC + JPA | 8080 | 8083 | shared DB, Flyway disabled | Dataset/column lineage, OpenLineage ingestion, impact analysis |
| monitor-service | Kotlin / Spring MVC + JPA | 8080 | 8084 | shared DB, Flyway monitor (V9–V25) | Alerts, metrics, cost, SLA, freshness, notifications, SSE streams |
| copilot | Python 3.12 / FastAPI | 8000 | 8090 | shared DB (pgvector) | NL-to-pipeline, conversational AI, NL-to-SQL, RAG search, RCA |
| migration-engine | Python 3.12 / FastAPI | 8000 | 8091 | shared DB | Converts legacy ETL (Informatica/Alteryx/SSIS/DataStage) → DataFlow YAML |
| frontend | React 19 / Vite, served by nginx | 80 | 3006 | n/a | Single-page application UI |

Two modules are libraries built into the JVM services rather than deployed as containers: connector-sdk (connector framework plus 21 connector implementations) and common (shared security, models, exception handling). pushdown-sql is a Spring Boot SQL-dialect transpiler used as an injectable service/library; it is not wired into the compose file as a standalone container.

Infrastructure components

| Component | Image | Port(s) | Purpose |
|---|---|---|---|
| PostgreSQL | pgvector/pgvector:pg15 | 5432 | Shared relational store + pgvector embeddings; also hosts the dataflow_keycloak DB |
| Keycloak | quay.io/keycloak/keycloak:24.0 | 8180→8080 | OIDC / SSO identity provider; realm dataflow from realm-export.json |
| Kafka | confluentinc/cp-kafka:7.7.1 | 9092 / 29092 | Event streaming backbone (connectors, CDC/Debezium) |
| Zookeeper | confluentinc/cp-zookeeper:7.7.1 | 2181 | Kafka coordination |
| Redis | redis:7-alpine | 6379 | Gateway rate limiting (sorted-set sliding window), caching, session store |
| MinIO | minio/minio | 9000 / 9001 | S3/GCS-compatible object storage (file connectors, CDR Parquet) |
| Prometheus | prom/prometheus:v2.54.1 | 9090 | Metrics scraping |
| Grafana | grafana/grafana:11.2.2 | 3001→3000 | Observability dashboards |

Request flow

All client traffic enters through the gateway under /api/v1/**. The gateway's RouteConfig declares a RouteLocator bean that maps path prefixes to downstream service URLs, and each route applies a filter chain.

Edge routing

  1. RequestLoggingFilter — structured access logging.
  2. SecurityHeadersFilter — CSP and security headers on the response.
  3. PiiMaskingFilter — masks PII in response bodies.
  4. RedisRateLimitFilter — per-route Redis sliding-window limit keyed rate_limit:{ip}:{endpoint}, with per-endpoint requestsPerMinute / burstCapacity; on Redis failure with fail-closed:true it returns 503 + Retry-After.
  5. removeRequestHeader("Cookie") + preserveHostHeader().
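
The sliding-window check in step 4 can be illustrated with a minimal Python sketch. It is not the gateway's Kotlin filter: an in-memory list stands in for the Redis sorted set, and InMemorySortedSetStore, check_rate_limit, the 429 status for an exceeded limit, the Retry-After value, and the exact X-RateLimit-* header names are all assumptions made for illustration. burstCapacity is not modelled.

```python
import time
from bisect import bisect_right
from dataclasses import dataclass, field


class StoreUnavailable(Exception):
    """Raised by the hypothetical store when its backend cannot be reached."""


@dataclass
class InMemorySortedSetStore:
    """Stand-in for Redis sorted sets: one ascending list of timestamps per key."""
    data: dict = field(default_factory=dict)
    available: bool = True

    def record_and_count(self, key: str, now: float, window_s: float) -> int:
        if not self.available:
            raise StoreUnavailable(key)
        entries = self.data.setdefault(key, [])
        # Drop timestamps that left the window (ZREMRANGEBYSCORE analogue).
        del entries[:bisect_right(entries, now - window_s)]
        entries.append(now)   # ZADD analogue; assumes non-decreasing timestamps
        return len(entries)   # ZCARD analogue


def check_rate_limit(store, ip, endpoint, requests_per_minute, now=None, fail_closed=True):
    """Return (http_status, headers) for one incoming request."""
    now = time.time() if now is None else now
    key = f"rate_limit:{ip}:{endpoint}"
    try:
        count = store.record_and_count(key, now, window_s=60.0)
    except StoreUnavailable:
        # Fail closed: reject rather than let traffic through unmetered.
        if fail_closed:
            return 503, {"Retry-After": "5"}  # value is an assumption
        return 200, {}
    headers = {
        "X-RateLimit-Limit": str(requests_per_minute),
        "X-RateLimit-Remaining": str(max(requests_per_minute - count, 0)),
    }
    return (429 if count > requests_per_minute else 200), headers
```

Note that this sketch also records rejected requests, so a client that keeps hammering an endpoint stays limited; whether the real filter does the same is not stated in the source.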

Routing highlights:

  • Standard REST → metadata / pipeline-engine / lineage / monitor.
  • A WebSocket route pipeline-run-websocket at /api/v1/runs/*/stream rewrites http→ws to pipeline-engine.
  • SSE routes for copilot chat streaming and monitor alerts/metrics (/api/v1/monitor/sse/**).
  • /api/v1/notifications/** is deliberately routed to monitor-service — the NotificationInboxController lives there, not in metadata.
  • The AI services mount their routers under three prefixes (/api/v1/ai/**, /api/v1/copilot/**, /api/copilot/**) so the gateway and frontend can address them however configured.
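
As a rough illustration of the routing highlights above, the following Python sketch maps path prefixes to downstream base URLs that can be overridden through environment variables. It is a simplification, not the gateway's RouteConfig: it picks the longest matching prefix, whereas Spring Cloud Gateway evaluates route predicates in route order. COPILOT_SERVICE_URL comes from this page; MONITOR_SERVICE_URL, PIPELINE_ENGINE_URL, the default URLs, and the resolve function are hypothetical.

```python
import os
from fnmatch import fnmatchcase

# Hypothetical route table: (path prefix, env var with the target URL, default URL).
ROUTES = [
    ("/api/v1/notifications/", "MONITOR_SERVICE_URL", "http://monitor-service:8080"),
    ("/api/v1/monitor/sse/", "MONITOR_SERVICE_URL", "http://monitor-service:8080"),
    ("/api/v1/copilot/", "COPILOT_SERVICE_URL", "http://copilot:8000"),
    ("/api/v1/ai/", "COPILOT_SERVICE_URL", "http://copilot:8000"),
]
WEBSOCKET_PATTERN = "/api/v1/runs/*/stream"


def resolve(path, env=None):
    """Return the downstream URL for a request path, or None if no route matches."""
    env = os.environ if env is None else env
    if fnmatchcase(path, WEBSOCKET_PATTERN):
        # Run streaming goes to pipeline-engine with the scheme rewritten http -> ws.
        base = env.get("PIPELINE_ENGINE_URL", "http://pipeline-engine:8080")
        return base.replace("http://", "ws://", 1).rstrip("/") + path
    matches = [route for route in ROUTES if path.startswith(route[0])]
    if not matches:
        return None
    # Longest prefix wins, so specific routes beat broad ones.
    _, env_var, default = max(matches, key=lambda route: len(route[0]))
    return env.get(env_var, default).rstrip("/") + path
```

With an empty environment, resolve("/api/v1/notifications/inbox", {}) yields a monitor-service URL, which mirrors the deliberate routing of the notification inbox described above.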

Browser → gateway → service → database

Browser  ──▶  nginx :3006        load SPA
Browser  ──▶  Keycloak :8180     OIDC login, redirect back with code
Browser       keycloak-js exchanges code → access/refresh JWT
Browser  ──▶  API Gateway :8085  GET/POST /api/v1/...  (Authorization: Bearer JWT)
Gateway       OAuth2 resource server validates JWT vs Keycloak JWKS
Gateway       AuthFilter maps roles → DataFlowRole, injects X-User-* headers
Gateway       RedisRateLimitFilter checks rate_limit:{ip}:{endpoint}
Gateway  ──▶  downstream service :8080
Service       re-validates JWT + reads X-User-* into SecurityContextHolder
Service       RBAC permission check → query PostgreSQL → response
Gateway       PiiMaskingFilter + SecurityHeadersFilter  ──▶  Browser

The frontend uses a single axios instance (frontend/src/api/client.ts) with baseURL=/api/v1. A request interceptor awaits a waitForAuthReady promise so no request fires before the Keycloak token is available, and injects the bearer token and CSRF token. A response interceptor normalizes errors and implements a 401 → silent token refresh → retry-original-request flow.
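
The 401 → refresh → retry behaviour of the response interceptor can be sketched in a few lines. The real client is TypeScript on top of axios; the Python below only shows the control flow, and Response, ApiClient, and the injected send / refresh_token callables are hypothetical names. The sketch retries exactly once so that a persistently rejected token cannot cause a loop; whether the real interceptor applies the same limit is an assumption.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Response:
    status: int
    body: str = ""


class ApiClient:
    """Replays a request once after refreshing the token on a 401."""

    def __init__(self, send: Callable[[str, str], Response],
                 refresh_token: Callable[[], str], token: str):
        self._send = send                    # (path, bearer token) -> Response
        self._refresh_token = refresh_token  # () -> fresh access token
        self._token = token

    def request(self, path: str) -> Response:
        response = self._send(path, self._token)
        if response.status != 401:
            return response
        # Silent refresh, then retry the original request with the new token.
        self._token = self._refresh_token()
        return self._send(path, self._token)
```

A second 401 after the refresh is returned to the caller unchanged, which is where error normalization or a redirect to login would take over.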


Data flow and persistence

Shared database, partitioned migrations

All services target the same PostgreSQL instance and database. Logical isolation comes from per-service Flyway migration histories.

| Service | Flyway location | History table | DDL mode |
|---|---|---|---|
| metadata-service | db/migration/metadata (V1–V54) | flyway_schema_history | ddl-auto: validate |
| pipeline-engine | db/migration/engine (V1–V9) | flyway_schema_history_engine | ddl-auto: none |
| monitor-service | db/migration/monitor (V9–V25) | flyway_schema_history_monitor | |
| lineage-service | Flyway disabled | reuses metadata's lineage tables (V4/V48) | ddl-auto: none |

This is a shared-database pattern in which each service owns a set of tables by prefix rather than a schema of its own: engine_* tables for the engine, monitor_* tables for monitor, and catalog/governance/GDPR tables owned by metadata. lineage-service is a pure consumer of metadata's lineage tables. Keycloak uses a separate dataflow_keycloak database on the same Postgres instance.

Permissive Flyway settings

The compose Flyway settings are deliberately permissive — BASELINE_ON_MIGRATE, OUT_OF_ORDER, REPAIR_ON_MIGRATE, VALIDATE_ON_MIGRATE: false, BASELINE_VERSION: 20.1 — so a shared database with overlapping legacy histories converges on startup.

Object storage and vectors

  • MinIO provides S3/GCS-compatible storage for file connectors and CDR Parquet conversion.
  • pgvector backs the copilot's RAG embeddings store. The copilot initializes an asyncpg pool and ensures the embeddings schema on startup, degrading to rag_mode: "unavailable" if pgvector is unreachable.
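
The retrieval-with-degradation behaviour can be approximated as follows. This is an in-memory sketch, not the copilot's asyncpg/pgvector code: an in-process list replaces the vector store, cosine similarity is computed by hand, and retrieve_context plus the "available" mode value are hypothetical. The default limits reuse the rag_max_results=5 and similarity_threshold=0.72 values quoted in the copilot sequence flow on this page.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve_context(query_vec, store, max_results=5, similarity_threshold=0.72):
    """store is a list of (doc_id, embedding), or None when the vector store is unreachable."""
    if store is None:
        # Degrade instead of failing: the caller falls back to non-RAG chat.
        return {"rag_mode": "unavailable", "documents": []}
    scored = [(cosine_similarity(query_vec, emb), doc_id) for doc_id, emb in store]
    # Keep only sufficiently similar documents, best match first.
    hits = sorted((s for s in scored if s[0] >= similarity_threshold), reverse=True)
    return {
        "rag_mode": "available",  # hypothetical value; only "unavailable" is documented
        "documents": [doc_id for _, doc_id in hits[:max_results]],
    }
```

Returning a mode flag alongside an empty result lets the chat path continue without a special error branch when embeddings cannot be reached.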

Event streaming

Kafka + Zookeeper form the streaming backbone. Kafka usage is concentrated in:

  • connector-sdk — the Kafka connector (KafkaConnector, KafkaOffsetManager, KafkaSchemaRegistry, SerDe), a Pub/Sub connector, and CDC via Debezium (DebeziumCdcConnector, DebeziumCdcManager, DebeziumEventConsumer).
  • pipeline-engine — streaming jobs (FlinkJobBuilder) and the StreamingQualityEngine / StreamingQualityController.

JVM services receive KAFKA_BOOTSTRAP_SERVERS: kafka:29092 (internal listener). Kafka carries connector and CDC ingestion workloads rather than acting as an inter-microservice command bus — service-to-service calls remain synchronous HTTP.


Integration patterns

| Pattern | Where used | Mechanism |
|---|---|---|
| API gateway / single ingress | All client traffic | Spring Cloud Gateway, declarative RouteLocator |
| Token-based auth + header propagation | Gateway → downstream | Keycloak JWT validated at the gateway; X-User-* headers injected |
| Defense-in-depth resource server | Every JVM service | Each also runs an OAuth2 resource server against Keycloak |
| Synchronous service-to-service HTTP | Gateway ↔ services | Plain HTTP by service-name URL |
| Event streaming | Connector ingestion, CDC, streaming pipelines | Kafka topics; Debezium for change capture |
| Server-Sent Events (SSE) | Monitor alerts/metrics, notification inbox, copilot chat | text/event-stream; gateway SSE routes; frontend SSEManager |
| WebSocket | Pipeline run status / log streaming | /api/v1/runs/*/stream, gateway http→ws rewrite |
| OpenLineage emission/ingestion | pipeline-engine → lineage-service | OpenLineageEmitter emits RunEvents; OpenLineageEventController ingests |
| Connector SDK / plugin architecture | metadata-service & pipeline-engine | ConnectorSDK interface; ConnectorRegistry via Java ServiceLoader |
| External orchestrator adapters | pipeline-engine | AirflowClient / AirflowDagGenerator, AutomateNowAdapter |
| External compute integration | pipeline-engine | Flink (FlinkClusterManager) and Spark/Dataproc (DataprocJobSubmitter) |
| Pluggable LLM provider abstraction | copilot | LLMProvider ABC + factory.py; LLM_PROVIDER switches anthropic / openrouter / local |
| RAG / vector search | copilot | pgvector embeddings; degrades gracefully when unavailable |
| Model Context Protocol (MCP) server | metadata-service | McpServerController at /api/v1/mcp, /mcp |
| SQL pushdown / transpilation | pushdown-sql | JSqlParser-based transpiler, 8 target dialects |
| Git-backed pipeline version control | pipeline-engine | GitRepositoryManager, PipelineVersionControl, YamlDiffEngine |
| Hash-chained immutable audit log | metadata-service | AuditChainVerifierService, audit-log hash chain (Flyway V27/V29) |
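
To make the last pattern concrete, here is a minimal sketch of a hash-chained log and its verifier. It shows the general technique only; the hashing scheme used by AuditChainVerifierService is not described on this page, so SHA-256 over the previous hash plus canonical JSON, the all-zero genesis value, and the function names are assumptions.

```python
import hashlib
import json

GENESIS_HASH = "0" * 64  # assumed starting value for the first entry


def entry_hash(prev_hash, payload):
    """Hash the previous entry's hash together with a canonical form of the payload."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + canonical).encode("utf-8")).hexdigest()


def append_entry(chain, payload):
    """Append a payload, linking it to the hash of the current last entry."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS_HASH
    chain.append({
        "payload": payload,
        "prev_hash": prev_hash,
        "hash": entry_hash(prev_hash, payload),
    })


def verify_chain(chain):
    """Return the index of the first broken entry, or None if the chain is intact."""
    prev_hash = GENESIS_HASH
    for index, entry in enumerate(chain):
        recomputed = entry_hash(prev_hash, entry["payload"])
        if entry["prev_hash"] != prev_hash or entry["hash"] != recomputed:
            return index
        prev_hash = entry["hash"]
    return None
```

Because each hash covers its predecessor, editing or deleting any stored entry invalidates that entry and every later one unless the whole tail is rewritten, which is what makes tampering detectable.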

Cross-cutting concerns

| Concern | Implementation |
|---|---|
| AuthN | Keycloak 24 OIDC, realm dataflow; keycloak-js in the SPA; JWTs validated at the gateway and again at each resource server |
| AuthZ | Hierarchical RBAC in common/RBACService: DataFlowRole ADMIN(100) > ENGINEER(75) > ANALYST(50) > STEWARD(40) > VIEWER(25); gateway method/role rules (DELETE→ADMIN, write→ADMIN/ENGINEER, GET→any role) |
| Identity propagation | X-User-* headers from gateway → SecurityContextHolder ThreadLocal |
| Rate limiting | Redis sorted-set sliding window per IP+endpoint; fail-closed circuit breaker; X-RateLimit-* headers |
| PII protection | Gateway PiiMaskingFilter on responses; metadata-service DynamicMaskingService; PII classifier; lineage PII-propagation detection |
| Audit | common/AuditInterceptor + AuditPersistence; metadata-service hash-chained immutable audit log with AuditChainVerifierService |
| Observability (metrics) | Spring Actuator /actuator/prometheus; monitor-service OpenTelemetry + Prometheus; copilot exposes /metrics; scraped by Prometheus → Grafana |
| Observability (tracing) | monitor-service OpenTelemetryConfig + TracingFilter; request-id propagation (X-Request-Id) |
| Health checks | Gateway HealthController with actuator/health/{liveness,readiness,startup}; readiness probes Keycloak .well-known + metadata-service health |
| Error handling | common/GlobalExceptionHandler @ControllerAdvice + typed ApiException hierarchy; SPA react-error-boundary + axios error normalization |
| Real-time | SSE (alerts/metrics/notifications/copilot chat) + WebSocket (pipeline run streaming) |
| Compliance | GDPR + Polish-compliance modules (GdprService, PolishComplianceService, DSAR, retention, breach notification); schedules default to Europe/Warsaw |
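
The AuthZ row can be read as two checks: a numeric role hierarchy and a per-method rule at the gateway. The sketch below encodes both in Python using the role levels listed above. It is not the RBACService code; has_at_least and gateway_allows are hypothetical helpers, and treating POST/PUT/PATCH as "write", allowing HEAD and OPTIONS alongside GET, and denying any other method are assumptions.

```python
from enum import IntEnum


class DataFlowRole(IntEnum):
    """Role levels as listed in the AuthZ row; a higher value implies more privilege."""
    VIEWER = 25
    STEWARD = 40
    ANALYST = 50
    ENGINEER = 75
    ADMIN = 100


def has_at_least(role: DataFlowRole, required: DataFlowRole) -> bool:
    """Hierarchical check: a role satisfies any requirement at or below its level."""
    return role >= required


def gateway_allows(method: str, role: DataFlowRole) -> bool:
    """Method/role rule: DELETE needs ADMIN, writes need ADMIN or ENGINEER, reads any role."""
    method = method.upper()
    if method == "DELETE":
        return role == DataFlowRole.ADMIN
    if method in {"POST", "PUT", "PATCH"}:  # assumed meaning of "write"
        return role in {DataFlowRole.ADMIN, DataFlowRole.ENGINEER}
    # Reads are open to every role; unknown methods are denied (assumption).
    return method in {"GET", "HEAD", "OPTIONS"}
```

Note that the method rule names roles explicitly rather than using the hierarchy, so an ANALYST outranks a STEWARD but neither may write through the gateway.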

Key sequence flows

Login and authenticated API request

Browser ──(1) load SPA──▶ nginx :3006
Browser ──(2) OIDC redirect──▶ Keycloak :8180
Keycloak ──(3) login + redirect back with code──▶ Browser
Browser  (4) keycloak-js exchanges code → access/refresh JWT; waitForAuthReady resolves
Browser ──(5) GET /api/v1/... (Bearer JWT)──▶ API Gateway :8085
Gateway  (6) OAuth2 resource server validates JWT signature/issuer/audience vs JWKS
Gateway  (7) AuthFilter maps Keycloak roles → DataFlowRole, injects X-User-* headers
Gateway  (8) RedisRateLimitFilter checks/updates rate_limit:{ip}:{endpoint}
Gateway ──(9) proxy to downstream :8080──▶ metadata / pipeline / monitor / ...
Service  (10) re-validates JWT + reads X-User-* into SecurityContextHolder
Service  (11) RBAC permission check → query PostgreSQL → response
Gateway  (12) PiiMaskingFilter + SecurityHeadersFilter ──▶ Browser
   On 401: response interceptor silently refreshes the token and retries the request.

Pipeline run and real-time status streaming

Frontend ──POST /api/v1/pipelines/{id}/run──▶ Gateway ──▶ pipeline-engine (ExecutionController)
pipeline-engine: PipelineRunner parses YAML DSL → ParameterResolver → PipelineValidator
                 → DagBuilder builds ExecutionDAG (cycle/dangling detection)
                 → executes tasks in topological order (parallel within DAG levels)
                 → ExecutionContext supports cooperative cancellation
   For streaming/compute: FlinkJobSubmitter / Spark DataprocJobSubmitter; or AirflowClient
   On task failure: SelfHealingService → FailureClassifier → RecoveryStrategies
pipeline-engine: ExecutionEventPublisher / PipelineRunLogPublisher push run events
pipeline-engine: OpenLineageEmitter emits RunEvents ──▶ lineage-service /openlineage/events
Frontend ──WS /api/v1/runs/{runId}/stream──▶ Gateway (http→ws) ──▶ PipelineStatusWebSocket
   → live task-state / log updates streamed to the SPA log viewer
On completion: monitor-service ingests run metrics → cost/SLA/freshness eval → alerts
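
The DAG-building step in this flow (cycle and dangling detection, then topological execution with parallelism inside each level) can be sketched with Kahn's algorithm. This is an illustration of the technique, not DagBuilder itself; build_levels and the task-map input shape are hypothetical.

```python
from collections import defaultdict


def build_levels(tasks):
    """tasks maps task id -> list of upstream task ids.

    Returns a list of levels; tasks within one level have no dependencies on
    each other and could run in parallel. Raises ValueError on a dangling
    reference or a cycle.
    """
    referenced = {dep for deps in tasks.values() for dep in deps}
    dangling = referenced - set(tasks)
    if dangling:
        raise ValueError(f"dangling dependencies: {sorted(dangling)}")

    indegree = {task: len(set(deps)) for task, deps in tasks.items()}
    downstream = defaultdict(list)
    for task, deps in tasks.items():
        for dep in set(deps):
            downstream[dep].append(task)

    levels = []
    scheduled = 0
    ready = sorted(task for task, degree in indegree.items() if degree == 0)
    while ready:
        levels.append(ready)
        scheduled += len(ready)
        next_ready = []
        for task in ready:
            for child in downstream[task]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Any task never scheduled sits on a cycle or behind one.
    if scheduled != len(tasks):
        raise ValueError("cycle detected")
    return levels
```

For a pipeline where clean and enrich both depend on extract and load depends on both, the levels come out as extract, then clean and enrich together, then load.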

Copilot request (NL-to-pipeline / chat with RAG)

Frontend ──POST /api/v1/copilot/chat (SSE)──▶ Gateway ──▶ copilot :8090
copilot: RAGService queries pgvector for relevant catalog/pipeline context
         (rag_max_results=5, similarity_threshold=0.72)
         degrades to non-RAG chat if pgvector unavailable
copilot: LLM provider factory selects anthropic | openrouter | local per LLM_PROVIDER
copilot: LLMProvider.stream() → token deltas (StreamChunk) streamed back as SSE events
         (X-Copilot-Confidence header set); can produce DataFlow YAML pipeline definitions
Frontend: AICopilotSidebar renders streamed tokens; generated pipeline → Design Studio
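
The provider selection and streaming steps of this flow follow a common shape: an abstract base class, a factory keyed by LLM_PROVIDER, and a generator that turns chunks into SSE frames. The Python below is a sketch under assumptions, not the copilot's factory.py: the StreamChunk fields, EchoProvider, create_provider, to_sse, and the "done" event name are hypothetical, no real anthropic or openrouter client is wired in, and the code is synchronous for brevity.

```python
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterator


@dataclass
class StreamChunk:
    delta: str          # assumed field: the next piece of generated text
    done: bool = False  # assumed field: marks the end of the stream


class LLMProvider(ABC):
    @abstractmethod
    def stream(self, prompt: str) -> Iterator[StreamChunk]:
        """Yield the response incrementally."""


class EchoProvider(LLMProvider):
    """Hypothetical offline stand-in for a local provider: echoes the prompt word by word."""

    def stream(self, prompt: str) -> Iterator[StreamChunk]:
        for word in prompt.split():
            yield StreamChunk(delta=word + " ")
        yield StreamChunk(delta="", done=True)


# Real deployments would also register anthropic and openrouter implementations here.
PROVIDERS = {"local": EchoProvider}


def create_provider(name=None) -> LLMProvider:
    """Pick a provider by explicit name, else by the LLM_PROVIDER environment variable."""
    name = (name or os.environ.get("LLM_PROVIDER", "local")).lower()
    if name not in PROVIDERS:
        raise ValueError(f"unknown LLM_PROVIDER: {name!r}")
    return PROVIDERS[name]()


def to_sse(chunks):
    """Format chunks as text/event-stream frames, ending with a 'done' event."""
    for chunk in chunks:
        yield "event: done\ndata: \n\n" if chunk.done else f"data: {chunk.delta}\n\n"
```

Keeping the SSE framing outside the provider means every provider only has to yield chunks, and the transport format can change without touching provider code.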

Connector ingestion and lineage

metadata-service / pipeline-engine load a connector via ConnectorRegistry (ServiceLoader)
Connector.discoverSchema() → catalog tables/columns persisted (metadata-service catalog)
Pipeline execution: Connector.extractData() → DataStream → transform → loadData() to target
Streaming/CDC connectors (Kafka, Debezium) publish/consume change events via Kafka topics
pipeline-engine OpenLineageEmitter → lineage-service builds dataset/column lineage graph
lineage-service: ImpactAnalyzer + PropagationWorker propagate tags/PII across the graph
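
The final step, propagating tags such as PII across the lineage graph, amounts to a graph traversal from tagged columns to everything downstream. The sketch below shows one way to do it with a worklist; it is not the ImpactAnalyzer or PropagationWorker code, and propagate_tags, the edge-list input, and the column names in the usage note are hypothetical.

```python
from collections import deque


def propagate_tags(edges, tagged):
    """Push tags from tagged columns to all columns downstream of them.

    edges:  iterable of (upstream_column, downstream_column) pairs
    tagged: mapping of column -> set of tags assigned directly
    Returns a new mapping; the input mapping is left unchanged.
    """
    downstream = {}
    for source, target in edges:
        downstream.setdefault(source, []).append(target)

    result = {column: set(tags) for column, tags in tagged.items()}
    queue = deque(tagged)
    while queue:
        column = queue.popleft()
        for child in downstream.get(column, []):
            child_tags = result.setdefault(child, set())
            missing = result[column] - child_tags
            if missing:
                # Re-enqueue only on change, so cycles in the graph terminate.
                child_tags |= missing
                queue.append(child)
    return result
```

For example, with edges crm.msisdn → stg.msisdn → mart.subscriber_key and a PII tag on crm.msisdn, both downstream columns end up tagged PII, while an unrelated column such as mart.plan stays untagged.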

Architectural observations

A few properties of the system are worth keeping in mind when extending it:

  • Shared single database for all JVM and Python services. Isolation is by table prefix and per-service Flyway history tables, not physical schemas. This couples services at the data layer — closer to a distributed monolith over one database than fully autonomous microservices.
  • lineage-service has Flyway disabled and depends on metadata-service having already applied its lineage migrations (V4/V48) — a hidden deploy-time ordering dependency.
  • Inter-service communication is synchronous HTTP with no service mesh, circuit breakers, or service discovery beyond Docker DNS and env-var URLs.
  • Identity is double-validated at the gateway and again at each resource server — strong defense-in-depth — but the dev-permit-reads flag defaults to true in compose, which would permit unauthenticated reads if left enabled.
  • Service naming mismatch: gateway routes name copilot-service / migration-service while the compose containers are copilot / migration-engine; reconciled only via the COPILOT_SERVICE_URL / MIGRATION_SERVICE_URL env vars.