AI-Agent 智能体-开发总结

问题

系统架构图

项目中规则树的体现

public class AiAgentEngineStarterEntity {

    /**
     * 客户端ID列表
     * 
     * 需要预热的AI客户端ID集合,这些客户端在引擎启动时
     * 被初始化并装配相应的AI模型、工具链和顾问组件
     */
    private List<Long> clientIdList;
}

为什么是client_id

因为client代表一个完整的AI智能体客户端配置单元,通过它可以关联查询到客户端的所有配置组件。 img_20.png

通过id获取所有配置 img_21.png

DefaultArmoryStrategyFactory.DynamicContext - 动态上下⽂

动态上下⽂在 RootNode 节点中加载,所有数据都被⼀次性加载并存储到 DynamicContext 中,并且这 ⾥是并发加载,提升了性能。后续节点都可以从参数直接获取,不⽤从数据库中找了! 过泛型方法setValue和getValue,实现了类型安全的数据存取,避免了频繁的类型转换。

AI-Agent装配流程图img_22.png

PromptChatMemoryAdvisor

Spring AI实现的提示词记忆顾问

  • 自动维护历史对话记录
  • 在每次请求的时候将相关的历史消息自动注入到提示词中
// 项目中的实际使用
ChatClient chatClient = ChatClient.builder(chatModel)
    .defaultAdvisors(
        // 添加记忆顾问
        new PromptChatMemoryAdvisor(MessageWindowChatMemory.builder()
            .maxMessages(10)  // 最多记住10条消息
            .build())
    )
    .build();
// 调用时指定会话ID
String response = chatClient
    .prompt("你好,我是张三")
    .advisors(a -> a.param(CHAT_MEMORY_CONVERSATION_ID_KEY, "user_123"))
    .call()
    .content();

MessageWindowChatMemory是消息窗口记忆的实现, 通过滑动窗口的形式保存maxMessages条消息。

RagAnswerAdvisor

自定义的RAG顾问实现。这个方法是核心的从向量数据库中获取到和提问相关的documents, 然后由此注入上下文参数, 构建一个新的增强请求

    HashMap<String, Object> context = new HashMap(request.adviseContext());
    // 1. 构建增强提示词
    String advisedUserText = request.userText() + System.lineSeparator() + this.userTextAdvise;
    
    // 2. 渲染查询模板
    String query = new PromptTemplate(request.userText(), request.userParams()).render();
    
    // 3. 执行向量检索
    SearchRequest searchRequestToUse = SearchRequest.from(this.searchRequest)
    .query(query)
    .filterExpression(this.doGetFilterExpression(context))
    .build();
    List<Document> documents = this.vectorStore.similaritySearch(searchRequestToUse);
    
    // 4. 构建文档上下文
    context.put("qa_retrieved_documents", documents);
    String documentContext = (String)documents.stream()
    .map(Document::getText)
    .collect(Collectors.joining(System.lineSeparator()));
    Map<String, Object> advisedUserParams = new HashMap(request.userParams());
    
    // 5. 注入上下文参数
    advisedUserParams.put("question_answer_context", documentContext);
    
    AdvisedRequest advisedRequest = AdvisedRequest
    .from(request)
    .userText(advisedUserText)
    .userParams(advisedUserParams)
    .adviseContext(context).build();
    return advisedRequest;
    }

AdvisedRequest 和 AdvisedResponse 是 Spring AI 的增强请求和响应对象。这两个类的作用是在于提供一个增强的上下文,使得在处理请求和响应时可以携带额外的信息,比如用户的查询、上下文信息等。 通过增强请求和响应,可以在处理过程中注入更多的上下文信息,在before中处理请求时,可以根据用户的查询和上下文信息来构建一个更丰富的请求对象。 这里的上下文是通过知识库向量存储(VectorStore)进行检索得到的,增强请求可以包含检索到的相关文档信息。 这里的nextAroundCall和nextAroundStream方法是调用链的一部分,用于在增强请求处理完成后继续执行下一个环节的逻辑。 在aroundCall和aroundStream方法中,增强请求会被传递到下一个环节进行处理。 在处理响应时,增强响应对象可以携带更多的信息,比如检索到的文档信息、处理结果等。 也就是我能从增强相应对象中获取到rag知识库检索到的相关文档信息。

ai-agent 预热(preheat)流程

  • 我们先根据aiClientIds从RootNode节点出发, 异步从库表中查询组装出来业务中流转的VO对象, 并将它们添加到Map中
  • 然后进入到AiClientToolMcpNode, 根据Map中的MCPVOList组转起来所有McpSyncClient然后将其注册为Bean
  • 进入到AiClientAdvisorNode, 根据Advisor的类型, 组装rag或者chatmemory类型的bean
  • 再到AiClientModelNode, 组装OpenAiChatModel并这个model绑定的MCP Tool添加进去
  • 最后就是AiClientNode将chatModel, MCP, Advisor这三类Bean组装成一个对话客户端ChatClient, 然后将其注册为bean

ai-agent 对话和定时任务

对话分成流式对话和普通对话两种

普通对话-aiAgentChat方法

获取到完整的AI回答再给出返回

  1. 获取所有的client的ID

    List<Long> aiClientIds = repository.queryAiClientIdsByAiAgentId(aiAgentId);
    String content = "";
    
  2. 根据clientId取出来对应的client Bean对象, 链式调用这个agent对应的client们, 渐进式提问

    for (Long aiClientId : aiClientIds) {
    ChatClient chatClient = defaultArmoryStrategyFactory.chatClient(aiClientId);
    content = chatClient.prompt(message + "," + content)
                        .system(s -> s.param("current_date", LocalDate.now().toString()))
                        .advisors(a -> a
                        .param(CHAT_MEMORY_CONVERSATION_ID_KEY, "chatId 101")
                        .param(CHAT_MEMORY_RETRIEVE_SIZE_KEY, 100))
                        .call().content();
    }
    
    • 这里传入的advisor的参数CHAT_MEMORY_CONVERSATION_ID_KEY是用于标识这次对话的ID, 这个ID是用来区分上下文的, 不同的对话有着不同的ID、不同的上下文.
    • param(CHAT_MEMORY_RETRIEVE_SIZE_KEY, 100)) 这个参数的作用是将对话历史记录的数量限制在100条。

    这里为什么会是链式调用, 一个问题调用一个Client进行回答不就行了吗?

    注意我们的方法名不是clientChat而是agentChat, 一个agent能够串联多个client, 在这个方法中多client协作的工作共同构建最后的回答, 实现功能互补或者质量提升, 亦或是问题分解, 举例: 场景一:专业分工协作

    假设有3个智能体:

    // aiClientId = 1: 文本分析专家

    // aiClientId = 2: 逻辑推理专家

    // aiClientId = 3: 总结归纳专家

    // 第一轮:分析专家处理原始问题

    content = ""; // 初始为空

    content = 分析专家.process("用户问题" + "," + ""); // 得到分析结果

    // 第二轮:推理专家基于分析结果进行推理

    content = 推理专家.process("用户问题" + "," + "分析结果"); // 得到推理结论

    // 第三轮:总结专家整合前面的结果

    content = 总结专家.process("用户问题" + "," + "分析结果+推理结论"); // 最终答案

    场景二:渐进式优化

    // 智能体链逐步完善答案质量

    // 第一个智能体:给出初步答案

    // 第二个智能体:基于初步答案进行补充和优化

    // 第三个智能体:进行最终校验和润色

流式对话-aiAgentChatStream方法

获取到一部分返回以后立马呈现给用户

  1. 获取配置
    // 查询模型ID
    Long modelId = repository.queryAiClientModelIdByAgentId(aiAgentId);
    // 获取对话模型
    ChatModel chatModel = defaultArmoryStrategyFactory.chatModel(modelId);
    
  2. 构建messages
    • 如果对话不携带rag, 则messages就只有传入的message
     messages.add(new UserMessage(message));
    
    • 如果携带rag
      1. 根据ragId获取到tag
      2. 从pg中查询到相近的documents
      SearchRequest searchRequest = SearchRequest.builder()
                                                 .query(message)
                                                 .topK(5)
                                                 .filterExpression("knowledge == '" + tag + "'")
                                                 .build();
      List<Document> documents = vectorStore.similaritySearch(searchRequest);
      
      1. 构建结构化的rag prompt
      String documentCollectors = documents.stream()
                                           .map(Document::getFormattedContent)
                                           .collect(Collectors.joining());
      Message ragMessage = new SystemPromptTemplate(""" .... DOCUMENTS: {documents} """)
                                                   .createMessage(Map.of("documents", documentCollectors));
      messages.add(new UserMessage(message));
      
  3. 使用messages与第一步获取到的model对话
return chatModel.stream(Prompt.builder()
                .messages(messages)
                .build());

为什么这里又不使用链式调用多个client了?

链式调用多个client的本质是将当前client的输出 + 原问题作为下一个client的输入, 如果这样我们必须阻塞式等待client完整回答完毕, 而当前方法实现的是流式回答, 是输出了一部分就给用户呈现一部分, 所以不能链式调用.

AgentTaskJob

init方法-在Bean初始化以后执行

这个方法上有个@PostConstruct注解, 这个注解的功能是在Spring容器初始化完成后执行该方法.这个方法的主要功能就是初始化了一个任务调度器.

public void init() {
   // 初始化任务调度器
   ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); // 这个类是 Spring 提供的一个线程池任务调度器
   scheduler.setPoolSize(10);
   scheduler.setThreadNamePrefix("agent-task-scheduler-");
   scheduler.setWaitForTasksToCompleteOnShutdown(true); // 设置在关闭时等待任务完成
   scheduler.setAwaitTerminationSeconds(60); // 设置等待任务完成的时间
   scheduler.initialize();
   this.taskScheduler = scheduler;
}

executeTask方法-执行任务

  1. 获取任务的参数(json格式) String taskParam = task.getTaskParam();
  2. 执行任务 aiAgentChatService.aiAgentChat(task.getAgentId(), taskParam);

这里的执行任务为什么就是aiAgentChat?

在前面的xfg的mcp章节, 其实就能看到我们让AI执行一个任务就是与AI对话, 让它调用对应的MCP执行特定的任务。 这里也是一样的, 同时agent的链式调用client还能使得我们这个任务由多个client协同执行。

scheduleTask方法-调度器执行任务

  1. 创建任务调度器:调度器要执行的方法就是executeTask(task);时间设置通过task的cron表达式
ScheduledFuture<?> future = taskScheduler.schedule(() -> executeTask(task), new CronTrigger(task.getCronExpression()));
  1. 将这个任务放到类全局map中, 记录这个任务已经被我们调度执行成功了 scheduledTasks.put(task.getId(), future);

refreshTasks方法-移除invalid任务, 执行valid任务

  1. 从taskService中查询到所有有效的任务配置
List<AiAgentTaskScheduleVO> taskSchedules =  aiAgentTaskService.queryAllValidTaskSchedule();
  1. 处理每个任务:将这个任务的id放入到Map中, 用于后面删除调度器中已经invalid任务;如果任务不存在调用scheduleTask(task)创建并调度新任务
for (AiAgentTaskScheduleVO task : taskSchedules) {
   Long taskId = task.getId();
   currentTaskIds.put(taskId, true);
   // 如果任务已经存在,则跳过
   if (scheduledTasks.containsKey(taskId)) {
      continue;
   }
   // 创建并调度新任务
   scheduleTask(task);
}
  1. 移除不存在的任务:获取现在所有在调度器中的任务的keySet, 也就是所有在调度器中的任务的taskId集合A; 上面处理每个任务的时候记录的现在仍然有效的taskId集合B, 如果A不在B中, 说明这个任务已经不存在了;从调度器task集合中获取任务并将任务移除.
scheduledTasks.keySet().removeIf(taskId -> {
    if (!currentTaskIds.containsKey(taskId)) {
        ScheduledFuture<?> future = scheduledTasks.remove(taskId);
        if (future != null) {
            future.cancel(true);
            log.info("已移除任务,ID: {}", taskId);
        }
        return true;
    }
    return false;
 });

cleanInvalidTasks方法-清理已经被标记为无效的任务

  1. 获取所有已经失效的任务的ID: List<Long> invalidTaskIds = aiAgentTaskService.queryAllInvalidTaskScheduleIds();
  2. 从调度器中移除这些任务
for (Long taskId : invalidTaskIds) {
    ScheduledFuture<?> future = scheduledTasks.remove(taskId);
    if (future != null) {
        future.cancel(true);
        log.info("已移除无效任务,ID: {}", taskId);
    }
}

chat和task模块可以说是看似简单的实现, 但是具有很强的拓展性, 比如我想设置一个监控数据监控面板然后发现异常微信推送消息的组件

  1. 实现能从数据监控面板获取数据的MCP工具
  2. 实现发送微信消息的MCP工具
  3. 注册一个分析数据监控面板client, 注册一个发送总结分析发送微信消息的client
  4. 将两个client串联成一个agent, 设置这个agent的定时任务
  5. 这样就实现了监控数据面板在AI分析即将出现问题的时候通知开发者的功能
ON THIS PAGE