kafka-mcp-gateway
A standardized Model Context Protocol (MCP) bridge designed for seamless, programmatic interaction with Apache Kafka message streaming infrastructure. Facilitates message ingress/egress, lifecycle management for topics and consumer groups, and operational diagnostics of the cluster state, abstracting Kafka complexity for AI agents.
Author

tuannvm
Quick Info
Actions
Tags
Kafka MCP Gateway Service
This component serves as a dedicated Model Context Protocol (MCP) endpoint, architected in Go, utilizing the franz-go library for Kafka interaction and mcp-go for protocol handling.
It implements the necessary interfaces to permit Large Language Models (LLMs) to orchestrate complex Kafka tasks via a unified, structured protocol.
Conceptual Overview
The Kafka MCP Gateway acts as a specialized translator, interfacing large language models with the underlying Apache Kafka fabric. It abstracts operational complexity, enabling AI agents to:
- Publish and subscribe to data streams on topics.
- Manage topic metadata (creation, inspection, deletion).
- Audit and administer consumer group states.
- Query real-time cluster diagnostics.
- Execute predefined operational procedures.
This functionality is exposed exclusively through the Model Context Protocol (MCP) layer.
System Topology
mermaid graph TB subgraph "AI Agents (MCP Consumers)" A[LLM Interface: Cursor] B[LLM Interface: Claude] C[LLM Interface: Custom Apps] end
subgraph "Kafka MCP Gateway"
E[Protocol Abstraction Layer]
F[Functionality Registry]
G[State/Metadata Store]
H[Predefined Workflows]
I[Kafka Communication Core]
end
subgraph "Apache Kafka Infrastructure"
J[Broker Node 1]
K[Broker Node 2]
L[Broker Node 3]
M[Topic & Partition Layout]
N[Consumer Group Registry]
end
A --> E
B --> E
C --> E
E --> F
E --> G
E --> H
F --> I
G --> I
H --> I
I --> J
I --> K
I --> L
J --> M
K --> M
L --> M
J --> N
K --> N
L --> N
classDef client fill:#e1f5fe
classDef mcp fill:#f3e5f5
classDef kafka fill:#fff3e0
class A,B,C client
class E,F,G,H,I mcp
class J,K,L,M,N kafka
Operational Flow:
1. Consumers: AI entities transmit requests to the Gateway using the MCP specification, typically over stdio.
2. Gateway Processing: The service dispatches the request through its registries, mapping it to one of three capability sets:
- Tools: Direct, granular operations (e.g., sending a single message).
- Resources: Access to aggregated diagnostic views (e.g., cluster topology).
- Prompts: Executing complex, multi-step operational sequences.
3. Kafka Abstraction: The Kafka Communication Core utilizes franz-go to formulate and execute the necessary low-level broker calls.
4. Execution: The Kafka cluster handles the native data transmission and persistence.
Core Capabilities
- Kafka Native Connectivity: Provides idiomatic access to standard Kafka features via the MCP layer.
- Robust Security: Supports SASL authentication schemes (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512) and Transport Layer Security (TLS).
- Reliable Feedback: Delivers structured, informative error responses.
- Adaptable Configuration: Environment variable driven settings for deployment flexibility.
- Predefined Operational Sequences: A library of prompts designed for rapid execution of common diagnostic tasks.
- LLM Compatibility: Fully compliant with MCP specifications for integration with evolving LLM platforms.
Deployment Guide
Prerequisites
- Go language runtime (version 1.24+).
- Docker environment (necessary for verifying integration tests).
- Network access to the target Kafka cluster.
Installation Methods
Homebrew (Recommended for Unix-like Systems)
Register the necessary repository tap:
bash
Register repository source
brew tap tuannvm/mcp
Deploy the executable
brew install kafka-mcp-gateway
To apply updates:
bash brew update && brew upgrade kafka-mcp-gateway
Building From Source
bash
Clone the source repository
git clone https://github.com/tuannvm/kafka-mcp-server.git cd kafka-mcp-server
Compile the binary executable
go build -o kafka-mcp-gateway ./cmd
Integrating with AI Clients
This Gateway must be registered within your LLM client's configuration file, typically specifying the command, transport method, and necessary connection parameters.
Cursor Configuration Example
Modify ~/.cursor/mcp.json to include the service definition:
{ "mcpServers": { "kafka": { "command": "kafka-mcp-gateway", "args": [], "env": { "KAFKA_BROKERS": "local-kafka-01:9092,local-kafka-02:9092", "KAFKA_CLIENT_ID": "ai-gateway-client", "MCP_TRANSPORT": "stdio" } } } }
Claude Desktop Integration
Update the respective configuration file:
- macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
- Windows: %APPDATA%\Claude\claude_desktop_config.json
Use the same JSON structure provided for Cursor above, substituting the command name if necessary.
Crucially, restart the Claude Desktop application after configuration edits.
Claude Code CLI Setup
Register the tool via the integrated command-line interface:
bash
Register kafka-mcp-gateway, setting connection environment variables
claude mcp add kafka \ --env KAFKA_BROKERS=broker-a:9092 \ --env KAFKA_CLIENT_ID=code-agent \ --env MCP_TRANSPORT=stdio \ --env KAFKA_TLS_ENABLE=false \ --kafka-mcp-gateway
Utility Commands: bash
View current registered services
claude mcp list
De-register the service
claude mcp remove kafka
Verify connectivity (sends a simple protocol handshake)
claude mcp get kafka
ChatWise Setup
Navigate to ChatWise → Settings → Tooling → Add New Tool (“Command Line MCP”):
1. Identifier: kafka
2. Executable Path: kafka-mcp-gateway
3. Arguments: (Empty)
4. Environment Variables: Configure the necessary Kafka connection details (e.g., KAFKA_BROKERS=your-cluster:9092, MCP_TRANSPORT=stdio).
Leveraging mcpenetes for Unified Configuration
Manually synchronizing configuration settings across multiple LLM clients (Cursor, Claude, etc.) is error-prone. The auxiliary utility, mcpenetes, streamlines this synchronization:
bash
Install the synchronization utility
go install github.com/tuannvm/mcpenetes@latest
Simplified Workflow with mcpenetes
bash
Discover registered MCP services, including kafka-mcp-gateway
mcpenetes search
Automatically inject the defined kafka configuration into all detected clients
mcpenetes apply
Load a specific configuration state from the clipboard
mcpenetes load
mcpenetes allows users to maintain distinct profiles (e.g., Dev vs. Prod Kafka endpoints) and instantly toggle them across all integrated AI tools without manual file manipulation.
Available MCP Endpoints
The gateway exposes the following functional primitives via the MCP structure. Comprehensive operational documentation and JSON examples reside in docs/tools.md.
- produce_message: Handles the dispatch of data payloads to specified Kafka destinations.
- consume_messages: Facilitates batched retrieval of records from targeted topic partitions.
- list_brokers: Reports the current configuration and status of connected broker nodes.
- describe_topic: Returns detailed metadata pertaining to a named topic.
- list_consumer_groups: Yields a manifest of all active consumer groups within the topology.
- describe_consumer_group: Retrieves granular metrics for a consumer group, including offset status and lag.
- describe_configs: Fetches runtime and static configuration settings for Kafka system entities.
- cluster_overview: Generates a high-level summary of overall cluster operational health.
- list_topics: Provides a catalog of existing topics, including partition counts and replication factors.
Exposed System Resources
These resources offer read-only diagnostic views accessible via the MCP protocol. Refer to docs/resources.md for response schemas.
- kafka-mcp://overview: Top-line dashboard summarizing cluster vital signs.
- kafka-mcp://health-check: Deep diagnostic scan providing actionable findings.
- kafka-mcp://under-replicated-partitions: Focused report detailing partitions that lack quorum.
- kafka-mcp://consumer-lag-report: Performance analysis report for consumer offsets against configurable thresholds.
Executable Prompts (Workflows)
Pre-packaged, intelligent sequences that automate common diagnostic workflows, detailed in docs/prompts.md:
- kafka_cluster_overview: Automates the generation of a comprehensive cluster status report.
- kafka_health_check: Executes a full diagnostic sweep and suggests corrective actions.
- kafka_under_replicated_partitions: Isolates and provides troubleshooting guidance for replication anomalies.
- kafka_consumer_lag_report: Analyzes consumer backlog and offers tuning suggestions for throughput optimization.
Diagnostic Example: Lag Investigation
User Input: "Our order processing service is lagging significantly. Can you analyze the consumer backlog?"
AI Workflow Execution Sequence:
- Tool Call:
list_consumer_groups - Result: Identifies the target group:
order-processor. - Tool Call:
describe_consumer_group(Parameters:group_id="order-processor", include_offsets=true) - Result: Detects high message lag on specific topic partitions.
- Tool Call:
cluster_overview - Result: Verifies that the underlying Kafka infrastructure is stable (no dead brokers or replication failures).
- Prompt Execution:
kafka_consumer_lag_report(Parameter:threshold=1000) - Result: Generates a formal analysis.
- Final Output to User:
Analysis indicates critical backlog for the 'order-processor' group on topic 'orders', specifically partitions 2 (Lag: 15,420) and 3 (Lag: 12,305).
The Kafka cluster resources are functioning optimally; the bottleneck resides within the consumer application logic or resource allocation.
Recommended remediation steps: 1. Investigate I/O or processing slowdowns within the consumer instances managing partitions 2 and 3. 2. Propose horizontal scaling of the consumer group. 3. Review consumer configuration tuning parameters (e.g., commit frequency, batch processing size).
This capability showcases how the Gateway empowers LLMs to perform sophisticated, multi-step operational analysis on distributed systems.
Configuration Parameters
The operational behavior of the Gateway is governed by the following environment variables:
| Variable | Purpose | Default Value |
|---|---|---|
KAFKA_BROKERS |
Required list of Kafka bootstrap server endpoints (comma-separated) | localhost:9092 |
KAFKA_CLIENT_ID |
Identifier used by the internal Kafka client sessions | kafka-mcp-gateway |
MCP_TRANSPORT |
Communication channel protocol (stdio or http) |
stdio |
KAFKA_SASL_MECHANISM |
Selection: plain, scram-sha-256, scram-sha-512, or "" (off) |
"" |
KAFKA_SASL_USER |
SASL Principal identifier | "" |
KAFKA_SASL_PASSWORD |
Secret credential for authentication | "" |
KAFKA_TLS_ENABLE |
Boolean flag to activate SSL/TLS encryption | false |
KAFKA_TLS_INSECURE_SKIP_VERIFY |
Bypasses certificate validation during TLS handshake | false |
Security Warning: Setting
KAFKA_TLS_INSECURE_SKIP_VERIFYtotruedisables critical security checks. Use this setting exclusively for non-production, controlled testing scenarios.
Security Posture
The Gateway is engineered with enterprise security requirements as a priority:
- Access Control: Full native support for SASL authentication methods.
- Data Integrity: TLS encryption ensures secure communication channels to brokers.
- Input Hardening: Rigorous sanitization and validation applied to all incoming MCP requests to mitigate injection vectors.
- Information Leakage Prevention: Error responses are carefully constructed to avoid disclosing sensitive infrastructure details.
Engineering & Maintenance
Testing Suite
Maintainability relies on comprehensive testing, particularly for integration points:
bash
Execute all unit and integration tests (integration tests require Docker running)
go test ./...
Run only short, unit-level tests
go test -short ./...
Execute targeted integration tests against a specified broker endpoint
export KAFKA_BROKERS="test-broker:9093" export SKIP_KAFKA_TESTS="false" go test ./kafka -v -run Test
Contribution Guidelines
We welcome external enhancements and bug fixes. Please review the contribution guidelines and submit Pull Requests for review.
License
This software is distributed under the terms of the MIT License. Consult the LICENSE file for full copyright and permission details.
WIKIPEDIA: XMLHttpRequest (XHR) is an API in the form of a JavaScript object whose methods transmit HTTP requests from a web browser to a web server. The methods allow a browser-based application to send requests to the server after page loading is complete, and receive information back. XMLHttpRequest is a component of Ajax programming. Prior to Ajax, hyperlinks and form submissions were the primary mechanisms for interacting with the server, often replacing the current page with another one.
== History == The concept behind XMLHttpRequest was conceived in 2000 by the developers of Microsoft Outlook. The concept was then implemented within the Internet Explorer 5 browser (1999). However, the original syntax did not use the XMLHttpRequest identifier. Instead, the developers used the identifiers ActiveXObject("Msxml2.XMLHTTP") and ActiveXObject("Microsoft.XMLHTTP"). As of Internet Explorer 7 (2006), all browsers support the XMLHttpRequest identifier. The XMLHttpRequest identifier is now the de facto standard in all the major browsers, including Mozilla's Gecko layout engine (2002), Safari 1.2 (2004) and Opera 8.0 (2005).
=== Standards === The World Wide Web Consortium (W3C) published a Working Draft specification for the XMLHttpRequest object on April 5, 2006. On February 25, 2008, the W3C published the Working Draft Level 2 specification. Level 2 added methods to monitor event progress, allow cross-site requests, and handle byte streams. At the end of 2011, the Level 2 specification was absorbed into the original specification. At the end of 2012, the WHATWG took over development and maintains a living document using Web IDL.
== Usage == Generally, sending a request with XMLHttpRequest has several programming steps.
Create an XMLHttpRequest object by calling a constructor: Call the "open" method to specify the request type, identify the relevant resource, and select synchronous or asynchronous operation: For an asynchronous request, set a listener that will be notified when the request's state changes: Initiate the request by calling the "send" method: Respond to state changes in the event listener. If the server sends response data, by default it is captured in the "responseText" property. When the object stops processing the response, it changes to state 4, the "done" state. Aside from these general steps, XMLHttpRequest has many options to control how the request is sent and how the response is processed. Custom header fields can be added to the request to indicate how the server should fulfill it, and data can be uploaded to the server by providing it in the "send" call. The response can be parsed from the JSON format into a readily usable JavaScript object, or processed gradually as it arrives rather than waiting for the entire text. The request can be aborted prematurely or set to fail if not completed in a specified amount of time.
== Cross-domain requests ==
In the early development of the World Wide Web, it was found possible to brea
