mcp-dataforge

⚒️ mcp-dataforge

Multi-agent data engineering framework — MCP-native.

Turn natural language into data pipeline actions. Six specialist agents collaborate through the Model Context Protocol (MCP) to build, validate, and monitor your data infrastructure.

Quick Start

# Install
pip install mcp-dataforge

# Initialize a project
dataforge init

# Run a task
dataforge run "profile the customers table and check for nulls"

# Start the web dashboard
dataforge web
# → http://localhost:8080

Architecture

MCP Client (Claude Code, Cursor, etc.)
        │
        │ MCP Protocol (stdio)
        ▼
┌─────────────────────────────────────┐
│      Orchestrator MCP Server        │
│  route_task · execute_task          │
│  execute_parallel · execute_mixed   │
│  list_agents · get_pipeline_status  │
├─────────────────────────────────────┤
│                                     │
│  ┌────────┐ ┌────────┐ ┌────────┐   │
│  │Pipeline│ │   DQ   │ │ Schema │   │
│  └────────┘ └────────┘ └────────┘   │
│  ┌────────┐ ┌────────┐ ┌────────┐   │
│  │Catalog │ │ Observ │ │  Orch  │   │
│  └────────┘ └────────┘ └────────┘   │
│                                     │
│  Sequential · Parallel · Mixed      │
└─────────────────────────────────────┘

Execution Modes

| Mode | Description | Example |
| --- | --- | --- |
| Sequential | Agents run one after another; context passes between them | Profile → Detect drift → Generate migration |
| Parallel | Multiple agents run concurrently; results are merged | Scan schema + check health + search catalog |
| Mixed | Multi-stage: parallel groups followed by sequential steps | [DQ + Schema] in parallel → Catalog |
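
The three modes can be pictured with a small asyncio sketch. This is not the orchestrator's actual implementation: `run_agent` is a hypothetical stand-in for a real agent call, and the context is assumed to be a plain dict keyed by agent name.

```python
import asyncio
from typing import Dict, List, Optional, Tuple

Step = Tuple[str, str]  # (agent name, task text)


async def run_agent(name: str, task: str, context: Dict[str, dict]) -> dict:
    """Hypothetical stand-in for an agent call; records which results it could see."""
    await asyncio.sleep(0)  # placeholder for real agent I/O
    return {"agent": name, "task": task, "saw": sorted(context)}


async def run_sequential(steps: List[Step], context: Optional[dict] = None) -> dict:
    """Run steps in order; each result joins the context seen by later steps."""
    context = dict(context or {})
    for name, task in steps:
        context[name] = await run_agent(name, task, context)
    return context


async def run_parallel(steps: List[Step], context: Optional[dict] = None) -> dict:
    """Run steps concurrently against one shared starting context, then merge."""
    context = dict(context or {})
    results = await asyncio.gather(*(run_agent(n, t, context) for n, t in steps))
    return {**context, **{r["agent"]: r for r in results}}


async def run_mixed(stages: List[List[Step]]) -> dict:
    """Run stages in order; steps inside one stage run in parallel."""
    context: dict = {}
    for stage in stages:
        context = await run_parallel(stage, context)
    return context


mixed = asyncio.run(run_mixed([
    [("dq", "profile orders"), ("schema", "detect drift")],  # parallel group
    [("catalog", "tag affected tables")],                    # sequential step
]))
print(mixed["catalog"]["saw"])  # ['dq', 'schema']
```

In this sketch the two agents in the parallel group see an empty context, while the catalog step sees both of their results.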

Built-in Agents

| Agent | Tools | Description |
| --- | --- | --- |
| 🔧 Pipeline | generate_pipeline, debug_sql, explain_plan | SQL generation, debugging, and optimization |
| Data Quality | profile_data, detect_anomalies, validate_rules | Data profiling, anomaly detection, rule validation |
| 📐 Schema | detect_drift, generate_migration, lint_schema, lineage | Schema comparison, migration scripts, linting |
| 📚 Catalog | search, describe, impact_analysis, tag | Data discovery, documentation, change impact |
| 🔍 Observability | get_pipeline_health, alert_summary, cost_analysis, suggest_optimizations | Pipeline health, alerts, cost optimization |
| Orchestration | create_dag, manage_retry, resolve_deps, backfill, list_dags, pause, unpause, visualize | DAG management, scheduling, dependency resolution |
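
To find which agent owns a given tool, the table can be turned into a reverse index. The sketch below is illustrative only: the tool names are copied from the table and the agent keys follow config.yaml, but `AGENT_TOOLS`, `TOOL_INDEX`, and `agent_for_tool` are hypothetical names, not part of the dataforge package.

```python
# Tool names copied from the Built-in Agents table; agent keys follow config.yaml.
AGENT_TOOLS = {
    "pipeline": ["generate_pipeline", "debug_sql", "explain_plan"],
    "dq": ["profile_data", "detect_anomalies", "validate_rules"],
    "schema": ["detect_drift", "generate_migration", "lint_schema", "lineage"],
    "catalog": ["search", "describe", "impact_analysis", "tag"],
    "observability": [
        "get_pipeline_health", "alert_summary",
        "cost_analysis", "suggest_optimizations",
    ],
    "orchestration": [
        "create_dag", "manage_retry", "resolve_deps", "backfill",
        "list_dags", "pause", "unpause", "visualize",
    ],
}

# Reverse index: tool name -> owning agent.
TOOL_INDEX = {tool: agent for agent, tools in AGENT_TOOLS.items() for tool in tools}


def agent_for_tool(tool: str) -> str:
    """Return the agent that exposes `tool`, or raise KeyError with a clear message."""
    try:
        return TOOL_INDEX[tool]
    except KeyError:
        raise KeyError(f"no built-in agent exposes {tool!r}") from None


print(agent_for_tool("detect_drift"))  # schema
print(len(TOOL_INDEX))                 # 26
```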

CLI Usage

# Project setup
dataforge init                    # Create config.yaml
dataforge agent list              # List configured agents

# Execution
dataforge run "task description"  # Run a one-off task
dataforge start                   # Start orchestrator + agents

# Server modes
dataforge mcp-server              # Run as MCP server (stdio)
dataforge mcp-server --transport sse --port 8080  # SSE mode
dataforge mcp                     # Print MCP config for Claude Code

# Web dashboard
dataforge web                     # Start web UI (http://localhost:8080)
dataforge web --port 9000         # Custom port

Run Complex Pipelines

# Sequential — agents run in order, context flows between them
dataforge run "profile customers table, detect schema drift, and generate migration"

# Multi-agent — single task routed to relevant agents
dataforge run "check data quality and search catalog for PII data"

Claude Code Integration

Add to your ~/.claude/settings.json:

{
  "mcpServers": {
    "dataforge": {
      "command": "dataforge",
      "args": ["mcp-server"]
    }
  }
}
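
If the settings file already lists other MCP servers, the entry can be merged in rather than pasted by hand. The following is a minimal sketch, assuming the file is plain JSON with a top-level `mcpServers` object; `add_dataforge_server` is a hypothetical helper, and the demo writes to a temporary file instead of the real settings path.

```python
import json
import tempfile
from pathlib import Path


def add_dataforge_server(settings_path: Path) -> dict:
    """Merge the dataforge entry into a JSON settings file, keeping existing keys."""
    settings = json.loads(settings_path.read_text()) if settings_path.exists() else {}
    settings.setdefault("mcpServers", {})["dataforge"] = {
        "command": "dataforge",
        "args": ["mcp-server"],
    }
    settings_path.write_text(json.dumps(settings, indent=2) + "\n")
    return settings


# Demo against a throwaway file rather than the real ~/.claude/settings.json.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "settings.json"
    path.write_text(json.dumps({"mcpServers": {"other": {"command": "other-server"}}}))
    merged = add_dataforge_server(path)
    on_disk = json.loads(path.read_text())
```

Back up the real file before pointing the helper at it.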

Then from Claude Code:

route_task("check null rates in orders table")
→ Returns execution plan with 1 agent (dq)

execute_task("profile customers and fix schema drift")
→ Auto-routes to DQ + Schema agents, runs sequentially, returns results

execute_parallel({"steps": [
  {"agent": "catalog", "task": "search for PII data"},
  {"agent": "observability", "task": "health check"}
]})
→ Both agents run concurrently, results merged

execute_custom_pipeline({"pipeline": [
  {"agent": "dq", "task": "profile orders"},
  {"agent": "schema", "task": "detect drift"}
]})
→ Custom sequential pipeline with context passing
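
When these arguments are built programmatically, a small helper can catch typos in agent names before the call is made. This is a sketch under stated assumptions: the `steps` and `pipeline` shapes mirror the calls above, the agent names come from config.yaml, and `make_steps` is a hypothetical helper rather than part of dataforge.

```python
import json

# Agent names as they appear in config.yaml.
KNOWN_AGENTS = {"pipeline", "dq", "schema", "catalog", "observability", "orchestration"}


def make_steps(*pairs):
    """Turn (agent, task) pairs into step dicts, rejecting unknown agents and empty tasks."""
    steps = []
    for agent, task in pairs:
        if agent not in KNOWN_AGENTS:
            raise ValueError(f"unknown agent: {agent!r}")
        if not task.strip():
            raise ValueError(f"empty task for agent {agent!r}")
        steps.append({"agent": agent, "task": task})
    return steps


# Arguments for execute_parallel and execute_custom_pipeline, as shown above.
parallel_args = {"steps": make_steps(
    ("catalog", "search for PII data"),
    ("observability", "health check"),
)}
custom_args = {"pipeline": make_steps(
    ("dq", "profile orders"),
    ("schema", "detect drift"),
)}
print(json.dumps(parallel_args, indent=2))
```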

Web Dashboard

Start the dashboard to monitor pipelines, agents, and execution history:

dataforge web
# Open http://localhost:8080

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/agents | GET | List all agents with capabilities |
| /api/pipelines | GET | List all tracked pipelines |
| /api/pipelines/{id} | GET | Get pipeline status |
| /api/execute | POST | Execute a task |
| /api/pipeline/parallel | POST | Run parallel pipeline |
| /api/pipeline/custom | POST | Run custom sequential pipeline |
| /api/pipeline/mixed | POST | Run mixed (parallel + sequential) pipeline |
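
The endpoints can also be called from a script. The sketch below builds a POST request for /api/execute with the standard library. The request body schema is not documented here, so the `task` field is an assumption, and `build_execute_request` is a hypothetical helper.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # default dashboard address


def build_execute_request(task: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) a POST request for /api/execute."""
    # ASSUMPTION: the endpoint accepts a JSON body with a "task" field.
    body = json.dumps({"task": task}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/execute",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_execute_request("profile the customers table and check for nulls")
print(req.get_method(), req.full_url)

# To send it, start the dashboard first, then:
# with urllib.request.urlopen(req, timeout=30) as resp:
#     print(json.load(resp))
```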

Configuration

# config.yaml
version: "1.0"
project: "my-data-platform"

agents:
  pipeline:
    command: "python -m d4.agents.pipeline.server"
    transport: stdio
    capabilities: ["sql", "spark"]
  dq:
    command: "python -m d4.agents.dq.server"
    transport: stdio
    capabilities: ["data_quality", "profiling", "validation"]
  schema:
    command: "python -m d4.agents.schema.server"
    transport: stdio
    capabilities: ["schema", "drift", "migration", "lineage"]
  catalog:
    command: "python -m d4.agents.catalog.server"
    transport: stdio
    capabilities: ["catalog", "discovery", "documentation", "tagging"]
  observability:
    command: "python -m d4.agents.observability.server"
    transport: stdio
    capabilities: ["observability", "monitoring", "alerts", "cost"]
  orchestration:
    command: "python -m d4.agents.orchestration.server"
    transport: stdio
    capabilities: ["orchestration", "dag", "scheduling", "backfill"]
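
A quick sanity check on the `agents` block can catch missing keys before the orchestrator starts. This is a sketch, not dataforge's own validation: it assumes the YAML has already been parsed into a dict (for example with a YAML library), and `check_agents` and the `broken` entry are made up for illustration.

```python
import shlex

REQUIRED_KEYS = ("command", "transport", "capabilities")


def check_agents(config: dict) -> list:
    """Return human-readable problems; an empty list means the agents block looks sane."""
    problems = []
    for name, spec in config.get("agents", {}).items():
        for key in REQUIRED_KEYS:
            if key not in spec:
                problems.append(f"{name}: missing '{key}'")
        # shlex.split mimics how a shell would tokenize the command string.
        if "command" in spec and not shlex.split(spec["command"]):
            problems.append(f"{name}: empty command")
    return problems


# Dict equivalent of a parsed config.yaml, with one deliberately broken agent.
config = {
    "version": "1.0",
    "project": "my-data-platform",
    "agents": {
        "dq": {
            "command": "python -m d4.agents.dq.server",
            "transport": "stdio",
            "capabilities": ["data_quality", "profiling", "validation"],
        },
        "broken": {"command": "", "transport": "stdio"},
    },
}
problems = check_agents(config)
print(problems)
```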

Deploy to Production

See the full Deployment Guide for Docker Compose, Kubernetes, and SSE mode setup.

Development

```bash
# Clone and install
git clone git@github.com:Prometheus-agent/mcp-dataforge.git
cd mcp-dataforge
pip install -e ".[dev]"

# Run tests (153+ tests)
python3 -m pytest

# Run specific test file
python3 -m pytest tests/test_orchestrator.py -v

# Run the MCP server locally
dataforge mcp-server

# Run the web dashboard
dataforge web
```

Project Structure

src/d4/
├── agents/
│   ├── pipeline/         # SQL pipeline generation
│   ├── dq/               # Data profiling & validation
│   ├── schema/           # Drift detection & migration
│   ├── catalog/          # Data discovery & docs
│   ├── observability/    # Health & cost monitoring
│   └── orchestration/    # DAG management & scheduling
├── config/               # YAML config loader
├── registry/             # Agent registry & discovery
├── orchestrator/         # Core orchestrator + MCP server
├── web/                  # FastAPI web dashboard
├── cli/                  # Click CLI
└── models/               # Pydantic data models
tests/                    # 153+ tests across all modules

Building a Plugin

DataForge supports third-party agent plugins:

cp -r templates/d4-plugin d4-plugin-my-agent
cd d4-plugin-my-agent
# Rename <name> to your agent name
pip install -e .

Register in config.yaml:

agents:
  my_agent:
    command: "python -m d4_plugin_my_agent.server"
    transport: stdio
    capabilities: ["my_capability"]

See docs/guides/creating-a-plugin.md for full documentation.


Roadmap

Phase 1 — Core Foundation ✅

  • [x] 6 specialist agents with 22+ tools
  • [x] Orchestrator MCP server (stdio + SSE)
  • [x] CLI with init, run, agent, mcp commands
  • [x] Sequential, parallel, mixed pipeline execution
  • [x] FastAPI web dashboard
  • [x] 153+ tests, 100% passing

Phase 2 — Agent Expansion 🚧

  • [ ] Data Quality agent with DuckDB profiling
  • [ ] Schema agent with migration generation
  • [ ] Catalog agent with impact analysis

Phase 3 — Ecosystem 🌐

  • [ ] Docker deployment
  • [ ] Plugin API documentation
  • [ ] Third-party plugin support

License

Apache 2.0. See LICENSE.
