图样:
最小化时
上代码:
import json
import logging
import threading
from typing import Dict, List
import efinance as ef
import time
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
import sys
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import queue
import requests
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('stock_monitor.log', encoding='utf-8'),
logging.StreamHandler()
]
)
class StockMonitorGUI:
def __init__(self, root):
self.root = root
self.root.title("股票监控系统")
self.root.geometry("1024x600")
# 添加最小化事件绑定
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
self.root.bind("<Unmap>", self.on_minimize)
self.float_window = None # 悬浮窗口
self.is_minimized = False
self.monitor = StockMonitor()
self.monitor.set_log_callback(self.add_log)
self.running = False
self.monitor_thread = None
self.log_queue = queue.Queue()
self.setup_gui()
self.update_log()
self.update_stock_table(self.get_initial_stock_data())
def setup_gui(self):
# 创建主框架
main_frame = ttk.Frame(self.root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 股票信息表格
table_frame = ttk.LabelFrame(main_frame, text="股票监控列表", padding="5")
table_frame.grid(row=0, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S))
self.tree = ttk.Treeview(table_frame, columns=('代码', '名称', '当前价格', '买入价', '卖出价', '状态'), show='headings', height=8) # 设置固定高度
self.tree.heading('代码', text='代码')
self.tree.heading('名称', text='名称')
self.tree.heading('当前价格', text='当前价格')
self.tree.heading('买入价', text='买入价')
self.tree.heading('卖出价', text='卖出价')
self.tree.heading('状态', text='状态')
# 设置列宽
self.tree.column('代码', width=80)
self.tree.column('名称', width=100)
self.tree.column('当前价格', width=80)
self.tree.column('买入价', width=80)
self.tree.column('卖出价', width=80)
self.tree.column('状态', width=100)
self.tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 添加滚动条
scrollbar = ttk.Scrollbar(table_frame, orient=tk.VERTICAL, command=self.tree.yview)
self.tree.configure(yscrollcommand=scrollbar.set)
scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
# 日志显示区域
log_frame = ttk.LabelFrame(main_frame, text="运行日志", padding="5")
log_frame.grid(row=1, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S))
self.log_text = scrolledtext.ScrolledText(log_frame, height=8) # 减小日志区域高度
self.log_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 控制按钮
button_frame = ttk.Frame(main_frame, padding="5")
button_frame.grid(row=2, column=0, columnspan=3, sticky=(tk.W, tk.E))
# 使用更紧凑的按钮布局
self.start_button = ttk.Button(button_frame, text="开始监控", command=self.start_monitoring, width=10)
self.start_button.grid(row=0, column=0, padx=3)
self.stop_button = ttk.Button(button_frame, text="停止监控", command=self.stop_monitoring, state=tk.DISABLED, width=10)
self.stop_button.grid(row=0, column=1, padx=3)
self.add_button = ttk.Button(button_frame, text="添加股票", command=self.show_add_dialog, width=10)
self.add_button.grid(row=0, column=2, padx=3)
self.edit_button = ttk.Button(button_frame, text="修改股票", command=self.show_edit_dialog, width=10)
self.edit_button.grid(row=0, column=3, padx=3)
self.delete_button = ttk.Button(button_frame, text="删除股票", command=self.delete_stock, width=10)
self.delete_button.grid(row=0, column=4, padx=3)
self.email_settings_button = ttk.Button(button_frame, text="邮件设置", command=self.show_email_settings, width=10)
self.email_settings_button.grid(row=0, column=5, padx=3)
self.qq_settings_button = ttk.Button(button_frame, text="QQ设置", command=self.show_qq_settings, width=10)
self.qq_settings_button.grid(row=0, column=6, padx=3)
self.weixin_settings_button = ttk.Button(button_frame, text="微信设置", command=self.show_weixin_settings, width=10)
self.weixin_settings_button.grid(row=0, column=7, padx=3)
# 调整窗口大小
self.root.geometry("800x500") # 减小窗口高度
# 配置grid权重
self.root.grid_rowconfigure(0, weight=1)
self.root.grid_columnconfigure(0, weight=1)
main_frame.grid_rowconfigure(1, weight=1) # 让日志区域可以扩展
main_frame.grid_columnconfigure(0, weight=1)
def update_log(self):
while True:
try:
log_message = self.log_queue.get_nowait()
self.log_text.insert(tk.END, log_message + '\n')
self.log_text.see(tk.END)
except queue.Empty:
break
self.root.after(100, self.update_log)
def update_stock_table(self, stock_data):
# 清空现有数据
for item in self.tree.get_children():
self.tree.delete(item)
# 插入新数据
for stock in stock_data:
self.tree.insert('', tk.END, values=stock)
# 更新悬浮窗口
self.update_float_window(stock_data)
def start_monitoring(self):
self.add_log("启动股票监控系统...")
self.running = True
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self.monitor_thread = threading.Thread(target=self.monitoring_task)
self.monitor_thread.daemon = True
self.monitor_thread.start()
def stop_monitoring(self):
self.running = False
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self.add_log("正在停止股票监控系统...")
def monitoring_task(self):
self.add_log("开始监控股票...")
while self.running:
try:
stock_data = self.monitor.get_stock_data()
self.root.after(0, self.update_stock_table, stock_data)
time.sleep(self.monitor.check_interval)
except Exception as e:
self.add_log(f"错误: {str(e)}")
time.sleep(self.monitor.check_interval)
self.add_log("停止监控股票...")
def show_add_dialog(self):
dialog = tk.Toplevel(self.root)
dialog.title("添加股票")
dialog.geometry("300x200")
dialog.transient(self.root)
ttk.Label(dialog, text="股票代码:").grid(row=0, column=0, padx=5, pady=5)
code_entry = ttk.Entry(dialog)
code_entry.grid(row=0, column=1, padx=5, pady=5)
ttk.Label(dialog, text="股票名称:").grid(row=1, column=0, padx=5, pady=5)
name_entry = ttk.Entry(dialog)
name_entry.grid(row=1, column=1, padx=5, pady=5)
ttk.Label(dialog, text="买入价:").grid(row=2, column=0, padx=5, pady=5)
buy_entry = ttk.Entry(dialog)
buy_entry.grid(row=2, column=1, padx=5, pady=5)
ttk.Label(dialog, text="卖出价:").grid(row=3, column=0, padx=5, pady=5)
sell_entry = ttk.Entry(dialog)
sell_entry.grid(row=3, column=1, padx=5, pady=5)
def save_stock():
try:
new_stock = {
"code": code_entry.get(),
"name": name_entry.get(),
"buy_price": float(buy_entry.get()),
"sell_price": float(sell_entry.get())
}
if not new_stock["code"] or not new_stock["name"]:
messagebox.showerror("错误", "股票代码和名称不能为空!")
return
self.monitor.add_stock(new_stock)
dialog.destroy()
self.add_log(f"添加股票成功:{new_stock['name']}")
self.update_stock_table(self.get_initial_stock_data())
except ValueError:
messagebox.showerror("错误", "请输入有效的价格!")
ttk.Button(dialog, text="保存", command=save_stock).grid(row=4, column=0, columnspan=2, pady=20)
def show_edit_dialog(self):
selected = self.tree.selection()
if not selected:
messagebox.showwarning("提示", "请先选择要修改的股票!")
return
item = self.tree.item(selected[0])
values = item['values']
dialog = tk.Toplevel(self.root)
dialog.title("修改股票")
dialog.geometry("300x200")
dialog.transient(self.root)
ttk.Label(dialog, text="股票代码:").grid(row=0, column=0, padx=5, pady=5)
code_entry = ttk.Entry(dialog)
code_entry.insert(0, values[0])
code_entry.config(state='readonly')
code_entry.grid(row=0, column=1, padx=5, pady=5)
ttk.Label(dialog, text="股票名称:").grid(row=1, column=0, padx=5, pady=5)
name_entry = ttk.Entry(dialog)
name_entry.insert(0, values[1])
name_entry.grid(row=1, column=1, padx=5, pady=5)
ttk.Label(dialog, text="买入价:").grid(row=2, column=0, padx=5, pady=5)
buy_entry = ttk.Entry(dialog)
buy_entry.insert(0, values[3])
buy_entry.grid(row=2, column=1, padx=5, pady=5)
ttk.Label(dialog, text="卖出价:").grid(row=3, column=0, padx=5, pady=5)
sell_entry = ttk.Entry(dialog)
sell_entry.insert(0, values[4])
sell_entry.grid(row=3, column=1, padx=5, pady=5)
def save_changes():
try:
updated_stock = {
"code": values[0],
"name": name_entry.get(),
"buy_price": float(buy_entry.get()),
"sell_price": float(sell_entry.get())
}
if not updated_stock["name"]:
messagebox.showerror("错误", "股票名称不能为空!")
return
self.monitor.update_stock(updated_stock)
dialog.destroy()
self.add_log(f"修改股票成功:{updated_stock['name']}")
self.update_stock_table(self.get_initial_stock_data())
except ValueError:
messagebox.showerror("错误", "请输入有效的价格!")
ttk.Button(dialog, text="保存", command=save_changes).grid(row=4, column=0, columnspan=2, pady=20)
def delete_stock(self):
selected = self.tree.selection()
if not selected:
messagebox.showwarning("提示", "请先选择要删除的股票!")
return
item = self.tree.item(selected[0])
stock_code = item['values'][0]
stock_name = item['values'][1]
if messagebox.askyesno("确认", f"确定要删除股票 {stock_name} 吗?"):
self.monitor.delete_stock(stock_code)
self.add_log(f"删除股票成功:{stock_name}")
self.update_stock_table(self.get_initial_stock_data())
def show_email_settings(self):
dialog = tk.Toplevel(self.root)
dialog.title("邮件服务设置")
dialog.geometry("400x350") # 增加一点高度
dialog.transient(self.root)
frame = ttk.Frame(dialog, padding="10")
frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 添加启用邮件通知选项
enabled_var = tk.BooleanVar(value=self.monitor.email_config.get('enabled', False))
ttk.Checkbutton(frame, text="启用邮件通知", variable=enabled_var).grid(row=0, column=0, columnspan=2, pady=5)
# SMTP服务器���置
ttk.Label(frame, text="SMTP服务器:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
host_entry = ttk.Entry(frame, width=30)
host_entry.insert(0, self.monitor.email_config['host'])
host_entry.grid(row=1, column=1, padx=5, pady=5)
# 端口设置
ttk.Label(frame, text="端口:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)
port_entry = ttk.Entry(frame, width=30)
port_entry.insert(0, "465") # 默认端口
port_entry.grid(row=2, column=1, padx=5, pady=5)
# 用户名设置
ttk.Label(frame, text="邮箱账号:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W)
user_entry = ttk.Entry(frame, width=30)
user_entry.insert(0, self.monitor.email_config['user'])
user_entry.grid(row=3, column=1, padx=5, pady=5)
# 密码设置
ttk.Label(frame, text="授权密码:").grid(row=4, column=0, padx=5, pady=5, sticky=tk.W)
pass_entry = ttk.Entry(frame, width=30, show="*")
pass_entry.insert(0, self.monitor.email_config['password'])
pass_entry.grid(row=4, column=1, padx=5, pady=5)
# 发件人设置
ttk.Label(frame, text="发件人:").grid(row=5, column=0, padx=5, pady=5, sticky=tk.W)
sender_entry = ttk.Entry(frame, width=30)
sender_entry.insert(0, self.monitor.email_config['sender'])
sender_entry.grid(row=5, column=1, padx=5, pady=5)
# 收件人设置
ttk.Label(frame, text="收件人:").grid(row=6, column=0, padx=5, pady=5, sticky=tk.W)
receivers_entry = ttk.Entry(frame, width=30)
receivers_entry.insert(0, ",".join(self.monitor.email_config['receivers']))
receivers_entry.grid(row=6, column=1, padx=5, pady=5)
# 邮件主题设置
ttk.Label(frame, text="邮件主题:").grid(row=7, column=0, padx=5, pady=5, sticky=tk.W)
title_entry = ttk.Entry(frame, width=30)
title_entry.insert(0, self.monitor.email_config['title'])
title_entry.grid(row=7, column=1, padx=5, pady=5)
def test_email():
if not enabled_var.get():
messagebox.showwarning("提示", "请先启用邮件通知!")
return
# 临时保存当前设置
temp_config = {
'enabled': enabled_var.get(),
'host': host_entry.get(),
'user': user_entry.get(),
'password': pass_entry.get(),
'sender': sender_entry.get(),
'receivers': [r.strip() for r in receivers_entry.get().split(',')],
'title': title_entry.get()
}
try:
with smtplib.SMTP_SSL(temp_config['host'], 465) as smtp:
smtp.login(temp_config['user'], temp_config['password'])
message = MIMEText("这是一封测试邮件,如果您收到这封邮件,说明邮件服务设置正确。", 'plain', 'utf-8')
message['From'] = temp_config['sender']
message['To'] = ",".join(temp_config['receivers'])
message['Subject'] = "测试邮件"
smtp.sendmail(
temp_config['sender'],
temp_config['receivers'],
message.as_string()
)
messagebox.showinfo("成功", "测试邮件发送成功!")
except Exception as e:
messagebox.showerror("错误", f"测试邮件发送失败:{str(e)}")
def save_settings():
try:
new_config = {
'enabled': enabled_var.get(),
'host': host_entry.get(),
'user': user_entry.get(),
'password': pass_entry.get(),
'sender': sender_entry.get(),
'receivers': [r.strip() for r in receivers_entry.get().split(',')],
'title': title_entry.get()
}
# 验证必填字段
if new_config['enabled'] and not all([new_config['host'], new_config['user'],
new_config['password'], new_config['sender'],
new_config['receivers'], new_config['title']]):
messagebox.showerror("错误", "启用邮件通知时所有字段都必须填写!")
return
# 更新配置
self.monitor.update_email_config(new_config)
messagebox.showinfo("成功", "邮件设置已保存!")
dialog.destroy()
except Exception as e:
messagebox.showerror("错误", f"保存设置失败:{str(e)}")
# 按钮框架
button_frame = ttk.Frame(frame)
button_frame.grid(row=8, column=0, columnspan=2, pady=20)
test_button = ttk.Button(button_frame, text="测试", command=test_email)
test_button.grid(row=0, column=0, padx=5)
save_button = ttk.Button(button_frame, text="保存", command=save_settings)
save_button.grid(row=0, column=1, padx=5)
def show_qq_settings(self):
dialog = tk.Toplevel(self.root)
dialog.title("QQ机器人设置")
dialog.geometry("400x250")
dialog.transient(self.root)
frame = ttk.Frame(dialog, padding="10")
frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 启用QQ通知
enabled_var = tk.BooleanVar(value=self.monitor.qq_config.get('enabled', False))
ttk.Checkbutton(frame, text="启用QQ通知", variable=enabled_var).grid(row=0, column=0, columnspan=2, pady=5)
# API地址
ttk.Label(frame, text="API地址:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
api_entry = ttk.Entry(frame, width=30)
api_entry.insert(0, self.monitor.qq_config.get('api_url', 'http://127.0.0.1:5700'))
api_entry.grid(row=1, column=1, padx=5, pady=5)
# QQ号
ttk.Label(frame, text="接收QQ:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)
qq_entry = ttk.Entry(frame, width=30)
qq_entry.insert(0, self.monitor.qq_config.get('qq_id', ''))
qq_entry.grid(row=2, column=1, padx=5, pady=5)
# 访问令牌
ttk.Label(frame, text="访问令牌:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W)
token_entry = ttk.Entry(frame, width=30, show="*")
token_entry.insert(0, self.monitor.qq_config.get('access_token', ''))
token_entry.grid(row=3, column=1, padx=5, pady=5)
def test_qq():
config = {
'enabled': enabled_var.get(),
'api_url': api_entry.get(),
'qq_id': qq_entry.get(),
'access_token': token_entry.get()
}
try:
url = f"{config['api_url']}/send_private_msg"
params = {
'user_id': config['qq_id'],
'message': "这是一条测试消息,如果您收到这消息,说明QQ机器人设置正确。"
}
headers = {
'Authorization': f"Bearer {config['access_token']}"
}
response = requests.get(url, params=params, headers=headers)
if response.status_code == 200:
messagebox.showinfo("成功", "测试消息发送成功!")
else:
messagebox.showerror("错误", f"测试消息发送失败:{response.text}")
except Exception as e:
messagebox.showerror("错误", f"测试消息发送失败:{str(e)}")
def save_settings():
try:
new_config = {
'enabled': enabled_var.get(),
'api_url': api_entry.get(),
'qq_id': qq_entry.get(),
'access_token': token_entry.get()
}
if new_config['enabled'] and not all([new_config['api_url'], new_config['qq_id']]):
messagebox.showerror("错误", "启用QQ通知时必须填写API地址和接收QQ!")
return
self.monitor.qq_config = new_config
self.monitor.config['qq'] = new_config
self.monitor._save_config()
messagebox.showinfo("成功", "QQ设置已保存!")
dialog.destroy()
except Exception as e:
messagebox.showerror("错误", f"保存设置失败:{str(e)}")
# 按钮框架
button_frame = ttk.Frame(frame)
button_frame.grid(row=4, column=0, columnspan=2, pady=20)
test_button = ttk.Button(button_frame, text="测试", command=test_qq)
test_button.grid(row=0, column=0, padx=5)
save_button = ttk.Button(button_frame, text="保存", command=save_settings)
save_button.grid(row=0, column=1, padx=5)
def show_weixin_settings(self):
dialog = tk.Toplevel(self.root)
dialog.title("企业微信设置")
dialog.geometry("400x300")
dialog.transient(self.root)
frame = ttk.Frame(dialog, padding="10")
frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 启用微信通知
enabled_var = tk.BooleanVar(value=self.monitor.weixin_config.get('enabled', False))
ttk.Checkbutton(frame, text="启用企业微信通知", variable=enabled_var).grid(row=0, column=0, columnspan=2, pady=5)
# 企业ID
ttk.Label(frame, text="企业ID:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
corp_id_entry = ttk.Entry(frame, width=30)
corp_id_entry.insert(0, self.monitor.weixin_config.get('corp_id', ''))
corp_id_entry.grid(row=1, column=1, padx=5, pady=5)
# 应用ID
ttk.Label(frame, text="应用ID:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)
agent_id_entry = ttk.Entry(frame, width=30)
agent_id_entry.insert(0, self.monitor.weixin_config.get('agent_id', ''))
agent_id_entry.grid(row=2, column=1, padx=5, pady=5)
# 应用密钥
ttk.Label(frame, text="应用密钥:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W)
secret_entry = ttk.Entry(frame, width=30, show="*")
secret_entry.insert(0, self.monitor.weixin_config.get('corp_secret', ''))
secret_entry.grid(row=3, column=1, padx=5, pady=5)
# 接收用户
ttk.Label(frame, text="接收用户:").grid(row=4, column=0, padx=5, pady=5, sticky=tk.W)
to_user_entry = ttk.Entry(frame, width=30)
to_user_entry.insert(0, self.monitor.weixin_config.get('to_user', '@all'))
to_user_entry.grid(row=4, column=1, padx=5, pady=5)
def test_weixin():
if not enabled_var.get():
messagebox.showwarning("提示", "请先启用企业微信通知!")
return
config = {
'enabled': enabled_var.get(),
'corp_id': corp_id_entry.get(),
'agent_id': agent_id_entry.get(),
'corp_secret': secret_entry.get(),
'to_user': to_user_entry.get()
}
# 创建临时的 StockMonitor 实例来测试
temp_monitor = StockMonitor()
temp_monitor.weixin_config = config
temp_monitor.set_log_callback(self.add_log)
temp_monitor.send_weixin_message("这是一条测试消息,如果您收到这条消息,说明企业微信设置正确。")
def save_settings():
try:
new_config = {
'enabled': enabled_var.get(),
'corp_id': corp_id_entry.get(),
'agent_id': agent_id_entry.get(),
'corp_secret': secret_entry.get(),
'to_user': to_user_entry.get()
}
if new_config['enabled'] and not all([new_config['corp_id'],
new_config['agent_id'],
new_config['corp_secret']]):
messagebox.showerror("错误", "启用企业微信通知时必须填写企业ID、应用ID和应用密钥!")
return
self.monitor.weixin_config = new_config
self.monitor.config['weixin'] = new_config
self.monitor._save_config()
messagebox.showinfo("成功", "企业微信设置已保存!")
dialog.destroy()
except Exception as e:
messagebox.showerror("错误", f"保存设置失败:{str(e)}")
# 按钮框架
button_frame = ttk.Frame(frame)
button_frame.grid(row=5, column=0, columnspan=2, pady=20)
test_button = ttk.Button(button_frame, text="测试", command=test_weixin)
test_button.grid(row=0, column=0, padx=5)
save_button = ttk.Button(button_frame, text="保存", command=save_settings)
save_button.grid(row=0, column=1, padx=5)
def add_log(self, message: str):
"""添加日志到界面"""
self.log_text.insert(tk.END, message + '\n')
self.log_text.see(tk.END) # 滚动到最新的日志
def get_initial_stock_data(self):
"""获取初始股票数据用于显示"""
result = []
for stock in self.monitor.stocks:
result.append((
stock['code'],
stock['name'],
'等待更新', # 初始显示时还没有实时价格
stock['buy_price'],
stock['sell_price'],
'等待监控' # 初始状态
))
return result
def create_float_window(self):
"""创建悬浮窗口"""
self.float_window = tk.Toplevel()
self.float_window.title("股票监控")
# 设置窗口属性
self.float_window.attributes('-topmost', True) # 始终置顶
self.float_window.overrideredirect(True) # 无边框
self.float_window.attributes('-alpha', 0.9) # 设置透明度
# 获取屏幕尺寸
screen_width = self.root.winfo_screenwidth()
screen_height = self.root.winfo_screenheight()
# 设置窗口位置(右下角)
window_width = 200
window_height = 300
x = screen_width - window_width - 10
y = screen_height - window_height - 50
self.float_window.geometry(f"{window_width}x{window_height}+{x}+{y}")
# 创建标题栏
title_frame = ttk.Frame(self.float_window)
title_frame.pack(fill=tk.X, padx=2, pady=2)
ttk.Label(title_frame, text="股票监控").pack(side=tk.LEFT, padx=5)
# 添加关闭和还原按钮
ttk.Button(title_frame, text="□", width=3, command=self.restore_window).pack(side=tk.RIGHT, padx=1)
ttk.Button(title_frame, text="×", width=3, command=self.on_closing).pack(side=tk.RIGHT, padx=1)
# 添加拖动功能
title_frame.bind("<Button-1>", self.start_move)
title_frame.bind("<B1-Motion>", self.on_move)
# 创建股票信息显示区域
self.float_tree = ttk.Treeview(self.float_window, columns=('名称', '价格', '状态'), show='headings', height=10)
self.float_tree.heading('名称', text='名称')
self.float_tree.heading('价格', text='价格')
self.float_tree.heading('状态', text='状态')
# 设置列宽
self.float_tree.column('名称', width=60)
self.float_tree.column('价格', width=60)
self.float_tree.column('状态', width=60)
self.float_tree.pack(fill=tk.BOTH, expand=True, padx=2, pady=2)
def update_float_window(self, stock_data):
"""更新悬浮窗口的股票信息"""
if self.float_window and self.is_minimized:
for item in self.float_tree.get_children():
self.float_tree.delete(item)
for stock in stock_data:
self.float_tree.insert('', tk.END, values=(stock[1], stock[2], stock[5]))
def start_move(self, event):
"""开始拖动窗口"""
self.x = event.x
self.y = event.y
def on_move(self, event):
"""拖动窗口"""
deltax = event.x - self.x
deltay = event.y - self.y
x = self.float_window.winfo_x() + deltax
y = self.float_window.winfo_y() + deltay
self.float_window.geometry(f"+{x}+{y}")
def on_minimize(self, event):
"""最小化主窗口时显示悬浮窗"""
if not self.float_window:
self.create_float_window()
self.is_minimized = True
self.float_window.deiconify()
def restore_window(self):
"""还原主窗口"""
self.root.deiconify()
self.is_minimized = False
if self.float_window:
self.float_window.withdraw()
def on_closing(self):
"""关闭程序"""
if messagebox.askokcancel("退出", "确定要退出程序吗?"):
if self.float_window:
self.float_window.destroy()
self.root.destroy()
class StockMonitor:
def __init__(self, config_file: str = 'stock_config.json'):
self.config = self._load_config(config_file)
self.email_config = self.config['email']
self.qq_config = self.config.get('qq', {'enabled': False}) # 添加QQ配置
self.weixin_config = self.config.get('weixin', {'enabled': False}) # 添加微信配置
self.stocks = self.config['stocks']
self.check_interval = self.config['check_interval']
self.log_callback = None # 添加日志回调函数
def set_log_callback(self, callback):
"""设置日志回调函数"""
self.log_callback = callback
def log(self, message: str, level: str = 'info'):
"""统一的日志处理方法"""
if level == 'error':
logging.error(message)
else:
logging.info(message)
if self.log_callback:
self.log_callback(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {message}")
def _load_config(self, config_file: str) -> Dict:
try:
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
logging.error(f"配置文件 {config_file} 不存在")
sys.exit(1)
except json.JSONDecodeError:
logging.error(f"配置文件 {config_file} 格式错误")
sys.exit(1)
def send_email(self, content: str) -> None:
"""发送邮件"""
if not self.email_config.get('enabled', False): # 检查是否启用
return
message = MIMEText(content, 'plain', 'utf-8')
message['From'] = self.email_config['sender']
message['To'] = ",".join(self.email_config['receivers'])
message['Subject'] = self.email_config['title']
try:
with smtplib.SMTP_SSL(self.email_config['host'], 465) as smtp:
smtp.login(self.email_config['user'], self.email_config['password'])
smtp.sendmail(
self.email_config['sender'],
self.email_config['receivers'],
message.as_string()
)
self.log("邮件发送成功!")
except Exception as e:
self.log(f"邮件发送失败: {str(e)}", 'error')
def get_stock_status(self, current_price: float, buy_price: float, sell_price: float) -> str:
if current_price <= buy_price:
return "建议买入"
elif current_price >= sell_price:
return "建议卖出"
return "观察中"
def send_qq_message(self, message: str) -> None:
"""发送QQ消息"""
if not self.qq_config.get('enabled', False):
return
try:
url = f"{self.qq_config['api_url']}/send_private_msg"
params = {
'user_id': self.qq_config['qq_id'],
'message': message
}
headers = {
'Authorization': f"Bearer {self.qq_config.get('access_token', '')}"
}
response = requests.get(url, params=params, headers=headers)
if response.status_code == 200:
self.log("QQ消息发送成功!")
else:
self.log(f"QQ消息发送失败: {response.text}", 'error')
except Exception as e:
self.log(f"QQ消息发送失败: {str(e)}", 'error')
def send_weixin_message(self, message: str) -> None:
"""发送企业微信消息"""
if not self.weixin_config.get('enabled', False):
return
try:
# 获取访问令牌
token_url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken"
token_params = {
'corpid': self.weixin_config['corp_id'],
'corpsecret': self.weixin_config['corp_secret']
}
token_response = requests.get(token_url, params=token_params)
token_data = token_response.json()
if token_data['errcode'] != 0:
self.log(f"获取微信访问令牌失败: {token_data['errmsg']}", 'error')
return
access_token = token_data['access_token']
# 发送消息
send_url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
send_data = {
'touser': self.weixin_config['to_user'],
'msgtype': 'text',
'agentid': self.weixin_config['agent_id'],
'text': {
'content': message
}
}
response = requests.post(send_url, json=send_data)
result = response.json()
if result['errcode'] == 0:
self.log("微信消息发送成功!")
else:
self.log(f"微信消息发送失败: {result['errmsg']}", 'error')
except Exception as e:
self.log(f"微信消息发送失败: {str(e)}", 'error')
def get_stock_data(self) -> List:
try:
df = ef.stock.get_realtime_quotes()
stock_list = df.values.tolist()
result = []
for stock_info in self.stocks:
stock_data = next(
(item for item in stock_list if item[0] == stock_info['code']),
None
)
if stock_data:
current_price = float(stock_data[3])
status = self.get_stock_status(
current_price,
stock_info['buy_price'],
stock_info['sell_price']
)
result.append((
stock_info['code'],
stock_info['name'],
current_price,
stock_info['buy_price'],
stock_info['sell_price'],
status
))
# 检查是否需要发送提醒
if status in ["建议买入", "建议卖出"]:
content = f'当前{stock_info["name"]}的价格是: {current_price}, {status}'
self.send_email(content)
self.send_qq_message(content)
self.send_weixin_message(content) # 添加微信消息提醒
self.log(content)
else:
self.log(f'当前{stock_info["name"]}的价格是: {current_price} 耐心观察中...')
else:
self.log(f"未找到股票 {stock_info['code']} 的数据", 'error')
return result
except Exception as e:
self.log(f"获取股票数据失败: {str(e)}", 'error')
return []
def add_stock(self, stock: Dict) -> None:
self.stocks.append(stock)
self._save_config()
def update_stock(self, updated_stock: Dict) -> None:
for i, stock in enumerate(self.stocks):
if stock['code'] == updated_stock['code']:
self.stocks[i] = updated_stock
break
self._save_config()
def delete_stock(self, stock_code: str) -> None:
self.stocks = [s for s in self.stocks if s['code'] != stock_code]
self._save_config()
def _save_config(self) -> None:
try:
self.config['stocks'] = self.stocks
with open('stock_config.json', 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=4, ensure_ascii=False)
except Exception as e:
logging.error(f"保存配置文件失败: {str(e)}")
def update_email_config(self, new_config: Dict) -> None:
self.email_config = new_config
self.config['email'] = new_config
self._save_config()
def main():
root = tk.Tk()
app = StockMonitorGUI(root)
root.mainloop()
if __name__ == "__main__":
main()
上配置文件:stock_config.json
{
"email": {
"enabled": false,
"host": "smtp.163.com",
"user": "i238@163.com",
"password": "JIMRV",
"sender": "i25@163.com",
"receivers": [
"123@qq.com"
],
"title": "股票监控买卖提示"
},
"qq": {
"enabled": false,
"api_url": "http://127.0.0.1:5700",
"qq_id": "123456789",
"access_token": "your_access_token"
},
"weixin": {
"enabled": false,
"corp_id": "your_corp_id",
"agent_id": "your_agent_id",
"corp_secret": "your_corp_secret",
"to_user": "@all"
},
"stocks": [
{
"code": "000776",
"name": "广发证券",
"buy_price": 11.3,
"sell_price": 12.6
},
{
"code": "002945",
"name": "华林证券",
"buy_price": 12.0,
"sell_price": 16.0
}
],
"check_interval": 60
}
上文档说明--- 股票监控系统使用说明
1. 系统简介
这是一个基于Python开发的股票监控系统,可以实时监控多支股票的价格变动,并通过多种方式(邮件、QQ、企业微信)发送买卖提醒。
主要功能:
-
实时监控多支股票价格
-
自定义买入卖出价格
-
多种提醒方式(邮件/QQ/企业微信)
-
悬浮窗口显示实时行情
-
完整的日志记录
2. 系统要求
-
Python 3.6 或更高版本
-
需要安装的依赖包:
pip install efinance requests
3. 配置文件说明
配置文件为 stock_config.json
,包含以下主要配置项:
son { "email": { "enabled": false, // 是否启用邮件通知 "host": "smtp.163.com", // SMTP服务器 "user": "xxx@163.com", // 邮箱账号 "password": "xxx", // 邮箱授权码 "sender": "xxx@163.com", // 发件人 "receivers": ["xxx@qq.com"],// 收件人列表 "title": "股票监控提示" // 邮件主题 }, "qq": { "enabled": false, // 是否启用QQ通知 "api_url": "http://127.0.0.1:5700", // go-cqhttp服务地址 "qq_id": "123456789", // 接收消息的QQ号 "access_token": "xxx" // 访问令牌 }, "weixin": { "enabled": false, // 是否启用企业微信通知 "corp_id": "xxx", // 企业ID "agent_id": "xxx", // 应用ID "corp_secret": "xxx", // 应用密钥 "to_user": "@all" // 接收用户 }, "stocks": [ // 监控的股票列表 { "code": "000776", // 股票代码 "name": "广发证券", // 股票名称 "buy_price": 11.3, // 买入价 "sell_price": 12.6 // 卖出价 } ], "check_interval": 60 // 检查间隔(秒) }
4. 使用说明
4.1 启动程序
运行 股票监听器.py
文件启动程序。
4.2 股票管理
-
添加股票:点击"添加股票"按钮,输入股票代码、名称、买入价和卖出价
-
修改股票:选中要修改的股票,点击"修改股票"按钮
-
删除股票:选中要删除的股票,点击"删除股票"按钮
4.3 通知设置
-
邮件设置:
-
点击"邮件设置"按钮
-
勾选"启用邮件通知"
-
填写SMTP服务器、邮箱账号等信息
-
可点击"测试"按钮测试设置
-
-
QQ设置:
-
需要先配置并运行 go-cqhttp
-
点击"QQ设置"按钮
-
勾选"启用QQ通知"
-
填写API地址和接收QQ号
-
可点击"测试"按钮测试设置
-
-
企业微信设置:
-
需要有企业微信管理员权限
-
点击"微信设置"按钮
-
勾选"启用企业微信通知"
-
填写企业ID、应用ID等信息
-
可点击"测试"按钮测试设置
-
4.4 监控操作
-
开始监控:点击"开始监控"按钮
-
停止监控:点击"停止监控"按钮
-
最小化:
-
程序最小化后会在屏幕右下角显示悬浮窗
-
悬浮窗可以拖动位置
-
点击"□"按钮可以还原主窗口
-
点击"×"按钮可以关闭程序
-
5. 提醒规则
-
当股票价格低于或等于设定的买入价时,发送"建议买入"提醒
-
当股票价格高于或等于设定的卖出价时,发送"建议卖出"提醒
-
提醒会同时通过所有已启用的通知方式发送
6. 注意事项
-
邮件通知需要使用邮箱的授权码,而不是登录密码
-
QQ通知需要正确配置并运行 go-cqhttp
-
企业微信通知需要管理员在企业微信后台创建应用
-
所有密码和密钥信息都保存在本地配置文件中,请注意信息安全
-
程序会自动保存所有设置到配置文件
7. 常见问题
-
邮件发送失败:
-
检查邮箱账号和授权码是否正确
-
确认SMTP服务器地址是否正确
-
-
QQ消息发送失败:
-
确认go-cqhttp是否正常运行
-
检查API地址是否可以访问
-
-
企业微信消息发送失败:
-
确认企业ID和应用密钥是否正确
-
检查应用是否有发送消息权限
-
-
股票数据获取失败:
-
检查网络连接
-
确认股票代码是否正确
-