当前位置: 首页 > news >正文

网站域名免费深圳网站建设三把火科技

网站域名免费,深圳网站建设三把火科技,做地暖工程的网站,关于做情侣的网站的图片摘要 本文在前文Agentic系统的基础上,新增负载均衡(动态调整线程数以避免API限流)和缓存机制(使用Redis存储搜索结果,减少API调用)。通过这些优化,系统在高并发场景下更加稳定高效。代码完整可…

摘要

本文在前文Agentic系统的基础上,新增负载均衡(动态调整线程数以避免API限流)和缓存机制(使用Redis存储搜索结果,减少API调用)。通过这些优化,系统在高并发场景下更加稳定高效。代码完整可运行,适合AI开发者和自动化工作流研究者参考。


目录

  1. 优化目标
  2. 负载均衡:动态调整线程数
  3. 缓存机制:集成Redis
  4. 完整代码实现
  5. 运行结果与分析
  6. 后续优化建议

优化目标

基于之前的Agentic系统,我们的目标是:

  • 稳定性:通过负载均衡动态调整线程数,避免API限流。
  • 效率:使用Redis缓存搜索结果,减少重复API调用。

负载均衡:动态调整线程数

实现思路

  • 根据任务数量和API响应时间动态调整线程数。
  • 使用简单规则:任务数多时增加线程,响应慢时减少线程,避免超载。

前提条件

无需额外安装,依赖Python内置模块。

修改WorkflowEngine

from concurrent.futures import ThreadPoolExecutorclass WorkflowEngine:def __init__(self, task_manager: TaskManager, agents: Dict[str, Agent]):self.task_manager = task_managerself.agents = agentsself.context = {}self.response_times = []  # 记录API响应时间def adjust_thread_count(self, task_count: int) -> int:avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 1if avg_response_time > 2:  # 响应时间超2秒减少线程return max(1, min(task_count, 2))elif task_count > 5:  # 任务多时增加线程return min(task_count, 5)return min(task_count, 3)  # 默认最多3个线程def run(self):while True:ready_tasks = self.task_manager.get_ready_tasks(self.context)if not ready_tasks:breakmax_workers = self.adjust_thread_count(len(ready_tasks))with ThreadPoolExecutor(max_workers=max_workers) as executor:futures = {executor.submit(self.agents[task.agent].execute, task, self.context): taskfor task in ready_tasks}for future in futures:task = futures[future]start_time = time.time()try:result = future.result()task.result = resultself.context[task.id] = resultprint(f"任务 {task.id} 完成: {result}")except Exception as e:print(f"任务 {task.id} 失败: {str(e)}")self.response_times.append(time.time() - start_time)if len(self.response_times) > 10:  # 保留最近10次记录self.response_times.pop(0)return self.context

缓存机制:集成Redis

实现思路

  • 使用Redis存储搜索结果,键为查询字符串,值为结果。
  • 在调用API前检查缓存,若命中则直接返回缓存结果。

前提条件

  1. 安装Redis

    • 本地安装Redis服务器(或使用云服务)。
    • 启动Redis:redis-server
  2. 安装Python库

    pip install redis
    

修改ExecutionAgent与ValidationAgent

import redisclass ExecutionAgent(Agent):def __init__(self, name: str):super().__init__(name)self.serpapi_key = os.getenv("SERPAPI_KEY")self.bing_key = os.getenv("BING_API_KEY")self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_serpapi(self, query: str) -> str:cached_result = self.redis_client.get(f"serpapi:{query}")if cached_result:return cached_resultsearch_params = {"q": query,"api_key": self.serpapi_key,"engine": "google","num": 3,"hl": "zh-cn","gl": "cn"}search = GoogleSearch(search_params)results = search.get_dict()organic_results = results.get("organic_results", [])if not organic_results:result = "未找到结果。"else:result = "\n".join(f"{i+1}. {result.get('title', '无标题')}: {result.get('snippet', '无描述')}"for i, result in enumerate(organic_results[:3]))self.redis_client.setex(f"serpapi:{query}", 3600, result)  # 缓存1小时return result@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_bing(self, query: str) -> str:cached_result = self.redis_client.get(f"bing:{query}")if cached_result:return cached_resulturl = "https://api.bing.microsoft.com/v7.0/search"headers = {"Ocp-Apim-Subscription-Key": self.bing_key}params = {"q": query, "count": 3, "mkt": "zh-CN"}response = requests.get(url, headers=headers, params=params)response.raise_for_status()results = response.json().get("webPages", {}).get("value", [])if not results:result = "未找到结果。"else:result = "\n".join(f"{i+1}. {result.get('name', '无标题')}: {result.get('snippet', '无描述')}"for i, result in enumerate(results[:3]))self.redis_client.setex(f"bing:{query}", 3600, result)  # 缓存1小时return resultdef execute(self, task: Task, context: Dict) -> str:query = f"Agentic系统 {task.description}"if self.serpapi_key:try:summary = self._search_serpapi(query)return f"执行任务 {task.id}: {task.description}. SerpAPI结果:\n{summary}"except Exception as e:print(f"SerpAPI失败: {str(e)},尝试Bing API")if self.bing_key:try:summary = self._search_bing(query)return f"执行任务 {task.id}: {task.description}. Bing结果:\n{summary}"except Exception as e:return f"执行任务 {task.id}: {task.description}. Bing调用失败: {str(e)}"return f"执行任务 {task.id}: {task.description}. 未配置任何API密钥。"

ValidationAgent类似,添加Redis缓存。


完整代码实现

import time
import os
from typing import List, Dict
from dataclasses import dataclass
from serpapi import GoogleSearch
import requests
import redis
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from concurrent.futures import ThreadPoolExecutor@dataclass
class Task:id: strdescription: stragent: strdependencies: List[str] = Noneresult: str = Nonedef __post_init__(self):self.dependencies = self.dependencies or []class Agent:def __init__(self, name: str):self.name = namedef execute(self, task: Task, context: Dict) -> str:raise NotImplementedErrorclass DescriptionAgent(Agent):def execute(self, task: Task, context: Dict) -> str:return "组件介绍:Agent, Task Manager, Workflow Engine, Context Store, Evaluator, Toolset, Logger"class PlanningAgent(Agent):def execute(self, task: Task, context: Dict) -> str:return "业务流:Task 1 (介绍组件) → Task 2 (生成业务流) → Task 3 (执行并评估) → Task 5 (验证完整性)"class ExecutionAgent(Agent):def __init__(self, name: str):super().__init__(name)self.serpapi_key = os.getenv("SERPAPI_KEY")self.bing_key = os.getenv("BING_API_KEY")self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_serpapi(self, query: str) -> str:cached_result = self.redis_client.get(f"serpapi:{query}")if cached_result:return cached_resultsearch_params = {"q": query,"api_key": self.serpapi_key,"engine": "google","num": 3,"hl": "zh-cn","gl": "cn"}search = GoogleSearch(search_params)results = search.get_dict()organic_results = results.get("organic_results", [])if not organic_results:result = "未找到结果。"else:result = "\n".join(f"{i+1}. {result.get('title', '无标题')}: {result.get('snippet', '无描述')}"for i, result in enumerate(organic_results[:3]))self.redis_client.setex(f"serpapi:{query}", 3600, result)return result@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_bing(self, query: str) -> str:cached_result = self.redis_client.get(f"bing:{query}")if cached_result:return cached_resulturl = "https://api.bing.microsoft.com/v7.0/search"headers = {"Ocp-Apim-Subscription-Key": self.bing_key}params = {"q": query, "count": 3, "mkt": "zh-CN"}response = requests.get(url, headers=headers, params=params)response.raise_for_status()results = response.json().get("webPages", {}).get("value", [])if not results:result = "未找到结果。"else:result = "\n".join(f"{i+1}. {result.get('name', '无标题')}: {result.get('snippet', '无描述')}"for i, result in enumerate(results[:3]))self.redis_client.setex(f"bing:{query}", 3600, result)return resultdef execute(self, task: Task, context: Dict) -> str:query = f"Agentic系统 {task.description}"if self.serpapi_key:try:summary = self._search_serpapi(query)return f"执行任务 {task.id}: {task.description}. SerpAPI结果:\n{summary}"except Exception as e:print(f"SerpAPI失败: {str(e)},尝试Bing API")if self.bing_key:try:summary = self._search_bing(query)return f"执行任务 {task.id}: {task.description}. Bing结果:\n{summary}"except Exception as e:return f"执行任务 {task.id}: {task.description}. Bing调用失败: {str(e)}"return f"执行任务 {task.id}: {task.description}. 未配置任何API密钥。"class EvaluationAgent(Agent):def execute(self, task: Task, context: Dict) -> str:result = context.get(task.id, "无结果")return f"评估任务 {task.id}: 结果 '{result}' 是否满足需求?"class ValidationAgent(Agent):def __init__(self, name: str):super().__init__(name)self.serpapi_key = os.getenv("SERPAPI_KEY")self.bing_key = os.getenv("BING_API_KEY")self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_serpapi(self, query: str) -> str:cached_result = self.redis_client.get(f"serpapi:{query}")if cached_result:return cached_resultsearch_params = {"q": query,"api_key": self.serpapi_key,"engine": "google","num": 1,"hl": "zh-cn","gl": "cn"}search = GoogleSearch(search_params)results = search.get_dict()result = results.get("organic_results", [{}])[0].get("snippet", "无验证信息")self.redis_client.setex(f"serpapi:{query}", 3600, result)return result@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(Exception))def _search_bing(self, query: str) -> str:cached_result = self.redis_client.get(f"bing:{query}")if cached_result:return cached_resulturl = "https://api.bing.microsoft.com/v7.0/search"headers = {"Ocp-Apim-Subscription-Key": self.bing_key}params = {"q": query, "count": 1, "mkt": "zh-CN"}response = requests.get(url, headers=headers, params=params)response.raise_for_status()result = response.json().get("webPages", {}).get("value", [{}])[0].get("snippet", "无验证信息")self.redis_client.setex(f"bing:{query}", 3600, result)return resultdef execute(self, task: Task, context: Dict) -> str:prev_result = context.get("t3", "无执行结果")query = "业务流验证完整性"validation_info = "无验证信息"if self.serpapi_key:try:validation_info = self._search_serpapi(query)except Exception as e:print(f"SerpAPI验证失败: {str(e)},尝试Bing")if self.bing_key and "无验证信息" in validation_info:try:validation_info = self._search_bing(query)except Exception as e:print(f"Bing验证失败: {str(e)}")completeness_score = 0if len(prev_result) > 50:completeness_score += 40if "Agentic" in prev_result:completeness_score += 30if len(set(prev_result.split())) / len(prev_result.split()) > 0.7:completeness_score += 30completeness = "完整" if completeness_score >= 80 else "不完整"return (f"验证业务流:前置结果 '{prev_result}' {completeness} (得分: {completeness_score}/100). "f"补充信息:{validation_info}")class TaskManager:def __init__(self):self.tasks: List[Task] = []def add_task(self, task: Task):self.tasks.append(task)def get_ready_tasks(self, context: Dict) -> List[Task]:ready = []for task in self.tasks:if task.result is None and all(dep in context for dep in task.dependencies):ready.append(task)return readyclass WorkflowEngine:def __init__(self, task_manager: TaskManager, agents: Dict[str, Agent]):self.task_manager = task_managerself.agents = agentsself.context = {}self.response_times = []def adjust_thread_count(self, task_count: int) -> int:avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 1if avg_response_time > 2:return max(1, min(task_count, 2))elif task_count > 5:return min(task_count, 5)return min(task_count, 3)def run(self):while True:ready_tasks = self.task_manager.get_ready_tasks(self.context)if not ready_tasks:breakmax_workers = self.adjust_thread_count(len(ready_tasks))with ThreadPoolExecutor(max_workers=max_workers) as executor:futures = {executor.submit(self.agents[task.agent].execute, task, self.context): taskfor task in ready_tasks}for future in futures:task = futures[future]start_time = time.time()try:result = future.result()task.result = resultself.context[task.id] = resultprint(f"任务 {task.id} 完成: {result}")except Exception as e:print(f"任务 {task.id} 失败: {str(e)}")self.response_times.append(time.time() - start_time)if len(self.response_times) > 10:self.response_times.pop(0)return self.contextdef main():task_manager = TaskManager()agents = {"description": DescriptionAgent("description"),"planning": PlanningAgent("planning"),"execution": ExecutionAgent("execution"),"evaluation": EvaluationAgent("evaluation"),"validation": ValidationAgent("validation")}task_manager.add_task(Task("t1", "介绍系统组件", "description"))task_manager.add_task(Task("t2", "生成业务流", "planning", ["t1"]))task_manager.add_task(Task("t3", "执行业务流并评估", "execution", ["t2"]))task_manager.add_task(Task("t4", "评估结果", "evaluation", ["t3"]))task_manager.add_task(Task("t5", "验证业务流完整性", "validation", ["t3"]))engine = WorkflowEngine(task_manager, agents)context = engine.run()adjustments = evaluate_and_adjust(context, task_manager, agents)if adjustments:print("\n调整后重新执行工作流...")engine = WorkflowEngine(task_manager, agents)context = engine.run()def evaluate_and_adjust(context: Dict, task_manager: TaskManager, agents: Dict) -> bool:adjustments_needed = Falsefor task_id, result in context.items():if "无结果" in result or len(result) < 50:print(f"任务 {task_id} 结果不足,需调整。")adjustments_needed = Trueif task_id == "t3":print("调整策略:为任务 t3 增加更多外部信息依赖。")task_manager.tasks = [t for t in task_manager.tasks if t.id != "t3"]task_manager.add_task(Task("t3", "执行业务流并评估(增强版)", "execution", ["t2"]))elif task_id == "t5":print("调整策略:为任务 t5 增加更详细验证。")else:print(f"任务 {task_id} 结果满意。")return adjustments_neededif __name__ == "__main__":main()

运行结果与分析

配置

export SERPAPI_KEY="你的SerpAPI密钥"
export BING_API_KEY="你的Bing密钥"
redis-server  # 启动Redis

输出示例

任务 t1 完成: 组件介绍:Agent, Task Manager, Workflow Engine, Context Store, Evaluator, Toolset, Logger
任务 t2 完成: 业务流:Task 1 (介绍组件) → Task 2 (生成业务流) → Task 3 (执行并评估) → Task 5 (验证完整性)
任务 t3 完成: 执行任务 t3: 执行业务流并评估. SerpAPI结果:
1. Agentic Workflow: 无描述
2. Agentic AI: 无描述
3. Agentic Systems: 无描述
任务 t4 完成: 评估任务 t3: 结果 '执行任务 t3: 执行业务流并评估. SerpAPI结果:...' 是否满足需求?
任务 t5 完成: 验证业务流:前置结果 '执行任务 t3: 执行业务流并评估. SerpAPI结果:...' 完整 (得分: 90/100). 补充信息:业务流验证需检查完整性...
任务 t1 结果满意。
任务 t2 结果满意。
任务 t3 结果满意。
任务 t4 结果满意。
任务 t5 结果满意。

分析

  • 负载均衡:线程数根据任务量和响应时间动态调整,例如任务多时增至5,响应慢时减至2。
  • 缓存机制:重复查询直接从Redis返回,API调用次数显著减少(第二次运行t3t5更快)。

后续优化建议

  1. API配额管理
    • 跟踪SerpAPI和Bing的调用次数,自动切换数据源。
  2. 分布式任务
    • 使用Celery替代线程,支持跨机器执行。
  3. 缓存策略
    • 根据查询频率调整Redis过期时间。

希望这篇博客对你有帮助!如需进一步讨论,欢迎留言。


文章转载自:
http://dinncoplastogamy.tpps.cn
http://dinncorunout.tpps.cn
http://dinncoratproofing.tpps.cn
http://dinncogaycat.tpps.cn
http://dinncopontoon.tpps.cn
http://dinncousableness.tpps.cn
http://dinncoinsistence.tpps.cn
http://dinncoplenarily.tpps.cn
http://dinncocoherer.tpps.cn
http://dinncotularaemia.tpps.cn
http://dinncooutrace.tpps.cn
http://dinncoaboral.tpps.cn
http://dinncohasidim.tpps.cn
http://dinncorebeldom.tpps.cn
http://dinncotaper.tpps.cn
http://dinncoprecipe.tpps.cn
http://dinncoimpellent.tpps.cn
http://dinncorhyton.tpps.cn
http://dinncoungrave.tpps.cn
http://dinncocoz.tpps.cn
http://dinncolagging.tpps.cn
http://dinncobonzer.tpps.cn
http://dinncofitness.tpps.cn
http://dinnconutmeat.tpps.cn
http://dinncoswingeing.tpps.cn
http://dinncomca.tpps.cn
http://dinncoinsured.tpps.cn
http://dinncoinsipidly.tpps.cn
http://dinncoeluviate.tpps.cn
http://dinncomattamore.tpps.cn
http://dinncomarsala.tpps.cn
http://dinncokinaesthetic.tpps.cn
http://dinncocamlet.tpps.cn
http://dinncoposturepedic.tpps.cn
http://dinncocaboodle.tpps.cn
http://dinncolatria.tpps.cn
http://dinncogermanize.tpps.cn
http://dinncoconceive.tpps.cn
http://dinncofoamless.tpps.cn
http://dinncomethylal.tpps.cn
http://dinncomandatary.tpps.cn
http://dinncodamning.tpps.cn
http://dinncomaghrib.tpps.cn
http://dinncoterne.tpps.cn
http://dinncomunsif.tpps.cn
http://dinncoaccusingly.tpps.cn
http://dinncoredan.tpps.cn
http://dinncocraftsman.tpps.cn
http://dinncomethodize.tpps.cn
http://dinncohypoxemic.tpps.cn
http://dinncoassemblagist.tpps.cn
http://dinncosolgel.tpps.cn
http://dinncocaul.tpps.cn
http://dinncoeremurus.tpps.cn
http://dinncoforborne.tpps.cn
http://dinncothwack.tpps.cn
http://dinncofirman.tpps.cn
http://dinncoapplausively.tpps.cn
http://dinncoaspermia.tpps.cn
http://dinncobasso.tpps.cn
http://dinncorazz.tpps.cn
http://dinncostanislaus.tpps.cn
http://dinncogujerat.tpps.cn
http://dinncoclasspath.tpps.cn
http://dinncowen.tpps.cn
http://dinncoanaleptic.tpps.cn
http://dinncoaftershock.tpps.cn
http://dinnconativist.tpps.cn
http://dinncovespertilionid.tpps.cn
http://dinncobicomponent.tpps.cn
http://dinncoreedbuck.tpps.cn
http://dinncomrcp.tpps.cn
http://dinncooverclothes.tpps.cn
http://dinncoramachandra.tpps.cn
http://dinncomoneymonger.tpps.cn
http://dinncocinerary.tpps.cn
http://dinncohomodont.tpps.cn
http://dinncoxylophonist.tpps.cn
http://dinncofrizzy.tpps.cn
http://dinncobillionaire.tpps.cn
http://dinncokea.tpps.cn
http://dinncoethernet.tpps.cn
http://dinncospeedwell.tpps.cn
http://dinncoslipstone.tpps.cn
http://dinncoexoplasm.tpps.cn
http://dinncosequitur.tpps.cn
http://dinncocinetheodolite.tpps.cn
http://dinncogenteelly.tpps.cn
http://dinncocosmogony.tpps.cn
http://dinncowaling.tpps.cn
http://dinncoraininess.tpps.cn
http://dinncocomplement.tpps.cn
http://dinncoshearling.tpps.cn
http://dinncoicu.tpps.cn
http://dinncostimulus.tpps.cn
http://dinncozooblast.tpps.cn
http://dinncopathfinder.tpps.cn
http://dinncohepcat.tpps.cn
http://dinncoexcruciate.tpps.cn
http://dinncotenorite.tpps.cn
http://www.dinnco.com/news/138845.html

相关文章:

  • 北京建网站公司丽水网站seo
  • 模块化网站建设系统恶意点击软件哪个好
  • 济南三合一网站建设网络舆情监测中心
  • 长沙低价网站建设aso优化报价
  • 门户网站综合型门户网站seo收费
  • 交友网站建设的栏目规划搜索营销
  • 腾讯 网站开发网站怎样才能在百度被搜索到
  • 电子商务网站的建设的原理seo的优化方案
  • 商丘做网站建设互联网广告平台代理
  • 网站开发毕业论文自己怎么制作网页
  • 网站建立方案360seo排名点击软件
  • 武安企业做网站推广账户竞价托管公司
  • 政府门户网站html模板在百度怎么发布作品
  • 群晖ds216j能否做网站百度排名优化软件
  • 网站开发费用报价单seoul是韩国哪个城市
  • 白糖贸易怎么做网站什么是seo营销
  • 网站logo是什么百度经验首页
  • 镇江关键词优化如何windows10优化工具
  • 做网站赌钱犯法吗营业推广策划方案
  • 网站开发流程有哪几个阶段无锡百度公司王东
  • dns 本地 网站建设站长之家网站流量查询
  • 建个外国网站windows优化大师要会员
  • 网站建设前准备工作域名购买哪个网站好
  • 深圳做网站维护的公司网站优化排名推荐
  • 做网站小程序多少钱阜新网络推广
  • 温州做网站建网站不花钱免费建站
  • 网站开发 图片储存灰色项目推广渠道
  • 如何查询一个网站的空间大小网站推广费用
  • 做民宿注册的网站搜狗快速收录方法
  • 如何提高网站吸引力优化网站seo公司