#1.先将所有数据都迁入到数据库存储+读取 #2.再实现“关键信息输入键”点击浮窗,确认关闭 #3.再实现加密+解密操作 import tkinter as tk from tkinter import simpledialog, messagebox import sqlite3 from Crypto.PublicKey import RSA from Crypto.Cipher import PKCS1_OAEP, AES from Crypto.Util.Padding import pad, unpad import os import hashlib import base64 # 生成RSA密钥对并保存到文件 def generate_and_save_rsa_keys(keys_file): key = RSA.generate(2048) private_key = key.export_key() public_key = key.publickey().export_key() # 使用明确的分隔符来区分私钥和公钥 with open(keys_file, 'wb') as f: f.write(private_key) f.write(public_key) return private_key, public_key # 使用AES加密数据 def aes_encrypt(data, password): salt = os.urandom(16) key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000) cipher = AES.new(key, AES.MODE_CBC) ct_bytes = cipher.encrypt(pad(data, AES.block_size)) iv = cipher.iv return base64.b64encode(salt + iv + ct_bytes) # 使用AES解密数据 def aes_decrypt(encrypted_data, password): encrypted_data = base64.b64decode(encrypted_data) salt = encrypted_data[:16] iv = encrypted_data[16:32] ct = encrypted_data[32:] key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000) cipher = AES.new(key, AES.MODE_CBC, iv) pt = unpad(cipher.decrypt(ct), AES.block_size) return pt # 使用固定密码保护加载RSA密钥 def load_protected_rsa_keys(keys_file, password_func): if not os.path.exists(keys_file): print(f"密钥文件不存在:{keys_file},正在生成新密钥...") private_key, public_key = generate_and_save_rsa_keys("temp_keys.pem") with open("temp_keys.pem", 'rb') as f: key_data = f.read() encrypted_data = aes_encrypt(key_data, password_func()) with open(keys_file, 'wb') as f: f.write(encrypted_data) os.remove("temp_keys.pem") print(f"新密钥文件已生成并加密保存:{keys_file}") # 这里需要重新加载刚刚生成的密钥 with open(keys_file, 'rb') as f: encrypted_data = f.read() try: decrypted_data = aes_decrypt(encrypted_data, password_func()) except Exception as e: print(f"解密错误:{e}") messagebox.showerror("错误", "密钥文件损坏或密码错误") return None, None private_key_end = decrypted_data.find(b'-----END RSA PRIVATE KEY-----') public_key_start = decrypted_data.find(b'-----BEGIN PUBLIC KEY-----') if private_key_end != -1 and public_key_start != -1: private_key_end += len(b'-----END RSA PRIVATE KEY-----') private_key = decrypted_data[:private_key_end] public_key = decrypted_data[public_key_start:] return private_key, public_key else: print("密钥格式错误") return None, None else: print(f"密钥文件已存在:{keys_file},正在加载...") with open(keys_file, 'rb') as f: encrypted_data = f.read() try: decrypted_data = aes_decrypt(encrypted_data, password_func()) except Exception as e: print(f"解密错误:{e}") messagebox.showerror("错误", "密钥文件损坏或密码错误") return None, None private_key_end = decrypted_data.find(b'-----END RSA PRIVATE KEY-----') public_key_start = decrypted_data.find(b'-----BEGIN PUBLIC KEY-----') if private_key_end != -1 and public_key_start != -1: private_key_end += len(b'-----END RSA PRIVATE KEY-----') private_key = decrypted_data[:private_key_end] public_key = decrypted_data[public_key_start:] return private_key, public_key else: print("密钥格式错误") return None, None # RSA加密 def rsa_encrypt(data, public_key): rsa_key = RSA.import_key(public_key) cipher = PKCS1_OAEP.new(rsa_key) encrypted_data = cipher.encrypt(data.encode('utf-8')) return encrypted_data # RSA解密 def rsa_decrypt(encrypted_data, private_key): rsa_key = RSA.import_key(private_key) cipher = PKCS1_OAEP.new(rsa_key) decrypted_data = cipher.decrypt(encrypted_data) return decrypted_data.decode('utf-8') # 创建或连接SQLite数据库 def create_or_connect_db(db_name): conn = sqlite3.connect(db_name) cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS account_a ( id INTEGER PRIMARY KEY, encrypted_content BLOB NOT NULL)''') cursor.execute('''CREATE TABLE IF NOT EXISTS password_a ( id INTEGER PRIMARY KEY, encrypted_content BLOB NOT NULL)''') cursor.execute('''CREATE TABLE IF NOT EXISTS account_b ( id INTEGER PRIMARY KEY, encrypted_content BLOB NOT NULL)''') cursor.execute('''CREATE TABLE IF NOT EXISTS password_b ( id INTEGER PRIMARY KEY, encrypted_content BLOB NOT NULL)''') conn.commit() return conn, cursor # 写入或更新加密数据到SQLite数据库 def write_or_update_db(conn, cursor, table_name, encrypted_data): cursor.execute(f"SELECT id FROM {table_name} LIMIT 1") if cursor.fetchone(): cursor.execute(f"UPDATE {table_name} SET encrypted_content = ? WHERE id = 1", (encrypted_data,)) conn.commit() print(f"{table_name}中的数据已更新!") else: cursor.execute(f"INSERT INTO {table_name} (encrypted_content) VALUES (?)", (encrypted_data,)) conn.commit() print(f"加密数据写入{table_name}成功!") # 从SQLite数据库读取加密数据并解密打印 def read_from_db(conn, cursor, table_name, private_key): cursor.execute(f"SELECT * FROM {table_name}") rows = cursor.fetchall() decrypted_data_list = [] for row in rows: decrypted_data = rsa_decrypt(row[1], private_key) decrypted_data_list.append(decrypted_data) return decrypted_data_list # 主界面 class MainApp: def __init__(self, root, private_key, public_key, conn, cursor): self.root = root self.private_key = private_key self.public_key = public_key self.conn = conn self.cursor = cursor self.root.title("加密输入与查询系统") # 输入账号按钮 self.account_a_input_button = tk.Button(root, text="A账户输入", command=lambda: self.show_input_dialog("account_a")) self.account_a_input_button.pack(pady=20) # 输入密码按钮 self.password_a_input_button = tk.Button(root, text="A密码输入", command=lambda: self.show_input_dialog("password_a")) self.password_a_input_button.pack(pady=20) # 输入账号按钮 self.account_b_input_button = tk.Button(root, text="B账户输入", command=lambda: self.show_input_dialog("account_b")) self.account_b_input_button.pack(pady=20) # 输入密码按钮 self.password_b_input_button = tk.Button(root, text="B密码输入", command=lambda: self.show_input_dialog("password_b")) self.password_b_input_button.pack(pady=20) # 查询按钮 self.query_button = tk.Button(root, text="查询全部信息", command=self.show_query_dialog) self.query_button.pack(pady=20) def show_input_dialog(self, table_name): user_input = simpledialog.askstring("输入", f"请输入{table_name}:", parent=self.root) if user_input: # 加密用户输入 encrypted_data = rsa_encrypt(user_input, self.public_key) # 写入或更新加密数据到数据库 write_or_update_db(self.conn, self.cursor, table_name, encrypted_data) messagebox.showinfo("成功", f"{table_name}输入完成,数据已加密并存储!") def show_query_dialog(self): decrypted_data_list = [] decrypted_data_list.extend(read_from_db(self.conn, self.cursor, "account_a", self.private_key)) decrypted_data_list.extend(read_from_db(self.conn, self.cursor, "password_a", self.private_key)) decrypted_data_list.extend(read_from_db(self.conn, self.cursor, "account_b", self.private_key)) decrypted_data_list.extend(read_from_db(self.conn, self.cursor, "password_b", self.private_key)) if decrypted_data_list: query_result = "\n".join(decrypted_data_list) messagebox.showinfo("查询结果", f"解密后的数据:\n{query_result}") else: messagebox.showinfo("查询结果", "数据库中没有数据。") if __name__ == "__main__": # 定义密钥文件路径 keys_file = "demo.bin" # 定义密码获取函数 def get_password(): return "qwer1234" # 固定密码,自行修改 # 加载或生成RSA密钥对 private_key, public_key = load_protected_rsa_keys(keys_file, get_password) if not private_key or not public_key: messagebox.showerror("错误", "无法加载密钥") exit() # 创建或连接数据库 db_name = "encrypted_data.db" conn, cursor = create_or_connect_db(db_name) # 创建主窗口 root = tk.Tk() app = MainApp(root, private_key, public_key, conn, cursor) # 运行主循环 root.mainloop() # 关闭数据库连接 conn.close()