API reference: Pipeline Engine
The Pipeline Engine (openapi-pipeline-engine.yaml, port 8082) owns pipeline definitions, execution, scheduling, orchestration, Git-backed versioning, and Flink streaming jobs. All endpoints are served under /api/v1. Most require a bearerAuth JWT; the webhook endpoints authenticate with HMAC signature headers instead. The Auth column of each table lists the scheme per endpoint.
Pipelines
Pipeline definitions are managed by the metadata-service behind the /api/v1/pipelines/** gateway prefix. A pipeline carries a YAML definition and an auto-incrementing version. New pipelines start in DRAFT.
| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| GET | /api/v1/pipelines | listPipelines | List pipelines | JWT |
| POST | /api/v1/pipelines | createPipeline | Create pipeline (starts in DRAFT) | JWT |
| GET | /api/v1/pipelines/{id} | getPipeline | Get pipeline including YAML | JWT |
| PUT | /api/v1/pipelines/{id} | updatePipeline | Update pipeline (auto-increments version) | JWT |
| DELETE | /api/v1/pipelines/{id} | deletePipeline | Delete pipeline + all versions/runs | JWT |
Parameters
| Endpoint | Parameter | In | Type | Notes |
|---|---|---|---|---|
| listPipelines | workspaceId | query | uuid | Filter by workspace |
| listPipelines | search | query | string | Free-text name filter |
| getPipeline / updatePipeline / deletePipeline | id | path | uuid | Pipeline ID |
Pipeline model
Pipeline key fields: id, workspaceId, name, description, yamlDefinition, version (int ≥ 1), status, scheduleCron, tags[], sourceConnectionId, targetConnectionId, maxRetries (default 3), retryDelaySeconds (default 60), timeoutSeconds (default 3600), notifications, isActive, createdAt, updatedAt, createdBy.
status enum: DRAFT | SCHEDULED | RUNNING | SUCCESS | FAILED | WARNING | CANCELLED.
PipelineCreate requires name and yamlDefinition.
// POST /api/v1/pipelines → 201 Created
// Request: PipelineCreate
{
  "name": "billing-daily-load",
  "description": "Daily Teradata → Snowflake billing extract",
  "yamlDefinition": "version: 1\nsource:\n connection: teradata-prod\ntarget:\n connection: snowflake-dwh\n",
  "scheduleCron": "0 2 * * *",
  "tags": ["billing", "daily"],
  "sourceConnectionId": "a1b2c3d4-0000-0000-0000-000000000001",
  "targetConnectionId": "a1b2c3d4-0000-0000-0000-000000000002"
}
// Response: Pipeline
{
  "id": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "workspaceId": "9c1f0e2a-1111-2222-3333-444455556666",
  "name": "billing-daily-load",
  "description": "Daily Teradata → Snowflake billing extract",
  "yamlDefinition": "version: 1\nsource:\n connection: teradata-prod\n...",
  "version": 1,
  "status": "DRAFT",
  "scheduleCron": "0 2 * * *",
  "tags": ["billing", "daily"],
  "maxRetries": 3,
  "retryDelaySeconds": 60,
  "timeoutSeconds": 3600,
  "isActive": true,
  "createdAt": "2025-12-15T14:30:00Z",
  "updatedAt": "2025-12-15T14:30:00Z",
  "createdBy": "u-1001"
}
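A minimal Python sketch of assembling a PipelineCreate body on the client. It enforces only what this page states (name and yamlDefinition are required) and leaves unset optional fields out of the payload. The helper name build_pipeline_create is hypothetical, and sending the body (POST /api/v1/pipelines with a bearer JWT) is left to whichever HTTP client you use.

```python
import json
from typing import Any, Optional


def build_pipeline_create(
    name: str,
    yaml_definition: str,
    *,
    description: Optional[str] = None,
    schedule_cron: Optional[str] = None,
    tags: Optional[list[str]] = None,
    source_connection_id: Optional[str] = None,
    target_connection_id: Optional[str] = None,
) -> dict[str, Any]:
    """Build a PipelineCreate body (hypothetical helper, not part of the API)."""
    # name and yamlDefinition are the only required PipelineCreate fields.
    if not name:
        raise ValueError("name is required")
    if not yaml_definition:
        raise ValueError("yamlDefinition is required")

    body: dict[str, Any] = {"name": name, "yamlDefinition": yaml_definition}
    optional = {
        "description": description,
        "scheduleCron": schedule_cron,
        "tags": tags,
        "sourceConnectionId": source_connection_id,
        "targetConnectionId": target_connection_id,
    }
    # Omit unset optional fields instead of sending explicit nulls.
    body.update({k: v for k, v in optional.items() if v is not None})
    return body


payload = build_pipeline_create(
    "billing-daily-load",
    "version: 1\nsource:\n  connection: teradata-prod\n",
    schedule_cron="0 2 * * *",
    tags=["billing", "daily"],
)
print(json.dumps(payload, indent=2))
```

The response to this request starts in DRAFT at version 1; later PUTs produce new versions rather than editing this one.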
Note
updatePipeline does not overwrite the existing version. It increments version and stores the update as a new revision; the full revision history is available through the Git endpoints below. deletePipeline cascades to every version and every run.
Execution and runs
A run is a compiled, scheduled execution of a pipeline. runPipeline returns immediately with 202 Accepted; poll getRunStatus to follow its progress.
| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| POST | /api/v1/pipelines/{id}/run | runPipeline | Compile + schedule execution; returns immediately | JWT |
| GET | /api/v1/runs/{runId} | getRunStatus | Get run status + per-task statuses | JWT |
| POST | /api/v1/runs/{runId}/cancel | cancelRun | Cancel a running execution | JWT |
Parameters and bodies
| Endpoint | Parameter | In | Type | Notes |
|---|---|---|---|---|
| runPipeline | id | path | uuid | Pipeline ID |
| runPipeline | body | — | RunRequest | Optional yamlDefinition, triggeredBy |
| getRunStatus / cancelRun | runId | path | uuid | Run ID |
RunResponse: runId, pipelineId, status (RUNNING | PENDING), taskCount, engine (SPARK | FLINK | JDBC). RunStatusResponse.overallStatus: PENDING | RUNNING | COMPLETED | FAILED | CANCELLED; taskStatuses and errors are maps keyed by task ID.
// POST /api/v1/pipelines/{id}/run → 202 Accepted
// Request: RunRequest (all fields optional)
{
  "triggeredBy": "u-1001"
}
// Response: RunResponse
{
  "runId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "status": "RUNNING",
  "taskCount": 4,
  "engine": "SPARK"
}
// GET /api/v1/runs/{runId} → 200 OK (RunStatusResponse)
{
  "runId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "overallStatus": "RUNNING",
  "taskStatuses": {
    "extract": "COMPLETED",
    "transform": "RUNNING",
    "load": "PENDING"
  },
  "errors": {}
}
// POST /api/v1/runs/{runId}/cancel → 200 OK
{
  "runId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "status": "CANCELLED",
  "message": "Run cancelled by user request"
}
Note
The gateway routing table references a WebSocket stream at /api/v1/runs/*/stream for live run updates, but it is not defined as a formal path operation in the spec. Use getRunStatus polling as the documented mechanism.
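Because runPipeline only accepts the run, the caller has to poll. The following is a minimal polling sketch in Python, assuming a run is finished once overallStatus is COMPLETED, FAILED or CANCELLED. fetch_status is a hypothetical callable you supply that performs GET /api/v1/runs/{runId} with your JWT and returns the decoded RunStatusResponse; injecting it (and sleep) keeps the loop testable without a live engine. The 5-second interval and one-hour timeout are arbitrary choices.

```python
import time
from typing import Any, Callable

# overallStatus values after which a run no longer changes.
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED"}


def wait_for_run(
    fetch_status: Callable[[str], dict[str, Any]],
    run_id: str,
    interval_seconds: float = 5.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict[str, Any]:
    """Poll getRunStatus until the run reaches a terminal overallStatus."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = fetch_status(run_id)  # e.g. GET /api/v1/runs/{runId}
        if status["overallStatus"] in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} still {status['overallStatus']}")
        sleep(interval_seconds)


# Usage with a fake fetcher that replays three successive responses.
responses = iter([
    {"overallStatus": "PENDING", "taskStatuses": {}, "errors": {}},
    {"overallStatus": "RUNNING", "taskStatuses": {"extract": "COMPLETED"}, "errors": {}},
    {"overallStatus": "COMPLETED",
     "taskStatuses": {"extract": "COMPLETED", "load": "COMPLETED"}, "errors": {}},
])
final = wait_for_run(
    lambda _run_id: next(responses),
    "3fa85f64-5717-4562-b3fc-2c963f66afa6",
    sleep=lambda _seconds: None,
)
print(final["overallStatus"])
```

When the final status is FAILED, the errors map (keyed by task ID) identifies which tasks to inspect.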
Scheduler
The scheduler manages cron schedules, backfill jobs, and pipeline dependency graphs.
Schedules
| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| GET | /api/v1/scheduler/schedules | listSchedules | List all schedules | JWT |
| POST | /api/v1/scheduler/schedules | createSchedule | Create a cron schedule | JWT |
| GET | /api/v1/scheduler/schedules/{id} | getSchedule | Get schedule | JWT |
| PUT | /api/v1/scheduler/schedules/{id} | updateSchedule | Update schedule | JWT |
| DELETE | /api/v1/scheduler/schedules/{id} | deleteSchedule | Delete schedule | JWT |
| POST | /api/v1/scheduler/schedules/{id}/pause | pauseSchedule | Pause schedule | JWT |
| POST | /api/v1/scheduler/schedules/{id}/resume | resumeSchedule | Resume paused schedule | JWT |
| GET | /api/v1/scheduler/next-runs | getNextRuns | Preview upcoming run times | JWT |
CreateScheduleRequest fields include pipelineId, cronExpression (5-field), timezone, businessDaysOnly, skipHolidays, maintenanceWindows[], overlapStrategy (SKIP | QUEUE | CANCEL_RUNNING), and catchupStrategy (SKIP_TO_CURRENT | RUN_ALL | RUN_LATEST_N). getNextRuns accepts limit (default 20).
// POST /api/v1/scheduler/schedules → 201 Created
// Request: CreateScheduleRequest
{
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "cronExpression": "0 2 * * *",
  "timezone": "Europe/Warsaw",
  "businessDaysOnly": false,
  "skipHolidays": true,
  "overlapStrategy": "SKIP",
  "catchupStrategy": "SKIP_TO_CURRENT"
}
// Response: ScheduleResponse
{
  "id": "7b2e1c44-aaaa-bbbb-cccc-ddddeeeeffff",
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "cronExpression": "0 2 * * *",
  "timezone": "Europe/Warsaw",
  "status": "ACTIVE",
  "nextRunAt": "2025-12-16T01:00:00Z"
}
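A client-side sketch that builds a CreateScheduleRequest and rejects values this page rules out: a cron expression without exactly five fields, or an unknown overlap or catch-up strategy. The function name is hypothetical, and the default strategies below simply mirror the example above; they are not documented server defaults.

```python
from typing import Any

OVERLAP_STRATEGIES = {"SKIP", "QUEUE", "CANCEL_RUNNING"}
CATCHUP_STRATEGIES = {"SKIP_TO_CURRENT", "RUN_ALL", "RUN_LATEST_N"}


def build_schedule_request(
    pipeline_id: str,
    cron_expression: str,
    *,
    timezone: str,
    overlap_strategy: str = "SKIP",              # client-side choice, not a server default
    catchup_strategy: str = "SKIP_TO_CURRENT",   # client-side choice, not a server default
    **extra: Any,
) -> dict[str, Any]:
    """Build a CreateScheduleRequest body (hypothetical helper)."""
    # 5-field cron: minute hour day-of-month month day-of-week (no seconds field).
    fields = cron_expression.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}: {cron_expression!r}")
    if overlap_strategy not in OVERLAP_STRATEGIES:
        raise ValueError(f"unknown overlapStrategy {overlap_strategy!r}")
    if catchup_strategy not in CATCHUP_STRATEGIES:
        raise ValueError(f"unknown catchupStrategy {catchup_strategy!r}")
    return {
        "pipelineId": pipeline_id,
        "cronExpression": cron_expression,
        "timezone": timezone,
        "overlapStrategy": overlap_strategy,
        "catchupStrategy": catchup_strategy,
        **extra,  # e.g. businessDaysOnly, skipHolidays, maintenanceWindows
    }


request = build_schedule_request(
    "f47ac10b-58cc-4372-a567-0e02b2c3d479",
    "0 2 * * *",
    timezone="Europe/Warsaw",
    businessDaysOnly=False,
    skipHolidays=True,
)
```

Note that nextRunAt in the response is in UTC while the cron expression is evaluated in the schedule's timezone: 02:00 in Europe/Warsaw in December (CET, UTC+1) is 01:00Z, which is why the example shows 2025-12-16T01:00:00Z.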
Backfills
| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| POST | /api/v1/scheduler/backfill | createBackfill | Create backfill job for a date range | JWT |
| GET | /api/v1/scheduler/backfill/{id} | getBackfillProgress | Get backfill progress | JWT |
| POST | /api/v1/scheduler/backfill/{id}/pause | pauseBackfill | Pause backfill | JWT |
| POST | /api/v1/scheduler/backfill/{id}/resume | resumeBackfill | Resume backfill | JWT |
| POST | /api/v1/scheduler/backfill/{id}/cancel | cancelBackfill | Cancel backfill | JWT |
CreateBackfillRequest fields include pipelineId, the from/to date range, granularity (HOURLY | DAILY | WEEKLY | MONTHLY), concurrency, priority (LOW | MEDIUM | HIGH), dryRun, and parameterOverrides.
// POST /api/v1/scheduler/backfill → 201 Created
// Request: CreateBackfillRequest
{
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "from": "2025-11-01",
  "to": "2025-11-30",
  "granularity": "DAILY",
  "concurrency": 4,
  "priority": "MEDIUM",
  "dryRun": false
}
// Response: BackfillResponse
{
  "id": "c9d8e7f6-1234-5678-9abc-def012345678",
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "status": "RUNNING",
  "totalIntervals": 30,
  "completedIntervals": 11,
  "failedIntervals": 0
}
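To estimate the size of a backfill before submitting it, the interval count can be worked out from the date range. This Python sketch assumes both from and to are inclusive (consistent with the 30 daily intervals reported above for 2025-11-01 to 2025-11-30) and that a trailing partial week or month still counts as one interval; the engine's exact interval rules are not specified on this page, and the helper names are hypothetical.

```python
from datetime import date


def count_intervals(start: date, end: date, granularity: str) -> int:
    """Number of backfill intervals in [start, end], both ends inclusive (assumption)."""
    if end < start:
        raise ValueError("end must not be before start")
    days = (end - start).days + 1  # inclusive of both endpoints
    if granularity == "HOURLY":
        return days * 24  # ignores DST transitions
    if granularity == "DAILY":
        return days
    if granularity == "WEEKLY":
        return -(-days // 7)  # ceiling division: a partial week still counts
    if granularity == "MONTHLY":
        return (end.year - start.year) * 12 + (end.month - start.month) + 1
    raise ValueError(f"unknown granularity {granularity!r}")


def progress(completed: int, failed: int, total: int) -> float:
    """Fraction of intervals that have finished, successfully or not."""
    return (completed + failed) / total if total else 1.0


total = count_intervals(date(2025, 11, 1), date(2025, 11, 30), "DAILY")
print(total)                             # 30, matching totalIntervals above
print(round(progress(11, 0, total), 3))  # 0.367
```

With concurrency 4, those 30 daily intervals would run at most four at a time.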
Dependencies
| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| GET | /api/v1/scheduler/dependencies/{pipelineId} | getDependencyGraph | Get up/downstream dependency graph | JWT |
| POST | /api/v1/scheduler/dependencies | addDependency | Create a dependency between pipelines | JWT |
| DELETE | /api/v1/scheduler/dependencies/{id} | removeDependency | Remove a dependency | JWT |
AddDependencyRequest.type: STRONG | WEAK. Adding a cross-workspace dependency returns 403.
// POST /api/v1/scheduler/dependencies → 201 Created
// Request: AddDependencyRequest
{
  "upstreamPipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "downstreamPipelineId": "a1b2c3d4-9999-8888-7777-666655554444",
  "type": "STRONG"
}
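This page does not say whether addDependency rejects cycles. If you create dependencies programmatically, a client-side guard like the following sketch can catch a loop before the request is sent. It assumes you already hold the existing edges as (upstreamPipelineId, downstreamPipelineId) pairs, for example extracted from getDependencyGraph; that response's shape is not documented here, so the edge-list format and the function name are assumptions.

```python
from collections import defaultdict


def would_create_cycle(edges: list[tuple[str, str]], upstream: str, downstream: str) -> bool:
    """Return True if adding upstream -> downstream would close a cycle.

    A cycle appears exactly when `upstream` is already reachable from `downstream`.
    """
    children: dict[str, list[str]] = defaultdict(list)
    for up, down in edges:
        children[up].append(down)

    stack, seen = [downstream], set()
    while stack:  # iterative depth-first search starting at the new edge's target
        node = stack.pop()
        if node == upstream:
            return True
        if node in seen:
            continue
        seen.add(node)
        stack.extend(children[node])
    return False


existing = [("ingest", "transform"), ("transform", "report")]
print(would_create_cycle(existing, "report", "ingest"))  # True: report already depends on ingest
print(would_create_cycle(existing, "ingest", "report"))  # False: just a shortcut edge
```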
Orchestrator
The orchestrator integrates external schedulers (Airflow, AutomateNow) via webhooks, callbacks, and DAG synchronization. Trigger webhooks authenticate with HMAC headers, not JWT.
| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| POST | /api/v1/orchestrator/trigger | triggerPipeline | External webhook trigger | HMAC |
| POST | /api/v1/orchestrator/callback | receiveCallback | Receive completion/failure callback | — |
| GET | /api/v1/orchestrator/airflow/dags | listSyncedDags | List DAGs generated from pipelines | JWT |
| POST | /api/v1/orchestrator/airflow/sync/{pipelineId} | syncPipelineAsAirflowDag | Generate + sync an Airflow DAG from YAML | JWT |
| POST | /api/v1/orchestrator/airflow/trigger/{dagId} | triggerAirflowDag | Trigger an Airflow DAG run | JWT |
| GET | /api/v1/orchestrator/airflow/status/{dagId}/{dagRunId} | getAirflowDagRunStatus | Get Airflow DAG run status | JWT |
| GET | /api/v1/orchestrator/webhooks | listWebhooks | List registered webhooks | JWT |
| POST | /api/v1/orchestrator/webhooks | registerWebhook | Register a webhook endpoint | JWT |
triggerPipeline requires headers X-Webhook-Signature and X-Webhook-Id, and a TriggerRequest body requiring external_job_id. WebhookRegistrationRequest requires name, source, hmac_secret, callback_url. triggerPipeline and triggerAirflowDag return 202 Accepted.
// POST /api/v1/orchestrator/trigger → 202 Accepted
// Headers: X-Webhook-Signature: <hmac>, X-Webhook-Id: <uuid>
// Request: TriggerRequest
{
  "external_job_id": "airflow-billing-2025-12-15",
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "parameters": { "loadDate": "2025-12-15" }
}
// Response: TriggerResponse
{
  "runId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "external_job_id": "airflow-billing-2025-12-15",
  "status": "ACCEPTED"
}
// POST /api/v1/orchestrator/airflow/trigger/{dagId} → 202 Accepted
{
  "dagId": "dataflow_billing_daily_load",
  "dagRunId": "manual__2025-12-15T14:30:00+00:00",
  "state": "queued"
}
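This page does not state how X-Webhook-Signature is computed. The Python sketch below shows one common scheme, and it is an assumption: HMAC-SHA256 over the exact request body bytes, hex-encoded, keyed with the hmac_secret supplied to registerWebhook. Confirm the real algorithm, encoding, and the meaning of X-Webhook-Id with the engine's owners before relying on it; the helper names are hypothetical.

```python
import hashlib
import hmac
import json


def sign_trigger(body: dict, secret: str) -> tuple[bytes, str]:
    """Serialize a TriggerRequest and compute a signature for it.

    ASSUMPTION: HMAC-SHA256 over the raw body bytes, hex-encoded.
    """
    # Sign exactly the bytes you send; re-serializing later could change them.
    raw = json.dumps(body, separators=(",", ":")).encode("utf-8")
    signature = hmac.new(secret.encode("utf-8"), raw, hashlib.sha256).hexdigest()
    return raw, signature


def verify_trigger(raw: bytes, signature: str, secret: str) -> bool:
    """Receiver-side check, using a constant-time comparison."""
    expected = hmac.new(secret.encode("utf-8"), raw, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)


raw, sig = sign_trigger(
    {"external_job_id": "airflow-billing-2025-12-15",
     "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479"},
    secret="example-secret",  # placeholder for the registered hmac_secret
)
headers = {
    "Content-Type": "application/json",
    "X-Webhook-Id": "<webhook uuid>",  # placeholder; presumably the registered webhook's ID
    "X-Webhook-Signature": sig,
}
```

Send raw as the request body unchanged; signing one serialization and sending another is the most common cause of signature mismatches.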
Git
Pipeline definitions are version-controlled in Git. These endpoints manage repositories, history, diffs, rollback, branches, and merges. The Git webhook authenticates with the provider's own headers: an HMAC signature from GitHub or a shared secret token from GitLab.
| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| GET | /api/v1/git/repos | listRepos | List workspaces with Git repos | JWT |
| POST | /api/v1/git/repos | connectRepo | Clone remote / init local repo | JWT |
| GET | /api/v1/git/history/{pipelineId} | getHistory | Get pipeline version history | JWT |
| GET | /api/v1/git/diff/{pipelineId} | diffVersions | Diff two pipeline versions | JWT |
| POST | /api/v1/git/rollback/{pipelineId}/{version} | rollback | Roll back a pipeline to a version | JWT |
| GET | /api/v1/git/branches | listBranches | List branches | JWT |
| POST | /api/v1/git/branches | createBranch | Create a branch | JWT |
| POST | /api/v1/git/merge | mergeBranch | Merge a branch | JWT |
| POST | /api/v1/git/webhooks | receiveGitWebhook | Receive GitHub/GitLab push & PR/MR events | HMAC |
Parameters
| Endpoint | Parameter | In | Type | Notes |
|---|---|---|---|---|
| getHistory | pipelineId | path | uuid | — |
| getHistory | workspaceSlug | query | string | Default: "default" |
| getHistory | maxVersions | query | int | Default: 50 |
| diffVersions | from, to | query | string | Required |
| diffVersions | workspaceSlug | query | string | — |
| rollback | pipelineId, version | path | — | — |
| rollback | workspaceSlug | query | string | — |
| listBranches | workspaceSlug | query | string | — |
| listBranches | includeRemote | query | bool | — |
| receiveGitWebhook | X-GitHub-Event, X-Hub-Signature-256, X-Gitlab-Event, X-Gitlab-Token | header | string | GitHub HMAC signature or GitLab secret token |
ConnectRepoRequest.credentialType: NONE | TOKEN | SSH_KEY. WebhookResult.action: SYNC | DEPLOY | VALIDATE | IGNORE | ERROR. mergeBranch returns 409 on a merge conflict.
// POST /api/v1/git/repos → 201 Created
// Request: ConnectRepoRequest
{
  "workspaceSlug": "billing-team",
  "remoteUrl": "https://gitlab.polkomtel.pl/dataflow/billing.git",
  "credentialType": "TOKEN",
  "credentialRef": "vault://git/billing-token"
}
// GET /api/v1/git/diff/{pipelineId}?from=3&to=5 → 200 OK (DiffResponse)
{
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "fromVersion": 3,
  "toVersion": 5,
  "diff": "@@ -2,3 +2,3 @@\n- connection: teradata-stage\n+ connection: teradata-prod"
}
// POST /api/v1/git/merge → 200 OK (VersionOperationResult)
{
  "success": true,
  "mergedVersion": 6,
  "message": "Branch feature/new-mapping merged into main"
}
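The engine performs the webhook check itself when a delivery arrives. The sketch below only illustrates what the two providers send, which can help when debugging rejected deliveries: GitHub signs the raw body with HMAC-SHA256 and sends sha256=<hex digest> in X-Hub-Signature-256, whereas GitLab sends the configured secret token unchanged in X-Gitlab-Token. The function name is hypothetical, and the headers mapping is assumed to use the casing shown (real web frameworks usually provide a case-insensitive mapping).

```python
import hashlib
import hmac
from typing import Mapping


def verify_git_webhook(headers: Mapping[str, str], body: bytes, secret: str) -> bool:
    """Check a GitHub or GitLab webhook delivery against the shared secret."""
    key = secret.encode("utf-8")
    if "X-GitHub-Event" in headers:
        # GitHub: "sha256=" + hex HMAC-SHA256 of the raw request body.
        expected = "sha256=" + hmac.new(key, body, hashlib.sha256).hexdigest()
        received = headers.get("X-Hub-Signature-256", "")
        return hmac.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))
    if "X-Gitlab-Event" in headers:
        # GitLab: the secret token itself, compared in constant time.
        return hmac.compare_digest(key, headers.get("X-Gitlab-Token", "").encode("utf-8"))
    return False  # unknown provider: reject
```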
Flink
Flink endpoints manage streaming jobs and session clusters on Kubernetes.
Flink jobs
| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| POST | /api/v1/flink/jobs | submitFlinkJob | Submit a Flink job to Kubernetes | JWT |
| GET | /api/v1/flink/jobs | listFlinkJobs | List active (non-terminal) jobs | JWT |
| GET | /api/v1/flink/jobs/{jobId} | getFlinkJobStatus | Poll Flink REST API for job status | JWT |
| DELETE | /api/v1/flink/jobs/{jobId} | removeFlinkJob | Remove a terminated job from tracking | JWT |
| POST | /api/v1/flink/jobs/{jobId}/cancel | cancelFlinkJob | Cancel a running job (tears down app cluster) | JWT |
| POST | /api/v1/flink/jobs/{jobId}/savepoint | triggerFlinkSavepoint | Trigger a savepoint (stored in GCS) | JWT |
| GET | /api/v1/flink/jobs/{jobId}/metrics | getFlinkJobMetrics | Throughput/checkpoint/backpressure metrics | JWT |
| GET | /api/v1/flink/jobs/{jobId}/logs | getFlinkJobLogs | Get TaskManager log lines | JWT |
FlinkJobConfig.deploymentMode: APPLICATION | SESSION. FlinkJobRecord.status: CREATED | RUNNING | FAILING | FAILED | CANCELLING | CANCELED | FINISHED | RESTARTING | SUSPENDED. triggerFlinkSavepoint requires a path in the body. getFlinkJobLogs accepts lines (default 500).
// POST /api/v1/flink/jobs → 201 Created
// Request: FlinkJobConfig
{
  "jobName": "cdr-stream-enrich",
  "deploymentMode": "APPLICATION",
  "jarUri": "gs://dataflow-artifacts/cdr-enrich-1.4.0.jar",
  "parallelism": 8
}
// Response: FlinkJobHandle
{
  "jobId": "00000000-cdr-enrich-0001",
  "status": "CREATED",
  "deploymentMode": "APPLICATION"
}
// POST /api/v1/flink/jobs/{jobId}/savepoint → 200 OK
// Request: { "path": "gs://dataflow-savepoints/cdr-enrich/" }
{
  "savepointPath": "gs://dataflow-savepoints/cdr-enrich/savepoint-abc123"
}
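listFlinkJobs returns only non-terminal jobs and removeFlinkJob only accepts terminated ones, so clients need to know which FlinkJobRecord.status values are final. The statuses listed above match Apache Flink's own JobStatus names (note the Flink spelling CANCELED, unlike CANCELLED for pipeline runs). In Flink, FAILED, CANCELED and FINISHED are globally terminal, while SUSPENDED is only locally terminal. This sketch assumes the engine uses the same semantics and conservatively treats only globally terminal jobs as safe to remove; the helper names are hypothetical.

```python
FLINK_JOB_STATUSES = frozenset({
    "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING",
    "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED",
})
# Globally terminal in Apache Flink: the job will never run again.
GLOBALLY_TERMINAL = frozenset({"FAILED", "CANCELED", "FINISHED"})


def is_globally_terminal(status: str) -> bool:
    if status not in FLINK_JOB_STATUSES:
        raise ValueError(f"unknown Flink job status {status!r}")
    return status in GLOBALLY_TERMINAL


def removal_candidates(jobs: list[dict]) -> list[str]:
    """Job IDs that are conservatively safe to pass to removeFlinkJob."""
    # SUSPENDED is excluded on purpose: it may still be recovered.
    return [job["jobId"] for job in jobs if is_globally_terminal(job["status"])]


tracked = [
    {"jobId": "a", "status": "RUNNING"},
    {"jobId": "b", "status": "FINISHED"},
    {"jobId": "c", "status": "SUSPENDED"},
    {"jobId": "d", "status": "CANCELED"},
]
print(removal_candidates(tracked))  # ['b', 'd']
```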
Flink clusters
| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| GET | /api/v1/flink/clusters | listFlinkClusters | List session clusters | JWT |
| POST | /api/v1/flink/clusters | createFlinkCluster | Create a session cluster on K8s | JWT |
| GET | /api/v1/flink/clusters/{name} | getFlinkCluster | Get cluster details | JWT |
| DELETE | /api/v1/flink/clusters/{name} | deleteFlinkCluster | Delete cluster, release K8s resources | JWT |
| GET | /api/v1/flink/clusters/{name}/health | getFlinkClusterHealth | Cluster health, TaskManager/slot readiness | JWT |
| POST | /api/v1/flink/clusters/{name}/scale | scaleFlinkCluster | Change the TaskManager count | JWT |
FlinkClusterInfo.status: STARTING | RUNNING | SCALING | DEGRADED | STOPPING | STOPPED | ERROR. scaleFlinkCluster requires taskManagers (≥ 1) in the body; createFlinkCluster returns 409 if a cluster of that name already exists.
// POST /api/v1/flink/clusters → 201 Created
// Request: FlinkClusterCreateRequest
{
  "name": "streaming-prod",
  "taskManagers": 3,
  "taskSlotsPerManager": 4
}
// POST /api/v1/flink/clusters/{name}/scale → 200 OK (FlinkClusterInfo)
// Request: { "taskManagers": 6 }
{
  "name": "streaming-prod",
  "status": "SCALING",
  "taskManagers": 6,
  "taskSlotsPerManager": 4
}
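A session cluster's slot capacity is its TaskManager count multiplied by the slots per TaskManager, so scaling streaming-prod from 3 to 6 TaskManagers at 4 slots each raises capacity from 12 to 24 slots. With Flink's default slot sharing, a job needs as many free slots as its highest operator parallelism. The sketch below applies that arithmetic; it ignores slots already taken by other jobs on the same session cluster, and the helper names are hypothetical.

```python
def total_slots(task_managers: int, slots_per_manager: int) -> int:
    """Slot capacity of a session cluster: TaskManagers x slots per TaskManager."""
    if task_managers < 1:
        raise ValueError("taskManagers must be >= 1")  # same rule as scaleFlinkCluster
    return task_managers * slots_per_manager


def fits(parallelism: int, task_managers: int, slots_per_manager: int) -> bool:
    """True if a job of this parallelism fits an otherwise empty cluster (default slot sharing)."""
    return parallelism <= total_slots(task_managers, slots_per_manager)


print(total_slots(3, 4))  # 12 slots after createFlinkCluster above
print(total_slots(6, 4))  # 24 slots after scaling to 6 TaskManagers
print(fits(8, 3, 4))      # True: a parallelism-8 job fits in 12 slots
```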
Heads up
Cancelling an APPLICATION-mode job tears down its dedicated application cluster. For long-running streaming jobs, trigger a savepoint first so the job can later be resumed from a consistent snapshot of its state stored in GCS.
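The recommended order (savepoint first, cancel only once a savepoint path comes back) can be sketched as follows. post is a hypothetical transport you supply that sends an authenticated POST to the engine and returns the decoded JSON. The sketch assumes the 200 response from triggerFlinkSavepoint is returned only after the savepoint completes; if the engine responds earlier, poll for completion before cancelling. Passing None as the cancel body is also an assumption, since no cancel body is documented.

```python
from typing import Any, Callable, Optional

# Hypothetical transport: post(path, json_body_or_None) -> decoded JSON response.
Post = Callable[[str, Optional[dict[str, Any]]], dict[str, Any]]


def stop_with_savepoint(post: Post, job_id: str, savepoint_dir: str) -> str:
    """Trigger a savepoint, then cancel the job; returns the savepoint location."""
    result = post(f"/api/v1/flink/jobs/{job_id}/savepoint", {"path": savepoint_dir})
    savepoint_path = result.get("savepointPath")
    if not savepoint_path:
        # Never tear down the application cluster without a savepoint to resume from.
        raise RuntimeError(f"savepoint for {job_id} returned no savepointPath")
    post(f"/api/v1/flink/jobs/{job_id}/cancel", None)
    return savepoint_path


# Usage with a fake transport that records the calls it receives.
calls: list[str] = []


def fake_post(path: str, body: Optional[dict[str, Any]]) -> dict[str, Any]:
    calls.append(path)
    if path.endswith("/savepoint"):
        return {"savepointPath": "gs://dataflow-savepoints/cdr-enrich/savepoint-abc123"}
    return {}


location = stop_with_savepoint(fake_post, "00000000-cdr-enrich-0001",
                               "gs://dataflow-savepoints/cdr-enrich/")
print(location)
```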