logo
Free, unlimited AI code reviews that run on commit
git-lrc git-lrc GitHub Install Now We'd appreciate a star git-lrc - Free, unlimited AI code reviews that run on commit | Product Hunt git-lrc - Free, unlimited AI code reviews that run on commit | Product Hunt

Coreflux-MQTT-MCP-Gateway

A robust server component facilitating secure, bidirectional connectivity between an MCP-aware AI assistant (like Claude) and a Coreflux MQTT message broker. It exposes Coreflux system capabilities—including commands, rules, and data routes—via standardized Model Context Protocol endpoints, supporting dynamic discovery and secure execution.

Author

Coreflux-MQTT-MCP-Gateway logo

CorefluxCommunity

Apache License 2.0

Quick Info

GitHub GitHub Stars 3
NPM Weekly Downloads 0
Tools 1
Last Updated 2026-02-19

Tags

corefluxcommunitymqttcorefluxcoreflux mqttservices corefluxcommunitycorefluxcommunity coreflux

Coreflux MQTT MCP Gateway Server

License Python Docker Tests Code Quality

An enterprise-grade Model Context Protocol (MCP) bridge engineered to provide secure, highly scalable mediation for AI assistants interfacing with Coreflux's event-driven IoT infrastructure via MQTT.

🚀 Core Capabilities

Foundational Integration

  • 🔌 Secure MQTT Nexus: Establishes resilient connections to Coreflux MQTT brokers, enforcing full Transport Layer Security (TLS).
  • 🛠️ Full API Surface: Grants comprehensive access to registered Coreflux models, declarative rules, and data routing configurations.
  • 🤖 Cognitive Code Synthesis: Leverages the Coreflux Copilot API for Language-of-Things (LOT) code generation based on natural language intent.
  • 🔍 Capability Enumeration: Implements automatic introspection to list all available actions and endpoints.
  • 🏥 Operational Oversight: Provides continuous system diagnostics and health status reporting.

Production Readiness

  • 🔒 Security Hardening: Features extensive input scrubbing, validation mechanisms, and sensitive data sanitization within logs.
  • Asynchronous Throughput: Utilizes a non-blocking architecture with integrated queue management and traffic throttling.
  • Advanced Telemetry: Structured, rotatable logging with security filtering applied at the output stage.
  • Configuration Integrity: A rigorous validation pipeline checks all runtime parameters and environment settings.
  • 🧪 Testing Rigor: Includes a comprehensive unit and integration testing framework with mocking capabilities.

DevOps & Deployment

  • 🐳 Containerization: Fully configured for deployment via Docker and Kubernetes orchestration.
  • 🔄 Automated Pipelines: Continuous Integration/Continuous Deployment (CI/CD) workflows managed by GitHub Actions for testing and quality assurance.
  • 📦 Developer Tooling: Integrated pre-commit hooks, code formatters (e.g., Black, isort), and documentation generation utilities.
  • ⚙️ Streamlined Onboarding: An interactive setup utility simplifies initial configuration and validation.
  • 📚 Comprehensive Guides: Extensive documentation covering API details, security protocols, and deployment topology.

Quick Start Guide

Docker Orchestration (Preferred Method)

  1. Acquire Source and Initialize: bash git clone https://github.com/CorefluxCommunity/Coreflux-MQTT-MCP-Server.git cd Coreflux-MQTT-MCP-Server cp .env.example .env # Modify configuration parameters in the new .env file

  2. Execute Deployment: bash docker-compose up -d

Prerequisites

  • Python Interpreter (Version 3.11 or newer)
  • Docker Engine (Recommended for operational deployment)
  • Network access to a Coreflux MQTT broker instance
  • Coreflux Copilot credential (optional, required for AI code features)

Option 1: Containerized Launch Sequence

(Refer to the Docker Deployment section above for steps 1 & 2)

  1. Verify Service Operation: bash docker-compose logs -f coreflux-mcp-server

Option 2: Local Development Environment Setup

  1. Clone Repository: bash git clone https://github.com/CorefluxCommunity/Coreflux-MQTT-MCP-Server.git cd Coreflux-MQTT-MCP-Server

  2. Dependency Installation: bash pip install -r requirements.txt # For development builds pip install -r requirements-dev.txt

  3. Environment Setup: bash python setup_assistant.py # Invoke interactive setup tool # Alternative: cp .env.example .env && nano .env

  4. Validation and Execution: bash make validate # Configuration integrity check make test # Execute test suite

  5. Server Start: bash python server.py # OR using utility script make run

Comprehensive deployment directives are available in DEPLOYMENT.md.

⚙️ Configuration Management

Automated Configuration Assistant

The integrated setup utility automates and validates system configuration:

bash python setup_assistant.py

Assistant Focus Areas: - 🔧 MQTT endpoint parameters and credentials - 🔐 TLS certificate path specification - 🤖 Integration parameters for the Coreflux Copilot interface - 📝 Definition of logging destinations and verbosity - ✅ Final configuration validation and preliminary connectivity testing

When to use the assistant: - Initial system provisioning - Modifying established connection parameters - Diagnosing connectivity failures - Updating security credentials or certificate anchors

Environment Variable Definition

Replicate .env.example into .env to define runtime parameters:

bash

MQTT Broker Credentials

MQTT_BROKER=broker.example.io MQTT_PORT=8883 MQTT_USER=mcp_client MQTT_PASSWORD=secure_secret MQTT_USE_TLS=true

TLS Certificate Paths (Required if MQTT_USE_TLS is true)

MQTT_CA_CERT=/etc/certs/ca.pem MQTT_CERT_FILE=/etc/certs/client.crt
MQTT_KEY_FILE=/etc/certs/client.key

Cognitive Agent API Key

DO_AGENT_API_KEY=ai_access_token_xyz

Logging Configuration

LOG_LEVEL=INFO LOG_FILE=/var/log/coreflux-mcp-gateway.log

Refer to SECRET_MANAGEMENT.md for advanced configuration structure and secure secrets handling.

🔌 Linking Claude via MCP

Integration in Claude Desktop

  1. Locate Claude Configuration:
  2. *NIX Systems: ~/Library/Application Support/Claude/claude_desktop_config.json
  3. Windows: %USERPROFILE%\AppData\Roaming\Claude\claude_desktop_config.json

  4. Inject Server Definition:

{ "mcpServers": { "coreflux_bridge": { "command": "python", "args": ["/path/to/your/server.py"], "env": { "MQTT_BROKER": "broker.example.io", "MQTT_PORT": "8883", "MQTT_USER": "mcp_client", "MQTT_PASSWORD": "secure_secret", "MQTT_USE_TLS": "true", "DO_AGENT_API_KEY": "ai_access_token_xyz" } } } }

  1. Restart Claude Desktop to load the new server mapping.

Security Advisory: In production environments, avoid direct hardcoding of sensitive credentials within the Claude configuration file; instead, utilize secure environment variable injection methods.

Secure Configuration via Environment Variables

For superior credential hygiene, leverage environment placeholders:

{ "mcpServers": { "coreflux_bridge": { "command": "python", "args": ["/path/to/your/server.py"], "env": { "MQTT_BROKER": "${COREFLUX_MQTT_BROKER}", "MQTT_PORT": "${COREFLUX_MQTT_PORT}", "MQTT_USER": "${COREFLUX_MQTT_USER}", "MQTT_PASSWORD": "${COREFLUX_MQTT_PASSWORD}", "DO_AGENT_API_KEY": "${COREFLUX_AI_KEY}" } } } }

Connection Verification

Once configured, prompt Claude to confirm operational status:

Query the operational status and configuration details of the Coreflux MCP bridge server.

Success is indicated by Claude returning system health metrics and broker metadata.

🛠️ Exposed MCP Tools

The gateway service exposes the following capabilities to the connected AI agent:

Core MQTT Utilities

  • publish_data_payload - Transmit arbitrary data payloads to specified MQTT destinations, controlling Quality of Service (QoS) and message persistence.
  • retrieve_broker_metadata - Obtain comprehensive diagnostics regarding the active MQTT connection state and configuration parameters.

Cognitive Services Interface

  • synthesize_lot_code - Engage the Coreflux Copilot engine to produce Language-of-Things (LOT) compliant automation logic from descriptive prompts.

System Health & Diagnostics

  • execute_deep_health_scan - Initiate an exhaustive diagnostic procedure across all integrated subsystems.

For the full contract definition, consult API_DOCUMENTATION.md.

🧪 Development & Quality Assurance

Initializing the Development Environment

  1. Install Development Toolchain: bash pip install -r requirements-dev.txt

  2. Activate Pre-commit Hooks: bash pre-commit install

  3. Full Setup Routine: bash make dev-setup # Initializes environment, installs hooks, and prepares workspace

Test Execution Strategy

Execute the complete quality assurance matrix:

bash

Execute entire test suite

make test

Run tests with detailed coverage reporting

make test-coverage

Targeted testing

make test-unit # Unit tests only make test-integration # Integration tests only

Code Quality Enforcement

Automated tooling maintains code hygiene:

bash

Apply code formatting standards

make format

Execute static analysis tools (linters)

make lint

Security vulnerability scanning

make security-check

Static type verification

make type-check

Run all automated quality gates

make quality-check

Available Utility Commands for Daily Workflow:

bash

Setup and run cycle

make dev-setup # Prepare complete dev workspace make validate # Validate configuration integrity make run # Launch server after validation make run-debug # Launch server with verbose debugging output

Testing/Validation

make test # Execute all tests make test-coverage # Generate test coverage report make test-unit # Execute only unit tests make validate-config # Specific check for configuration files

Code Quality

make format # Enforce formatting (black/isort) make lint # Run linters (flake8, bandit, mypy) make security-check # Run security analysis (bandit) make type-check # Run static type checking (mypy)

Container Operations

make docker-build # Build the official Docker artifact make docker-run # Start the application within a container make docker-test # Execute test suite inside the container environment

Documentation

make docs # Generate Sphinx documentation artifacts make docs-serve # Serve generated documentation locally via HTTP

🔧 System Architecture Overview

Primary Modules

  • server.py - The main application entry point and tool registry handler.
  • config_validator.py - Module responsible for parsing and verifying runtime configuration.
  • message_processor.py - Manages asynchronous MQTT traffic flow, implementing rate limiting.
  • enhanced_logging.py - Provides secure, structured logging services with file rotation.
  • config_schema.py - Defines Pydantic models for guaranteed type-safe configuration structures.
  • parser.py - Utility functions for input sanitization and data transformation.

Security Framework

  • Injection Prevention: Rigorous sanitization applied across all externally sourced inputs.
  • Data Masking: Automated scrubbing of sensitive fields within generated log records.
  • Encryption: Mandatory TLS/SSL encryption utilized for all MQTT transport layers.
  • Configuration Auditing: Comprehensive parameter verification prior to initialization.
  • Principle of Least Privilege: Designed for execution with minimal necessary operational permissions.

Performance Optimizations

  • Asynchronous I/O: Maximizes concurrency through non-blocking operations.
  • Connection Lifespan Management: Efficient connection pooling for MQTT sessions.
  • Traffic Shaping: Configurable rate limiting safeguards against ingestion spikes.
  • Diagnostic Instrumentation: Real-time metrics collection for performance tracking.

📚 Documentation Index

  • API Specification - Reference for all exposed tool contracts.
  • Deployment Manual - Production hardening and orchestration instructions.
  • Secret Handling Guide - Protocols for securing credentials.
  • Configuration Reference - Exhaustive listing of all environment variables.

🐳 Docker Deployment Instructions

Rapid Containerized Initialization

bash

Fetch source code and navigate

git clone https://github.com/CorefluxCommunity/Coreflux-MQTT-MCP-Server.git cd Coreflux-MQTT-MCP-Server

Configure environment file

cp .env.example .env nano .env # Input broker details and keys

Start detached services

docker-compose up -d

Monitor output

docker-compose logs -f coreflux-mcp-server

Execute health check tool via running container

docker-compose exec coreflux-mcp-server python -c " import os os.system('python server.py --health-check') "

Advanced Production Deployment

Refer to DEPLOYMENT.md for critical production considerations, including:

  • Optimized multi-stage Docker build artifacts
  • Kubernetes manifest generation
  • Integration with service meshes and load balancers
  • Security baseline hardening procedures

🔑 Coreflux Copilot AI Synergy

The gateway fully integrates the Coreflux Copilot API for advanced AI-driven automation support:

Setup Procedure

  1. Acquire Key: Obtain the necessary API access token from the Coreflux Copilot portal.
  2. Credential Injection: Set the key via: bash # Method A: Direct file modification echo "DO_AGENT_API_KEY=your_ai_token_here" >> .env

# Method B: Shell export export DO_AGENT_API_KEY=your_ai_token_here

Value Proposition

  • LOT Code Blueprinting: Rapid creation of Language-of-Things logic from plain English descriptions.
  • Automation Consultation: Expert advice on designing efficient Coreflux workflows.
  • Implementation Guidance: Recommendations on best practices for Coreflux resource utilization.
  • Diagnostic Support: AI-assisted debugging and performance tuning suggestions.

Example Interaction

Prompt Claude to generate automation logic:

Develop LOT script to monitor ambient light levels and activate dimmers if lux readings exceed 800 for five consecutive minutes.

Prompt Claude for workflow assistance:

Outline the optimal structure for routing high-frequency sensor telemetry into a time-series database instance.

🚀 Advanced Operational Features

High-Throughput Asynchronous Pipeline

The core message handling mechanism is designed for resilience and speed:

  • Non-Blocking Dispatch: Ensures responsiveness by processing incoming requests concurrently.
  • Backpressure Control: Implements adaptive rate limiting to prevent broker saturation.
  • Queue Resilience: Sophisticated management of pending message queues.
  • Performance Visibility: Exposure of real-time throughput and latency metrics.

Enterprise-Grade Logging Architecture

Logging capabilities include features critical for audit and debugging:

  • JSON Serialization: Ensures logs are immediately machine-readable.
  • Automated Rollover: Manages disk consumption via timestamped file rotation.
  • PII Scrubbing: Automated removal/masking of sensitive payload elements.
  • Multi-Target Output: Support for console output, persistent files, and centralized syslog ingestion.

Configuration Verification Engine

A mandatory validation sequence checks all system prerequisites:

  • Environment Checklist: Confirms existence and correctness of all necessary environment variables.
  • File Access Checks: Verifies read permissions for TLS certificates and configuration paths.
  • Network Probing: Performs preliminary connectivity tests against the configured MQTT endpoint.
  • Dependency Checks: Assesses the availability and responsiveness of external services (e.g., Copilot API).

🛡️ Security Posture & Governance

Core Security Mechanisms

  • Input Validation: Strict schema enforcement and sanitization on all received MCP requests.
  • End-to-End Encryption: Mandated TLS protocol usage for communication channels.
  • Credential Isolation: Secure handling and isolation of secrets.
  • Activity Auditing: Comprehensive logging of security-relevant events.
  • Privilege Separation: Operational execution within a least-privilege context.

Compliance Alignment

This infrastructure is built to support adherence to common regulatory frameworks:

  • SOC 2 - Security monitoring and control implementation.
  • GDPR - Mechanisms for data handling and access control.
  • HIPAA - Applicable controls for safeguarding protected health information (contingent on deployment configuration).

Detailed security methodology is documented in SECRET_MANAGEMENT.md.

📊 Operational Monitoring & Health Probes

Health Check Tooling

Utilize the specialized execute_deep_health_scan tool for diagnostic purposes:

bash

Command-line health check invocation

python server.py --health-check

AI-driven check request via Claude:

"Run a comprehensive diagnostic assessment on the Coreflux MCP Gateway and report status."

Key Performance Indicators (KPIs)

The service exposes critical operational metrics:

  • MQTT Health: Real-time connection status to the broker.
  • Throughput: Messages processed per second, queue depth.
  • Resource Utilization: System metrics (CPU/Memory consumption).
  • Error Rates: Frequency of failed operations and exception types.
  • AI Latency: Response times from the Copilot API endpoint.

Alerting Framework

Integration hooks allow configuration for alerts based on:

  • MQTT connection interruptions or degradation.
  • Sustained high error rates.
  • Resource saturation thresholds.
  • Security policy violations.

🤝 Community Contribution Guidelines

We encourage external collaboration to enhance this project. Review our contribution pathway:

Development Lifecycle

  1. Fork the primary repository.
  2. Branch Out: Create a dedicated feature branch (e.g., git checkout -b feat/new-tooling).
  3. Setup: Install development packages (pip install -r requirements-dev.txt) and hooks (pre-commit install).
  4. Develop: Implement changes, ensuring corresponding tests are added or updated.
  5. Validate Quality: Execute quality gates (make quality-check).
  6. Commit: Use conventional commit messages (git commit -am 'feat: Add new capability X').
  7. Submit: Push to your fork and open a Pull Request targeting the main branch.

Adherence to Standards

  • Mandatory adherence to Python 3.11+ specifications.
  • Use of type annotations across all interfaces.
  • Target test coverage exceeding 90%.
  • Security analysis via Bandit.
  • Code formatting enforced by Black and isort.
  • Thorough documentation for all public interfaces.

📄 Licensing

This software is distributed under the terms of the Apache License 2.0.

🆘 Support & Incident Resolution

Frequent Failure Analysis

Connection Refused Errors: Often manifests as Error: MQTT broker connection refused. * Action: Verify host/port accuracy and firewall rules. * Action: Confirm the broker is actively listening on the specified interface.

Authentication Failures: Indicated by Error: Authorization denied on connection. * Action: Re-validate provided credentials against the broker's access control list (ACL). * Action: Verify Coreflux Copilot API key status.

TLS Negotiation Errors: Seen as Error: SSL/TLS handshake failure. * Action: Check certificate file paths and validity timelines. * Action: Ensure client/server TLS protocol versions are compatible.

Enhanced Debugging Mode

For deep inspection, elevate logging verbosity:

bash export LOG_LEVEL=DEBUG python server.py

Channels for Assistance

🗺️ Future Development Trajectory

Current Milestone: Version 1.0.0 (Complete) ✅

  • ✅ Established core MQTT interoperability.
  • ✅ Functional Copilot AI integration layer.
  • ✅ Implemented production-grade security controls.
  • ✅ Comprehensive test suite coverage.
  • ✅ Deployment artifacts finalized.

Upcoming Iterations

  • v1.1.0: Advanced real-time metrics exposition and integration with external monitoring systems.
  • v1.2.0: Expansion of supported Coreflux native API calls exposed to MCP.
  • v1.3.0: Introduction of WebSocket backchannel support for continuous streaming data.
  • v2.0.0: Architecture refined for multi-tenancy and interconnected broker federation.

📋 Operational Quick Reference

Essential Command Matrix

bash

Configuration & Setup

python setup_assistant.py # Guided initial configuration make validate # Validate current environment state

Development Workflow

make dev-setup # Prepare local coding environment make test # Run all integrated tests make quality-check # Execute static analysis and formatting checks

Deployment

docker-compose up -d # Start stack via Docker Compose make docker-build # Create deployment Docker image

Monitoring

make health-check # Trigger the internal system health probe docker-compose logs -f # Stream all container logs

Critical File Locations

  • server.py - Core application executable.
  • .env - Primary runtime configuration file.
  • requirements.txt - Production dependencies list.
  • docker-compose.yml - Orchestration definition for containerized launch.
  • Makefile - Centralized collection of development and utility targets.

Developed by the Coreflux Community.

  • invoke_coreflux_action: Execute a defined action or function within the Coreflux environment.
  • decommission_coreflux_model: Remove a registered AI model definition.
  • decommission_all_models: Erase all registered AI model definitions.
  • decommission_all_actions: Erase all defined executable actions.
  • decommission_all_routes: Erase all configured data routing definitions.
  • enumerate_discovered_agents: List all Coreflux resources dynamically registered.
  • prompt_copilot_for_lot: Request LOT code synthesis from the Coreflux Copilot engine using descriptive natural language.

Troubleshooting Connection Instability

The gateway is configured for fault tolerance; it will initialize even if the MQTT broker is temporarily unavailable, allowing configuration changes via MCP tools before connection establishment.

Connection Health Tools

  • get_connection_status: Retrieves current connection state, including diagnostic suggestions.
  • configure_mqtt_endpoint: Dynamically redefine the target broker connection parameters without requiring a service restart.
  • initiate_mqtt_session: Explicitly connect to a specified broker using custom authentication parameters.
  • verify_broker_reachability: Test connectivity and trigger a reconnection attempt.
  • force_reestablish_mqtt: Immediately attempt to restore the primary MQTT linkage.

Remediation Sequence

If operational anomalies persist:

  1. Scrutinize your Claude configuration for accurate MQTT host/port settings.
  2. Validate network pathways between the gateway and the broker.
  3. Execute the setup assistant to confirm or update internal configurations: bash python setup_assistant.py

  4. Examine recent logs generated by the Claude host client for MCP interaction errors: bash # Linux/macOS log tailing tail -n 20 -f ~/Library/Logs/Claude/mcp.log # Windows PowerShell log retrieval Get-Content -Path "$env:USERPROFILE\AppData\Roaming\Claude\Logs\mcp.log" -Tail 20 -Wait

  5. Rerun the server directly, setting verbose logging: bash python server.py --mqtt-host localhost --mqtt-port 1883 --log-level DEBUG

References & Documentation Portal

  • DEPLOYMENT.md - Comprehensive production rollout guide.
  • SECURITY.md - Best practices for securing the deployment.
  • MCP Specification - Official documentation for the protocol standard.
  • Coreflux Platform Site - Information on the underlying automation framework.

Community Engagement

We encourage all contributions. Develop on a feature branch and target the development branch for submission via Pull Request, following our contribution protocols.

This software is governed by the Apache License, Version 2.0. Review the LICENSE file for full terms.

Assistance Channels

  • 📖 Reference Material: Consult the README, DEPLOYMENT.md, and SECURITY.md.
  • 🐛 Issue Tracking: Report defects and suggest enhancements via GitHub Issues.
  • 💬 Discussions: Engage with peers on the community forums.

WIKIPEDIA SUMMARY: Cloud computing represents a delivery model for computing resources—such as processing power, storage, and networking—that are provisioned on demand over a network (the 'cloud'), as defined by ISO standards. NIST outlined five core characteristics in 2011: On-demand self-service, Broad network access, Resource pooling, Rapid elasticity, and Measured service. The historical roots trace back to 1960s time-sharing systems, while the term 'cloud' for virtualized services gained traction around 1994, later popularized in 1996 business planning.

See Also

`