KubeControl Nexus
This utility functions as a robust orchestration layer for Kubernetes clusters, analogous to an SDK-level abstraction over kubectl or client-go mechanisms. It facilitates efficient life-cycle management of cluster resources, encompassing creation, modification, deletion, and retrieval across numerous environments. The tool inherently manages Custom Resource Definitions (CRDs) and introduces streamlined processes for operational oversight and log retrieval, mirroring the need for coordination in complex undertakings like project management.
Author

weibaohui
Quick Info
Actions
Tags
Introduction
This tool streamlines resource administration within Kubernetes environments. Managing resources effectively is akin to organizing activities within a larger project plan; each operation is a discrete activity requiring precise execution. This utility helps organize these technical activities, ensuring necessary coordination for successful project outcomes.
Core Capabilities
- Simplified Operations: Provides comprehensive functions for resource manipulation, including initialization, updates, removal, and querying for both native and custom types.
- Multi-Cluster Coordination: Facilitates management across several Kubernetes environments through cluster registration, supporting platforms like AWS EKS.
- MCP Integration: Supports multi-cluster management protocols (MCP) using both standard input/output (stdio) and Server-Sent Events (SSE) modes. It incorporates numerous internal tools for extensive operational flexibility.
- Namespace Flexibility: Permits resource operations spanning multiple namespaces using specific selection methods.
- Fluent Interface: Offers method chaining, making resource interaction flows intuitive and concise.
- Custom Resource Support: Seamlessly interacts with Custom Resource Definitions (CRDs) for dynamic resource handling.
- Extensibility via Callbacks: Supports a callback architecture, allowing business logic integration decoupled from core Kubernetes interactions.
- In-Pod File Utility: Enables remote file system operations directly within Pod containers, such as uploading or retrieving files.
- Accelerated Actions: Includes wrappers for frequent operational tasks like Deployment restarts, scaling, and state toggling.
- SQL Query Interface: Allows resource querying using familiar SQL syntax against cluster state.
- Query Caching: Incorporates optional caching for high-frequency or batch retrieval scenarios to boost performance. Filtering criteria are unaffected by caching mechanisms.
Examples Blueprint
KubeControl CLI is a lightweight administration utility built upon this framework, integrating with other frontend technologies.
1. Acquisition: Retrieve the current version from https://github.com/weibaohui/kom.
2. Execution: Launch using the ./k8m command to access the local interface at http://127.0.0.1:3618.
Setup
import (
"github.com/weibaohui/kom"
"github.com/weibaohui/kom/callbacks"
)
func main() {
// Essential: Register necessary callbacks first.
callbacks.RegisterInit()
// Register cluster connections.
defaultKubeConfig := os.Getenv("KUBECONFIG")
if defaultKubeConfig == "" {
defaultKubeConfig = filepath.Join(homedir.HomeDir(), ".kube", "config")
}
_, _ = kom.Clusters().RegisterInCluster()
_, _ = kom.Clusters().RegisterByPathWithID(defaultKubeConfig, "default")
kom.Clusters().Show()
// Further operational logic follows here.
}
Usage Examples
0. Multi-Cluster MCP Coordination
Supports both stdio and sse communication protocols. Provides methods for arbitrary resource querying, listing, deletion, description, and Pod log access across multiple managed environments.
1. Code Integration
// Initiate the MCP Server with one line of code
mcp.RunMCPServer("kom mcp server", "0.0.1", 9096)
2. Compilation
# Start from source code
go build main.go
// Compile into the KubeControl Nexus executable
3. Launch
Two operational modes are available post-compilation: stdio and sse. Kubernetes interaction defaults to the KUBECONFIG environment variable setting.
# Set the KUBECONFIG environment variable
export KUBECONFIG = /Users/xxx/.kube/config
# Execute the tool
./kom
# Access point for MCP Server
http://IP:9096/sse
When executed, the binary functions as a stdio tool. The HTTP endpoint enables sse mode communication.
4. Integration into MCP Tools
This tool accommodates integration via stdio or sse methods. It is suitable for embedding within broader MCP tooling, such as Cursor, Claude Desktop (stdio only), and Windsurf interfaces. UI configuration panels may also facilitate its setup.
{
"mcpServers": {
"kom": {
"type": "sse",
"url": "http://IP:9096/sse"
}
}
}
{
"mcpServers": {
"k8m": {
"command": "path/to/kom",
"args": []
}
}
}
MCP Tool Registry (59 Functions)
| Category | Method | Description |
|---|---|---|
| Cluster Management (1) | list_k8s_clusters |
Enumerates all registered Kubernetes environments. |
| DaemonSet Management (1) | restart_k8s_daemonset |
Restarts a DaemonSet using cluster, namespace, and name identifiers. |
| Deployment Management (12) | scale_k8s_deployment |
Adjusts Deployment replica count via cluster, namespace, and name. |
restart_k8s_deployment |
Reinitializes a Deployment identified by cluster, namespace, and name. | |
stop_k8s_deployment |
Sets Deployment replicas count to zero. | |
restore_k8s_deployment |
Reverts Deployment replica count to a previously stored value. | |
update_k8s_deployment_image_tag |
Modifies the container image tag within a Deployment specification. | |
get_k8s_deployment_rollout_history |
Retrieves the history log for Deployment revisions. | |
undo_k8s_deployment_rollout |
Executes a rollback action on the Deployment revision history. | |
pause_k8s_deployment_rollout |
Suspends the active Deployment update process. | |
resume_k8s_deployment_rollout |
Resumes a paused Deployment update sequence. | |
get_k8s_deployment_rollout_status |
Fetches the current status of a Deployment revision update. | |
get_k8s_deployment_hpa_list |
Lists associated Horizontal Pod Autoscalers for a Deployment. | |
list_k8s_deploy_event |
Retrieves events related to a specific Deployment object. | |
| Dynamic Resource Management (incl. CRD, 8) | get_k8s_resource |
Fetches detailed information for a Kubernetes resource by identifier. |
describe_k8s_resource |
Provides descriptive output for a specified Kubernetes resource. | |
delete_k8s_resource |
Removes a Kubernetes resource based on its location and name. | |
list_k8s_resource |
Retrieves a collection of Kubernetes resources filtered by cluster and type. | |
annotate_k8s_resource |
Adds or removes annotations on a specified Kubernetes resource. | |
label_k8s_resource |
Applies or deletes labels for a selected Kubernetes resource. | |
patch_k8s_resource |
Applies a partial modification to a Kubernetes resource object. | |
GetDynamicResource |
Retrieves an arbitrary dynamic resource instance. | |
| Node Management (11) | taint_k8s_node |
Applies a taint restriction to a specified cluster node. |
untaint_k8s_node |
Removes an existing taint from a cluster node. | |
cordon_k8s_node |
Marks a node as unschedulable for new Pods. | |
uncordon_k8s_node |
Restores scheduling availability on a designated node. | |
drain_k8s_node |
Evicts existing Pods and prevents future scheduling on a node. | |
get_k8s_node_ip_usage |
Queries the IP resource consumption metrics for a node. | |
list_k8s_node |
Fetches the list of available cluster Nodes. | |
get_k8s_top_node |
Retrieves ranked statistics for Node CPU and memory utilization. | |
get_k8s_pod_count_running_on_node |
Tallies the quantity of active Pods scheduled on a specific node. | |
get_k8s_node_resource_usage |
Summarizes current resource consumption statistics for a node. | |
TaintNodeTool |
Utility for applying node taint modifications. | |
| Event Management (1) | list_k8s_event |
Lists Kubernetes events scoped by cluster and namespace. |
| Ingress Management (1) | set_default_k8s_ingressclass |
Designates a specific IngressClass as the cluster default. |
| Pod Management (18) | run_command_in_k8s_pod |
Executes an arbitrary command within the context of a running Pod. |
list_k8s_pod_event |
Retrieves events associated with a particular Pod object. | |
list_files_in_k8s_pod |
Generates a file listing for a specified directory path inside a Pod. | |
list_all_pod_files |
Recursively lists every file within a designated Pod directory path. | |
delete_k8s_pod |
Terminates a running Pod instance. | |
delete_pod_file |
Removes a specified file from within the Pod's filesystem. | |
get_k8s_pod_linked_env |
Extracts runtime environment variables configured for a Pod. | |
get_pod_linked_env_from_yaml |
Derives environment variables directly from the Pod specification YAML. | |
get_k8s_pod_linked_services |
Identifies associated Service objects linked to a Pod. | |
get_pod_linked_ingresses |
Retrieves Ingress resources that route traffic to a Pod. | |
get_pod_linked_endpoints |
Finds the backing Endpoints defining network connectivity for a Pod. | |
list_k8s_pod |
Fetches the current collection of Pod resources. | |
get_k8s_top_pod |
Returns ranked metrics for Pod CPU and memory utilization. | |
ListPodFilesTool |
Tool for listing files within a Pod directory structure. | |
ListAllPodFilesTool |
Tool for recursively listing all files within a Pod path. | |
DeletePodFileTool |
Tool to erase a file from a Pod's filesystem. | |
UploadPodFileTool |
Tool for transferring local files into a Pod. | |
GetPodLogsTool |
Tool for accessing standard output and error streams from a Pod. | |
describe_k8s_pod |
Generates detailed descriptive output for a container group (Pod). | |
| Storage Management (3) | set_k8s_default_storageclass |
Designates a specific StorageClass as the cluster default provisioner. |
get_k8s_storageclass_pvc_count |
Counts the PersistentVolumeClaims bound to a given StorageClass. | |
get_k8s_storageclass_pv_count |
Tallies the PersistentVolumes associated with a specific StorageClass. | |
| YAML Management (2) | apply_k8s_yaml |
Creates or modifies cluster resources defined in a YAML manifest. |
delete_k8s_yaml |
Removes cluster resources specified within a YAML document. |
Launch Command
mcp.RunMCPServer("kom mcp server", "0.0.1", 3619)
AI Tool Integration
Claude Desktop
- Access the Claude Desktop configuration panel.
- Locate the MCP Server configuration area.
- Activate the SSE event monitoring functionality.
- Verify the established connection status.
{
"mcpServers": {
"k8m": {
"command": "path/to/kom",
"args": []
}
}
}
Cursor
- Navigate to the Cursor settings interface.
- Find the extension service configuration option.
- Supports sse by providing
http://localhost:9096/sse, or stdio by providing the path to the Nexus executable.
Windsurf
- Open the central configuration access point.
- Specify the API server network address.
- Supports sse via
http://localhost:9096/sseor stdio via the Nexus executable path.
cherry studio
- Click the settings icon located in the lower left corner.
- Navigate to the MCP Server section.
- Select the option to add a new server.
- Configuration accepts sse (
http://localhost:9096/sse) or stdio (Nexus executable path).
1. Multi-Cluster Coordination
Registering Multiple Environments
// Register the in-cluster environment, assigned the identifier InCluster
kom.Clusters().RegisterInCluster()
// Register two external clusters, named 'orb' and 'docker-desktop' respectively
kom.Clusters().RegisterByPathWithID("/Users/kom/.kube/orb", "orb")
kom.Clusters().RegisterByPathWithID("/Users/kom/.kube/config", "docker-desktop")
// Registering a cluster named 'default' enables its use via kom.DefaultCluster().
kom.Clusters().RegisterByPathWithID("/Users/kom/.kube/config", "default")
Registering AWS EKS Environments
// Define configuration parameters for EKS cluster access
config := aws.EKSAuthConfig{
AccessKey: "XXX", // AWS Access Key ID credential
SecretAccessKey: "yyy", // AWS Secret Access Key credential
Region: "us-east-1", // AWS geographical region identifier
ClusterName: "k8m", // The specific EKS cluster name
}
// Attempt registration of the AWS EKS cluster
_, err := kom.Clusters().RegisterAWSCluster(config)
if err != nil {
fmt.Printf("EKS Cluster Registration Failure: %v", err)
return
}
// Subsequently utilize the registered EKS cluster
var pods []corev1.Pod
clusterID := fmt.Sprintf("%s-%s", config.Region, config.ClusterName) // Cluster ID format: {Region}-{ClusterName}
err = kom.Cluster(clusterID).Resource(&corev1.Pod{}).Namespace("kube-system").List(&pods).Error
AWS EKS Cluster Registration Notes:
- AccessKey: AWS credential identifier.
- SecretAccessKey: AWS secret credential.
- Region: AWS geographical zone, e.g., us-east-1.
- ClusterName: The designated EKS cluster name.
- RoleARN: (Optional) IAM Role ARN for assuming roles in cross-account scenarios.
- The system automatically generates an ID post-registration in the {Region}-{ClusterName} format.
- Cross-account access is enabled via IAM role assumption mechanisms.
- AWS credentials are held only in memory and purged upon application termination.
Display Registered Environments
kom.Clusters().Show()
Selecting the Default Environment
// Accessing the default cluster; fetches the instance identified as "InCluster".
// If absent, it attempts to retrieve the instance named "default".
// If neither exists, any available cluster instance from the list is returned.
var pods []corev1.Pod
err = kom.DefaultCluster().Resource(&corev1.Pod{}).Namespace("kube-system").List(&pods).Error
Selecting a Specific Environment
// Target the 'orb' cluster to query Pods within the kube-system namespace
var pods []corev1.Pod
err = kom.Cluster("orb").Resource(&corev1.Pod{}).Namespace("kube-system").List(&pods).Error
2. CRUD Operations and Watch for Native Resources
Define a Deployment object for subsequent resource management activities.
var item v1.Deployment
var items []v1.Deployment
Create a Resource Instance
item = v1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx",
Namespace: "default",
},
Spec: v1.DeploymentSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "test", Image: "nginx:1.14.2"},
},
},
},
},
}
err := kom.DefaultCluster().Resource(&item).Create(&item).Error
GET Query for a Specific Resource
// Retrieve the Deployment named 'nginx' in the 'default' namespace
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").Get(&item).Error
// Retrieve the Deployment, utilizing a 5-second cache for the query
// Caching is recommended for batch or high-frequency operations.
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").WithCache(5 * time.Second).Get(&item).Error
LIST Query for Resource Collections
// Retrieve a list of Deployments within the 'default' namespace
err := kom.DefaultCluster().Resource(&item).Namespace("default").List(&items).Error
// Retrieve Deployments across 'default' and 'kube-system' namespaces
err := kom.DefaultCluster().Resource(&item).Namespace("default","kube-system").List(&items).Error
// Retrieve Deployments across all cluster namespaces
err := kom.DefaultCluster().Resource(&item).Namespace("*").List(&items).Error
err := kom.DefaultCluster().Resource(&item).AllNamespace().List(&items).Error
// Apply a 5-second cache; this applies to list operations.
err := kom.DefaultCluster().Resource(&item).WithCache(5 * time.Second).List(&nodeList).Error
LIST Query Utilizing Label Selectors
// Retrieve Deployments in 'default' namespace matching the label app=nginx
err := kom.DefaultCluster().Resource(&item).Namespace("default").WithLabelSelector("app=nginx").List(&items).Error
LIST Query Utilizing Multiple Labels
// Retrieve Deployments matching both labels app=nginx and m=n in 'default' namespace
err := kom.DefaultCluster().Resource(&item).Namespace("default").WithLabelSelector("app=nginx").WithLabelSelector("m=n").List(&items).Error
LIST Query Utilizing Field Selectors
// Retrieve Deployments in 'default' namespace matching field metadata.name=test-deploy
// Field selectors generally support native definition fields like metadata.name, metadata.labels, spec.nodeName, status.phase, etc.
err := kom.DefaultCluster().Resource(&item).Namespace("default").WithFieldSelector("metadata.name=test-deploy").List(&items).Error
Paged Resource Retrieval
var list []corev1.Pod
var total int64
sql := "select * from pod where metadata.namespace=? or metadata.namespace=? order by metadata.creationTimestamp desc "
err := kom.DefaultCluster().Sql(sql, "kube-system", "default").
FillTotalCount(&total).
Limit(5).
Offset(10).
List(&list).Error
fmt.Printf("total %d\n", total) // Returns total count, e.g., 480
fmt.Printf("Count %d\n", len(list)) // Returns item count, equals limit (5)
Modifying Resource Content
// Retrieve the Deployment named 'nginx' for modification
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").Get(&item).Error
if item.Spec.Template.Annotations == nil {
item.Spec.Template.Annotations = map[string]string{}
}
item.Spec.Template.Annotations["kom.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)
err = kom.DefaultCluster().Resource(&item).Update(&item).Error
PATCH Resource Update
// Apply a Patch update; adds a label and sets replica count to 5 for Deployment 'nginx'
patchData := `{
"spec": {
"replicas": 5
},
"metadata": {
"labels": {
"new-label": "new-value"
}
}
}`
err := kom.DefaultCluster().Resource(&item).Patch(&item, types.StrategicMergePatchType, patchData).Error
Deleting a Resource Instance
// Remove the Deployment named 'nginx'
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").Delete().Error
Force Deleting a Resource Instance
// Forcefully delete the Deployment named 'nginx'
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").ForceDelete().Error
Retrieving Resources by GVK (General Purpose)
// Fetch resources by Group-Version-Kind specification
var list []corev1.Event
err := kom.DefaultCluster().GVK("events.k8s.io", "v1", "Event").Namespace("default").List(&list).Error
Watching Resource Changes
// Monitor changes for Pod resources in the 'default' namespace
var watcher watch.Interface
var pod corev1.Pod
err := kom.DefaultCluster().Resource(&pod).Namespace("default").Watch(&watcher).Error
if err != nil {
fmt.Printf("Create Watcher Error %v", err)
return err
}
go func() {
defer watcher.Stop()
for event := range watcher.ResultChan() {
err := kom.DefaultCluster().Tools().ConvertRuntimeObjectToTypedObject(event.Object, &pod)
if err != nil {
fmt.Printf("Could not cast object to *v1.Pod type: %v", err)
return
}
// Process the event type
switch event.Type {
case watch.Added:
fmt.Printf("Added Pod [ %s/%s ]\n", pod.Namespace, pod.Name)
case watch.Modified:
fmt.Printf("Modified Pod [ %s/%s ]\n", pod.Namespace, pod.Name)
case watch.Deleted:
fmt.Printf("Deleted Pod [ %s/%s ]\n", pod.Namespace, pod.Name)
}
}
}()
Describe Query for a Specific Resource
// Describe the Deployment named 'nginx' in the 'default' namespace
var describeResult []byte
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").Describe(&item).Error
fmt.Printf("describeResult: %s", describeResult)
3. YAML Manifest Creation, Update, Deletion
yaml := `apiVersion: v1
kind: ConfigMap
metadata:
name: example-config
namespace: default
data:
key: value
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: example-deployment
namespace: default
spec:
replicas: 1
selector:
matchLabels:
app: example
template:
metadata:
labels:
app: example
spec:
containers:
- name: example-container
image: nginx
`
// Initial Apply creates resources; returns execution status for each item.
results := kom.DefaultCluster().Applier().Apply(yaml)
// Subsequent Apply invokes updates based on resource status.
results = kom.DefaultCluster().Applier().Apply(yaml)
// Deletion operation based on the provided YAML definitions.
results = kom.DefaultCluster().Applier().Delete(yaml)
4. Pod Operations
Retrieving Logs
// Obtain logs from a specific Pod
var stream io.ReadCloser
err := kom.DefaultCluster().Namespace("default").Name("random-char-pod").Ctl().Pod().ContainerName("container").GetLogs(&stream, &corev1.PodLogOptions{}).Error
reader := bufio.NewReader(stream)
line, _ := reader.ReadString('\n')
fmt.Println(line)
Executing Commands
Executes a command inside a Pod, triggering specific callback types.
// Run 'ps -ef' command within the Pod context
var execResult string
err := kom.DefaultCluster().Namespace("default").Name("random-char-pod").Ctl().Pod().ContainerName("container").Command("ps", "-ef").ExecuteCommand(&execResult).Error
fmt.Printf("execResult: %s", execResult)
Port Forwarding
err := kom.DefaultCluster().Resource(&v1.Pod{}).
Namespace("default").
Name("nginx-deployment-f576985cc-7czqr").
Ctl().Pod().
ContainerName("nginx").
PortForward("20088", "80", stopCh).Error
// This forwards traffic from the local machine's port 20088 to the Pod's port 80.
Streaming Command Execution
Executes a command with streaming I/O, triggering StreamExec callbacks, suitable for continuous output commands like ping.
cb := func(data []byte) error {
fmt.Printf("Data %s\n", string(data))
return nil
}
err := kom.DefaultCluster().Namespace("kube-system").Name("traefik-d7c9c5778-p9nf4").Ctl().Pod().ContainerName("traefik").Command("ping", "127.0.0.1").StreamExecute(cb, cb).Error
//Example Output:
//Data PING 127.0.0.1 (127.0.0.1): 56 data bytes
//Data 64 bytes from 127.0.0.1: seq=0 ttl=42 time=0.023 ms
//Data 64 bytes from 127.0.0.1: seq=1 ttl=42 time=0.011 ms
//Data 64 bytes from 127.0.0.1: seq=2 ttl=42 time=0.012 ms
//Data 64 bytes from 127.0.0.1: seq=3 ttl=42 time=0.016 ms
File Listing
// List files within the /etc directory of the Pod
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").ListFiles("/etc")
Recursive File Listing (Including Hidden)
// List all files, including hidden ones, within the /etc directory of the Pod
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").ListAllFiles("/etc")
File Download
// Download the /etc/hosts file from the Pod
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").DownloadFile("/etc/hosts")
Tar Compressed File Download
// Download /etc/hosts, packaging it as a tar archive before retrieval
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").DownloadTarFile("/etc/hosts")
File Upload
// Save text content to /etc/demo.txt inside the Pod
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").SaveFile("/etc/demo.txt", "txt-context")
// Upload a file from the host OS (os.File type) to the /etc/ directory in the Pod
file, _ := os.Open(tempFilePath)
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").UploadFile("/etc/", file)
File Deletion
// Erase the specified file located at /etc/xyz within the Pod
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").DeleteFile("/etc/xyz")
Retrieving Related Resources - Services
// Fetch Service objects associated with the Pod
svcs, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedService()
for _, svc := range svcs {
fmt.Printf("service name %v\n", svc.Name)
}
Retrieving Related Resources - Ingress
// Fetch Ingress objects associated with the Pod
ingresses, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedIngress()
for _, ingress := range ingresses {
fmt.Printf("ingress name %v\n", ingress.Name)
}
Retrieving Related Resources - PVC
// Fetch PersistentVolumeClaim objects linked to the Pod
pvcs, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedPVC()
for _, pvc := range pvcs {
fmt.Printf("pvc name %v\n", pvc.Name)
}
Retrieving Related Resources - PV
// Fetch PersistentVolume objects linked to the Pod
pvs, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedPV()
for _, pv := range pvs {
fmt.Printf("pv name %v\n", pv.Name)
}
Retrieving Related Resources - Endpoints
// Fetch Endpoints objects associated with the Pod
endpoints, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedEndpoints()
for _, endpoint := range endpoints {
fmt.Printf("endpoint name %v\n", endpoint.Name)
}
Retrieving Related Resources - Runtime Environment Variables
Obtains configuration data by executing the 'env' command inside the Pod.
envs, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedEnv()
for _, env := range envs {
fmt.Printf("env %s %s=%s\n", env.ContainerName, env.EnvName, env.EnvValue)
}
Retrieving Related Resources - Defined Environment Variables
Derives environment variable configurations directly from the Pod definition.
envs, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedEnvFromPod()
for _, env := range envs {
fmt.Printf("env %s %s=%s\n", env.ContainerName, env.EnvName, env.EnvValue)
}
Retrieving Related Resources - Potential Nodes
Returns a list of viable nodes based on Pod configuration (NodeSelector, Affinity, Tolerations, NodeName). It currently excludes runtime scheduling factors like CPU/memory pressure.
nodes, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedNode()
for _, node := range nodes {
fmt.Printf("reason:%s\t node name %s\n", node.Reason, node.Name)
}
5. CRUD Operations and Watch for Custom Resources (CRD)
Operations are identical to native resources when using unstructured data, requiring specification of Group, Version, and Kind.
kom.DefaultCluster().GVK(group, version, kind) replaces kom.DefaultCluster().Resource(interface{}).
For simplification, kom.DefaultCluster().CRD(group, version, kind) is provided.
First, define a generic object structure to capture CRD responses.
var item unstructured.Unstructured
Creating a CustomResourceDefinition (CRD)
yaml := `apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: crontabs.stable.example.com
spec:
group: stable.example.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
cronSpec:
type: string
image:
type: string
replicas:
type: integer
scope: Namespaced
names:
plural: crontabs
singular: crontab
kind: CronTab
shortNames:
- ct`
result := kom.DefaultCluster().Applier().Apply(yaml)
Creating an Instance of the CRD
item = unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "stable.example.com/v1",
"kind": "CronTab",
"metadata": map[string]interface{}{
"name": "test-crontab",
"namespace": "default",
},
"spec": map[string]interface{}{
"cronSpec": "* * * * */8",
"image": "test-crontab-image",
},
},
}
er := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Namespace(item.GetNamespace()).Name(item.GetName()).Create(&item).Error
GET Query for a Single CR Instance
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Name(item.GetName()).Namespace(item.GetNamespace()).Get(&item).Error
LIST Query for CR Instances
var crontabList []unstructured.Unstructured
// Query CronTab resources in the 'default' namespace
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Namespace(crontab.GetNamespace()).List(&crontabList).Error
// Query CronTab resources across all namespaces
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").AllNamespace().List(&crontabList).Error
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Namespace("*").List(&crontabList).Error
Updating a CR Instance
patchData := `{
"spec": {
"image": "patch-image"
},
"metadata": {
"labels": {
"new-label": "new-value"
}
}
}`
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Name(crontab.GetName()).Namespace(crontab.GetNamespace()).Patch(&crontab, types.StrategicMergePatchType, patchData).Error
Deleting a CR Instance
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Name(crontab.GetName()).Namespace(crontab.GetNamespace()).Delete().Error
Force Deleting a CR Instance
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Name(crontab.GetName()).Namespace(crontab.GetNamespace()).ForceDelete().Error
Watching CR Instances
var watcher watch.Interface
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Namespace("default").Watch(&watcher).Error
if err != nil {
fmt.Printf("Create Watcher Error %v", err)
}
go func() {
defer watcher.Stop()
for event := range watcher.ResultChan() {
var item *unstructured.Unstructured
item, err := kom.DefaultCluster().Tools().ConvertRuntimeObjectToUnstructuredObject(event.Object)
if err != nil {
fmt.Printf("Could not cast object to Unstructured type: %v", err)
return
}
// Process the event type
switch event.Type {
case watch.Added:
fmt.Printf("Added Unstructured [ %s/%s ]\n", item.GetNamespace(), item.GetName())
case watch.Modified:
fmt.Printf("Modified Unstructured [ %s/%s ]\n", item.GetNamespace(), item.GetName())
case watch.Deleted:
fmt.Printf("Deleted Unstructured [ %s/%s ]\n", item.GetNamespace(), item.GetName())
}
}
}()
Describe Query for a CRD Resource
// Describe the CronTab instance in the 'default' namespace
var describeResult []byte
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Namespace("default").Name(item.GetName()).Describe(&item).Error
fmt.Printf("describeResult: %s", describeResult)
Getting Pods Managed by a CRD
pods, err := kom.DefaultCluster().CRD("apps.kruise.io", "v1beta1", "StatefulSet").
Namespace("default").Name("sample").Ctl().CRD().ManagedPods()
for _, pod := range pods {
fmt.Printf("Get pods: %v", pod.GetName())
}
6. Cluster Parameter Information
// Retrieve cluster documentation summary
kom.DefaultCluster().Status().Docs()
// Inspect cluster API resource inventory
kom.DefaultCluster().Status().APIResources()
// List all Custom Resource Definitions registered on the cluster
kom.DefaultCluster().Status().CRDList()
// Fetch cluster version details
kom.DefaultCluster().Status().ServerVersion()
// Summarize the count of various resource types within the cluster (limit 10)
kom.DefaultCluster().Status().GetResourceCountSummary(10)
7. Callback Mechanism
- A callback system is integrated, enabling custom function invocation upon operation completion.
- If a registered callback returns 'true', subsequent operations continue; returning 'false' halts the chain.
- Supported operations for callbacks include: get, list, create, update, patch, delete, exec, stream-exec, logs, watch, and doc interaction.
- Default callback identifiers include: "kom:get", "kom:list", "kom:create", "kom:update", "kom:patch", "kom:watch", "kom:delete", "kom:pod:exec", "kom:pod:stream:exec", "kom:pod:logs", "kom:pod:port:forward", "kom:doc".
- Execution sequence can be managed using
.After("kom:get")or.Before("kom:get")to set relative ordering against built-in callbacks. - Callbacks can be removed using
kom.DefaultCluster().Callback().Delete("kom:get"). - A registered callback can be entirely replaced via
kom.DefaultCluster().Callback().Replace("kom:get",cb).
// Register a callback function for Get resource retrieval operations
kom.DefaultCluster().Callback().Get().Register("get", cb)
// Register a callback function for List resource retrieval operations
kom.DefaultCluster().Callback().List().Register("list", cb)
// Register a callback function for Create resource initialization operations
kom.DefaultCluster().Callback().Create().Register("create", cb)
// Register a callback function for Update resource modification operations
kom.DefaultCluster().Callback().Update().Register("update", cb)
// Register a callback function for Patch resource modification operations
kom.DefaultCluster().Callback().Patch().Register("patch", cb)
// Register a callback function for Delete resource removal operations
kom.DefaultCluster().Callback().Delete().Register("delete", cb)
// Register a callback function for Watch resource observation operations
kom.DefaultCluster().Callback().Watch().Register("watch",cb)
// Register a callback function for Exec command execution within Pods
kom.DefaultCluster().Callback().Exec().Register("exec", cb)
// Register a callback function for Logs retrieval operations
kom.DefaultCluster().Callback().Logs().Register("logs", cb)
// Remove a previously registered callback function
kom.DefaultCluster().Callback().Get().Delete("get")
// Replace an existing callback function for Get operations
kom.DefaultCluster().Callback().Get().Replace("get", cb)
// Specify execution order: run after the built-in 'kom:get' operation completes
kom.DefaultCluster().Callback().After("kom:get").Register("get", cb)
// Specify execution order: run before the built-in 'kom:create' operation starts
// Example 1. A pre-creation check verifies permissions; returning an error prevents subsequent creation steps.
// Example 2. Post-list filtering removes unwanted resources from the returned set (Statement.Dest) before presentation.
kom.DefaultCluster().Callback().Before("kom:create").Register("create", cb)
// Custom callback function definition
func cb(k *kom.Kubectl) error {
stmt := k.Statement
gvr := stmt.GVR
ns := stmt.Namespace
name := stmt.Name
// Log operational details
fmt.Printf("Get %s/%s(%s)\n", ns, name, gvr)
fmt.Printf("Command %s/%s(%s %s)\n", ns, name, stmt.Command, stmt.Args)
return nil
// Returning an error stops execution of subsequent callbacks in the chain.
}
8. SQL Querying of Cluster Resources
- The
Sql()method enables querying cluster resources using standard SQL syntax, offering efficiency. - Table names support all registered resource types, including CRDs, provided they are registered in the cluster.
- Common table names include: pod, deployment, service, ingress, pvc, pv, node, namespace, secret, configmap, etc., covering nearly all types.
- Currently, the query field selection is limited to the wildcard (
*), meaning onlyselect *is supported. - Supported conditional operators include: =, !=, >=, <=, <>, like, in, not in, and, or, between.
- Sorting is restricted to a single field, defaulting to descending order by creation timestamp.
Querying Native Kubernetes Resources
sql := "select * from deploy where metadata.namespace='kube-system' or metadata.namespace='default' order by metadata.creationTimestamp asc "
var list []v1.Deployment
err := kom.DefaultCluster().Sql(sql).List(&list).Error
for _, d := range list {
fmt.Printf("List Items foreach %s,%s at %s \n", d.GetNamespace(), d.GetName(), d.GetCreationTimestamp())
}
Querying CRD Resources
// 'vm' represents a CRD resource type from Kubevirt, for example.
sql := "select * from vm where (metadata.namespace='kube-system' or metadata.namespace='default' ) "
var list []unstructured.Unstructured
err := kom.DefaultCluster().Sql(sql).List(&list).Error
for _, d := range list {
fmt.Printf("List Items foreach %s,%s\n", d.GetNamespace(), d.GetName())
}
Chained SQL Query Invocation
// Query a list of Pods using chained methods
er := kom.DefaultCluster().From("pod").
Where("metadata.namespace = ? or metadata.namespace= ? ", "kube-system", "default").
Order("metadata.creationTimestamp desc").
List(&list).Error
Support for Nested List Attributes in SQL
// Query based on properties within nested lists, such as container ports names.
sql := "select * from pod where spec.containers.ports.name like '%k8m%' " // Example targets port names containing 'k8m'
var list []v1.Pod
err := kom.DefaultCluster().Sql(sql).List(&list).Error
for _, d := range list {
t.Logf("List Items foreach %s,%s\n", d.GetNamespace(), d.GetName())
}
9. Additional Utility Operations
Deployment Restart
err = kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Restart()
Deployment Scaling
// Set the replica count for the Deployment named 'nginx' to 3
err = kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Scaler().Scale(3)
Deployment Suspension
// Set the replica count for the Deployment named 'nginx' to zero.
// The current running replica count is preserved in an annotation.
err = kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Scaler().Stop()
Deployment Restoration
// Restore the Deployment replica count from the recorded annotation; defaults to 1 if annotation is missing.
err = kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Scaler().Restore()
Deployment Image Tag Update
// Update the container image tag in the Deployment 'nginx' for the container named 'main' to version '20241124'
err = kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Deployment().ReplaceImageTag("main","20241124")
Deployment Rollout History
// Query the revision history for the Deployment named 'nginx'
result, err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().History()
Deployment Rollout Undo
// Execute a general rollback for the Deployment named 'nginx'
result, err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Undo()
// Rollback the Deployment 'nginx' to a specific revision version (obtained from History query)
result, err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Undo("6")
Deployment Rollout Pause
// Halt the ongoing Deployment update process
err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Pause()
Deployment Rollout Resume
// Resume a previously suspended Deployment update process
err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Resume()
Deployment Rollout Status
// Check the current status of the Deployment 'nginx' update process
result, err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Status()
Deployment HPA Information
// Display associated Horizontal Pod Autoscalers for the 'nginx-web' Deployment
list, err := kom.DefaultCluster().Resource(&v1.Deployment{}).Namespace("default").Name("nginx-web").Ctl().Deployment().HPAList()
for _, item := range list {
t.Logf("HPA %s\n", item.Name)
}
Applying Node Taints
err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Node().Taint("dedicated=special-user:NoSchedule")
Removing Node Taints
err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Node().UnTaint("dedicated=special-user:NoSchedule")
Cordoning a Node
err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Node().Cordon()
Uncordoning a Node
err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Node().UnCordon()
Draining a Node
err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Node().Drain()
Querying Node IP Resource Utilization
Cache time can be set to minimize repeated API calls.
nodeName := "lima-rancher-desktop"
total, used, available := kom.DefaultCluster().Resource(&corev1.Node{}).WithCache(5 * time.Second).Name(nodeName).Ctl().Node().IPUsage()
fmt.Printf("Total %d, Used %d, Available %d\n", total, used, available)
//Example Output: Total 256, Used 6, Available 250
Querying Node Pod Capacity Metrics
Cache time can be set to minimize repeated API calls.
nodeName := "lima-rancher-desktop"
total, used, available := kom.DefaultCluster().Resource(&corev1.Node{}).WithCache(5 * time.Second).Name(nodeName).Ctl().Node().PodCount()
fmt.Printf("Total %d, Used %d, Available %d\n", total, used, available)
//Example Output: Total 110, Used 9, Available 101
Summarizing Node Resource Usage
Cache time can be set to minimize repeated API calls.
nodeName := "lima-rancher-desktop"
usage := kom.DefaultCluster().Resource(&corev1.Node{}).WithCache(5 * time.Second).Name(nodeName).Ctl().Node().ResourceUsage()
fmt.Printf("Node Usage %s\n", utils.ToJSON(usage))
Includes current request values, limits, allocatable amounts, and usage fractions.
{
"requests": {
"cpu": "200m",
"memory": "140Mi"
},
"limits": {
"memory": "170Mi"
},
"allocatable": {
"cpu": "4",
"ephemeral-storage": "99833802265",
"hugepages-1Gi": "0",
"hugepages-2Mi": "0",
"hugepages-32Mi": "0",
"hugepages-64Ki": "0",
"memory": "8127096Ki",
"pods": "110"
},
"usageFractions": {
"cpu": {
"requestFraction": 5,
"limitFraction": 0
},
"ephemeral-storage": {
"requestFraction": 0,
"limitFraction": 0
},
"memory": {
"requestFraction": 1.76397571777176,
"limitFraction": 2.1419705144371375
}
}
}
Adding Labels to Resources
err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Label("name=zhangsan")
Removing Labels from Resources
err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Label("name-")
Adding Annotations to Resources
err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Annotate("name=zhangsan")
Removing Annotations from Resources
err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Annotate("name-")
Creating a Node Shell
ns, pod, container, err := kom.DefaultCluster().Resource(&v1.Node{}).Name("kind-control-plane").Ctl().Node().CreateNodeShell()
fmt.Printf("Node Shell ns=%s podName=%s containerName=%s", ns, pod, container)
Creating a Kubectl Shell
ns, pod, container, err := kom.DefaultCluster().Resource(&v1.Node{}).Name(name).Ctl().Node().CreateKubectlShell(kubeconfig)
fmt.Printf("Kubectl Shell ns=%s podName=%s containerName=%s", ns, pod, container)
Counting PVCs per StorageClass
count, err := kom.DefaultCluster().Resource(&v1.StorageClass{}).Name("hostpath").Ctl().StorageClass().PVCCount()
fmt.Printf("pvc count %d\n", count)
Counting PVs per StorageClass
count, err := kom.DefaultCluster().Resource(&v1.StorageClass{}).Name("hostpath").Ctl().StorageClass().PVCount()
fmt.Printf("pv count %d\n", count)
Setting StorageClass as Default
err := kom.DefaultCluster().Resource(&v1.StorageClass{}).Name("hostpath").Ctl().StorageClass().SetDefault()
Setting IngressClass as Default
err := kom.DefaultCluster().Resource(&v1.IngressClass{}).Name("nginx").Ctl().IngressClass().SetDefault()
Listing Pods Managed by Workload Controllers
list, err := kom.DefaultCluster().Namespace("default").Name("managed-pods").Ctl().Deployment().ManagedPods()
for _, pod := range list {
fmt.Printf("ManagedPod: %v", pod.Name)
}
Obtaining the Aggregate Label Set from All Nodes
// labels variable type is map[string]string
labels, err := kom.DefaultCluster().Resource(&v1.Node{}).Ctl().Node().AllNodeLabels()
fmt.Printf("%s", utils.ToJSON(labels))
{
"beta.kubernetes.io/arch": "arm64",
"beta.kubernetes.io/os": "linux",
"kubernetes.io/arch": "arm64",
"kubernetes.io/hostname": "kind-control-plane",
"kubernetes.io/os": "linux",
"kubernetes.io/role": "agent",
"node-role.kubernetes.io/agent": "",
"node-role.kubernetes.io/control-plane": "",
"type": "kwok",
"uat": "test",
"x": "x"
}
Viewing Pod Resource Consumption Rates
podName := "coredns-ccb96694c-jprpf"
ns := "kube-system"
usage := kom.DefaultCluster().Resource(&corev1.Pod{}).Name(podName).Namespace(ns).Ctl().Pod().ResourceUsage()
fmt.Printf("Pod Usage %s\n", utils.ToJSON(usage))
Includes current request values, limits, allocatable amounts, and usage fractions.
{
"requests": {
"cpu": "100m",
"memory": "70Mi"
},
"limits": {
"memory": "170Mi"
},
"allocatable": {
"cpu": "4",
"ephemeral-storage": "99833802265",
"hugepages-1Gi": "0",
"hugepages-2Mi": "0",
"hugepages-32Mi": "0",
"hugepages-64Ki": "0",
"memory": "8127096Ki",
"pods": "110"
},
"usageFractions": {
"cpu": {
"requestFraction": 2.5,
"limitFraction": 0
},
"memory": {
"requestFraction": 0.88198785888588,
"limitFraction": 2.1419705144371375
}
}
}
Obtaining Field Documentation Explanations
var docResult []byte
item := v1.Deployment{}
field := "spec.replicas"
field = "spec.template.spec.containers.name"
field = "spec.template.spec.containers.imagePullPolicy"
field = "spec.template.spec.containers.livenessProbe.successThreshold"
err := kom.DefaultCluster().
Resource(&item).DocField(field).Doc(&docResult).Error
fmt.Printf("Get Deployment Doc [%s] :%s", field, string(docResult))
Related Topics
- Task dependency management in complex systems.
- Resource allocation and scheduling in distributed computing.
- Iterative refinement and project completion criteria.
- Configuration management using declarative specifications.
- Inter-process communication paradigms (stdio, SSE).
Extra Details
This utility abstracts away repetitive Kubernetes client interactions, allowing system architects to focus on resource relationships and orchestration flow, much like focusing on critical task dependencies rather than micro-assignments. The ability to query resources via SQL provides a significant abstraction layer, simplifying data extraction that might otherwise require complex Go client interactions or manual aggregation.
Conclusion
By consolidating numerous operational tasks into a cohesive, scriptable framework, this tool enhances the coordination required to maintain healthy Kubernetes environments. Effective resource control, facilitated by these integrated capabilities, ensures project goals—in this context, cluster stability and performance—are met without unnecessary friction arising from task ambiguity or poorly linked execution steps.
