1、准备巡检IP文件
建一个名为 ip.txt 的文件,用于存放待巡检设备的IP地址。该文件需与Python巡检脚本位于同一目录下。
2、实现脚本
import paramiko
import os
import time
from datetime import datetime
from openai import OpenAI # 使用 OpenAI 的客户端库
# ******************** 配置区域 ********************
SSH_PORT = 22 # SSH端口号
SSH_USER = "root" # 固定SSH用户名
SSH_PASS = "" # 固定SSH密码
DEEPSEEK_API_KEY = "sk-1" # 替换为你的 DeepSeek API 密钥
DEEPSEEK_API_URL = "https://api.deepseek.com" # DeepSeek API 地址、
COMMAND_DELAY = 2 # 命令间隔时间(秒)
TIMEOUT = 20 # SSH连接超时时间(秒)
# *************************************************
# 巡检命令列表(可根据需要修改)
INSPECTION_COMMANDS = ['sh /root/servicecheck-2.0/servicecheck2.sh'] # 基础巡检
def get_timestamp():
"""获取标准时间格式"""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def collect_device_info(ip):
"""
通过SSH收集设备信息
:param ip: 设备IP地址cd
:return: 采集结果字符串
"""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# 建立SSH连接
print(f"[{get_timestamp()}] 正在连接 {ip}...")
client.connect(ip, SSH_PORT, SSH_USER, SSH_PASS, timeout=TIMEOUT)
output = f"\n=== {ip} 巡检结果 ===\n"
# 执行巡检命令
for idx, cmd in enumerate(INSPECTION_COMMANDS):
try:
print(f"[{get_timestamp()}] 执行命令: {cmd}")
stdin, stdout, stderr = client.exec_command(cmd)
result = stdout.read().decode('utf-8', errors='ignore').strip()
output += f"[{cmd}]\n{result}\n\n"
# 非最后一个命令添加延时
if idx < len(INSPECTION_COMMANDS) - 1:
print(f"[{get_timestamp()}] 等待 {COMMAND_DELAY} 秒...")
time.sleep(COMMAND_DELAY)
except Exception as cmd_error:
output += f"命令执行失败: {str(cmd_error)}\n"
return output
except Exception as conn_error:
return f"\n=== {ip} 连接失败 ===\n错误信息: {str(conn_error)}\n"
finally:
client.close()
def generate_ai_report(content):
""" 使用 DeepSeek API 生成分析报告
:param content: 待分析内容
:return: 分析结果字符串
"""
client = OpenAI(api_key='sk-', base_url='https://api.deepseek.com')
try:
print(f"[{get_timestamp()}] 正在生成分析报告...")
response = client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "你是一个专业的系统运维分析专家,能够根据系统巡检日志生成分析报告"},
{"role": "user", "content": f"请根据以下设备巡检日志生成分析报告:\n{content}"}
],
stream=False
)
return response.choices[0].message.content.strip()
except Exception as e:
return f"DeepSeek API 调用异常:{str(e)}"
def main():
# 生成带时间戳的报告文件名
report_file = f"Analysis_Report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
try:
# # 读取IP列表
# with open('ip.txt', 'r') as f:
# ips = [line.strip() for line in f if line.strip()]
# if not ips:
# print(f"[{get_timestamp()}] 错误:ip.txt中没有找到有效IP地址")
# return
# 采集设备信息
ips = [""]
full_report = ""
for ip in ips:
print(f"\n[{get_timestamp()}] 开始巡检 {ip}")
full_report += collect_device_info(ip)
# 写入临时文件
with open('temp.txt', 'w') as f:
f.write(full_report)
# 生成分析报告
with open('temp.txt', 'r') as f:
ai_report = generate_ai_report(f.read())
# 保存最终报告
with open(report_file, 'w') as f:
f.write(f"=== 设备巡检分析报告 ===\n")
f.write(f"生成时间: {get_timestamp()}\n")
f.write(f"巡检设备: {len(ips)}台\n\n")
f.write(ai_report)
print(f"\n[{get_timestamp()}] 报告生成成功: {report_file}")
except FileNotFoundError:
print(f"[{get_timestamp()}] 错误:未找到ip.txt文件")
except Exception as e:
print(f"[{get_timestamp()}] 发生未预期错误: {str(e)}")
finally:
# 清理临时文件
time.sleep(10)
# if os.path.exists('temp.txt'):
# os.remove('temp.txt')
# print(f"[{get_timestamp()}] 已清理临时文件")
if __name__ == "__main__":
main()
三、效果