kcf/main.py
2023-03-25 14:23:04 +01:00

131 lines
5.8 KiB
Python
Executable File

#!/usr/bin/env python3
# build-in modules
import argparse
import getpass
import os
from io import StringIO
# opt-in modules
import paramiko
import yaml
from yaml.loader import SafeLoader
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import ed25519, dsa, rsa, ec
from scp import SCPClient
from paramiko import RSAKey, Ed25519Key, ECDSAKey, DSSKey, PKey, SSHClient
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--host", dest="hostname", required=True, type=str, help="SSH-Server")
parser.add_argument("--port", dest="port", default=22, type=int, help="SSH-Port")
parser.add_argument("--username", dest="username", default=getpass.getuser(), type=str, help="Remote Unix User")
parser.add_argument("--password", dest="password", default=None, type=str, help="Remote password")
parser.add_argument("--identity-file", dest="identity_file", default=None, type=str, help="Path to private SSH-Key")
parser.add_argument("--identity-passphrase", dest="identity_passphrase", default=None, type=str, help="Passphrase of the SSH-Key")
parser.add_argument("--remote-config", dest="remote_config", default="~/.kube/config", type=str, help="Remote kubectl config")
parser.add_argument("--remote-cluster-name", dest="remote_cluster_name", default="kubernetes", type=str, help="Name of the cluster")
parser.add_argument("--remote-user-name", dest="remote_user_name", default="kubernetes-admin", type=str, help="Name of the user")
parser.add_argument("--local-config", dest="local_config", default="~/.kube/config", type=str, help="Local kubectl config")
parser.add_argument("--local-cluster-name", dest="local_cluster_name", default="kubernetes", type=str, help="Name of the cluster")
parser.add_argument("--local-user-name", dest="local_user_name", default="kubernetes-admin", type=str, help="Name of the user")
args = parser.parse_args()
ssh_client = create_ssh_client(args.hostname, args.port, args.username, args.identity_file, args.identity_passphrase)
scp_client = SCPClient(ssh_client.get_transport())
scp_client.get(args.remote_config, local_path="/tmp/admin.config")
with open("/tmp/admin.config", 'r', encoding="UTF-8") as f:
remote_config = yaml.load(f, Loader=SafeLoader)
with open(args.local_config, 'r', encoding="UTF-8") as f:
local_config = yaml.load(f, Loader=SafeLoader)
# Remove old objects
local_config["clusters"] = list(filter(lambda cluster: cluster["name"] != args.local_cluster_name, local_config["clusters"]))
local_config["users"] = list(filter(lambda user: user["name"] != args.local_user_name, local_config["users"]))
# Add new objects
local_config["clusters"] = local_config["clusters"] + list(filter(lambda cluster: cluster["name"] == args.remote_cluster_name, remote_config["clusters"]))
local_config["users"] = local_config["users"] + list(filter(lambda user: user["name"] == args.remote_user_name, remote_config["users"]))
# Rename object attributes
for cluster in list(filter(lambda cluster: cluster["name"] == args.remote_cluster_name, remote_config["clusters"])):
cluster["name"] = args.local_cluster_name
for user in list(filter(lambda user: user["name"] == args.remote_user_name, remote_config["users"])):
user["name"] = args.local_user_name
# Store new local_config
with open(args.local_config, 'w', encoding="UTF-8") as f:
yaml.dump(local_config, f)
def create_ssh_client(hostname: str, port: str, username: str, identity_file: str, identity_passphrase: str) -> SSHClient:
'''
create_ssh_client returns based on passed arguments an SSHClient for example
to establish a connection.
'''
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_private_key = PKey
if identity_file != None and os.path.isfile(identity_file):
ssh_private_key_file_obj = open(file=identity_file, mode="r", encoding="UTF-8")
ssh_private_key = from_private_key(ssh_private_key_file_obj, identity_passphrase)
ssh_client.connect(username=username, hostname = hostname, port = port, pkey=ssh_private_key, compress=True)
return ssh_client
def from_private_key(file_obj, password=None) -> PKey:
'''
from_private_key returns a private key object based on a passed file and an
optional password
'''
private_key = None
file_bytes = bytes(file_obj.read(), "utf-8")
try:
key = crypto_serialization.load_ssh_private_key(
file_bytes,
password=password,
)
file_obj.seek(0)
except ValueError:
key = crypto_serialization.load_pem_private_key(
file_bytes,
password=password,
)
if password:
encryption_algorithm = crypto_serialization.BestAvailableEncryption(
password
)
else:
encryption_algorithm = crypto_serialization.NoEncryption()
file_obj = StringIO(
key.private_bytes(
crypto_serialization.Encoding.PEM,
crypto_serialization.PrivateFormat.OpenSSH,
encryption_algorithm,
).decode("utf-8")
)
if isinstance(key, rsa.RSAPrivateKey):
private_key = RSAKey.from_private_key(file_obj, password)
elif isinstance(key, ed25519.Ed25519PrivateKey):
private_key = Ed25519Key.from_private_key(file_obj, password)
elif isinstance(key, ec.EllipticCurvePrivateKey):
private_key = ECDSAKey.from_private_key(file_obj, password)
elif isinstance(key, dsa.DSAPrivateKey):
private_key = DSSKey.from_private_key(file_obj, password)
else:
raise TypeError
return private_key
if __name__ == "__main__":
main()