logo
Free, unlimited AI code reviews that run on commit
git-lrc git-lrc GitHub Install Now We'd appreciate a star git-lrc - Free, unlimited AI code reviews that run on commit | Product Hunt git-lrc - Free, unlimited AI code reviews that run on commit | Product Hunt

KubeControl Nexus

This utility functions as a robust orchestration layer for Kubernetes clusters, analogous to an SDK-level abstraction over kubectl or client-go mechanisms. It facilitates efficient life-cycle management of cluster resources, encompassing creation, modification, deletion, and retrieval across numerous environments. The tool inherently manages Custom Resource Definitions (CRDs) and introduces streamlined processes for operational oversight and log retrieval, mirroring the need for coordination in complex undertakings like project management.

Author

KubeControl Nexus logo

weibaohui

MIT License

Quick Info

GitHub GitHub Stars 126
NPM Weekly Downloads 0
Tools 1
Last Updated 2026-02-19

Tags

kuberneteskomweibaohuimanage kuberneteskubernetes resourceskom manage

Introduction

This tool streamlines resource administration within Kubernetes environments. Managing resources effectively is akin to organizing activities within a larger project plan; each operation is a discrete activity requiring precise execution. This utility helps organize these technical activities, ensuring necessary coordination for successful project outcomes.

Core Capabilities

  1. Simplified Operations: Provides comprehensive functions for resource manipulation, including initialization, updates, removal, and querying for both native and custom types.
  2. Multi-Cluster Coordination: Facilitates management across several Kubernetes environments through cluster registration, supporting platforms like AWS EKS.
  3. MCP Integration: Supports multi-cluster management protocols (MCP) using both standard input/output (stdio) and Server-Sent Events (SSE) modes. It incorporates numerous internal tools for extensive operational flexibility.
  4. Namespace Flexibility: Permits resource operations spanning multiple namespaces using specific selection methods.
  5. Fluent Interface: Offers method chaining, making resource interaction flows intuitive and concise.
  6. Custom Resource Support: Seamlessly interacts with Custom Resource Definitions (CRDs) for dynamic resource handling.
  7. Extensibility via Callbacks: Supports a callback architecture, allowing business logic integration decoupled from core Kubernetes interactions.
  8. In-Pod File Utility: Enables remote file system operations directly within Pod containers, such as uploading or retrieving files.
  9. Accelerated Actions: Includes wrappers for frequent operational tasks like Deployment restarts, scaling, and state toggling.
  10. SQL Query Interface: Allows resource querying using familiar SQL syntax against cluster state.
  11. Query Caching: Incorporates optional caching for high-frequency or batch retrieval scenarios to boost performance. Filtering criteria are unaffected by caching mechanisms.

Examples Blueprint

KubeControl CLI is a lightweight administration utility built upon this framework, integrating with other frontend technologies. 1. Acquisition: Retrieve the current version from https://github.com/weibaohui/kom. 2. Execution: Launch using the ./k8m command to access the local interface at http://127.0.0.1:3618.

Setup

import (
    "github.com/weibaohui/kom"
    "github.com/weibaohui/kom/callbacks"
)
func main() {
    // Essential: Register necessary callbacks first.
    callbacks.RegisterInit()
    // Register cluster connections.
    defaultKubeConfig := os.Getenv("KUBECONFIG")
    if defaultKubeConfig == "" {
        defaultKubeConfig = filepath.Join(homedir.HomeDir(), ".kube", "config")
    }
    _, _ = kom.Clusters().RegisterInCluster()
    _, _ = kom.Clusters().RegisterByPathWithID(defaultKubeConfig, "default")
    kom.Clusters().Show()
    // Further operational logic follows here.
}

Usage Examples

0. Multi-Cluster MCP Coordination

Supports both stdio and sse communication protocols. Provides methods for arbitrary resource querying, listing, deletion, description, and Pod log access across multiple managed environments.

1. Code Integration

// Initiate the MCP Server with one line of code
mcp.RunMCPServer("kom mcp server", "0.0.1", 9096)



2. Compilation

# Start from source code
go build main.go 
// Compile into the KubeControl Nexus executable

3. Launch

Two operational modes are available post-compilation: stdio and sse. Kubernetes interaction defaults to the KUBECONFIG environment variable setting.

# Set the KUBECONFIG environment variable
export KUBECONFIG = /Users/xxx/.kube/config
# Execute the tool
./kom 
# Access point for MCP Server
http://IP:9096/sse

When executed, the binary functions as a stdio tool. The HTTP endpoint enables sse mode communication.

4. Integration into MCP Tools

This tool accommodates integration via stdio or sse methods. It is suitable for embedding within broader MCP tooling, such as Cursor, Claude Desktop (stdio only), and Windsurf interfaces. UI configuration panels may also facilitate its setup.

{
  "mcpServers": {
    "kom": {
      "type": "sse",
      "url": "http://IP:9096/sse"
    }
  }
}
{
    "mcpServers": {
        "k8m": {
            "command": "path/to/kom",
            "args": []
        }
    }
}

MCP Tool Registry (59 Functions)

Category Method Description
Cluster Management (1) list_k8s_clusters Enumerates all registered Kubernetes environments.
DaemonSet Management (1) restart_k8s_daemonset Restarts a DaemonSet using cluster, namespace, and name identifiers.
Deployment Management (12) scale_k8s_deployment Adjusts Deployment replica count via cluster, namespace, and name.
restart_k8s_deployment Reinitializes a Deployment identified by cluster, namespace, and name.
stop_k8s_deployment Sets Deployment replicas count to zero.
restore_k8s_deployment Reverts Deployment replica count to a previously stored value.
update_k8s_deployment_image_tag Modifies the container image tag within a Deployment specification.
get_k8s_deployment_rollout_history Retrieves the history log for Deployment revisions.
undo_k8s_deployment_rollout Executes a rollback action on the Deployment revision history.
pause_k8s_deployment_rollout Suspends the active Deployment update process.
resume_k8s_deployment_rollout Resumes a paused Deployment update sequence.
get_k8s_deployment_rollout_status Fetches the current status of a Deployment revision update.
get_k8s_deployment_hpa_list Lists associated Horizontal Pod Autoscalers for a Deployment.
list_k8s_deploy_event Retrieves events related to a specific Deployment object.
Dynamic Resource Management (incl. CRD, 8) get_k8s_resource Fetches detailed information for a Kubernetes resource by identifier.
describe_k8s_resource Provides descriptive output for a specified Kubernetes resource.
delete_k8s_resource Removes a Kubernetes resource based on its location and name.
list_k8s_resource Retrieves a collection of Kubernetes resources filtered by cluster and type.
annotate_k8s_resource Adds or removes annotations on a specified Kubernetes resource.
label_k8s_resource Applies or deletes labels for a selected Kubernetes resource.
patch_k8s_resource Applies a partial modification to a Kubernetes resource object.
GetDynamicResource Retrieves an arbitrary dynamic resource instance.
Node Management (11) taint_k8s_node Applies a taint restriction to a specified cluster node.
untaint_k8s_node Removes an existing taint from a cluster node.
cordon_k8s_node Marks a node as unschedulable for new Pods.
uncordon_k8s_node Restores scheduling availability on a designated node.
drain_k8s_node Evicts existing Pods and prevents future scheduling on a node.
get_k8s_node_ip_usage Queries the IP resource consumption metrics for a node.
list_k8s_node Fetches the list of available cluster Nodes.
get_k8s_top_node Retrieves ranked statistics for Node CPU and memory utilization.
get_k8s_pod_count_running_on_node Tallies the quantity of active Pods scheduled on a specific node.
get_k8s_node_resource_usage Summarizes current resource consumption statistics for a node.
TaintNodeTool Utility for applying node taint modifications.
Event Management (1) list_k8s_event Lists Kubernetes events scoped by cluster and namespace.
Ingress Management (1) set_default_k8s_ingressclass Designates a specific IngressClass as the cluster default.
Pod Management (18) run_command_in_k8s_pod Executes an arbitrary command within the context of a running Pod.
list_k8s_pod_event Retrieves events associated with a particular Pod object.
list_files_in_k8s_pod Generates a file listing for a specified directory path inside a Pod.
list_all_pod_files Recursively lists every file within a designated Pod directory path.
delete_k8s_pod Terminates a running Pod instance.
delete_pod_file Removes a specified file from within the Pod's filesystem.
get_k8s_pod_linked_env Extracts runtime environment variables configured for a Pod.
get_pod_linked_env_from_yaml Derives environment variables directly from the Pod specification YAML.
get_k8s_pod_linked_services Identifies associated Service objects linked to a Pod.
get_pod_linked_ingresses Retrieves Ingress resources that route traffic to a Pod.
get_pod_linked_endpoints Finds the backing Endpoints defining network connectivity for a Pod.
list_k8s_pod Fetches the current collection of Pod resources.
get_k8s_top_pod Returns ranked metrics for Pod CPU and memory utilization.
ListPodFilesTool Tool for listing files within a Pod directory structure.
ListAllPodFilesTool Tool for recursively listing all files within a Pod path.
DeletePodFileTool Tool to erase a file from a Pod's filesystem.
UploadPodFileTool Tool for transferring local files into a Pod.
GetPodLogsTool Tool for accessing standard output and error streams from a Pod.
describe_k8s_pod Generates detailed descriptive output for a container group (Pod).
Storage Management (3) set_k8s_default_storageclass Designates a specific StorageClass as the cluster default provisioner.
get_k8s_storageclass_pvc_count Counts the PersistentVolumeClaims bound to a given StorageClass.
get_k8s_storageclass_pv_count Tallies the PersistentVolumes associated with a specific StorageClass.
YAML Management (2) apply_k8s_yaml Creates or modifies cluster resources defined in a YAML manifest.
delete_k8s_yaml Removes cluster resources specified within a YAML document.

Launch Command

mcp.RunMCPServer("kom mcp server", "0.0.1", 3619)

AI Tool Integration

Claude Desktop
  1. Access the Claude Desktop configuration panel.
  2. Locate the MCP Server configuration area.
  3. Activate the SSE event monitoring functionality.
  4. Verify the established connection status.
{
  "mcpServers": {
    "k8m": {
      "command": "path/to/kom",
      "args": []
    }
  }
}
Cursor
  1. Navigate to the Cursor settings interface.
  2. Find the extension service configuration option.
  3. Supports sse by providing http://localhost:9096/sse, or stdio by providing the path to the Nexus executable.
Windsurf
  1. Open the central configuration access point.
  2. Specify the API server network address.
  3. Supports sse via http://localhost:9096/sse or stdio via the Nexus executable path.

cherry studio

  1. Click the settings icon located in the lower left corner.
  2. Navigate to the MCP Server section.
  3. Select the option to add a new server.
  4. Configuration accepts sse (http://localhost:9096/sse) or stdio (Nexus executable path).

1. Multi-Cluster Coordination

Registering Multiple Environments

// Register the in-cluster environment, assigned the identifier InCluster
kom.Clusters().RegisterInCluster()
// Register two external clusters, named 'orb' and 'docker-desktop' respectively
kom.Clusters().RegisterByPathWithID("/Users/kom/.kube/orb", "orb")
kom.Clusters().RegisterByPathWithID("/Users/kom/.kube/config", "docker-desktop")
// Registering a cluster named 'default' enables its use via kom.DefaultCluster().
kom.Clusters().RegisterByPathWithID("/Users/kom/.kube/config", "default")

Registering AWS EKS Environments

// Define configuration parameters for EKS cluster access
config := aws.EKSAuthConfig{
    AccessKey:       "XXX",        // AWS Access Key ID credential
    SecretAccessKey: "yyy",        // AWS Secret Access Key credential
    Region:          "us-east-1",  // AWS geographical region identifier
    ClusterName:     "k8m",        // The specific EKS cluster name
}

// Attempt registration of the AWS EKS cluster
_, err := kom.Clusters().RegisterAWSCluster(config)
if err != nil {
    fmt.Printf("EKS Cluster Registration Failure: %v", err)
    return
}

// Subsequently utilize the registered EKS cluster
var pods []corev1.Pod
clusterID := fmt.Sprintf("%s-%s", config.Region, config.ClusterName) // Cluster ID format: {Region}-{ClusterName}
err = kom.Cluster(clusterID).Resource(&corev1.Pod{}).Namespace("kube-system").List(&pods).Error

AWS EKS Cluster Registration Notes: - AccessKey: AWS credential identifier. - SecretAccessKey: AWS secret credential. - Region: AWS geographical zone, e.g., us-east-1. - ClusterName: The designated EKS cluster name. - RoleARN: (Optional) IAM Role ARN for assuming roles in cross-account scenarios. - The system automatically generates an ID post-registration in the {Region}-{ClusterName} format. - Cross-account access is enabled via IAM role assumption mechanisms. - AWS credentials are held only in memory and purged upon application termination.

Display Registered Environments

kom.Clusters().Show()

Selecting the Default Environment

// Accessing the default cluster; fetches the instance identified as "InCluster".
// If absent, it attempts to retrieve the instance named "default".
// If neither exists, any available cluster instance from the list is returned.
var pods []corev1.Pod
err = kom.DefaultCluster().Resource(&corev1.Pod{}).Namespace("kube-system").List(&pods).Error

Selecting a Specific Environment

// Target the 'orb' cluster to query Pods within the kube-system namespace
var pods []corev1.Pod
err = kom.Cluster("orb").Resource(&corev1.Pod{}).Namespace("kube-system").List(&pods).Error

2. CRUD Operations and Watch for Native Resources

Define a Deployment object for subsequent resource management activities.

var item v1.Deployment
var items []v1.Deployment

Create a Resource Instance

item = v1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "nginx",
            Namespace: "default",
        },
        Spec: v1.DeploymentSpec{
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {Name: "test", Image: "nginx:1.14.2"},
                    },
                },
            },
        },
    }
err := kom.DefaultCluster().Resource(&item).Create(&item).Error

GET Query for a Specific Resource

// Retrieve the Deployment named 'nginx' in the 'default' namespace
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").Get(&item).Error
// Retrieve the Deployment, utilizing a 5-second cache for the query
// Caching is recommended for batch or high-frequency operations.
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").WithCache(5 * time.Second).Get(&item).Error

LIST Query for Resource Collections

// Retrieve a list of Deployments within the 'default' namespace
err := kom.DefaultCluster().Resource(&item).Namespace("default").List(&items).Error
// Retrieve Deployments across 'default' and 'kube-system' namespaces
err := kom.DefaultCluster().Resource(&item).Namespace("default","kube-system").List(&items).Error
// Retrieve Deployments across all cluster namespaces
err := kom.DefaultCluster().Resource(&item).Namespace("*").List(&items).Error
err := kom.DefaultCluster().Resource(&item).AllNamespace().List(&items).Error
// Apply a 5-second cache; this applies to list operations.
err := kom.DefaultCluster().Resource(&item).WithCache(5 * time.Second).List(&nodeList).Error

LIST Query Utilizing Label Selectors

// Retrieve Deployments in 'default' namespace matching the label app=nginx
err := kom.DefaultCluster().Resource(&item).Namespace("default").WithLabelSelector("app=nginx").List(&items).Error

LIST Query Utilizing Multiple Labels

// Retrieve Deployments matching both labels app=nginx and m=n in 'default' namespace
err := kom.DefaultCluster().Resource(&item).Namespace("default").WithLabelSelector("app=nginx").WithLabelSelector("m=n").List(&items).Error

LIST Query Utilizing Field Selectors

// Retrieve Deployments in 'default' namespace matching field metadata.name=test-deploy
// Field selectors generally support native definition fields like metadata.name, metadata.labels, spec.nodeName, status.phase, etc.
err := kom.DefaultCluster().Resource(&item).Namespace("default").WithFieldSelector("metadata.name=test-deploy").List(&items).Error

Paged Resource Retrieval

var list []corev1.Pod
var total int64
sql := "select * from pod where metadata.namespace=? or metadata.namespace=?     order by  metadata.creationTimestamp desc "
err := kom.DefaultCluster().Sql(sql, "kube-system", "default").
        FillTotalCount(&total).
        Limit(5).
        Offset(10).
        List(&list).Error
fmt.Printf("total %d\n", total)  // Returns total count, e.g., 480
fmt.Printf("Count %d\n", len(list)) // Returns item count, equals limit (5)

Modifying Resource Content

// Retrieve the Deployment named 'nginx' for modification
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").Get(&item).Error
if item.Spec.Template.Annotations == nil {
    item.Spec.Template.Annotations = map[string]string{}
}
item.Spec.Template.Annotations["kom.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)
err = kom.DefaultCluster().Resource(&item).Update(&item).Error

PATCH Resource Update

// Apply a Patch update; adds a label and sets replica count to 5 for Deployment 'nginx'
patchData := `{
    "spec": {
        "replicas": 5
    },
    "metadata": {
        "labels": {
            "new-label": "new-value"
        }
    }
}`
err := kom.DefaultCluster().Resource(&item).Patch(&item, types.StrategicMergePatchType, patchData).Error

Deleting a Resource Instance

// Remove the Deployment named 'nginx'
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").Delete().Error

Force Deleting a Resource Instance

// Forcefully delete the Deployment named 'nginx'
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").ForceDelete().Error

Retrieving Resources by GVK (General Purpose)

// Fetch resources by Group-Version-Kind specification
var list []corev1.Event
err := kom.DefaultCluster().GVK("events.k8s.io", "v1", "Event").Namespace("default").List(&list).Error

Watching Resource Changes

// Monitor changes for Pod resources in the 'default' namespace
var watcher watch.Interface
var pod corev1.Pod
err := kom.DefaultCluster().Resource(&pod).Namespace("default").Watch(&watcher).Error
if err != nil {
    fmt.Printf("Create Watcher Error %v", err)
    return err
}
go func() {
    defer watcher.Stop()

    for event := range watcher.ResultChan() {
        err := kom.DefaultCluster().Tools().ConvertRuntimeObjectToTypedObject(event.Object, &pod)
        if err != nil {
            fmt.Printf("Could not cast object to *v1.Pod type: %v", err)
            return
        }
        // Process the event type
        switch event.Type {
        case watch.Added:
            fmt.Printf("Added Pod [ %s/%s ]\n", pod.Namespace, pod.Name)
        case watch.Modified:
            fmt.Printf("Modified Pod [ %s/%s ]\n", pod.Namespace, pod.Name)
        case watch.Deleted:
            fmt.Printf("Deleted Pod [ %s/%s ]\n", pod.Namespace, pod.Name)
        }
    }
}()

Describe Query for a Specific Resource

// Describe the Deployment named 'nginx' in the 'default' namespace
var describeResult []byte
err := kom.DefaultCluster().Resource(&item).Namespace("default").Name("nginx").Describe(&item).Error
fmt.Printf("describeResult: %s", describeResult)

3. YAML Manifest Creation, Update, Deletion

yaml := `apiVersion: v1
kind: ConfigMap
metadata:
  name: example-config
  namespace: default
data:
  key: value
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: example-deployment
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: example
  template:
    metadata:
      labels:
        app: example
    spec:
      containers:
        - name: example-container
          image: nginx
`
// Initial Apply creates resources; returns execution status for each item.
results := kom.DefaultCluster().Applier().Apply(yaml)
// Subsequent Apply invokes updates based on resource status.
results = kom.DefaultCluster().Applier().Apply(yaml)
// Deletion operation based on the provided YAML definitions.
results = kom.DefaultCluster().Applier().Delete(yaml)

4. Pod Operations

Retrieving Logs

// Obtain logs from a specific Pod
var stream io.ReadCloser
err := kom.DefaultCluster().Namespace("default").Name("random-char-pod").Ctl().Pod().ContainerName("container").GetLogs(&stream, &corev1.PodLogOptions{}).Error
reader := bufio.NewReader(stream)
line, _ := reader.ReadString('\n')
fmt.Println(line)

Executing Commands

Executes a command inside a Pod, triggering specific callback types.

// Run 'ps -ef' command within the Pod context
var execResult string
err := kom.DefaultCluster().Namespace("default").Name("random-char-pod").Ctl().Pod().ContainerName("container").Command("ps", "-ef").ExecuteCommand(&execResult).Error
fmt.Printf("execResult: %s", execResult)

Port Forwarding

err := kom.DefaultCluster().Resource(&v1.Pod{}).
        Namespace("default").
        Name("nginx-deployment-f576985cc-7czqr").
    Ctl().Pod().
        ContainerName("nginx").
        PortForward("20088", "80", stopCh).Error
// This forwards traffic from the local machine's port 20088 to the Pod's port 80.

Streaming Command Execution

Executes a command with streaming I/O, triggering StreamExec callbacks, suitable for continuous output commands like ping.

cb := func(data []byte) error {
        fmt.Printf("Data %s\n", string(data))
        return nil
    }
err := kom.DefaultCluster().Namespace("kube-system").Name("traefik-d7c9c5778-p9nf4").Ctl().Pod().ContainerName("traefik").Command("ping", "127.0.0.1").StreamExecute(cb, cb).Error
//Example Output:
//Data PING 127.0.0.1 (127.0.0.1): 56 data bytes
//Data 64 bytes from 127.0.0.1: seq=0 ttl=42 time=0.023 ms
//Data 64 bytes from 127.0.0.1: seq=1 ttl=42 time=0.011 ms
//Data 64 bytes from 127.0.0.1: seq=2 ttl=42 time=0.012 ms
//Data 64 bytes from 127.0.0.1: seq=3 ttl=42 time=0.016 ms

File Listing

// List files within the /etc directory of the Pod
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").ListFiles("/etc")

Recursive File Listing (Including Hidden)

// List all files, including hidden ones, within the /etc directory of the Pod
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").ListAllFiles("/etc")

File Download

// Download the /etc/hosts file from the Pod
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").DownloadFile("/etc/hosts")

Tar Compressed File Download

// Download /etc/hosts, packaging it as a tar archive before retrieval
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").DownloadTarFile("/etc/hosts")

File Upload

// Save text content to /etc/demo.txt inside the Pod
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").SaveFile("/etc/demo.txt", "txt-context")
// Upload a file from the host OS (os.File type) to the /etc/ directory in the Pod
file, _ := os.Open(tempFilePath)
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").UploadFile("/etc/", file)

File Deletion

// Erase the specified file located at /etc/xyz within the Pod
kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().ContainerName("nginx").DeleteFile("/etc/xyz")
// Fetch Service objects associated with the Pod
svcs, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedService()
for _, svc := range svcs {
    fmt.Printf("service name %v\n", svc.Name)
}
// Fetch Ingress objects associated with the Pod
ingresses, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedIngress()
for _, ingress := range ingresses {
    fmt.Printf("ingress name %v\n", ingress.Name)
}
// Fetch PersistentVolumeClaim objects linked to the Pod
pvcs, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedPVC()
for _, pvc := range pvcs {
    fmt.Printf("pvc name %v\n", pvc.Name)
}
// Fetch PersistentVolume objects linked to the Pod
pvs, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedPV()
for _, pv := range pvs {
    fmt.Printf("pv name %v\n", pv.Name)
}
// Fetch Endpoints objects associated with the Pod
endpoints, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedEndpoints()
for _, endpoint := range endpoints {
    fmt.Printf("endpoint name %v\n", endpoint.Name)
}

Obtains configuration data by executing the 'env' command inside the Pod.

envs, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedEnv()
for _, env := range envs {
        fmt.Printf("env %s %s=%s\n", env.ContainerName, env.EnvName, env.EnvValue)
    }

Derives environment variable configurations directly from the Pod definition.

envs, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedEnvFromPod()
for _, env := range envs {
        fmt.Printf("env %s %s=%s\n", env.ContainerName, env.EnvName, env.EnvValue)
    }

Returns a list of viable nodes based on Pod configuration (NodeSelector, Affinity, Tolerations, NodeName). It currently excludes runtime scheduling factors like CPU/memory pressure.

nodes, err := kom.DefaultCluster().Namespace("default").Name("nginx").Ctl().Pod().LinkedNode()
for _, node := range nodes {
    fmt.Printf("reason:%s\t node name %s\n", node.Reason, node.Name)
}

5. CRUD Operations and Watch for Custom Resources (CRD)

Operations are identical to native resources when using unstructured data, requiring specification of Group, Version, and Kind. kom.DefaultCluster().GVK(group, version, kind) replaces kom.DefaultCluster().Resource(interface{}). For simplification, kom.DefaultCluster().CRD(group, version, kind) is provided. First, define a generic object structure to capture CRD responses.

var item unstructured.Unstructured

Creating a CustomResourceDefinition (CRD)

yaml := `apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: crontabs.stable.example.com
spec:
  group: stable.example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                cronSpec:
                  type: string
                image:
                  type: string
                replicas:
                  type: integer
  scope: Namespaced
  names:
    plural: crontabs
    singular: crontab
    kind: CronTab
    shortNames:
    - ct`
result := kom.DefaultCluster().Applier().Apply(yaml)

Creating an Instance of the CRD

item = unstructured.Unstructured{
        Object: map[string]interface{}{
            "apiVersion": "stable.example.com/v1",
            "kind":       "CronTab",
            "metadata": map[string]interface{}{
                "name":      "test-crontab",
                "namespace": "default",
            },
            "spec": map[string]interface{}{
                "cronSpec": "* * * * */8",
                "image":    "test-crontab-image",
            },
        },
    }
er := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Namespace(item.GetNamespace()).Name(item.GetName()).Create(&item).Error

GET Query for a Single CR Instance

err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Name(item.GetName()).Namespace(item.GetNamespace()).Get(&item).Error

LIST Query for CR Instances

var crontabList []unstructured.Unstructured
// Query CronTab resources in the 'default' namespace
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Namespace(crontab.GetNamespace()).List(&crontabList).Error
// Query CronTab resources across all namespaces
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").AllNamespace().List(&crontabList).Error
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Namespace("*").List(&crontabList).Error

Updating a CR Instance

patchData := `{
    "spec": {
        "image": "patch-image"
    },
    "metadata": {
        "labels": {
            "new-label": "new-value"
        }
    }
}`
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Name(crontab.GetName()).Namespace(crontab.GetNamespace()).Patch(&crontab, types.StrategicMergePatchType, patchData).Error

Deleting a CR Instance

err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Name(crontab.GetName()).Namespace(crontab.GetNamespace()).Delete().Error

Force Deleting a CR Instance

err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Name(crontab.GetName()).Namespace(crontab.GetNamespace()).ForceDelete().Error

Watching CR Instances

var watcher watch.Interface

err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Namespace("default").Watch(&watcher).Error
if err != nil {
    fmt.Printf("Create Watcher Error %v", err)
}
go func() {
    defer watcher.Stop()

    for event := range watcher.ResultChan() {
    var item *unstructured.Unstructured

    item, err := kom.DefaultCluster().Tools().ConvertRuntimeObjectToUnstructuredObject(event.Object)
    if err != nil {
        fmt.Printf("Could not cast object to Unstructured type: %v", err)
        return
    }
    // Process the event type
    switch event.Type {
        case watch.Added:
            fmt.Printf("Added Unstructured [ %s/%s ]\n", item.GetNamespace(), item.GetName())
        case watch.Modified:
            fmt.Printf("Modified Unstructured [ %s/%s ]\n", item.GetNamespace(), item.GetName())
        case watch.Deleted:
            fmt.Printf("Deleted Unstructured [ %s/%s ]\n", item.GetNamespace(), item.GetName())
        }
    }
}()

Describe Query for a CRD Resource

// Describe the CronTab instance in the 'default' namespace
var describeResult []byte
err := kom.DefaultCluster().CRD("stable.example.com", "v1", "CronTab").Namespace("default").Name(item.GetName()).Describe(&item).Error
fmt.Printf("describeResult: %s", describeResult)

Getting Pods Managed by a CRD

pods, err := kom.DefaultCluster().CRD("apps.kruise.io", "v1beta1", "StatefulSet").
Namespace("default").Name("sample").Ctl().CRD().ManagedPods()
    for _, pod := range pods {
        fmt.Printf("Get pods: %v", pod.GetName())
    }

6. Cluster Parameter Information

// Retrieve cluster documentation summary
kom.DefaultCluster().Status().Docs()
// Inspect cluster API resource inventory
kom.DefaultCluster().Status().APIResources()
// List all Custom Resource Definitions registered on the cluster
kom.DefaultCluster().Status().CRDList()
// Fetch cluster version details
kom.DefaultCluster().Status().ServerVersion()
// Summarize the count of various resource types within the cluster (limit 10)
kom.DefaultCluster().Status().GetResourceCountSummary(10)

7. Callback Mechanism

  • A callback system is integrated, enabling custom function invocation upon operation completion.
  • If a registered callback returns 'true', subsequent operations continue; returning 'false' halts the chain.
  • Supported operations for callbacks include: get, list, create, update, patch, delete, exec, stream-exec, logs, watch, and doc interaction.
  • Default callback identifiers include: "kom:get", "kom:list", "kom:create", "kom:update", "kom:patch", "kom:watch", "kom:delete", "kom:pod:exec", "kom:pod:stream:exec", "kom:pod:logs", "kom:pod:port:forward", "kom:doc".
  • Execution sequence can be managed using .After("kom:get") or .Before("kom:get") to set relative ordering against built-in callbacks.
  • Callbacks can be removed using kom.DefaultCluster().Callback().Delete("kom:get").
  • A registered callback can be entirely replaced via kom.DefaultCluster().Callback().Replace("kom:get",cb).
// Register a callback function for Get resource retrieval operations
kom.DefaultCluster().Callback().Get().Register("get", cb)
// Register a callback function for List resource retrieval operations
kom.DefaultCluster().Callback().List().Register("list", cb)
// Register a callback function for Create resource initialization operations
kom.DefaultCluster().Callback().Create().Register("create", cb)
// Register a callback function for Update resource modification operations
kom.DefaultCluster().Callback().Update().Register("update", cb)
// Register a callback function for Patch resource modification operations
kom.DefaultCluster().Callback().Patch().Register("patch", cb)
// Register a callback function for Delete resource removal operations
kom.DefaultCluster().Callback().Delete().Register("delete", cb)
// Register a callback function for Watch resource observation operations
kom.DefaultCluster().Callback().Watch().Register("watch",cb)
// Register a callback function for Exec command execution within Pods
kom.DefaultCluster().Callback().Exec().Register("exec", cb)
// Register a callback function for Logs retrieval operations
kom.DefaultCluster().Callback().Logs().Register("logs", cb)
// Remove a previously registered callback function
kom.DefaultCluster().Callback().Get().Delete("get")
// Replace an existing callback function for Get operations
kom.DefaultCluster().Callback().Get().Replace("get", cb)
// Specify execution order: run after the built-in 'kom:get' operation completes
kom.DefaultCluster().Callback().After("kom:get").Register("get", cb)
// Specify execution order: run before the built-in 'kom:create' operation starts
// Example 1. A pre-creation check verifies permissions; returning an error prevents subsequent creation steps.
// Example 2. Post-list filtering removes unwanted resources from the returned set (Statement.Dest) before presentation.
kom.DefaultCluster().Callback().Before("kom:create").Register("create", cb)

// Custom callback function definition
func cb(k *kom.Kubectl) error {
    stmt := k.Statement
    gvr := stmt.GVR
    ns := stmt.Namespace
    name := stmt.Name
    // Log operational details
    fmt.Printf("Get %s/%s(%s)\n", ns, name, gvr)
    fmt.Printf("Command %s/%s(%s %s)\n", ns, name, stmt.Command, stmt.Args)
    return nil
    // Returning an error stops execution of subsequent callbacks in the chain.
}

8. SQL Querying of Cluster Resources

  • The Sql() method enables querying cluster resources using standard SQL syntax, offering efficiency.
  • Table names support all registered resource types, including CRDs, provided they are registered in the cluster.
  • Common table names include: pod, deployment, service, ingress, pvc, pv, node, namespace, secret, configmap, etc., covering nearly all types.
  • Currently, the query field selection is limited to the wildcard (*), meaning only select * is supported.
  • Supported conditional operators include: =, !=, >=, <=, <>, like, in, not in, and, or, between.
  • Sorting is restricted to a single field, defaulting to descending order by creation timestamp.

Querying Native Kubernetes Resources

    sql := "select * from deploy where metadata.namespace='kube-system' or metadata.namespace='default' order by  metadata.creationTimestamp asc   "

    var list []v1.Deployment
    err := kom.DefaultCluster().Sql(sql).List(&list).Error
    for _, d := range list {
        fmt.Printf("List Items foreach %s,%s at %s \n", d.GetNamespace(), d.GetName(), d.GetCreationTimestamp())
    }

Querying CRD Resources

    // 'vm' represents a CRD resource type from Kubevirt, for example.
    sql := "select * from vm where (metadata.namespace='kube-system' or metadata.namespace='default' )  "
    var list []unstructured.Unstructured
    err := kom.DefaultCluster().Sql(sql).List(&list).Error
    for _, d := range list {
        fmt.Printf("List Items foreach %s,%s\n", d.GetNamespace(), d.GetName())
    }

Chained SQL Query Invocation

// Query a list of Pods using chained methods
er := kom.DefaultCluster().From("pod").
        Where("metadata.namespace = ?  or metadata.namespace= ? ", "kube-system", "default").
        Order("metadata.creationTimestamp desc").
        List(&list).Error

Support for Nested List Attributes in SQL

// Query based on properties within nested lists, such as container ports names.
sql := "select * from pod where spec.containers.ports.name like '%k8m%'  " // Example targets port names containing 'k8m'
var list []v1.Pod
err := kom.DefaultCluster().Sql(sql).List(&list).Error
for _, d := range list {
    t.Logf("List Items foreach %s,%s\n", d.GetNamespace(), d.GetName())
}

9. Additional Utility Operations

Deployment Restart

err = kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Restart()

Deployment Scaling

// Set the replica count for the Deployment named 'nginx' to 3
err = kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Scaler().Scale(3)

Deployment Suspension

// Set the replica count for the Deployment named 'nginx' to zero.
// The current running replica count is preserved in an annotation.
err = kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Scaler().Stop()

Deployment Restoration

// Restore the Deployment replica count from the recorded annotation; defaults to 1 if annotation is missing.
err = kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Scaler().Restore()

Deployment Image Tag Update

// Update the container image tag in the Deployment 'nginx' for the container named 'main' to version '20241124'
err = kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Deployment().ReplaceImageTag("main","20241124")

Deployment Rollout History

// Query the revision history for the Deployment named 'nginx'
result, err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().History()

Deployment Rollout Undo

// Execute a general rollback for the Deployment named 'nginx'
result, err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Undo()
// Rollback the Deployment 'nginx' to a specific revision version (obtained from History query)
result, err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Undo("6")

Deployment Rollout Pause

// Halt the ongoing Deployment update process
err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Pause()

Deployment Rollout Resume

// Resume a previously suspended Deployment update process
err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Resume()

Deployment Rollout Status

// Check the current status of the Deployment 'nginx' update process
result, err := kom.DefaultCluster().Resource(&Deployment{}).Namespace("default").Name("nginx").Ctl().Rollout().Status()

Deployment HPA Information

// Display associated Horizontal Pod Autoscalers for the 'nginx-web' Deployment 
list, err := kom.DefaultCluster().Resource(&v1.Deployment{}).Namespace("default").Name("nginx-web").Ctl().Deployment().HPAList()
for _, item := range list {
    t.Logf("HPA %s\n", item.Name)
}

Applying Node Taints

err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Node().Taint("dedicated=special-user:NoSchedule")

Removing Node Taints

err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Node().UnTaint("dedicated=special-user:NoSchedule")

Cordoning a Node

err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Node().Cordon()

Uncordoning a Node

err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Node().UnCordon()

Draining a Node

err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Node().Drain()

Querying Node IP Resource Utilization

Cache time can be set to minimize repeated API calls.

nodeName := "lima-rancher-desktop"
total, used, available := kom.DefaultCluster().Resource(&corev1.Node{}).WithCache(5 * time.Second).Name(nodeName).Ctl().Node().IPUsage()
fmt.Printf("Total %d, Used %d, Available %d\n", total, used, available)
//Example Output: Total 256, Used 6, Available 250

Querying Node Pod Capacity Metrics

Cache time can be set to minimize repeated API calls.

nodeName := "lima-rancher-desktop"
total, used, available := kom.DefaultCluster().Resource(&corev1.Node{}).WithCache(5 * time.Second).Name(nodeName).Ctl().Node().PodCount()
fmt.Printf("Total %d, Used %d, Available %d\n", total, used, available)
//Example Output: Total 110, Used 9, Available 101

Summarizing Node Resource Usage

Cache time can be set to minimize repeated API calls.

nodeName := "lima-rancher-desktop"
usage := kom.DefaultCluster().Resource(&corev1.Node{}).WithCache(5 * time.Second).Name(nodeName).Ctl().Node().ResourceUsage()
fmt.Printf("Node Usage %s\n", utils.ToJSON(usage))

Includes current request values, limits, allocatable amounts, and usage fractions.

{
  "requests": {
    "cpu": "200m",
    "memory": "140Mi"
  },
  "limits": {
    "memory": "170Mi"
  },
  "allocatable": {
    "cpu": "4",
    "ephemeral-storage": "99833802265",
    "hugepages-1Gi": "0",
    "hugepages-2Mi": "0",
    "hugepages-32Mi": "0",
    "hugepages-64Ki": "0",
    "memory": "8127096Ki",
    "pods": "110"
  },
  "usageFractions": {
    "cpu": {
      "requestFraction": 5,
      "limitFraction": 0
    },
    "ephemeral-storage": {
      "requestFraction": 0,
      "limitFraction": 0
    },
    "memory": {
      "requestFraction": 1.76397571777176,
      "limitFraction": 2.1419705144371375
    }
  }
}

Adding Labels to Resources

err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Label("name=zhangsan")

Removing Labels from Resources

err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Label("name-")

Adding Annotations to Resources

err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Annotate("name=zhangsan")

Removing Annotations from Resources

err = kom.DefaultCluster().Resource(&Node{}).Name("kind-control-plane").Ctl().Annotate("name-")

Creating a Node Shell

ns, pod, container, err  := kom.DefaultCluster().Resource(&v1.Node{}).Name("kind-control-plane").Ctl().Node().CreateNodeShell()
fmt.Printf("Node Shell ns=%s podName=%s containerName=%s", ns, pod, container)

Creating a Kubectl Shell

ns, pod, container, err := kom.DefaultCluster().Resource(&v1.Node{}).Name(name).Ctl().Node().CreateKubectlShell(kubeconfig)
fmt.Printf("Kubectl Shell ns=%s podName=%s containerName=%s", ns, pod, container)

Counting PVCs per StorageClass

count, err := kom.DefaultCluster().Resource(&v1.StorageClass{}).Name("hostpath").Ctl().StorageClass().PVCCount()
fmt.Printf("pvc count %d\n", count)

Counting PVs per StorageClass

count, err := kom.DefaultCluster().Resource(&v1.StorageClass{}).Name("hostpath").Ctl().StorageClass().PVCount()
fmt.Printf("pv count %d\n", count)

Setting StorageClass as Default

err := kom.DefaultCluster().Resource(&v1.StorageClass{}).Name("hostpath").Ctl().StorageClass().SetDefault()

Setting IngressClass as Default

err := kom.DefaultCluster().Resource(&v1.IngressClass{}).Name("nginx").Ctl().IngressClass().SetDefault()

Listing Pods Managed by Workload Controllers

list, err := kom.DefaultCluster().Namespace("default").Name("managed-pods").Ctl().Deployment().ManagedPods()
for _, pod := range list {
    fmt.Printf("ManagedPod: %v", pod.Name)
}

Obtaining the Aggregate Label Set from All Nodes

// labels variable type is map[string]string
labels, err := kom.DefaultCluster().Resource(&v1.Node{}).Ctl().Node().AllNodeLabels()
fmt.Printf("%s", utils.ToJSON(labels))
{
          "beta.kubernetes.io/arch": "arm64",
          "beta.kubernetes.io/os": "linux",
          "kubernetes.io/arch": "arm64",
          "kubernetes.io/hostname": "kind-control-plane",
          "kubernetes.io/os": "linux",
          "kubernetes.io/role": "agent",
          "node-role.kubernetes.io/agent": "",
          "node-role.kubernetes.io/control-plane": "",
          "type": "kwok",
          "uat": "test",
          "x": "x"
}

Viewing Pod Resource Consumption Rates

podName := "coredns-ccb96694c-jprpf"
ns := "kube-system"
usage := kom.DefaultCluster().Resource(&corev1.Pod{}).Name(podName).Namespace(ns).Ctl().Pod().ResourceUsage()
fmt.Printf("Pod Usage %s\n", utils.ToJSON(usage))

Includes current request values, limits, allocatable amounts, and usage fractions.

{
  "requests": {
    "cpu": "100m",
    "memory": "70Mi"
  },
  "limits": {
    "memory": "170Mi"
  },
  "allocatable": {
    "cpu": "4",
    "ephemeral-storage": "99833802265",
    "hugepages-1Gi": "0",
    "hugepages-2Mi": "0",
    "hugepages-32Mi": "0",
    "hugepages-64Ki": "0",
    "memory": "8127096Ki",
    "pods": "110"
  },
  "usageFractions": {
    "cpu": {
      "requestFraction": 2.5,
      "limitFraction": 0
    },
    "memory": {
      "requestFraction": 0.88198785888588,
      "limitFraction": 2.1419705144371375
    }
  }
}

Obtaining Field Documentation Explanations

var docResult []byte
    item := v1.Deployment{}
    field := "spec.replicas"
    field = "spec.template.spec.containers.name"
    field = "spec.template.spec.containers.imagePullPolicy"
    field = "spec.template.spec.containers.livenessProbe.successThreshold"
    err := kom.DefaultCluster().
        Resource(&item).DocField(field).Doc(&docResult).Error
    fmt.Printf("Get Deployment Doc [%s] :%s", field, string(docResult))
  • Task dependency management in complex systems.
  • Resource allocation and scheduling in distributed computing.
  • Iterative refinement and project completion criteria.
  • Configuration management using declarative specifications.
  • Inter-process communication paradigms (stdio, SSE).

Extra Details

This utility abstracts away repetitive Kubernetes client interactions, allowing system architects to focus on resource relationships and orchestration flow, much like focusing on critical task dependencies rather than micro-assignments. The ability to query resources via SQL provides a significant abstraction layer, simplifying data extraction that might otherwise require complex Go client interactions or manual aggregation.

Conclusion

By consolidating numerous operational tasks into a cohesive, scriptable framework, this tool enhances the coordination required to maintain healthy Kubernetes environments. Effective resource control, facilitated by these integrated capabilities, ensures project goals—in this context, cluster stability and performance—are met without unnecessary friction arising from task ambiguity or poorly linked execution steps.

See Also

`