API reference: Pipeline Engine

The Pipeline Engine (openapi-pipeline-engine.yaml, port 8082) owns pipeline definitions, execution, scheduling, orchestration, Git-backed versioning, and Flink streaming jobs. All endpoints are served under /api/v1 and require a bearerAuth JWT. The exceptions are the webhook endpoints marked HMAC in the tables below, which authenticate with signature headers instead.
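The following is a minimal sketch of how a client might attach the JWT. It assumes the engine is reachable at http://localhost:8082 (the host is a placeholder; only the port and the /api/v1 prefix come from the spec). It also assumes the token travels in the standard `Authorization: Bearer` header, which is what an OpenAPI bearer scheme means. `build_request` is a hypothetical helper that only builds the request and does not send it.

```python
import json
import urllib.request
from typing import Optional

# ASSUMPTION: placeholder host; only port 8082 and the /api/v1 prefix come from the spec.
BASE_URL = "http://localhost:8082/api/v1"


def build_request(method: str, path: str, token: str,
                  body: Optional[dict] = None) -> urllib.request.Request:
    """Build, but do not send, a JWT-authenticated request to the Pipeline Engine."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(BASE_URL + path, data=data, method=method)
    # bearerAuth: the JWT is sent in the standard Authorization header.
    req.add_header("Authorization", f"Bearer {token}")
    if data is not None:
        req.add_header("Content-Type", "application/json")
    return req
```

Passing the result to `urllib.request.urlopen` would send it. The webhook endpoints need signature headers instead of this Authorization header.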


Pipelines

Pipeline definitions are managed by the metadata-service behind the /api/v1/pipelines/** gateway prefix. A pipeline carries a YAML definition and an auto-incrementing version. New pipelines start in DRAFT.

| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| GET | /api/v1/pipelines | listPipelines | List pipelines | JWT |
| POST | /api/v1/pipelines | createPipeline | Create pipeline (starts in DRAFT) | JWT |
| GET | /api/v1/pipelines/{id} | getPipeline | Get pipeline including YAML | JWT |
| PUT | /api/v1/pipelines/{id} | updatePipeline | Update pipeline (auto-increments version) | JWT |
| DELETE | /api/v1/pipelines/{id} | deletePipeline | Delete pipeline + all versions/runs | JWT |

Parameters

| Endpoint | Parameter | In | Type | Notes |
|---|---|---|---|---|
| listPipelines | workspaceId | query | uuid | Filter by workspace |
| listPipelines | search | query | string | Free-text name filter |
| getPipeline / updatePipeline / deletePipeline | id | path | uuid | Pipeline ID |
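Both listPipelines filters are optional query parameters. The sketch below uses a hypothetical `list_pipelines_path` helper to build the request path and leaves out any filter that is not set. How the server combines the two filters when both are present is not specified here.

```python
from typing import Optional
from urllib.parse import urlencode


def list_pipelines_path(workspace_id: Optional[str] = None,
                        search: Optional[str] = None) -> str:
    """Build the listPipelines path, including only the filters that are set."""
    params = {k: v for k, v in (("workspaceId", workspace_id), ("search", search))
              if v is not None}
    # urlencode percent-encodes values and turns spaces into '+'.
    return "/api/v1/pipelines" + ("?" + urlencode(params) if params else "")
```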

Pipeline model

Pipeline key fields: id, workspaceId, name, description, yamlDefinition, version (int ≥ 1), status, scheduleCron, tags[], sourceConnectionId, targetConnectionId, maxRetries (default 3), retryDelaySeconds (default 60), timeoutSeconds (default 3600), notifications, isActive, createdAt, updatedAt, createdBy.

status enum: DRAFT | SCHEDULED | RUNNING | SUCCESS | FAILED | WARNING | CANCELLED.

PipelineCreate requires name and yamlDefinition.

```jsonc
// POST /api/v1/pipelines  →  201 Created
// Request: PipelineCreate
{
  "name": "billing-daily-load",
  "description": "Daily Teradata → Snowflake billing extract",
  "yamlDefinition": "version: 1\nsource:\n  connection: teradata-prod\ntarget:\n  connection: snowflake-dwh\n",
  "scheduleCron": "0 2 * * *",
  "tags": ["billing", "daily"],
  "sourceConnectionId": "a1b2c3d4-0000-0000-0000-000000000001",
  "targetConnectionId": "a1b2c3d4-0000-0000-0000-000000000002"
}
// Response: Pipeline
{
  "id": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "workspaceId": "9c1f0e2a-1111-2222-3333-444455556666",
  "name": "billing-daily-load",
  "description": "Daily Teradata → Snowflake billing extract",
  "yamlDefinition": "version: 1\nsource:\n  connection: teradata-prod\n...",
  "version": 1,
  "status": "DRAFT",
  "scheduleCron": "0 2 * * *",
  "tags": ["billing", "daily"],
  "maxRetries": 3,
  "retryDelaySeconds": 60,
  "timeoutSeconds": 3600,
  "isActive": true,
  "createdAt": "2025-12-15T14:30:00Z",
  "updatedAt": "2025-12-15T14:30:00Z",
  "createdBy": "u-1001"
}
```
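Only name and yamlDefinition are required in PipelineCreate. The sketch below is a hypothetical client-side model that enforces those two fields before the request is sent. Field names deliberately keep the JSON camelCase. The sketch covers only the fields in the example request above. The real schema may accept more (for instance maxRetries), which this sketch does not model. Omitting unset fields, rather than sending nulls, is a client-side choice, not documented server behaviour.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class PipelineCreate:
    """Hypothetical client-side model of the PipelineCreate request body."""
    name: str
    yamlDefinition: str
    description: Optional[str] = None
    scheduleCron: Optional[str] = None
    tags: list = field(default_factory=list)
    sourceConnectionId: Optional[str] = None
    targetConnectionId: Optional[str] = None

    def __post_init__(self) -> None:
        # name and yamlDefinition are the two required fields.
        if not self.name or not self.yamlDefinition:
            raise ValueError("PipelineCreate requires name and yamlDefinition")

    def to_json(self) -> str:
        # Omit unset optional fields instead of sending nulls or empty lists.
        body = {k: v for k, v in asdict(self).items() if v not in (None, [])}
        return json.dumps(body)
```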

Note

updatePipeline does not mutate the existing version — it auto-increments version and stores a new revision. The full revision history is available through the Git endpoints below. deletePipeline cascades to every version and every run.


Execution and runs

A run is a compiled, scheduled execution of a pipeline. runPipeline returns immediately with 202 Accepted — poll getRunStatus for progress.

| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| POST | /api/v1/pipelines/{id}/run | runPipeline | Compile + schedule execution; returns immediately | JWT |
| GET | /api/v1/runs/{runId} | getRunStatus | Get run status + per-task statuses | JWT |
| POST | /api/v1/runs/{runId}/cancel | cancelRun | Cancel a running execution | JWT |

Parameters and bodies

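| Endpoint | Parameter | In | Type | Notes |
|---|---|---|---|---|
| runPipeline | id | path | uuid | Pipeline ID |
| runPipeline | (request body) | body | RunRequest | Optional fields: yamlDefinition, triggeredBy |
| getRunStatus / cancelRun | runId | path | uuid | Run ID |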

RunResponse: runId, pipelineId, status (RUNNING | PENDING), taskCount, engine (SPARK | FLINK | JDBC). RunStatusResponse.overallStatus: PENDING | RUNNING | COMPLETED | FAILED | CANCELLED; taskStatuses and errors are maps keyed by task ID.

```jsonc
// POST /api/v1/pipelines/{id}/run  →  202 Accepted
// Request: RunRequest (all fields optional)
{
  "triggeredBy": "u-1001"
}
// Response: RunResponse
{
  "runId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "status": "RUNNING",
  "taskCount": 4,
  "engine": "SPARK"
}
// GET /api/v1/runs/{runId}  →  200 OK  (RunStatusResponse)
{
  "runId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "overallStatus": "RUNNING",
  "taskStatuses": {
    "extract": "COMPLETED",
    "transform": "RUNNING",
    "load": "PENDING"
  },
  "errors": {}
}
// POST /api/v1/runs/{runId}/cancel  →  200 OK
{
  "runId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "status": "CANCELLED",
  "message": "Run cancelled by user request"
}
```

Note

The gateway routing table references a WebSocket stream at /api/v1/runs/*/stream for live run updates, but it is not defined as a formal path operation in the spec. Use getRunStatus polling as the documented mechanism.
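runPipeline returns 202 Accepted, and getRunStatus is the documented way to follow a run. A client therefore loops until overallStatus reaches a terminal value. In this sketch, COMPLETED, FAILED, and CANCELLED are treated as terminal, which is inferred from the RunStatusResponse enum. `fetch_status` is an injected callable that performs GET /api/v1/runs/{runId}, so the loop itself makes no network calls. The 5-second interval is arbitrary. The 3600-second timeout is borrowed from the pipeline timeoutSeconds default.

```python
import time
from typing import Callable

# Terminal values of RunStatusResponse.overallStatus (inferred from the enum).
TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}


def wait_for_run(fetch_status: Callable[[str], dict], run_id: str,
                 interval_s: float = 5.0, timeout_s: float = 3600.0,
                 sleep: Callable[[float], None] = time.sleep) -> dict:
    """Poll getRunStatus until the run reaches a terminal overallStatus.

    fetch_status performs GET /api/v1/runs/{runId} and returns the decoded
    RunStatusResponse; sleep is injectable so the loop can be tested.
    """
    waited = 0.0
    while True:
        status = fetch_status(run_id)
        if status["overallStatus"] in TERMINAL:
            return status
        if waited >= timeout_s:
            raise TimeoutError(f"run {run_id} still {status['overallStatus']} after {waited}s")
        sleep(interval_s)
        waited += interval_s
```

On a FAILED result, the errors map (keyed by task ID) shows which tasks failed.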


Scheduler

The scheduler manages cron schedules, backfill jobs, and pipeline dependency graphs.

Schedules

| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| GET | /api/v1/scheduler/schedules | listSchedules | List all schedules | JWT |
| POST | /api/v1/scheduler/schedules | createSchedule | Create a cron schedule | JWT |
| GET | /api/v1/scheduler/schedules/{id} | getSchedule | Get schedule | JWT |
| PUT | /api/v1/scheduler/schedules/{id} | updateSchedule | Update schedule | JWT |
| DELETE | /api/v1/scheduler/schedules/{id} | deleteSchedule | Delete schedule | JWT |
| POST | /api/v1/scheduler/schedules/{id}/pause | pauseSchedule | Pause schedule | JWT |
| POST | /api/v1/scheduler/schedules/{id}/resume | resumeSchedule | Resume paused schedule | JWT |
| GET | /api/v1/scheduler/next-runs | getNextRuns | Preview upcoming run times | JWT |

CreateScheduleRequest fields: pipelineId, cronExpression (5-field), timezone, businessDaysOnly, skipHolidays, maintenanceWindows[], overlapStrategy (SKIP | QUEUE | CANCEL_RUNNING), and catchupStrategy (SKIP_TO_CURRENT | RUN_ALL | RUN_LATEST_N). getNextRuns accepts a limit parameter (default 20).

```jsonc
// POST /api/v1/scheduler/schedules  →  201 Created
// Request: CreateScheduleRequest
{
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "cronExpression": "0 2 * * *",
  "timezone": "Europe/Warsaw",
  "businessDaysOnly": false,
  "skipHolidays": true,
  "overlapStrategy": "SKIP",
  "catchupStrategy": "SKIP_TO_CURRENT"
}
// Response: ScheduleResponse
{
  "id": "7b2e1c44-aaaa-bbbb-cccc-ddddeeeeffff",
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "cronExpression": "0 2 * * *",
  "timezone": "Europe/Warsaw",
  "status": "ACTIVE",
  "nextRunAt": "2025-12-16T01:00:00Z"
}
```
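Obvious mistakes can be caught before calling createSchedule. The sketch below uses a hypothetical `schedule_problems` helper that checks only two things: the cron field count and membership in the two strategy enums. It does not validate cron field ranges. It also rejects 6-field, Quartz-style expressions with a seconds field. Whether the server validates in exactly the same way is an assumption.

```python
from typing import List

OVERLAP_STRATEGIES = {"SKIP", "QUEUE", "CANCEL_RUNNING"}
CATCHUP_STRATEGIES = {"SKIP_TO_CURRENT", "RUN_ALL", "RUN_LATEST_N"}


def schedule_problems(req: dict) -> List[str]:
    """Return client-side problems in a CreateScheduleRequest dict (empty list if none)."""
    problems = []
    fields = req.get("cronExpression", "").split()
    # 5-field cron: minute hour day-of-month month day-of-week.
    if len(fields) != 5:
        problems.append(f"cronExpression must have 5 fields, got {len(fields)}")
    for key, allowed in (("overlapStrategy", OVERLAP_STRATEGIES),
                         ("catchupStrategy", CATCHUP_STRATEGIES)):
        value = req.get(key)
        if value is not None and value not in allowed:
            problems.append(f"{key} must be one of {sorted(allowed)}, got {value!r}")
    return problems
```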

Backfills

| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| POST | /api/v1/scheduler/backfill | createBackfill | Create backfill job for a date range | JWT |
| GET | /api/v1/scheduler/backfill/{id} | getBackfillProgress | Get backfill progress | JWT |
| POST | /api/v1/scheduler/backfill/{id}/pause | pauseBackfill | Pause backfill | JWT |
| POST | /api/v1/scheduler/backfill/{id}/resume | resumeBackfill | Resume backfill | JWT |
| POST | /api/v1/scheduler/backfill/{id}/cancel | cancelBackfill | Cancel backfill | JWT |

CreateBackfillRequest fields: pipelineId, from and to (dates), granularity (HOURLY | DAILY | WEEKLY | MONTHLY), concurrency, priority (LOW | MEDIUM | HIGH), dryRun, and parameterOverrides.

```jsonc
// POST /api/v1/scheduler/backfill  →  201 Created
// Request: CreateBackfillRequest
{
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "from": "2025-11-01",
  "to": "2025-11-30",
  "granularity": "DAILY",
  "concurrency": 4,
  "priority": "MEDIUM",
  "dryRun": false
}
// Response: BackfillResponse
{
  "id": "c9d8e7f6-1234-5678-9abc-def012345678",
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "status": "RUNNING",
  "totalIntervals": 30,
  "completedIntervals": 0,
  "failedIntervals": 0
}
```
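In the backfill example, the range 2025-11-01 to 2025-11-30 at DAILY granularity yields totalIntervals 30, which suggests that `to` is inclusive. The sketch below estimates the interval count under that assumption, which can be useful for sanity-checking a dryRun. The WEEKLY and MONTHLY bucketing is a further assumption: WEEKLY uses 7-day blocks counted from `from`, and MONTHLY uses calendar months touched by the range. `expected_intervals` is a hypothetical helper.

```python
from datetime import date


def expected_intervals(start: date, end: date, granularity: str) -> int:
    """Estimate totalIntervals for a backfill, assuming both ends of the range are inclusive."""
    if end < start:
        raise ValueError("'to' must not be before 'from'")
    days = (end - start).days + 1
    if granularity == "HOURLY":
        return days * 24
    if granularity == "DAILY":
        return days
    if granularity == "WEEKLY":
        return (days + 6) // 7  # a trailing partial week still counts as one interval
    if granularity == "MONTHLY":
        return (end.year - start.year) * 12 + (end.month - start.month) + 1
    raise ValueError(f"unknown granularity {granularity!r}")
```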

Dependencies

| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| GET | /api/v1/scheduler/dependencies/{pipelineId} | getDependencyGraph | Get up/downstream dependency graph | JWT |
| POST | /api/v1/scheduler/dependencies | addDependency | Create a dependency between pipelines | JWT |
| DELETE | /api/v1/scheduler/dependencies/{id} | removeDependency | Remove a dependency | JWT |

AddDependencyRequest.type: STRONG | WEAK. Adding a cross-workspace dependency returns 403.

```jsonc
// POST /api/v1/scheduler/dependencies  →  201 Created
// Request: AddDependencyRequest
{
  "upstreamPipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "downstreamPipelineId": "a1b2c3d4-9999-8888-7777-666655554444",
  "type": "STRONG"
}
```
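Dependencies form a graph of upstream-to-downstream edges. The sketch below applies Kahn's topological sort to such edges. It derives an order in which pipelines could run and detects a cycle before addDependency is called. Three points are assumptions. First, the (upstream, downstream) tuple format is a stand-in, because getDependencyGraph's response shape is not shown here. Second, the spec does not say whether the scheduler itself rejects cycles. Third, the spec does not say how STRONG and WEAK edges differ, so the sketch treats every edge the same way.

```python
from collections import defaultdict, deque
from typing import Dict, Iterable, List, Tuple


def run_order(edges: Iterable[Tuple[str, str]]) -> List[str]:
    """Topologically sort pipelines from (upstream, downstream) edges.

    Raises ValueError if the edges contain a cycle.
    """
    downstream: Dict[str, List[str]] = defaultdict(list)
    indegree: Dict[str, int] = defaultdict(int)
    nodes = set()
    for up, down in edges:
        downstream[up].append(down)
        indegree[down] += 1
        nodes.update((up, down))
    # Kahn's algorithm: start from pipelines with no upstream dependency.
    queue = deque(sorted(n for n in nodes if indegree[n] == 0))
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for nxt in downstream[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if len(order) != len(nodes):
        raise ValueError("dependency cycle detected")
    return order
```

To test a proposed dependency, append it to the existing edges and check whether run_order still succeeds.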

Orchestrator

The orchestrator integrates external schedulers (Airflow, AutomateNow) via webhooks, callbacks, and DAG synchronization. Trigger webhooks authenticate with HMAC headers, not JWT.

| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| POST | /api/v1/orchestrator/trigger | triggerPipeline | External webhook trigger | HMAC |
| POST | /api/v1/orchestrator/callback | receiveCallback | Receive completion/failure callback | |
| GET | /api/v1/orchestrator/airflow/dags | listSyncedDags | List DAGs generated from pipelines | JWT |
| POST | /api/v1/orchestrator/airflow/sync/{pipelineId} | syncPipelineAsAirflowDag | Generate + sync an Airflow DAG from YAML | JWT |
| POST | /api/v1/orchestrator/airflow/trigger/{dagId} | triggerAirflowDag | Trigger an Airflow DAG run | JWT |
| GET | /api/v1/orchestrator/airflow/status/{dagId}/{dagRunId} | getAirflowDagRunStatus | Get Airflow DAG run status | JWT |
| GET | /api/v1/orchestrator/webhooks | listWebhooks | List registered webhooks | JWT |
| POST | /api/v1/orchestrator/webhooks | registerWebhook | Register a webhook endpoint | JWT |

triggerPipeline requires the X-Webhook-Signature and X-Webhook-Id headers and a TriggerRequest body in which external_job_id is required. WebhookRegistrationRequest requires name, source, hmac_secret, and callback_url. Both triggerPipeline and triggerAirflowDag return 202 Accepted.

```jsonc
// POST /api/v1/orchestrator/trigger  →  202 Accepted
// Headers: X-Webhook-Signature: <hmac>, X-Webhook-Id: <uuid>
// Request: TriggerRequest
{
  "external_job_id": "airflow-billing-2025-12-15",
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "parameters": { "loadDate": "2025-12-15" }
}
// Response: TriggerResponse
{
  "runId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "external_job_id": "airflow-billing-2025-12-15",
  "status": "ACCEPTED"
}
// POST /api/v1/orchestrator/airflow/trigger/{dagId}  →  202 Accepted
{
  "dagId": "dataflow_billing_daily_load",
  "dagRunId": "manual__2025-12-15T14:30:00+00:00",
  "state": "queued"
}
```
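The spec names the trigger headers but not the signing scheme. The sketch below therefore assumes a common scheme: a hex HMAC-SHA256 over the exact raw request body, keyed with the hmac_secret supplied at webhook registration. Both `sign_trigger` and `verify_trigger` are hypothetical helpers. The caller must send exactly the bytes that were signed, because re-serializing the JSON after signing would invalidate the signature.

```python
import hashlib
import hmac
import json


def sign_trigger(secret: str, webhook_id: str, body: dict) -> tuple:
    """Return (raw_body, headers) for a signed triggerPipeline call.

    ASSUMPTION: signature = hex HMAC-SHA256 of the raw body, keyed with hmac_secret.
    """
    raw = json.dumps(body, separators=(",", ":")).encode("utf-8")
    signature = hmac.new(secret.encode("utf-8"), raw, hashlib.sha256).hexdigest()
    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Id": webhook_id,
        "X-Webhook-Signature": signature,
    }
    return raw, headers


def verify_trigger(secret: str, raw: bytes, signature: str) -> bool:
    """Receiver-side check under the same assumption, using a constant-time comparison."""
    expected = hmac.new(secret.encode("utf-8"), raw, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```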

Git

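Pipeline definitions are version-controlled in Git. These endpoints manage repositories, history, diffs, rollback, branches, and merges. The Git webhook authenticates with provider-specific headers rather than a JWT: GitHub signs each delivery with HMAC-SHA256 (X-Hub-Signature-256), while GitLab sends a shared secret token (X-Gitlab-Token).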

| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| GET | /api/v1/git/repos | listRepos | List workspaces with Git repos | JWT |
| POST | /api/v1/git/repos | connectRepo | Clone remote / init local repo | JWT |
| GET | /api/v1/git/history/{pipelineId} | getHistory | Get pipeline version history | JWT |
| GET | /api/v1/git/diff/{pipelineId} | diffVersions | Diff two pipeline versions | JWT |
| POST | /api/v1/git/rollback/{pipelineId}/{version} | rollback | Roll back a pipeline to a version | JWT |
| GET | /api/v1/git/branches | listBranches | List branches | JWT |
| POST | /api/v1/git/branches | createBranch | Create a branch | JWT |
| POST | /api/v1/git/merge | mergeBranch | Merge a branch | JWT |
| POST | /api/v1/git/webhooks | receiveGitWebhook | Receive GitHub/GitLab push & PR/MR events | HMAC |

Parameters

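| Endpoint | Parameter | In | Type | Notes |
|---|---|---|---|---|
| getHistory | pipelineId | path | uuid | |
| getHistory | workspaceSlug | query | string | Default: `default` |
| getHistory | maxVersions | query | int | Default: 50 |
| diffVersions | from, to | query | string | Required |
| diffVersions | workspaceSlug | query | string | |
| rollback | pipelineId, version | path | | |
| rollback | workspaceSlug | query | string | |
| listBranches | workspaceSlug | query | string | |
| listBranches | includeRemote | query | bool | |
| receiveGitWebhook | X-GitHub-Event, X-Hub-Signature-256, X-Gitlab-Event, X-Gitlab-Token | header | string | Provider authentication: GitHub HMAC-SHA256 signature (X-Hub-Signature-256) or GitLab secret token (X-Gitlab-Token) |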

ConnectRepoRequest.credentialType: NONE | TOKEN | SSH_KEY. WebhookResult.action: SYNC | DEPLOY | VALIDATE | IGNORE | ERROR. mergeBranch returns 409 on a merge conflict.
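The two Git providers authenticate webhook deliveries differently. GitHub sends `sha256=` followed by the hex HMAC-SHA256 of the raw payload. GitLab echoes the configured secret token verbatim and does not sign the body. The engine's own verification code is not part of the spec. The sketch below only illustrates the two provider schemes. `git_webhook_authentic` is hypothetical, and it looks up header names with exact casing for brevity, although real HTTP header names are case-insensitive.

```python
import hashlib
import hmac
from typing import Mapping


def git_webhook_authentic(headers: Mapping[str, str], raw_body: bytes, secret: str) -> bool:
    """Check a GitHub or GitLab webhook delivery against the shared webhook secret."""
    github_sig = headers.get("X-Hub-Signature-256")
    if github_sig is not None:
        # GitHub: "sha256=" followed by the hex HMAC-SHA256 of the raw payload.
        digest = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
        return hmac.compare_digest(("sha256=" + digest).encode("utf-8"), github_sig.encode("utf-8"))
    gitlab_token = headers.get("X-Gitlab-Token")
    if gitlab_token is not None:
        # GitLab: the secret token is sent as-is; compare in constant time.
        return hmac.compare_digest(secret.encode("utf-8"), gitlab_token.encode("utf-8"))
    return False
```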

```jsonc
// POST /api/v1/git/repos  →  201 Created
// Request: ConnectRepoRequest
{
  "workspaceSlug": "billing-team",
  "remoteUrl": "https://gitlab.polkomtel.pl/dataflow/billing.git",
  "credentialType": "TOKEN",
  "credentialRef": "vault://git/billing-token"
}
// GET /api/v1/git/diff/{pipelineId}?from=3&to=5  →  200 OK  (DiffResponse)
{
  "pipelineId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "fromVersion": 3,
  "toVersion": 5,
  "diff": "@@ -2,3 +2,3 @@\n-  connection: teradata-stage\n+  connection: teradata-prod"
}
// POST /api/v1/git/merge  →  200 OK  (VersionOperationResult)
{
  "success": true,
  "mergedVersion": 6,
  "message": "Branch feature/new-mapping merged into main"
}
```

Flink

Flink endpoints manage streaming jobs and session clusters on Kubernetes.

Jobs

| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| POST | /api/v1/flink/jobs | submitFlinkJob | Submit a Flink job to Kubernetes | JWT |
| GET | /api/v1/flink/jobs | listFlinkJobs | List active (non-terminal) jobs | JWT |
| GET | /api/v1/flink/jobs/{jobId} | getFlinkJobStatus | Poll Flink REST API for job status | JWT |
| DELETE | /api/v1/flink/jobs/{jobId} | removeFlinkJob | Remove a terminated job from tracking | JWT |
| POST | /api/v1/flink/jobs/{jobId}/cancel | cancelFlinkJob | Cancel a running job (tears down app cluster) | JWT |
| POST | /api/v1/flink/jobs/{jobId}/savepoint | triggerFlinkSavepoint | Trigger a savepoint (stored in GCS) | JWT |
| GET | /api/v1/flink/jobs/{jobId}/metrics | getFlinkJobMetrics | Throughput/checkpoint/backpressure metrics | JWT |
| GET | /api/v1/flink/jobs/{jobId}/logs | getFlinkJobLogs | Get TaskManager log lines | JWT |

FlinkJobConfig.deploymentMode: APPLICATION | SESSION. FlinkJobRecord.status: CREATED | RUNNING | FAILING | FAILED | CANCELLING | CANCELED | FINISHED | RESTARTING | SUSPENDED. triggerFlinkSavepoint requires a path in the body. getFlinkJobLogs accepts lines (default 500).

```jsonc
// POST /api/v1/flink/jobs  →  201 Created
// Request: FlinkJobConfig
{
  "jobName": "cdr-stream-enrich",
  "deploymentMode": "APPLICATION",
  "jarUri": "gs://dataflow-artifacts/cdr-enrich-1.4.0.jar",
  "parallelism": 8
}
// Response: FlinkJobHandle
{
  "jobId": "00000000-cdr-enrich-0001",
  "status": "CREATED",
  "deploymentMode": "APPLICATION"
}
// POST /api/v1/flink/jobs/{jobId}/savepoint  →  200 OK
// Request: { "path": "gs://dataflow-savepoints/cdr-enrich/" }
{
  "savepointPath": "gs://dataflow-savepoints/cdr-enrich/savepoint-abc123"
}
```
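listFlinkJobs returns only non-terminal jobs, and removeFlinkJob applies only to terminated ones, so a client needs to know which statuses are terminal. The grouping below follows Apache Flink's own JobStatus enum. In that enum, FINISHED, CANCELED, and FAILED are globally terminal, while SUSPENDED is only locally terminal because the job may still be recovered. The spec does not say whether the engine counts SUSPENDED as terminated, so this sketch treats it as not removable. Note the spelling: Flink uses CANCELED, whereas run statuses use CANCELLED, so the two enums should not be shared.

```python
# FlinkJobRecord.status values, grouped as in Apache Flink's JobStatus enum.
GLOBALLY_TERMINAL = {"FINISHED", "CANCELED", "FAILED"}
LOCALLY_TERMINAL = {"SUSPENDED"}
ALL_STATUSES = {"CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING",
                "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED"}


def is_removable(status: str) -> bool:
    """Hypothetical client-side guard before calling removeFlinkJob.

    ASSUMPTION: only globally terminal jobs count as 'terminated' for this API.
    """
    if status not in ALL_STATUSES:
        raise ValueError(f"unknown Flink job status {status!r}")
    return status in GLOBALLY_TERMINAL
```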

Session clusters

| Method | Path | Operation ID | Purpose | Auth |
|---|---|---|---|---|
| GET | /api/v1/flink/clusters | listFlinkClusters | List session clusters | JWT |
| POST | /api/v1/flink/clusters | createFlinkCluster | Create a session cluster on K8s | JWT |
| GET | /api/v1/flink/clusters/{name} | getFlinkCluster | Get cluster details | JWT |
| DELETE | /api/v1/flink/clusters/{name} | deleteFlinkCluster | Delete cluster, release K8s resources | JWT |
| GET | /api/v1/flink/clusters/{name}/health | getFlinkClusterHealth | Cluster health, TaskManager/slot readiness | JWT |
| POST | /api/v1/flink/clusters/{name}/scale | scaleFlinkCluster | Change the TaskManager count | JWT |

FlinkClusterInfo.status: STARTING | RUNNING | SCALING | DEGRADED | STOPPING | STOPPED | ERROR. scaleFlinkCluster requires taskManagers (≥ 1) in the body; createFlinkCluster returns 409 if a cluster of that name already exists.

```jsonc
// POST /api/v1/flink/clusters  →  201 Created
// Request: FlinkClusterCreateRequest
{
  "name": "streaming-prod",
  "taskManagers": 3,
  "taskSlotsPerManager": 4
}
// POST /api/v1/flink/clusters/{name}/scale  →  200 OK  (FlinkClusterInfo)
// Request: { "taskManagers": 6 }
{
  "name": "streaming-prod",
  "status": "SCALING",
  "taskManagers": 6,
  "taskSlotsPerManager": 4
}
```
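A session cluster offers taskManagers × taskSlotsPerManager task slots. For example, streaming-prod after scaling has 6 × 4 = 24 slots. With Flink's default slot sharing, a job needs as many slots as its highest operator parallelism. The sketch below computes the smallest taskManagers value for scaleFlinkCluster that can host one such job. It ignores other jobs already running in the same session cluster. `task_managers_needed` is a hypothetical helper.

```python
import math


def task_managers_needed(parallelism: int, slots_per_manager: int) -> int:
    """Smallest taskManagers count whose slots can host one job at the given parallelism.

    Assumes Flink's default slot sharing (slots needed = max operator parallelism).
    """
    if parallelism < 1 or slots_per_manager < 1:
        raise ValueError("parallelism and slots_per_manager must be >= 1")
    # scaleFlinkCluster requires taskManagers >= 1.
    return max(1, math.ceil(parallelism / slots_per_manager))
```

For instance, a job with parallelism 8 on 4-slot TaskManagers needs 2 TaskManagers.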

Heads up

Cancelling an APPLICATION-mode job tears down its dedicated application cluster. For long-running streaming jobs, trigger a savepoint first, so that the job can later be resumed from that consistent snapshot stored in GCS.
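The savepoint-then-cancel sequence can be wrapped so that cancellation never happens without a usable savepoint. In this sketch, `trigger_savepoint` and `cancel_job` are injected callables standing in for POST /api/v1/flink/jobs/{jobId}/savepoint and POST /api/v1/flink/jobs/{jobId}/cancel. `savepoint_then_cancel` is hypothetical. Because these are two separate calls, the job keeps processing between them. Flink's native stop-with-savepoint closes that gap, but it is not among the endpoints listed here.

```python
from typing import Callable


def savepoint_then_cancel(job_id: str, savepoint_dir: str,
                          trigger_savepoint: Callable[[str, str], dict],
                          cancel_job: Callable[[str], dict]) -> str:
    """Take a savepoint, then cancel the job; return the savepoint path.

    If no savepointPath comes back, the job is left running.
    """
    result = trigger_savepoint(job_id, savepoint_dir)
    path = result.get("savepointPath")
    if not path:
        # Without a savepoint the job could not be resumed, so do not cancel.
        raise RuntimeError(f"savepoint for {job_id} returned no path; job left running")
    cancel_job(job_id)
    return path
```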
