Architecture
AI services
The DataFlow AI Platform ships two Python 3.12 / FastAPI services that bring large-language-model capability to the data estate: the Copilot for conversational and generative assistance, and the Migration Engine for converting legacy ETL into DataFlow pipelines.
Overview
Both AI services live under backend/ai-services/, are built with Poetry, and run on FastAPI. They share the platform's PostgreSQL instance (the Copilot also uses its pgvector extension for RAG).
| Service | Path | Port | Purpose |
|---|---|---|---|
| Copilot | backend/ai-services/copilot/ | 8090 | NL-to-pipeline, conversational AI, NL-to-SQL, RAG search, suggestions, error diagnosis, agentic builder, RCA |
| Migration Engine | backend/ai-services/migration-engine/ | 8091 | Converts legacy ETL (Informatica PowerCenter, Alteryx, SSIS, DataStage) to DataFlow YAML pipelines |
A defining property of both services is graceful degradation — RAG, catalog, and LLM failures fall back to non-RAG chat, canned schemas, or explicit llm_unavailable markers rather than surfacing fakes. A second is pervasive confidence scoring — every AI output carries a 0–1 confidence score, and the Migration Engine uses it as a hard release gate.
Copilot service
Application structure
copilot/copilot/main.py builds the FastAPI app DataFlow AI Copilot v0.3.0.
- Triple route mounting: the same router set is mounted under three prefixes so it is reachable however the gateway or frontend addresses it:
  - `/api/v1/ai/**`: the canonical mount (`api/router.py`).
  - `/api/v1/copilot/**`: the frontend's `baseURL=/api/v1` plus `/copilot/chat`.
  - `/api/copilot/**`: bare access when `VITE_API_URL=/api`.
- Lifespan: on startup the service initializes the `RAGService` pgvector pool and ensures the embeddings schema, degrading gracefully to `rag_mode: "unavailable"` if pgvector is unreachable; the pool is closed on shutdown.
- Ops endpoints: `/health`, `/ready` (returns 503 with reasons `pgvector_unavailable` / `llm_unavailable` when degraded), and `/metrics` (a minimal Prometheus text exposition: service info, uptime, request counter).
- CORS exposes `X-Request-Id` and `X-Copilot-Confidence`.
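The `/ready` behaviour described above can be pictured as a small pure function. This is a minimal sketch rather than the service's code: the function name `readiness` and the shape of the response body are assumptions, and only the 503 status and the two reason strings come from the description.

```python
from typing import Any, Dict, List, Tuple


def readiness(pgvector_ok: bool, llm_ok: bool) -> Tuple[int, Dict[str, Any]]:
    """Return (HTTP status, body) for a /ready-style probe.

    Hypothetical helper: the body layout is illustrative only.
    """
    reasons: List[str] = []
    if not pgvector_ok:
        reasons.append("pgvector_unavailable")
    if not llm_ok:
        reasons.append("llm_unavailable")

    if reasons:
        # Degraded: answer 503 and list every reason that applies.
        return 503, {"status": "degraded", "reasons": reasons}
    return 200, {"status": "ready", "reasons": []}
```

Keeping the decision in a pure function like this makes the degraded paths easy to unit-test without a live pgvector pool or LLM provider.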
Configuration
Configuration uses pydantic-settings and is env-driven (copilot/config.py). Notable defaults:
| Setting | Default |
|---|---|
| anthropic_api_key | "" |
| anthropic_model | claude-sonnet-4-20250514 |
| database_url | postgresql+asyncpg://postgres:postgres@localhost:5432/copilot |
| catalog_base_url | http://localhost:8585/api/v1 |
| service_port | 8090 |
| rag_similarity_threshold | 0.72 |
| rag_max_results | 5 |
Dependencies (pyproject.toml): fastapi, uvicorn, anthropic ^0.39, asyncpg, pgvector, httpx, pyyaml, numpy. sentence-transformers ^3.3 is an optional extra (ml group) — when it is not installed, RAG vector search degrades.
LLM provider factory
A pluggable provider layer lives under copilot/providers/ (base.py, factory.py, three implementations). The LLMProvider ABC defines complete() and stream(), returning a normalized CompletionResponse (content, model, prompt/completion tokens, latency_ms, finish_reason) and StreamChunk deltas. LLMProviderError is the uniform error type.
| Provider | File | Default model | Notes |
|---|---|---|---|
| anthropic | anthropic_provider.py | claude-sonnet-4-6-20251001 | Official anthropic SDK; native system kwarg; true streaming |
| openrouter | openrouter_provider.py | openai/gpt-oss-120b:free | Raw httpx to the OpenRouter OpenAI-compatible API; sends HTTP-Referer / X-Title |
| local | local_provider.py | llama3.2 | Any OpenAI-compatible self-hosted endpoint — Ollama, vLLM, LM Studio, llama.cpp |
factory.py reads the LLM_PROVIDER env var (anthropic | openrouter | local; default anthropic), caches a thread-safe singleton, and offers reset_provider() for reload. A legacy ClaudeClient shim (clients/claude_client.py) is kept for backward compatibility — every Copilot service calls self._claude.generate/stream/complete, but the shim now routes through factory.get_provider(). Flipping LLM_PROVIDER therefore rewires the whole Copilot stack with no caller changes. CatalogClient (clients/catalog_client.py) is an async httpx client to the OpenMetadata-compatible catalog (search_tables(), get_table(), get_lineage()).
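The factory pattern described here (env-selected provider, cached thread-safe singleton, explicit reset) might look roughly like the sketch below. It is a simplified, synchronous stand-in: `_StubProvider` is hypothetical and echoes the prompt instead of calling an API, `CompletionResponse` is trimmed to two fields, and the real `complete()` / `stream()` signatures may differ.

```python
import os
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class CompletionResponse:
    content: str
    model: str  # token counts, latency_ms and finish_reason omitted for brevity


class LLMProviderError(Exception):
    """Uniform error type for every provider."""


class LLMProvider(ABC):
    @abstractmethod
    def complete(self, prompt: str) -> CompletionResponse: ...

    @abstractmethod
    def stream(self, prompt: str) -> Iterator[str]: ...


class _StubProvider(LLMProvider):
    """Hypothetical provider: echoes the prompt instead of calling an LLM."""

    def __init__(self, model: str) -> None:
        self.model = model

    def complete(self, prompt: str) -> CompletionResponse:
        return CompletionResponse(content=prompt, model=self.model)

    def stream(self, prompt: str) -> Iterator[str]:
        yield from prompt.split()


# Default models as listed in the provider table.
_DEFAULT_MODELS = {
    "anthropic": "claude-sonnet-4-6-20251001",
    "openrouter": "openai/gpt-oss-120b:free",
    "local": "llama3.2",
}
_lock = threading.Lock()
_instance: Optional[LLMProvider] = None


def get_provider() -> LLMProvider:
    """Return the cached provider, creating it on first use."""
    global _instance
    if _instance is None:
        with _lock:
            if _instance is None:  # re-check once the lock is held
                name = os.environ.get("LLM_PROVIDER", "anthropic")
                if name not in _DEFAULT_MODELS:
                    raise LLMProviderError(f"unknown LLM_PROVIDER: {name!r}")
                _instance = _StubProvider(_DEFAULT_MODELS[name])
    return _instance


def reset_provider() -> None:
    """Drop the cached instance so the next call re-reads LLM_PROVIDER."""
    global _instance
    with _lock:
        _instance = None
```

Because callers only ever see `get_provider()`, switching `LLM_PROVIDER` and calling `reset_provider()` is enough to change the backend in this sketch.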
Two distinct LLM-access paths
The Copilot's main services route LLM calls through the pluggable LLMProvider factory via the ClaudeClient shim. However, the Copilot's AgentService and the Migration Engine's LLMConverter call the anthropic SDK directly — they are not on the pluggable factory path.
API endpoints
All endpoints are reachable under each of /api/v1/ai, /api/v1/copilot, and /api/copilot.
| Endpoint | Method | Purpose |
|---|---|---|
| /chat | POST | Conversational copilot (RAG-augmented), returns ChatResponse |
| /chat/stream | GET | SSE streaming chat; events token / done / error |
| /suggest | POST | Inline code completions (yaml/sql/python) |
| /insights | GET | Proactive recommendations (cached 300s) |
| /generate-pipeline | POST | NL → DataFlow YAML pipeline |
| /sql/generate | POST | NL → SQL |
| /sql/explain | POST | Plain-language SQL explanation |
| /sql/optimize | POST | SQL optimization suggestions |
| /sql/validate | POST | SQL validity check against a known schema |
| /semantic/query | POST | NL→SQL with business-glossary enrichment |
| /semantic/glossary | GET | List glossary metrics/dimensions/filters |
| /diagnose | POST | Structured error diagnosis |
| /quality/generate-rules | POST | NL → data quality rules |
| /agent/chat | POST | Agentic pipeline builder (tool_use loop) |
| /agent/approve | POST | Approve a pending destructive agent action |
| /agent/status/{id} | GET | Agent session state |
| /schema/match | POST | LLM source→target column matching |
| /schema/match/override | POST | Apply manual match overrides |
| /schema/match/{id} | GET | Fetch a stored match result |
| /describe/asset | POST | Auto-generate catalog asset descriptions |
| /catalog/ask | POST | Catalog Q&A grounded in metadata-service |
| /rca/analyze | POST | Root-cause analysis agent |
| /provider/status | GET | Active LLM provider health/model |
CopilotService — the central orchestrator
services/copilot_service.py is the central orchestrator. Its chat() flow is:
- Intent routing: keyword detection (`_SQL_INTENT_KEYWORDS`, `_ERROR_INTENT_INDICATORS`) routes SQL questions to NL-to-SQL and error reports to error diagnosis.
- RAG enrichment: `RAGService.get_context_for_query()` runs best-effort, falling back to non-RAG chat on failure.
- Builds an enriched system prompt (`build_chat_system_prompt`) injecting pipeline context plus RAG results.
- Calls Claude via `ClaudeClient.generate()`.
- `_parse_response()` extracts `[ACTION type=... label=... payload={...}]` blocks into structured `Action` objects.
- Applies a confidence heuristic: 0.7 base, +0.15 for RAG, +0.05 for pipeline context.
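The last two steps of the flow above (ACTION-block extraction and the confidence heuristic) can be sketched as follows. The exact wire format of an ACTION block is not given in this document, so the regular expression assumes an unquoted `type`, a double-quoted `label`, and a JSON-object `payload`; the function names are hypothetical. The confidence increments are the ones listed.

```python
import json
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

# Assumed format: [ACTION type=<word> label="<text>" payload={<JSON object>}]
# The non-greedy payload match stops at the first "}]", so payloads that
# contain that sequence (for example a list of objects) are not supported.
_ACTION_RE = re.compile(
    r'\[ACTION\s+type=(?P<type>\w+)\s+label="(?P<label>[^"]*)"\s+'
    r'payload=(?P<payload>\{.*?\})\]',
    re.DOTALL,
)


@dataclass
class Action:
    type: str
    label: str
    payload: Dict[str, Any]


def parse_response(text: str) -> Tuple[str, List[Action]]:
    """Split raw LLM text into (prose without ACTION blocks, parsed actions)."""
    actions: List[Action] = []
    for match in _ACTION_RE.finditer(text):
        try:
            payload = json.loads(match.group("payload"))
        except json.JSONDecodeError:
            continue  # malformed payload: drop the action rather than guess
        actions.append(Action(match.group("type"), match.group("label"), payload))
    # Every matched block is removed from the prose, parsed or not.
    return _ACTION_RE.sub("", text).strip(), actions


def chat_confidence(used_rag: bool, has_pipeline_context: bool) -> float:
    """0.7 base, +0.15 when RAG context was used, +0.05 with pipeline context."""
    score = 0.7
    if used_rag:
        score += 0.15
    if has_pipeline_context:
        score += 0.05
    return round(min(score, 1.0), 2)
```

With both bonuses applied, the heuristic tops out at 0.9, so a chat answer never reports full confidence in this sketch.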
chat_stream() yields tokens via provider streaming. get_insights() uses an in-process TTL cache (300s) keyed on (pipeline_id, limit), because the LLM call is slow (~50s) and the dashboard polls every 60s; on LLM failure it returns a static _fallback_insights(). The SQL dialect is auto-detected from the message text (Teradata / Snowflake / Postgres / SAP HANA / MSSQL).
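A TTL cache of the kind described for `get_insights()` could be sketched as below. The class and helper names are hypothetical, the clock is injectable only to make the sketch testable, and the choice not to cache fallback results is an assumption made here, not something this document states.

```python
import time
from typing import Any, Callable, Dict, Hashable, List, Optional, Tuple


class TTLCache:
    """Tiny in-process cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get(self, key: Hashable) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at >= self._ttl:
            del self._entries[key]  # expired
            return None
        return value

    def put(self, key: Hashable, value: Any) -> None:
        self._entries[key] = (self._clock(), value)


def get_insights(cache: TTLCache, pipeline_id: str, limit: int,
                 call_llm: Callable[[str, int], List[Any]],
                 fallback: Callable[[], List[Any]]) -> List[Any]:
    """Serve cached insights; on LLM failure return the static fallback."""
    key = (pipeline_id, limit)
    cached = cache.get(key)
    if cached is not None:
        return cached
    try:
        result = call_llm(pipeline_id, limit)
    except Exception:
        # Assumption: the fallback is not cached, so the next poll retries the LLM.
        return fallback()
    cache.put(key, result)
    return result
```

With a 300s TTL and a 60s polling interval, roughly four out of five dashboard polls are served from memory instead of triggering a slow LLM call.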
NL-to-SQL
services/nl_to_sql.py provides NLToSQLService — generate, explain, optimize, and validate SQL across five dialects (the SQLDialect enum: snowflake, teradata, postgresql, sap_hana, mssql). It has rich per-dialect hint blocks (_DIALECT_HINTS) and four system prompts. The workflow:
- Build schema context from RAG `search_with_scores()` (table/column entities).
- Optionally apply semantic-layer enrichment (on by default).
- Format the dialect-specific system prompt and call Claude (temperature 0.1).
- Extract SQL from the fenced block; `_extract_table_references()` via regex; `_estimate_confidence()` heuristic; `_detect_warnings()` flags `SELECT *`, `DELETE`/`UPDATE` without `WHERE`, `DROP`/`TRUNCATE`, and dialect-invalid `LIMIT`.
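The warning checks in the last step can be approximated with a few regular expressions. This is a sketch under assumptions: the warning codes are invented for illustration, the checks are textual rather than parser-based, and the set of dialects treated as lacking `LIMIT` (Teradata and SQL Server, which use `TOP` instead) is a guess at what "dialect-invalid" covers.

```python
import re
from typing import List

# Assumption: these dialects are the ones flagged for LIMIT.
_NO_LIMIT_DIALECTS = {"teradata", "mssql"}


def detect_warnings(sql: str, dialect: str) -> List[str]:
    """Return warning codes for risky or dialect-invalid constructs."""
    warnings: List[str] = []
    stmt = sql.strip()

    if re.search(r"\bSELECT\s+\*", stmt, re.IGNORECASE):
        warnings.append("select_star")

    # Only the leading statement keyword is inspected.
    is_write = re.match(r"(DELETE|UPDATE)\b", stmt, re.IGNORECASE)
    if is_write and not re.search(r"\bWHERE\b", stmt, re.IGNORECASE):
        warnings.append("write_without_where")

    if re.search(r"\b(DROP|TRUNCATE)\b", stmt, re.IGNORECASE):
        warnings.append("destructive_ddl")

    if dialect in _NO_LIMIT_DIALECTS and re.search(r"\bLIMIT\s+\d+", stmt, re.IGNORECASE):
        warnings.append("limit_not_supported")

    return warnings
```

For example, `detect_warnings("SELECT * FROM cdr LIMIT 10", "teradata")` reports both the star projection and the unsupported `LIMIT`, while the same statement under `"snowflake"` reports only the star projection.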
Pipeline generation
services/pipeline_generator.py — PipelineGenerator.generate() fetches catalog schema for the requested source_systems, retrieves similar RAG templates best-effort, calls Claude with _GENERATOR_SYSTEM_PROMPT (the DataFlow YAML DSL spec), parses the YAML, extracts PipelineNode / PipelineEdge for the frontend graph, and applies a confidence heuristic on YAML structural quality.
RAG and the vector store
services/rag_service.py and services/embedding_pipeline.py implement RAG over pgvector cosine similarity.
- Embeddings use sentence-transformers `all-MiniLM-L6-v2` (`EMBEDDING_DIM` = 384), lazy-loaded.
- The `catalog_embeddings` table holds `entity_type`, `entity_id`, `entity_name`, `workspace_id`, `content`, `metadata` JSONB, and a `vector(384)` column, with an `ivfflat` cosine index (`lists=100`) plus workspace and entity_type btree indexes.
- `search()` / `search_with_scores()` apply `rag_similarity_threshold` (0.72) and optional `workspace_id` / `entity_type` filters, ordering by `embedding <=>`.
- When pgvector is down the service degrades to empty results (`mode="unavailable"`); there is no silent keyword fallback.
- CRUD operations: `index_catalog()` upsert, `delete_by_entity/all/stale`, `get_stats`.
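To make the threshold and ordering concrete, here is an in-memory stand-in for what the pgvector query computes. It is not the service's SQL: `search_in_memory` is a hypothetical name and the rows are plain Python lists. In pgvector, `<=>` yields cosine distance (1 minus cosine similarity), so ordering by ascending distance is the same as ordering by descending similarity.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 when either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def search_in_memory(
    query: Sequence[float],
    rows: Sequence[Tuple[str, Sequence[float]]],
    threshold: float = 0.72,   # rag_similarity_threshold
    max_results: int = 5,      # rag_max_results
) -> List[Tuple[str, float]]:
    """Return (entity_name, similarity) pairs above the threshold, best first."""
    scored = [(name, cosine_similarity(query, vec)) for name, vec in rows]
    kept = [item for item in scored if item[1] >= threshold]
    kept.sort(key=lambda item: item[1], reverse=True)
    return kept[:max_results]
```

A candidate with similarity 0.6 is dropped even if fewer than five results remain, which is why a query can legitimately come back with no RAG context at all.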
EmbeddingPipeline indexes catalog metadata into the vector store. It ships a built-in corpus of 6 doc chunks (Pipeline YAML DSL Reference, Connector Configuration Guide, Polkomtel Data Landscape, ETL Best Practices, Common Error Patterns, SQL Dialect Reference) and 5 pipeline templates. run_full_index() fetches tables and columns from 6 Polkomtel services (teradata-dwh-mona, sap-hana, snowflake-analytics, databricks-lakehouse, oracle-legacy, mssql-departmental) with a batch size of 64.
Optional dependency caveat
A missing sentence_transformers module has been a documented cause of chat HTTP 500s. The ml extra must be installed for full RAG vector search.
Agentic builder
services/agent_service.py provides AgentService — an autonomous pipeline builder using Claude tool_use (Level-4 agentic). The loop runs up to 15 iterations. Six tools are defined in AGENT_TOOLS:
| Tool | Destructive | Purpose |
|---|---|---|
| list_connections | no | Enumerate available connections |
| discover_schema | no | Discover a source schema |
| generate_pipeline | no | Produce a pipeline definition |
| validate_pipeline | no | Validate a pipeline |
| create_pipeline | yes | Persist a new pipeline (requires approval) |
| run_pipeline | yes | Execute a pipeline (requires approval) |
_ToolBackend executes against the real platform and catalog APIs, with domain-knowledge fallbacks (e.g. canned Polkomtel CDR/billing schemas when the catalog is down). Per-session state is held in an in-memory AgentSession dict. The approval gate pauses destructive tools as a PendingAction; approve() resumes the conversation. The agent can route Anthropic-formatted requests through OpenRouter's /v1/messages when only OPENROUTER_API_KEY is configured.
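The bounded loop and approval gate can be sketched without any LLM by scripting the model's decisions. Everything below is illustrative: `next_step` and `execute_tool` are hypothetical callables standing in for the Claude tool_use call and `_ToolBackend`, and the session fields are simplified.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

MAX_ITERATIONS = 15
DESTRUCTIVE_TOOLS = {"create_pipeline", "run_pipeline"}


@dataclass
class ToolCall:
    name: str
    args: Dict[str, Any]


@dataclass
class AgentSession:
    history: List[str] = field(default_factory=list)
    pending: Optional[ToolCall] = None  # destructive call awaiting approval
    done: bool = False


def run_agent(session: AgentSession,
              next_step: Callable[[List[str]], Optional[ToolCall]],
              execute_tool: Callable[[ToolCall], str]) -> AgentSession:
    """Run until the model finishes, a destructive tool is requested,
    or the iteration budget is exhausted."""
    for _ in range(MAX_ITERATIONS):
        call = next_step(session.history)
        if call is None:  # the model has no further tool calls
            session.done = True
            return session
        if call.name in DESTRUCTIVE_TOOLS:
            session.pending = call  # pause here; nothing is executed yet
            return session
        session.history.append(f"{call.name}: {execute_tool(call)}")
    return session


def approve(session: AgentSession,
            next_step: Callable[[List[str]], Optional[ToolCall]],
            execute_tool: Callable[[ToolCall], str]) -> AgentSession:
    """Execute the pending destructive call, then resume the loop."""
    call, session.pending = session.pending, None
    if call is None:
        raise ValueError("no pending action to approve")
    session.history.append(f"{call.name}: {execute_tool(call)}")
    return run_agent(session, next_step, execute_tool)
```

The important property is that a destructive tool is never executed inside `run_agent`; only an explicit `approve()` call can trigger it.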
RCA agent
services/rca_agent.py (FIX-052) provides RcaAgent.analyze() for incident root-cause analysis. It walks 4 real HTTP tool calls — recent alerts and runs (monitor-service), run logs, and a schema-change timeline (metadata-service) — then synthesizes via the configured LLMProvider (temperature 0.2) into a structured RcaResponse: a summary, a timeline, 2–4 ranked Hypothesis entries (each with confidence, evidence, and suggestedAction), and citedAssets. It carries the real model, tokenCount, latencyMs, and toolCallCount, uses tolerant JSON parsing, and never invents content.
Other Copilot services
- Schema matcher (`services/schema_matcher.py`): `SchemaMatcherService.match_schemas()` performs LLM column matching for migrations. With optional RAG enrichment, it calls Claude (temperature 0.2) and parses JSON into a `SchemaMatchResult` of `ColumnMatch` entries classified `EXACT` / `SEMANTIC` / `TYPE_COERCE` / `TRANSFORM_NEEDED`, each with confidence, a `transform_expression`, and reasoning. Results are stored in an in-memory `_match_store`; `apply_overrides()` supports user accept/reject/modify actions. A Polish-telecom domain prompt handles Polish column names.
- Quality rule generator (`services/quality_rule_generator.py`): `QualityRuleGenerator.generate_rules()` turns NL into structured rules across 10 `RuleType`s (NOT_NULL, UNIQUE, RANGE, REGEX, ROW_COUNT, CUSTOM_SQL, FRESHNESS, COMPLETENESS, SCHEMA_MATCH, STATISTICAL). Post-validation lowers confidence for columns not present in the schema context. Its prompt embeds Polkomtel context (PESEL `^\d{11}$`, MSISDN `^48\d{9}$`, CDR, revenue-never-negative).
- Semantic layer (`services/semantic_layer.py`): `SemanticLayerService` is a business-glossary enrichment layer in front of NL-to-SQL. Its built-in Polkomtel glossary holds ~11 metrics (churn rate, ARPU, MoU, data usage, net adds, revenue), ~14 dimensions, and 7 filter templates. `resolve_terms()` scans the question (longest-match), `enrich_prompt()` injects formulas and mappings, and it then delegates to `NLToSQLService.generate_sql()`.
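Longest-match term resolution, as mentioned for `resolve_terms()`, can be sketched like this. The glossary entries and their descriptions are hypothetical placeholders, not the service's real formulas; the point is only that "churn rate" must win over "churn" when both are glossary terms.

```python
import re
from typing import Dict, List

# Hypothetical glossary: the values are placeholders, not real formulas.
GLOSSARY: Dict[str, str] = {
    "churn": "placeholder: churned subscribers",
    "churn rate": "placeholder: churned / active subscribers",
    "arpu": "placeholder: revenue / active subscribers",
    "revenue": "placeholder: total billed amount",
}


def resolve_terms(question: str, glossary: Dict[str, str]) -> List[str]:
    """Return glossary terms found in the question, longest terms first.

    A matched term is blanked out of the working text so that a shorter
    term contained in it is not reported a second time.
    """
    remaining = question.lower()
    found: List[str] = []
    for term in sorted(glossary, key=len, reverse=True):
        pattern = r"\b" + re.escape(term) + r"\b"
        if re.search(pattern, remaining):
            found.append(term)
            remaining = re.sub(pattern, " ", remaining)
    return found
```

Calling `resolve_terms("Show churn rate and ARPU by region", GLOSSARY)` yields `["churn rate", "arpu"]`; the shorter term "churn" is suppressed because its only occurrence was already consumed.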
Prompt strategy
The copilot/prompts/ directory encodes the platform's domain knowledge:
- `chat_prompt.py`: a large `CHAT_SYSTEM_PROMPT` encoding the full Polkomtel data landscape (DWH-MONA Teradata, SAP HANA, Snowflake, Databricks, Oracle, MSSQL, Kafka, S3), the telecom glossary (CDR, MSISDN, IMSI, ARPU, churn), the pipeline YAML DSL, connectors, ETL best practices, and the ACTION-block format.
- `suggest_prompt.py`: language-specific completion prompts (YAML/SQL/connection config) with a `confidence|text` output format.
- `insights_prompt.py`: 6 insight categories, JSON-array output.
- `error_diagnosis_prompt.py`: 8 error categories with Polkomtel-specific error knowledge (Teradata TPT/PERM, SAP HANA CDC), JSON-object output.
- `schema_match_prompt.py`: 4 match-rule tiers with confidence bands.
Copilot data models
Pydantic models under copilot/models/ include chat.py (Role, Message, PipelineContext, ChatRequest, Action/ActionType, ChatResponse), suggestion.py, insight.py, pipeline.py, sql.py (SQLDialect, SQLRequest/Result/ValidationResult, error-diagnosis models), quality_rule.py, schema_match.py, semantic.py, and rca.py (RcaRequest/Response, TimelineEvent, Hypothesis, CitedAsset). All carry a confidence float where applicable.
Migration Engine
Application structure
The Migration Engine is the FastAPI app DataFlow Migration Engine v0.2.0 on port 8091, mounted under /api/v1/migration and /api/migration. It converts legacy ETL scripts to DataFlow YAML using rule-based pattern matching plus LLM-assisted translation. Configuration: anthropic_api_key, anthropic_model = claude-sonnet-4-20250514, max_upload_size_mb = 50, upload_dir = /tmp/migration-uploads.
What it migrates
The MigrationType enum covers four legacy ETL tools (not SQL dialects):
| Tool | Extension | Parser |
|---|---|---|
| Informatica PowerCenter | .xml | powercenter_parser.py |
| Alteryx | .yxmd | alteryx_parser.py |
| SSIS | .dtsx | ssis_parser.py |
| DataStage | .dsx | datastage_parser.py |
Upload currently accepts only .xml, .yxmd, and .dtsx; the DataStage parser exists and is wired into the rule engine but .dsx is not in _ALLOWED_EXTENSIONS.
API endpoints
| Endpoint | Method | Purpose |
|---|---|---|
| /upload | POST | Multipart upload; runs the full pipeline inline |
| /jobs | GET | Paginated/filtered/sorted job list |
| /jobs/{id}/status | GET | Job status (/{id}/status legacy alias) |
| /jobs/{id}/report | GET | Full MigrationReport with stats |
| /jobs/{id}/download | GET | Download converted YAML (or JSON) |
| /jobs/{id}/validate | POST | Run the 6-check validation suite |
| /jobs/{id}/convert | POST | Re-trigger conversion |
The parse → convert → validate pipeline
The MigrationStatus lifecycle is: uploaded → parsing → parsed → converting → validating → validated → completed | completed_with_warnings | failed. The /upload endpoint runs all stages inline.
upload file (.xml / .yxmd / .dtsx)
│
▼
┌───────────────────┐ Stage 1 — PARSE
│ parsers/<tool> │ tool-specific parser → common WorkflowAST
└─────────┬─────────┘ (WorkflowAST → MappingAST → SourceDefinition /
│ Transformation / TransformationField / Connector)
▼
┌───────────────────┐ Stage 2 — CONVERT
│ RuleEngine.convert│ TRANSFORM_RULES registry handles known types
│ + Expression │ deterministically; ExpressionTranslator turns
│ Translator │ PowerCenter expression language into SQL
│ → LLMConverter │ sub-threshold / unknown types fall back to LLM
│ → YamlGenerator │ assembles the final DataFlow YAML
└─────────┬─────────┘
▼
┌───────────────────┐ Stage 3 — VALIDATE
│ PipelineValidator │ 6 checks on the generated YAML
└─────────┬─────────┘
▼
release gate: completed only if confidence ≥ 0.85
Stage 1 — Parse. A parser per source tool turns the file into a common WorkflowAST. powercenter_parser.py and alteryx_parser.py use lxml; ssis_parser.py reads DTS:Executable XML (Data Flow / Control Flow tasks, Connection Managers, Variables, embedded SQL); datastage_parser.py reads a proprietary text format (BEGIN DSJOB … END DSJOB blocks). All expose a classmethod get_confidence(type) and extend a base parsers/base.py.
Stage 2 — Convert. converters/rule_engine.py — RuleEngine.convert() walks each transformation. A TRANSFORM_RULES registry of TransformRule subclasses handles 13 PowerCenter types deterministically:
| Source type | DataFlow target | Confidence |
|---|---|---|
| Source Qualifier | source_connector_sql_pushdown | 0.90–0.95 |
| Expression | sql_expression | 0.85 |
| Lookup Procedure | sql_join_pushdown | 0.65–0.85 |
| Aggregator | sql_group_by | 0.95 |
| Filter | sql_where | 0.98 |
| Joiner | sql_join | 0.90 |
| Sorter | sql_order_by | 0.98 |
| Router | conditional_branch (CASE WHEN) | 0.85 |
| Sequence Generator | sequence_generator (ROW_NUMBER) | 0.90 |
| Update Strategy | upsert_strategy | 0.80 |
| Union | sql_union_all | 0.95 |
| Rank | sql_rank (RANK() window) | 0.85 |
| Normalizer | normalizer (unpivot) | 0.70 |
Alteryx (Filter/Formula/Summarize/Join/Sort/Union/AlteryxSelect) and DataStage stages (PxFilter/PxAggregator/PxJoin/PxLookup/PxSort/PxMerge/PxChangeCapture/PxSCD/CTransformerStage/PxModify/ContainerView) have their own inline conversion blocks. _LLM_FALLBACK_THRESHOLD = 0.60 — types with no rule or sub-threshold confidence fall through to the LLM converter.
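The dispatch rule (use a deterministic rule when one exists and its confidence is at or above the threshold, otherwise fall back to the LLM) can be sketched as below. The registry here is a plain dict holding three rows from the table, whereas the real `TRANSFORM_RULES` holds `TransformRule` subclasses; `Outcome` and `convert` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple

LLM_FALLBACK_THRESHOLD = 0.60

# A few rows from the table above: source type -> (DataFlow target, confidence).
TRANSFORM_RULES: Dict[str, Tuple[str, float]] = {
    "Filter": ("sql_where", 0.98),
    "Aggregator": ("sql_group_by", 0.95),
    "Normalizer": ("normalizer", 0.70),
}


@dataclass
class Outcome:
    target: str
    confidence: float
    conversion_source: str  # "rule_engine" or "llm" in this sketch


def convert(
    transform_type: str,
    llm_convert: Callable[[str], Outcome],
    rules: Optional[Dict[str, Tuple[str, float]]] = None,
) -> Outcome:
    """Prefer a deterministic rule; fall back to the LLM converter when
    there is no rule or the rule's confidence is below the threshold."""
    rule = (TRANSFORM_RULES if rules is None else rules).get(transform_type)
    if rule is not None and rule[1] >= LLM_FALLBACK_THRESHOLD:
        target, confidence = rule
        return Outcome(target, confidence, "rule_engine")
    return llm_convert(transform_type)
```

None of the 13 tabulated PowerCenter rules sits below 0.60, so for those types the fallback is reached only through the "no rule" branch; the sub-threshold branch matters for rules whose confidence is computed per instance.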
ExpressionTranslator (converters/expression_translator.py) translates PowerCenter expression language to SQL. A recursive-descent tokenizer/parser maps ~60 functions via PC_TO_SQL_FUNCTIONS, with specialized handlers (_fn_iif → CASE WHEN, _fn_decode → CASE, NVL → COALESCE, SUBSTR → SUBSTRING, ADD_TO_DATE → INTERVAL, date-format token mapping). It handles $$parameter variables; unknown functions pass through with a warning and a confidence drop.
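A heavily reduced version of this idea is sketched below: a recursive translator that rewrites `IIF` into `CASE WHEN` and renames `NVL` and `SUBSTR`, leaving everything else untouched. It is not the real tokenizer/parser; it ignores `DECODE`, date functions, `$$` parameters, and warnings, and all function names in the code are hypothetical.

```python
import re
from typing import List

_RENAMES = {"NVL": "COALESCE", "SUBSTR": "SUBSTRING"}
_CALL_RE = re.compile(r"[A-Za-z_]\w*\s*\(")


def _matching_paren(s: str, open_idx: int) -> int:
    """Index of the ')' closing the '(' at open_idx, skipping quoted text."""
    depth, in_quote = 0, False
    for i in range(open_idx, len(s)):
        ch = s[i]
        if ch == "'":
            in_quote = not in_quote
        elif in_quote:
            continue
        elif ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
            if depth == 0:
                return i
    raise ValueError("unbalanced parentheses")


def _split_args(s: str) -> List[str]:
    """Split on top-level commas, respecting parentheses and quotes."""
    args, depth, start, in_quote = [], 0, 0, False
    for i, ch in enumerate(s):
        if ch == "'":
            in_quote = not in_quote
        elif in_quote:
            continue
        elif ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        elif ch == "," and depth == 0:
            args.append(s[start:i].strip())
            start = i + 1
    args.append(s[start:].strip())
    return args


def _emit(name: str, args: List[str]) -> str:
    if name == "IIF":
        if len(args) != 3:
            raise ValueError("this sketch only handles three-argument IIF")
        cond, then, otherwise = args
        return f"CASE WHEN {cond} THEN {then} ELSE {otherwise} END"
    # Unknown functions pass through unchanged (upper-cased).
    return f"{_RENAMES.get(name, name)}({', '.join(args)})"


def translate(expr: str) -> str:
    """Rewrite every function call in expr, recursing into its arguments."""
    out, pos = [], 0
    while True:
        match = _CALL_RE.search(expr, pos)
        if match is None:
            out.append(expr[pos:])
            return "".join(out)
        open_idx = match.end() - 1
        close_idx = _matching_paren(expr, open_idx)
        name = expr[match.start():open_idx].strip().upper()
        inner = expr[open_idx + 1:close_idx]
        args = [translate(a) for a in _split_args(inner)] if inner.strip() else []
        out.append(expr[pos:match.start()])
        out.append(_emit(name, args))
        pos = close_idx + 1
```

For instance, `translate("IIF(NVL(status, 'X') = 'A', 1, 0)")` returns `CASE WHEN COALESCE(status, 'X') = 'A' THEN 1 ELSE 0 END`.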
LLMConverter (converters/llm_converter.py) is the Claude fallback for complex transformations. It uses the anthropic SDK directly (anthropic.AsyncAnthropic) — not the Copilot's provider factory. Three specialized system prompts cover the general case, stored procedures, and custom-transform/Java logic. On an API auth, rate-limit, or other error it returns _unavailable_result() with conversion_source="llm_unavailable", confidence 0.0, and requires_manual_review=True — distinguishing genuine LLM output from a failure fallback. The ConversionSource enum is rule_engine | llm | llm_unavailable | unsupported.
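The failure-marking behaviour can be sketched independently of the Anthropic SDK. In the sketch below `call_llm` is a hypothetical callable standing in for the SDK call, the result type is simplified, and both the caller-supplied confidence and the `requires_manual_review=False` value on success are assumptions.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ConversionResult:
    converted: str
    confidence: float
    conversion_source: str  # rule_engine | llm | llm_unavailable | unsupported
    requires_manual_review: bool


def _unavailable_result() -> ConversionResult:
    """Explicit failure marker: no content, zero confidence, manual review."""
    return ConversionResult("", 0.0, "llm_unavailable", True)


def convert_with_llm(call_llm: Callable[[str], str], source: str,
                     confidence: float) -> ConversionResult:
    """Wrap an LLM call so that any failure is reported, never papered over."""
    try:
        converted = call_llm(source)
    except Exception:
        # Auth, rate-limit, or any other API error ends up here.
        return _unavailable_result()
    # Assumption: successful LLM output is not flagged for review by default.
    return ConversionResult(converted, confidence, "llm", requires_manual_review=False)
```

Because the marker carries confidence 0.0 and `requires_manual_review=True`, a job containing one cannot pass a gate that requires every object to clear a confidence floor.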
YamlGenerator (converters/yaml_generator.py) assembles the final DataFlow YAML — it classifies nodes into sources/transformations/targets, builds depends_on linkage from edges, injects 3 default quality checks (row_count, null_percentage, duplicate) plus per-sink schema-validation checks and a low_confidence_reconciliation check for mappings below 0.80, and adds metadata (generated_at, overall_confidence, warnings_count).
Stage 3 — Validate. validators/pipeline_validator.py — PipelineValidator.validate() runs 6 checks on the generated YAML: YAML syntax, required pipeline keys (name, nodes), node schema (id/type, valid types, valid connectors), edge references resolving to node ids, and a dangerous-SQL scan (DROP TABLE/DATABASE, TRUNCATE, ALTER TABLE, EXEC, xp_cmdshell). It returns a ValidationResult with an is_valid flag plus ERROR/WARNING-prefixed issues.
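The dangerous-SQL scan is the easiest of these checks to illustrate. The patterns below cover the statements listed above; the function name, the issue wording, and the choice to report every hit at ERROR severity are assumptions.

```python
import re
from typing import List

_DANGEROUS_PATTERNS = [
    (re.compile(r"\bDROP\s+(TABLE|DATABASE)\b", re.IGNORECASE), "DROP TABLE/DATABASE"),
    (re.compile(r"\bTRUNCATE\b", re.IGNORECASE), "TRUNCATE"),
    (re.compile(r"\bALTER\s+TABLE\b", re.IGNORECASE), "ALTER TABLE"),
    (re.compile(r"\bEXEC\b", re.IGNORECASE), "EXEC"),
    (re.compile(r"\bxp_cmdshell\b", re.IGNORECASE), "xp_cmdshell"),
]


def scan_dangerous_sql(sql: str) -> List[str]:
    """Return one ERROR-prefixed issue per dangerous construct found."""
    return [
        f"ERROR: dangerous SQL: {label}"
        for pattern, label in _DANGEROUS_PATTERNS
        if pattern.search(sql)
    ]
```

The scan is purely textual, so it also fires on keywords inside string literals or comments; for a release gate that errs on the side of caution, a false positive of that kind is usually acceptable.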
Confidence gating
The release gate (api/upload.py::collect_blocking_conversion_issues) marks a job completed only if all of the following hold:
- Overall confidence ≥ 0.85.
- No objects below 0.80 confidence.
- No objects needing manual review.
- No validation issues.
Otherwise the job is marked failed.
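The four conditions translate directly into a function that collects blocking issues. The thresholds come from the list above; the dataclass fields, function names, and message wording are hypothetical.

```python
from dataclasses import dataclass
from typing import List

MIN_OVERALL_CONFIDENCE = 0.85
MIN_OBJECT_CONFIDENCE = 0.80


@dataclass
class ObjectMapping:
    name: str
    confidence: float
    requires_manual_review: bool = False


def collect_blocking_issues(overall_confidence: float,
                            mappings: List[ObjectMapping],
                            validation_issues: List[str]) -> List[str]:
    """Return every reason the job may not be released; empty means release."""
    issues: List[str] = []
    if overall_confidence < MIN_OVERALL_CONFIDENCE:
        issues.append(
            f"overall confidence {overall_confidence:.2f} below {MIN_OVERALL_CONFIDENCE:.2f}"
        )
    for m in mappings:
        if m.confidence < MIN_OBJECT_CONFIDENCE:
            issues.append(
                f"{m.name}: confidence {m.confidence:.2f} below {MIN_OBJECT_CONFIDENCE:.2f}"
            )
        if m.requires_manual_review:
            issues.append(f"{m.name}: requires manual review")
    issues.extend(validation_issues)
    return issues


def final_status(overall_confidence: float,
                 mappings: List[ObjectMapping],
                 validation_issues: List[str]) -> str:
    blocked = collect_blocking_issues(overall_confidence, mappings, validation_issues)
    return "failed" if blocked else "completed"
```

Note that a single object at 0.75 fails the job even when the overall score is comfortably above 0.85, because the per-object floor is checked independently.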
Services and persistence
- `services/migration_service.py`: `MigrationService` wraps the job store. `list_jobs()` filters/sorts/paginates; `validate_output()` runs a richer 6-check suite (YAML syntax, SQL safety, transform coverage ≥ 50%, confidence ≥ 60%, node completeness for source+sink, mapping issues); `start_conversion()` backs the re-convert endpoint.
- `services/report_generator.py`: produces `MappingSummary` and `EffortEstimate` figures using a person-hour model: 0.25h base per mapping, 2.0h per manual review, 1.0h per LLM-assisted conversion, 0.5h testing, 4.0h integration; confidence thresholds 0.85 / 0.60 / 0.40.
Job persistence is an in-memory _jobs dict that is also persisted to jobs-store.json in the upload directory (persist_job() / save_jobs_store() / _load_jobs_store()) — it survives restarts via the JSON file. There is no real database.
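An in-memory dict mirrored to a JSON file can be sketched as follows. The class name `JobStore` is hypothetical (the text names module-level helpers instead), jobs are plain dicts here, and the write-to-temp-then-replace step is a design choice of this sketch, not something the document states about the service.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional


class JobStore:
    """In-memory job dict mirrored to jobs-store.json in the upload directory."""

    def __init__(self, upload_dir: str) -> None:
        self._path = Path(upload_dir) / "jobs-store.json"
        self._jobs: Dict[str, Dict[str, Any]] = self._load()

    def _load(self) -> Dict[str, Dict[str, Any]]:
        if not self._path.exists():
            return {}
        try:
            return json.loads(self._path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            return {}  # unreadable store: start empty rather than crash

    def persist_job(self, job_id: str, job: Dict[str, Any]) -> None:
        self._jobs[job_id] = job
        # Write to a temp file in the same directory, then swap it in, so a
        # crash mid-write cannot leave a truncated store behind.
        fd, tmp_name = tempfile.mkstemp(dir=str(self._path.parent), suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(self._jobs, handle)
        os.replace(tmp_name, self._path)

    def get(self, job_id: str) -> Optional[Dict[str, Any]]:
        return self._jobs.get(job_id)
```

A fresh `JobStore` pointed at the same directory sees previously persisted jobs, which is the restart-survival property described above. With the default upload directory under `/tmp`, that holds only as long as the host does not clear temporary files.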
Migration data models
migration/models/migration.py defines MigrationType, MigrationStatus, MigrationJob, UploadResponse, MigrationReport (total_objects / auto_converted / needs_review / manual_required), PaginatedJobsResponse, ValidationCheckResult, ValidationResponse, and ConvertResponse. migration/models/mapping.py defines ConversionSource, ObjectMapping, ConversionResult, and the AST dataclasses SourceDefinition, TransformationField, Transformation, Connector, MappingAST, and WorkflowAST.
Key observations
- Two distinct LLM-access paths: the Copilot uses the pluggable `LLMProvider` factory (anthropic/openrouter/local) routed via the `ClaudeClient` shim, while the Migration Engine's `LLMConverter` and the Copilot's `AgentService` call the `anthropic` SDK directly.
- Embeddings use `all-MiniLM-L6-v2` (384-dim) with a pgvector `ivfflat` cosine index; the model is loaded through sentence-transformers, an optional dependency whose absence has caused chat 500s.
- Graceful degradation everywhere: RAG, catalog, and LLM failures fall back to non-RAG chat, canned schemas, or `llm_unavailable` markers; the Copilot's `/ready` endpoint reports `degraded` honestly.
- Confidence is pervasive: every AI output (chat, SQL, rules, matches, conversions, RCA hypotheses) carries a 0–1 confidence score; the Migration Engine uses it as a hard release gate at ≥ 0.85.
- Heavy Polkomtel/telecom domain encoding: the data landscape, the CDR/MSISDN/ARPU glossary, PESEL/MSISDN regexes, and Polish-language column handling are baked into prompts and the semantic glossary.