2023-02-23 13:26:07 +00:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
2023-02-23 14:27:39 +00:00
|
|
|
# build-in modules
|
2023-02-23 13:26:07 +00:00
|
|
|
import argparse
|
2023-02-23 14:27:39 +00:00
|
|
|
import getpass
|
2023-02-23 13:26:07 +00:00
|
|
|
import os
|
2023-02-23 14:27:39 +00:00
|
|
|
from io import StringIO
|
|
|
|
|
|
|
|
# opt-in modules
|
|
|
|
import paramiko
|
2023-02-23 13:26:07 +00:00
|
|
|
import yaml
|
|
|
|
from yaml.loader import SafeLoader
|
|
|
|
from cryptography.hazmat.primitives import serialization as crypto_serialization
|
|
|
|
from cryptography.hazmat.primitives.asymmetric import ed25519, dsa, rsa, ec
|
|
|
|
from scp import SCPClient
|
|
|
|
from paramiko import RSAKey, Ed25519Key, ECDSAKey, DSSKey, PKey, SSHClient
|
2023-02-23 14:27:39 +00:00
|
|
|
|
2023-02-23 13:26:07 +00:00
|
|
|
|
|
|
|
def main():
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
parser.add_argument("--host", dest="hostname", required=True, type=str, help="SSH-Server")
|
|
|
|
parser.add_argument("--port", dest="port", default=22, type=int, help="SSH-Port")
|
2023-02-23 14:26:18 +00:00
|
|
|
parser.add_argument("--username", dest="username", default=getpass.getuser(), type=str, help="Remote Unix User")
|
2023-02-23 13:26:07 +00:00
|
|
|
parser.add_argument("--password", dest="password", default=None, type=str, help="Remote password")
|
|
|
|
parser.add_argument("--identity-file", dest="identity_file", default=None, type=str, help="Path to private SSH-Key")
|
|
|
|
parser.add_argument("--identity-passphrase", dest="identity_passphrase", default=None, type=str, help="Passphrase of the SSH-Key")
|
|
|
|
parser.add_argument("--remote-config", dest="remote_config", default="~/.kube/config", type=str, help="Remote kubectl config")
|
2023-02-23 13:33:04 +00:00
|
|
|
parser.add_argument("--remote-cluster-name", dest="remote_cluster_name", default="kubernetes", type=str, help="Name of the cluster")
|
|
|
|
parser.add_argument("--remote-user-name", dest="remote_user_name", default="kubernetes-admin", type=str, help="Name of the user")
|
2023-02-23 13:26:07 +00:00
|
|
|
parser.add_argument("--local-config", dest="local_config", default="~/.kube/config", type=str, help="Local kubectl config")
|
|
|
|
parser.add_argument("--local-cluster-name", dest="local_cluster_name", default="kubernetes", type=str, help="Name of the cluster")
|
|
|
|
parser.add_argument("--local-user-name", dest="local_user_name", default="kubernetes-admin", type=str, help="Name of the user")
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
ssh_client = create_ssh_client(args.hostname, args.port, args.username, args.identity_file, args.identity_passphrase)
|
|
|
|
scp_client = SCPClient(ssh_client.get_transport())
|
|
|
|
|
|
|
|
scp_client.get(args.remote_config, local_path="/tmp/admin.config")
|
|
|
|
|
|
|
|
with open("/tmp/admin.config", 'r', encoding="UTF-8") as f:
|
|
|
|
remote_config = yaml.load(f, Loader=SafeLoader)
|
|
|
|
|
|
|
|
with open(args.local_config, 'r', encoding="UTF-8") as f:
|
|
|
|
local_config = yaml.load(f, Loader=SafeLoader)
|
|
|
|
|
|
|
|
|
|
|
|
# Remove old objects
|
|
|
|
local_config["clusters"] = list(filter(lambda cluster: cluster["name"] != args.local_cluster_name, local_config["clusters"]))
|
|
|
|
local_config["users"] = list(filter(lambda user: user["name"] != args.local_user_name, local_config["users"]))
|
|
|
|
|
|
|
|
# Add new objects
|
|
|
|
local_config["clusters"] = local_config["clusters"] + list(filter(lambda cluster: cluster["name"] == args.remote_cluster_name, remote_config["clusters"]))
|
|
|
|
local_config["users"] = local_config["users"] + list(filter(lambda user: user["name"] == args.remote_user_name, remote_config["users"]))
|
|
|
|
|
|
|
|
# Rename object attributes
|
|
|
|
for cluster in list(filter(lambda cluster: cluster["name"] == args.remote_cluster_name, remote_config["clusters"])):
|
|
|
|
cluster["name"] = args.local_cluster_name
|
|
|
|
|
|
|
|
for user in list(filter(lambda user: user["name"] == args.remote_user_name, remote_config["users"])):
|
|
|
|
user["name"] = args.local_user_name
|
|
|
|
|
|
|
|
# Store new local_config
|
|
|
|
with open(args.local_config, 'w', encoding="UTF-8") as f:
|
|
|
|
yaml.dump(local_config, f)
|
|
|
|
|
|
|
|
def create_ssh_client(hostname: str, port: str, username: str, identity_file: str, identity_passphrase: str) -> SSHClient:
|
2023-03-25 13:23:04 +00:00
|
|
|
'''
|
|
|
|
create_ssh_client returns based on passed arguments an SSHClient for example
|
|
|
|
to establish a connection.
|
|
|
|
'''
|
2023-02-23 13:26:07 +00:00
|
|
|
ssh_client = paramiko.SSHClient()
|
|
|
|
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
|
|
|
|
|
|
ssh_private_key = PKey
|
|
|
|
|
|
|
|
if identity_file != None and os.path.isfile(identity_file):
|
|
|
|
ssh_private_key_file_obj = open(file=identity_file, mode="r", encoding="UTF-8")
|
|
|
|
ssh_private_key = from_private_key(ssh_private_key_file_obj, identity_passphrase)
|
|
|
|
|
|
|
|
ssh_client.connect(username=username, hostname = hostname, port = port, pkey=ssh_private_key, compress=True)
|
|
|
|
|
|
|
|
return ssh_client
|
|
|
|
|
|
|
|
def from_private_key(file_obj, password=None) -> PKey:
|
2023-03-25 13:23:04 +00:00
|
|
|
'''
|
|
|
|
from_private_key returns a private key object based on a passed file and an
|
|
|
|
optional password
|
|
|
|
'''
|
|
|
|
|
2023-02-23 13:26:07 +00:00
|
|
|
private_key = None
|
|
|
|
file_bytes = bytes(file_obj.read(), "utf-8")
|
|
|
|
try:
|
|
|
|
key = crypto_serialization.load_ssh_private_key(
|
|
|
|
file_bytes,
|
|
|
|
password=password,
|
|
|
|
)
|
|
|
|
file_obj.seek(0)
|
|
|
|
except ValueError:
|
|
|
|
key = crypto_serialization.load_pem_private_key(
|
|
|
|
file_bytes,
|
|
|
|
password=password,
|
|
|
|
)
|
|
|
|
if password:
|
|
|
|
encryption_algorithm = crypto_serialization.BestAvailableEncryption(
|
|
|
|
password
|
|
|
|
)
|
|
|
|
else:
|
|
|
|
encryption_algorithm = crypto_serialization.NoEncryption()
|
|
|
|
|
|
|
|
file_obj = StringIO(
|
|
|
|
key.private_bytes(
|
|
|
|
crypto_serialization.Encoding.PEM,
|
|
|
|
crypto_serialization.PrivateFormat.OpenSSH,
|
|
|
|
encryption_algorithm,
|
|
|
|
).decode("utf-8")
|
|
|
|
)
|
|
|
|
if isinstance(key, rsa.RSAPrivateKey):
|
|
|
|
private_key = RSAKey.from_private_key(file_obj, password)
|
|
|
|
elif isinstance(key, ed25519.Ed25519PrivateKey):
|
|
|
|
private_key = Ed25519Key.from_private_key(file_obj, password)
|
|
|
|
elif isinstance(key, ec.EllipticCurvePrivateKey):
|
|
|
|
private_key = ECDSAKey.from_private_key(file_obj, password)
|
|
|
|
elif isinstance(key, dsa.DSAPrivateKey):
|
|
|
|
private_key = DSSKey.from_private_key(file_obj, password)
|
|
|
|
else:
|
|
|
|
raise TypeError
|
|
|
|
return private_key
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
main()
|