一个消息调度框架构建

摘要:
消息处理流程构建一个MDU模块,注册入框架中,初始MDU没有注册PID,未构建消息处理任务。通过消息框架提供的消息发送接口直接发送消息,消息框架根据接收PID信息,将消息填入对应MDU的消息队列中。MDU消息队列会被多个任务并发写入消息,被消息处理任务读取消息处理,需要对消息队列进行互斥和同步。如果MDU中包含太多PID,由于所有PID在一个消息队列中串行运行,会影响PID的响应,影响系统性能。耗时PID和实时要求高的PID不放入一个MDU中。
  • 基本框架

一个消息调度框架构建第1张

  1. MDU(消息分发单元):包含一个消息处理任务,包含自身的消息队列,是一个消息调度的基本单位。
  2. PID (功能子模块) :框架中用PID作为模块的划分,每个模块具有自己的PID编号,根据功能和调度需求可以安排多个PID到一个MDU中,PID是消息通信的一个基本单位,每个PID提供一个消息处理入口。
  3. MQ (消息队列) :使用消息队列作为任务通信的数据结构。
  • 消息处理流程

一个消息调度框架构建第2张

  1. 构建一个MDU模块,注册入框架中,初始MDU没有注册PID,未构建消息处理任务。
  2. 构建PID,注册入对应的MDU中,如果是MDU中第一个PID,构建消息处理任务。消息处理任务从该MDU对应的消息队列中取消息处理。
  3. 消息处理任务获取消息后根据消息中携带的接收PID的信息分发到对应的PID模块处理。
  • 完整的消息交互流程

一个消息调度框架构建第3张

  1. 任务A申请消息,消息内容必须包括发送模块PID编号、接收模块PID编号、消息内容。
  2. 通过消息框架提供的消息发送接口直接发送消息,消息框架根据接收PID信息,将消息填入对应MDU的消息队列中。
  3. MDU的消息处理任务B从消息队列中获取消息处理。
  4. MDU消息队列会被多个任务并发写入消息,被消息处理任务读取消息处理,需要对消息队列进行互斥和同步。详见http://www.cnblogs.com/chencheng/p/2893421.html
  • MUD、PID规划
  1. MDU作为一个调度基本单元,如果一个MDU中只有一个PID会导致系统中任务多,任务切换的开销大。
  2. 如果MDU中包含太多PID,由于所有PID在一个消息队列中串行运行,会影响PID的响应,影响系统性能。
  3. 功能紧耦合的PID放入一个MDU中。
  4. 耗时PID和实时要求高的PID不放入一个MDU中。
  • 实现

MDU:

一个消息调度框架构建第4张一个消息调度框架构建第5张
importmyQueue
from myThread importmyThread
classmdu:
    def __init__(self, mduID):
        self.mduId  =mduID
        self.msgQue = myQueue.myQueue(10)
        self.map ={}
    defgetMduID(self):
        returnself.mduId
    defregistPid(self, pidID, pid):
        self.map[pidID] =pid
        if 1==len(self.map):
            self.run()
    defmsgEnQueue(self,msg):
        self.msgQue.enQueue(msg)
    defmsgProcess(self):
        whileTrue:
            msg =self.msgQue.deQueue()
            recvPid =msg.getRecvPid()
            self.map[recvPid].msgProcess(msg);
    defrun(self):
        t =myThread(self.msgProcess)
        t.start()
View Code

PID:

一个消息调度框架构建第4张一个消息调度框架构建第7张
importmessage
importsupport
importmdu
importpdb
classpid:
    def __init__(self, pidID):
        self.pidID =pidID
        self.registMe()
    defregistMe(self):
        support.registPid(self)
    defgetPidID(self):
        return self.pidID
View Code

SUPPORT:

一个消息调度框架构建第4张一个消息调度框架构建第9张
importmdu
importpdb
mduMap ={}
defregistMdu(mdu):
    mduMap[mdu.getMduID()] =mdu
defgetMdu(revPid):
    return mduMap[revPid&0xFFFF0000>>16]
defregistPid(pid):
    mdu =getMdu(pid.getPidID())
    #pdb.set_trace()
mdu.registPid(pid.getPidID(), pid)
defsendMsg(msg):
    mdu =getMdu(msg.getRecvPid())
    mdu.msgEnQueue(msg)
View Code

MQ:

一个消息调度框架构建第4张一个消息调度框架构建第11张
from threading importLock
from threading importCondition
importthreading
classmyQueue:
    def __init__(self, size):
        self.size =size
        self.list =list()
        self.lock =Lock()
        self.notFullCond =Condition(self.lock)
        self.notEmptyCond =Condition(self.lock)
    defisFull(self):
        if self.size ==len(self.list):
            returnTrue
        returnFalse
    defisEmpty(self):
        if 0 ==len(self.list):
            returnTrue
        returnFalse
    defenQueue(self, elem):
        self.lock.acquire()
        whileself.isFull():
            print('queue is full, waiting...')
            self.notFullCond.wait()   
        print(threading.current_thread().getName() + 'product ' +str(elem))
        self.list.append(elem)
        self.notEmptyCond.notify()
        self.lock.release()
    defdeQueue(self):
        self.lock.acquire()
        whileself.isEmpty():
            print('queue is empty, waiting...')
            self.notEmptyCond.wait()
        elem =self.list[0]
        del(self.list[0])
        print(threading.current_thread().getName() + 'consume ' +str(elem))
        self.notFullCond.notify_all()
        self.lock.release()
        return elem
View Code

转载请注明原始出处:http://www.cnblogs.com/chencheng/p/3236158.html

免责声明:文章转载自《一个消息调度框架构建》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Django ORM相关操作格式化一个文件的大小下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Modbus消息帧

  两种传输模式中(ASCII和RTU),传输设备以将Modbus消息转为有起点和终点的帧,这就允许接收的设备在消息起始处开始工作,读地址分配信息,判断哪一个设备被选中(广播方式则传给所以设备),判知何时信息已完成。部分的消息也能侦测到并且错误能设置为返回结果。   1、ASCII帧   使用ASCII模式,消息以冒号(:)字符(ASCII 3AH)开始,...

SIP协议参数详情

SIP消息结构 请求消息和响应消息都包括SIP消息头字段和SIP消息体字段; SIP消息头主要用来指明本消息是有由谁发起和由谁接受,经过多少跳转等基本信息; SIP消息体主要用来描述本次会话具体实现方式; 请求消息格式 SIP请求消息的格式,由SIP消息头和一组参数行组成 消息体定义:  Call-ID:头字段是用来将消息分组的唯一性标识  From:头字段...

Qt for windows消息循环、libqxt分析和wince快捷键处理

Qt for windows消息循环、libqxt分析和wince快捷键处理 利用Qt做windows图形界面开发和MFC相比,个人感觉还是比较简单好用的:首先利用Designer工具搞个ui文件;然后在程序中写几个信号和槽;然后加载ui文件;最后显示界面就搞定了。 在界面开发中,快捷键处理肯定是必不可少的。现在使用的是开源的第三方处理类:libqxt。它...

wcf通道Channel

正文       客户端与服务进行交互的过程是通过通道进行交互的。客户端通过调用代理类执行相应的方法,通过通道编码,调用上下文,传输客户端的事务,管理可靠会话,对消息正文的加密,最后要执行的通道是传输通道就像我们七层的最后一层是物理传输层与服务端的那一头的传输通道交接。服务端拿到以后会逐个拆包,然后交给分发器,分发器交给对应的服务处理。         ...

PID控制器开发笔记之十一:专家PID控制器的实现

  前面我们讨论了经典的数字PID控制算法及其常见的改进与补偿算法,基本已经覆盖了无模型和简单模型PID控制经典算法的大部。再接下来的我们将讨论智能PID控制,智能PID控制不同于常规意义下的智能控制,是智能算法与PID控制算法的结合,是基于PID控制器的智能化优化。   在本章我们首先来探讨一下专家PID算法。正如前面所说,专家PID算法是专家系统与PI...

从点击Button到弹出一个MessageBox, 背后发生了什么

思考一个最简单的程序行为:我们的Dialog上有一个Button, 当用户用鼠标点击这个Button时, 我们弹出一个MessageBox。 这个看似简单的行为, 谁能说清楚它是如何运行起来的,背后究竟发生了什么?  下面是我个人尝试的解答: (1)我们的鼠标点击事件到达设备的驱动程序, 驱动程序把消息放入系统硬件输入队列SHIQ(system ha...