import os import asyncio from agentscope_runtime.engine import AgentApp from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest from agentscope.model import {{ .Provider }}Model from agentscope.formatter import {{ .Provider }}Formatter from agentscope_runtime.adapters.agentscope.memory import AgentScopeSessionHistoryMemory from agentscope_runtime.engine.services.agent_state import InMemoryStateService from agentscope_runtime.engine.services.session_history import InMemorySessionHistoryService from agentscope.pipeline import stream_printing_messages from agentscope_runtime.engine.deployers.local_deployer import LocalDeployManager from agentscope_runtime.engine.deployers.utils.deployment_modes import DeploymentMode from agent import Agent from toolkit import toolkit, init_toolkit_sync app = AgentApp( app_name="{{.AppName}}", app_description="{{.AppDescription}}", ) def load_sys_prompt(prompt_file_name="prompt.md"): script_dir = os.path.dirname(os.path.abspath(__file__)) prompt_path = os.path.join(script_dir, prompt_file_name) with open(prompt_path, 'r', encoding='utf-8') as f: return f.read() @app.init async def init_func(self): """初始化状态和会话服务""" self.state_service = InMemoryStateService() self.session_service = InMemorySessionHistoryService() await self.state_service.start() await self.session_service.start() @app.shutdown async def shutdown_func(self): """清理服务""" await self.state_service.stop() await self.session_service.stop() @app.query(framework="agentscope") async def query_func(self, msgs, request: AgentRequest, **kwargs): session_id = request.session_id user_id = request.user_id # 恢复 Agent 状态 state = await self.state_service.export_state( session_id=session_id, user_id=user_id, ) # ---- 创建 Agent ---- agent = Agent( name="{{.AgentName}}", model={{ .Provider }}Model( "{{.ChatModel}}", api_key=os.getenv("{{.APIKeyEnvVar}}"), stream={{.EnableStreaming | boolToPython}}, ), sys_prompt=load_sys_prompt(), toolkit=toolkit, memory=AgentScopeSessionHistoryMemory( service=self.session_service, session_id=session_id, user_id=user_id, ), formatter={{ .Provider }}Formatter(), ) agent.set_console_output_enabled(enabled=False) # 恢复状态 if state: agent.load_state_dict(state) # ---- 流式输出 ---- async for msg, last in stream_printing_messages( agents=[agent], coroutine_task=agent(msgs), ): yield msg, last # ---- 保存 Agent 状态 ---- state = agent.state_dict() await self.state_service.save_state( user_id=user_id, session_id=session_id, state=state, ) async def main(): """以独立进程模式部署应用""" deployment_info = await app.deploy( LocalDeployManager(host="{{.HostBinding}}", port={{.DeploymentPort}}), mode=DeploymentMode.DETACHED_PROCESS, ) url = deployment_info['url'] print(f"✅ 部署成功:{url}") print(f"📍 部署 ID:{deployment_info['deploy_id']}") print( f""" Check health: curl {url}/health Shutdown: curl -X POST {url}/admin/shutdown """ ) print(f"🌟 You can deploy it to Higress by using: hgctl agent add {url}") return deployment_info if __name__ == "__main__": init_toolkit_sync() asyncio.run(main())