mcp-dataforge
Turns natural language into data pipeline actions using six specialist agents that collaborate through MCP to build, validate, and monitor data infrastructure.
README
⚒️ mcp-dataforge
Multi-agent data engineering framework — MCP-native.
Turn natural language into data pipeline actions. Six specialist agents collaborate through the Model Context Protocol (MCP) to build, validate, and monitor your data infrastructure.
Quick Start
# Install
pip install mcp-dataforge
# Initialize a project
dataforge init
# Run a task
dataforge run "profile the customers table and check for nulls"
# Start the web dashboard
dataforge web
# → http://localhost:8080
Architecture
MCP Client (Claude Code, Cursor, etc.)
│
│ MCP Protocol (stdio)
▼
┌─────────────────────────────────────┐
│ Orchestrator MCP Server │
│ route_task · execute_task │
│ execute_parallel · execute_mixed │
│ list_agents · get_pipeline_status │
├─────────────────────────────────────┤
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Pipeline│ │ DQ │ │Schema│ │
│ └──────┘ └──────┘ └──────┘ │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Catalog│ │Observ│ │Orch │ │
│ └──────┘ └──────┘ └──────┘ │
│ │
│ Sequential · Parallel · Mixed │
└─────────────────────────────────────┘
Execution Modes
| Mode | Description | Example |
|---|---|---|
| Sequential | Agents run one after another, context passes between them | Profile → Detect drift → Generate migration |
| Parallel | Multiple agents run concurrently, results merged | Scan schema + check health + search catalog |
| Mixed | Multi-stage: parallel groups followed by sequential steps | [DQ + Schema] in parallel → Catalog |
Built-in Agents
| Agent | Tools | Description |
|---|---|---|
| 🔧 Pipeline | generate_pipeline, debug_sql, explain_plan |
SQL generation, debugging, and optimization |
| ✅ Data Quality | profile_data, detect_anomalies, validate_rules |
Data profiling, anomaly detection, rule validation |
| 📐 Schema | detect_drift, generate_migration, lint_schema, lineage |
Schema comparison, migration scripts, linting |
| 📚 Catalog | search, describe, impact_analysis, tag |
Data discovery, documentation, change impact |
| 🔍 Observability | get_pipeline_health, alert_summary, cost_analysis, suggest_optimizations |
Pipeline health, alerts, cost optimization |
| ⚡ Orchestration | create_dag, manage_retry, resolve_deps, backfill, list_dags, pause, unpause, visualize |
DAG management, scheduling, dependency resolution |
CLI Usage
# Project setup
dataforge init # Create config.yaml
dataforge agent list # List configured agents
# Execution
dataforge run "task description" # Run a one-off task
dataforge start # Start orchestrator + agents
# Server modes
dataforge mcp-server # Run as MCP server (stdio)
dataforge mcp-server --transport sse --port 8080 # SSE mode
dataforge mcp # Print MCP config for Claude Code
# Web dashboard
dataforge web # Start web UI (http://localhost:8080)
dataforge web --port 9000 # Custom port
Run Complex Pipelines
# Sequential — agents run in order, context flows between them
dataforge run "profile customers table, detect schema drift, and generate migration"
# Multi-agent — single task routed to relevant agents
dataforge run "check data quality and search catalog for PII data"
Claude Code Integration
Add to your ~/.claude/settings.json:
{
"mcpServers": {
"dataforge": {
"command": "dataforge",
"args": ["mcp-server"]
}
}
}
Then from Claude Code:
route_task("check null rates in orders table")
→ Returns execution plan with 1 agent (dq)
execute_task("profile customers and fix schema drift")
→ Auto-routes to DQ + Schema agents, runs sequentially, returns results
execute_parallel({"steps": [
{"agent": "catalog", "task": "search for PII data"},
{"agent": "observability", "task": "health check"}
]})
→ Both agents run concurrently, results merged
execute_custom_pipeline({"pipeline": [
{"agent": "dq", "task": "profile orders"},
{"agent": "schema", "task": "detect drift"}
]})
→ Custom sequential pipeline with context passing
Web Dashboard
Start the dashboard to monitor pipelines, agents, and execution history:
dataforge web
# Open http://localhost:8080
| Endpoint | Method | Description |
|---|---|---|
/api/agents |
GET | List all agents with capabilities |
/api/pipelines |
GET | List all tracked pipelines |
/api/pipelines/{id} |
GET | Get pipeline status |
/api/execute |
POST | Execute a task |
/api/pipeline/parallel |
POST | Run parallel pipeline |
/api/pipeline/custom |
POST | Run custom sequential pipeline |
/api/pipeline/mixed |
POST | Run mixed (parallel + sequential) pipeline |
Configuration
# config.yaml
version: "1.0"
project: "my-data-platform"
agents:
pipeline:
command: "python -m d4.agents.pipeline.server"
transport: stdio
capabilities: ["sql", "spark"]
dq:
command: "python -m d4.agents.dq.server"
transport: stdio
capabilities: ["data_quality", "profiling", "validation"]
schema:
command: "python -m d4.agents.schema.server"
transport: stdio
capabilities: ["schema", "drift", "migration", "lineage"]
catalog:
command: "python -m d4.agents.catalog.server"
transport: stdio
capabilities: ["catalog", "discovery", "documentation", "tagging"]
observability:
command: "python -m d4.agents.observability.server"
transport: stdio
capabilities: ["observability", "monitoring", "alerts", "cost"]
orchestration:
command: "python -m d4.agents.orchestration.server"
transport: stdio
capabilities: ["orchestration", "dag", "scheduling", "backfill"]
Deploy to Production
See the full Deployment Guide for Docker Compose, Kubernetes, and SSE mode setup.
---
```bash
# Clone and install
git clone git@github.com:Prometheus-agent/mcp-dataforge.git
cd mcp-dataforge
pip install -e ".[dev]"
# Run tests (153+ tests)
python3 -m pytest
# Run specific test file
python3 -m pytest tests/test_orchestrator.py -v
# Run the MCP server locally
dataforge mcp-server
# Run the web dashboard
dataforge web
Project Structure
src/d4/
├── agents/
│ ├── pipeline/ # SQL pipeline generation
│ ├── dq/ # Data profiling & validation
│ ├── schema/ # Drift detection & migration
│ ├── catalog/ # Data discovery & docs
│ ├── observability/ # Health & cost monitoring
│ └── orchestration/ # DAG management & scheduling
├── config/ # YAML config loader
├── registry/ # Agent registry & discovery
├── orchestrator/ # Core orchestrator + MCP server
├── web/ # FastAPI web dashboard
├── cli/ # Click CLI
└── models/ # Pydantic data models
tests/ # 153+ tests across all modules
Building a Plugin
DataForge supports third-party agent plugins:
cp -r templates/d4-plugin d4-plugin-my-agent
cd d4-plugin-my-agent
# Rename <name> to your agent name
pip install -e .
Register in config.yaml:
agents:
my_agent:
command: "python -m d4_plugin_my_agent.server"
transport: stdio
capabilities: ["my_capability"]
See docs/guides/creating-a-plugin.md for full documentation.
Roadmap
Phase 1 — Core Foundation ✅
- [x] 6 specialist agents with 22+ tools
- [x] Orchestrator MCP server (stdio + SSE)
- [x] CLI with init, run, agent, mcp commands
- [x] Sequential, parallel, mixed pipeline execution
- [x] FastAPI web dashboard
- [x] 153+ tests, 100% passing
Phase 2 — Agent Expansion 🚧
- [ ] Data Quality agent with DuckDB profiling
- [ ] Schema agent with migration generation
- [ ] Catalog agent with impact analysis
Phase 3 — Ecosystem 🌐
- [ ] Docker deployment
- [ ] Plugin API documentation
- [ ] Third-party plugin support
License
Apache 2.0. See LICENSE.
Recommended Servers
playwright-mcp
A Model Context Protocol server that enables LLMs to interact with web pages through structured accessibility snapshots without requiring vision models or screenshots.
Magic Component Platform (MCP)
An AI-powered tool that generates modern UI components from natural language descriptions, integrating with popular IDEs to streamline UI development workflow.
Audiense Insights MCP Server
Enables interaction with Audiense Insights accounts via the Model Context Protocol, facilitating the extraction and analysis of marketing insights and audience data including demographics, behavior, and influencer engagement.
VeyraX MCP
Single MCP tool to connect all your favorite tools: Gmail, Calendar and 40 more.
graphlit-mcp-server
The Model Context Protocol (MCP) Server enables integration between MCP clients and the Graphlit service. Ingest anything from Slack to Gmail to podcast feeds, in addition to web crawling, into a Graphlit project - and then retrieve relevant contents from the MCP client.
Kagi MCP Server
An MCP server that integrates Kagi search capabilities with Claude AI, enabling Claude to perform real-time web searches when answering questions that require up-to-date information.
E2B
Using MCP to run code via e2b.
Neon Database
MCP server for interacting with Neon Management API and databases
Exa Search
A Model Context Protocol (MCP) server lets AI assistants like Claude use the Exa AI Search API for web searches. This setup allows AI models to get real-time web information in a safe and controlled way.
Qdrant Server
This repository is an example of how to create a MCP server for Qdrant, a vector search engine.