kafka-mcp-gateway

A standardized Model Context Protocol (MCP) bridge for programmatic interaction with Apache Kafka message streaming infrastructure. It handles message ingress and egress, lifecycle management for topics and consumer groups, and operational diagnostics of cluster state, abstracting Kafka complexity for AI agents.

Author

tuannvm

No License

Quick Info

GitHub GitHub Stars 31
NPM Weekly Downloads 0
Tools 1
Last Updated 2026-02-19

Tags

kafka, clusters, cluster, kafka clusters, apache kafka, kafka operations

Kafka MCP Gateway Service

This component serves as a dedicated Model Context Protocol (MCP) endpoint, architected in Go, utilizing the franz-go library for Kafka interaction and mcp-go for protocol handling.

It implements the necessary interfaces to permit Large Language Models (LLMs) to orchestrate complex Kafka tasks via a unified, structured protocol.

Conceptual Overview

The Kafka MCP Gateway acts as a specialized translator, interfacing large language models with the underlying Apache Kafka fabric. It abstracts operational complexity, enabling AI agents to:

  • Publish and subscribe to data streams on topics.
  • Manage topic metadata (creation, inspection, deletion).
  • Audit and administer consumer group states.
  • Query real-time cluster diagnostics.
  • Execute predefined operational procedures.

This functionality is exposed exclusively through the Model Context Protocol (MCP) layer.

System Topology

```mermaid
graph TB
    subgraph "AI Agents (MCP Consumers)"
        A[LLM Interface: Cursor]
        B[LLM Interface: Claude]
        C[LLM Interface: Custom Apps]
    end

    subgraph "Kafka MCP Gateway"
        E[Protocol Abstraction Layer]
        F[Functionality Registry]
        G[State/Metadata Store]
        H[Predefined Workflows]
        I[Kafka Communication Core]
    end

    subgraph "Apache Kafka Infrastructure"
        J[Broker Node 1]
        K[Broker Node 2]
        L[Broker Node 3]
        M[Topic & Partition Layout]
        N[Consumer Group Registry]
    end

    A --> E
    B --> E
    C --> E

    E --> F
    E --> G
    E --> H

    F --> I
    G --> I
    H --> I

    I --> J
    I --> K
    I --> L

    J --> M
    K --> M
    L --> M

    J --> N
    K --> N
    L --> N

    classDef client fill:#e1f5fe
    classDef mcp fill:#f3e5f5
    classDef kafka fill:#fff3e0

    class A,B,C client
    class E,F,G,H,I mcp
    class J,K,L,M,N kafka
```

Operational Flow:

  1. Consumers: AI entities transmit requests to the Gateway using the MCP specification, typically over stdio.
  2. Gateway Processing: The service dispatches the request through its registries, mapping it to one of three capability sets:
       • Tools: Direct, granular operations (e.g., sending a single message).
       • Resources: Access to aggregated diagnostic views (e.g., cluster topology).
       • Prompts: Executing complex, multi-step operational sequences.
  3. Kafka Abstraction: The Kafka Communication Core utilizes franz-go to formulate and execute the necessary low-level broker calls.
  4. Execution: The Kafka cluster handles the native data transmission and persistence.

Core Capabilities

  • Kafka Native Connectivity: Provides idiomatic access to standard Kafka features via the MCP layer.
  • Robust Security: Supports SASL authentication schemes (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512) and Transport Layer Security (TLS).
  • Reliable Feedback: Delivers structured, informative error responses.
  • Adaptable Configuration: Environment variable driven settings for deployment flexibility.
  • Predefined Operational Sequences: A library of prompts designed for rapid execution of common diagnostic tasks.
  • LLM Compatibility: Fully compliant with MCP specifications for integration with evolving LLM platforms.

Deployment Guide

Prerequisites

  • Go language runtime (version 1.24+).
  • Docker environment (required to run the integration tests).
  • Network access to the target Kafka cluster.

Installation Methods

Register the necessary repository tap:

```bash
# Register repository source
brew tap tuannvm/mcp

# Deploy the executable
brew install kafka-mcp-gateway
```

To apply updates:

```bash
brew update && brew upgrade kafka-mcp-gateway
```

Building From Source

```bash
# Clone the source repository
git clone https://github.com/tuannvm/kafka-mcp-server.git
cd kafka-mcp-server

# Compile the binary executable
go build -o kafka-mcp-gateway ./cmd
```

Integrating with AI Clients

This Gateway must be registered within your LLM client's configuration file, typically specifying the command, transport method, and necessary connection parameters.

Cursor Configuration Example

Modify ~/.cursor/mcp.json to include the service definition:

```json
{
  "mcpServers": {
    "kafka": {
      "command": "kafka-mcp-gateway",
      "args": [],
      "env": {
        "KAFKA_BROKERS": "local-kafka-01:9092,local-kafka-02:9092",
        "KAFKA_CLIENT_ID": "ai-gateway-client",
        "MCP_TRANSPORT": "stdio"
      }
    }
  }
}
```

Claude Desktop Integration

Update the respective configuration file:

  • macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
  • Windows: %APPDATA%\Claude\claude_desktop_config.json

Use the same JSON structure provided for Cursor above, substituting the command name if necessary.

Crucially, restart the Claude Desktop application after configuration edits.

Claude Code CLI Setup

Register the tool via the integrated command-line interface:

```bash
# Register kafka-mcp-gateway, setting connection environment variables
claude mcp add kafka \
  --env KAFKA_BROKERS=broker-a:9092 \
  --env KAFKA_CLIENT_ID=code-agent \
  --env MCP_TRANSPORT=stdio \
  --env KAFKA_TLS_ENABLE=false \
  -- kafka-mcp-gateway
```

Utility Commands:

```bash
# View current registered services
claude mcp list

# De-register the service
claude mcp remove kafka

# Show details for the registered service
claude mcp get kafka
```

ChatWise Setup

Navigate to ChatWise → Settings → Tooling → Add New Tool (“Command Line MCP”):

  1. Identifier: kafka
  2. Executable Path: kafka-mcp-gateway
  3. Arguments: (Empty)
  4. Environment Variables: Configure the necessary Kafka connection details (e.g., KAFKA_BROKERS=your-cluster:9092, MCP_TRANSPORT=stdio).

Leveraging mcpenetes for Unified Configuration

Manually synchronizing configuration settings across multiple LLM clients (Cursor, Claude, etc.) is error-prone. The auxiliary utility, mcpenetes, streamlines this synchronization:

```bash
# Install the synchronization utility
go install github.com/tuannvm/mcpenetes@latest
```

Simplified Workflow with mcpenetes

```bash
# Discover registered MCP services, including kafka-mcp-gateway
mcpenetes search

# Automatically inject the defined kafka configuration into all detected clients
mcpenetes apply

# Load a specific configuration state from the clipboard
mcpenetes load
```

mcpenetes allows users to maintain distinct profiles (e.g., Dev vs. Prod Kafka endpoints) and instantly toggle them across all integrated AI tools without manual file manipulation.

Available MCP Endpoints

The gateway exposes the following functional primitives via the MCP structure. Comprehensive operational documentation and JSON examples reside in docs/tools.md.

  • produce_message: Handles the dispatch of data payloads to specified Kafka destinations.
  • consume_messages: Facilitates batched retrieval of records from targeted topic partitions.
  • list_brokers: Reports the current configuration and status of connected broker nodes.
  • describe_topic: Returns detailed metadata pertaining to a named topic.
  • list_consumer_groups: Yields a manifest of all active consumer groups within the topology.
  • describe_consumer_group: Retrieves granular metrics for a consumer group, including offset status and lag.
  • describe_configs: Fetches runtime and static configuration settings for Kafka system entities.
  • cluster_overview: Generates a high-level summary of overall cluster operational health.
  • list_topics: Provides a catalog of existing topics, including partition counts and replication factors.

Exposed System Resources

These resources offer read-only diagnostic views accessible via the MCP protocol. Refer to docs/resources.md for response schemas.

  • kafka-mcp://overview: Top-line dashboard summarizing cluster vital signs.
  • kafka-mcp://health-check: Deep diagnostic scan providing actionable findings.
  • kafka-mcp://under-replicated-partitions: Focused report detailing partitions whose in-sync replica set is smaller than their assigned replica set.
  • kafka-mcp://consumer-lag-report: Performance analysis report for consumer offsets against configurable thresholds.
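As a rough illustration of what an under-replicated report has to compute, the sketch below flags partitions whose in-sync replica (ISR) list is shorter than their assigned replica list, which is the standard Kafka definition of under-replication. The `PartitionState` type, the `underReplicated` function, and the sample data are hypothetical and do not reflect the Gateway's response schema.

```go
package main

import "fmt"

// PartitionState holds the replica assignment for one partition (hypothetical type).
type PartitionState struct {
	Topic     string
	Partition int32
	Replicas  []int32 // broker IDs assigned to hold a copy
	ISR       []int32 // broker IDs currently in sync with the leader
}

// underReplicated returns the partitions with fewer in-sync replicas than assigned replicas.
func underReplicated(parts []PartitionState) []PartitionState {
	var out []PartitionState
	for _, p := range parts {
		if len(p.ISR) < len(p.Replicas) {
			out = append(out, p)
		}
	}
	return out
}

// samplePartitions returns made-up metadata for demonstration.
func samplePartitions() []PartitionState {
	return []PartitionState{
		{Topic: "orders", Partition: 0, Replicas: []int32{1, 2, 3}, ISR: []int32{1, 2, 3}},
		{Topic: "orders", Partition: 1, Replicas: []int32{1, 2, 3}, ISR: []int32{2}},
		{Topic: "audit", Partition: 0, Replicas: []int32{2, 3}, ISR: []int32{2, 3}},
	}
}

func main() {
	for _, p := range underReplicated(samplePartitions()) {
		fmt.Printf("%s[%d]: %d of %d replicas in sync\n",
			p.Topic, p.Partition, len(p.ISR), len(p.Replicas))
	}
}
```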

Executable Prompts (Workflows)

Pre-packaged, intelligent sequences that automate common diagnostic workflows, detailed in docs/prompts.md:

  • kafka_cluster_overview: Automates the generation of a comprehensive cluster status report.
  • kafka_health_check: Executes a full diagnostic sweep and suggests corrective actions.
  • kafka_under_replicated_partitions: Isolates and provides troubleshooting guidance for replication anomalies.
  • kafka_consumer_lag_report: Analyzes consumer backlog and offers tuning suggestions for throughput optimization.

Diagnostic Example: Lag Investigation

User Input: "Our order processing service is lagging significantly. Can you analyze the consumer backlog?"

AI Workflow Execution Sequence:

  1. Tool Call: list_consumer_groups
  2. Result: Identifies the target group: order-processor.
  3. Tool Call: describe_consumer_group (Parameters: group_id="order-processor", include_offsets=true)
  4. Result: Detects high message lag on specific topic partitions.
  5. Tool Call: cluster_overview
  6. Result: Verifies that the underlying Kafka infrastructure is stable (no dead brokers or replication failures).
  7. Prompt Execution: kafka_consumer_lag_report (Parameter: threshold=1000)
  8. Result: Generates a formal analysis.
  9. Final Output to User:

Analysis indicates critical backlog for the 'order-processor' group on topic 'orders', specifically partitions 2 (Lag: 15,420) and 3 (Lag: 12,305).

The Kafka cluster resources are functioning optimally; the bottleneck resides within the consumer application logic or resource allocation.

Recommended remediation steps:

  1. Investigate I/O or processing slowdowns within the consumer instances managing partitions 2 and 3.
  2. Propose horizontal scaling of the consumer group.
  3. Review consumer configuration tuning parameters (e.g., commit frequency, batch processing size).

This capability showcases how the Gateway empowers LLMs to perform sophisticated, multi-step operational analysis on distributed systems.
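The lag figures in the example above follow from simple arithmetic: per partition, lag is the log end offset minus the group's committed offset. The sketch below applies that formula with the example's threshold of 1000. The offsets are invented so that partitions 2 and 3 reproduce the lags quoted above, and the type and function names are hypothetical rather than the Gateway's own.

```go
package main

import "fmt"

// PartitionOffsets pairs a consumer group's committed offset with the partition's log end offset.
type PartitionOffsets struct {
	Partition int32
	Committed int64
	LogEnd    int64
}

// LagEntry is one row of the resulting report.
type LagEntry struct {
	Partition int32
	Lag       int64
}

// lagAbove returns the partitions whose lag strictly exceeds threshold.
func lagAbove(offsets []PartitionOffsets, threshold int64) []LagEntry {
	var out []LagEntry
	for _, o := range offsets {
		lag := o.LogEnd - o.Committed
		if lag < 0 {
			lag = 0 // guard against a stale end offset read
		}
		if lag > threshold {
			out = append(out, LagEntry{Partition: o.Partition, Lag: lag})
		}
	}
	return out
}

// sampleOffsets returns made-up offsets for the 'orders' topic.
func sampleOffsets() []PartitionOffsets {
	return []PartitionOffsets{
		{Partition: 0, Committed: 49990, LogEnd: 50000},
		{Partition: 1, Committed: 48200, LogEnd: 48950},
		{Partition: 2, Committed: 34580, LogEnd: 50000},
		{Partition: 3, Committed: 37695, LogEnd: 50000},
	}
}

func main() {
	for _, e := range lagAbove(sampleOffsets(), 1000) {
		fmt.Printf("partition %d lag %d\n", e.Partition, e.Lag)
	}
}
```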

Configuration Parameters

The operational behavior of the Gateway is governed by the following environment variables:

| Variable | Purpose | Default Value |
| --- | --- | --- |
| KAFKA_BROKERS | Required list of Kafka bootstrap server endpoints (comma-separated) | localhost:9092 |
| KAFKA_CLIENT_ID | Identifier used by the internal Kafka client sessions | kafka-mcp-gateway |
| MCP_TRANSPORT | Communication channel protocol (stdio or http) | stdio |
| KAFKA_SASL_MECHANISM | Selection: plain, scram-sha-256, scram-sha-512, or "" (off) | "" |
| KAFKA_SASL_USER | SASL Principal identifier | "" |
| KAFKA_SASL_PASSWORD | Secret credential for authentication | "" |
| KAFKA_TLS_ENABLE | Boolean flag to activate SSL/TLS encryption | false |
| KAFKA_TLS_INSECURE_SKIP_VERIFY | Bypasses certificate validation during TLS handshake | false |

Security Warning: Setting KAFKA_TLS_INSECURE_SKIP_VERIFY to true disables critical security checks. Use this setting exclusively for non-production, controlled testing scenarios.
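To make the defaults concrete, here is a minimal sketch of how environment-driven configuration of this shape could be loaded and validated. It is not the Gateway's actual loader: the `Config` struct, `LoadConfig`, and the helper functions are hypothetical, and parsing booleans with `strconv.ParseBool` is an assumption about accepted values. The variable names, allowed values, and defaults are the ones listed above.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// Config mirrors the documented environment variables (hypothetical struct).
type Config struct {
	Brokers               []string
	ClientID              string
	Transport             string
	SASLMechanism         string
	SASLUser              string
	SASLPassword          string
	TLSEnable             bool
	TLSInsecureSkipVerify bool
}

// lookupFunc matches os.LookupEnv so tests can substitute a map.
type lookupFunc func(key string) (string, bool)

// stringOr returns the variable's value, or def when unset or empty.
func stringOr(lookup lookupFunc, key, def string) string {
	if v, ok := lookup(key); ok && v != "" {
		return v
	}
	return def
}

// boolOr parses a boolean variable, falling back to def when unset or empty.
func boolOr(lookup lookupFunc, key string, def bool) (bool, error) {
	v, ok := lookup(key)
	if !ok || v == "" {
		return def, nil
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return false, fmt.Errorf("%s: %w", key, err)
	}
	return b, nil
}

// LoadConfig applies the documented defaults and rejects unsupported values.
func LoadConfig(lookup lookupFunc) (Config, error) {
	cfg := Config{
		Brokers:       strings.Split(stringOr(lookup, "KAFKA_BROKERS", "localhost:9092"), ","),
		ClientID:      stringOr(lookup, "KAFKA_CLIENT_ID", "kafka-mcp-gateway"),
		Transport:     stringOr(lookup, "MCP_TRANSPORT", "stdio"),
		SASLMechanism: strings.ToLower(stringOr(lookup, "KAFKA_SASL_MECHANISM", "")),
		SASLUser:      stringOr(lookup, "KAFKA_SASL_USER", ""),
		SASLPassword:  stringOr(lookup, "KAFKA_SASL_PASSWORD", ""),
	}
	switch cfg.Transport {
	case "stdio", "http":
	default:
		return Config{}, fmt.Errorf("MCP_TRANSPORT: unsupported value %q", cfg.Transport)
	}
	switch cfg.SASLMechanism {
	case "", "plain", "scram-sha-256", "scram-sha-512":
	default:
		return Config{}, fmt.Errorf("KAFKA_SASL_MECHANISM: unsupported value %q", cfg.SASLMechanism)
	}
	var err error
	if cfg.TLSEnable, err = boolOr(lookup, "KAFKA_TLS_ENABLE", false); err != nil {
		return Config{}, err
	}
	if cfg.TLSInsecureSkipVerify, err = boolOr(lookup, "KAFKA_TLS_INSECURE_SKIP_VERIFY", false); err != nil {
		return Config{}, err
	}
	return cfg, nil
}

func main() {
	cfg, err := LoadConfig(os.LookupEnv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// Credentials are deliberately left out of the printed summary.
	fmt.Println(cfg.Brokers, cfg.Transport, cfg.TLSEnable)
}
```

Passing the lookup function as a parameter keeps the loader testable without mutating the process environment.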

Security Posture

The Gateway is engineered with enterprise security requirements as a priority:

  • Access Control: Full native support for SASL authentication methods.
  • Data Integrity: TLS encryption ensures secure communication channels to brokers.
  • Input Hardening: Rigorous sanitization and validation applied to all incoming MCP requests to mitigate injection vectors.
  • Information Leakage Prevention: Error responses are carefully constructed to avoid disclosing sensitive infrastructure details.

Engineering & Maintenance

Testing Suite

Maintainability relies on comprehensive testing, particularly for integration points:

```bash
# Execute all unit and integration tests (integration tests require Docker running)
go test ./...

# Run only short, unit-level tests
go test -short ./...

# Execute targeted integration tests against a specified broker endpoint
export KAFKA_BROKERS="test-broker:9093"
export SKIP_KAFKA_TESTS="false"
go test ./kafka -v -run Test
```
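The commands above imply that integration tests are gated both by the `-short` flag and by `SKIP_KAFKA_TESTS`. A minimal sketch of such a gate follows; how the project actually interprets `SKIP_KAFKA_TESTS` is not documented here, so the `skipIntegration` helper and its treatment of unparseable values are assumptions.

```go
package main

import (
	"fmt"
	"strconv"
)

// skipIntegration reports whether Kafka-backed tests should be skipped.
// short corresponds to testing.Short(); skipEnv is the raw SKIP_KAFKA_TESTS value.
// An unset or unparseable value is treated as "do not skip" (assumed behavior).
func skipIntegration(short bool, skipEnv string) bool {
	if short {
		return true
	}
	skip, err := strconv.ParseBool(skipEnv)
	return err == nil && skip
}

func main() {
	fmt.Println(skipIntegration(true, "false"))
	fmt.Println(skipIntegration(false, "false"))
	fmt.Println(skipIntegration(false, "true"))
}
```

Inside a test, the helper would typically be called as `if skipIntegration(testing.Short(), os.Getenv("SKIP_KAFKA_TESTS")) { t.Skip("Kafka integration tests disabled") }`.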

Contribution Guidelines

We welcome external enhancements and bug fixes. Please review the contribution guidelines and submit Pull Requests for review.

License

This software is distributed under the terms of the MIT License. Consult the LICENSE file for full copyright and permission details.
