任务池管理与执行器

摘要:
在上述任何业务场景中,我们都需要一个管理类,其职责是管理一堆线程和要执行的相同类型的任务集合。线程将等待执行馈送给它的任务。当任务集大于线程集的数量时,任务将在队列中等待;当线程集合的数量大于任务集合的数量时,线程将处于阻塞和等待状态,执行器将相应地处于不饱和状态。

1 前言

      一个后台实时处理的业务平台,通常我们会根据数据的输入与输出,依据时间轴进行分解成不同阶段或不同粒度的逻辑任务,而每一个待处理的数据我们称为任务或者消息。任务之间的关系可以分为两类:a 上下游父子关系,b 可以并行运行的兄弟关系。具有上下游关系的任务集合具有逻辑或数据依赖关系,即上游任务执行完后,才能执行下游任务;具有兄弟关系的任务间逻辑上互不影响,可以并行运行。

      无论是上面任一情况的业务场景,我们都需要一种管理类,其职责:管理着一堆线程及其待执行的同类型任务集合。线程会等待去执行喂给它的任务,当任务集合大于线程集合的个数时,任务会在队列排队等待;而当线程集合个数大于任务集合时,线程会挂起处于阻塞等待状态,执行器也相应地处于不饱和状态。在jdk里面有现成的管理类ThreadPoolExecutor,那么在c++里面看看类似的实现吧: 

2 任务与任务池

2.1任务

 无论是消息或业务数据,可以抽象地表达为:

      struct data_pair
      {

         char *data;

         int len;

      }

2.2 任务池

     任务的缓存用队列表达:

     std::queue<data_pair*> _queue; 

2.3 任务提交入口

  int CQueueThread::writeData(void *data, int len)

    {

        if (data == NULL || len <= 0) {
            return EXIT_FAILURE;
        }

        data_pair *item = new data_pair();
        item->data = (char*) malloc(len);
        assert(item->data != NULL);
        memcpy(item->data, data, len);
        item->len = len;       
        _mutex.lock();
        _queue.push(item);
        _mutex.signal();

        _mutex.unlock();

        return EXIT_SUCCESS;
    }

3线程池   

3.1 线程封装

    c++里面类似jdk里面Thread类的封装CThread          

任务池管理与执行器第1张任务池管理与执行器第2张
{

class CThread {

 

public:

    /**

     * 构造函数

     */

    CThread() {

        tid = 0;

        pid = 0;

    }

 

    /**

     * 起一个线程,开始运行

     */

    bool start(Runnable *r, void *a) {

        runnable = r;

        args = a;

        return 0 == pthread_create(&tid, NULL, CThread::hook, this);

    }

 

    /**

     * 等待线程退出

     */

    void join() {

        if (tid) {

            pthread_join(tid, NULL);

            tid = 0;

            pid = 0;

        }

    }

 

    /**

     * 得到Runnable对象

     *

     * @return Runnable

     */

    Runnable *getRunnable() {

        return runnable;

    }

 

    /**

     * 得到回调参数

     *

     * @return args

     */

    void *getArgs() {

        return args;

    }

   

    /***

     * 得到线程的进程ID

     */

    int getpid() {

        return pid;

    }

 

    /**

     * 线程的回调函数

     *

     */

 

    static void *hook(void *arg) {

        CThread *thread = (CThread*) arg;

        thread->pid = gettid();

 

        if (thread->getRunnable()) {

            thread->getRunnable()->run(thread, thread->getArgs());

        }

 

        return (void*) NULL;

    }

   

private:   

    /**

     * 得到tid号

     */

    #ifdef _syscall0

    static _syscall0(pid_t,gettid)

    #else

    static pid_t gettid() { return static_cast<pid_t>(syscall(__NR_gettid));}

    #endif

 

private:

    pthread_t tid;      // pthread_self() id

    int pid;            // 线程的进程ID

    Runnable *runnable;

    void *args;

}; 

}
View Code

3.2 线程池

    并行处理的能力有线程池的个数决定,定义如下:

 CThread *_thread;

 int _threadCount;    

 4 执行器 

  4.1 执行启动

int CDefaultRunnable::start() {
    if (_thread != NULL || _threadCount < 1) {
        TBSYS_LOG(ERROR, "start failure, _thread: %p, threadCount: %d", _thread, _threadCount);
        return 0;
    }

    _thread = new CThread[_threadCount];
    if (NULL == _thread)
    {
        TBSYS_LOG(ERROR, "create _thread object failed, threadCount: %d", _threadCount);
        return 0;
    }

    int i = 0;
    for (; i<_threadCount; i++)
    {
        if (!_thread[i].start(this, (void*)((long)i)))
        {
          return i;
        }
    }

  return i;
}

 4.2 执行

     执行器包含了具体业务的执行:

    void CQueueThread::run(CThread *thread, void *args)
    {
        int threadIndex = (int)((long)(args));
        _mutex.lock();
        while(!_stop) {
            while(_stop == 0 && _queue.empty()) {
                _mutex.wait();
            }
            if (_stop) {
                break;
            }

            data_pair *item = _queue.front();
            _queue.pop();
            _mutex.unlock();
            if (item != NULL) {
                if (_handler) {
                    _handler->handleQueue(item->data, item->len, threadIndex, _args);
                }

                if (item->data) {
                    free(item->data);
                }
                free(item);
            }
            _mutex.lock();
        }

        _mutex.unlock();   

 5 样例代码 

    CMyHandler handler;
    CQueueThread queueThread(3, &handler, NULL);
    queueThread.start();
    char data[1024];
for(int i=1; i<=mWriteCount; i++) { int len = sprintf(data, "data_%05d", i); queueThread.writeData(data, len+1); } queueThread.wait();

 

参考

      http://code.taobao.org/p/tfs/src/

免责声明:文章转载自《任务池管理与执行器》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇【原创】HBase中查询优化小点react + antd form表单验证自定义验证validator根据后台接口判断验证下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

C#细说多线程(下)

本文主要从线程的基础用法,CLR线程池当中工作者线程与I/O线程的开发,并行操作PLINQ等多个方面介绍多线程的开发。 其中委托的BeginInvoke方法以及回调函数最为常用。而 I/O线程可能容易遭到大家的忽略,其实在开发多线程系统,更应该多留意I/O线程的操作。特别是在ASP.NET开发当中,可能更多人只会留意在客户端使用Ajax或者在服务器端使用U...

Java 中的协程库

一、协程 一个进程可以产生许多线程,每个线程有自己的上下文,当我们在使用多线程的时候,如果存在长时间的 I/O 操作,线程会一直处于阻塞状态,这个时候会存在很多线程处于空闲状态,会造成线程资源的浪费。这就是协程适用的场景。 协程,其实就是在一个线程中,有一个总调度器,对于多个任务,同时只有一个任务在执行,但是一旦该任务进入阻塞状态,就将该任务设置为挂起,运...

多线程:C#线程同步lock,Monitor,Mutex,同步事件和等待句柄

转自:http://www.cnblogs.com/freshman0216/archive/2008/07/29/1252253.html 本篇从Monitor,Mutex,ManualResetEvent,AutoResetEvent,WaitHandler的类关系图开始, 希望通过本篇的介绍能对常见的线程同步方法有一个整体的认识,而对每种方式的使用细...

关于Web服务器的认识

       马上就要毕业了,也要开始找工作了,大学写了这么多代码了,却没有好好总结一下常用的概念很是遗憾额,就通过这篇博客记录一下我最常用的一些知识好了。        说到Web服务器,有很多文章都介绍的很好,之前看到一篇非常不错的,对我帮助很大,可惜现在找不到原文了,看到博客园有人转载,我就在这里也记一下好了,在此非常感谢作者的分析,受益匪浅。   ...

Python3之并发(五)---线程条件(Condition)和事件(Event)

一、线程条件Condition(条件变量) 依赖锁对象(Lock,RLock),锁对象可以通过参数传入获得,或者使用自动创建的默认锁对象当多个条件变量需要共享同一个锁时,建议传入锁对象 除了带有获取到锁的锁定池,Condition还包含一个未获取到锁的等待池,等待池中的线程处于等待阻塞状态,直到锁定池中的线程调用notify()/notifyAll()通知...

C#中的多线程

原文:http://www.albahari.com/threading/part2.aspx 文章来源:http://blog.gkarch.com/threading/part2.html 1同步概要 在第 1 部分:基础知识中,我们描述了如何在线程上启动任务、配置线程以及双向传递数据。同时也说明了局部变量对于线程来说是私有的,以及引用是如何在线程之间...