Discover Awesome MCP Servers

Extend your agent with 12,217 capabilities via MCP servers.

Vercel MCP Integration

An MCP server that connects Claude to Vercel.

MCPE Server Proxy

Mirror.

A Pokedex web app!

A Pokedex web application.

Multi Model Advisor

A council of models for decision-making.

Tiny MCP Server (Rust)

A Model Context Protocol (MCP) implementation in Rust.

Lighthouse MCP

A Model Context Protocol server that enables Claude to interact with and analyze your Lighthouse.one cryptocurrency portfolio data through secure authentication.

YouTube Transcript MCP Server

There are a few ways to approach building an MCP (Microservices Communication Protocol) server for fetching YouTube transcripts. Here's a breakdown of the concepts and potential implementations:

**Understanding the Requirements**

* **YouTube Data API:** You'll need the YouTube Data API to retrieve transcript information. This API requires authentication (API key or OAuth 2.0).
* **Transcript Retrieval:** The API provides different kinds of transcripts:
  * **Automatic transcripts (ASR):** generated by YouTube's automatic speech recognition; often less accurate.
  * **Community contributions:** transcripts provided by the YouTube community.
  * **Official transcripts:** transcripts uploaded by the video creator.
* **MCP (Microservices Communication Protocol):** This defines how your server will communicate with other microservices in your architecture. Common choices include:
  * **REST (HTTP):** simple and widely understood; good for basic operations.
  * **gRPC:** high performance, uses Protocol Buffers for data serialization; excellent for complex data structures and demanding performance.
  * **Message queues (e.g., RabbitMQ, Kafka):** asynchronous communication; useful for decoupling services and handling large volumes of requests.
* **Scalability and Reliability:** Consider how your server will handle a large number of requests and potential failures.
* **Error Handling:** Implement robust error handling to gracefully deal with API errors, network issues, and invalid requests.

**Implementation Options**

Here are a few implementation options, focusing on different MCP approaches:

**1. REST (HTTP) based MCP Server (Python with Flask/FastAPI)**

* **Language:** Python (popular for API development)
* **Framework:** Flask (simple) or FastAPI (modern, asynchronous)

```python
# FastAPI example
from fastapi import FastAPI, HTTPException
from youtube_transcript_api import YouTubeTranscriptApi

app = FastAPI()

@app.get("/transcript/{video_id}")
async def get_transcript(video_id: str, lang: str = 'en'):
    """Fetches the transcript for a YouTube video.

    Args:
        video_id: The YouTube video ID.
        lang: The desired language of the transcript (default: 'en').

    Returns:
        A list of transcript entries (text, start, duration).
    """
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[lang])
        return transcript
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```

**Explanation:**

* **`YouTubeTranscriptApi`:** A Python library that simplifies interacting with the YouTube transcript API. Install it with `pip install youtube-transcript-api`.
* **`FastAPI`:** A modern, high-performance web framework for building APIs.
* **`/transcript/{video_id}`:** An endpoint that accepts the YouTube video ID as a path parameter.
* **`lang`:** An optional query parameter to specify the desired language.
* **Error handling:** The `try...except` block catches potential errors and returns an HTTP 500 error with a descriptive message.
* **`uvicorn`:** An ASGI server to run the FastAPI application.

**To use this:**

1. Install dependencies: `pip install fastapi uvicorn youtube-transcript-api`
2. Run the server: `python your_script_name.py`
3. Access the API: `http://localhost:8000/transcript/VIDEO_ID` (replace `VIDEO_ID` with the actual YouTube video ID). You can also specify the language: `http://localhost:8000/transcript/VIDEO_ID?lang=fr`

**2. gRPC based MCP Server (Python with gRPC)**

* **Language:** Python
* **Framework:** gRPC

**Steps:**

1. **Define the Protocol Buffer (`.proto`) file**, which defines the service and message structure:

```protobuf
syntax = "proto3";

package youtube_transcript;

service TranscriptService {
  rpc GetTranscript (TranscriptRequest) returns (TranscriptResponse) {}
}

message TranscriptRequest {
  string video_id = 1;
  string language = 2;
}

message TranscriptResponse {
  repeated TranscriptEntry entries = 1;
}

message TranscriptEntry {
  string text = 1;
  double start = 2;
  double duration = 3;
}
```

2. **Generate gRPC code** with the `grpc_tools.protoc` compiler:

```bash
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. youtube_transcript.proto
```

3. **Implement the gRPC server:**

```python
# youtube_transcript_server.py
import grpc
from concurrent import futures
from youtube_transcript_api import YouTubeTranscriptApi

import youtube_transcript_pb2 as youtube_transcript_pb2
import youtube_transcript_pb2_grpc as youtube_transcript_pb2_grpc

class TranscriptServicer(youtube_transcript_pb2_grpc.TranscriptServiceServicer):
    def GetTranscript(self, request, context):
        try:
            transcript = YouTubeTranscriptApi.get_transcript(
                request.video_id, languages=[request.language])
            entries = []
            for entry in transcript:
                entries.append(youtube_transcript_pb2.TranscriptEntry(
                    text=entry['text'],
                    start=entry['start'],
                    duration=entry['duration']
                ))
            return youtube_transcript_pb2.TranscriptResponse(entries=entries)
        except Exception as e:
            context.abort(grpc.StatusCode.INTERNAL, str(e))

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    youtube_transcript_pb2_grpc.add_TranscriptServiceServicer_to_server(
        TranscriptServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()
```

4. **Implement the gRPC client (example):**

```python
# youtube_transcript_client.py
import grpc

import youtube_transcript_pb2 as youtube_transcript_pb2
import youtube_transcript_pb2_grpc as youtube_transcript_pb2_grpc

def get_transcript(video_id, language):
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = youtube_transcript_pb2_grpc.TranscriptServiceStub(channel)
        request = youtube_transcript_pb2.TranscriptRequest(
            video_id=video_id, language=language)
        try:
            response = stub.GetTranscript(request)
            for entry in response.entries:
                print(f"[{entry.start:.2f} - {entry.start + entry.duration:.2f}] {entry.text}")
        except grpc.RpcError as e:
            print(f"Error: {e.details()}")

if __name__ == '__main__':
    get_transcript("VIDEO_ID", "en")  # Replace with a real video ID
```

**Explanation:**

* **`.proto` file:** Defines the service (`TranscriptService`) and the messages (`TranscriptRequest`, `TranscriptResponse`, `TranscriptEntry`).
* **`grpc_tools.protoc`:** Compiles the `.proto` file into Python code.
* **`TranscriptServicer`:** Implements the `GetTranscript` method, which retrieves the transcript using `YouTubeTranscriptApi` and converts it into the gRPC response format.
* **gRPC client:** Connects to the server, sends a `TranscriptRequest`, and prints the received transcript entries.

**To use this:**

1. Install dependencies: `pip install grpcio grpcio-tools protobuf youtube-transcript-api`
2. Compile the `.proto` file: `python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. youtube_transcript.proto`
3. Run the server: `python youtube_transcript_server.py`
4. Run the client: `python youtube_transcript_client.py`

**3. Message Queue based MCP Server (Python with RabbitMQ/Kafka)**

* **Language:** Python
* **Message queue:** RabbitMQ or Kafka

**Conceptual outline (RabbitMQ example):**

1. **Producer (client):** sends a message to the queue with the video ID and language.
2. **Consumer (server):** listens to the queue, receives the message, fetches the transcript, and potentially publishes the transcript to another queue or stores it in a database.

**RabbitMQ example (simplified):**

* **Producer (client):**

```python
# producer.py
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='transcript_requests')

message = {'video_id': 'VIDEO_ID', 'language': 'en'}  # Replace with a real video ID
channel.basic_publish(exchange='', routing_key='transcript_requests',
                      body=json.dumps(message))
print(" [x] Sent %r" % message)

connection.close()
```

* **Consumer (server):**

```python
# consumer.py
import pika
import json
from youtube_transcript_api import YouTubeTranscriptApi

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='transcript_requests')

def callback(ch, method, properties, body):
    message = json.loads(body.decode('utf-8'))
    video_id = message['video_id']
    language = message['language']
    print(f" [x] Received request for video ID: {video_id}, language: {language}")
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=[language])
        # Process the transcript (e.g., store in a database, publish to another queue)
        print(f" [x] Transcript fetched successfully for {video_id}")
        # Example: print the first few lines
        for i in range(min(5, len(transcript))):
            print(transcript[i])
    except Exception as e:
        print(f" [x] Error fetching transcript: {e}")

channel.basic_consume(queue='transcript_requests', on_message_callback=callback,
                      auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```

**Explanation:**

* **RabbitMQ:** A message broker that allows asynchronous communication between services.
* **`transcript_requests` queue:** The queue where the client sends requests for transcripts.
* **Producer:** Sends a JSON message containing the video ID and language to the queue.
* **Consumer:** Listens to the queue, retrieves the message, fetches the transcript using `YouTubeTranscriptApi`, and processes it.
* **`auto_ack=True`:** Automatically acknowledges the message after it's processed. Consider manual acknowledgements for more robust error handling.

**To use this:**

1. Install RabbitMQ: follow the instructions on the RabbitMQ website.
2. Install dependencies: `pip install pika youtube-transcript-api`
3. Run the consumer: `python consumer.py`
4. Run the producer: `python producer.py`

**Key Considerations and Best Practices**

* **API key management:** Store your YouTube Data API key securely (e.g., environment variables, secrets management). Never hardcode it in your code.
* **Rate limiting:** The YouTube Data API has rate limits. Implement retry logic with exponential backoff to handle rate-limit errors gracefully, and consider caching transcripts to reduce API calls.
* **Error handling:** Implement comprehensive error handling to catch API errors, network issues, and invalid requests. Log errors for debugging.
* **Asynchronous operations:** For gRPC and message queue implementations, use asynchronous operations (e.g., `asyncio` in Python) to improve performance and scalability.
* **Data validation:** Validate the input (video ID, language) to prevent errors and security vulnerabilities.
* **Logging:** Use a logging library (e.g., Python's `logging`) to log important events and errors.
* **Monitoring:** Monitor the performance of your server (e.g., request latency, error rates) to identify and address issues.
* **Security:** If your server handles sensitive data, implement appropriate security measures (e.g., authentication, authorization, encryption).
* **Scalability:** Design your server to handle a large number of requests. Consider a load balancer and multiple server instances.
* **Deployment:** Choose a suitable deployment environment (e.g., a cloud platform, containerization with Docker).
* **Caching:** Implement caching mechanisms (e.g., Redis, Memcached) to store frequently accessed transcripts and reduce load on the YouTube Data API. Consider a cache invalidation strategy.
* **Transcript availability:** Not all YouTube videos have transcripts. Handle cases where a transcript is not found.
* **Language support:** The `YouTubeTranscriptApi` library supports multiple languages. Allow users to specify the desired language.
* **Transcript types:** Consider supporting different types of transcripts (automatic, community, official). The `YouTubeTranscriptApi` library provides methods to access each type.

**Choosing the Right Approach**

* **REST (HTTP):** Good for simple use cases and when you need a widely accessible API. Easy to implement and debug.
* **gRPC:** Best for high-performance communication between microservices. Requires more setup but offers significant performance benefits.
* **Message queue:** Ideal for asynchronous processing and decoupling services. Useful for handling large volumes of requests and ensuring that requests are processed even if one service is temporarily unavailable.

The best approach depends on your specific requirements and the overall architecture of your microservices. Start with REST if you're unsure, then consider gRPC or message queues if you need better performance or scalability. Remember to prioritize security, error handling, and rate limiting in all implementations.
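The rate-limiting and caching advice above can be sketched as a small wrapper. This is a hedged illustration, not part of the `youtube-transcript-api` package: the `fetch` callable and module-level `_cache` dict are assumptions standing in for the real client call and a production cache such as Redis.

```python
# Sketch: cache transcripts and retry with exponential backoff on failure.
# Hypothetical helper; `fetch` stands in for a real client call such as
# YouTubeTranscriptApi.get_transcript.
import time

_cache = {}  # (video_id, lang) -> transcript; swap for Redis/Memcached in production

def fetch_with_retry(fetch, video_id, lang="en", max_attempts=4,
                     base_delay=1.0, sleep=time.sleep):
    """Return a cached transcript, or fetch it with retries and backoff."""
    key = (video_id, lang)
    if key in _cache:
        return _cache[key]  # cache hit: no API call at all
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            result = fetch(video_id, languages=[lang])
            _cache[key] = result
            return result
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the caller
            sleep(delay)
            delay *= 2  # exponential backoff: 1s, 2s, 4s, ...
```

With the real library you might call `fetch_with_retry(YouTubeTranscriptApi.get_transcript, video_id)`; injecting `fetch` and `sleep` keeps the backoff logic independent of the API client and easy to test.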

Payman AI Documentation MCP Server

Gives AI assistants such as Claude or Cursor access to the Payman AI documentation, helping developers build integrations more efficiently.

PortOne MCP Server

A PortOne MCP server for developers.

Claud Coin ($CLAUD)

$CLAUDE: a decentralized AI development ecosystem.

BrasilAPI MCP Server

Seamlessly query diverse data from Brazilian sources through a unified interface. Access postal codes, area codes, banks, holidays, taxes, and more. Easily enrich your AI agents and applications with rich, up-to-date data via BrasilAPI.

mcp-voice

An MCP server for voice AI, using OpenAI.

MCP Servers - OpenAI and Flux Integration

FridayAI

An AI gaming companion that helps you complete tasks.

LI.FI MCP Server

An MCP server integrating the LI.FI API.

OpenMCPSever

An open-source version of the MCP server.

mcp-server

MianshiyaServer

Unity MCP Server - Enhancing Unity Editor Actions with MCP Clients 🎮

A Unity MCP server that allows MCP clients like Claude Desktop or Cursor to perform Unity Editor actions.

MCP Server for Milvus

A Model Context Protocol server for Milvus.

YouTube MCP Server

An MCP server that allows Claude and other AI assistants to interact with the YouTube API, providing tools to search for videos and channels and retrieve detailed information about them.

Wikimedia MCP Server

An MCP server for interacting with the Wikimedia API, providing programmatic access to content from Wikipedia and other Wikimedia projects.

MCP Server for MySQL based on NodeJS

A Model Context Protocol server that provides read-only access to MySQL databases, enabling large language models (LLMs) to inspect database schemas and execute read-only queries.

MCP Client:

An MCP client for connecting to MCP-compatible server services, at the address:

MCP Server-Client Example

Getting Started

Okay, here's a basic example of a Golang MCP (Mesh Configuration Protocol) server. This example focuses on the core structure and handling of requests. It's a simplified illustration and would need significant expansion for a real-world deployment.

```go
package main

import (
	"net"
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"

	mcp "istio.io/api/mcp/v1alpha1" // Use the correct MCP API version
	"istio.io/pkg/log"
)

const (
	port = ":8080" // Or any other suitable port
)

// MCP server implementation.
type mcpServer struct {
	mcp.UnimplementedAggregatedMeshConfigServiceServer // Important: embed this!
	// Add any server-side state here, e.g., a cache of resources.
	// resourceCache map[string][]byte // Example: keyed by resource name
}

// NewMCPServer creates a new MCP server instance.
func NewMCPServer() *mcpServer {
	return &mcpServer{
		// resourceCache: make(map[string][]byte),
	}
}

// StreamAggregatedResources implements the MCP server's streaming endpoint.
func (s *mcpServer) StreamAggregatedResources(stream mcp.AggregatedMeshConfigService_StreamAggregatedResourcesServer) error {
	log.Infof("New StreamAggregatedResources connection")
	defer log.Infof("StreamAggregatedResources connection closed")

	for {
		request, err := stream.Recv()
		if err != nil {
			log.Infof("StreamAggregatedResources recv error: %v", err)
			return err
		}
		log.Infof("Received request: %v", request)

		// IMPORTANT: Process the request and generate a response.
		// This is where the core logic of your MCP server goes. You need to:
		//   1. Examine request.TypeUrl to determine the resource type being
		//      requested (e.g., Envoy Cluster, Route, Listener).
		//   2. Examine request.ResponseNonce to track request/response pairs.
		//   3. Examine request.ResourceNames to see which specific resources
		//      are being requested.
		//   4. Fetch the requested resources from your data source (e.g., a
		//      database, a file, an in-memory cache).
		//   5. Construct an mcp.AggregatedMeshConfigResponse containing the resources.
		//   6. Send the response using stream.Send().

		// Example (very basic): respond with an empty response.
		response := &mcp.AggregatedMeshConfigResponse{
			TypeUrl:     request.TypeUrl,   // Echo back the requested type. CRITICAL!
			Nonce:       "some-nonce",      // Generate a unique nonce for each response. CRITICAL!
			VersionInfo: "v1",              // Indicate the version of the resources.
			Resources:   []*mcp.Resource{}, // Empty resource list for now.
		}

		if err := stream.Send(response); err != nil {
			log.Infof("StreamAggregatedResources send error: %v", err)
			return err
		}
		log.Infof("Sent response: %v", response)
	}
}

func main() {
	// Set up the gRPC server.
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	mcpServer := NewMCPServer()
	mcp.RegisterAggregatedMeshConfigServiceServer(s, mcpServer)

	// Enable reflection for debugging (optional, but useful).
	reflection.Register(s)

	// Graceful shutdown handling.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		log.Infof("Server listening on port %s", port)
		if err := s.Serve(lis); err != nil {
			log.Fatalf("failed to serve: %v", err)
		}
	}()

	// Block until a signal is received.
	<-signalChan
	log.Info("Shutting down server...")

	// Gracefully stop the gRPC server.
	s.GracefulStop()
	log.Info("Server gracefully stopped")
}
```

Key improvements and explanations:

* **`istio.io/api/mcp/v1alpha1` import:** This is *crucial*. You *must* use the MCP API version that matches the client (e.g., the Istio control plane) you're interacting with. `v1alpha1` is a common version, but check your Istio/Envoy documentation. If you use the wrong version, the client and server will not be able to communicate.
* **`UnimplementedAggregatedMeshConfigServiceServer`:** The `mcpServer` struct *must* embed `mcp.UnimplementedAggregatedMeshConfigServiceServer`. This satisfies the gRPC interface requirements; without it, your server won't compile.
* **Error handling:** Includes basic error handling for `Listen` and `Serve`. More robust error handling is needed in a production environment.
* **Logging:** Uses `istio.io/pkg/log`, the standard logging library used within Istio and related projects. Configure the logging level appropriately.
* **`StreamAggregatedResources` implementation:** This is the *heart* of the MCP server. It handles the bi-directional streaming of requests and responses.
* **Request processing (placeholder):** The comment block inside `StreamAggregatedResources` is where you implement the core logic:
  * **Determine the resource type:** Examine `request.TypeUrl` (e.g., `type.googleapis.com/envoy.config.cluster.v3.Cluster`). This tells you what kind of resource the client is requesting.
  * **Handle nonces:** Use `request.ResponseNonce` to track request/response pairs. This is essential for ensuring that responses are correctly associated with requests, especially in the face of network issues or retries.
  * **Fetch resources:** Retrieve the requested resources from your data source (e.g., a database, a file, an in-memory cache).
  * **Construct the response:** Create an `mcp.AggregatedMeshConfigResponse` containing the resources. The `response.Resources` field is a slice of `*mcp.Resource`; you'll need to marshal your resources into `mcp.Resource` objects.
  * **Send the response:** Use `stream.Send()` to send the response back to the client.
* **Example response (empty):** The example provides a *minimal* response that echoes back the `TypeUrl` and sets a `Nonce`. **This is not a complete implementation.** You *must* populate the `response.Resources` field with the actual resources.
* **Nonce generation:** The `Nonce` field in the response is *critical*. It should be a unique identifier for each response; use a UUID or a similar mechanism.
* **VersionInfo:** Indicates the version of the resources being sent, which allows the client to track changes and update its configuration accordingly.
* **Graceful shutdown:** A basic graceful shutdown mechanism using signals (SIGINT, SIGTERM) allows the server to shut down cleanly without interrupting ongoing requests.
* **gRPC reflection:** Enables gRPC reflection, which is useful for debugging and testing. You can use tools like `grpcurl` to inspect the server's API.

**To make this example functional, you need to:**

1. **Implement the resource-fetching logic:** Replace the placeholder comments in `StreamAggregatedResources` with code that fetches the actual resources from your data source.
2. **Marshal resources into `mcp.Resource`:** Marshal your resources (e.g., Envoy Cluster, Route, Listener configurations) into the `mcp.Resource` type. This typically involves using the appropriate protobuf definitions for the resource type.
3. **Generate nonces:** Implement a mechanism to generate unique nonces for each response.
4. **Handle errors:** Add more robust error handling throughout the code.
5. **Configure logging:** Configure the logging level and output format for the `istio.io/pkg/log` library.

**Example of marshaling a resource (conceptual):**

```go
// Assuming you have an Envoy Cluster object (e.g., from
// envoy/config/cluster/v3/cluster.proto) stored in a variable called 'cluster'.

// Marshal the cluster object to protobuf.
clusterBytes, err := proto.Marshal(cluster) // Requires "google.golang.org/protobuf/proto"
if err != nil {
	log.Errorf("Error marshaling cluster: %v", err)
	return err
}

// Create an mcp.Resource.
resource := &mcp.Resource{
	Body: &any.Any{ // Requires "google.golang.org/protobuf/types/known/anypb"
		TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", // IMPORTANT: match the resource type!
		Value:   clusterBytes,
	},
	// Metadata: ... (optional: add metadata if needed)
}
```

**Important considerations:**

* **Security:** In a production environment, secure the gRPC connection using TLS and implement authentication and authorization.
* **Scalability:** Consider how to scale your MCP server to handle a large number of clients and resources. Caching, load balancing, and database optimization may be necessary.
* **Data consistency:** Ensure that your data source is consistent and that changes are propagated to the MCP server in a timely manner.
* **Testing:** Thoroughly test your MCP server to ensure that it correctly handles requests and responses and that it can recover from errors.
* **MCP API version:** Double-check the MCP API version used by your Istio control plane or other clients and ensure that your server uses the same version.

This example provides a solid foundation for building a Golang MCP server. Remember to adapt it to your specific needs and environment.

Global MCP Servers

A centralized Model Context Protocol (MCP) server for use across all projects.

Cerebra Legal MCP Server

An enterprise-grade MCP server that provides tools specialized for legal reasoning and analysis, automatically detecting the legal domain and offering domain-specific guidance, templates, and citation formats.

Pica Mcp Server

A Model Context Protocol Server for Pica, built in TypeScript

MCP Server Manager
