#!/usr/bin/env python3 # build-in modules import argparse import getpass import os from io import StringIO # opt-in modules import paramiko import yaml from yaml.loader import SafeLoader from cryptography.hazmat.primitives import serialization as crypto_serialization from cryptography.hazmat.primitives.asymmetric import ed25519, dsa, rsa, ec from scp import SCPClient from paramiko import RSAKey, Ed25519Key, ECDSAKey, DSSKey, PKey, SSHClient def main(): parser = argparse.ArgumentParser() parser.add_argument("--host", dest="hostname", required=True, type=str, help="SSH-Server") parser.add_argument("--port", dest="port", default=22, type=int, help="SSH-Port") parser.add_argument("--username", dest="username", default=getpass.getuser(), type=str, help="Remote Unix User") parser.add_argument("--password", dest="password", default=None, type=str, help="Remote password") parser.add_argument("--identity-file", dest="identity_file", default=None, type=str, help="Path to private SSH-Key") parser.add_argument("--identity-passphrase", dest="identity_passphrase", default=None, type=str, help="Passphrase of the SSH-Key") parser.add_argument("--remote-config", dest="remote_config", default="~/.kube/config", type=str, help="Remote kubectl config") parser.add_argument("--remote-cluster-name", dest="remote_cluster_name", default="kubernetes", type=str, help="Name of the cluster") parser.add_argument("--remote-user-name", dest="remote_user_name", default="kubernetes-admin", type=str, help="Name of the user") parser.add_argument("--local-config", dest="local_config", default="~/.kube/config", type=str, help="Local kubectl config") parser.add_argument("--local-cluster-name", dest="local_cluster_name", default="kubernetes", type=str, help="Name of the cluster") parser.add_argument("--local-user-name", dest="local_user_name", default="kubernetes-admin", type=str, help="Name of the user") args = parser.parse_args() ssh_client = create_ssh_client(args.hostname, args.port, args.username, args.identity_file, args.identity_passphrase) scp_client = SCPClient(ssh_client.get_transport()) scp_client.get(args.remote_config, local_path="/tmp/admin.config") with open("/tmp/admin.config", 'r', encoding="UTF-8") as f: remote_config = yaml.load(f, Loader=SafeLoader) with open(args.local_config, 'r', encoding="UTF-8") as f: local_config = yaml.load(f, Loader=SafeLoader) # Remove old objects local_config["clusters"] = list(filter(lambda cluster: cluster["name"] != args.local_cluster_name, local_config["clusters"])) local_config["users"] = list(filter(lambda user: user["name"] != args.local_user_name, local_config["users"])) # Add new objects local_config["clusters"] = local_config["clusters"] + list(filter(lambda cluster: cluster["name"] == args.remote_cluster_name, remote_config["clusters"])) local_config["users"] = local_config["users"] + list(filter(lambda user: user["name"] == args.remote_user_name, remote_config["users"])) # Rename object attributes for cluster in list(filter(lambda cluster: cluster["name"] == args.remote_cluster_name, remote_config["clusters"])): cluster["name"] = args.local_cluster_name for user in list(filter(lambda user: user["name"] == args.remote_user_name, remote_config["users"])): user["name"] = args.local_user_name # Store new local_config with open(args.local_config, 'w', encoding="UTF-8") as f: yaml.dump(local_config, f) def create_ssh_client(hostname: str, port: str, username: str, identity_file: str, identity_passphrase: str) -> SSHClient: ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh_private_key = PKey if identity_file != None and os.path.isfile(identity_file): ssh_private_key_file_obj = open(file=identity_file, mode="r", encoding="UTF-8") ssh_private_key = from_private_key(ssh_private_key_file_obj, identity_passphrase) ssh_client.connect(username=username, hostname = hostname, port = port, pkey=ssh_private_key, compress=True) return ssh_client def from_private_key(file_obj, password=None) -> PKey: private_key = None file_bytes = bytes(file_obj.read(), "utf-8") try: key = crypto_serialization.load_ssh_private_key( file_bytes, password=password, ) file_obj.seek(0) except ValueError: key = crypto_serialization.load_pem_private_key( file_bytes, password=password, ) if password: encryption_algorithm = crypto_serialization.BestAvailableEncryption( password ) else: encryption_algorithm = crypto_serialization.NoEncryption() file_obj = StringIO( key.private_bytes( crypto_serialization.Encoding.PEM, crypto_serialization.PrivateFormat.OpenSSH, encryption_algorithm, ).decode("utf-8") ) if isinstance(key, rsa.RSAPrivateKey): private_key = RSAKey.from_private_key(file_obj, password) elif isinstance(key, ed25519.Ed25519PrivateKey): private_key = Ed25519Key.from_private_key(file_obj, password) elif isinstance(key, ec.EllipticCurvePrivateKey): private_key = ECDSAKey.from_private_key(file_obj, password) elif isinstance(key, dsa.DSAPrivateKey): private_key = DSSKey.from_private_key(file_obj, password) else: raise TypeError return private_key if __name__ == "__main__": main()