Live555 分析(一):类介绍

摘要:
从程序的结构来看,live项目包括了四个基本库、程序入口类和一些测试代码。UsageEnvironment:包括抽象类UsageEnvironment和抽象类TaskScheduler,这两个类用于事件调度,其中包括实现了对事件的异步读取、对事件句柄的设置及对错误信息的输出等;该库中还有一个HashTable,这是一个通用的HashTable,在整个项目中都可以使用它,当然该HashTable也是一个抽象类。BasicUsageEnvironment:主要是对UsageEnvironment中对应类的实现。=0)break;SingleStep();}}BasicTaskScheduler0从TaskScheduler派生,所以还是一个任务调度对象,所以依然说明任务调度对象是整个程序的发动机。

从程序的结构来看,live项目包括了四个基本库、程序入口类(在mediaServer中)和一些测试代码(在testProgs中)。

四个基本静态库是UsageEnvironmentBasicUsageEnvironmentgroupsockliveMedia

UsageEnvironment:

包括抽象类UsageEnvironment和抽象类TaskScheduler,这两个类用于事件调度,其中包括实现了对事件的异步读取、对事件句柄的设置及对错误信息的输出等;该库中还有一个HashTable,这是一个通用的HashTable,在整个项目中都可以使用它,当然该HashTable也是一个抽象类。

BasicUsageEnvironment:

主要是对UsageEnvironment中对应类的实现。 

BasicUsageEnvironment和UsageEnvironment中的类都是用于整个系统的基础功能类.比如UsageEnvironment代表了整个系统运行的环境,它提供了错误记录和错误报告的功能,无论哪一个类要输出错误,就需要保存UsageEnvironment的指针.而TaskScheduler则提供了任务调度功能.整个程序的运行发动机就是它,它调度任务,执行任务(任务就是一个函数).TaskScheduler由于在全局中只有一个,所以保存在了UsageEnvironment中.而所有的类又都保存了UsageEnvironment的指针,所以谁想把自己的任务加入调度中,那是很容易的.在此还看到一个结论:整个live555(服务端)只有一个线程.

BasicUsageEnvironment和UsageEnvironment中的类初始化:

  //Begin by setting up our usage environment:
  TaskScheduler* scheduler =BasicTaskScheduler::createNew();
  env = BasicUsageEnvironment::createNew(*scheduler); 

在服务端中的main()函数,可见其创建一个RTSPServer类实例后,和在客服端的main函数连接成功,开始播放后,即进入相同的一个函数:

  env->taskScheduler().doEventLoop(); //does not return

在函数定义在BasicTaskScheduler0.cpp中:

void BasicTaskScheduler0::doEventLoop(char*watchVariable) {
    //Repeatedly loop, handling readble sockets and timed events:
    while (1) {
        if (watchVariable != NULL && *watchVariable != 0)
            break;
        SingleStep();
    }
}

BasicTaskScheduler0从TaskScheduler派生,所以还是一个任务调度对象,所以依然说明任务调度对象是整个程序的发动机。循环中每次走一步:SingleStep():

voidBasicTaskScheduler::SingleStep(unsigned maxDelayTime) 
{
        fd_set readSet = fReadSet; //make a copy for this select() call
        
        //计算select socket们时的超时时间。
        DelayInterval const& timeToDelay =fDelayQueue.timeToNextAlarm();
        structtimeval tv_timeToDelay;
        
        tv_timeToDelay.tv_sec =timeToDelay.seconds();
        tv_timeToDelay.tv_usec =timeToDelay.useconds();
        
        //Very large "tv_sec" values cause select() to fail.
        //Don't make it any larger than 1 million seconds (11.5 days)
        const long MAX_TV_SEC =MILLION;
        if (tv_timeToDelay.tv_sec >MAX_TV_SEC) 
        {
                tv_timeToDelay.tv_sec =MAX_TV_SEC;
        }
        
        //Also check our "maxDelayTime" parameter (if it's > 0):
        if (maxDelayTime > 0
                && (tv_timeToDelay.tv_sec > (long)maxDelayTime/MILLION 
                || (tv_timeToDelay.tv_sec == (long)maxDelayTime/MILLION 
                && tv_timeToDelay.tv_usec > (long)maxDelayTime%MILLION))) 
        {
                tv_timeToDelay.tv_sec = maxDelayTime/MILLION;
                tv_timeToDelay.tv_usec = maxDelayTime%MILLION;
        }

        //先执行socket的select操作,以确定哪些socket任务(handler)需要执行。
        int selectResult = select(fMaxNumSockets, &readSet, NULL, NULL, &tv_timeToDelay);
        
        if (selectResult < 0) 
        {
#if defined(__WIN32__) || defined(_WIN32)
                int err =WSAGetLastError();
                //For some unknown reason, select() in Windoze sometimes fails with WSAEINVAL if
                //it was called with no entries set in "readSet".  If this happens, ignore it:
                if (err == WSAEINVAL && readSet.fd_count == 0) 
                {
                        err = 0;
                        //To stop this from happening again, create a dummy readable socket:
                        int dummySocketNum = socket(AF_INET, SOCK_DGRAM, 0);
                        FD_SET((unsigned)dummySocketNum, &fReadSet);
                }
                if (err != 0) 
                {
#else
                        if (errno != EINTR && errno !=EAGAIN) 
                        {
#endif
                                //Unexpected error - treat this as fatal:
#if !defined(_WIN32_WCE)perror("BasicTaskScheduler::SingleStep(): select() fails");
#endifexit(0);
                        }
                }
          
          //Handle any delayed event that may have come due:
          //执行一个最迫切的延迟任务。
fDelayQueue.handleAlarm();
          
          //Call the handler function for one readable socket:
          HandlerIterator iter(*fReadHandlers);
          HandlerDescriptor*handler;
          
          //To ensure forward progress through the handlers, begin past the last
          //socket number that we handled:
        if (fLastHandledSocketNum >= 0) 
        {
            //找到上次执行的socket handler的下一个
                while ((handler = iter.next()) !=NULL) 
                {
                        if (handler->socketNum == fLastHandledSocketNum) break;
                }
                if (handler ==NULL) 
                {
                        fLastHandledSocketNum = -1;
                        iter.reset(); //start from the beginning instead
}
        }
        
         //从找到的handler开始,找一个可以执行的handler,不论其状态是可读,可写,还是出错,执行之。
        while ((handler = iter.next()) !=NULL) 
        {
                if (FD_ISSET(handler->socketNum, &readSet) &&FD_ISSET(handler->socketNum, &fReadSet) /*sanity check */ &&handler->handlerProc !=NULL) 
                {
                        fLastHandledSocketNum = handler->socketNum;
                        //Note: we set "fLastHandledSocketNum" before calling the handler,
                        //in case the handler calls "doEventLoop()" reentrantly.
                        (*handler->handlerProc)(handler->clientData, SOCKET_READABLE);
                        break;
                }
        }
        
        //如果寻找完了依然没有执行任何handle,则从头再找。
        if (handler == NULL && fLastHandledSocketNum >= 0) 
        {
                //We didn't call a handler, but we didn't get to check all of them,
                //so try again from the beginning:
iter.reset();
                while ((handler = iter.next()) !=NULL) 
                {
                        if (FD_ISSET(handler->socketNum, &readSet) &&FD_ISSET(handler->socketNum, &fReadSet) /*sanity check */ &&handler->handlerProc !=NULL) 
                        {
                                fLastHandledSocketNum = handler->socketNum;
                                //Note: we set "fLastHandledSocketNum" before calling the handler,
                                //in case the handler calls "doEventLoop()" reentrantly.
                                (*handler->handlerProc)(handler->clientData, SOCKET_READABLE);
                                break;
                        }
                }
                
                //依然没有找到可执行的handler。
                if (handler == NULL) fLastHandledSocketNum = -1;//because we didn't call a handler
}
}

总结为以下四步:
1>为所有需要操作的socket执行select。
2> 找到第一个应执行的延迟任务并执行之
3> 找出第一个应执行的socket任务(handler)并执行之

可见,每一步中只执行三个任务队列中的一项。到这里,我们不尽要问这些sockethandlerdelaytask是哪里来的?

DelayQueue类:译为"延迟队列",它是一个队列,每一项代表了一个要调度的任务(在它的fToken变量中保存)。同时保存了这个任务离执行时间点的剩余时间。可以预见,它就是在TaskScheduler中用于管理调度任务的东西。注意:此队列中的任务只被执行一次!执行完后这一项即被无情抛弃!

HandlerSet类:Handler集合。Handler是什么呢?它是一种专门用于执行socket操作的任务(函数),HandlerSet被TaskScheduler用来管理所有的socket任务(增删改查)。所以TaskScheduler中现在已调度两种任务了:socket任务(handlerSet)和延迟任务(DelayQueue).其实TaskScheduler还调度第三种任务:Event,介个后面再说。

HandlerDescriptor类:socket handlet描述。

socket handler加入执行队列后会一直存在,而delay task在执行完一次后会立即弃掉。

socket handler保存在队列BasicTaskScheduler0::HandlerSet* fHandlers中,通过调用envir().taskScheduler().turnOnBackgroundReadHandling(fClientSocket,(TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this)这样的方式加入到socket handler队列中:

void BasicTaskScheduler::turnOnBackgroundReadHandling(intsocketNum,
                BackgroundHandlerProc*handlerProc,
                void*clientData) 
{
    if (socketNum < 0) return;
    
    FD_SET((unsigned)socketNum, &fReadSet);
    
    fReadHandlers->assignHandler(socketNum, handlerProc, clientData);

    if (socketNum+1 >fMaxNumSockets) {
        fMaxNumSockets = socketNum+1;
    }
}

socket handler添加时为什么需要那些参数呢?socketNum是需要的,因为要select socket(socketNum即是socket()返回的那个socket对象)。conditionSet也是需要的,它用于表明socket在select时查看哪种装态,是可读?可写?还是出错?proc和clientData这两个参数就不必说了(真有不明白的吗?)。再看BackgroundHandlerProc的参数,socketNum不必解释,mask是什么呢?它正是对应着conditionSet,但它表明的是select之后的结果,比如一个socket可能需要检查其读/写状态,而当前只能读,不能写,那么mask中就只有表明读的位被设置。

void HandlerSet::assignHandler(intsocketNum,
        TaskScheduler::BackgroundHandlerProc*handlerProc,
        void*clientData) 
{
    //First, see if there's already a handler for this socket:
    HandlerDescriptor*handler;
    HandlerIterator iter(*this);
    
    while ((handler = iter.next()) !=NULL) {
        if (handler->socketNum == socketNum) break;
    }
    
    if (handler == NULL) { //No existing handler, so create a new descr:
        handler = newHandlerDescriptor(fHandlers.fNextHandler);
        handler->socketNum =socketNum;
    }

    handler->handlerProc =handlerProc;
    handler->clientData =clientData;
}

delay task保存在队列BasicTaskScheduler0::DelayQueue fDelayQueue中,通过调用envir().taskScheduler().scheduleDelayedTask(0, (TaskFunc*)FramedSource::afterGetting, this)这样的方式加入到delay task队列中,需要传入task延迟等待的微秒(百万分之一秒)数(第一个参数):

TaskToken BasicTaskScheduler0::scheduleDelayedTask(int64_t microseconds,TaskFunc* proc, void*clientData) 
{
  if (microseconds < 0)
    microseconds = 0;
//DelayInterval 是表示时间差的结构   DelayInterval timeToDelay((long) (microseconds / 1000000),(long) (microseconds % 1000000));   //创建delayQueue中的一项   AlarmHandler* alarmHandler = newAlarmHandler(proc, clientData,timeToDelay);   //加入DelayQueue   fDelayQueue.addEntry(alarmHandler);
//返回delay task的唯一标志   return (void*) (alarmHandler->token()); }

delay task的执行都在函数fDelayQueue.handleAlarm()中,handleAlarm()在类DelayQueue中实现。看一下handleAlarm():

voidDelayQueue::handleAlarm() 
{
  //如果第一个任务的执行时间未到,则同步一下(重新计算各任务的等待时间)。
  if (head()->fDeltaTimeRemaining !=DELAY_ZERO)
    synchronize();
//如果第一个任务的执行时间到了,则执行第一个,并把它从队列中删掉。   if (head()->fDeltaTimeRemaining ==DELAY_ZERO) {     //This event is due to be handled:     DelayQueueEntry* toRemove =head();     removeEntry(toRemove); //do this first, in case handler accesses queue     //执行任务,执行完后会把这一项销毁。     toRemove->handleTimeout();   } }

可能感觉奇怪,其它的任务队列都是先搜索第一个应该执行的项,然后再执行,这里干脆,直接执行第一个完事。那就说明第一个就是最应该执行的一个吧?也就是等待时间最短的一个吧?那么应该在添加任务时,将新任务跟据其等待时间插入到适当的位置而不是追加到尾巴上吧?猜得对不对还得看fDelayQueue.addEntry(alarmHandler)这个函数是怎么执行的。

void DelayQueue::addEntry(DelayQueueEntry*newEntry) 
{
  //重新计算各项的等待时间
  synchronize();

  //取得第一项
  DelayQueueEntry* cur =head();
  //从头至尾循环中将新项与各项的等待时间进行比较
  while (newEntry->fDeltaTimeRemaining >= cur->fDeltaTimeRemaining) {
    //如果新项等待时间长于当前项的等待时间,则减掉当前项的等待时间。
    //也就是后面的等待时几只是与前面项等待时间的差,这样省掉了记录插入时的时间的变量。
    newEntry->fDeltaTimeRemaining -= cur->fDeltaTimeRemaining;
    //下一项
    cur = cur->fNext;
  }

  //循环完毕,cur就是找到的应插它前面的项,那就插它前面吧
  cur->fDeltaTimeRemaining -= newEntry->fDeltaTimeRemaining;

  //Add "newEntry" to the queue, just before "cur":
  newEntry->fNext =cur;
  newEntry->fPrev = cur->fPrev;
  cur->fPrev = newEntry->fPrev->fNext =newEntry;
}

groupsock:

groupsock库中包括了GroupEId、Groupsock、GroupsockHelper、NetAddress、NetInterface等类。

这个是放在单独的库Groupsock中。它封装了socket操作,支持udp多播放支持和一对多单播的功能,tcp socket创建。Groupsock类有两个构造函数:

一个是ASM(即任意源组播模型):

//Constructor for a source-independent multicast group
Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const&groupAddr,
                            Port port, u_int8_t ttl)
                            :OutputSocket(env, port),//创建udp socket
deleteIfNoMembers(False), isSlave(False),
                             fIncomingGroupEId(groupAddr, port.num(), ttl), 
                             fDests(NULL), fTTL(ttl) 
{
    addDestination(groupAddr, port);//记录组播地址

    if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {//如果非组播地址,则不会进入里面,组播地址,则加入组播
        if (DebugLevel >= 1) {
            env << *this << ": failed to join group: " << env.getResultMsg() << "";
        }
    }

    //Make sure we can get our source address:
    if (ourSourceAddressForMulticast(env) == 0) {//获取本机的地址
        if (DebugLevel >= 0) { //this is a fatal error
            env << "Unable to determine our source address: " << env.getResultMsg() << "";
        }
    }

    if (DebugLevel >= 2) env << *this << ": created
";
}

另一个是SSM(指定信源组播模型):

//Constructor for a source-specific multicast group
Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const&groupAddr,
                            struct in_addr const&sourceFilterAddr,Port port)
                            : OutputSocket(env, port),//创建udp socket
deleteIfNoMembers(False), isSlave(False),
                            fIncomingGroupEId(groupAddr, sourceFilterAddr, port.num()),
                            fDests(NULL), fTTL(255) 
{
    addDestination(groupAddr, port);//记录组播地址

    //First try a SSM join.  If that fails, try a regular join:
    if (!socketJoinGroupSSM(env, socketNum(), groupAddr.s_addr, sourceFilterAddr.s_addr)) {//如果非组播地址,则退出
        if (DebugLevel >= 3) {
            env << *this << ": SSM join failed: " <<env.getResultMsg();
            env << "- trying regular join instead
";
        }
        
        if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {//如果是组播地址则加入
            if (DebugLevel >= 1) {
                env << *this << ": failed to join group: " << env.getResultMsg() << "";
            }
        }
    }

    if (DebugLevel >= 2) env << *this << ": created
";
}

根据上面两个构造函数,我们发现只是加入组播,真正的创建组播在哪里呢?就是在构造函数的初始化列表中的OutputSocket,这个类最终调用了Socket的setupDatagramSocket函数:

int setupDatagramSocket(UsageEnvironment&env, Port port,
#ifdef IP_MULTICAST_LOOP
            Boolean setLoopback
#elseBoolean
#endif) 
{
    if (!initializeWinsockIfNecessary()) {
        socketErr(env, "Failed to initialize 'winsock': ");
        return -1;
    }

    int newSocket = socket(AF_INET, SOCK_DGRAM, 0); //创建udp socket
    if (newSocket < 0) {
        socketErr(env, "unable to create datagram socket: ");
        returnnewSocket;
    }

    if (setsockopt(newSocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseFlag, sizeof reuseFlag) < 0) {//设置socket 可重用
        socketErr(env, "setsockopt(SO_REUSEADDR) error: ");
        closeSocket(newSocket);
        return -1;
    }

#if defined(__WIN32__) || defined(_WIN32)
    //Windoze doesn't properly handle SO_REUSEPORT or IP_MULTICAST_LOOP
#else#ifdef SO_REUSEPORT
    if (setsockopt(newSocket, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuseFlag, sizeof reuseFlag) < 0) {//端口复用
        socketErr(env, "setsockopt(SO_REUSEPORT) error: ");
        closeSocket(newSocket);
        return -1;
    }
#endif
#ifdef IP_MULTICAST_LOOP
    const u_int8_t loop =(u_int8_t)setLoopback;
    if (setsockopt(newSocket, IPPROTO_IP, IP_MULTICAST_LOOP, (const char*)&loop, sizeof loop) < 0) {//数据送到本地回环接口
        socketErr(env, "setsockopt(IP_MULTICAST_LOOP) error: ");
        closeSocket(newSocket);
        return -1;
    }
#endif
#endif

    //Note: Windoze requires binding, even if the port number is 0
    netAddressBits addr =INADDR_ANY;
#if defined(__WIN32__) || defined(_WIN32)
#else
    if (port.num() != 0 || ReceivingInterfaceAddr !=INADDR_ANY) {
#endif
        if (port.num() == 0) addr =ReceivingInterfaceAddr;
        
        MAKE_SOCKADDR_IN(name, addr, port.num());
        
        if (bind(newSocket, (struct sockaddr*)&name, sizeof name) != 0) {//绑定socket
            char tmpBuffer[100];
            sprintf(tmpBuffer, "bind() error (port number: %d): ",
            ntohs(port.num()));
            socketErr(env, tmpBuffer);
            closeSocket(newSocket);
            return -1;
        }
#if defined(__WIN32__) || defined(_WIN32)
#else}
#endif

    //Set the sending interface for multicasts, if it's not the default:
    if (SendingInterfaceAddr !=INADDR_ANY) {
        structin_addr addr;
        addr.s_addr =SendingInterfaceAddr;

        if (setsockopt(newSocket, IPPROTO_IP, IP_MULTICAST_IF, (const char*)&addr, sizeof addr) < 0) {//设置组播的默认网络接口
            socketErr(env, "error setting outgoing multicast interface: ");
            closeSocket(newSocket);
            return -1;
        }
    }

    returnnewSocket;
}

而GroupsockHelper.cpp不但定义了setupDatagramSocket函数,还定义了udpSocket的读(readSocket)写(writeSocket)函数。

上面讲的udp的组播和单播,下面分析一下tcp的单播。

首先是tcp socket创建:

int setupStreamSocket(UsageEnvironment&env,
                      Port port, Boolean makeNonBlocking) 
{
    if (!initializeWinsockIfNecessary()) {
        socketErr(env, "Failed to initialize 'winsock': ");
        return -1;
    }

    int newSocket = socket(AF_INET, SOCK_STREAM, 0);//创建tcp socket
    if (newSocket < 0) {
        socketErr(env, "unable to create stream socket: ");
        returnnewSocket;
    }

    if (setsockopt(newSocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseFlag, sizeof reuseFlag) < 0) { //设置socket 复用
        socketErr(env, "setsockopt(SO_REUSEADDR) error: ");
        closeSocket(newSocket);
        return -1;
    }

    //SO_REUSEPORT doesn't really make sense for TCP sockets, so we
    //normally don't set them.  However, if you really want to do this
    //#define REUSE_FOR_TCP
#ifdef REUSE_FOR_TCP
#if defined(__WIN32__) || defined(_WIN32)
    //Windoze doesn't properly handle SO_REUSEPORT
#else#ifdef SO_REUSEPORT
    if (setsockopt(newSocket, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuseFlag, sizeof reuseFlag) < 0) {//设置端口复用
        socketErr(env, "setsockopt(SO_REUSEPORT) error: ");
        closeSocket(newSocket);
        return -1;
    }
#endif
#endif
#endif

    //Note: Windoze requires binding, even if the port number is 0
#if defined(__WIN32__) || defined(_WIN32)
#else
    if (port.num() != 0 || ReceivingInterfaceAddr !=INADDR_ANY) {
#endifMAKE_SOCKADDR_IN(name, ReceivingInterfaceAddr, port.num());
        if (bind(newSocket, (struct sockaddr*)&name, sizeof name) != 0) {//绑定socket
            char tmpBuffer[100];
            sprintf(tmpBuffer, "bind() error (port number: %d): ", ntohs(port.num()));
            socketErr(env, tmpBuffer);
            closeSocket(newSocket);
            return -1;
        }
#if defined(__WIN32__) || defined(_WIN32)
#else}
#endif

    if(makeNonBlocking) {
        if (!makeSocketNonBlocking(newSocket)) {
            socketErr(env, "failed to make non-blocking: ");
            closeSocket(newSocket);
            return -1;
        }
    }

    returnnewSocket;
}

那程序是怎么判断用组播还是单播,是tcp还是udp呢?

在RTSP的setupMediaSubsession()函数中的“SETUP”会向服务端确定是否支持单播或者组播,当收到服务器的应该后,进行下面的处理:

... ...   

    if (streamUsingTCP) {//如果客服端将streamUsingTcp设置为1,则表明rtp和rtcp进行tcp传输;如果为0,则是udp传输。tcp传输只能是单播,udp传输则可能是单播也可能是组播,服务端的设置,如果服务端支持组播,那只能是组播,反之则是单播。
            //Tell the subsession to receive RTP (and send/receive RTCP)
            //over the RTSP stream:
            if (subsession.rtpSource() != NULL)//rtp连接
                subsession.rtpSource()->setStreamSocket(fInputSocketNum, subsession.rtpChannelId);//rtpSource()函数获取的fRTPSource在initiate函数根据sdp描述的编码类型创建的
            if (subsession.rtcpInstance() != NULL)//rtcp连接
                subsession.rtcpInstance()->setStreamSocket(fInputSocketNum, subsession.rtcpChannelId);//rtcpInstance()函数获取的fRTCPInstance在initiate函数中创建的
        } else {//udp传输
            //Normal case.
            //Set the RTP and RTCP sockets' destination address and port
            //from the information in the SETUP response: 
            subsession.setDestinations(fServerAddress); //它管理着一个本地socket和多个目的地址,因为是UDP,所以只需知道对方地址和端口即可发送数据。所以我们从服务器获取了新的rtp和rtcp端口加入到目标地址中。
}
... ...

setStreamSocket函数最终调用的是RTPInterface::setStreamSocket,将rtp和rtcp的socket保存到streams->fStreamSocketNum(注意:tcp单播:服务端的socket是绑定的,每次客服端请求便会创建一个clientsocket, 而客服端rtsp rtp rtcp协议用就是用的这个socket;udp单播,live555没有采用connect的方式,直接setDestinations函数加入到fDests链表中,发送时候的目标地址在此链表中),当rtp和rtcp发送数据的时候调用的RTPInterface::sendPacket函数将会用到:

void RTPInterface::sendPacket(unsigned char*packet, unsigned packetSize) 
{
    //Normal case: Send as a UDP packet:
    fGS->output(envir(), fGS->ttl(), packet, packetSize);//udp传输

    //Also, send over each of our TCP sockets:
    for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) {//tcp传输
        sendRTPOverTCP(packet, packetSize, streams->fStreamSocketNum, streams->fStreamChannelId);
    }
}

setDestinations()函数定义在MediaSubsession.cpp中:

voidMediaSubsession::setDestinations(unsigned defaultDestAddress) 
{
    //Get the destination address from the connection endpoint name
    //(This will be 0 if it's not known, in which case we use the default)
    unsigned destAddress =connectionEndpointAddress();
    if (destAddress == 0) destAddress =defaultDestAddress;
    struct in_addr destAddr; destAddr.s_addr =destAddress;

    //The destination TTL remains unchanged:
    int destTTL = ~0; //means: don't change

    if (fRTPSocket !=NULL) {
        Port destPort(serverPortNum);//这里serverPortNum是在服务端应答中获取rtp的端口号
        fRTPSocket->changeDestinationParameters(destAddr, destPort, destTTL);//这里的fRTPSocket是在前面的initiate函数中创建的
}
    if (fRTCPSocket != NULL && !isSSM()) {
        //Note: For SSM sessions, the dest address for RTCP was already set.
        Port destPort(serverPortNum+1);//这个是rtcp的端口号
        fRTCPSocket->changeDestinationParameters(destAddr, destPort, destTTL);//这里的fRTCPSocket在前面的initiate函数创建的
}
}
changeDestinationParameters()定义在Groupsock.cpp中:
//改变目的地址的参数
//newDestAddr是新的目的地址
//newDestPort是新的目的端口
//newDestTTL是新的TTL
voidGroupsock::changeDestinationParameters(
        struct in_addr const&newDestAddr,
        Port newDestPort,
        intnewDestTTL)
{
    if (fDests ==NULL)
        return;

    //获取第一个目的地址(此处不是很明白:fDest是一个单向链表,每次添加一个目的地址,
    //都会把它插入到最前目,难道这个函数仅改变最后一个添加的目的地址?)
    struct in_addr destAddr = fDests->fGroupEId.groupAddress();
    if (newDestAddr.s_addr != 0) {
        if (newDestAddr.s_addr !=destAddr.s_addr
                &&IsMulticastAddress(newDestAddr.s_addr))
        {
            //如果目的地址是一个多播地址,则离开老的多播组,加入新的多播组。
socketLeaveGroup(env(), socketNum(), destAddr.s_addr);
            socketJoinGroup(env(), socketNum(), newDestAddr.s_addr);
        }
        destAddr.s_addr =newDestAddr.s_addr;
    }

    portNumBits destPortNum = fDests->fGroupEId.portNum();
    if (newDestPort.num() != 0) {
        if (newDestPort.num() != destPortNum &&IsMulticastAddress(destAddr.s_addr))
        {
            //如果端口也不一样,则先更改本身socket的端口
            //(其实是关掉原先的socket的,再以新端口打开一个socket)。
changePort(newDestPort);
            //然后把新的socket加入到新的多播组。
            //And rejoin the multicast group:
socketJoinGroup(env(), socketNum(), destAddr.s_addr);
        }
        destPortNum =newDestPort.num();
        fDests->fPort =newDestPort;
    }

    u_int8_t destTTL =ttl();
    if (newDestTTL != ~0)
        destTTL =(u_int8_t) newDestTTL;

    //目标地址的所有信息都在fGroupEId中,所以改变成员fGroupEId。
    fDests->fGroupEId =GroupEId(destAddr, destPortNum, destTTL);
    
    //(看起来这个函数好像只用于改变多播时的地址参数,
    //以上分析是否合理,肯请高人指点)
}

liveMedia:

是很重要的一个库,其不仅包含了实现RTSP Server的类,还包含了针对不同流媒体类型(如TS流、PS流等)编码的类。在该库中,基类是Medium,层次关系非常清晰。在该库中,有几个很重要的类,如RTSPServer、ServerMediaSession、RTPSink、RTPInterface、FramedSource等。  

从上面这个主要的类结构可以看出,liveMedia库中的基类为Medium,其下又有几个非常重要的部分,一个是×××Subsession,除Medium父类外,所有的×××Subsession类都继承于ServerMediaSubsession;一个是×××Source,MediaSource的frameSource下主要包含FramedFileSource、RTPSource、FramedFilter等几个主要的部分;一个是MediaSink,以继承于RTPSink的类居多。

此外,还包含了用于处理packet的BufferedPacketFactory和BufferedPacket及其相关子类,用于处理流分析的StreamParser及其子类。

公用类:

RTPInterface类:发送rtp rtcp数据包;startNetworkReading函数注册tcp读取rtp rtcp数据的回调函数,注册udp读取rtp rtcp数据的回调函数。

RTCPInstance类:rtcp协议的实现,创建RTPInterface类实例fRTCPInterface。

服务端实现需要以下几个基类:  

RTSPServer:创建rtsp分tcp socket,注册任务incomingConnectionHandler,在此任务函数里,accept接收客服端连接,创建RTSPClientSession类;

RTSPClientSession类:注册任务incomingRequestHandler,在此任务函数里,readSocket读取客服端发送的信息,并且解析出cmdName,收到"OPTIONS"命令则通过handleCmd_OPTIONS函数应答,收到"DESCRIBE"则通过handleCmd_DESCRIBE函数应答,收到"SETUP"命令则通过handleCmd_SETUP函数应答,收到"TEARDOWN"、"PLAY"、"PAUSE"、"GET_PARAMETER"通过handleCmd_withinSession函数应答,如果都不是则通过handleCmd_notSupported函数应答。  

ServerMediaSubsession类:多媒体流可能包含几个子流,每个流都带有fTrackId,比如视频流track1,音频流track2

ServerMediaSession类:一个多媒体流增加addSubsession子流,产生generateSDPDescriptionSDP描述。

FramedSource类:提供虚函数doGetNextFrame函数去获取信息,此函数具体实现在派生类中,比如我的是在OpenFileSource类实现。

RTPSink类:服务端数据是从 FramedSource流到 RTPSink,并且创建RTPInterface类实例fRTPInterface。

单播:

StreamState类:startPlaying函数开始播放,endPlaying函数结束播放,pause函数暂停播放。

Destinations类:udp连接,保存isTCP为false,目标地址,rtp的端口,rtcp的端口;tcp连接,保存isTcp为TRUE,tcp socket值,rtp rtcp通道ID。

OnDemandServerMediaSubsession类:单播的时候,通过getStreamParameters函数创建udp socket,创建FramedSource, 创建RTPSink,创建StreamState,创建Destinations,当收到“PLAY”的命令时调用startStream开始播放,当收到“PAUSE”时调用pauseStream函数暂停播放,当收到“TERADOWN”时停止播放。

组播:

PassiveServerMediaSubsession类:getStreamParameters函数改变rtp和rtcp的目标地址。

客服端实现需要以下几个基类:

RTPSource:客户端数据则是从 RTPSource 流到 XXXFileSink, 并且创建RTPInterface类实例fRTPInterface。

MediaSubsession类:initiate函数创建rtp rtcp socket,并且根据服务端传来的编码类型fCodecName来创建相应的RTPSource,创建RTCPInstance;解析SDP的各种属性;setDestinations函数设置服务端地址。

RTSPClient: 创建tcp socket,连接rtsp服务端,sendOptionsCmd函数发送“OPTIONS”命令;describeURL函数则是发送“DESCRIBE”命令;announceSDPDescription函数则是发送“ANNOUNCE”命令;setupMediaSubsession函数则是发送“SETUP”命令,并且设置socket的目标地址;playMediaSession函数则是发送“PLAY”命令;pauseMediaSession函数则是发送“PAUSE”命令;等等。主要是RTSP的通信处理。

基本上,整个liveMedia库的主要类结构就是这样。

http://www.live555.com上的相关文档中提到穿透防火墙的问题,方法是开启一个HTTP的tunnel,然后我们可以在liveMedia库中找到一个RTSPOverHTTPServer的类,该类解决了这样的问题。

mediaServer:

Live555MediaServer提供了main函数,DynamicRTSPServer继承了RTSPServer并重写了虚函数lookupServerMediaSession。用不到。 

免责声明:文章转载自《Live555 分析(一):类介绍》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Hibernate Validatormacbook golang的debug模式不好使下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

axios 学习文档

什么是 axios? Axios 是一个基于 promise 的 HTTP 库,可以用在浏览器和 node.js 中。axios Github 特性 从浏览器中创建 XMLHttpRequests 从 node.js 创建 http 请求 支持 Promise API 拦截请求和响应 转换请求数据和响应数据 取消请求 自动转换 JSON 数据 客户端支持...

Linux网络编程 select/epoll得知socket有数据可读,如何判断数据全部被读取完毕?

补充一点:只有在使用epoll ET(Edge Trigger)模式的时候,才需要关注数据是否读取完毕了。使用select或者epoll的LT模式,其实根本不用关注数据是否读完了,select/epoll检测到有数据可读去读就OK了。 这里有两种做法: 1. 针对TCP,调用recv方法,根据recv方法的返回值,如果返回值小于我们指定的recv buf...

zookeeper适用场景:配置文件同步

问题导读:1.本文三个角色之间是什么关系?2.三个角色的作用是什么?3.如何代码实现这三个角色的作用?在 zookeeper适用场景:zookeeper解决了哪些问题有关于分布式集群配置文件同步问题的描述,本文介绍如何把zk应用到配置文件分发的场景。假设有三个角色 trigger:发布最新的配置文件数据,发送指令和数据给zk_agent,实现是下面的tr...

QMesageBox的使用

一、使用构造函数弹出对话框 1、 QMessageBox msgBox;//最简单的对话框,里面什么也没有   QString str = “test”;   msgBox.setText(str);   msgBox.exec(); 2、   QMessageBox message(QMessageBox::NoIcon, "Title", "Conte...

从IP层TTL递减看校验和及ICMP

一、协议栈中的校验和 在IP协议及UDP/TCP协议中都是用了校验和字段,这个字段通常没有人会关注,就好像现在已经没有人知道当时的一个字节中保留的一个校验bit一样。我也是偶尔看我们常用的traceroute功能的时候间接看到了这个字段。traceroute的流程大致是这样的:从1不断的增加IP协议头中TTL字段的数值,期待中间的路由节点发送一个报文过期...

WebSocket以及socketIO的使用

简介 WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。 现在,很多网站为了实现推送技术,所用的技术都是 Ajax 轮询。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出H...