如何用Paramiko和多线程让SSH远程命令飞一般执行?

点击蓝字


RECRUIT

关注我们

大家好!我是一个热衷于分享IT技术的up主。在这个公众号里,我将为大家带来最新、最实用的技术干货,从编程语言到前沿科技,从软件开发到网络安全。希望通过我的分享,能够帮助更多的小伙伴提升技术水平,共同成长!欢迎关注,一起探索科技的魅力吧!

在现代网络环境中,远程管理服务器和设备是系统管理员和开发人员的常见任务。SSH(Secure Shell)是一种广泛使用的协议,允许用户安全地连接到远程设备并执行命令。在本文中,我们将介绍如何使用 Python 的 Paramiko 库来封装 SSH 远程登录设备,并通过多线程优化批量命令执行的效率。

Paramiko库简介

Paramiko 是一个用于处理 SSH 连接的 Python 库。它提供了丰富的功能,包括 SSH 登录、文件传输、远程命令执行等。使用 Paramiko,可以方便地编写 Python 脚本来管理远程服务器。关于Paramiko这个库的更多用法可以查阅官网文档[1]

基本的 SSH 客户端类

首先,我们创建一个基本的 SSH 客户端类,封装 SSH 连接和命令执行的功能。以下是一个示例代码:

import paramiko
from paramiko import SSHException, AuthenticationException, BadHostKeyException
import logging

# 设置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SSHClient:
    def __init__(self, hostname, username, password, port=22, timeout=10):
        self.hostname = hostname
        self.username = username
        self.password = password
        self.port = port
        self.timeout = timeout
        self.client = None

    def connect(self):
        try:
            self.client = paramiko.SSHClient()
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.client.connect(
                hostname=self.hostname,
                username=self.username,
                password=self.password,
                port=self.port,
                timeout=self.timeout
            )
            logger.info(f"Successfully connected to {self.hostname}")
        except AuthenticationException:
            logger.error("Authentication failed, please verify your credentials")
        except SSHException as sshException:
            logger.error(f"Unable to establish SSH connection: {sshException}")
        except BadHostKeyException as badHostKeyException:
            logger.error(f"Unable to verify server's host key: {badHostKeyException}")
        except Exception as e:
            logger.error(f"Exception in connecting to the server: {e}")

    def execute_command(self, command):
        if self.client is None:
            logger.error("SSH client is not connected")
            return None

        try:
            stdin, stdout, stderr = self.client.exec_command(command)
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
            if error:
                logger.error(f"Error executing command: {error}")
                return error
            logger.info(f"Command output: {output}")
            return output
        except Exception as e:
            logger.error(f"Exception in executing command: {e}")
            return None

    def close(self):
        if self.client:
            self.client.close()
            logger.info(f"Connection to {self.hostname} closed")

在这个类中,我们定义了连接、执行命令和关闭连接的方法,并处理了可能的异常情况。这个基本的 SSH 客户端类已经能够满足单个命令执行的需求。

批量命令执行

为了支持批量命令执行,我们可以在类中新增一个方法 execute_commands,该方法接受一个命令列表,并依次执行每个命令。以下是实现代码:

def execute_commands(self, commands):
    results = {}
    if self.client is None:
        logger.error("SSH client is not connected")
        return results

    for command in commands:
        try:
            logger.info(f"Executing command: {command}")
            stdin, stdout, stderr = self.client.exec_command(command)
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
            if error:
                logger.error(f"Error executing command '{command}': {error}")
                results[command] = error
            else:
                logger.info(f"Command output for '{command}': {output}")
                results[command] = output
        except Exception as e:
            logger.error(f"Exception in executing command '{command}': {e}")
            results[command] = str(e)

    return results

虽然这个方法能够按顺序执行多个命令,但效率可能不高,特别是在处理大量命令时。

引入多线程优化批量命令执行

为了提高批量命令执行的效率,我们可以使用 Python 的 concurrent.futures 模块来引入多线程。以下是优化后的代码:

from concurrent.futures import ThreadPoolExecutor, as_completed

def _execute_command_thread(self, command):
    try:
        logger.info(f"Executing command: {command}")
        stdin, stdout, stderr = self.client.exec_command(command)
        output = stdout.read().decode('utf-8')
        error = stderr.read().decode('utf-8')
        if error:
            logger.error(f"Error executing command '{command}': {error}")
            return (command, error)
        logger.info(f"Command output for '{command}': {output}")
        return (command, output)
    except Exception as e:
        logger.error(f"Exception in executing command '{command}': {e}")
        return (command, str(e))

def execute_commands(self, commands, max_workers=5):
    results = {}
    if self.client is None:
        logger.error("SSH client is not connected")
        return results

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_command = {executor.submit(self._execute_command_thread, command): command for command in commands}
        for future in as_completed(future_to_command):
            command = future_to_command[future]
            try:
                cmd, result = future.result()
                results[cmd] = result
            except Exception as e:
                logger.error(f"Exception in future result for command '{command}': {e}")
                results[command] = str(e)

    return results

在这个优化后的代码中,我们使用 ThreadPoolExecutor 来管理线程池,并使用 as_completed 方法来处理并发执行的命令。这样可以显著提高批量命令执行的效率。

把执行的结果保存的文件中

为了将执行命令的结果保存到文件中,可以在 SSHClient 类中添加一个新的方法 save_results_to_file。这个方法将接受一个包含命令执行结果的字典和一个文件路径,然后将结果写入指定的文件。

def save_results_to_file(self, results, file_path):
    try:
        with open(file_path, 'w', encoding='utf-8'as file:
            for command, result in results.items():
                file.write(f"Command: {command}\nResult: {result}\n\n")
            logger.info(f"Results saved to {file_path}")
    except Exception as e:
        logger.error(f"Exception in saving results to file: {e}")

使用示例

最后,我们通过一个使用示例来展示如何使用这个优化后的 SSH 客户端类:

# 使用示例
if __name__ == "__main__":
    ssh_client = SSHClient(hostname="your_hostname", username="your_username", password="your_password")
    ssh_client.connect()
    
    # 单个命令执行示例
    ssh_client.execute_command("ls")
    
    # 批量命令执行示例
    commands = ["ls""pwd""whoami"]
    results = ssh_client.execute_commands(commands)
    for cmd, result in results.items():
        print(f"Command: {cmd}\nResult: {result}\n")
    
    # 将结果保存到文件中
    ssh_client.save_results_to_file(results, "ssh_command_results.txt")
    
    ssh_client.close()

通过这个示例,我们可以看到如何连接到远程设备、执行单个命令以及批量执行多个命令。

结论

在本文中,我们介绍了如何使用 Python 的 Paramiko 库来封装 SSH 远程登录设备,并通过多线程优化批量命令执行的效率。通过这种方式,系统管理员和开发人员可以更加高效地管理远程服务器和设备。希望本文对你在实际项目中使用 SSH 提供了一些有用的参考。

福利

本文中的脚本已经存放到gitee仓库中,该仓库存放了自己工作日常用的脚本,后续也会一直更新,有需要的小伙伴可以上去拿,创作不易,记得给star。

仓库地址:

https://gitee.com/didiplus/script.git

推荐阅读

Reference[1]

官网文档: https://docs.paramiko.org/en/latest/


相关推荐

  • 【Python】十大Python可视化工具,太强了
  • 【机器学习】机器学习分类模型决策边界,MLxtend轻松绘制!
  • 为什么现在很多人想读博了?读博有什么用?
  • 给你的 H5 页面加上惯性滚动吧!
  • 不用 JS,轻松锁定页面滚动!
  • RAG中的Query改写思路之查询-文档对齐评分优化:兼看昨日大模型进展总结回顾
  • 统计学入门:时间序列分析基础知识详解
  • 李飞飞创业:3 个月估值破 10 亿美元
  • CVPR 2024 录用数据出炉!这几个方向爆火 。。。
  • 假开源真噱头?Meta再陷「开源」争议,LeCun被炮轰Meta只是开放模型
  • 清华提出时间序列大模型:面向通用时序分析的生成式Transformer | ICML 2024
  • xAI创立未足年,创始工程师Kosic离职重返老东家OpenAI,巨头人才之战热度升级
  • 「数据墙」迫近?苹果OpenAI等巨头走投无路,被迫「偷师」YouTube视频!
  • 奥特曼深夜发动价格战,GPT-4o mini暴跌99%!清华同济校友立功,GPT-3.5退役
  • 13个漂亮的登录页面,附源代码地址
  • 30s到0.8s,记录一次接口优化成功案例!
  • 45K*16薪,这波跳槽不亏。。。
  • 大模型知识机理与编辑专场 | 7月23日 19:00直播
  • 公理训练让LLM学会因果推理:6700万参数模型比肩万亿参数级GPT-4
  • 15 年功臣、英伟达首席科学家在股价巅峰期黯然辞职:手握大笔财富,但我为我的工作感到遗憾