Architecture

AI services

The DataFlow AI Platform ships two Python 3.12 / FastAPI services that bring large-language-model capability to the data estate: the Copilot for conversational and generative assistance, and the Migration Engine for converting legacy ETL into DataFlow pipelines.


Overview

Both AI services live under backend/ai-services/, are built with Poetry, and run on FastAPI. They share the platform's PostgreSQL instance (the Copilot also uses its pgvector extension for RAG).

Service | Path | Port | Purpose
Copilot | backend/ai-services/copilot/ | 8090 | NL-to-pipeline, conversational AI, NL-to-SQL, RAG search, suggestions, error diagnosis, agentic builder, RCA
Migration Engine | backend/ai-services/migration-engine/ | 8091 | Converts legacy ETL (Informatica PowerCenter, Alteryx, SSIS, DataStage) to DataFlow YAML pipelines

A defining property of both services is graceful degradation: RAG, catalog, and LLM failures fall back to non-RAG chat, canned schemas, or explicit llm_unavailable markers rather than fabricated output. A second is pervasive confidence scoring: every AI output carries a 0–1 confidence score, and the Migration Engine uses it as a hard release gate.


Copilot service

Application structure

copilot/copilot/main.py builds the FastAPI app DataFlow AI Copilot v0.3.0.

  • Triple route mounting — the same router set is mounted under three prefixes so it is reachable however the gateway or frontend addresses it:
    • /api/v1/ai/** — the canonical mount (api/router.py).
    • /api/v1/copilot/** — the frontend's baseURL=/api/v1 plus /copilot/chat.
    • /api/copilot/** — bare access when VITE_API_URL=/api.
  • Lifespan — on startup the service initializes the RAGService pgvector pool and ensures the embeddings schema, degrading gracefully to rag_mode: "unavailable" if pgvector is unreachable; the pool is closed on shutdown.
  • Ops endpoints: /health, /ready (returns 503 with reasons pgvector_unavailable / llm_unavailable when degraded), and /metrics (a minimal Prometheus text exposition: service info, uptime, request counter).
  • CORS exposes X-Request-Id and X-Copilot-Confidence.
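
The /ready behaviour described above can be sketched as a pure function. This is a minimal illustration, not the service's code: the function name readiness and the body keys "ready" and "reasons" are assumptions; only the 503 status and the two reason strings come from the description.

```python
from typing import Dict, List, Tuple


def readiness(pgvector_ok: bool, llm_ok: bool) -> Tuple[int, Dict[str, object]]:
    """Return an (HTTP status, JSON body) pair for a /ready-style probe."""
    reasons: List[str] = []
    if not pgvector_ok:
        reasons.append("pgvector_unavailable")
    if not llm_ok:
        reasons.append("llm_unavailable")
    # Any degraded dependency turns the probe into a 503.
    status = 200 if not reasons else 503
    # The body shape is illustrative, not the service's actual schema.
    return status, {"ready": not reasons, "reasons": reasons}
```

Keeping the decision in a side-effect-free function like this makes the degraded cases easy to unit-test without a running database or LLM.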

Configuration

Configuration uses pydantic-settings and is env-driven (copilot/config.py). Notable defaults:

Setting | Default
anthropic_api_key | ""
anthropic_model | claude-sonnet-4-20250514
database_url | postgresql+asyncpg://postgres:postgres@localhost:5432/copilot
catalog_base_url | http://localhost:8585/api/v1
service_port | 8090
rag_similarity_threshold | 0.72
rag_max_results | 5

Dependencies (pyproject.toml): fastapi, uvicorn, anthropic ^0.39, asyncpg, pgvector, httpx, pyyaml, numpy. sentence-transformers ^3.3 is an optional extra (ml group) — when it is not installed, RAG vector search degrades.

LLM provider factory

A pluggable provider layer lives under copilot/providers/ (base.py, factory.py, three implementations). The LLMProvider ABC defines complete() and stream(), returning a normalized CompletionResponse (content, model, prompt/completion tokens, latency_ms, finish_reason) and StreamChunk deltas. LLMProviderError is the uniform error type.

Provider | File | Default model | Notes
anthropic | anthropic_provider.py | claude-sonnet-4-6-20251001 | Official anthropic SDK; native system kwarg; true streaming
openrouter | openrouter_provider.py | openai/gpt-oss-120b:free | Raw httpx to the OpenRouter OpenAI-compatible API; sends HTTP-Referer / X-Title
local | local_provider.py | llama3.2 | Any OpenAI-compatible self-hosted endpoint (Ollama, vLLM, LM Studio, llama.cpp)

factory.py reads the LLM_PROVIDER env var (anthropic | openrouter | local; default anthropic), caches a thread-safe singleton, and offers reset_provider() for reload. A legacy ClaudeClient shim (clients/claude_client.py) is kept for backward compatibility — every Copilot service calls self._claude.generate/stream/complete, but the shim now routes through factory.get_provider(). Flipping LLM_PROVIDER therefore rewires the whole Copilot stack with no caller changes. CatalogClient (clients/catalog_client.py) is an async httpx client to the OpenMetadata-compatible catalog (search_tables(), get_table(), get_lineage()).
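
A rough sketch of the factory pattern described above: an env-selected provider cached as a lock-protected singleton, with a reset hook. It is deliberately simplified. The real interface also defines stream() and returns a CompletionResponse; here complete() is synchronous and returns a string, and _EchoProvider and _REGISTRY are hypothetical stand-ins so the example runs without network access.

```python
import os
import threading
from abc import ABC, abstractmethod
from typing import Callable, Dict, Optional


class LLMProvider(ABC):
    """Reduced provider interface (the real one also has stream())."""

    @abstractmethod
    def complete(self, prompt: str) -> str: ...


class _EchoProvider(LLMProvider):
    """Hypothetical stand-in that echoes the prompt instead of calling an LLM."""

    def __init__(self, name: str) -> None:
        self.name = name

    def complete(self, prompt: str) -> str:
        return f"[{self.name}] {prompt}"


# Hypothetical registry: provider name -> zero-argument constructor.
_REGISTRY: Dict[str, Callable[[], LLMProvider]] = {
    "anthropic": lambda: _EchoProvider("anthropic"),
    "openrouter": lambda: _EchoProvider("openrouter"),
    "local": lambda: _EchoProvider("local"),
}

_lock = threading.Lock()
_instance: Optional[LLMProvider] = None


def get_provider() -> LLMProvider:
    """Build the provider named by LLM_PROVIDER once, then reuse it."""
    global _instance
    with _lock:
        if _instance is None:
            name = os.environ.get("LLM_PROVIDER", "anthropic").lower()
            if name not in _REGISTRY:
                raise ValueError(f"unknown LLM_PROVIDER: {name!r}")
            _instance = _REGISTRY[name]()
        return _instance


def reset_provider() -> None:
    """Drop the cached instance so the next call re-reads the environment."""
    global _instance
    with _lock:
        _instance = None
```

Because callers only ever see get_provider(), changing the environment variable and calling reset_provider() swaps the backend without touching any call site, which is the property the shim relies on.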

Two distinct LLM-access paths

The Copilot's main services route LLM calls through the pluggable LLMProvider factory via the ClaudeClient shim. However, the Copilot's AgentService and the Migration Engine's LLMConverter call the anthropic SDK directly — they are not on the pluggable factory path.

API endpoints

All endpoints are reachable under each of /api/v1/ai, /api/v1/copilot, and /api/copilot.

Endpoint | Method | Purpose
/chat | POST | Conversational copilot (RAG-augmented), returns ChatResponse
/chat/stream | GET | SSE streaming chat; events: token / done / error
/suggest | POST | Inline code completions (yaml/sql/python)
/insights | GET | Proactive recommendations (cached 300s)
/generate-pipeline | POST | NL → DataFlow YAML pipeline
/sql/generate | POST | NL → SQL
/sql/explain | POST | Plain-language SQL explanation
/sql/optimize | POST | SQL optimization suggestions
/sql/validate | POST | SQL validity check against a known schema
/semantic/query | POST | NL → SQL with business-glossary enrichment
/semantic/glossary | GET | List glossary metrics/dimensions/filters
/diagnose | POST | Structured error diagnosis
/quality/generate-rules | POST | NL → data quality rules
/agent/chat | POST | Agentic pipeline builder (tool_use loop)
/agent/approve | POST | Approve a pending destructive agent action
/agent/status/{id} | GET | Agent session state
/schema/match | POST | LLM source → target column matching
/schema/match/override | POST | Apply manual match overrides
/schema/match/{id} | GET | Fetch a stored match result
/describe/asset | POST | Auto-generate catalog asset descriptions
/catalog/ask | POST | Catalog Q&A grounded in metadata-service
/rca/analyze | POST | Root-cause analysis agent
/provider/status | GET | Active LLM provider health/model

CopilotService — the central orchestrator

services/copilot_service.py is the central orchestrator. Its chat() flow is:

  1. Intent routing — keyword detection (_SQL_INTENT_KEYWORDS, _ERROR_INTENT_INDICATORS) routes SQL questions to NL-to-SQL and error reports to error diagnosis.
  2. RAG enrichment: RAGService.get_context_for_query() runs best-effort, falling back to non-RAG chat on failure.
  3. Builds an enriched system prompt (build_chat_system_prompt) injecting pipeline context plus RAG results.
  4. Calls Claude via ClaudeClient.generate().
  5. _parse_response() extracts [ACTION type=... label=... payload={...}] blocks into structured Action objects.
  6. Applies a confidence heuristic — 0.7 base, +0.15 for RAG, +0.05 for pipeline context.
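
Steps 5 and 6 can be illustrated with a short sketch. The exact ACTION syntax is not specified beyond [ACTION type=... label=... payload={...}], so this example assumes a double-quoted label and a JSON payload; the function names and the run_pipeline action type used in the test are hypothetical.

```python
import json
import re
from typing import Any, Dict, List, Tuple

# Assumed concrete syntax: [ACTION type=<word> label="<text>" payload=<json object>]
_ACTION_RE = re.compile(
    r'\[ACTION\s+type=(?P<type>\w+)\s+label="(?P<label>[^"]*)"\s+'
    r'payload=(?P<payload>\{.*?\})\]',
    re.DOTALL,
)


def parse_response(text: str) -> Tuple[str, List[Dict[str, Any]]]:
    """Strip ACTION blocks out of the reply and return them as dicts."""
    actions: List[Dict[str, Any]] = []

    def _collect(match: "re.Match[str]") -> str:
        try:
            payload = json.loads(match.group("payload"))
        except json.JSONDecodeError:
            return match.group(0)  # leave malformed blocks in the text
        actions.append(
            {"type": match.group("type"), "label": match.group("label"), "payload": payload}
        )
        return ""

    cleaned = _ACTION_RE.sub(_collect, text).strip()
    return cleaned, actions


def chat_confidence(used_rag: bool, has_pipeline_context: bool) -> float:
    """0.7 base, +0.15 when RAG context was used, +0.05 with pipeline context."""
    score = 0.7
    if used_rag:
        score += 0.15
    if has_pipeline_context:
        score += 0.05
    # Round to avoid floating-point noise in the reported score.
    return round(min(score, 1.0), 2)
```

With both signals present the heuristic tops out at 0.9, so a plain chat reply never reports full confidence.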

chat_stream() yields tokens via provider streaming. get_insights() uses an in-process TTL cache (300s) keyed on (pipeline_id, limit), because the LLM call is slow (~50s) and the dashboard polls every 60s; on LLM failure it returns a static _fallback_insights(). The SQL dialect is auto-detected from the message text (Teradata / Snowflake / Postgres / SAP HANA / MSSQL).
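
The insights cache can be pictured as a small TTL map. The sketch below is an assumption about shape, not the service's code: the class name TTLCache, its method, and the injectable clock are all hypothetical; only the 300-second TTL and the (pipeline_id, limit) key come from the description.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Tiny in-process cache whose entries expire after ttl_seconds."""

    def __init__(
        self,
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]  # still fresh: skip the slow call
        value = compute()
        self._entries[key] = (now, value)
        return value
```

With a 300-second TTL and a 60-second poll interval, roughly four out of five dashboard polls are served from memory instead of triggering a ~50-second LLM call.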

NL-to-SQL

services/nl_to_sql.py provides NLToSQLService — generate, explain, optimize, and validate SQL across five dialects (the SQLDialect enum: snowflake, teradata, postgresql, sap_hana, mssql). It has rich per-dialect hint blocks (_DIALECT_HINTS) and four system prompts. The workflow:

  1. Build schema context from RAG search_with_scores() (table/column entities).
  2. Optionally apply semantic-layer enrichment (on by default).
  3. Format the dialect-specific system prompt and call Claude (temperature 0.1).
  4. Extract SQL from the fenced block; _extract_table_references() via regex; _estimate_confidence() heuristic; _detect_warnings() flags SELECT *, DELETE/UPDATE without WHERE, DROP/TRUNCATE, and dialect-invalid LIMIT.
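
The warning pass in step 4 might look roughly like the following. This is a regex-based sketch under stated assumptions: the function name, the warning texts, and the choice of Teradata and MSSQL as the dialects without LIMIT are illustrative (both use TOP-style syntax instead), and the patterns do not account for keywords inside string literals or comments.

```python
import re
from typing import List

# Dialects assumed not to accept a LIMIT clause.
_NO_LIMIT_DIALECTS = {"teradata", "mssql"}


def detect_warnings(sql: str, dialect: str) -> List[str]:
    """Flag risky or dialect-invalid constructs in a generated statement."""
    warnings: List[str] = []
    upper = sql.upper()
    if re.search(r"\bSELECT\s+\*", upper):
        warnings.append("SELECT * returns every column")
    if re.search(r"\b(DELETE\s+FROM|UPDATE)\b", upper) and not re.search(r"\bWHERE\b", upper):
        warnings.append("DELETE/UPDATE without WHERE affects every row")
    if re.search(r"\b(DROP|TRUNCATE)\b", upper):
        warnings.append("destructive DDL (DROP/TRUNCATE)")
    if dialect in _NO_LIMIT_DIALECTS and re.search(r"\bLIMIT\s+\d+", upper):
        warnings.append(f"LIMIT is not valid in {dialect}")
    return warnings
```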

Pipeline generation

services/pipeline_generator.py: PipelineGenerator.generate() fetches catalog schema for the requested source_systems, retrieves similar RAG templates best-effort, and calls Claude with _GENERATOR_SYSTEM_PROMPT (the DataFlow YAML DSL spec). It then parses the YAML, extracts PipelineNode / PipelineEdge for the frontend graph, and applies a confidence heuristic based on YAML structural quality.

RAG and the vector store

services/rag_service.py and services/embedding_pipeline.py implement RAG over pgvector cosine similarity.

  • Embeddings use sentence-transformers all-MiniLM-L6-v2 (EMBEDDING_DIM = 384), lazy-loaded.
  • The catalog_embeddings table holds entity_type, entity_id, entity_name, workspace_id, content, metadata JSONB, and a vector(384) column, with an ivfflat cosine index (lists=100) plus workspace and entity_type btree indexes.
  • search() / search_with_scores() apply rag_similarity_threshold (0.72) and optional workspace_id / entity_type filters, ordering BY embedding <=>.
  • When pgvector is down the service degrades to empty results (mode="unavailable") — there is no silent keyword fallback.
  • CRUD operations: index_catalog() upsert, delete_by_entity/all/stale, get_stats.
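
To make the threshold semantics concrete, here is an in-memory analogue of the search. pgvector's <=> operator returns cosine distance (1 minus cosine similarity), so ordering by it ascending is the same as ranking by similarity descending. The sketch assumes the 0.72 threshold is applied to similarity; the function names mirror the description but the implementation is hypothetical and uses plain Python lists instead of a database.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search_with_scores(
    query_vec: Sequence[float],
    rows: Sequence[Tuple[str, Sequence[float]]],
    threshold: float = 0.72,
    max_results: int = 5,
) -> List[Tuple[str, float]]:
    """Rank (entity_name, embedding) rows by similarity to the query vector."""
    scored = [(name, cosine_similarity(query_vec, vec)) for name, vec in rows]
    # Keep only rows at or above the similarity threshold, best first.
    kept = [pair for pair in scored if pair[1] >= threshold]
    kept.sort(key=lambda pair: pair[1], reverse=True)
    return kept[:max_results]
```

An empty list here corresponds to the degraded case as well as the no-match case, which is why the real service reports a separate mode flag.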

EmbeddingPipeline indexes catalog metadata into the vector store. It ships a built-in corpus of 6 doc chunks (Pipeline YAML DSL Reference, Connector Configuration Guide, Polkomtel Data Landscape, ETL Best Practices, Common Error Patterns, SQL Dialect Reference) and 5 pipeline templates. run_full_index() fetches tables and columns from 6 Polkomtel services (teradata-dwh-mona, sap-hana, snowflake-analytics, databricks-lakehouse, oracle-legacy, mssql-departmental) with a batch size of 64.

Optional dependency caveat

A missing sentence_transformers module has been a documented cause of chat HTTP 500s. The ml extra must be installed for full RAG vector search.

Agentic builder

services/agent_service.py provides AgentService — an autonomous pipeline builder using Claude tool_use (Level-4 agentic). The loop runs up to 15 iterations. Six tools are defined in AGENT_TOOLS:

Tool | Destructive | Purpose
list_connections | no | Enumerate available connections
discover_schema | no | Discover a source schema
generate_pipeline | no | Produce a pipeline definition
validate_pipeline | no | Validate a pipeline
create_pipeline | yes | Persist a new pipeline (requires approval)
run_pipeline | yes | Execute a pipeline (requires approval)

_ToolBackend executes against the real platform and catalog APIs, with domain-knowledge fallbacks (e.g. canned Polkomtel CDR/billing schemas when the catalog is down). Per-session state is held in an in-memory AgentSession dict. The approval gate pauses destructive tools as a PendingAction; approve() resumes the conversation. The agent can route Anthropic-formatted requests through OpenRouter's /v1/messages when only OPENROUTER_API_KEY is configured.
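
The approval gate can be sketched independently of the LLM loop. In this simplified model the dataclass fields and the call_tool / approve signatures are assumptions; only the tool names, the destructive flag, and the PendingAction / AgentSession concepts come from the description.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

DESTRUCTIVE_TOOLS = {"create_pipeline", "run_pipeline"}

ToolExecutor = Callable[[str, Dict[str, Any]], str]


@dataclass
class PendingAction:
    tool: str
    args: Dict[str, Any]


@dataclass
class AgentSession:
    pending: Optional[PendingAction] = None
    log: List[str] = field(default_factory=list)


def call_tool(session: AgentSession, tool: str, args: Dict[str, Any], execute: ToolExecutor) -> str:
    """Run safe tools immediately; park destructive ones until approved."""
    if tool in DESTRUCTIVE_TOOLS:
        session.pending = PendingAction(tool, args)
        return "awaiting_approval"
    session.log.append(execute(tool, args))
    return "executed"


def approve(session: AgentSession, execute: ToolExecutor) -> str:
    """Execute the parked action and clear it from the session."""
    if session.pending is None:
        raise ValueError("nothing to approve")
    action, session.pending = session.pending, None
    session.log.append(execute(action.tool, action.args))
    return "executed"
```

Because session state in this model lives only in memory, a pending approval would not survive a process restart; the same holds for any in-memory session dict.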

RCA agent

services/rca_agent.py (FIX-052) provides RcaAgent.analyze() for incident root-cause analysis. It walks 4 real HTTP tool calls — recent alerts and runs (monitor-service), run logs, and a schema-change timeline (metadata-service) — then synthesizes via the configured LLMProvider (temperature 0.2) into a structured RcaResponse: a summary, a timeline, 2–4 ranked Hypothesis entries (each with confidence, evidence, and suggestedAction), and citedAssets. It carries the real model, tokenCount, latencyMs, and toolCallCount, uses tolerant JSON parsing, and never invents content.

Other Copilot services

  • Schema matcher (services/schema_matcher.py) — SchemaMatcherService.match_schemas() performs LLM column matching for migrations. With optional RAG enrichment, it calls Claude (temperature 0.2) and parses JSON into a SchemaMatchResult of ColumnMatch entries classified EXACT / SEMANTIC / TYPE_COERCE / TRANSFORM_NEEDED, each with confidence, a transform_expression, and reasoning. Results are stored in an in-memory _match_store; apply_overrides() supports user accept/reject/modify actions. A Polish-telecom domain prompt handles Polish column names.
  • Quality rule generator (services/quality_rule_generator.py) — QualityRuleGenerator.generate_rules() turns NL into structured rules across 10 RuleTypes (NOT_NULL, UNIQUE, RANGE, REGEX, ROW_COUNT, CUSTOM_SQL, FRESHNESS, COMPLETENESS, SCHEMA_MATCH, STATISTICAL). Post-validation lowers confidence for columns not present in the schema context. Its prompt embeds Polkomtel context (PESEL ^\d{11}$, MSISDN ^48\d{9}$, CDR, revenue-never-negative).
  • Semantic layer (services/semantic_layer.py) — SemanticLayerService is a business-glossary enrichment layer in front of NL-to-SQL. Its built-in Polkomtel glossary holds ~11 metrics (churn rate, ARPU, MoU, data usage, net adds, revenue), ~14 dimensions, and 7 filter templates. resolve_terms() scans the question (longest-match), enrich_prompt() injects formulas and mappings, and it then delegates to NLToSQLService.generate_sql().
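
Longest-match term resolution, as used by the semantic layer, can be illustrated as follows. The glossary entries and formulas in the usage example are made up for the sketch, and the function signature is an assumption rather than the real resolve_terms() contract.

```python
import re
from typing import Dict, List


def resolve_terms(question: str, glossary: Dict[str, str]) -> List[str]:
    """Return glossary terms found in the question, preferring longer terms."""
    text = question.lower()
    found: List[str] = []
    # Try longer terms first and blank out what they match, so a short term
    # contained in a longer one ("churn" in "churn rate") is not reported too.
    for term in sorted(glossary, key=len, reverse=True):
        pattern = r"\b" + re.escape(term) + r"\b"
        if re.search(pattern, text):
            found.append(term)
            text = re.sub(pattern, " ", text)
    return found
```

For example, with hypothetical entries for "churn", "churn rate", and "arpu", the question "What is the churn rate and ARPU by region?" resolves to "churn rate" and "arpu" only.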

Prompt strategy

The copilot/prompts/ directory encodes the platform's domain knowledge:

  • chat_prompt.py — a large CHAT_SYSTEM_PROMPT encoding the full Polkomtel data landscape (DWH-MONA Teradata, SAP HANA, Snowflake, Databricks, Oracle, MSSQL, Kafka, S3), the telecom glossary (CDR, MSISDN, IMSI, ARPU, churn), the pipeline YAML DSL, connectors, ETL best practices, and the ACTION-block format.
  • suggest_prompt.py — language-specific completion prompts (YAML/SQL/connection config) with a confidence|text output format.
  • insights_prompt.py — 6 insight categories, JSON-array output.
  • error_diagnosis_prompt.py — 8 error categories with Polkomtel-specific error knowledge (Teradata TPT/PERM, SAP HANA CDC), JSON-object output.
  • schema_match_prompt.py — 4 match-rule tiers with confidence bands.

Copilot data models

Pydantic models under copilot/models/ include chat.py (Role, Message, PipelineContext, ChatRequest, Action/ActionType, ChatResponse), suggestion.py, insight.py, pipeline.py, sql.py (SQLDialect, SQLRequest/Result/ValidationResult, error-diagnosis models), quality_rule.py, schema_match.py, semantic.py, and rca.py (RcaRequest/Response, TimelineEvent, Hypothesis, CitedAsset). All carry a confidence float where applicable.


Migration Engine

Application structure

The Migration Engine is the FastAPI app DataFlow Migration Engine v0.2.0 on port 8091, mounted under /api/v1/migration and /api/migration. It converts legacy ETL scripts to DataFlow YAML using rule-based pattern matching plus LLM-assisted translation. Configuration: anthropic_api_key, anthropic_model = claude-sonnet-4-20250514, max_upload_size_mb = 50, upload_dir = /tmp/migration-uploads.

What it migrates

The MigrationType enum covers four legacy ETL tools (not SQL dialects):

Tool | Extension | Parser
Informatica PowerCenter | .xml | powercenter_parser.py
Alteryx | .yxmd | alteryx_parser.py
SSIS | .dtsx | ssis_parser.py
DataStage | .dsx | datastage_parser.py

Upload currently accepts only .xml, .yxmd, and .dtsx; the DataStage parser exists and is wired into the rule engine but .dsx is not in _ALLOWED_EXTENSIONS.

API endpoints

Endpoint | Method | Purpose
/upload | POST | Multipart upload; runs the full pipeline inline
/jobs | GET | Paginated/filtered/sorted job list
/jobs/{id}/status | GET | Job status (/{id}/status is a legacy alias)
/jobs/{id}/report | GET | Full MigrationReport with stats
/jobs/{id}/download | GET | Download converted YAML (or JSON)
/jobs/{id}/validate | POST | Run the 6-check validation suite
/jobs/{id}/convert | POST | Re-trigger conversion

The parse → convert → validate pipeline

The MigrationStatus lifecycle is: uploaded → parsing → parsed → converting → validating → validated → completed | completed_with_warnings | failed. The /upload endpoint runs all stages inline.

   upload file (.xml / .yxmd / .dtsx)
          │
          ▼
┌───────────────────┐   Stage 1: PARSE
│  parsers/<tool>   │   tool-specific parser → common WorkflowAST
└─────────┬─────────┘   (WorkflowAST → MappingAST → SourceDefinition /
          │              Transformation / TransformationField / Connector)
          ▼
┌───────────────────┐   Stage 2: CONVERT
│ RuleEngine.convert│   TRANSFORM_RULES registry handles known types
│  + Expression     │   deterministically; ExpressionTranslator turns
│    Translator     │   PowerCenter expression language into SQL
│  → LLMConverter   │   sub-threshold / unknown types fall back to LLM
│  → YamlGenerator  │   assembles the final DataFlow YAML
└─────────┬─────────┘
          ▼
┌───────────────────┐   Stage 3: VALIDATE
│ PipelineValidator │   6 checks on the generated YAML
└─────────┬─────────┘
          ▼
   release gate: completed only if confidence ≥ 0.85

Stage 1: Parse. A parser per source tool turns the file into a common WorkflowAST. powercenter_parser.py and alteryx_parser.py use lxml; ssis_parser.py reads DTS:Executable XML (Data Flow / Control Flow tasks, Connection Managers, Variables, embedded SQL); datastage_parser.py reads a proprietary text format (BEGIN DSJOB … END DSJOB blocks). All parsers extend a common base defined in parsers/base.py and expose a classmethod get_confidence(type).

Stage 2: Convert. In converters/rule_engine.py, RuleEngine.convert() walks each transformation. A TRANSFORM_RULES registry of TransformRule subclasses handles 13 PowerCenter types deterministically:

Source type | DataFlow target | Confidence
Source Qualifier | source_connector_sql_pushdown | 0.90–0.95
Expression | sql_expression | 0.85
Lookup Procedure | sql_join_pushdown | 0.65–0.85
Aggregator | sql_group_by | 0.95
Filter | sql_where | 0.98
Joiner | sql_join | 0.90
Sorter | sql_order_by | 0.98
Router | conditional_branch (CASE WHEN) | 0.85
Sequence Generator | sequence_generator (ROW_NUMBER) | 0.90
Update Strategy | upsert_strategy | 0.80
Union | sql_union_all | 0.95
Rank | sql_rank (RANK() window) | 0.85
Normalizer | normalizer (unpivot) | 0.70

Alteryx (Filter/Formula/Summarize/Join/Sort/Union/AlteryxSelect) and DataStage stages (PxFilter/PxAggregator/PxJoin/PxLookup/PxSort/PxMerge/PxChangeCapture/PxSCD/CTransformerStage/PxModify/ContainerView) have their own inline conversion blocks. _LLM_FALLBACK_THRESHOLD = 0.60 — types with no rule or sub-threshold confidence fall through to the LLM converter.
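
The rule-first, LLM-second dispatch can be sketched as a lookup with a threshold check. The registry here is reduced to a dict of (target, confidence) pairs for a few table rows, whereas the real engine uses TransformRule subclasses; the convert signature and the tuple it returns are assumptions made for the example.

```python
from typing import Callable, Dict, Tuple

LLM_FALLBACK_THRESHOLD = 0.60

# A few rows from the PowerCenter rule table, as (target, confidence) pairs.
TRANSFORM_RULES: Dict[str, Tuple[str, float]] = {
    "Filter": ("sql_where", 0.98),
    "Aggregator": ("sql_group_by", 0.95),
    "Normalizer": ("normalizer", 0.70),
}

Result = Tuple[str, float, str]  # (target, confidence, conversion_source)


def convert(
    transform_type: str,
    rules: Dict[str, Tuple[str, float]],
    llm_fallback: Callable[[str], Result],
) -> Result:
    """Use a deterministic rule when one is confident enough, else the LLM."""
    rule = rules.get(transform_type)
    if rule is not None:
        target, confidence = rule
        if confidence >= LLM_FALLBACK_THRESHOLD:
            return target, confidence, "rule_engine"
    # No rule, or the rule's confidence is below the threshold.
    return llm_fallback(transform_type)
```

Tagging each result with its source keeps rule output and LLM output distinguishable downstream, which matters once confidence feeds the release gate.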

ExpressionTranslator (converters/expression_translator.py) translates PowerCenter expression language to SQL. A recursive-descent tokenizer/parser maps ~60 functions via PC_TO_SQL_FUNCTIONS, with specialized handlers (_fn_iif → CASE WHEN, _fn_decode → CASE, NVL → COALESCE, SUBSTR → SUBSTRING, ADD_TO_DATE → INTERVAL, date-format token mapping). It handles $$parameter variables; unknown functions pass through with a warning and a confidence drop.
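
A much-reduced illustration of the idea: scan the expression, rewrite function calls recursively, and record a warning for anything unmapped. This is not the real translator. It covers only IIF, NVL, and SUBSTR, has no tokenizer, and all helper names are hypothetical.

```python
import re
from typing import List

_IDENT = re.compile(r"[A-Za-z_]\w*")
_RENAMES = {"NVL": "COALESCE", "SUBSTR": "SUBSTRING"}


def _matching_paren(s: str, open_idx: int) -> int:
    """Index of the ')' closing the '(' at open_idx, skipping quoted text."""
    depth, in_quote = 0, False
    for i in range(open_idx, len(s)):
        ch = s[i]
        if ch == "'":
            in_quote = not in_quote
        elif not in_quote:
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
                if depth == 0:
                    return i
    raise ValueError("unbalanced parentheses")


def _split_args(s: str) -> List[str]:
    """Split on top-level commas, ignoring nested calls and string literals."""
    args, depth, start, in_quote = [], 0, 0, False
    for i, ch in enumerate(s):
        if ch == "'":
            in_quote = not in_quote
        elif in_quote:
            continue
        elif ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        elif ch == "," and depth == 0:
            args.append(s[start:i].strip())
            start = i + 1
    args.append(s[start:].strip())
    return args


def _rewrite_call(name: str, args: List[str], warnings: List[str]) -> str:
    if name == "IIF" and len(args) == 3:
        return f"CASE WHEN {args[0]} THEN {args[1]} ELSE {args[2]} END"
    if name in _RENAMES:
        return f"{_RENAMES[name]}({', '.join(args)})"
    warnings.append(f"{name} passed through untranslated")
    return f"{name}({', '.join(args)})"


def translate(expr: str, warnings: List[str]) -> str:
    """Rewrite every function call in expr, recursing into its arguments."""
    out, i = [], 0
    while i < len(expr):
        if expr[i] == "'":  # copy string literals verbatim
            end = expr.index("'", i + 1)
            out.append(expr[i:end + 1])
            i = end + 1
            continue
        m = _IDENT.match(expr, i)
        if m and m.end() < len(expr) and expr[m.end()] == "(":
            close = _matching_paren(expr, m.end())
            args = [translate(a, warnings) for a in _split_args(expr[m.end() + 1:close])]
            out.append(_rewrite_call(m.group().upper(), args, warnings))
            i = close + 1
        elif m:
            out.append(m.group())
            i = m.end()
        else:
            out.append(expr[i])
            i += 1
    return "".join(out)
```

Collecting warnings in a list passed by the caller lets the surrounding conversion step decide how much to lower confidence per unmapped function.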

LLMConverter (converters/llm_converter.py) is the Claude fallback for complex transformations. It uses the anthropic SDK directly (anthropic.AsyncAnthropic) — not the Copilot's provider factory. Three specialized system prompts cover the general case, stored procedures, and custom-transform/Java logic. On an API auth, rate-limit, or other error it returns _unavailable_result() with conversion_source="llm_unavailable", confidence 0.0, and requires_manual_review=True — distinguishing genuine LLM output from a failure fallback. The ConversionSource enum is rule_engine | llm | llm_unavailable | unsupported.
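
The failure path can be sketched without the SDK by injecting the LLM call as a function. The reduced ConversionResult dataclass, the note field, and the 0.75 placeholder confidence for successful output are assumptions; the llm_unavailable source, 0.0 confidence, and manual-review flag follow the description.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ConversionResult:
    output: str
    confidence: float
    conversion_source: str  # rule_engine | llm | llm_unavailable | unsupported
    requires_manual_review: bool
    note: str = ""


def unavailable_result(reason: str) -> ConversionResult:
    """Explicit marker for 'the LLM could not be reached', never fake output."""
    return ConversionResult(
        output="",
        confidence=0.0,
        conversion_source="llm_unavailable",
        requires_manual_review=True,
        note=reason,
    )


def convert_with_llm(source: str, call_llm: Callable[[str], str]) -> ConversionResult:
    try:
        text = call_llm(source)
    except Exception as exc:  # auth, rate-limit, and other errors all land here
        return unavailable_result(str(exc))
    # 0.75 is a placeholder; the real converter derives its own confidence.
    return ConversionResult(text, 0.75, "llm", False)
```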

YamlGenerator (converters/yaml_generator.py) assembles the final DataFlow YAML — it classifies nodes into sources/transformations/targets, builds depends_on linkage from edges, injects 3 default quality checks (row_count, null_percentage, duplicate) plus per-sink schema-validation checks and a low_confidence_reconciliation check for mappings below 0.80, and adds metadata (generated_at, overall_confidence, warnings_count).

Stage 3: Validate. In validators/pipeline_validator.py, PipelineValidator.validate() runs 6 checks on the generated YAML, including YAML syntax, required pipeline keys (name, nodes), node schema (id/type, valid types, valid connectors), edge references resolving to node ids, and a dangerous-SQL scan (DROP TABLE/DATABASE, TRUNCATE, ALTER TABLE, EXEC, xp_cmdshell). It returns a ValidationResult with an is_valid flag plus ERROR/WARNING-prefixed issues.
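
Three of these checks are easy to show on an already-parsed pipeline dict. The sketch below is illustrative only: the edge keys "from" and "to", the per-node "sql" key, and the choice to report dangerous SQL as a WARNING are assumptions, and YAML parsing itself is left out.

```python
import re
from typing import Any, Dict, List

_DANGEROUS_SQL = re.compile(
    r"\b(DROP\s+(TABLE|DATABASE)|TRUNCATE|ALTER\s+TABLE|EXEC|xp_cmdshell)\b",
    re.IGNORECASE,
)


def validate(pipeline: Dict[str, Any]) -> List[str]:
    """Return ERROR/WARNING-prefixed issues; an empty list means valid."""
    issues: List[str] = []
    # Required top-level keys.
    for key in ("name", "nodes"):
        if key not in pipeline:
            issues.append(f"ERROR: missing required key '{key}'")
    nodes = pipeline.get("nodes", [])
    node_ids = {node.get("id") for node in nodes}
    # Every edge endpoint must name an existing node.
    for edge in pipeline.get("edges", []):
        for end in ("from", "to"):
            if edge.get(end) not in node_ids:
                issues.append(f"ERROR: edge references unknown node '{edge.get(end)}'")
    # Scan embedded SQL for destructive statements.
    for node in nodes:
        if _DANGEROUS_SQL.search(str(node.get("sql", ""))):
            issues.append(f"WARNING: dangerous SQL in node '{node.get('id')}'")
    return issues
```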

Confidence gating

The release gate (api/upload.py::collect_blocking_conversion_issues) marks a job completed only if all of the following hold:

  • Overall confidence ≥ 0.85.
  • No objects below 0.80 confidence.
  • No objects needing manual review.
  • No validation issues.

Otherwise the job is marked failed.
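
The four gate conditions translate directly into code. In this sketch the ObjectMapping fields, the function names, and the issue messages are assumptions; the thresholds (0.85 overall, 0.80 per object) and the completed / failed outcome are taken from the list above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ObjectMapping:
    name: str
    confidence: float
    requires_manual_review: bool = False


def collect_blocking_issues(
    overall_confidence: float,
    mappings: List[ObjectMapping],
    validation_issues: List[str],
) -> List[str]:
    """List every reason the job cannot be released; empty means it can."""
    issues: List[str] = []
    if overall_confidence < 0.85:
        issues.append(f"overall confidence {overall_confidence:.2f} is below 0.85")
    for mapping in mappings:
        if mapping.confidence < 0.80:
            issues.append(f"{mapping.name}: confidence below 0.80")
        if mapping.requires_manual_review:
            issues.append(f"{mapping.name}: needs manual review")
    issues.extend(validation_issues)
    return issues


def final_status(blocking_issues: List[str]) -> str:
    return "completed" if not blocking_issues else "failed"
```

Returning the full list of reasons, rather than a boolean, lets the report explain exactly why a job was held back.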

Services and persistence

  • services/migration_service.py: MigrationService wraps the job store. list_jobs() filters/sorts/paginates; validate_output() runs a richer 6-check suite (YAML syntax, SQL safety, transform coverage ≥ 50%, confidence ≥ 60%, node completeness for source+sink, mapping issues); start_conversion() backs the re-convert endpoint.
  • services/report_generator.py — produces MappingSummary and EffortEstimate figures using a person-hour model: 0.25h base per mapping, 2.0h per manual review, 1.0h per LLM-assisted conversion, 0.5h testing, 4.0h integration; confidence thresholds 0.85 / 0.60 / 0.40.
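
As a worked example of the person-hour model, the function below combines the listed rates. The source does not say how the testing and integration figures are applied, so this sketch assumes 0.5h of testing per mapping and a flat 4.0h of integration per job; the function name and parameters are hypothetical.

```python
def estimate_hours(total_mappings: int, manual_reviews: int, llm_assisted: int) -> float:
    """Person-hour estimate under the assumptions stated above."""
    base = 0.25 * total_mappings      # base effort per mapping
    testing = 0.5 * total_mappings    # assumed: testing is per mapping
    review = 2.0 * manual_reviews     # each manual review
    llm = 1.0 * llm_assisted          # each LLM-assisted conversion
    integration = 4.0                 # assumed: flat per job
    return base + testing + review + llm + integration
```

Under these assumptions a job with 20 mappings, 3 manual reviews, and 5 LLM-assisted conversions comes to 5 + 10 + 6 + 5 + 4 = 30 person-hours.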

Job persistence is an in-memory _jobs dict that is also written to jobs-store.json in the upload directory (persist_job() / save_jobs_store() / _load_jobs_store()), so jobs survive restarts via the JSON file. Job state is not stored in a database.

Migration data models

migration/models/migration.py defines MigrationType, MigrationStatus, MigrationJob, UploadResponse, MigrationReport (total_objects / auto_converted / needs_review / manual_required), PaginatedJobsResponse, ValidationCheckResult, ValidationResponse, and ConvertResponse. migration/models/mapping.py defines ConversionSource, ObjectMapping, ConversionResult, and the AST dataclasses SourceDefinition, TransformationField, Transformation, Connector, MappingAST, and WorkflowAST.


Key observations

  • Two distinct LLM-access paths — the Copilot uses the pluggable LLMProvider factory (anthropic/openrouter/local) routed via the ClaudeClient shim, while the Migration Engine's LLMConverter and the Copilot's AgentService call the anthropic SDK directly.
  • Embeddings use all-MiniLM-L6-v2 (384-dim) with a pgvector ivfflat cosine index. The embedding library, sentence-transformers, is an optional dependency whose absence has caused chat 500s.
  • Graceful degradation everywhere: RAG, catalog, and LLM failures fall back to non-RAG chat, canned schemas, or llm_unavailable markers, and the Copilot's /ready endpoint reports the degraded state (503 with reasons) rather than masking it.
  • Confidence is pervasive — every AI output (chat, SQL, rules, matches, conversions, RCA hypotheses) carries a 0–1 confidence score; the Migration Engine uses it as a hard release gate at ≥ 0.85.
  • Heavy Polkomtel/telecom domain encoding — the data landscape, the CDR/MSISDN/ARPU glossary, PESEL/MSISDN regexes, and Polish-language column handling are baked into prompts and the semantic glossary.