Coreflux-MQTT-MCP-Gateway
A robust server component facilitating secure, bidirectional connectivity between an MCP-aware AI assistant (like Claude) and a Coreflux MQTT message broker. It exposes Coreflux system capabilities—including commands, rules, and data routes—via standardized Model Context Protocol endpoints, supporting dynamic discovery and secure execution.
Author

CorefluxCommunity
Quick Info
Actions
Tags
Coreflux MQTT MCP Gateway Server
An enterprise-grade Model Context Protocol (MCP) bridge engineered to provide secure, highly scalable mediation for AI assistants interfacing with Coreflux's event-driven IoT infrastructure via MQTT.
🚀 Core Capabilities
Foundational Integration
- 🔌 Secure MQTT Nexus: Establishes resilient connections to Coreflux MQTT brokers, enforcing full Transport Layer Security (TLS).
- 🛠️ Full API Surface: Grants comprehensive access to registered Coreflux models, declarative rules, and data routing configurations.
- 🤖 Cognitive Code Synthesis: Leverages the Coreflux Copilot API for Language-of-Things (LOT) code generation based on natural language intent.
- 🔍 Capability Enumeration: Implements automatic introspection to list all available actions and endpoints.
- 🏥 Operational Oversight: Provides continuous system diagnostics and health status reporting.
Production Readiness
- 🔒 Security Hardening: Features extensive input scrubbing, validation mechanisms, and sensitive data sanitization within logs.
- ⚡ Asynchronous Throughput: Utilizes a non-blocking architecture with integrated queue management and traffic throttling.
- � Advanced Telemetry: Structured, rotatable logging with security filtering applied at the output stage.
- ✅ Configuration Integrity: A rigorous validation pipeline checks all runtime parameters and environment settings.
- 🧪 Testing Rigor: Includes a comprehensive unit and integration testing framework with mocking capabilities.
DevOps & Deployment
- 🐳 Containerization: Fully configured for deployment via Docker and Kubernetes orchestration.
- 🔄 Automated Pipelines: Continuous Integration/Continuous Deployment (CI/CD) workflows managed by GitHub Actions for testing and quality assurance.
- 📦 Developer Tooling: Integrated pre-commit hooks, code formatters (e.g., Black, isort), and documentation generation utilities.
- ⚙️ Streamlined Onboarding: An interactive setup utility simplifies initial configuration and validation.
- 📚 Comprehensive Guides: Extensive documentation covering API details, security protocols, and deployment topology.
Quick Start Guide
Docker Orchestration (Preferred Method)
-
Acquire Source and Initialize: bash git clone https://github.com/CorefluxCommunity/Coreflux-MQTT-MCP-Server.git cd Coreflux-MQTT-MCP-Server cp .env.example .env # Modify configuration parameters in the new .env file
-
Execute Deployment: bash docker-compose up -d
Prerequisites
- Python Interpreter (Version 3.11 or newer)
- Docker Engine (Recommended for operational deployment)
- Network access to a Coreflux MQTT broker instance
- Coreflux Copilot credential (optional, required for AI code features)
Option 1: Containerized Launch Sequence
(Refer to the Docker Deployment section above for steps 1 & 2)
- Verify Service Operation: bash docker-compose logs -f coreflux-mcp-server
Option 2: Local Development Environment Setup
-
Clone Repository: bash git clone https://github.com/CorefluxCommunity/Coreflux-MQTT-MCP-Server.git cd Coreflux-MQTT-MCP-Server
-
Dependency Installation: bash pip install -r requirements.txt # For development builds pip install -r requirements-dev.txt
-
Environment Setup: bash python setup_assistant.py # Invoke interactive setup tool # Alternative: cp .env.example .env && nano .env
-
Validation and Execution: bash make validate # Configuration integrity check make test # Execute test suite
-
Server Start: bash python server.py # OR using utility script make run
Comprehensive deployment directives are available in DEPLOYMENT.md.
⚙️ Configuration Management
Automated Configuration Assistant
The integrated setup utility automates and validates system configuration:
bash python setup_assistant.py
Assistant Focus Areas: - 🔧 MQTT endpoint parameters and credentials - 🔐 TLS certificate path specification - 🤖 Integration parameters for the Coreflux Copilot interface - 📝 Definition of logging destinations and verbosity - ✅ Final configuration validation and preliminary connectivity testing
When to use the assistant: - Initial system provisioning - Modifying established connection parameters - Diagnosing connectivity failures - Updating security credentials or certificate anchors
Environment Variable Definition
Replicate .env.example into .env to define runtime parameters:
bash
MQTT Broker Credentials
MQTT_BROKER=broker.example.io MQTT_PORT=8883 MQTT_USER=mcp_client MQTT_PASSWORD=secure_secret MQTT_USE_TLS=true
TLS Certificate Paths (Required if MQTT_USE_TLS is true)
MQTT_CA_CERT=/etc/certs/ca.pem
MQTT_CERT_FILE=/etc/certs/client.crt
MQTT_KEY_FILE=/etc/certs/client.key
Cognitive Agent API Key
DO_AGENT_API_KEY=ai_access_token_xyz
Logging Configuration
LOG_LEVEL=INFO LOG_FILE=/var/log/coreflux-mcp-gateway.log
Refer to SECRET_MANAGEMENT.md for advanced configuration structure and secure secrets handling.
🔌 Linking Claude via MCP
Integration in Claude Desktop
- Locate Claude Configuration:
- *NIX Systems:
~/Library/Application Support/Claude/claude_desktop_config.json -
Windows:
%USERPROFILE%\AppData\Roaming\Claude\claude_desktop_config.json -
Inject Server Definition:
{ "mcpServers": { "coreflux_bridge": { "command": "python", "args": ["/path/to/your/server.py"], "env": { "MQTT_BROKER": "broker.example.io", "MQTT_PORT": "8883", "MQTT_USER": "mcp_client", "MQTT_PASSWORD": "secure_secret", "MQTT_USE_TLS": "true", "DO_AGENT_API_KEY": "ai_access_token_xyz" } } } }
- Restart Claude Desktop to load the new server mapping.
Security Advisory: In production environments, avoid direct hardcoding of sensitive credentials within the Claude configuration file; instead, utilize secure environment variable injection methods.
Secure Configuration via Environment Variables
For superior credential hygiene, leverage environment placeholders:
{ "mcpServers": { "coreflux_bridge": { "command": "python", "args": ["/path/to/your/server.py"], "env": { "MQTT_BROKER": "${COREFLUX_MQTT_BROKER}", "MQTT_PORT": "${COREFLUX_MQTT_PORT}", "MQTT_USER": "${COREFLUX_MQTT_USER}", "MQTT_PASSWORD": "${COREFLUX_MQTT_PASSWORD}", "DO_AGENT_API_KEY": "${COREFLUX_AI_KEY}" } } } }
Connection Verification
Once configured, prompt Claude to confirm operational status:
Query the operational status and configuration details of the Coreflux MCP bridge server.
Success is indicated by Claude returning system health metrics and broker metadata.
🛠️ Exposed MCP Tools
The gateway service exposes the following capabilities to the connected AI agent:
Core MQTT Utilities
publish_data_payload- Transmit arbitrary data payloads to specified MQTT destinations, controlling Quality of Service (QoS) and message persistence.retrieve_broker_metadata- Obtain comprehensive diagnostics regarding the active MQTT connection state and configuration parameters.
Cognitive Services Interface
synthesize_lot_code- Engage the Coreflux Copilot engine to produce Language-of-Things (LOT) compliant automation logic from descriptive prompts.
System Health & Diagnostics
execute_deep_health_scan- Initiate an exhaustive diagnostic procedure across all integrated subsystems.
For the full contract definition, consult API_DOCUMENTATION.md.
🧪 Development & Quality Assurance
Initializing the Development Environment
-
Install Development Toolchain: bash pip install -r requirements-dev.txt
-
Activate Pre-commit Hooks: bash pre-commit install
-
Full Setup Routine: bash make dev-setup # Initializes environment, installs hooks, and prepares workspace
Test Execution Strategy
Execute the complete quality assurance matrix:
bash
Execute entire test suite
make test
Run tests with detailed coverage reporting
make test-coverage
Targeted testing
make test-unit # Unit tests only make test-integration # Integration tests only
Code Quality Enforcement
Automated tooling maintains code hygiene:
bash
Apply code formatting standards
make format
Execute static analysis tools (linters)
make lint
Security vulnerability scanning
make security-check
Static type verification
make type-check
Run all automated quality gates
make quality-check
Available Utility Commands for Daily Workflow:
bash
Setup and run cycle
make dev-setup # Prepare complete dev workspace make validate # Validate configuration integrity make run # Launch server after validation make run-debug # Launch server with verbose debugging output
Testing/Validation
make test # Execute all tests make test-coverage # Generate test coverage report make test-unit # Execute only unit tests make validate-config # Specific check for configuration files
Code Quality
make format # Enforce formatting (black/isort) make lint # Run linters (flake8, bandit, mypy) make security-check # Run security analysis (bandit) make type-check # Run static type checking (mypy)
Container Operations
make docker-build # Build the official Docker artifact make docker-run # Start the application within a container make docker-test # Execute test suite inside the container environment
Documentation
make docs # Generate Sphinx documentation artifacts make docs-serve # Serve generated documentation locally via HTTP
🔧 System Architecture Overview
Primary Modules
server.py- The main application entry point and tool registry handler.config_validator.py- Module responsible for parsing and verifying runtime configuration.message_processor.py- Manages asynchronous MQTT traffic flow, implementing rate limiting.enhanced_logging.py- Provides secure, structured logging services with file rotation.config_schema.py- Defines Pydantic models for guaranteed type-safe configuration structures.parser.py- Utility functions for input sanitization and data transformation.
Security Framework
- Injection Prevention: Rigorous sanitization applied across all externally sourced inputs.
- Data Masking: Automated scrubbing of sensitive fields within generated log records.
- Encryption: Mandatory TLS/SSL encryption utilized for all MQTT transport layers.
- Configuration Auditing: Comprehensive parameter verification prior to initialization.
- Principle of Least Privilege: Designed for execution with minimal necessary operational permissions.
Performance Optimizations
- Asynchronous I/O: Maximizes concurrency through non-blocking operations.
- Connection Lifespan Management: Efficient connection pooling for MQTT sessions.
- Traffic Shaping: Configurable rate limiting safeguards against ingestion spikes.
- Diagnostic Instrumentation: Real-time metrics collection for performance tracking.
📚 Documentation Index
- API Specification - Reference for all exposed tool contracts.
- Deployment Manual - Production hardening and orchestration instructions.
- Secret Handling Guide - Protocols for securing credentials.
- Configuration Reference - Exhaustive listing of all environment variables.
🐳 Docker Deployment Instructions
Rapid Containerized Initialization
bash
Fetch source code and navigate
git clone https://github.com/CorefluxCommunity/Coreflux-MQTT-MCP-Server.git cd Coreflux-MQTT-MCP-Server
Configure environment file
cp .env.example .env nano .env # Input broker details and keys
Start detached services
docker-compose up -d
Monitor output
docker-compose logs -f coreflux-mcp-server
Execute health check tool via running container
docker-compose exec coreflux-mcp-server python -c " import os os.system('python server.py --health-check') "
Advanced Production Deployment
Refer to DEPLOYMENT.md for critical production considerations, including:
- Optimized multi-stage Docker build artifacts
- Kubernetes manifest generation
- Integration with service meshes and load balancers
- Security baseline hardening procedures
🔑 Coreflux Copilot AI Synergy
The gateway fully integrates the Coreflux Copilot API for advanced AI-driven automation support:
Setup Procedure
- Acquire Key: Obtain the necessary API access token from the Coreflux Copilot portal.
- Credential Injection: Set the key via: bash # Method A: Direct file modification echo "DO_AGENT_API_KEY=your_ai_token_here" >> .env
# Method B: Shell export export DO_AGENT_API_KEY=your_ai_token_here
Value Proposition
- LOT Code Blueprinting: Rapid creation of Language-of-Things logic from plain English descriptions.
- Automation Consultation: Expert advice on designing efficient Coreflux workflows.
- Implementation Guidance: Recommendations on best practices for Coreflux resource utilization.
- Diagnostic Support: AI-assisted debugging and performance tuning suggestions.
Example Interaction
Prompt Claude to generate automation logic:
Develop LOT script to monitor ambient light levels and activate dimmers if lux readings exceed 800 for five consecutive minutes.
Prompt Claude for workflow assistance:
Outline the optimal structure for routing high-frequency sensor telemetry into a time-series database instance.
🚀 Advanced Operational Features
High-Throughput Asynchronous Pipeline
The core message handling mechanism is designed for resilience and speed:
- Non-Blocking Dispatch: Ensures responsiveness by processing incoming requests concurrently.
- Backpressure Control: Implements adaptive rate limiting to prevent broker saturation.
- Queue Resilience: Sophisticated management of pending message queues.
- Performance Visibility: Exposure of real-time throughput and latency metrics.
Enterprise-Grade Logging Architecture
Logging capabilities include features critical for audit and debugging:
- JSON Serialization: Ensures logs are immediately machine-readable.
- Automated Rollover: Manages disk consumption via timestamped file rotation.
- PII Scrubbing: Automated removal/masking of sensitive payload elements.
- Multi-Target Output: Support for console output, persistent files, and centralized syslog ingestion.
Configuration Verification Engine
A mandatory validation sequence checks all system prerequisites:
- Environment Checklist: Confirms existence and correctness of all necessary environment variables.
- File Access Checks: Verifies read permissions for TLS certificates and configuration paths.
- Network Probing: Performs preliminary connectivity tests against the configured MQTT endpoint.
- Dependency Checks: Assesses the availability and responsiveness of external services (e.g., Copilot API).
🛡️ Security Posture & Governance
Core Security Mechanisms
- Input Validation: Strict schema enforcement and sanitization on all received MCP requests.
- End-to-End Encryption: Mandated TLS protocol usage for communication channels.
- Credential Isolation: Secure handling and isolation of secrets.
- Activity Auditing: Comprehensive logging of security-relevant events.
- Privilege Separation: Operational execution within a least-privilege context.
Compliance Alignment
This infrastructure is built to support adherence to common regulatory frameworks:
- SOC 2 - Security monitoring and control implementation.
- GDPR - Mechanisms for data handling and access control.
- HIPAA - Applicable controls for safeguarding protected health information (contingent on deployment configuration).
Detailed security methodology is documented in SECRET_MANAGEMENT.md.
📊 Operational Monitoring & Health Probes
Health Check Tooling
Utilize the specialized execute_deep_health_scan tool for diagnostic purposes:
bash
Command-line health check invocation
python server.py --health-check
AI-driven check request via Claude:
"Run a comprehensive diagnostic assessment on the Coreflux MCP Gateway and report status."
Key Performance Indicators (KPIs)
The service exposes critical operational metrics:
- MQTT Health: Real-time connection status to the broker.
- Throughput: Messages processed per second, queue depth.
- Resource Utilization: System metrics (CPU/Memory consumption).
- Error Rates: Frequency of failed operations and exception types.
- AI Latency: Response times from the Copilot API endpoint.
Alerting Framework
Integration hooks allow configuration for alerts based on:
- MQTT connection interruptions or degradation.
- Sustained high error rates.
- Resource saturation thresholds.
- Security policy violations.
🤝 Community Contribution Guidelines
We encourage external collaboration to enhance this project. Review our contribution pathway:
Development Lifecycle
- Fork the primary repository.
- Branch Out: Create a dedicated feature branch (e.g.,
git checkout -b feat/new-tooling). - Setup: Install development packages (
pip install -r requirements-dev.txt) and hooks (pre-commit install). - Develop: Implement changes, ensuring corresponding tests are added or updated.
- Validate Quality: Execute quality gates (
make quality-check). - Commit: Use conventional commit messages (
git commit -am 'feat: Add new capability X'). - Submit: Push to your fork and open a Pull Request targeting the main branch.
Adherence to Standards
- Mandatory adherence to Python 3.11+ specifications.
- Use of type annotations across all interfaces.
- Target test coverage exceeding 90%.
- Security analysis via Bandit.
- Code formatting enforced by Black and isort.
- Thorough documentation for all public interfaces.
📄 Licensing
This software is distributed under the terms of the Apache License 2.0.
🆘 Support & Incident Resolution
Frequent Failure Analysis
Connection Refused Errors: Often manifests as Error: MQTT broker connection refused.
* Action: Verify host/port accuracy and firewall rules.
* Action: Confirm the broker is actively listening on the specified interface.
Authentication Failures: Indicated by Error: Authorization denied on connection.
* Action: Re-validate provided credentials against the broker's access control list (ACL).
* Action: Verify Coreflux Copilot API key status.
TLS Negotiation Errors: Seen as Error: SSL/TLS handshake failure.
* Action: Check certificate file paths and validity timelines.
* Action: Ensure client/server TLS protocol versions are compatible.
Enhanced Debugging Mode
For deep inspection, elevate logging verbosity:
bash export LOG_LEVEL=DEBUG python server.py
Channels for Assistance
- Bug Reporting/Feature Requests: Utilize the GitHub Issues Tracker.
- General Queries: Engage in the Community Discussions Forum.
- Security Concerns: Report vulnerabilities directly to security@coreflux.org.
🗺️ Future Development Trajectory
Current Milestone: Version 1.0.0 (Complete) ✅
- ✅ Established core MQTT interoperability.
- ✅ Functional Copilot AI integration layer.
- ✅ Implemented production-grade security controls.
- ✅ Comprehensive test suite coverage.
- ✅ Deployment artifacts finalized.
Upcoming Iterations
- v1.1.0: Advanced real-time metrics exposition and integration with external monitoring systems.
- v1.2.0: Expansion of supported Coreflux native API calls exposed to MCP.
- v1.3.0: Introduction of WebSocket backchannel support for continuous streaming data.
- v2.0.0: Architecture refined for multi-tenancy and interconnected broker federation.
📋 Operational Quick Reference
Essential Command Matrix
bash
Configuration & Setup
python setup_assistant.py # Guided initial configuration make validate # Validate current environment state
Development Workflow
make dev-setup # Prepare local coding environment make test # Run all integrated tests make quality-check # Execute static analysis and formatting checks
Deployment
docker-compose up -d # Start stack via Docker Compose make docker-build # Create deployment Docker image
Monitoring
make health-check # Trigger the internal system health probe docker-compose logs -f # Stream all container logs
Critical File Locations
server.py- Core application executable..env- Primary runtime configuration file.requirements.txt- Production dependencies list.docker-compose.yml- Orchestration definition for containerized launch.Makefile- Centralized collection of development and utility targets.
Developed by the Coreflux Community.
invoke_coreflux_action: Execute a defined action or function within the Coreflux environment.decommission_coreflux_model: Remove a registered AI model definition.decommission_all_models: Erase all registered AI model definitions.decommission_all_actions: Erase all defined executable actions.decommission_all_routes: Erase all configured data routing definitions.enumerate_discovered_agents: List all Coreflux resources dynamically registered.prompt_copilot_for_lot: Request LOT code synthesis from the Coreflux Copilot engine using descriptive natural language.
Troubleshooting Connection Instability
The gateway is configured for fault tolerance; it will initialize even if the MQTT broker is temporarily unavailable, allowing configuration changes via MCP tools before connection establishment.
Connection Health Tools
get_connection_status: Retrieves current connection state, including diagnostic suggestions.configure_mqtt_endpoint: Dynamically redefine the target broker connection parameters without requiring a service restart.initiate_mqtt_session: Explicitly connect to a specified broker using custom authentication parameters.verify_broker_reachability: Test connectivity and trigger a reconnection attempt.force_reestablish_mqtt: Immediately attempt to restore the primary MQTT linkage.
Remediation Sequence
If operational anomalies persist:
- Scrutinize your Claude configuration for accurate MQTT host/port settings.
- Validate network pathways between the gateway and the broker.
-
Execute the setup assistant to confirm or update internal configurations: bash python setup_assistant.py
-
Examine recent logs generated by the Claude host client for MCP interaction errors: bash # Linux/macOS log tailing tail -n 20 -f ~/Library/Logs/Claude/mcp.log # Windows PowerShell log retrieval Get-Content -Path "$env:USERPROFILE\AppData\Roaming\Claude\Logs\mcp.log" -Tail 20 -Wait
-
Rerun the server directly, setting verbose logging: bash python server.py --mqtt-host localhost --mqtt-port 1883 --log-level DEBUG
References & Documentation Portal
- DEPLOYMENT.md - Comprehensive production rollout guide.
- SECURITY.md - Best practices for securing the deployment.
- MCP Specification - Official documentation for the protocol standard.
- Coreflux Platform Site - Information on the underlying automation framework.
Community Engagement
We encourage all contributions. Develop on a feature branch and target the development branch for submission via Pull Request, following our contribution protocols.
Legal Notice
This software is governed by the Apache License, Version 2.0. Review the LICENSE file for full terms.
Assistance Channels
- 📖 Reference Material: Consult the README, DEPLOYMENT.md, and SECURITY.md.
- 🐛 Issue Tracking: Report defects and suggest enhancements via GitHub Issues.
- 💬 Discussions: Engage with peers on the community forums.
WIKIPEDIA SUMMARY: Cloud computing represents a delivery model for computing resources—such as processing power, storage, and networking—that are provisioned on demand over a network (the 'cloud'), as defined by ISO standards. NIST outlined five core characteristics in 2011: On-demand self-service, Broad network access, Resource pooling, Rapid elasticity, and Measured service. The historical roots trace back to 1960s time-sharing systems, while the term 'cloud' for virtualized services gained traction around 1994, later popularized in 1996 business planning.
