Initial Commit

This commit is contained in:
Markus Pesch 2023-02-23 14:26:07 +01:00
commit 0cb818404a
Signed by: volker.raschek
GPG Key ID: 852BCC170D81A982
4 changed files with 167 additions and 0 deletions

12
.editorconfig Normal file
View File

@ -0,0 +1,12 @@
# EditorConfig is awesome: https://EditorConfig.org
# top-most EditorConfig file
root = true
[*]
indent_style = space
indent_size = 4
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = false

4
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,4 @@
{
"python.linting.pylintEnabled": true,
"python.linting.enabled": true
}

34
README.md Normal file
View File

@ -0,0 +1,34 @@
# kcf - kubectl fetcher
A small python script to merge a remote kubectl config with a local stored
configuration file. It supports cli flags. The following
```bash
$ ./main.py -h
usage: main.py [-h] --host HOSTNAME [--port PORT] [--username USERNAME] [--password PASSWORD] [--identity-file IDENTITY_FILE] [--identity-passphrase IDENTITY_PASSPHRASE]
[--remote-config REMOTE_CONFIG] [--remove-cluster-name REMOTE_CLUSTER_NAME] [--remove-user-name REMOTE_USER_NAME] [--local-config LOCAL_CONFIG]
[--local-cluster-name LOCAL_CLUSTER_NAME] [--local-user-name LOCAL_USER_NAME]
options:
-h, --help show this help message and exit
--host HOSTNAME SSH-Server
--port PORT SSH-Port
--username USERNAME Remote Unix User
--password PASSWORD Remote password
--identity-file IDENTITY_FILE
Path to private SSH-Key
--identity-passphrase IDENTITY_PASSPHRASE
Passphrase of the SSH-Key
--remote-config REMOTE_CONFIG
Remote kubectl config
--remove-cluster-name REMOTE_CLUSTER_NAME
Name of the cluster
--remove-user-name REMOTE_USER_NAME
Name of the user
--local-config LOCAL_CONFIG
Local kubectl config
--local-cluster-name LOCAL_CLUSTER_NAME
Name of the cluster
--local-user-name LOCAL_USER_NAME
Name of the user
```

117
main.py Executable file
View File

@ -0,0 +1,117 @@
#!/usr/bin/env python3
import argparse
import paramiko
import os
import yaml
from yaml.loader import SafeLoader
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import ed25519, dsa, rsa, ec
from io import StringIO
from scp import SCPClient
from paramiko import RSAKey, Ed25519Key, ECDSAKey, DSSKey, PKey, SSHClient
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--host", dest="hostname", required=True, type=str, help="SSH-Server")
parser.add_argument("--port", dest="port", default=22, type=int, help="SSH-Port")
parser.add_argument("--username", dest="username", default=os.getlogin(), type=str, help="Remote Unix User")
parser.add_argument("--password", dest="password", default=None, type=str, help="Remote password")
parser.add_argument("--identity-file", dest="identity_file", default=None, type=str, help="Path to private SSH-Key")
parser.add_argument("--identity-passphrase", dest="identity_passphrase", default=None, type=str, help="Passphrase of the SSH-Key")
parser.add_argument("--remote-config", dest="remote_config", default="~/.kube/config", type=str, help="Remote kubectl config")
parser.add_argument("--remove-cluster-name", dest="remote_cluster_name", default="kubernetes", type=str, help="Name of the cluster")
parser.add_argument("--remove-user-name", dest="remote_user_name", default="kubernetes-admin", type=str, help="Name of the user")
parser.add_argument("--local-config", dest="local_config", default="~/.kube/config", type=str, help="Local kubectl config")
parser.add_argument("--local-cluster-name", dest="local_cluster_name", default="kubernetes", type=str, help="Name of the cluster")
parser.add_argument("--local-user-name", dest="local_user_name", default="kubernetes-admin", type=str, help="Name of the user")
args = parser.parse_args()
ssh_client = create_ssh_client(args.hostname, args.port, args.username, args.identity_file, args.identity_passphrase)
scp_client = SCPClient(ssh_client.get_transport())
scp_client.get(args.remote_config, local_path="/tmp/admin.config")
with open("/tmp/admin.config", 'r', encoding="UTF-8") as f:
remote_config = yaml.load(f, Loader=SafeLoader)
with open(args.local_config, 'r', encoding="UTF-8") as f:
local_config = yaml.load(f, Loader=SafeLoader)
# Remove old objects
local_config["clusters"] = list(filter(lambda cluster: cluster["name"] != args.local_cluster_name, local_config["clusters"]))
local_config["users"] = list(filter(lambda user: user["name"] != args.local_user_name, local_config["users"]))
# Add new objects
local_config["clusters"] = local_config["clusters"] + list(filter(lambda cluster: cluster["name"] == args.remote_cluster_name, remote_config["clusters"]))
local_config["users"] = local_config["users"] + list(filter(lambda user: user["name"] == args.remote_user_name, remote_config["users"]))
# Rename object attributes
for cluster in list(filter(lambda cluster: cluster["name"] == args.remote_cluster_name, remote_config["clusters"])):
cluster["name"] = args.local_cluster_name
for user in list(filter(lambda user: user["name"] == args.remote_user_name, remote_config["users"])):
user["name"] = args.local_user_name
# Store new local_config
with open(args.local_config, 'w', encoding="UTF-8") as f:
yaml.dump(local_config, f)
def create_ssh_client(hostname: str, port: str, username: str, identity_file: str, identity_passphrase: str) -> SSHClient:
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_private_key = PKey
if identity_file != None and os.path.isfile(identity_file):
ssh_private_key_file_obj = open(file=identity_file, mode="r", encoding="UTF-8")
ssh_private_key = from_private_key(ssh_private_key_file_obj, identity_passphrase)
ssh_client.connect(username=username, hostname = hostname, port = port, pkey=ssh_private_key, compress=True)
return ssh_client
def from_private_key(file_obj, password=None) -> PKey:
private_key = None
file_bytes = bytes(file_obj.read(), "utf-8")
try:
key = crypto_serialization.load_ssh_private_key(
file_bytes,
password=password,
)
file_obj.seek(0)
except ValueError:
key = crypto_serialization.load_pem_private_key(
file_bytes,
password=password,
)
if password:
encryption_algorithm = crypto_serialization.BestAvailableEncryption(
password
)
else:
encryption_algorithm = crypto_serialization.NoEncryption()
file_obj = StringIO(
key.private_bytes(
crypto_serialization.Encoding.PEM,
crypto_serialization.PrivateFormat.OpenSSH,
encryption_algorithm,
).decode("utf-8")
)
if isinstance(key, rsa.RSAPrivateKey):
private_key = RSAKey.from_private_key(file_obj, password)
elif isinstance(key, ed25519.Ed25519PrivateKey):
private_key = Ed25519Key.from_private_key(file_obj, password)
elif isinstance(key, ec.EllipticCurvePrivateKey):
private_key = ECDSAKey.from_private_key(file_obj, password)
elif isinstance(key, dsa.DSAPrivateKey):
private_key = DSSKey.from_private_key(file_obj, password)
else:
raise TypeError
return private_key
if __name__ == "__main__":
main()