Parquet MCP Server

Enables querying, modifying, and managing Parquet files with CRUD operations, semantic search, audit logging, and rollback capabilities for structured data storage.

MCP server for interacting with parquet files in a repository. Provides comprehensive data management with audit logging, rollback capabilities, and semantic search.

Credits

This is a custom MCP server implementation for parquet file management with audit trail support.

Features

  • Read/Query: Query parquet files with filters, column selection, and limits
  • Add Records: Add new records to parquet files with audit trail
  • Update Records: Update existing records matching filters with audit trail
  • Upsert Records: Insert or update records (supports enhanced filters for duplicate detection)
  • Delete Records: Delete records matching filters with audit trail
  • Audit Log: Complete change history with old/new values for all modifications
  • Rollback: Undo specific operations using audit IDs
  • Schema Discovery: Get schema definitions for data types
  • Statistics: Get basic statistics about parquet files
  • Efficient Backups: Audit log entries (~1 KB) instead of full snapshots (99%+ storage reduction)
  • Optional Full Snapshots: Configurable periodic snapshots for additional safety

Installation

cd mcp-servers/parquet
pip install -r requirements.txt

Configuration

Cursor Configuration

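Add to your Cursor MCP settings (typically ~/.cursor/mcp.json or Cursor settings). In the configuration examples in this section, $REPO_ROOT stands for the repository root; substitute the absolute path if your client does not expand environment variables in args.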

Development (Audit Log Only):

{
  "mcpServers": {
    "parquet": {
      "command": "python",
      "args": [
        "$REPO_ROOT/mcp-servers/parquet/parquet_mcp_server.py"
      ],
      "env": {}
    }
  }
}

Production (With Periodic Snapshots):

{
  "mcpServers": {
    "parquet": {
      "command": "python",
      "args": [
        "$REPO_ROOT/mcp-servers/parquet/parquet_mcp_server.py"
      ],
      "env": {
        "MCP_FULL_SNAPSHOTS": "true",
        "MCP_SNAPSHOT_FREQUENCY": "weekly"
      }
    }
  }
}

Claude Desktop Configuration

Add to claude_desktop_config.json (typically ~/Library/Application Support/Claude/claude_desktop_config.json on macOS):

{
  "mcpServers": {
    "parquet": {
      "command": "python",
      "args": [
        "$REPO_ROOT/mcp-servers/parquet/parquet_mcp_server.py"
      ]
    }
  }
}

Available Tools

list_data_types

List all available data types (parquet files) in the data directory.

get_schema

Get the schema definition for a data type.

Parameters:

  • data_type (required): The data type name (e.g., 'flows', 'transactions', 'tasks')

read_parquet

Read and query a parquet file with optional filters. Supports enhanced filtering operators.

Parameters:

  • data_type (required): The data type name
  • filters (optional): Key-value pairs to filter records. Supports enhanced operators:
    • Simple value: exact match
    • List: in list (["value1", "value2"])
    • {"$contains": "text"}: substring match (case-insensitive)
    • {"$starts_with": "text"}: prefix match (case-insensitive)
    • {"$ends_with": "text"}: suffix match (case-insensitive)
    • {"$regex": "pattern"}: regex pattern match
    • {"$fuzzy": {"text": "query", "threshold": 0.7}}: fuzzy string matching (0-1 similarity)
    • {"$gt": 100}, {"$gte": 100}, {"$lt": 100}, {"$lte": 100}: numeric comparisons
    • {"$ne": "value"}: not equal
  • limit (optional): Maximum number of rows to return (default: 1000)
  • columns (optional): List of column names to return (default: all columns)

Examples:

{
  "data_type": "flows",
  "filters": {
    "category": "property_maintenance",
    "year": 2025
  },
  "limit": 100
}

{
  "data_type": "tasks",
  "filters": {
    "title": {"$contains": "therapy"},
    "status": {"$ne": "completed"}
  }
}

{
  "data_type": "tasks",
  "filters": {
    "title": {"$fuzzy": {"text": "therapy session", "threshold": 0.7}}
  }
}
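
The operators listed above can be read as per-field predicates. The following is a minimal sketch of that reading over plain dictionaries, not the server's implementation. It assumes that multiple filters are combined with AND and that $fuzzy uses a difflib similarity ratio; the helper names matches and _matches_operator are hypothetical.

```python
import re
from difflib import SequenceMatcher


def _matches_operator(value, op, arg):
    """Evaluate one operator against a single field value."""
    if op == "$ne":
        return value != arg
    if value is None:
        return False  # a missing value satisfies none of the remaining operators
    if op == "$contains":
        return str(arg).lower() in str(value).lower()
    if op == "$starts_with":
        return str(value).lower().startswith(str(arg).lower())
    if op == "$ends_with":
        return str(value).lower().endswith(str(arg).lower())
    if op == "$regex":
        return re.search(arg, str(value)) is not None
    if op == "$fuzzy":
        # Assumption: similarity is a difflib ratio; the server may use another metric.
        ratio = SequenceMatcher(None, str(value).lower(), arg["text"].lower()).ratio()
        return ratio >= arg["threshold"]
    if op == "$gt":
        return value > arg
    if op == "$gte":
        return value >= arg
    if op == "$lt":
        return value < arg
    if op == "$lte":
        return value <= arg
    raise ValueError(f"Unsupported operator: {op}")


def matches(record, filters):
    """Return True when the record satisfies every filter (assumed AND semantics)."""
    for column, condition in filters.items():
        value = record.get(column)
        if isinstance(condition, dict):
            if not all(_matches_operator(value, op, arg) for op, arg in condition.items()):
                return False
        elif isinstance(condition, list):
            if value not in condition:  # list means "in list"
                return False
        elif value != condition:  # simple value means exact match
            return False
    return True
```

For example, matches(task, {"title": {"$contains": "therapy"}, "status": {"$ne": "completed"}}) mirrors the second request above when applied to each row.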

add_record

Add a new record to a parquet file. Creates an audit log entry and, if configured, a snapshot.

Parameters:

  • data_type (required): The data type name
  • record (required): The record data as a JSON object matching the schema

Example:

{
  "data_type": "flows",
  "record": {
    "flow_name": "Monthly Rent",
    "flow_date": "2025-01-15",
    "amount_usd": 1500.00,
    "category": "housing",
    "flow_type": "recurring_expense"
  }
}

update_records

Update existing records in a parquet file. Creates an audit log entry and, if configured, a snapshot.

Parameters:

  • data_type (required): The data type name
  • filters (required): Filters to identify records to update
  • updates (required): Fields to update

Example:

{
  "data_type": "tasks",
  "filters": {
    "task_id": "abc123"
  },
  "updates": {
    "status": "completed",
    "completed_date": "2025-01-15"
  }
}

upsert_record

Insert or update a record (upsert). Checks for existing records using enhanced filters (supports all read_parquet filter operators including $contains, $fuzzy, etc.). If found, updates matching records. If not found, creates a new record. Returns whether it created or updated. Useful for preventing duplicates when adding contacts, tasks, or other records.

Parameters:

  • data_type (required): The data type name
  • filters (required): Enhanced filters to identify existing records (supports all read_parquet filter operators)
  • record (required): The record data to insert or update

Returns:

  • action: "created" or "updated"
  • audit_id or audit_ids: Audit log entry ID(s)
  • record_id: The ID of the created/updated record

Example (exact match):

{
  "data_type": "contacts",
  "filters": {
    "email": "galina@secod.com"
  },
  "record": {
    "name": "Galina Semakova",
    "email": "galina@secod.com",
    "category": "legal",
    "last_contact_date": "2025-12-24"
  }
}

Example (fuzzy match):

{
  "data_type": "contacts",
  "filters": {
    "name": {"$fuzzy": {"text": "Galina Semakova", "threshold": 0.8}}
  },
  "record": {
    "name": "Galina Semakova",
    "email": "galina@secod.com",
    "category": "legal",
    "last_contact_date": "2025-12-24"
  }
}

Example (contains match):

{
  "data_type": "tasks",
  "filters": {
    "title": {"$contains": "therapy payment"}
  },
  "record": {
    "title": "Pay for therapy session",
    "status": "pending",
    "due_date": "2025-12-25"
  }
}
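
The insert-or-update decision described for upsert_record can be sketched as follows. This is an illustration over an in-memory list, not the server's code: the function name upsert and the is_match predicate (standing in for the server's filter evaluation) are hypothetical, and audit logging is omitted.

```python
def upsert(records, is_match, record):
    """Update every existing record that matches, otherwise append `record`.

    `records` is a list of dicts standing in for the parquet rows.
    `is_match` is a hypothetical predicate standing in for filter evaluation.
    """
    matched = [existing for existing in records if is_match(existing)]
    if matched:
        for existing in matched:
            existing.update(record)  # overwrite only the supplied fields
        return {"action": "updated"}
    records.append(dict(record))  # no match: insert a copy as a new row
    return {"action": "created"}


contacts = [{"name": "Galina Semakova", "email": "galina@secod.com"}]
result = upsert(
    contacts,
    lambda c: c.get("email") == "galina@secod.com",
    {"email": "galina@secod.com", "last_contact_date": "2025-12-24"},
)
```

Because every matching record is updated, a broad filter such as $contains or a low $fuzzy threshold can touch more rows than intended; an exact match on a unique field is the safer choice for duplicate detection.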

delete_records

Delete records from a parquet file. Creates an audit log entry and, if configured, a snapshot.

Parameters:

  • data_type (required): The data type name
  • filters (required): Filters to identify records to delete

Example:

{
  "data_type": "tasks",
  "filters": {
    "status": "canceled"
  }
}

get_statistics

Get basic statistics about a parquet file.

Parameters:

  • data_type (required): The data type name

read_audit_log

Read audit log entries with optional filters. View complete history of all data modifications.

Parameters:

  • data_type (optional): Filter by data type
  • operation (optional): Filter by operation (add, update, delete)
  • record_id (optional): Filter by specific record ID
  • limit (optional): Maximum number of entries to return (default: 100)

Example:

{
  "data_type": "transactions",
  "operation": "update",
  "limit": 50
}

rollback_operation

Roll back a specific operation using its audit ID. Creates an inverse operation to undo the changes.

Parameters:

  • audit_id (required): The audit ID of the operation to roll back

Rollback Logic:

  • add operation → Delete the record
  • update operation → Restore old values
  • delete operation → Restore the record

Example:

{
  "audit_id": "abc123def456"
}
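
The three inverse operations can be illustrated with a small sketch. The audit entry keys used here (operation, record_id, old_values) are assumptions chosen for the example, as is the in-memory dictionary standing in for the parquet file; the actual audit schema may differ.

```python
def rollback(records, entry):
    """Apply the inverse of one audit entry to `records` (a dict keyed by record ID)."""
    operation = entry["operation"]
    record_id = entry["record_id"]
    if operation == "add":
        records.pop(record_id, None)  # undo an add by deleting the record
    elif operation == "update":
        records[record_id].update(entry["old_values"])  # restore the previous field values
    elif operation == "delete":
        records[record_id] = dict(entry["old_values"])  # restore the deleted record
    else:
        raise ValueError(f"Cannot roll back operation: {operation}")
    return records
```

Only the fields recorded in the audit entry are restored on an update, so fields changed by later operations are left untouched.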

search_parquet

Semantic search using embeddings. Searches text fields for semantically similar records.

Parameters:

  • data_type (required): The data type name
  • query (required): Search query text
  • text_fields (optional): List of text fields to search (default: auto-detect)
  • limit (optional): Maximum number of results (default: 10)
  • min_similarity (optional): Minimum cosine similarity threshold 0-1 (default: 0.7)
  • additional_filters (optional): Additional filters to apply (same format as read_parquet)

Prerequisites:

  • Must run generate_embeddings first to create embeddings for the data type
  • Requires OPENAI_API_KEY environment variable

Example:

{
  "data_type": "tasks",
  "query": "pay for therapy session",
  "limit": 5,
  "min_similarity": 0.7
}
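
To make limit and min_similarity concrete, here is a sketch of cosine-similarity ranking over precomputed vectors. It assumes embeddings are available as a mapping from record ID to a list of floats; the function names and that layout are illustrative, not the server's storage format.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 for zero vectors)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def rank(query_vector, embeddings, limit=10, min_similarity=0.7):
    """Return up to `limit` (record_id, score) pairs scoring at least `min_similarity`."""
    scored = [
        (record_id, cosine_similarity(query_vector, vector))
        for record_id, vector in embeddings.items()
    ]
    kept = [pair for pair in scored if pair[1] >= min_similarity]
    kept.sort(key=lambda pair: pair[1], reverse=True)  # best matches first
    return kept[:limit]
```

Raising min_similarity trades recall for precision; lowering it returns more, but weaker, matches.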

generate_embeddings

Generate and store embeddings for text fields in a data type. Creates an embeddings parquet file used for semantic search.

Parameters:

  • data_type (required): The data type name
  • text_fields (optional): List of text fields to generate embeddings for (default: auto-detect)
  • force_regenerate (optional): Force regeneration of all embeddings (default: false)

Prerequisites:

  • Requires OPENAI_API_KEY environment variable

Example:

{
  "data_type": "tasks",
  "text_fields": ["title", "description", "notes"]
}

Note: Embeddings are cached. Only missing embeddings are generated unless force_regenerate is true.
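
The caching rule in the note can be expressed as a small selection step. This sketch assumes embeddings are cached per record ID, which the source does not state; the function name is hypothetical.

```python
def ids_needing_embeddings(record_ids, cached_ids, force_regenerate=False):
    """Return the record IDs that still need an embedding, in their original order."""
    if force_regenerate:
        return list(record_ids)  # regenerate everything
    cached = set(cached_ids)
    return [record_id for record_id in record_ids if record_id not in cached]
```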

Backup & Recovery

Audit Log (Default)

All write operations create lightweight audit log entries in data/logs/audit_log.parquet:

  • Storage: ~1 KB per operation (99%+ reduction vs full snapshots)
  • Content: Operation type, record ID, affected fields, old/new values, timestamp
  • Recovery: Roll back specific operations using the rollback_operation tool

Optional Full Snapshots

Configure periodic full snapshots for additional safety:

Environment Variables:

  • MCP_FULL_SNAPSHOTS: Set to "true" to enable periodic snapshots (default: false)
  • MCP_SNAPSHOT_FREQUENCY: "daily", "weekly", "monthly", "never" (default: weekly)

Snapshot Location:

data/snapshots/[data_type]-[YYYY-MM-DD-HHMMSS].parquet
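
As an illustration of how the two environment variables and the snapshot naming pattern fit together, consider the sketch below. The scheduling rule is an assumption (in particular, "monthly" is approximated as 30 days, and a missing previous snapshot is treated as due), and both function names are hypothetical.

```python
import os
from datetime import datetime, timedelta

# Assumed intervals; the server may define "monthly" differently.
INTERVALS = {
    "daily": timedelta(days=1),
    "weekly": timedelta(weeks=1),
    "monthly": timedelta(days=30),
}


def snapshot_due(last_snapshot, now, env=os.environ):
    """Decide whether a full snapshot should be taken at `now`."""
    if env.get("MCP_FULL_SNAPSHOTS", "false") != "true":
        return False  # snapshots are disabled by default
    frequency = env.get("MCP_SNAPSHOT_FREQUENCY", "weekly")
    if frequency == "never":
        return False
    if last_snapshot is None:
        return True  # assumption: take one if none exists yet
    return now - last_snapshot >= INTERVALS[frequency]


def snapshot_path(data_type, now):
    """Build a path following data/snapshots/[data_type]-[YYYY-MM-DD-HHMMSS].parquet."""
    return f"data/snapshots/{data_type}-{now.strftime('%Y-%m-%d-%H%M%S')}.parquet"
```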

Storage Comparison

Approach               Storage per Operation   100 Operations
Full snapshots (old)   10 MB                   1 GB
Audit log (new)        ~1 KB                   ~100 KB
Savings                99.99%                  99.99%

Recovery Options

  1. Recent Changes: Use rollback_operation with audit ID
  2. Multiple Changes: Roll back operations in reverse chronological order
  3. Full Restore: Restore from periodic snapshot (if enabled)
  4. Point-in-Time: Restore snapshot + replay audit log to specific timestamp
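
Option 4 (point-in-time recovery) amounts to starting from a snapshot and re-applying logged changes up to a cutoff. The sketch below shows the idea on in-memory data. The entry keys (timestamp, operation, record_id, new_values) are assumed names, timestamps are assumed to be ISO strings in one consistent format so that they sort correctly as text, and audit_entries is assumed to contain only changes made after the snapshot was taken.

```python
def replay(snapshot, audit_entries, until):
    """Rebuild the state at `until` by applying audit entries to a copy of `snapshot`."""
    state = {record_id: dict(record) for record_id, record in snapshot.items()}
    for entry in sorted(audit_entries, key=lambda e: e["timestamp"]):
        if entry["timestamp"] > until:
            break  # everything after the cutoff is ignored
        record_id = entry["record_id"]
        if entry["operation"] == "add":
            state[record_id] = dict(entry["new_values"])
        elif entry["operation"] == "update":
            state[record_id].update(entry["new_values"])
        elif entry["operation"] == "delete":
            state.pop(record_id, None)
    return state
```

The snapshot is copied rather than modified, so the same snapshot can be replayed to several different cutoffs.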

See AUDIT_LOG_GUIDE.md for detailed documentation.

Data Types

The server automatically discovers data types by scanning data/ for directories containing [type].parquet files. Common data types include:

  • flows - Cash flow and expense data
  • transactions - Transaction data
  • tasks - Task management data
  • contacts - Contact/merchant information
  • income - Income data
  • fixed_costs - Fixed cost data
  • And many more...
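
The discovery rule (a directory under data/ that contains a parquet file of the same name) can be sketched with pathlib. This is an illustration of the rule as described, not the server's code, and the function name is hypothetical.

```python
from pathlib import Path


def discover_data_types(data_dir):
    """List directories under `data_dir` that contain a matching [type].parquet file."""
    root = Path(data_dir)
    return sorted(
        child.name
        for child in root.iterdir()
        if child.is_dir() and (child / f"{child.name}.parquet").is_file()
    )
```

Under this rule, directories such as data/logs or data/snapshots are skipped because they hold no file named after the directory itself.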

Error Handling

The server returns structured error messages in JSON format when operations fail. Common errors include:

  • File not found errors
  • Schema validation errors
  • Column not found errors
  • Filter matching errors

Security Notes

  • All write operations create audit log entries for traceability
  • Audit logs are stored in data/logs/audit_log.parquet
  • Optional full snapshots can be configured for additional safety
  • Never commit sensitive data files to version control

Troubleshooting

  1. File Not Found Errors
    • Verify the data type exists in data/[type]/[type].parquet
    • Check file permissions
  2. Schema Validation Errors
    • Ensure records match the schema defined in data/schemas/[type]_schema.json
    • Check required fields are present
  3. Filter Matching Errors
    • Verify filter syntax matches supported operators
    • Check column names exist in the schema

Testing

After installation/updates, run the test script:

python3 mcp-servers/parquet/test_audit_log.py

This validates:

  • Audit log creation
  • Schema compliance
  • Operation tracking

See IMPLEMENTATION_SUMMARY.md for manual testing procedures.

Notes

  • The server uses audit log for efficient change tracking (99%+ storage reduction)
  • All date fields are automatically converted to ISO format strings in responses
  • Null/NaN values are converted to null in JSON responses
  • The server runs in stdio mode for MCP communication
  • Audit log entries are never automatically deleted (manual archival if needed)
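
The date and null conversions mentioned above can be illustrated with a small normalization helper. This is a sketch using only the standard library and assumes rows arrive as plain Python values; the function name to_json_safe is hypothetical, and the server may perform the conversion differently.

```python
import json
import math
from datetime import date, datetime


def to_json_safe(value):
    """Convert dates to ISO strings and NaN to None, recursing into containers."""
    if isinstance(value, (date, datetime)):
        return value.isoformat()
    if isinstance(value, float) and math.isnan(value):
        return None  # serialized as JSON null
    if isinstance(value, dict):
        return {key: to_json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(item) for item in value]
    return value


row = {"flow_date": date(2025, 1, 15), "amount_usd": float("nan"), "category": "housing"}
payload = json.dumps(to_json_safe(row))
```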

License

MIT
