View memory usage in Activity Monitor on Mac

You can see the amount of system memory being used on your Mac.

In the Activity Monitor app on your Mac, click Memory to see the following at the bottom of the window:

  • Memory Pressure: Graphically represents how efficiently your memory is serving your processing needs.
    Memory pressure is determined by the amount of free memory, swap rate, wired memory, and file cached memory.

  • Physical Memory: The amount of RAM installed.

  • Memory Used: The amount of RAM being used. To the right, you can see where the memory is allocated.

    • App Memory: The amount of memory being used by apps.
    • Wired Memory: Memory required by the system to operate. This memory can’t be cached and must stay in RAM, so it’s not available to other apps.
    • Compressed: The amount of memory that has been compressed to make more RAM available.
      When your computer approaches its maximum memory capacity, inactive apps in memory are compressed, making more memory available to active apps. Select the Compressed Memory column, then look in the VM Compressed column for each app to see the amount of memory being compressed for that app.
  • Cached Files: The size of files cached by the system into unused memory to improve performance.
    Until this memory is overwritten, it remains cached, so it can help improve performance when you reopen the app.

  • Swap Used: The amount of space being used on your startup disk to swap unused files to and from RAM.

To display more columns, choose View > Columns, then choose the columns you want to show.

Mac 内存描述

  1. 物理内存
    1.1 已使用内存
    — 1. 应用内存 (可以理解为打开的 XX.app 应用程序使用的内存)
    — 2. 联动内存 (不能被压缩或交换到磁盘、必须常驻 RAM 的内存, 大多是系统层申请的内存)
    — 3. 压缩内存 (内存接近上限时, 后台不活跃的应用程序的内存将被压缩, 以腾出更多可用内存)
    1.2 缓存文件 (特别注明: 是归属于未使用内存的. 直白点, 在内存充足的情况下, 关闭应用并不会马上清理内存, 而是标记为缓存文件, 如果你再次打开就直接用上了)
  2. Swap 硬盘交换空间 (被换出到启动磁盘上的内存数据所占用的空间)

Reference: Activity Monitor User Guide


(END)

# E:\AI_System\web_ui\server.py
"""Flask + Socket.IO web server for the AI system.

All "system components" in this module (AI core, hardware manager, life
scheduler, agent, environment manager) are in-process simulations.  The
module exposes ``create_app()`` as the application factory and can also be
run directly as a script.
"""
import datetime
import functools
import json
import logging
import os
import platform
import secrets
import sys
import threading
import time
import traceback
from collections import deque
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path

import psutil
from flask import Flask, jsonify, render_template, request
from flask_socketio import SocketIO, emit


# ========== Configuration ==========
class SystemConfig:
    """Server settings plus the directories the server relies on.

    Instantiating the class creates the log, config and model-cache
    directories under ``BASE_DIR`` if they do not exist yet.
    """

    def __init__(self):
        self.BASE_DIR = Path(__file__).resolve().parent.parent
        self.HOST = '127.0.0.1'
        self.PORT = 5000
        self.LOG_LEVEL = 'INFO'
        # Never ship a placeholder secret: take it from the environment and
        # fall back to a random per-process key.
        self.SECRET_KEY = os.environ.get('AI_SYSTEM_SECRET_KEY') or secrets.token_hex(32)
        self.DEBUG = True
        self.USE_GPU = False
        self.DEFAULT_MODEL = 'gpt-3.5-turbo'

        # Directory layout
        self.LOG_DIR = self.BASE_DIR / 'logs'
        self.LOG_DIR.mkdir(parents=True, exist_ok=True)
        self.CONFIG_DIR = self.BASE_DIR / 'config'
        self.CONFIG_DIR.mkdir(parents=True, exist_ok=True)
        self.AGENT_PATH = self.BASE_DIR / 'agent'
        self.MODEL_CACHE_DIR = self.BASE_DIR / 'model_cache'
        self.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True)

    def __str__(self):
        return f"SystemConfig(HOST={self.HOST}, PORT={self.PORT})"


config = SystemConfig()

# ========== Global coordinator / Socket.IO handle ==========
coordinator = None
# Set by create_app(); update_ui() is a no-op until then.
socketio = None

_EVOLUTION_MONITOR_THREAD_NAME = "EvolutionMonitorThread"


def register_coordinator(coord):
    """Register the consciousness-system coordinator and hook it to the UI.

    If the coordinator exposes ``connect_to_ui`` it is given ``update_ui`` as
    its event callback.
    """
    global coordinator
    coordinator = coord
    if coordinator and hasattr(coordinator, 'connect_to_ui'):
        coordinator.connect_to_ui(update_ui)


def update_ui(event):
    """Forward a coordinator event to connected clients as ``system_event``."""
    if socketio is not None:
        socketio.emit('system_event', event)


# ========== Logging ==========
def setup_logger():
    """Configure and return the shared ``WebServer`` logger.

    Adds a midnight-rotating file handler (30 backups) and a console handler,
    and wraps the level methods so a ``UnicodeEncodeError`` is retried with
    ASCII-only positional string arguments.  Calling this more than once
    returns the already-configured logger instead of stacking handlers.
    """
    logger = logging.getLogger('WebServer')
    if getattr(logger, '_web_server_configured', False):
        return logger

    logger.setLevel(getattr(logging, config.LOG_LEVEL.upper(), logging.INFO))

    log_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # File handler: rotate daily, keep 30 days.
    file_handler = TimedRotatingFileHandler(
        config.LOG_DIR / 'web_server.log',
        when='midnight',
        backupCount=30,
        encoding='utf-8'
    )
    file_handler.setFormatter(log_formatter)
    logger.addHandler(file_handler)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(log_formatter)
    logger.addHandler(console_handler)

    def safe_logger(func):
        """Retry a logging call with non-ASCII characters stripped."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except UnicodeEncodeError:
                ascii_args = [
                    arg.encode('ascii', 'ignore').decode('ascii')
                    if isinstance(arg, str) else arg
                    for arg in args
                ]
                return func(*ascii_args, **kwargs)
        return wrapper

    for level in ('debug', 'info', 'warning', 'error', 'critical'):
        setattr(logger, level, safe_logger(getattr(logger, level)))

    logger._web_server_configured = True
    return logger


logger = setup_logger()


# ========== Simulated components ==========
_DEFAULT_GENETIC_CODE = '''# 默认遗传代码
class AICore:
    def __init__(self):
        self.version = "1.0.0"
        self.capabilities = ["learning", "reasoning", "problem_solving"]

    def evolve(self):
        print("系统正在进化...")
'''

_FALLBACK_GENETIC_CODE = "# 默认遗传代码\nclass AICore:\n    pass"


class EnvironmentManager:
    """Simulated environment: temperature, humidity and light level."""

    def __init__(self, config):
        self.config = config
        self.state = {
            'temperature': 22.5,
            'humidity': 45.0,
            'light_level': 75,
            'objects': [],
            'last_updated': datetime.datetime.now().isoformat()
        }

    def start(self):
        logger.info("环境管理器已启动")

    def get_state(self):
        """Refresh the simulated readings from the clock and return the state dict."""
        phase = (time.time() % 10) / 10
        self.state['temperature'] = 20 + 5 * phase
        self.state['humidity'] = 40 + 10 * phase
        self.state['light_level'] = 70 + 10 * phase
        self.state['last_updated'] = datetime.datetime.now().isoformat()
        return self.state

    def execute_action(self, action, params):
        """Apply ``action``; return True if it was recognised, else False."""
        logger.info(f"执行环境动作: {action} 参数: {params}")
        if action == "adjust_temperature":
            self.state['temperature'] = params.get('value', 22.0)
            return True
        if action == "adjust_light":
            self.state['light_level'] = params.get('level', 70)
            return True
        return False


class AICore:
    """Simulated AI core backed by ``<base_dir>/core/genetic_code.py``."""

    def __init__(self, base_dir):
        self.base_dir = Path(base_dir)
        self.genetic_code = self.load_genetic_code()
        self.physical_state = {
            "health": 100,
            "energy": 100,
            "mood": "calm"
        }
        self.dependencies = self.scan_dependencies()

    def _code_path(self):
        return self.base_dir / "core" / "genetic_code.py"

    def load_genetic_code(self):
        """Return the genetic code text, creating the default file if missing.

        On any read/write failure the error is logged and a minimal fallback
        source string is returned instead.
        """
        code_path = self._code_path()
        try:
            if not code_path.exists():
                # The core/ directory may not exist on a fresh checkout.
                code_path.parent.mkdir(parents=True, exist_ok=True)
                code_path.write_text(_DEFAULT_GENETIC_CODE, encoding="utf-8")
            return code_path.read_text(encoding="utf-8")
        except (OSError, UnicodeError) as e:
            logger.error(f"加载遗传代码失败: {str(e)}")
            return _FALLBACK_GENETIC_CODE

    def scan_dependencies(self):
        return {
            "nervous_system": "flask",
            "memory": "sqlite",
            "perception": "opencv",
            "reasoning": "transformers"
        }

    def mutate(self, new_code):
        """Overwrite the genetic code file; return ``(success, message)``."""
        try:
            code_path = self._code_path()
            code_path.parent.mkdir(parents=True, exist_ok=True)
            code_path.write_text(new_code, encoding="utf-8")
            # Keep the in-memory copy in sync so get_state() reflects the change.
            self.genetic_code = new_code
            return True, "核心代码更新成功,系统已进化"
        except Exception as e:
            return False, f"进化失败: {str(e)}"

    def wear_dependency(self, dependency_name, version):
        self.dependencies[dependency_name] = version
        return f"已穿戴 {dependency_name}@{version}"

    def get_state(self):
        return {
            # NOTE(review): str hashes are randomized per process, so this
            # value is not stable across restarts.
            "genetic_code_hash": hash(self.genetic_code),
            "dependencies": self.dependencies,
            "physical_state": self.physical_state,
            "hardware_environment": self.get_hardware_info()
        }

    def get_hardware_info(self):
        return {
            "cpu": platform.processor(),
            "cpu_cores": psutil.cpu_count(logical=False),
            "cpu_threads": psutil.cpu_count(logical=True),
            "memory_gb": round(psutil.virtual_memory().total / (1024 ** 3), 1),
            "storage_gb": round(psutil.disk_usage('/').total / (1024 ** 3), 1),
            "os": f"{platform.system()} {platform.release()}"
        }


class HardwareManager:
    """Simulated hardware catalogue and the currently "installed" setup."""

    def __init__(self):
        self.available_hardware = {
            "cpu": ["Intel i9-13900K", "AMD Ryzen 9 7950X", "Apple M2 Max"],
            "gpu": ["NVIDIA RTX 4090", "AMD Radeon RX 7900 XTX", "Apple M2 GPU"],
            "memory": [16, 32, 64, 128],
            "storage": ["1TB SSD", "2TB SSD", "4TB SSD", "8TB HDD"],
            "peripherals": ["4K Camera", "3D Scanner", "High-Fidelity Microphone"]
        }
        self.current_setup = {
            "cpu": platform.processor(),
            "gpu": "Integrated Graphics",
            "memory": round(psutil.virtual_memory().total / (1024 ** 3), 1),
            "storage": round(psutil.disk_usage('/').total / (1024 ** 3), 1)
        }

    def request_hardware(self, hardware_type, specification):
        """Record a hardware request; return ``(success, message)``."""
        if hardware_type not in self.available_hardware:
            return False, f"不支持硬件类型: {hardware_type}"
        if specification not in self.available_hardware[hardware_type]:
            return False, f"不支持的规格: {specification}"
        self.current_setup[hardware_type] = specification
        return True, f"已请求 {hardware_type}: {specification}。请管理员完成安装。"

    def get_current_setup(self):
        return self.current_setup

    def get_performance_metrics(self):
        net_io = psutil.net_io_counters()  # sample once so both counters agree
        return {
            "cpu_usage": psutil.cpu_percent(),
            "memory_usage": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage('/').percent,
            "cpu_temp": 45.0,
            "gpu_temp": 55.0,
            "network_io": {
                "sent": net_io.bytes_sent,
                "received": net_io.bytes_recv
            },
            "last_updated": datetime.datetime.now().isoformat()
        }


class LifeScheduler:
    """Simulated daily routine with an activity log capped at 100 entries."""

    def __init__(self):
        self.daily_schedule = {
            "wake_up": "07:00",
            "breakfast": "08:00",
            "lunch": "12:30",
            "dinner": "19:00",
            "sleep": "23:00"
        }
        self.current_activity = "awake"
        self.energy_level = 100
        self.mood = "calm"
        # deque(maxlen=...) drops the oldest entry automatically.
        self.activity_log = deque(maxlen=100)

    def wake_up(self):
        self.current_activity = "awake"
        self.log_activity("醒来")

    def have_meal(self, meal_type):
        self.current_activity = f"eating_{meal_type}"
        self.energy_level = min(100, self.energy_level + 20)
        self.log_activity(f"用餐: {meal_type}")

    def go_to_sleep(self):
        self.current_activity = "sleeping"
        self.log_activity("睡觉")

    def log_activity(self, activity):
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.activity_log.append(f"{timestamp} - {activity}")

    def adjust_schedule(self, adjustments):
        """Update times for known activities; unknown keys are ignored."""
        for activity, new_time in adjustments.items():
            if activity in self.daily_schedule:
                self.daily_schedule[activity] = new_time

    def get_current_state(self):
        return {
            "current_activity": self.current_activity,
            "next_scheduled": self._get_next_scheduled(),
            "energy_level": self.energy_level,
            "mood": self.mood
        }

    def get_recent_activities(self, count=10):
        return list(self.activity_log)[-count:]

    def _get_next_scheduled(self):
        """Return the next activity after the current time of day.

        Relies on zero-padded ``HH:MM`` strings so that string comparison
        matches chronological order.
        """
        current_time = datetime.datetime.now().strftime("%H:%M")
        schedule_times = sorted(self.daily_schedule.items(), key=lambda item: item[1])
        for activity, time_str in schedule_times:
            if time_str > current_time:
                return f"{activity} at {time_str}"
        # Nothing left today: wrap around to tomorrow's first activity.
        first_activity, first_time = schedule_times[0]
        return f"{first_activity} at {first_time} tomorrow"


class AutonomousAgent:
    """Simulated chat agent that answers with canned responses."""

    def __init__(self, model_path, cache_dir, use_gpu, default_model):
        self.model_name = default_model
        self.cache_dir = cache_dir
        self.use_gpu = use_gpu
        self.conversation_history = {}

    def process_input(self, user_input, user_id):
        """Record the message, build a simulated reply and return it."""
        history = self.conversation_history.setdefault(user_id, [])
        history.append({"role": "user", "content": user_input})

        if "你好" in user_input or "hello" in user_input:
            response = "你好!我是AI助手,有什么可以帮您的吗?"
        elif "时间" in user_input:
            response = f"现在是 {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        elif "状态" in user_input:
            response = "系统运行正常,所有组件都在线。"
        else:
            response = f"我已收到您的消息: '{user_input}'。这是一个模拟响应,实际系统中我会分析您的问题并提供专业解答。"

        history.append({"role": "assistant", "content": response})
        return response

    def get_status(self):
        return {
            "model": self.model_name,
            "cache_dir": str(self.cache_dir),
            "use_gpu": self.use_gpu,
            "active_conversations": len(self.conversation_history)
        }


# ========== System initialization ==========
class SystemInitializer:
    """Creates the simulated components and starts their background threads."""

    def __init__(self):
        self.base_dir = Path(__file__).resolve().parent.parent
        self.ai_core = None
        self.hardware_manager = None
        self.life_scheduler = None
        self.ai_agent = None
        self.start_time = time.time()
        self.environment_manager = None

    def initialize_system_paths(self):
        """Put the project root and its known sub-packages on ``sys.path``."""
        self._prepend_sys_path(self.base_dir)
        logger.info(f"项目根目录: {self.base_dir}")

        sub_dirs = ['agent', 'core', 'utils', 'config', 'cognitive_arch', 'environment']
        for sub_dir in sub_dirs:
            full_path = self.base_dir / sub_dir
            if full_path.exists():
                self._prepend_sys_path(full_path)
                logger.info(f"添加路径: {full_path}")
            else:
                logger.warning(f"目录不存在: {full_path}")

    @staticmethod
    def _prepend_sys_path(path):
        """Insert ``path`` at the front of ``sys.path`` unless already present."""
        path_str = str(path)
        if path_str not in sys.path:
            sys.path.insert(0, path_str)

    def initialize_environment_manager(self):
        """Create and start the environment manager; return it or None on failure."""
        try:
            env_config = {'update_interval': 1.0, 'spatial': {'grid_size': 1.0}}
            self.environment_manager = EnvironmentManager(env_config)
            self.environment_manager.start()
            logger.info("✅ 环境管理器初始化成功")
            return self.environment_manager
        except Exception as e:
            logger.error(f"❌ 环境管理器初始化失败: {str(e)}")
            logger.warning("⚠️ 环境交互功能将不可用")
            return None

    def initialize_ai_core(self):
        """Create the AI core and return it."""
        self.ai_core = AICore(self.base_dir)
        logger.info("✅ AI核心系统初始化完成")
        return self.ai_core

    def initialize_hardware_manager(self):
        """Create the hardware manager; return it or None on failure."""
        try:
            self.hardware_manager = HardwareManager()
            logger.info("✅ 硬件管理器初始化成功")
            return self.hardware_manager
        except Exception as e:
            logger.error(f"❌ 硬件管理器初始化失败: {str(e)}")
            logger.warning("⚠️ 使用内置简单硬件管理器")
            return None

    def initialize_life_scheduler(self):
        """Create the life scheduler and start its daemon update thread."""
        try:
            self.life_scheduler = LifeScheduler()
            logger.info("✅ 生活调度器初始化成功")

            life_thread = threading.Thread(
                target=self._update_life_status,
                daemon=True,
                name="LifeSystemThread"
            )
            life_thread.start()
            logger.info("✅ 生活系统后台线程已启动")
            return self.life_scheduler
        except Exception as e:
            logger.error(f"❌ 生活调度器初始化失败: {str(e)}")
            return None

    def _update_life_status(self):
        """Background loop: switch activity by hour of day and drain energy.

        NOTE(review): the hour windows below are fixed and do not follow
        ``LifeScheduler.daily_schedule``, so schedule adjustments made through
        the API do not affect this loop.
        """
        logger.info("🚦 生活系统后台线程启动")
        while True:
            try:
                scheduler = self.life_scheduler
                current_hour = datetime.datetime.now().hour

                if 7 <= current_hour < 8 and scheduler.current_activity != "awake":
                    scheduler.wake_up()
                elif 12 <= current_hour < 13 and scheduler.current_activity != "eating_lunch":
                    scheduler.have_meal("lunch")
                elif 19 <= current_hour < 20 and scheduler.current_activity != "eating_dinner":
                    scheduler.have_meal("dinner")
                elif (23 <= current_hour or current_hour < 6) and scheduler.current_activity != "sleeping":
                    scheduler.go_to_sleep()

                # Natural energy drain while not asleep.
                if scheduler.current_activity != "sleeping":
                    scheduler.energy_level = max(0, scheduler.energy_level - 0.1)

                time.sleep(60)
            except Exception as e:
                logger.error(f"生活系统更新失败: {str(e)}", exc_info=True)
                time.sleep(300)

    def initialize_ai_agent(self):
        """Create the simulated agent; return it or None on failure."""
        try:
            self.ai_agent = AutonomousAgent(
                model_path=config.AGENT_PATH,
                cache_dir=config.MODEL_CACHE_DIR,
                use_gpu=config.USE_GPU,
                default_model=config.DEFAULT_MODEL
            )
            logger.info("✅ AI智能体初始化成功")
            return self.ai_agent
        except Exception as e:
            logger.error(f"❌ AI智能体初始化失败: {str(e)}")
            return None

    def start_evolution_monitor(self):
        """Start a daemon thread that adjusts the core's energy/mood from CPU load."""
        def monitor():
            while True:
                try:
                    cpu_usage = psutil.cpu_percent()
                    physical_state = self.ai_core.physical_state

                    if cpu_usage > 80:
                        physical_state["energy"] = max(20, physical_state["energy"] - 5)
                        physical_state["mood"] = "strained"
                    elif cpu_usage < 30:
                        physical_state["energy"] = min(100, physical_state["energy"] + 2)

                    time.sleep(60)
                except Exception as e:
                    # Use the module logger so the message reaches the log file.
                    logger.error(f"进化监控错误: {str(e)}")
                    time.sleep(300)

        monitor_thread = threading.Thread(
            target=monitor,
            daemon=True,
            name=_EVOLUTION_MONITOR_THREAD_NAME
        )
        monitor_thread.start()
        logger.info("✅ 进化监控线程已启动")

    def initialize_all(self):
        """Initialize every component and return them in a dict keyed by name."""
        logger.info("=" * 50)
        logger.info("🚀 开始初始化AI系统")
        logger.info("=" * 50)

        self.initialize_system_paths()
        self.initialize_ai_core()
        self.initialize_hardware_manager()
        self.initialize_life_scheduler()
        self.initialize_ai_agent()
        self.initialize_environment_manager()
        self.start_evolution_monitor()

        logger.info("✅ 所有系统组件初始化完成")
        return {
            "ai_core": self.ai_core,
            "hardware_manager": self.hardware_manager,
            "life_scheduler": self.life_scheduler,
            "ai_agent": self.ai_agent,
            "environment_manager": self.environment_manager
        }


# ========== Flask application factory ==========
def create_app():
    """Build the Flask app and its Socket.IO server.

    Returns:
        ``(app, socketio)``.  The Socket.IO instance is also stored in the
        module-level ``socketio`` so ``update_ui`` works when this module is
        imported rather than run as a script.
    """
    global socketio

    app = Flask(
        __name__,
        template_folder='templates',
        static_folder='static',
        static_url_path='/static'
    )
    app.secret_key = config.SECRET_KEY

    system_initializer = SystemInitializer()
    components = system_initializer.initialize_all()

    app.config['SYSTEM_COMPONENTS'] = components
    app.config['START_TIME'] = system_initializer.start_time
    app.config['BASE_DIR'] = system_initializer.base_dir

    # NOTE(review): cors_allowed_origins="*" accepts any origin; tighten this
    # before exposing the server beyond localhost.
    socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')
    app.config['SOCKETIO'] = socketio

    register_routes(app)
    register_error_handlers(app)

    return app, socketio


def _json_body():
    """Return the request's JSON object, or ``{}`` if it is absent or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _is_valid_hhmm(value):
    """Return True only for a zero-padded 24-hour ``HH:MM`` string."""
    if not isinstance(value, str) or len(value) != 5 or value[2] != ':':
        return False
    try:
        datetime.datetime.strptime(value, "%H:%M")
    except ValueError:
        return False
    return True


# ========== Environment routes ==========
def register_environment_routes(app):
    """Register the environment page and its JSON API."""

    @app.route('/environment')
    def environment_view():
        return render_template('environment_view.html')

    @app.route('/api/environment/state', methods=['GET'])
    def get_environment_state():
        env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager')
        if not env_manager:
            return jsonify({"success": False, "error": "环境管理器未初始化"}), 503

        try:
            return jsonify(env_manager.get_state())
        except Exception as e:
            app.logger.error(f"获取环境状态失败: {traceback.format_exc()}")
            return jsonify({"success": False, "error": str(e)}), 500

    @app.route('/api/environment/action', methods=['POST'])
    def execute_environment_action():
        env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager')
        if not env_manager:
            return jsonify({"success": False, "error": "环境管理器未初始化"}), 503

        try:
            data = _json_body()
            action = data.get('action')
            params = data.get('params', {})
            if not isinstance(params, dict):
                params = {}

            if not action:
                return jsonify({"success": False, "error": "缺少动作参数"}), 400

            success = env_manager.execute_action(action, params)
            return jsonify({"success": success, "action": action})
        except Exception as e:
            app.logger.error(f"执行环境动作失败: {traceback.format_exc()}")
            return jsonify({"success": False, "error": str(e)}), 500


# ========== Environment state broadcast ==========
def setup_environment_broadcast(app):
    """Register ``/environment`` namespace handlers and start the 1 Hz broadcaster."""
    sio = app.config['SOCKETIO']

    @sio.on('connect', namespace='/environment')
    def handle_environment_connect():
        app.logger.info('客户端已连接环境WebSocket')

    @sio.on('disconnect', namespace='/environment')
    def handle_environment_disconnect():
        app.logger.info('客户端已断开环境WebSocket')

    def broadcast_environment_state():
        env_manager = app.config['SYSTEM_COMPONENTS'].get('environment_manager')
        if not env_manager:
            return

        while True:
            try:
                state = env_manager.get_state()
                sio.emit('environment_update', state, namespace='/environment')
                time.sleep(1)
            except Exception as e:
                app.logger.error(f"环境状态广播失败: {str(e)}")
                time.sleep(5)

    broadcast_thread = threading.Thread(
        target=broadcast_environment_state,
        daemon=True,
        name="EnvironmentBroadcastThread"
    )
    broadcast_thread.start()
    app.logger.info("✅ 环境状态广播线程已启动")


# ========== Route registration ==========
_FURNITURE_LAYOUTS = {
    "workshop": [
        {"type": "desk", "position": {"x": 0, "y": -1.5, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}},
        {"type": "chair", "position": {"x": 0, "y": -1.5, "z": -1}, "rotation": {"x": 0, "y": 0, "z": 0}},
        {"type": "bookshelf", "position": {"x": 3, "y": 0, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}},
        {"type": "computer", "position": {"x": 0.5, "y": -0.5, "z": -3.2}, "rotation": {"x": 0, "y": 0, "z": 0}}
    ],
    "living_room": [
        {"type": "sofa", "position": {"x": 0, "y": 0, "z": -2}, "rotation": {"x": 0, "y": 0, "z": 0}},
        {"type": "tv", "position": {"x": 0, "y": 1.5, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}}
    ],
    "bedroom": [
        {"type": "bed", "position": {"x": 0, "y": 0, "z": -3}, "rotation": {"x": 0, "y": 0, "z": 0}},
        {"type": "nightstand", "position": {"x": 1.5, "y": 0, "z": -2.5}, "rotation": {"x": 0, "y": 0, "z": 0}}
    ]
}


def register_routes(app):
    """Register every HTTP route and start the environment broadcaster."""
    register_environment_routes(app)
    setup_environment_broadcast(app)

    def missing_hardware_response():
        return jsonify({"success": False, "error": "硬件管理器未初始化"}), 503

    @app.route('/')
    def index():
        return render_template('agent_interface.html')

    @app.route('/system/health')
    def health_check():
        """System health check: reports which components are actually available."""
        components = app.config['SYSTEM_COMPONENTS']
        monitor_alive = any(
            thread.name == _EVOLUTION_MONITOR_THREAD_NAME and thread.is_alive()
            for thread in threading.enumerate()
        )
        component_status = {
            'ai_core': components.get('ai_core') is not None,
            'hardware_manager': components.get('hardware_manager') is not None,
            'scheduler': components.get('life_scheduler') is not None,
            'environment': components.get('environment_manager') is not None,
            'evolution_monitor': monitor_alive
        }
        healthy = all(component_status.values())
        return jsonify(
            status="running" if healthy else "degraded",
            components=component_status,
            timestamp=datetime.datetime.now().isoformat()
        )

    @app.route('/status')
    def status():
        try:
            components = app.config['SYSTEM_COMPONENTS']
            hardware_manager = components['hardware_manager']
            status_data = {
                "server": {
                    "status": "running",
                    "uptime": time.time() - app.config['START_TIME'],
                    "version": "1.0.0",
                    "config": {
                        "host": config.HOST,
                        "port": config.PORT,
                        "log_level": config.LOG_LEVEL,
                        "default_model": config.DEFAULT_MODEL
                    }
                },
                "core": components['ai_core'].get_state(),
                # The hardware manager may be None if its initialization failed.
                "hardware": (
                    hardware_manager.get_current_setup()
                    if hardware_manager else {"error": "硬件管理器未初始化"}
                )
            }

            optional_sections = (
                ("environment", 'environment_manager', 'get_state'),
                ("life_system", 'life_scheduler', 'get_current_state'),
                ("agent", 'ai_agent', 'get_status'),
            )
            for section, component_key, method_name in optional_sections:
                component = components[component_key]
                if not component:
                    continue
                try:
                    status_data[section] = getattr(component, method_name)()
                except Exception as e:
                    status_data[section] = {"error": str(e)}

            return jsonify(status_data)
        except Exception as e:
            app.logger.error(f"获取状态失败: {traceback.format_exc()}")
            return jsonify({"error": "内部错误", "details": str(e)}), 500

    # ----- Core system routes -----
    @app.route('/api/core/state')
    def get_core_state():
        return jsonify(app.config['SYSTEM_COMPONENTS']['ai_core'].get_state())

    @app.route('/api/core/mutate', methods=['POST'])
    def mutate_core():
        # SECURITY NOTE(review): this writes request-supplied source code to
        # core/genetic_code.py with no authentication.  Restrict access before
        # exposing the server outside localhost.
        new_code = _json_body().get('genetic_code')
        if not new_code or not isinstance(new_code, str):
            return jsonify({"success": False, "error": "缺少遗传代码"}), 400

        success, message = app.config['SYSTEM_COMPONENTS']['ai_core'].mutate(new_code)
        return jsonify({"success": success, "message": message})

    @app.route('/api/core/wear', methods=['POST'])
    def wear_dependency():
        data = _json_body()
        dep_name = data.get('dependency')
        version = data.get('version', 'latest')
        if not dep_name:
            return jsonify({"success": False, "error": "缺少依赖名称"}), 400

        result = app.config['SYSTEM_COMPONENTS']['ai_core'].wear_dependency(dep_name, version)
        return jsonify({"success": True, "message": result})

    # ----- Hardware routes -----
    @app.route('/api/hardware/catalog')
    def get_hardware_catalog():
        hardware_manager = app.config['SYSTEM_COMPONENTS']['hardware_manager']
        if not hardware_manager:
            return missing_hardware_response()
        return jsonify(hardware_manager.available_hardware)

    @app.route('/api/hardware/request', methods=['POST'])
    def request_hardware():
        hardware_manager = app.config['SYSTEM_COMPONENTS']['hardware_manager']
        if not hardware_manager:
            return missing_hardware_response()

        data = _json_body()
        hw_type = data.get('type')
        spec = data.get('specification')
        if not hw_type or not spec:
            return jsonify({"success": False, "error": "缺少硬件类型或规格"}), 400

        success, message = hardware_manager.request_hardware(hw_type, spec)
        return jsonify({"success": success, "message": message})

    @app.route('/api/hardware/current')
    def get_current_hardware():
        hardware_manager = app.config['SYSTEM_COMPONENTS']['hardware_manager']
        if not hardware_manager:
            return missing_hardware_response()
        return jsonify(hardware_manager.get_current_setup())

    # ----- Life system routes -----
    @app.route('/life')
    def life_dashboard():
        return render_template('life_dashboard.html')

    @app.route('/api/life/status')
    def get_life_status():
        scheduler = app.config['SYSTEM_COMPONENTS']['life_scheduler']
        if not scheduler:
            return jsonify({"success": False, "error": "生活系统未初始化"}), 503

        try:
            current_state = scheduler.get_current_state()
            recent_activities = scheduler.get_recent_activities(10)

            return jsonify({
                "success": True,
                "current_activity": current_state.get("current_activity", "未知"),
                "next_scheduled": current_state.get("next_scheduled", "未知"),
                "recent_activities": recent_activities,
                "energy_level": current_state.get("energy_level", 100),
                "mood": current_state.get("mood", "平静")
            })
        except Exception as e:
            app.logger.error(f"获取生活状态失败: {traceback.format_exc()}")
            return jsonify({"success": False, "error": str(e)}), 500

    @app.route('/adjust_schedule', methods=['POST'])
    def adjust_schedule():
        scheduler = app.config['SYSTEM_COMPONENTS']['life_scheduler']
        if not scheduler:
            return jsonify({"success": False, "error": "生活系统未初始化"}), 503

        try:
            adjustments = _json_body().get("adjustments", {})
            if not isinstance(adjustments, dict):
                return jsonify({"success": False, "error": "adjustments 必须是对象"}), 400

            valid_activities = ["wake_up", "breakfast", "lunch", "dinner", "sleep"]
            for activity, new_time in adjustments.items():
                if activity not in valid_activities:
                    return jsonify({"success": False, "error": f"无效的活动类型: {activity}"}), 400
                # Must be a real HH:MM time: the scheduler orders entries by
                # comparing these strings.
                if not _is_valid_hhmm(new_time):
                    return jsonify({"success": False, "error": f"无效的时间格式: {new_time}"}), 400

            scheduler.adjust_schedule(adjustments)

            return jsonify({
                "success": True,
                "message": "计划表已更新",
                "new_schedule": scheduler.daily_schedule
            })
        except Exception as e:
            app.logger.error(f"调整作息时间失败: {traceback.format_exc()}")
            return jsonify({"success": False, "error": str(e)}), 400

    # ----- Chat route -----
    @app.route('/chat', methods=['POST'])
    def chat():
        agent = app.config['SYSTEM_COMPONENTS']['ai_agent']
        if not agent:
            return jsonify({"error": "Agent未初始化"}), 503

        try:
            data = _json_body()
            user_input = data.get('message', '')
            user_id = data.get('user_id', 'default')

            if not user_input or not isinstance(user_input, str):
                return jsonify({"error": "消息内容不能为空"}), 400

            app.logger.info(f"聊天请求: 用户={user_id}, 内容长度={len(user_input)}")
            response = agent.process_input(user_input, user_id)
            return jsonify({"response": response})
        except Exception as e:
            app.logger.error(f"聊天处理失败: {traceback.format_exc()}")
            return jsonify({"error": "聊天处理失败", "details": str(e)}), 500

    # ----- Furniture routes -----
    furniture_cache = {}
    CACHE_DURATION = 3600  # one hour

    @app.route('/api/furniture')
    def get_furniture():
        try:
            room = request.args.get('room', 'workshop')
            app.logger.info(f"获取家具数据: 房间={room}")

            current_time = time.time()
            cached = furniture_cache.get(room)
            if cached and current_time - cached['timestamp'] < CACHE_DURATION:
                return jsonify(cached['data'])

            furniture_cache[room] = {
                'timestamp': current_time,
                'data': _FURNITURE_LAYOUTS.get(room, [])
            }
            return jsonify(furniture_cache[room]['data'])
        except Exception as e:
            app.logger.error(f"获取家具数据失败: {traceback.format_exc()}")
            return jsonify({"error": "内部错误", "details": str(e)}), 500


# ========== Error handlers ==========
def register_error_handlers(app):
    """Register JSON error handlers for 404, 500 and uncaught exceptions."""

    @app.errorhandler(404)
    def page_not_found(error):
        app.logger.warning(f"404错误: {request.path}")
        return jsonify({
            "error": "资源未找到",
            "path": request.path,
            "method": request.method
        }), 404

    @app.errorhandler(500)
    def internal_server_error(error):
        app.logger.error(f"500错误: {str(error)}")
        return jsonify({
            "error": "服务器内部错误",
            "message": "系统遇到意外错误,请稍后重试"
        }), 500

    @app.errorhandler(Exception)
    def handle_general_exception(error):
        # A handler for Exception also receives HTTP errors (400, 405, ...).
        # Those carry an integer ``code`` and a ``get_response`` method; keep
        # their own status instead of reporting them as 500.
        status_code = getattr(error, "code", None)
        if isinstance(status_code, int) and hasattr(error, "get_response"):
            return jsonify({
                "error": getattr(error, "name", type(error).__name__),
                "message": getattr(error, "description", str(error))
            }), status_code

        app.logger.error(f"未处理异常: {traceback.format_exc()}")
        return jsonify({
            "error": "未处理的异常",
            "type": type(error).__name__,
            "message": str(error)
        }), 500


# ========== WebSocket handlers ==========
def setup_websocket_handlers(socketio):
    """Register default-namespace Socket.IO handlers on ``socketio``."""

    @socketio.on('connect')
    def handle_connect():
        logger.info('客户端已连接')
        # NOTE(review): socketio.emit() sends to every connected client;
        # flask_socketio.emit() would reply to the connecting client only.
        socketio.emit('system_status', {'status': 'ready'})

    @socketio.on('disconnect')
    def handle_disconnect():
        logger.info('客户端已断开连接')

    @socketio.on('user_message')
    def handle_user_message(data):
        payload = data if isinstance(data, dict) else {}
        user_id = payload.get('user_id', 'guest')
        message = payload.get('message', '')
        logger.info(f"收到来自 {user_id} 的消息: {message}")

        response = f"已收到您的消息: {message}"

        # Prefer the coordinator when one is registered; fall back to the
        # echo response if it fails.
        if coordinator:
            try:
                response = coordinator.process_message(message)
            except Exception as e:
                logger.error(f"协调器处理消息失败: {str(e)}")

        # NOTE(review): this reaches all connected clients, not only the sender.
        socketio.emit('agent_response', {
            'user_id': user_id,
            'response': response
        })


# ========== Entry point ==========
if __name__ == '__main__':
    try:
        app, socketio = create_app()

        setup_websocket_handlers(socketio)

        # socketio.run() blocks until shutdown, so announce the address first.
        logger.info(f"服务器运行在 http://{config.HOST}:{config.PORT}")
        socketio.run(
            app,
            host=config.HOST,
            port=config.PORT,
            debug=config.DEBUG,
            use_reloader=False
        )
    except KeyboardInterrupt:
        logger.info("服务器关闭")
    except Exception as e:
        logger.critical(f"服务器启动失败: {str(e)}")
        logger.error(traceback.format_exc())

# NOTE: the pasted source was followed by a chat request that is not part of
# the program ("帮我改好 发我完整版哦": "please fix this and send me the
# complete version").
08-12
你稍等一下 我看你说应该放在意识层面,你看看是这俩哪个文件里# conscious_layer.py import time import threading import logging import uuid from collections import deque from typing import Dict, List, Any, Optional, Tuple from perception_interface import PerceptionInterface, SensorType, InputPriority from conscious_memory import MemorySystem, ContextIndex, EmotionType, MemoryContentType # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('ConsciousLayer') logger.setLevel(logging.INFO) class ConsciousFramework: """增强型意识框架实现""" def __init__(self, params: Dict = None): self.params = params or {} self.memory_capacity = self.params.get('memory_capacity', 50) self.working_memory = deque(maxlen=self.memory_capacity) self.cognitive_state = { 'attention_level': 0.7, 'cognitive_load': 0.3, 'focus_mode': 'broad', 'arousal_level': 0.5, 'stress_level': 0.3 } self.metacognition = { 'insights': [], 'last_insight': None, 'reflection_count': 0, 'last_reflection': 0.0 } self.lock = threading.RLock() # 线程安全锁 def process(self, stimulus: Dict, biological_state: Dict) -> Dict: """处理输入刺激(线程安全)""" with self.lock: # 更新认知状态 self._update_cognitive_state(stimulus, biological_state) # 计算显著性 salience = self._calculate_salience(stimulus, biological_state) # 生成行动方案 action_plan = self._generate_action_plan(stimulus, salience) # 创建处理结果 result = { 'id': f"proc_{uuid.uuid4().hex}", 'perception': stimulus, 'salience': salience, 'action_plan': action_plan, 'timestamp': time.time(), 'biological_state': biological_state.copy() } # 添加到工作记忆 self.working_memory.append(result) # 自动清理低显著性项目 if len(self.working_memory) > self.memory_capacity * 0.8: self._auto_clean_working_memory() return result def _update_cognitive_state(self, stimulus: Dict, biological_state: Dict): """更新认知状态""" # 更新基础状态 for key in ['attention_level', 'arousal_level', 'stress_level']: if key in biological_state: # 平滑更新:新状态占70%,旧状态占30% self.cognitive_state[key] = ( biological_state[key] * 
0.7 + self.cognitive_state.get(key, 0.5) * 0.3 ) # 更新认知负载 urgency = stimulus.get('urgency', 1) / 5.0 self.cognitive_state['cognitive_load'] = min(1.0, self.cognitive_state.get('cognitive_load', 0.3) * 0.8 + urgency * 0.2 ) # 根据负载调整注意力模式 if self.cognitive_state['cognitive_load'] > 0.7: self.cognitive_state['focus_mode'] = 'focused' elif self.cognitive_state['cognitive_load'] > 0.4: self.cognitive_state['focus_mode'] = 'balanced' else: self.cognitive_state['focus_mode'] = 'broad' def _calculate_salience(self, stimulus: Dict, biological_state: Dict) -> float: """计算刺激的显著性""" # 基于紧急度、注意力水平和情绪影响 urgency = stimulus.get('urgency', 1) / 5.0 attention = biological_state.get('attention_level', 0.5) emotion_impact = biological_state.get('emotion_impact', 0.5) # 计算基础显著性 base_salience = min(1.0, max(0.0, urgency * attention * (1 + emotion_impact * 0.3))) # 根据当前焦点模式调整 if self.cognitive_state['focus_mode'] == 'focused': # 专注模式下降低非紧急项目的显著性 if urgency < 0.4: base_salience *= 0.6 elif self.cognitive_state['focus_mode'] == 'broad': # 广泛模式下提高低紧急度项目的显著性 if urgency < 0.4: base_salience *= 1.2 return base_salience def _generate_action_plan(self, stimulus: Dict, salience: float) -> Dict: """生成行动方案""" # 根据显著性和来源确定优先级 source = stimulus.get('source', 'unknown') if source == 'user': priority = 'critical' if salience > 0.8 else 'high' action = 'respond' elif source == 'system': priority = 'high' if salience > 0.7 else 'medium' action = 'analyze' else: priority = 'medium' if salience > 0.5 else 'low' action = 'monitor' # 确定所需资源 resources = ['processing'] if salience > 0.6: resources.append('memory') if priority == 'critical': resources.append('priority_processing') return { 'priority': priority, 'action': action, 'required_resources': resources, 'timeout': 5.0 if priority == 'critical' else 10.0 } def get_cognitive_state_report(self) -> Dict: """获取认知状态报告(线程安全)""" with self.lock: return self.cognitive_state.copy() def get_metacognition_report(self) -> Dict: """获取元认知报告(线程安全)""" with self.lock: 
class ConsciousLayer:
    """Conscious layer: high-level stage linking perception and memory.

    Inputs are pulled from a ``PerceptionInterface`` buffer, processed by a
    ``ConsciousFramework`` and the outcome is stored in a ``MemorySystem``.
    A second background thread periodically prunes working memory and
    triggers metacognitive reflection.
    """

    def __init__(self, framework_params: Dict = None,
                 perception_buffer_size: int = 100,
                 memory_system: "MemorySystem" = None):
        """Create the layer and its collaborators.

        Args:
            framework_params: Configuration passed to ``ConsciousFramework``.
            perception_buffer_size: Size of the perception input buffer.
            memory_system: Optional memory system to inject; a new
                ``MemorySystem`` is created when this is ``None``.
        """
        # Compare against None rather than truthiness: an injected memory
        # system that happens to be falsy (e.g. empty container semantics)
        # must still be used instead of being silently replaced.
        if memory_system is not None:
            self.memory_system = memory_system
        else:
            self.memory_system = MemorySystem()
        logger.info("记忆系统初始化完成")

        self.perception = PerceptionInterface(buffer_size=perception_buffer_size)
        logger.info(f"感知接口初始化完成,缓冲区大小: {perception_buffer_size}")

        self.framework = ConsciousFramework(params=framework_params)
        logger.info("意识框架初始化完成")

        # Performance counters exposed through get_state_report().
        self.stats = {
            "total_processed": 0,
            "last_processed": 0,
            "processing_times": deque(maxlen=100),
            "error_count": 0,
            "last_error": None,
            "input_queue_size": 0,
            "last_reflection": 0.0,
            "memory_usage": {
                "working_memory": 0,
                "long_term_memory": 0
            }
        }

        # Cached state report and its time-to-live in seconds.
        self.last_state_report = None
        self.last_state_time = 0
        self.state_cache_ttl = 3.0

        # Worker-thread control.
        self._running = False
        self._processing_thread = None
        self._maintenance_thread = None

    def start_processing(self):
        """Start the processing and maintenance daemon threads (no-op if running)."""
        if self._running:
            logger.warning("处理线程已在运行")
            return

        self._running = True

        self._processing_thread = threading.Thread(
            target=self._processing_loop,
            name="ConsciousLayer-Processor",
            daemon=True
        )
        self._processing_thread.start()

        self._maintenance_thread = threading.Thread(
            target=self._maintenance_loop,
            name="ConsciousLayer-Maintenance",
            daemon=True
        )
        self._maintenance_thread.start()
        logger.info("意识层处理线程和维护线程已启动")

    def stop_processing(self):
        """Ask both worker threads to stop and wait up to 5 s for each."""
        if not self._running:
            return

        self._running = False

        for worker in (self._processing_thread, self._maintenance_thread):
            if worker and worker.is_alive():
                worker.join(timeout=5.0)
        logger.info("意识层处理线程已停止")

    def _processing_loop(self):
        """Consume perception inputs until ``_running`` is cleared."""
        while self._running:
            # Reset per-iteration state so a failure is never attributed to
            # the packet or start time of a previous iteration.
            input_packet = None
            start_time = None
            try:
                # The timeout keeps the loop responsive to stop requests.
                input_packet = self.perception.get_next_input(timeout=0.1)
                self.stats["input_queue_size"] = self.perception.input_buffer.qsize()

                if input_packet is None:
                    continue

                start_time = time.time()
                biological_state = self._get_current_biological_state()

                result = self.framework.process(
                    stimulus=input_packet,
                    biological_state=biological_state
                )

                # Long-term memory record for this stimulus.
                memory_content = {
                    "type": MemoryContentType.EVENT.value,
                    "description": f"处理输入: {input_packet.get('source', 'unknown')}",
                    "source": "conscious_layer",
                    "stimulus": input_packet,
                    "result": result,
                    "salience": result["salience"]
                }

                emotional_associations = []
                emotion = biological_state.get('current_emotion', 'NEUTRAL')
                if emotion:
                    emotional_associations.append(
                        EmotionalAssociation(
                            emotion=EmotionType.from_string(emotion),
                            intensity=biological_state.get('emotion_intensity', 0.5),
                            context="stimulus_processing"
                        )
                    )

                context = ContextIndex(
                    cognitive_state=self.framework.cognitive_state['focus_mode'],
                    activity="processing"
                )

                self.memory_system.store(
                    content=memory_content,
                    tags=["processed_stimulus", input_packet.get('source', 'unknown')],
                    emotional_associations=emotional_associations,
                    context=context,
                    importance=result["salience"]
                )

                self._update_stats(start_time, success=True)
            except Exception as e:
                # Top-level boundary of the worker thread: record and continue.
                error_msg = f"处理刺激失败: {str(e)}"
                self.stats["last_error"] = {
                    "error": error_msg,
                    "timestamp": time.time(),
                    "stimulus": input_packet
                }
                self.stats["error_count"] += 1
                logger.exception("处理感知输入时发生异常")

                if start_time is not None:
                    self._update_stats(start_time, success=False)

    def _maintenance_loop(self):
        """Every 5 seconds: prune working memory, reflect, refresh memory stats."""
        while self._running:
            try:
                time.sleep(5)
                # Stop may have been requested while sleeping; do not touch
                # the framework after that.
                if not self._running:
                    break

                cleared_count = self.clear_working_memory(min_salience=0.25)
                if cleared_count > 0:
                    logger.debug(f"清理了 {cleared_count} 个工作记忆项目")

                self.framework.metacognition_reflection()
                self.stats["last_reflection"] = time.time()

                self._update_memory_stats()
            except Exception as e:
                logger.error(f"维护循环发生错误: {str(e)}", exc_info=True)

    def _get_current_biological_state(self) -> Dict[str, Any]:
        """Derive a simulated biological state from the framework's cognitive state.

        Returns:
            Dict with arousal, stress and attention levels plus a simulated
            emotion, its intensity and an impact factor (intensity * 0.7).
        """
        base_state = self.framework.get_cognitive_state_report()
        stress = base_state['stress_level']
        arousal = base_state['arousal_level']

        # Simulated emotion: stress dominates, then arousal, else neutral.
        if stress > 0.6:
            emotion = 'FEAR' if arousal > 0.7 else 'ANGER'
            intensity = min(1.0, stress * 0.8)
        elif arousal > 0.7:
            emotion = 'JOY' if base_state['cognitive_load'] < 0.4 else 'EXCITEMENT'
            intensity = arousal * 0.9
        else:
            emotion = 'NEUTRAL'
            intensity = 0.3

        return {
            "arousal_level": arousal,
            "stress_level": stress,
            "attention_level": base_state['attention_level'],
            "current_emotion": emotion,
            "emotion_intensity": intensity,
            "emotion_impact": intensity * 0.7
        }

    def _update_stats(self, start_time: float, success: bool):
        """Record one processed input.

        Args:
            start_time: ``time.time()`` value taken when processing began.
            success: Outcome flag; currently not used by the counters
                (failures are tracked separately in ``error_count``).
        """
        now = time.time()
        self.stats["total_processed"] += 1
        self.stats["last_processed"] = now
        self.stats["processing_times"].append(now - start_time)

    def _update_memory_stats(self):
        """Refresh working-memory and long-term-memory item counts."""
        usage = self.stats["memory_usage"]
        usage["working_memory"] = len(self.framework.working_memory)
        if hasattr(self.memory_system, 'get_stats'):
            mem_stats = self.memory_system.get_stats()
            usage["long_term_memory"] = mem_stats.get('total_memories', 0)

    def deliberate_recall(self, keyword: str, max_items: int = 5) -> Dict[str, Any]:
        """Recall items from working memory and long-term memory.

        Args:
            keyword: Keyword passed to the long-term memory retrieval context.
            max_items: Maximum number of items per memory source.

        Returns:
            Dict with ``working_memory``, ``long_term_memory`` and ``timestamp``.
        """
        working_memory_results = self.framework.get_working_memory_state(max_items=max_items)

        context = {
            "keywords": [keyword],
            "content_types": [MemoryContentType.EVENT.value]
        }
        long_term_results = self.memory_system.retrieve(
            context=context,
            affective_state=self._get_current_biological_state(),
            max_results=max_items
        )

        return {
            "working_memory": working_memory_results,
            "long_term_memory": long_term_results,
            "timestamp": time.time()
        }

    def get_state_report(self, force_refresh: bool = False) -> Dict[str, Any]:
        """Build (or return the cached) state report.

        Args:
            force_refresh: Bypass the ``state_cache_ttl`` cache when true.

        Returns:
            Dict describing framework state, metacognition, working memory,
            perception interface, memory system and performance counters.
        """
        current_time = time.time()
        cache_fresh = (current_time - self.last_state_time) < self.state_cache_ttl
        if not force_refresh and self.last_state_report and cache_fresh:
            return self.last_state_report

        framework_report = self.framework.get_cognitive_state_report()
        meta_report = self.framework.get_metacognition_report()

        # Short preview of each working-memory item.
        wm_items = self.framework.get_working_memory_state(max_items=10)
        wm_summary = []
        for item in wm_items:
            content = item["perception"]["data"].get("content", "")
            wm_summary.append({
                "id": item.get("id", "unknown"),
                "content_preview": content[:50] + "..." if len(content) > 50 else content,
                "salience": item["salience"],
                "age_seconds": round(current_time - item["timestamp"], 1),
                "priority": item["action_plan"].get("priority", "unknown")
            })

        # Both collaborators may lack a stats method; fall back to "N/A".
        perception_stats = getattr(self.perception, 'get_system_stats', lambda: "N/A")()
        memory_stats = "N/A"
        if hasattr(self.memory_system, 'get_stats'):
            memory_stats = self.memory_system.get_stats()

        report = {
            "timestamp": current_time,
            "framework": framework_report,
            "metacognition": meta_report,
            "working_memory": {
                "count": len(wm_items),
                "summary": wm_summary
            },
            "perception_interface": perception_stats,
            "memory_system": memory_stats,
            "performance": {
                "total_processed": self.stats["total_processed"],
                "last_processed": self.stats["last_processed"],
                "avg_processing_time": self._calculate_avg_processing_time(),
                "error_count": self.stats["error_count"],
                "last_error": self.stats["last_error"],
                "input_queue_size": self.stats["input_queue_size"],
                "memory_usage": self.stats["memory_usage"]
            }
        }

        self.last_state_report = report
        self.last_state_time = current_time
        return report

    def _calculate_avg_processing_time(self) -> float:
        """Return the mean of the recorded processing times (0.0 if none)."""
        samples = self.stats["processing_times"]
        if not samples:
            return 0.0
        return sum(samples) / len(samples)

    def get_insights(self, max_insights: int = 5) -> List[str]:
        """Return up to ``max_insights`` metacognitive insights.

        The framework's latest insight is placed first when it is not
        already in the list.
        """
        meta_report = self.framework.get_metacognition_report()
        # Copy: the report may share its list with the framework's own state,
        # and inserting into it here must not alter that state.
        common_insights = list(meta_report.get("insights", []))

        last_insight = meta_report.get("last_insight")
        if last_insight and last_insight not in common_insights:
            common_insights.insert(0, last_insight)

        return common_insights[:max_insights]

    def get_working_memory(self, max_items: int = 10) -> List[Dict]:
        """Return up to ``max_items`` working-memory items from the framework."""
        return self.framework.get_working_memory_state(max_items=max_items)

    def clear_working_memory(self, min_salience: float = 0.0) -> int:
        """Drop working-memory items whose salience is below ``min_salience``.

        The deque is filtered directly (keeping insertion order) while the
        framework lock is held, when the framework exposes one, so the view
        returned by ``get_working_memory_state`` (which may be truncated or
        re-sorted) is never written back and concurrent appends are not lost.

        Returns:
            Number of items removed.
        """
        framework = self.framework
        lock = getattr(framework, "lock", None)
        if lock is not None:
            lock.acquire()
        try:
            before = len(framework.working_memory)
            remaining = [
                item for item in framework.working_memory
                if item["salience"] >= min_salience
            ]
            framework.working_memory = deque(remaining, maxlen=framework.memory_capacity)
            return before - len(remaining)
        finally:
            if lock is not None:
                lock.release()

    def trigger_deep_reflection(self, topic: str = None) -> str:
        """Force a metacognitive reflection and record a summary insight.

        Args:
            topic: Optional reflection topic; accepted but not used here.

        Returns:
            The generated deep-insight string.
        """
        self.framework.metacognition_reflection(force=True)

        state_report = self.get_state_report(force_refresh=True)
        insights = self.get_insights(max_insights=3)
        wm_count = state_report['working_memory']['count']
        avg_load = state_report['framework']['cognitive_load']

        deep_insight = (
            f"深度反思: 系统当前处理了 {self.stats['total_processed']} 个输入, "
            f"工作记忆中有 {wm_count} 个项目, "
            f"平均认知负载: {avg_load:.2f}. "
            f"近期洞察: {insights[0] if insights else '无'}"
        )

        if hasattr(self.framework, 'metacognition'):
            self.framework.metacognition['insights'].append(deep_insight)
            self.framework.metacognition['last_insight'] = deep_insight

        return deep_insight

    def shutdown(self) -> Dict[str, Any]:
        """Stop the workers, shut down collaborators and return a final report."""
        self.stop_processing()

        final_report = self.get_state_report(force_refresh=True)
        framework_shutdown = self.framework.shutdown()

        # The memory system is only shut down if it supports it.
        if hasattr(self.memory_system, 'shutdown'):
            memory_shutdown = self.memory_system.shutdown()
        else:
            memory_shutdown = {"status": "no_shutdown_method"}

        return {
            "timestamp": time.time(),
            "final_state": final_report,
            "framework_shutdown": framework_shutdown,
            "memory_shutdown": memory_shutdown,
            "total_processed": self.stats["total_processed"],
            "avg_processing_time": self._calculate_avg_processing_time(),
            "error_count": self.stats["error_count"]
        }
# conscious_framework.py
import logging
import time
import threading
from collections import deque
from typing import Dict, List, Any, Optional, Union

logger = logging.getLogger('ConsciousFramework')


class ConsciousFramework:
    """Conscious architecture: perceive -> think -> plan, with working memory.

    A daemon thread periodically runs metacognitive monitoring over the
    working memory and the cognitive state and records textual insights.
    """

    # Default parameter values; instance params are merged over these.
    DEFAULT_PARAMS = {
        "max_working_memory": 10,
        "metacognition_interval": 5.0,
        "focus_threshold": 0.4,
        "confidence_threshold": 0.5,
        "stale_item_threshold": 300,  # seconds (5 minutes)
        "salience_params": {
            "base_arousal_factor": 0.4,
            "stress_penalty": 0.2,
            "emotional_boost": {"fear": 1.3, "joy": 1.1},
            "related_memory_boost": 1.2
        }
    }

    def __init__(self, params: Dict = None):
        """Initialise state and start the metacognition monitor thread.

        Args:
            params: Optional overrides for ``DEFAULT_PARAMS``. Dict-valued
                entries (``salience_params``) are merged key by key, so a
                partial override keeps the remaining defaults.
        """
        self.params = self._merge_params(params)

        # Working memory; ``lock`` guards every access to the deque.
        self.working_memory = deque(maxlen=self.params["max_working_memory"])
        self.memory_capacity = self.params["max_working_memory"]
        self.lock = threading.Lock()

        self.metacognition = {
            "monitoring": False,
            "interval": self.params["metacognition_interval"],
            "last_monitor_time": time.time(),
            "insights": []
        }

        self.cognitive_state = {
            "focus_level": 0.7,  # 0.0-1.0
            "processing_mode": "analytical",  # analytical/intuitive
            "confidence": 0.8,  # 0.0-1.0
            "last_mode_switch": time.time()
        }

        self._start_metacognition_monitor()
        logger.info(f"意识框架初始化完成,工作记忆容量: {self.memory_capacity}")

    @classmethod
    def _merge_params(cls, overrides: Optional[Dict]) -> Dict:
        """Merge ``overrides`` over the defaults, one level into dict values."""
        merged = {
            key: dict(value) if isinstance(value, dict) else value
            for key, value in cls.DEFAULT_PARAMS.items()
        }
        for key, value in (overrides or {}).items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = {**merged[key], **value}
            else:
                merged[key] = value
        return merged

    @staticmethod
    def _content_text(content: Any) -> str:
        """Return a text rendering of perception content (dict or str)."""
        return content if isinstance(content, str) else str(content)

    def _start_metacognition_monitor(self):
        """Start the metacognition monitor daemon thread."""
        # Event used to wake the monitor immediately on shutdown.
        self._stop_event = threading.Event()
        self.metacognition["monitoring"] = True
        self.monitor_thread = threading.Thread(target=self._metacognition_loop, daemon=True)
        self.monitor_thread.start()
        logger.info("元认知监控已启动")

    def _metacognition_loop(self):
        """Run metacognition every ``interval`` seconds until stopped."""
        while self.metacognition["monitoring"] and not self._stop_event.is_set():
            current_time = time.time()
            elapsed = current_time - self.metacognition["last_monitor_time"]
            if elapsed >= self.metacognition["interval"]:
                try:
                    self._perform_metacognition()
                except Exception as e:
                    logger.error(f"元认知监控失败: {str(e)}", exc_info=True)
                self.metacognition["last_monitor_time"] = current_time
            # Poll every 0.5 s, but return at once when shutdown is signalled.
            self._stop_event.wait(0.5)

    def _ranked_insights(self) -> List:
        """Return ``(insight, count)`` pairs, most frequent first."""
        counts = {}
        for insight in self.metacognition["insights"]:
            counts[insight] = counts.get(insight, 0) + 1
        return sorted(counts.items(), key=lambda pair: pair[1], reverse=True)

    def _perform_metacognition(self):
        """Inspect memory load, focus, confidence and stale items; log insights."""
        logger.debug("执行元认知监控...")
        insights = []

        # Snapshot under the lock: process() may mutate the deque concurrently.
        with self.lock:
            memory_items = list(self.working_memory)

        # 1. Working-memory load (guard against a zero capacity).
        memory_load = len(memory_items) / self.memory_capacity if self.memory_capacity else 0.0
        if memory_load > 0.8:
            insight = "工作记忆接近饱和,建议优先处理高显著性项目"
            insights.append(insight)
            logger.warning(insight)

        # 2. Attention level.
        if self.cognitive_state["focus_level"] < self.params["focus_threshold"]:
            insight = "注意力水平较低,建议调整环境或休息"
            insights.append(insight)
            logger.warning(insight)

        # 3. Processing mode versus confidence.
        if (self.cognitive_state["processing_mode"] == "analytical" and
                self.cognitive_state["confidence"] < self.params["confidence_threshold"]):
            insight = "分析模式信心不足,建议切换到直觉模式"
            insights.append(insight)
            logger.info(insight)

        # 4. Oldest item left unprocessed for too long.
        if memory_items:
            oldest_item = min(memory_items, key=lambda entry: entry["timestamp"])
            if time.time() - oldest_item["timestamp"] > self.params["stale_item_threshold"]:
                # Content is the raw input (normally a dict), so render it as
                # text before truncating.
                text = self._content_text(oldest_item['perception']['content'])
                content_preview = text[:20] + '...' if len(text) > 20 else text
                insight = f"项目'{content_preview}'长时间未处理"
                insights.append(insight)
                logger.warning(insight)

        if insights:
            self.metacognition["insights"].extend(insights)

        # 5. Once more than 10 insights accumulate, log the top 3 and keep
        #    only the 5 most recent records.
        if len(self.metacognition["insights"]) > 10:
            for insight, count in self._ranked_insights()[:3]:
                logger.info(f"频繁出现的元认知问题: {insight} (出现次数: {count})")
            self.metacognition["insights"] = self.metacognition["insights"][-5:]

    def process(self, input_data: Dict, biological_state: Dict) -> Dict:
        """Run the full cognitive pipeline for one input.

        Args:
            input_data: Input data dict.
            biological_state: Biological state dict.

        Returns:
            Dict with perception, thought, action plan and state reports, or
            a dict with an ``error`` key if any stage raised.
        """
        try:
            self._update_cognitive_state(biological_state)

            perception = self._perceive(input_data, biological_state)
            thought = self._think(perception)
            action_plan = self._plan_action(thought)

            self._add_to_working_memory({
                "perception": perception,
                "thought": thought,
                "action_plan": action_plan,
                "timestamp": time.time(),
                "salience": perception["salience"]
            })

            return {
                "perception": perception,
                "thought": thought,
                "action_plan": action_plan,
                "working_memory": self.get_working_memory_state(),
                "metacognition": self.get_metacognition_report(),
                "cognitive_state": self.get_cognitive_state_report()
            }
        except Exception as e:
            logger.error(f"认知处理失败: {str(e)}", exc_info=True)
            return {
                "error": f"处理失败: {str(e)}",
                "input_data": input_data,
                "biological_state": biological_state
            }

    def _update_cognitive_state(self, biological_state: Dict):
        """Update focus, processing mode and confidence from the biological state."""
        # Sensor acuity drives attention.
        sensor_acuity = biological_state.get("sensor_acuity", 0.7)
        self.cognitive_state["focus_level"] = min(1.0, sensor_acuity * 1.2)

        # Fatigue selects the processing mode; switches are rate-limited to
        # one per 30 seconds.
        fatigue = biological_state.get("fatigue", 0.3)
        target_mode = "intuitive" if fatigue > 0.6 else "analytical"
        if target_mode != self.cognitive_state["processing_mode"]:
            if time.time() - self.cognitive_state["last_mode_switch"] > 30:
                self.cognitive_state["processing_mode"] = target_mode
                self.cognitive_state["last_mode_switch"] = time.time()
                if target_mode == "intuitive":
                    logger.info("切换到直觉处理模式")
                else:
                    logger.info("切换到分析处理模式")

        # Arousal drives confidence.
        arousal = biological_state.get("arousal_level", 0.5)
        self.cognitive_state["confidence"] = arousal * 0.8

    def _perceive(self, input_data: Dict, biological_state: Dict) -> Dict:
        """Perception stage: wrap the input with acuity, salience and emotion.

        Returns:
            Perception dict whose ``content`` is ``input_data`` itself.
        """
        return {
            "content": input_data,
            "acuity": biological_state.get("sensor_acuity", 1.0),
            "salience": self._calculate_salience(input_data, biological_state),
            "emotional_context": biological_state.get("emotional_state", "neutral"),
            "timestamp": time.time()
        }

    def _calculate_salience(self, input_data: Dict, biological_state: Dict) -> float:
        """Compute the input's salience in [0.0, 1.0] using ``salience_params``."""
        salience_params = self.params["salience_params"]
        base_salience = input_data.get("salience", 0.5)

        arousal = biological_state.get("arousal_level", 0.5)
        stress = biological_state.get("stress", 0.3)

        # Emotional boost (1.0 for emotions without a configured factor).
        emotional_state = biological_state.get("emotional_state", "neutral")
        base_salience *= salience_params["emotional_boost"].get(emotional_state, 1.0)

        # Boost when related items are already in working memory.
        if self._check_related_memories(input_data):
            base_salience *= salience_params["related_memory_boost"]

        adjusted_salience = base_salience * (
            0.8
            + arousal * salience_params["base_arousal_factor"]
            - stress * salience_params["stress_penalty"]
        )
        return max(0.0, min(1.0, adjusted_salience))

    def _check_related_memories(self, input_data: Dict) -> List:
        """Return working-memory items whose content text contains the input keyword."""
        keyword = input_data.get("keyword", "")
        if not keyword:
            return []

        needle = str(keyword)
        with self.lock:
            # Match against a text rendering: stored content is normally the
            # input dict, where a plain ``in`` would only test its keys.
            return [
                item for item in self.working_memory
                if needle in self._content_text(item.get("perception", {}).get("content", ""))
            ]

    def _think(self, perception: Dict) -> Dict:
        """Thinking stage: dispatch on the current processing mode."""
        if self.cognitive_state["processing_mode"] == "analytical":
            return self._analytical_thinking(perception)
        return self._intuitive_thinking(perception)

    def _analytical_thinking(self, perception: Dict) -> Dict:
        """Analytical thinking: derive hypotheses and evidence (simplified)."""
        content = perception.get("content", {})
        salience = perception.get("salience", 0.5)

        hypotheses = []
        if salience > 0.7:
            hypotheses.append("高显著性事件,需要立即关注")
        if "danger" in content.get("keywords", []):
            hypotheses.append("检测到危险信号")

        evidence = {}
        if "data" in content:
            evidence["data_consistency"] = 0.8
            evidence["data_reliability"] = 0.7

        return {
            "type": "analytical",
            "hypotheses": hypotheses,
            "evidence": evidence,
            "confidence": self.cognitive_state["confidence"] * 0.9
        }

    def _intuitive_thinking(self, perception: Dict) -> Dict:
        """Intuitive thinking: quick judgment from emotion and keywords."""
        content = perception.get("content", {})
        emotional_context = perception.get("emotional_context", "neutral")

        judgment = "无特别发现"
        if emotional_context == "fear":
            judgment = "可能存在威胁,建议谨慎处理"
        elif "opportunity" in content.get("keywords", []):
            judgment = "发现潜在机会"

        return {
            "type": "intuitive",
            "judgment": judgment,
            "confidence": self.cognitive_state["confidence"] * 0.7,
            "basis": "模式识别与经验"
        }

    def _plan_action(self, thought: Dict) -> Dict:
        """Planning stage: dispatch on the thought type."""
        if thought["type"] == "analytical":
            return self._plan_analytical_action(thought)
        return self._plan_intuitive_action(thought)

    def _plan_analytical_action(self, thought: Dict) -> Dict:
        """Build an action plan from analytical hypotheses."""
        actions = []
        priority = "medium"

        for hypothesis in thought.get("hypotheses", []):
            if "立即关注" in hypothesis:
                actions.append("启动紧急响应协议")
                priority = "high"
            elif "危险信号" in hypothesis:
                actions.append("进行风险评估")
                actions.append("准备应急预案")
                priority = "high"

        # No hypothesis produced an action: fall back to data gathering.
        if not actions:
            actions.append("收集更多数据")
            actions.append("进行深入分析")
            priority = "medium"

        return {
            "actions": actions,
            "priority": priority,
            "estimated_time": 15.0,  # minutes
            "resources_required": ["认知资源", "数据访问权限"]
        }

    def _plan_intuitive_action(self, thought: Dict) -> Dict:
        """Build an action plan from an intuitive judgment."""
        judgment = thought.get("judgment", "")

        if "威胁" in judgment:
            return {
                "actions": ["采取防御姿态", "评估风险", "寻求更多信息"],
                "priority": "high",
                "estimated_time": 5.0,
                "resources_required": ["快速响应能力"]
            }
        if "机会" in judgment:
            return {
                "actions": ["探索机会", "制定初步方案", "评估可行性"],
                "priority": "medium",
                "estimated_time": 10.0,
                "resources_required": ["创造力", "灵活性"]
            }
        return {
            "actions": ["维持现状", "监控环境"],
            "priority": "low",
            "estimated_time": 2.0,
            "resources_required": []
        }

    def _add_to_working_memory(self, item: Dict):
        """Add ``item`` to working memory, or refresh an item with equal content."""
        # Rendered as text first: content is normally a dict and cannot be sliced.
        preview = self._content_text(item["perception"]["content"])[:20]
        with self.lock:
            existing_items = [
                entry for entry in self.working_memory
                if entry["perception"]["content"] == item["perception"]["content"]
            ]

            if existing_items:
                # Update only the key fields instead of replacing the object.
                existing_item = existing_items[0]
                existing_item["thought"] = item["thought"]
                existing_item["action_plan"] = item["action_plan"]
                existing_item["timestamp"] = time.time()
                existing_item["salience"] = item["salience"]
                logger.debug(f"更新工作记忆项目: {preview}...")
            else:
                self.working_memory.append(item)
                logger.debug(f"添加到工作记忆: {preview}...")

    def get_working_memory_state(self, max_items: int = 5) -> List[Dict]:
        """Return up to ``max_items`` working-memory items, most salient first."""
        with self.lock:
            ranked = sorted(self.working_memory, key=lambda entry: entry["salience"], reverse=True)
        return ranked[:max_items]

    def get_metacognition_report(self) -> Dict:
        """Return a summary of the metacognition monitor's state."""
        insights = self.metacognition["insights"]
        since_last = time.time() - self.metacognition["last_monitor_time"]
        return {
            "monitoring_active": self.metacognition["monitoring"],
            "last_insight": insights[-1] if insights else None,
            "insight_count": len(insights),
            "common_insights": self._get_common_insights(),
            "next_monitor_in": max(0, self.metacognition["interval"] - since_last)
        }

    def _get_common_insights(self) -> List[str]:
        """Return the three most frequent recorded insights."""
        if not self.metacognition["insights"]:
            return []
        return [insight for insight, _count in self._ranked_insights()[:3]]

    def get_cognitive_state_report(self) -> Dict:
        """Return a shallow copy of the cognitive state."""
        return self.cognitive_state.copy()

    def shutdown(self):
        """Stop the monitor thread and return the final state.

        Returns:
            Dict with ``final_state`` (working memory, metacognition and
            cognitive state) and ``timestamp``.
        """
        self.metacognition["monitoring"] = False
        self._stop_event.set()  # wake the monitor instead of waiting out its poll
        monitor = self.monitor_thread
        # A thread cannot join itself; skip the join in that case.
        if monitor.is_alive() and monitor is not threading.current_thread():
            monitor.join(timeout=2.0)
        logger.info("意识框架已安全关闭")
        return {
            "final_state": {
                "working_memory": list(self.working_memory),
                "metacognition": self.metacognition,
                "cognitive_state": self.cognitive_state
            },
            "timestamp": time.time()
        }
08-11
评论
成就一亿技术人!
拼手气红包6.0元
还能输入1000个字符
 
红包 添加红包
表情包 插入表情
 条评论被折叠 查看
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值