当前位置: 首页 > news >正文

网站建设的步骤过程视频地推团队去哪里找

网站建设的步骤过程视频,地推团队去哪里找,wordpress 兼职,怎么用joomla做网站说明 这应该是进一步的完善ADBS的工作模式。 之所以做A系列的架构工具,就是为了可以实现大型的数据处理、存储。从应用上说,是为了提高效率,并达到超高的效果。 为了达到这个目的,就必须从数据架构上、任务调度上、逻辑架构上作…

说明

这应该是进一步的完善ADBS的工作模式。

之所以做A系列的架构工具,就是为了可以实现大型的数据处理、存储。从应用上说,是为了提高效率,并达到超高的效果。

为了达到这个目的,就必须从数据架构上、任务调度上、逻辑架构上作出好的设计,并将之实现。逻辑架构主要对应的就是Core的设计,目前初步的实现了ETL,模型的还没有去实现,但有了Core和ETL的经验,那么模型只是另一种形态和时间问题。

但无论如何,数据架构一定是所有应用的基础,所以第一步也就是实现了数据架构。ADBS是基于Mongo和Redis搭建的一套可适应高并发并支持多Worker并行执行的数据库流转体系,目前看来效果是很理想的,单核日吞吐可以达到3000万条数据以上。

因为单步的ADBS已经包含了包括流转、监控、分发在内的一系列程序(sniffer, io型),单个ADBS已经具备了很强的独立服务能力,而如果要进行定制化修改会略显麻烦。 所以从应用于结构可靠性的角度上考虑,我倾向于使用「简单结构,多层迭代」。这也是从计算机发展史里得到的教训,简单的结构(迭代/叠加)才可能实现真正复杂的功能。

本次的内容就是在进行ADBS之间连接时,sniffer的动作。

内容

1 Sniffer的取数场景

广义上来说,ADBS里除了Worker,其他都是Sniffer。不过这里特指进行取数衔接的程序部分。

我大致想了一下,Sniffer可能有几个取数场景:

  • 1 从数据库取数。这在之前的项目服务类场景下常见,需要我向数据库发起Range或者Set查询,来获取要处理的数据。
  • 2 文件取数。例如这次,我会手动下载510300的离线文件,然后由Sniffer驱动取到第一个ADBS
  • 3 ADBS取数。也就是本次讨论的内容,从ADBS取数。本质上也是从数据库取数,但是由于ADBS存在一些规范,所以取数的模式可以比较固定。另外就是未来ADBS之间衔接必然是不可少的一部分,所以特殊独立出来。

2 取数模式

某个step_out的数据如下
在这里插入图片描述
step_out提供了默认的任务通道_ch001,在sniffer取数时需要根据这个通道的状态进行查询、ACK。

一般情况下,Sniffer取数后要立即ACK状态1,这样避免其他Sniffer重复的获取数据;在最后一步的时候ACK2或者3表示任务的完成状态。有时候,数据不满足条件,这样会导致计算失败。此时会有再次巡查Sniffer,检查到超时会将任务重启初始化。当然,超时重试也有次数限制。

初次请求:

  • 1 请求数据【是否为空】【是否满足可用条件】
  • 2 * 回应【ACK】【如果是数据库取数】
  • 3 根据规则判断下一步可行性【队列是否溢出】
  • 4 执行具体操作 【增删改】
  • 5 进行回应【ACK】【异常上报】

再次巡查:

  • 1 检查数据是否认领+超时
  • 2 将数据的通道字段翻回0

在这里加入一个限定:只有一个Sniffer向Step Out(Mongo)发起取数。

加入这个限定后,请求过程会得到简化,但是这样合理吗?

在ADBS中,任务的分发是通过Redis的Stream完成的,天然的分发方式。所以Sniffer的这个变化不影响并发处理。

并且对于IO来说,单个Sniffer可以吞吐的速度已经足够快了。每秒1万条就已经超过单核ADBS一天吞吐量的好几倍了。

所以,结论是可行。

加入限定后,流程变得简单:

  • 1 请求数据

    • 执行两个判定:数据是否为空、数据是否满足特定要求(基础要求)
  • 2 判断目标队列是否会溢出

  • 3 根据2的结果决定是否执行操作

  • 4 根据3的结果决定是否ACK

这样就不用考虑并行时的抢占,也不必考虑考虑巡检重置超时的问题。

具体的做法其实可以参考StreamsIO.M2S的方法,当时只是考虑在本ADBS中将Mongo数据拉到工作队列并记录日志。现在的差别是读取数据和日志不是同一个ADBS。

实例:

这个Sniffer运行在MyQuantBaseStep2Signals,向MyQuantBase.step1_mongo_out发起取数。

  • 1 Sniffer实际运行的ADBS是MyQuantBaseStep2Signals。
  • 2 Sniffer请求的源是MyQuantBase.step1_mongo_out
  • 3 Sniffer记录的log是在MyQuantBaseStep2Signals
  • 4 Sniffer的目标是 MyQuantBaseStep2Signals.

需要的一些参数

  • 1 redis服务地址
  • 2 超时设置
  • 3 批次取数量
  • 4 源mongo的tier1和tier2
  • 5 当前mongo记录日志的tier1和tier2
  • 6 当前所使用的通道
  • 7 目标队列的最大长度

代码:

from funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e import *from configs_base import redis_agent_host,project_name,cur_w
from configs_base import color_print,step1_stream_in# 判断队列是否可以插入
def is_q_available(stream_name, maxlen = 100000, new_task_len = 10000, redis_agent = None,connection_hash =None  ):cur_redis_agent = redis_agentcur_len_resp = req.post(cur_redis_agent + 'len_of_queue/',json ={'stream_name':stream_name,'connection_hash':connection_hash}).json()if cur_len_resp['status']:cur_len = cur_len_resp['data']if cur_len + new_task_len >=maxlen:return False else:return True else:print('Connection Error')return False #  基于并发方法,向数据库存数【队列Write相关-写入消息】- 其实是使用pipeline - 最好单次一万左右
def parrallel_write_msg(stream_name, data_listofdict = None ,maxlen = None, time_out = None,redis_agent = None,connection_hash =None,is_return_msg_id_list=False):cur_redis_agent = redis_agentcur_maxlen = maxlen or 100000# 默认十秒超时time_out = time_out or 30print('>>> 并发写Stream')tick11 = time.time()resp_dict = req.post(cur_redis_agent + 'batch_add_msg/',json ={'connection_hash':connection_hash,'stream_name':stream_name,'msg_dict_list':data_listofdict,'maxlen':cur_maxlen,'is_return_msg_id_list':is_return_msg_id_list},timeout=time_out).json()tick13 = time.time()print('写入任务数据 {:.2f}'.format(tick13 -tick11))return resp_dict# 回应
def ack_mongo(w =  None,tier1 = None, tier2 = None, key_list = None,keyname = None, channel_name = None, channel_val = None ):cur_w = w or self.w var_list =[tier1, tier2, key_list, keyname, channel_name, channel_val]assert all(var_list), ','.join(var_list)+'参数不可为空'filter_list =  [{keyname:{'$in':key_list}}]attr_list = [{channel_name:channel_val}]inc_list = [{channel_name+ '_cnt': 1}]return cur_w.update_with_inc(tier1 = tier1, tier2 = tier2, filter_list= filter_list, attr_list = attr_list, inc_list = inc_list)# ============== Modify 2023.01.10
# 【基础定义区-常变】
cur_machine = get_machine_name()
print('Current Machine', cur_machine)redis_agent_host = 'http://172.17.0.1:24021/'
redis_connection_hash =None# 这个sniffer盯的是上一个ADBS的输出
# source 
source_server = 'm7.24065'
source_tier1 = 'MyQuantBase'
source_tier2 = 'step1_mongo_out'
gs_id = 'rec_id'current_tier1 = project_namemarket = 'SH'
code ='510300'
start_slot = 26299291# 这个可以自由定义,这里我用了24000最大周期 + 之前有一部分误写入的部分
burnt_slots = 20000
batch_num = 10000
# 目标队列允许的最大长度
target_q_max_len = 100000
target_q_name = '%s.%s' % (current_tier1, step1_stream_in)
sniffer_name = 'sniffer01_query_step1_result'
keyname = gs_id
channel_name = '_ch001'
custom_filter_list = [{'market':market,'code':code}]
default_filter_list =  [{'_is_enable':1, channel_name:0}]
# 数据连接操作不得超过30秒
db_connect_ttl = 30try:source_w = from_pickle('source_w_' + source_tier1)color_print('【Loading source_w】from pickle')
except:w = WMongo('w')source_w = w.TryConnectionOnceAndForever(server_name =source_server)to_pickle(source_w, 'source_w_' +source_tier1)# ============================  操作
msg =''
log_tier1 = current_tier1
log_tier2 = 'log_sniffer'tick1 = time.time()# 判断队列是否可以写入
is_target_q_available = is_q_available(target_q_name,maxlen = target_q_max_len, new_task_len = batch_num, redis_agent = redis_agent,connection_hash =redis_connection_hash)cur_len_resp = req.post(redis_agent + 'len_of_queue/',json ={'stream_name':target_q_name,'connection_hash':redis_connection_hash}).json()q_len = cur_len_resp['data']
print('{} Q has {} Messages' .format(target_q_name,q_len))# 如果目标队列满
if not is_target_q_available:msg = 'target q is full {} ,{}'.format(q_len, qname)if is_target_q_available:tick100 = time.time()color_print('>>> fetching from Mongo ')recs = source_w.query_recs(tier1 = source_tier1, tier2 = source_tier2, filter_dict= {'$and':default_filter_list + custom_filter_list}, silent=True, limits = batch_num, sort_tuple_list=[(channel_name,1)])print('Spends {:.2f}' .format(time.time()-tick100))rec_num =  len(recs['data'])if rec_num:data_listofdict = recs['data']tick101 = time.time()color_print('>>> Writing To Stream ')write_resp = parrallel_write_msg(target_q_name, data_listofdict = data_listofdict ,maxlen = target_q_max_len, time_out = db_connect_ttl,redis_agent = redis_agent,connection_hash =redis_connection_hash, is_return_msg_id_list=True)print('Spends {:.2f}' .format(time.time()-tick101))# 假设全部成功,如果有失败的最终会被发现超时successful_keyname = list(pd.DataFrame(data_listofdict)[keyname])   # ack - 成功ack_res = ack_mongo( w =  source_w,tier1 = source_tier1 , tier2 = source_tier2, key_list =successful_keyname ,keyname = keyname, channel_name = channel_name, channel_val = 2 )if ack_res['status']:msg ='ok,{} of {} , {}'  .format(len(successful_keyname),rec_num, target_q_name)else:msg ='error,ack mongo {} recs of {}' .format(rec_num, target_q_name)else:msg ='no source data {}' .format(target_q_name)tick2 = time.time()
duration = round(tick2 -tick1,2)
# -- log
log_dict = {'sniffer': sniffer_name,'duration':duration,'msg': msg }cur_w.insert_recs(tier1=log_tier1, tier2=log_tier2, data_listofdict =[log_dict])
# ============================  操作 END

source mongo:
在这里插入图片描述
目标队列

在这里插入图片描述

代码比较长,改造成本还可以(花费的时间比较少)。能够越来越多的基于简单复用肯定是好的,我的Web编辑平台搞好后,应该可以让这种复用更容易(最好再加个问答+推荐系统)。

改造的部分包括:

  • 1 将is_q_available、parrallel_write_msg、ack_mongo从对象里抽出来,改造为普通函数
  • 2 匹配并校准源和当前(日志)WMongo连接
  • 3 将 M2S的流程从对象里抽出来,写在sniffer的程序体内

之后其他的ADBS均可以仿照此例连接。


文章转载自:
http://dinncounanswered.zfyr.cn
http://dinncononofficial.zfyr.cn
http://dinncobasion.zfyr.cn
http://dinncosubreption.zfyr.cn
http://dinncocheltonian.zfyr.cn
http://dinncoradiesthesia.zfyr.cn
http://dinncotreasuryship.zfyr.cn
http://dinncoambages.zfyr.cn
http://dinncoarachis.zfyr.cn
http://dinncoamitriptyline.zfyr.cn
http://dinncofolie.zfyr.cn
http://dinncohypophysectomize.zfyr.cn
http://dinncocoax.zfyr.cn
http://dinncocommuter.zfyr.cn
http://dinncoskotophile.zfyr.cn
http://dinncotubbing.zfyr.cn
http://dinncodistillery.zfyr.cn
http://dinncooutside.zfyr.cn
http://dinncocifs.zfyr.cn
http://dinncoredemptive.zfyr.cn
http://dinncounperforated.zfyr.cn
http://dinncoenflurane.zfyr.cn
http://dinncoalaskan.zfyr.cn
http://dinncohypogene.zfyr.cn
http://dinncoleonora.zfyr.cn
http://dinncosaurischian.zfyr.cn
http://dinncotextualist.zfyr.cn
http://dinncoknifeboard.zfyr.cn
http://dinncocookshack.zfyr.cn
http://dinncodoomsten.zfyr.cn
http://dinncoconad.zfyr.cn
http://dinncosubmicroscopic.zfyr.cn
http://dinncowhine.zfyr.cn
http://dinncounderstrength.zfyr.cn
http://dinncoaudiology.zfyr.cn
http://dinncojfif.zfyr.cn
http://dinncopenster.zfyr.cn
http://dinncowheelsman.zfyr.cn
http://dinncochorus.zfyr.cn
http://dinncopotentially.zfyr.cn
http://dinncopalmary.zfyr.cn
http://dinncounobvious.zfyr.cn
http://dinncocroquembouche.zfyr.cn
http://dinncotobruk.zfyr.cn
http://dinncocookie.zfyr.cn
http://dinncopoliencephalitis.zfyr.cn
http://dinncodeverbative.zfyr.cn
http://dinncoperuse.zfyr.cn
http://dinncoexpensive.zfyr.cn
http://dinncowisehead.zfyr.cn
http://dinncodisassemble.zfyr.cn
http://dinncoelevenses.zfyr.cn
http://dinncowannish.zfyr.cn
http://dinncotautomer.zfyr.cn
http://dinncoturkey.zfyr.cn
http://dinncofibrinolysin.zfyr.cn
http://dinncocreditably.zfyr.cn
http://dinncogristly.zfyr.cn
http://dinncoirenics.zfyr.cn
http://dinncovigor.zfyr.cn
http://dinncoanlistatig.zfyr.cn
http://dinncoanimator.zfyr.cn
http://dinncojustifiable.zfyr.cn
http://dinncofaddy.zfyr.cn
http://dinncomanwise.zfyr.cn
http://dinncoswitzer.zfyr.cn
http://dinncokatusa.zfyr.cn
http://dinncoremorsefully.zfyr.cn
http://dinncomethodistic.zfyr.cn
http://dinncohebephrenia.zfyr.cn
http://dinncolightwave.zfyr.cn
http://dinncoimpiety.zfyr.cn
http://dinncosnippety.zfyr.cn
http://dinncoaugur.zfyr.cn
http://dinncoflunkydom.zfyr.cn
http://dinncobacco.zfyr.cn
http://dinncochristianlike.zfyr.cn
http://dinncoapiaceous.zfyr.cn
http://dinncocicatricial.zfyr.cn
http://dinncosystemize.zfyr.cn
http://dinncotreponeme.zfyr.cn
http://dinncochoosy.zfyr.cn
http://dinncoinerrably.zfyr.cn
http://dinncodilemma.zfyr.cn
http://dinncomagdalenian.zfyr.cn
http://dinncosepia.zfyr.cn
http://dinncobejabbers.zfyr.cn
http://dinncobanzai.zfyr.cn
http://dinncocatapult.zfyr.cn
http://dinncoblastoderm.zfyr.cn
http://dinncocatfish.zfyr.cn
http://dinnconodular.zfyr.cn
http://dinncogliosis.zfyr.cn
http://dinncotelemeter.zfyr.cn
http://dinncomicromachining.zfyr.cn
http://dinncoelectrostriction.zfyr.cn
http://dinncoradiophonics.zfyr.cn
http://dinncobabushka.zfyr.cn
http://dinncoappreciably.zfyr.cn
http://dinncoideal.zfyr.cn
http://www.dinnco.com/news/120427.html

相关文章:

  • 桂林北站时刻表数据分析师培训需要多少钱
  • 网站导航作用销售推广的方法都有哪些
  • 运用django做网站哪里做网络推广
  • 网站域名改了帝国cms个人在百度上发广告怎么发
  • 2003网站服务器建设中上海搜索排名优化公司
  • 做瑷网站培训课程总结
  • 怎么做网站上的销售代前端seo优化
  • wordpress商店团购主题北京seo软件
  • 做网站服务器价格多少合适注册网站平台
  • 怎样做公司网站建设搜索引擎优化实验报告
  • 阿里云注销网站竞价开户公司
  • 做网站后台需要什么东莞网站建设优化诊断
  • 做网站要执照吗合肥全网推广
  • 怎么做网站的投票平台申请百度账号注册
  • 做网站申请域名的流程蚂蚁链接bt链接
  • 12306网站为什么做那么差百度seo技术
  • 好看的团队官网源码提升神马seo关键词自然排名
  • wordpress怎么换中文淘宝seo优化排名
  • 金普新区城乡建设局网站如何做网络销售平台
  • 中国网络优化推广济南seo整站优化价格
  • phpstudy 网站空白人民日报今日新闻
  • 如何快速增加网站收录竞价托管外包代运营
  • 论坛网站建设模板百度浏览器
  • 湖南住建云网站昆明网络营销
  • 惠州3d网站建设全景百度推广退款电话
  • 贵阳网站设计找哪家长春网站优化团队
  • 深圳网站建设推广优化seo营销活动方案模板
  • 做打折网站如何如何做网站设计
  • 做网站都去哪里找模板线上销售如何找到精准客户
  • 做产品类的工作上什么网站好泰安网站建设