# Spark MCP (Model Context Protocol) Optimizer
This project implements a Model Context Protocol (MCP) server and client for optimizing Apache Spark code. The system provides intelligent code optimization suggestions and performance analysis through a client-server architecture.
## How It Works

### Code Optimization Workflow

```mermaid
graph TB
    subgraph Input
        A[Input PySpark Code] -->|spark_code_input.py| B[run_client.py]
    end

    subgraph MCP Client
        B -->|Async HTTP| C[SparkMCPClient]
        C -->|Protocol Handler| D[Tools Interface]
    end

    subgraph MCP Server
        E[run_server.py] --> F[SparkMCPServer]
        F -->|Tool Registry| G[optimize_spark_code]
        F -->|Tool Registry| H[analyze_performance]
        F -->|Protocol Handler| I[Claude AI Integration]
    end

    subgraph Resources
        I -->|Code Analysis| J[Claude AI Model]
        J -->|Optimization| K[Optimized Code Generation]
        K -->|Validation| L[PySpark Runtime]
    end

    subgraph Output
        M[optimized_spark_code.py]
        N[performance_analysis.md]
    end

    D -->|MCP Request| F
    G -->|Generate| M
    H -->|Generate| N

    classDef client fill:#e1f5fe,stroke:#01579b
    classDef server fill:#f3e5f5,stroke:#4a148c
    classDef resource fill:#e8f5e9,stroke:#1b5e20
    classDef output fill:#fff3e0,stroke:#e65100
    class A,B,C,D client
    class E,F,G,H,I server
    class J,K,L resource
    class M,N output
```
### Component Details

- **Input Layer**
  - `spark_code_input.py`: Source PySpark code for optimization
  - `run_client.py`: Client startup and configuration
- **MCP Client Layer**
  - Tools Interface: Protocol-compliant tool invocation
- **MCP Server Layer**
  - `run_server.py`: Server initialization
  - Tool Registry: Optimization and analysis tools
  - Protocol Handler: MCP request/response management
- **Resource Layer**
  - Claude AI: Code analysis and optimization
  - PySpark Runtime: Code execution and validation
- **Output Layer**
  - `optimized_spark_code.py`: Optimized code
  - `performance_analysis.md`: Detailed analysis
This workflow illustrates:
- Input PySpark code submission
- MCP protocol handling and routing
- Claude AI analysis and optimization
- Code transformation and validation
- Performance analysis and reporting
## Architecture
This project follows the Model Context Protocol architecture for standardized AI model interactions:
```text
┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
│                  │      │   MCP Server     │      │    Resources     │
│    MCP Client    │      │ (SparkMCPServer) │      │                  │
│ (SparkMCPClient) │      │                  │      │ ┌──────────────┐ │
│                  │      │  ┌─────────┐     │      │ │  Claude AI   │ │
│  ┌─────────┐     │      │  │ Tools   │     │ <──> │ │    Model     │ │
│  │ Tools   │     │      │  │Registry │     │      │ └──────────────┘ │
│  │Interface│     │ <──> │  └─────────┘     │      │                  │
│  └─────────┘     │      │  ┌─────────┐     │      │ ┌──────────────┐ │
│                  │      │  │Protocol │     │      │ │   PySpark    │ │
│                  │      │  │Handler  │     │      │ │   Runtime    │ │
│                  │      │  └─────────┘     │      │ └──────────────┘ │
└──────────────────┘      └──────────────────┘      └──────────────────┘
        │                         │                         │
        │                         │                         │
        v                         v                         v
┌──────────────┐          ┌──────────────┐          ┌──────────────┐
│  Available   │          │  Registered  │          │   External   │
│    Tools     │          │    Tools     │          │  Resources   │
├──────────────┤          ├──────────────┤          ├──────────────┤
│optimize_code │          │optimize_code │          │  Claude API  │
│analyze_perf  │          │analyze_perf  │          │ Spark Engine │
└──────────────┘          └──────────────┘          └──────────────┘
```
### Components

- **MCP Client**
  - Provides the tool interface for code optimization
  - Handles async communication with the server
  - Manages file I/O for code generation
- **MCP Server**
  - Implements MCP protocol handlers
  - Manages tool registry and execution
  - Coordinates between client and resources
- **Resources**
  - Claude AI: Provides code optimization intelligence
  - PySpark Runtime: Executes and validates optimizations
### Protocol Flow

1. Client sends an optimization request via the MCP protocol
2. Server validates the request and invokes the appropriate tool
3. Tool uses Claude AI for optimization
4. Optimized code is returned in the MCP response
5. Client saves and validates the optimized code
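The exchange in steps 1 and 4 can be sketched as JSON-RPC-style messages. The payload shapes below are illustrative, not the project's actual wire format:

```python
import json

# Hypothetical request the client sends for step 1
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "optimize_spark_code",
        "arguments": {"code": "df.groupBy('dept').count()"},
    },
}

# Serialize the request as it would travel over the transport
wire = json.dumps(request)

# Hypothetical response carrying the optimized code (step 4)
response = json.loads(
    '{"jsonrpc": "2.0", "id": 1, "result": {"optimized_code": "..."}}'
)

# Responses are matched to requests by id
assert response["id"] == request["id"]
print(response["result"]["optimized_code"])
```

The `id` field is what lets the async client correlate a response with the request it issued, which is why steps 1 and 4 can overlap for multiple in-flight requests.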
## End-to-End Functionality

```mermaid
sequenceDiagram
    participant U as User
    participant C as MCP Client
    participant S as MCP Server
    participant AI as Claude AI
    participant P as PySpark Runtime
    U->>C: Submit Spark Code
    C->>S: Send Optimization Request
    S->>AI: Analyze Code
    AI-->>S: Optimization Suggestions
    S->>C: Return Optimized Code
    C->>P: Run Original Code
    C->>P: Run Optimized Code
    P-->>C: Execution Results
    C->>C: Generate Analysis
    C-->>U: Final Report
```
1. **Code Submission**
   - User places PySpark code in `v1/input/spark_code_input.py`
   - Code is read by the MCP client
2. **Optimization Process**
   - MCP client connects to the server via the standardized protocol
   - Server forwards the code to Claude AI for analysis
   - AI suggests optimizations based on best practices
   - Server validates and processes the suggestions
3. **Code Generation**
   - Optimized code is saved to `v1/output/optimized_spark_code.py`
   - Includes detailed comments explaining the optimizations
   - Maintains the original code structure while improving performance
4. **Performance Analysis**
   - Both versions are executed in the PySpark runtime
   - Execution times are compared
   - Results are validated for correctness
   - Metrics are collected and analyzed
5. **Results Generation**
   - Comprehensive analysis is written to `v1/output/performance_analysis.md`
   - Side-by-side execution comparison
   - Performance improvement statistics
   - Optimization explanations and rationale
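The timing comparison in step 4 boils down to running both versions and computing the relative improvement. A minimal sketch, with stand-in functions in place of real Spark jobs:

```python
import time

def run_and_time(fn) -> float:
    """Run one code version and return its wall-clock duration in seconds."""
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start

def improvement_pct(original_s: float, optimized_s: float) -> float:
    """Percentage reduction in runtime achieved by the optimized version."""
    return (original_s - optimized_s) / original_s * 100

# Stand-ins for the original and optimized Spark jobs
original = lambda: sum(i * i for i in range(200_000))
optimized = lambda: sum(i * i for i in range(20_000))

t_orig = run_and_time(original)
t_opt = run_and_time(optimized)
print(f"Improvement: {improvement_pct(t_orig, t_opt):.1f}%")
```

Applied to the sample run reported later in this README (5.18 s original vs. 0.65 s optimized), this formula gives roughly an 87% improvement.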
## Usage

### Requirements

- Python 3.8+
- PySpark 3.2.0+
- Anthropic API key (for Claude AI)

### Installation

```bash
pip install -r requirements.txt
```

### Quick Start

1. Add the Spark code you want to optimize to `input/spark_code_input.py`.

2. Start the MCP server:

   ```bash
   python v1/run_server.py
   ```

3. Run the client to optimize your code:

   ```bash
   python v1/run_client.py
   ```

   This generates two files:
   - `output/optimized_spark_example.py`: the optimized Spark code with detailed optimization comments
   - `output/performance_analysis.md`: a comprehensive performance analysis

4. Run and compare the two code versions:

   ```bash
   python v1/run_optimized.py
   ```

   This will:
   - Execute both the original and optimized code
   - Compare execution times and results
   - Update the performance analysis with execution metrics
   - Show detailed performance-improvement statistics
## Project Structure

```text
ai-mcp/
├── input/
│   └── spark_code_input.py        # Original Spark code to optimize
├── output/
│   ├── optimized_spark_example.py # Generated optimized code
│   └── performance_analysis.md    # Detailed performance comparison
├── spark_mcp/
│   ├── client.py                  # MCP client implementation
│   └── server.py                  # MCP server implementation
├── run_client.py                  # Client script to optimize code
├── run_server.py                  # Server startup script
└── run_optimized.py               # Script to run and compare code versions
```
## Why MCP?

The Model Context Protocol (MCP) provides several key advantages for Spark code optimization:

### Direct Claude AI Call vs. MCP Server

| Aspect | Direct Claude AI Call | MCP Server |
|---|---|---|
| Integration | • Custom integration per team<br>• Manual response handling<br>• Duplicate implementations | • Pre-built client libraries<br>• Automated workflows<br>• Unified interfaces |
| Infrastructure | • No built-in validation<br>• No result persistence<br>• Manual tracking | • Automatic validation<br>• Result persistence<br>• Version control |
| Context | • Basic code suggestions<br>• No execution context<br>• Limited optimization scope | • Context-aware optimization<br>• Full execution history<br>• Comprehensive improvements |
| Validation | • Manual testing required<br>• No performance metrics<br>• Uncertain outcomes | • Automated testing<br>• Performance metrics<br>• Validated results |
| Workflow | • Ad-hoc process<br>• No standardization<br>• Manual intervention needed | • Structured process<br>• Standard protocols<br>• Automated pipeline |
**Key Differences:**

### 1. AI Integration

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `client = anthropic.Client(api_key)`<br>`response = client.messages.create(...)` | • Complex setup<br>• Custom error handling<br>• Tight coupling |
| MCP | `client = SparkMCPClient()`<br>`result = await client.optimize_spark_code(code)` | • Simple interface<br>• Built-in validation<br>• Loose coupling |
### 2. Tool Management

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `class SparkOptimizer:`<br>`  def register_tool(self, name, func):`<br>`    self.tools[name] = func` | • Manual registration<br>• No validation<br>• Complex maintenance |
| MCP | `@register_tool("optimize_spark_code")`<br>`async def optimize_spark_code(code: str):` | • Auto-discovery<br>• Type checking<br>• Easy extension |
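A decorator-based registry like the one in the MCP row above can be sketched in a few lines. This is an illustrative reimplementation, not the project's actual `register_tool`:

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Dict

# Global registry mapping tool names to their async handlers
TOOLS: Dict[str, Callable[..., Awaitable[str]]] = {}

def register_tool(name: str):
    """Register an async function as a named tool, rejecting non-coroutines."""
    def decorator(func):
        if not inspect.iscoroutinefunction(func):
            raise TypeError(f"tool {name!r} must be an async function")
        TOOLS[name] = func
        return func
    return decorator

@register_tool("optimize_spark_code")
async def optimize_spark_code(code: str) -> str:
    # Placeholder transformation; the real handler would call Claude AI
    return code.replace(".collect()", ".cache()")

async def call_tool(name: str, **arguments) -> str:
    """Look up a tool by name and invoke it, mirroring a tools/call request."""
    return await TOOLS[name](**arguments)

print(asyncio.run(call_tool("optimize_spark_code", code="df.collect()")))
```

Because registration happens at import time, the server can enumerate `TOOLS` to answer tool-discovery requests without any manual bookkeeping.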
### 3. Resource Management

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `def __init__(self):`<br>`  self.claude = init_claude()`<br>`  self.spark = init_spark()` | • Manual orchestration<br>• Manual cleanup<br>• Error-prone |
| MCP | `@requires_resources(["claude_ai", "spark"])`<br>`async def optimize_spark_code(code: str):` | • Auto-coordination<br>• Lifecycle management<br>• Error handling |
### 4. Communication Protocol

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `{"type": "request",`<br>` "payload": {"code": code}}` | • Custom format<br>• Manual validation<br>• Custom debugging |
| MCP | `{"method": "tools/call",`<br>` "params": {"name": "optimize_code"}}` | • Standard format<br>• Auto-validation<br>• Easy debugging |
## Features
- Intelligent Code Optimization: Leverages Claude AI to analyze and optimize PySpark code
- Performance Analysis: Provides detailed analysis of performance differences between original and optimized code
- MCP Architecture: Implements the Model Context Protocol for standardized AI model interactions
- Easy Integration: Simple client interface for code optimization requests
- Code Generation: Automatically saves optimized code to separate files
## Advanced Usage

You can also use the client programmatically:

```python
from spark_mcp.client import SparkMCPClient

async def main():
    # Connect to the MCP server
    client = SparkMCPClient()
    await client.connect()

    # Your Spark code to optimize
    spark_code = '''
    # Your PySpark code here
    '''

    # Get optimized code with performance analysis
    optimized_code = await client.optimize_spark_code(
        code=spark_code,
        optimization_level="advanced",
        save_to_file=True  # Save to output/optimized_spark_example.py
    )

    # Analyze performance differences
    analysis = await client.analyze_performance(
        original_code=spark_code,
        optimized_code=optimized_code,
        save_to_file=True  # Save to output/performance_analysis.md
    )

    # Run both versions and compare, e.g. with the run_optimized.py script
    # or your own comparison logic

    await client.close()
```
## Example Input and Output

The repository includes an example workflow:

1. Input code (`input/spark_code_input.py`):

   ```python
   # Create DataFrames and join
   emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"])
   dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"])

   # Join and analyze
   result = emp_df.join(dept_df, "dept") \
       .groupBy("dept", "location") \
       .agg({"salary": "avg", "age": "avg", "id": "count"}) \
       .orderBy("dept")
   ```

2. Optimized code (`output/optimized_spark_example.py`):

   ```python
   # Performance-optimized version with caching and improved configurations
   spark = SparkSession.builder \
       .appName("EmployeeAnalysis") \
       .config("spark.sql.shuffle.partitions", 200) \
       .getOrCreate()

   # Create and cache DataFrames
   emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"]).cache()
   dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"]).cache()

   # Optimized join and analysis
   result = emp_df.join(dept_df, "dept") \
       .groupBy("dept", "location") \
       .agg(
           avg("salary").alias("avg_salary"),
           avg("age").alias("avg_age"),
           count("id").alias("employee_count")
       ) \
       .orderBy("dept")
   ```

3. Performance analysis (`output/performance_analysis.md`):

   ```markdown
   ## Execution Results Comparison

   ### Timing Comparison
   - Original Code: 5.18 seconds
   - Optimized Code: 0.65 seconds
   - Performance Improvement: 87.4%

   ### Optimization Details
   - Caching frequently used DataFrames
   - Optimized shuffle partitions
   - Improved column expressions
   - Better memory management
   ```
## Package Layout

```text
ai-mcp/
├── spark_mcp/
│   ├── __init__.py
│   ├── client.py                  # MCP client implementation
│   └── server.py                  # MCP server implementation
├── examples/
│   ├── optimize_code.py           # Example usage
│   └── optimized_spark_example.py # Generated optimized code
├── requirements.txt
└── run_server.py                  # Server startup script
## Available Tools

- `optimize_spark_code`
  - Optimizes PySpark code for better performance
  - Supports basic and advanced optimization levels
  - Automatically saves optimized code to `examples/optimized_spark_example.py`
- `analyze_performance`
  - Analyzes performance differences between the original and optimized code
  - Provides insights on:
    - Performance improvements
    - Resource utilization
    - Scalability considerations
    - Potential trade-offs
## Environment Variables

- `ANTHROPIC_API_KEY`: your Anthropic API key for Claude AI
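The key can be exported in the shell before starting the server; the value below is a placeholder, not a real key:

```shell
# Placeholder value — substitute your real Anthropic API key
export ANTHROPIC_API_KEY="sk-ant-your-key-here"

# Confirm it is visible to child processes such as run_server.py
python -c 'import os; print("key set:", bool(os.environ.get("ANTHROPIC_API_KEY")))'
```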
## Example Optimizations
The system implements various PySpark optimizations including:
- Broadcast joins for small-large table joins
- Efficient window function usage
- Strategic data caching
- Query plan optimizations
- Performance-oriented operation ordering
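To illustrate the first item: a broadcast join avoids shuffling the large table by shipping the small one to every executor and joining against an in-memory hash map. The same idea in plain Python (no Spark runtime required; the data is made up):

```python
# Small dimension table: "broadcast" as an in-memory hash map (a dict)
departments = {"eng": "NY", "sales": "SF"}

# Large fact table: streamed row by row, never shuffled
employees = [(1, "ana", "eng"), (2, "bo", "sales"), (3, "cy", "eng")]

# Hash-join each employee row against the broadcast map
joined = [
    (emp_id, name, dept, departments[dept])
    for emp_id, name, dept in employees
    if dept in departments
]
print(joined)

# The equivalent PySpark hint looks like:
#   from pyspark.sql.functions import broadcast
#   result = emp_df.join(broadcast(dept_df), "dept")
```

In Spark this is exactly what a broadcast hash join does per executor, which is why it pays off whenever one side of the join comfortably fits in memory.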
## Contributing

Feel free to submit issues and enhancement requests!

## License

MIT License