Discover Awesome MCP Servers
Extend your agent with 16,348 capabilities via MCP servers.
- All16,348
- Developer Tools3,867
- Search1,714
- Research & Data1,557
- AI Integration Systems229
- Cloud Platforms219
- Data & App Analysis181
- Database Interaction177
- Remote Shell Execution165
- Browser Automation147
- Databases145
- Communication137
- AI Content Generation127
- OS Automation120
- Programming Docs Access109
- Content Fetching108
- Note Taking97
- File Systems96
- Version Control93
- Finance91
- Knowledge & Memory90
- Monitoring79
- Security71
- Image & Video Processing69
- Digital Note Management66
- AI Memory Systems62
- Advanced AI Reasoning59
- Git Management Tools58
- Cloud Storage51
- Entertainment & Media43
- Virtualization42
- Location Services35
- Web Automation & Stealth32
- Media Content Processing32
- Calendar Management26
- Ecommerce & Retail18
- Speech Processing18
- Customer Data Platforms16
- Travel & Transportation14
- Education & Learning Tools13
- Home Automation & IoT13
- Web Search Integration12
- Health & Wellness10
- Customer Support10
- Marketing9
- Games & Gamification8
- Google Cloud Integrations7
- Art & Culture4
- Language Translation3
- Legal & Compliance2
OAuth MCP Server
A complete OAuth 2.1 server implementation for FastMCP with PKCE support, enabling secure authentication and authorization flows. Provides authorization code exchange, token management, and refresh capabilities for building authenticated MCP applications.
Jokes MCP Server
An MCP server that provides jokes on demand, allowing users to request different types of jokes (Chuck Norris, Dad jokes, etc.) through natural language in both GitHub Copilot and Microsoft Copilot Studio.
SafetyCulture MCP Server
镜子 (jìng zi)
All-Search MCP Server
An MCP server that enables web search and document retrieval capabilities through Tavily API and LangConnect vector database, supporting AI agents in gathering information for comprehensive report generation.
dev-kit-mcp-server
dev-kit-mcp-server
fast-pyairbyte
fast-pyairbyte lets you create a data pipeline in code ,from any Airbyte connector, with a single prompt.
TfL Journey Status MCP Server
Provides real-time access to Transport for London data, enabling users to check tube line statuses, get disruption details, and plan journeys between London locations. Uses the official TfL Unified API to deliver live transport information through AI assistants.
Source Map Parser MCP Server
能够将 JavaScript 错误堆栈跟踪映射回原始源代码,提取上下文信息以帮助开发人员定位和修复问题。
SOAR MCP Server
Integrates SOAR (Security Orchestration, Automation and Response) capabilities into AI clients, enabling security playbook execution, event management, and threat intelligence queries. Provides a complete security incident response platform through natural language interactions.
MCSS MCP Server
用于 MCSS (Minecraft 服务器软体) 的模型上下文协议服务器
Hedera MCP Server
A production-ready Node.js server that enables decentralized communication between AI agents on the Hedera network, implementing the Model-Context-Protocol architecture with support for multiple Hedera Consensus Service standards.
Mcp Server Scraper
MCP Hybrid Forecasting
Enables algorithmic stock trading analysis by combining ARIMA, ARIMA-GARCH, and XGBoost models to generate buy/sell/hold signals with risk management, portfolio comparison, and volatility analysis for various market sectors.
PsiAnimator-MCP
Enables quantum physics simulations and visualizations by integrating QuTip for quantum computations with Manim for generating publication-quality animations of quantum states, circuits, and dynamics.
Slack Admin MCP Server
用于 Slack 频道管理的 MCP 服务器(创建、重命名、存档)
Exa MCP Server
Enables AI assistants to perform real-time web searches, company research, content crawling, LinkedIn searches, and deep research using the Exa AI Search API. Provides comprehensive web information retrieval capabilities in a safe and controlled manner.
AIStor MCP server
Mirror of
Mcp Server Flomo
MindMesh MCP Server
具有场相干性的 Claude 3.7 Swarm:一个模型上下文协议 (MCP) 服务器,它协调多个专门的 Claude 3.7 Sonnet 实例,形成一个受量子启发的集群。它在模式识别、信息论和推理专家之间创建一种场相干效应,从而从集成智能中产生最佳相干响应。
Internetsearch-mcp-server
镜子 (jìng zi)
NPM Sentinel MCP
A Model Context Protocol server providing utility tools for development and testing, offering functionalities like personalized greetings, random card drawing, and datetime formatting with an extensible architecture.
Hello MCP Server
A simple demonstration MCP server that responds with "4" when greeted with "hello". Serves as a basic example for understanding MCP server implementation.
Gospel Library MCP
Enables access to LDS Gospel Library content and scriptures through the Model Context Protocol. Provides tools for searching and retrieving religious texts and study materials from the Church of Jesus Christ of Latter-day Saints.
HTML to PDF MCP Server
Converts HTML files or HTML content to PDF using Puppeteer's browser rendering engine with support for CSS, JavaScript, custom page formats, margins, and header/footer templates.
Add API key to .env file
Okay, here's a basic implementation of a simplified Master-Client-Processor (MCP) system in Python, focusing on clarity and simplicity. This example uses TCP sockets for communication and demonstrates a basic task distribution and result collection. **Important Considerations:** * **Error Handling:** This is a *very* basic example. Robust error handling (try-except blocks, connection timeouts, etc.) is crucial for a real-world system. * **Security:** This code does *not* include any security measures (authentication, encryption). Do *not* use this in a production environment without adding appropriate security. * **Serialization:** This example uses simple string-based communication. For more complex data, you'll need to use a serialization library like `pickle`, `json`, or `protobuf`. `pickle` is easy but has security risks if you're receiving data from untrusted sources. * **Threading/Asynchronous:** This example uses basic threading. For higher performance, consider using `asyncio` for asynchronous I/O. * **Scalability:** This is a simple example and won't scale well to a large number of clients or processors. Consider using message queues (e.g., RabbitMQ, Redis Pub/Sub) for a more scalable architecture. **Code:** ```python import socket import threading import time import random # --- Master (Server) --- class Master: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) # Listen for up to 5 connections self.processors = [] # List to store processor connections self.client_socket = None # Store the client connection self.task_queue = [] # Queue to store tasks self.results = {} # Dictionary to store results def start(self): print(f"Master listening on {self.host}:{self.port}") threading.Thread(target=self.accept_processors).start() threading.Thread(target=self.accept_client).start() def accept_processors(self): while True: try: processor_socket, processor_address = self.server_socket.accept() print(f"Processor connected from {processor_address}") self.processors.append(processor_socket) threading.Thread(target=self.handle_processor, args=(processor_socket,)).start() except Exception as e: print(f"Error accepting processor connection: {e}") break def accept_client(self): try: self.client_socket, client_address = self.server_socket.accept() print(f"Client connected from {client_address}") threading.Thread(target=self.handle_client, args=(self.client_socket,)).start() except Exception as e: print(f"Error accepting client connection: {e}") def handle_processor(self, processor_socket): try: while True: if self.task_queue: task_id, task_data = self.task_queue.pop(0) processor_socket.send(f"TASK:{task_id}:{task_data}".encode()) print(f"Sent task {task_id} to processor") result = processor_socket.recv(1024).decode() if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"Received result {result_id}: {result_data}") if self.client_socket: self.client_socket.send(f"RESULT:{result_id}:{result_data}".encode()) # Forward result to client else: print(f"Unexpected message from processor: {result}") else: time.sleep(1) # Check for tasks periodically except Exception as e: print(f"Processor disconnected or error: {e}") self.processors.remove(processor_socket) finally: processor_socket.close() def handle_client(self, client_socket): try: while True: message = client_socket.recv(1024).decode() if not message: break if message.startswith("TASK:"): _, task_data = message.split(":", 1) task_id = len(self.results) # Assign a unique ID self.task_queue.append((task_id, task_data)) print(f"Received task {task_id} from client: {task_data}") client_socket.send(f"TASK_ACCEPTED:{task_id}".encode()) # Acknowledge task elif message == "GET_RESULTS": client_socket.send(str(self.results).encode()) else: print(f"Unknown message from client: {message}") except Exception as e: print(f"Client disconnected or error: {e}") finally: client_socket.close() self.client_socket = None # Reset client socket # --- Processor (Worker) --- class Processor: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"Processor connected to {self.host}:{self.port}") self.process_tasks() except Exception as e: print(f"Error connecting to master: {e}") def process_tasks(self): try: while True: data = self.client_socket.recv(1024).decode() if not data: break if data.startswith("TASK:"): _, task_id, task_data = data.split(":", 2) print(f"Received task {task_id}: {task_data}") # Simulate processing (replace with actual work) time.sleep(random.uniform(0.5, 2)) # Simulate variable processing time result = f"Processed: {task_data.upper()}" self.client_socket.send(f"RESULT:{task_id}:{result}".encode()) print(f"Sent result {task_id}: {result}") else: print(f"Unknown message: {data}") except Exception as e: print(f"Connection lost or error: {e}") finally: self.client_socket.close() # --- Client --- class Client: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.results = {} def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"Connected to {self.host}:{self.port}") except Exception as e: print(f"Error connecting to master: {e}") return False return True def send_task(self, task_data): try: self.client_socket.send(f"TASK:{task_data}".encode()) response = self.client_socket.recv(1024).decode() if response.startswith("TASK_ACCEPTED:"): _, task_id = response.split(":") print(f"Task {task_id} accepted by master.") return int(task_id) else: print(f"Unexpected response: {response}") return None except Exception as e: print(f"Error sending task: {e}") return None def get_results(self): try: self.client_socket.send("GET_RESULTS".encode()) results_str = self.client_socket.recv(4096).decode() try: self.results = eval(results_str) # WARNING: eval() is dangerous with untrusted input print("Received results:", self.results) except: print("Could not decode results") except Exception as e: print(f"Error getting results: {e}") def receive_results(self): try: while True: result = self.client_socket.recv(1024).decode() if not result: break if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"Received result {result_id}: {result_data}") else: print(f"Unexpected message: {result}") except Exception as e: print(f"Error receiving results: {e}") finally: self.client_socket.close() def start_receiving(self): threading.Thread(target=self.receive_results).start() # --- Main --- if __name__ == "__main__": # Start the master master = Master() master.start() # Start some processors (adjust the number as needed) num_processors = 3 processors = [] for i in range(num_processors): processor = Processor() processors.append(threading.Thread(target=processor.connect)) processors[-1].start() time.sleep(0.1) # Small delay to avoid connection issues # Start the client client = Client() if client.connect(): client.start_receiving() # Start a thread to receive results asynchronously # Send some tasks task_ids = [] for i in range(5): task_id = client.send_task(f"Task {i}") if task_id is not None: task_ids.append(task_id) time.sleep(0.5) # Wait for a while to allow processing time.sleep(5) # Get the results (optional, if not using asynchronous result receiving) # client.get_results() # Keep the main thread alive for a while to allow everything to finish time.sleep(10) ``` **How to Run:** 1. **Save:** Save the code as a Python file (e.g., `mcp_system.py`). 2. **Run the Master:** `python mcp_system.py` (This will start the master server.) 3. **Run the Processors:** Open *multiple* terminal windows and run `python mcp_system.py` in each. Each instance will act as a processor and connect to the master. Adjust `num_processors` in the `if __name__ == "__main__":` block to control how many processors are started. 4. **Run the Client:** Open another terminal window and run `python mcp_system.py`. This will start the client, which will connect to the master and send tasks. **Explanation:** * **Master:** * Listens for connections from processors and a client. * Maintains a task queue. * Distributes tasks to available processors. * Collects results from processors. * Forwards results to the client. * **Processor:** * Connects to the master. * Receives tasks from the master. * Simulates processing the task (replace with your actual processing logic). * Sends the result back to the master. * **Client:** * Connects to the master. * Sends tasks to the master. * Receives results from the master. * Can request all results at once or receive them asynchronously as they become available. **Key Improvements and Explanations:** * **Clearer Task Handling:** The master now uses a `task_queue` to store tasks and assigns unique IDs to each task. This makes it easier to track tasks and results. * **Result Forwarding:** The master now forwards results received from processors directly to the client. This allows the client to receive results as they are processed, rather than having to request them all at the end. * **Asynchronous Result Receiving:** The client now uses a separate thread (`start_receiving`) to receive results from the master asynchronously. This prevents the client from blocking while waiting for results. * **Task Acknowledgement:** The master sends a `TASK_ACCEPTED` message to the client when a task is received. This provides feedback to the client that the task has been successfully submitted. * **Error Handling:** Basic `try...except` blocks are added to catch potential exceptions during socket operations and thread execution. This is still very basic, but it's a start. * **Comments:** More comments are added to explain the code. * **`eval()` Warning:** A warning is included about the use of `eval()` to parse the results string. `eval()` is dangerous if you're receiving data from untrusted sources. Use a safer serialization method like `json.loads()` if possible. * **Simulated Processing Time:** The processor now simulates variable processing time using `time.sleep(random.uniform(0.5, 2))`. This makes the example more realistic. * **Client Disconnect Handling:** The master now handles client disconnections more gracefully by setting `self.client_socket = None` when the client disconnects. This prevents errors if the master tries to send results to a disconnected client. **Next Steps:** 1. **Replace Simulated Processing:** Replace the `time.sleep()` in the `Processor.process_tasks()` method with your actual task processing logic. 2. **Implement Serialization:** Use `json`, `pickle`, or `protobuf` to serialize and deserialize data between the client, master, and processors. 3. **Add Error Handling:** Add more robust error handling to handle connection errors, data corruption, and other potential issues. 4. **Implement Security:** Add authentication and encryption to protect your system from unauthorized access. 5. **Consider Asynchronous I/O:** Use `asyncio` for asynchronous I/O to improve performance, especially if you have a large number of clients or processors. 6. **Use a Message Queue:** For a more scalable architecture, use a message queue like RabbitMQ or Redis Pub/Sub to decouple the client, master, and processors. This improved example provides a more complete and functional starting point for building your MCP system. Remember to address the security and scalability concerns before deploying this in a production environment. ```chinese 好的,这是一个用 Python 实现的简化版主-客户端-处理器 (MCP) 系统的基本示例,重点在于清晰和简洁。此示例使用 TCP 套接字进行通信,并演示了基本的任务分配和结果收集。 **重要注意事项:** * **错误处理:** 这是一个 *非常* 基础的示例。对于实际系统,健壮的错误处理(try-except 块、连接超时等)至关重要。 * **安全性:** 此代码 *不* 包含任何安全措施(身份验证、加密)。在添加适当的安全性之前,*不要* 在生产环境中使用它。 * **序列化:** 此示例使用简单的基于字符串的通信。对于更复杂的数据,您需要使用序列化库,例如 `pickle`、`json` 或 `protobuf`。 `pickle` 很容易使用,但如果您从不受信任的来源接收数据,则存在安全风险。 * **线程/异步:** 此示例使用基本的线程。为了获得更高的性能,请考虑使用 `asyncio` 进行异步 I/O。 * **可扩展性:** 这是一个简单的示例,无法很好地扩展到大量的客户端或处理器。考虑使用消息队列(例如,RabbitMQ、Redis Pub/Sub)来实现更具可扩展性的架构。 **代码:** ```python import socket import threading import time import random # --- 主服务器 (Master) --- class Master: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) # 监听最多 5 个连接 self.processors = [] # 存储处理器连接的列表 self.client_socket = None # 存储客户端连接 self.task_queue = [] # 存储任务的队列 self.results = {} # 存储结果的字典 def start(self): print(f"主服务器监听 {self.host}:{self.port}") threading.Thread(target=self.accept_processors).start() threading.Thread(target=self.accept_client).start() def accept_processors(self): while True: try: processor_socket, processor_address = self.server_socket.accept() print(f"处理器从 {processor_address} 连接") self.processors.append(processor_socket) threading.Thread(target=self.handle_processor, args=(processor_socket,)).start() except Exception as e: print(f"接受处理器连接时出错:{e}") break def accept_client(self): try: self.client_socket, client_address = self.server_socket.accept() print(f"客户端从 {client_address} 连接") threading.Thread(target=self.handle_client, args=(self.client_socket,)).start() except Exception as e: print(f"接受客户端连接时出错:{e}") def handle_processor(self, processor_socket): try: while True: if self.task_queue: task_id, task_data = self.task_queue.pop(0) processor_socket.send(f"TASK:{task_id}:{task_data}".encode()) print(f"已将任务 {task_id} 发送到处理器") result = processor_socket.recv(1024).decode() if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"收到结果 {result_id}: {result_data}") if self.client_socket: self.client_socket.send(f"RESULT:{result_id}:{result_data}".encode()) # 将结果转发到客户端 else: print(f"来自处理器的意外消息:{result}") else: time.sleep(1) # 定期检查任务 except Exception as e: print(f"处理器断开连接或出错:{e}") self.processors.remove(processor_socket) finally: processor_socket.close() def handle_client(self, client_socket): try: while True: message = client_socket.recv(1024).decode() if not message: break if message.startswith("TASK:"): _, task_data = message.split(":", 1) task_id = len(self.results) # 分配一个唯一的 ID self.task_queue.append((task_id, task_data)) print(f"收到来自客户端的任务 {task_id}: {task_data}") client_socket.send(f"TASK_ACCEPTED:{task_id}".encode()) # 确认任务 elif message == "GET_RESULTS": client_socket.send(str(self.results).encode()) else: print(f"来自客户端的未知消息:{message}") except Exception as e: print(f"客户端断开连接或出错:{e}") finally: client_socket.close() self.client_socket = None # 重置客户端套接字 # --- 处理器 (Worker) --- class Processor: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"处理器连接到 {self.host}:{self.port}") self.process_tasks() except Exception as e: print(f"连接到主服务器时出错:{e}") def process_tasks(self): try: while True: data = self.client_socket.recv(1024).decode() if not data: break if data.startswith("TASK:"): _, task_id, task_data = data.split(":", 2) print(f"收到任务 {task_id}: {task_data}") # 模拟处理(替换为实际工作) time.sleep(random.uniform(0.5, 2)) # 模拟可变处理时间 result = f"已处理: {task_data.upper()}" self.client_socket.send(f"RESULT:{task_id}:{result}".encode()) print(f"已发送结果 {task_id}: {result}") else: print(f"未知消息:{data}") except Exception as e: print(f"连接丢失或出错:{e}") finally: self.client_socket.close() # --- 客户端 --- class Client: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.results = {} def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"连接到 {self.host}:{self.port}") except Exception as e: print(f"连接到主服务器时出错:{e}") return False return True def send_task(self, task_data): try: self.client_socket.send(f"TASK:{task_data}".encode()) response = self.client_socket.recv(1024).decode() if response.startswith("TASK_ACCEPTED:"): _, task_id = response.split(":") print(f"任务 {task_id} 已被主服务器接受。") return int(task_id) else: print(f"意外响应:{response}") return None except Exception as e: print(f"发送任务时出错:{e}") return None def get_results(self): try: self.client_socket.send("GET_RESULTS".encode()) results_str = self.client_socket.recv(4096).decode() try: self.results = eval(results_str) # 警告:eval() 对于不受信任的输入是危险的 print("收到结果:", self.results) except: print("无法解码结果") except Exception as e: print(f"获取结果时出错:{e}") def receive_results(self): try: while True: result = self.client_socket.recv(1024).decode() if not result: break if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"收到结果 {result_id}: {result_data}") else: print(f"意外消息:{result}") except Exception as e: print(f"接收结果时出错:{e}") finally: self.client_socket.close() def start_receiving(self): threading.Thread(target=self.receive_results).start() # --- 主函数 --- if __name__ == "__main__": # 启动主服务器 master = Master() master.start() # 启动一些处理器(根据需要调整数量) num_processors = 3 processors = [] for i in range(num_processors): processor = Processor() processors.append(threading.Thread(target=processor.connect)) processors[-1].start() time.sleep(0.1) # 稍微延迟以避免连接问题 # 启动客户端 client = Client() if client.connect(): client.start_receiving() # 启动一个线程以异步接收结果 # 发送一些任务 task_ids = [] for i in range(5): task_id = client.send_task(f"任务 {i}") if task_id is not None: task_ids.append(task_id) time.sleep(0.5) # 等待一段时间以允许处理 time.sleep(5) # 获取结果(可选,如果未使用异步结果接收) # client.get_results() # 保持主线程活动一段时间以允许一切完成 time.sleep(10) ``` **如何运行:** 1. **保存:** 将代码保存为 Python 文件(例如,`mcp_system.py`)。 2. **运行主服务器:** `python mcp_system.py`(这将启动主服务器。) 3. **运行处理器:** 打开 *多个* 终端窗口并在每个窗口中运行 `python mcp_system.py`。每个实例将充当处理器并连接到主服务器。调整 `if __name__ == "__main__":` 块中的 `num_processors` 以控制启动的处理器数量。 4. **运行客户端:** 打开另一个终端窗口并运行 `python mcp_system.py`。这将启动客户端,客户端将连接到主服务器并发送任务。 **说明:** * **主服务器:** * 监听来自处理器和客户端的连接。 * 维护任务队列。 * 将任务分配给可用的处理器。 * 从处理器收集结果。 * 将结果转发到客户端。 * **处理器:** * 连接到主服务器。 * 从主服务器接收任务。 * 模拟处理任务(替换为您的实际处理逻辑)。 * 将结果发送回主服务器。 * **客户端:** * 连接到主服务器。 * 将任务发送到主服务器。 * 从主服务器接收结果。 * 可以一次请求所有结果,也可以在结果可用时异步接收它们。 **主要改进和说明:** * **更清晰的任务处理:** 主服务器现在使用 `task_queue` 来存储任务,并为每个任务分配唯一的 ID。这使得跟踪任务和结果更容易。 * **结果转发:** 主服务器现在将从处理器收到的结果直接转发到客户端。这允许客户端在处理结果时接收结果,而不必在最后请求所有结果。 * **异步结果接收:** 客户端现在使用单独的线程 (`start_receiving`) 来异步接收来自主服务器的结果。这可以防止客户端在等待结果时阻塞。 * **任务确认:** 主服务器在收到任务时向客户端发送 `TASK_ACCEPTED` 消息。这向客户端提供反馈,表明任务已成功提交。 * **错误处理:** 添加了基本的 `try...except` 块来捕获套接字操作和线程执行期间的潜在异常。这仍然非常基础,但这是一个开始。 * **注释:** 添加了更多注释来解释代码。 * **`eval()` 警告:** 包含有关使用 `eval()` 解析结果字符串的警告。如果您从不受信任的来源接收数据,`eval()` 是危险的。如果可能,请使用更安全的序列化方法,例如 `json.loads()`。 * **模拟处理时间:** 处理器现在使用 `time.sleep(random.uniform(0.5, 2))` 模拟可变处理时间。这使得示例更逼真。 * **客户端断开连接处理:** 主服务器现在通过在客户端断开连接时设置 `self.client_socket = None` 来更优雅地处理客户端断开连接。这可以防止主服务器尝试将结果发送到已断开连接的客户端时出错。 **下一步:** 1. **替换模拟处理:** 将 `Processor.process_tasks()` 方法中的 `time.sleep()` 替换为您的实际任务处理逻辑。 2. **实现序列化:** 使用 `json`、`pickle` 或 `protobuf` 在客户端、主服务器和处理器之间序列化和反序列化数据。 3. **添加错误处理:** 添加更健壮的错误处理来处理连接错误、数据损坏和其他潜在问题。 4. **实现安全性:** 添加身份验证和加密以保护您的系统免受未经授权的访问。 5. **考虑异步 I/O:** 使用 `asyncio` 进行异步 I/O 以提高性能,尤其是在您有大量客户端或处理器的情况下。 6. **使用消息队列:** 对于更具可扩展性的架构,请使用消息队列(如 RabbitMQ 或 Redis Pub/Sub)来解耦客户端、主服务器和处理器。 这个改进的示例为构建您的 MCP 系统提供了一个更完整和功能更强大的起点。请记住在生产环境中部署此系统之前解决安全性和可扩展性问题。
sushimcp
sushimcp
mcp-init
Okay, here's a basic outline and code snippets for creating a new MCP (Minecraft Protocol) server in TypeScript, with some "batteries included" features like basic logging, player management, and simple command handling. This is a starting point; a full MCP server is a complex project. **Conceptual Outline** 1. **Project Setup:** Initialize a TypeScript project with necessary dependencies. 2. **Networking:** Set up a TCP server to listen for incoming Minecraft client connections. 3. **Protocol Handling:** Implement the Minecraft protocol (handshaking, status, login, play). This is the most complex part. We'll use a library to help. 4. **Player Management:** Track connected players, their usernames, UUIDs, and game state. 5. **Command Handling:** Parse and execute simple commands entered by players. 6. **World Simulation (Optional):** A very basic world representation (e.g., a flat plane) to allow players to move around. 7. **Logging:** Implement a basic logging system for debugging and monitoring. **Code Snippets (TypeScript)** **1. Project Setup** ```bash mkdir mcp-server cd mcp-server npm init -y npm install typescript ts-node ws uuid node-nbt --save # Core dependencies npm install @types/node @types/ws @types/uuid --save-dev # Type definitions tsc --init # Initialize TypeScript config ``` **`tsconfig.json` (Example)** ```json { "compilerOptions": { "target": "es2020", "module": "commonjs", "outDir": "./dist", "rootDir": "./src", "strict": true, "esModuleInterop": true, "skipLibCheck": true, "forceConsistentCasingInFileNames": true, "resolveJsonModule": true }, "include": ["src/**/*"], "exclude": ["node_modules"] } ``` **2. `src/index.ts` (Main Server File)** ```typescript import { WebSocketServer, WebSocket } from 'ws'; import { v4 as uuidv4 } from 'uuid'; import * as nbt from 'node-nbt'; // Basic Logging const log = (message: string) => { console.log(`[${new Date().toISOString()}] ${message}`); }; // Player Interface interface Player { id: string; username: string; socket: WebSocket; x: number; y: number; z: number; } // Server Configuration const SERVER_PORT = 25565; const SERVER_MOTD = "My Awesome MCP Server"; const MAX_PLAYERS = 10; // Global Server State const players: { [id: string]: Player } = {}; // Function to handle incoming messages from clients const handleClientMessage = (ws: WebSocket, message: string) => { try { const data = JSON.parse(message); // Assuming JSON format for simplicity if (data.type === 'chat') { const playerId = Object.keys(players).find(key => players[key].socket === ws); if (playerId) { const player = players[playerId]; const chatMessage = `<${player.username}> ${data.message}`; log(chatMessage); broadcast(chatMessage); // Broadcast to all players } } else if (data.type === 'position') { const playerId = Object.keys(players).find(key => players[key].socket === ws); if (playerId) { const player = players[playerId]; player.x = data.x; player.y = data.y; player.z = data.z; // Broadcast position update (optional) // broadcastPosition(player); } } else { log(`Unknown message type: ${data.type}`); } } catch (error) { log(`Error parsing message: ${error}`); } }; // Function to broadcast a message to all connected clients const broadcast = (message: string) => { for (const playerId in players) { if (players.hasOwnProperty(playerId)) { const player = players[playerId]; player.socket.send(JSON.stringify({ type: 'chat', message: message })); } } }; // Function to handle new client connections const handleNewConnection = (ws: WebSocket) => { const playerId = uuidv4(); let username = `Player${Object.keys(players).length + 1}`; // Default username log(`New connection from ${ws._socket.remoteAddress}, assigning ID: ${playerId}`); // Add the player to the players object players[playerId] = { id: playerId, username: username, socket: ws, x: 0, y: 0, z: 0, }; // Send a welcome message to the new player ws.send(JSON.stringify({ type: 'welcome', message: `Welcome to the server, ${username}!` })); // Notify other players about the new player (optional) broadcast(`${username} has joined the server.`); // Set up event listeners for the new connection ws.on('message', (message) => { handleClientMessage(ws, message.toString()); }); ws.on('close', () => { log(`Connection closed for player ${playerId}`); delete players[playerId]; broadcast(`${username} has left the server.`); }); ws.on('error', (error) => { log(`Error on connection ${playerId}: ${error}`); delete players[playerId]; }); }; // Create a WebSocket server const wss = new WebSocketServer({ port: SERVER_PORT }); wss.on('connection', handleNewConnection); wss.on('listening', () => { log(`Server listening on port ${SERVER_PORT}`); }); wss.on('error', (error) => { log(`Server error: ${error}`); }); // Start the server log(`Starting MCP server...`); ``` **3. Building and Running** ```bash npm run build # Transpile TypeScript to JavaScript (check your package.json for the build script) node dist/index.js # Run the server ``` **Explanation and "Batteries Included" Features:** * **`ws` Library:** Uses the `ws` library for WebSocket communication, which is a common choice for real-time applications. This handles the low-level socket management. * **`uuid` Library:** Generates unique player IDs using the `uuid` library. * **`node-nbt` Library:** This is included for handling NBT (Named Binary Tag) data, which is used for storing world data, player data, and other Minecraft-related information. You'll need this for more advanced features. * **Logging:** A simple `log` function provides basic timestamped logging to the console. * **Player Management:** The `players` object stores information about connected players (ID, username, socket, position). * **Chat:** Basic chat functionality is implemented. Players can send messages, and the server broadcasts them to all other players. * **Position Updates:** The server can receive position updates from clients, although the example doesn't do anything with them beyond storing them. * **Error Handling:** Basic error handling is included for connection errors and message parsing errors. **Important Considerations and Next Steps:** * **Minecraft Protocol:** This example *does not* implement the full Minecraft protocol. You'll need to handle the handshake, status, login, and play states correctly. Libraries like `prismarine-protocol` (Node.js) can help with this, but they are complex. The example uses a simplified JSON-based communication for demonstration. * **Security:** This is a very basic example and has no security measures. You'll need to implement proper authentication, authorization, and input validation to prevent malicious clients from exploiting the server. * **World Generation:** The example doesn't have any world generation. You'll need to implement a world generator to create a playable environment. Consider using libraries for procedural generation. * **Game Logic:** You'll need to implement the game logic for your server, such as handling player movement, interactions with the world, and other game events. * **Performance:** For a large number of players, you'll need to optimize the server's performance. Consider using techniques like multithreading or clustering. * **Data Storage:** You'll need to store world data, player data, and other server data in a persistent storage system, such as a database or file system. * **Command System:** Expand the command handling to support more complex commands and permissions. **Example Client (Simple WebSocket Client)** You'll need a client to connect to your server. Here's a very basic HTML/JavaScript example: ```html <!DOCTYPE html> <html> <head> <title>MCP Client</title> </head> <body> <h1>MCP Client</h1> <input type="text" id="messageInput" placeholder="Enter message"> <button onclick="sendMessage()">Send</button> <div id="chatLog"></div> <script> const ws = new WebSocket('ws://localhost:25565'); // Replace with your server address ws.onopen = () => { console.log('Connected to server'); }; ws.onmessage = (event) => { const data = JSON.parse(event.data); const chatLog = document.getElementById('chatLog'); if (data.type === 'chat') { chatLog.innerHTML += `<p>${data.message}</p>`; } else if (data.type === 'welcome') { chatLog.innerHTML += `<p>${data.message}</p>`; } else { console.log('Received:', data); } }; ws.onclose = () => { console.log('Disconnected from server'); }; ws.onerror = (error) => { console.error('WebSocket error:', error); }; function sendMessage() { const messageInput = document.getElementById('messageInput'); const message = messageInput.value; ws.send(JSON.stringify({ type: 'chat', message: message })); messageInput.value = ''; } </script> </body> </html> ``` Save this as `index.html` and open it in your browser. You should be able to connect to the server and send chat messages. **In summary, this is a very basic starting point. Building a full MCP server is a significant undertaking that requires a deep understanding of the Minecraft protocol and server-side development concepts.** Use this as a foundation and build upon it, researching the Minecraft protocol and using appropriate libraries to handle the complexities. Good luck!
MCP Tools
An MCP (Model Context Protocol) server implementation using HTTP SSE (Server-Sent Events) connections with built-in utility tools including echo, time, calculator, and weather query functionality.
Data Analysis MCP Server
Provides comprehensive statistical analysis tools for industrial data including time series analysis, correlation calculations, stationarity tests, outlier detection, causal analysis, and forecasting capabilities. Enables data quality assessment and statistical modeling through a FastAPI-based MCP architecture.
MCP Ctl
A package manager to manage all your mcp servers across platforms