消息推送服务

摘要:
APM.Server消息推送服务的实现消息推送服务服务器推送目前流行就是私信、发布/订阅等模式,基本上都是基于会话映射,消息对列等技术实现的;高性能、分布式可以如下解决:会话映射可采用rediscluster等技术实现,消息对列可使用kafka等分布式消息队列方案实现。部分代码如下:1///2///消息转发3///4privatevoidForwardMsg()5{6try7{8varmsg=MessageQueue.Dequeue();9if(msg!
APM.Server 消息推送服务的实现

消息推送服务

服务器推送目前流行就是私信、发布/订阅等模式,基本上都是基于会话映射,消息对列等技术实现的;高性能、分布式可以如下解决:会话映射可采用redis cluster等技术实现,消息对列可使用kafka等分布式消息队列方案实现。
APM.Server基于简单

1 static ConcurrentDictionary<string, Session> _sessionDic = new ConcurrentDictionary<string, Session>();

1 private static ConcurrentQueue<Message> _messageQueue = new ConcurrentQueue<Message>();

实现。

部分代码如下:

消息推送服务第1张
1 /// <summary>
 2         /// 消息转发
 3         /// </summary>
 4         private void ForwardMsg()
 5         {
 6             try
 7             {
 8                 var msg = MessageQueue.Dequeue();
 9                 if (msg != null)
10                 {
11                     switch (msg.Type)
12                     {
13                         case (byte)MessageType.Sub:
14                             if (!msg.IsMuti)
15                             {
16                                 if (!SessionDic.Exists(msg.SessionID, msg.SessionID))
17                                     SessionDic.Set(this._server, msg.SessionID, msg.SessionID);
18                             }
19                             if (!SessionDic.Exists(msg.SessionID, msg.Sender))
20                                 SessionDic.Set(this._server, msg.Sender, msg.SessionID);
21                             break;
22                         case (byte)MessageType.Unsub:
23                             if (!msg.IsMuti)
24                             {
25                                 if (SessionDic.Exists(msg.SessionID, msg.SessionID))
26                                     SessionDic.Del(msg.SessionID, msg.SessionID);
27                             }
28                             if (SessionDic.Exists(msg.SessionID, msg.Sender))
29                                 SessionDic.Del(msg.Sender, msg.SessionID);
30                             break;
31                         default:
32                             var session = SessionDic.Get(msg.SessionID);
33                             if (session != null)
34                             {
35                                 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList();
36                                 if (remotes != null && remotes.Count > 0)
37                                 {
38                                     Parallel.For(0, remotes.Count, i =>
39                                     {
40                                         this._server.SendMsg(remotes[i], Message.Serialize(msg));
41                                     });
42                                 }
43                             }
44                             this.OnMessage?.Invoke(msg);
45                             break;
46                     }
47 
48                 }
49             }
50             catch { }
51         }
复制代码
 1 /// <summary>
 2         /// 消息转发
 3         /// </summary>
 4         private void ForwardMsg()
 5         {
 6             try
 7             {
 8                 var msg = MessageQueue.Dequeue();
 9                 if (msg != null)
10                 {
11                     switch (msg.Type)
12                     {
13                         case (byte)MessageType.Sub:
14                             if (!msg.IsMuti)
15                             {
16                                 if (!SessionDic.Exists(msg.SessionID, msg.SessionID))
17                                     SessionDic.Set(this._server, msg.SessionID, msg.SessionID);
18                             }
19                             if (!SessionDic.Exists(msg.SessionID, msg.Sender))
20                                 SessionDic.Set(this._server, msg.Sender, msg.SessionID);
21                             break;
22                         case (byte)MessageType.Unsub:
23                             if (!msg.IsMuti)
24                             {
25                                 if (SessionDic.Exists(msg.SessionID, msg.SessionID))
26                                     SessionDic.Del(msg.SessionID, msg.SessionID);
27                             }
28                             if (SessionDic.Exists(msg.SessionID, msg.Sender))
29                                 SessionDic.Del(msg.Sender, msg.SessionID);
30                             break;
31                         default:
32                             var session = SessionDic.Get(msg.SessionID);
33                             if (session != null)
34                             {
35                                 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList();
36                                 if (remotes != null && remotes.Count > 0)
37                                 {
38                                     Parallel.For(0, remotes.Count, i =>
39                                     {
40                                         this._server.SendMsg(remotes[i], Message.Serialize(msg));
41                                     });
42                                 }
43                             }
44                             this.OnMessage?.Invoke(msg);
45                             break;
46                     }
47 
48                 }
49             }
50             catch { }
51         }
复制代码
异步tcp通信——APM.Core 服务端概述异步tcp通信——APM.Core 解包异步tcp通信——APM.Server 消息推送服务的实现异步tcp通信——APM.ConsoleDemo

转载请标明本文来源:http://www.cnblogs.com/yswenli/
更多内容欢迎star作者的github:https://github.com/yswenli/APM

免责声明:文章转载自《消息推送服务》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇无线Mesh网络技术基础与应用.NET webapi 的单元测试下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

RabbitMQ生产者发送消息确认

在使用RabbitMQ的时候,可以通过消息的持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去以后,消息到底有没有到达服务器呢?如果不进行特殊的配置,默认情况下发送消息的操作是不会返回任何消息给生产者的,也就是默认情况下是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前已经丢失,持久...

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知

在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。 由于互联网通信的不可靠性,例如双方网络、服务器、应用等因素的影响,不管是同步返回、异步通知、主动查询报文都可能出现超时无响应、报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制。...

Kafka-如何保证生产者的可靠性

Kafka-如何保证生产者的可靠性 即使我们尽可能把broker配置的很可靠,但如果没有对生产者进行可靠性方面的配置,整个系统仍然有可能出现突发性的数据丢失。 举例: 为broker配置了3个副本,并且禁用了不完全首领选举,这样应该可以保证万无一失。我们把生产者发送消息的acks设为1(只要首领接收到消息就可以认为消息写入成功)。生产者发送一个消息给首...

CentOS 查看日志命令

  cat tail -f 日 志 文 件 说    明 /var/log/message 系统启动后的信息和错误日志,是Red Hat Linux中最常用的日志之一 /var/log/secure 与安全相关的日志信息 /var/log/maillog 与邮件相关的日志信息 /var/log/cron 与定时任务相关的日志信息 /var/log/spoo...

android即时消息处理机制

     在android端做即时消息的时候。遇到的坑点是怎么保证消息即时性,又不耗电。为什么这么说呢?     原因是假设要保证消息即时性。通常有两种机制pull或者push。pull定时轮询机制,比較浪费server资源;pushserver推送机制,须要保持长连接,client和server都要求比較高(网络环境,server保持连接数等),它们的...

md5sum 和 sha256sum用于 验证软件完整性

md5sum 和 sha256sum 都用来用来校验软件安装包的完整性,本次我们将讲解如何使用两个命令进行软件安装包的校验: sha 是什么? sha 为安全散列算法(英语:Secure Hash Algorithm,缩写为SHA)是一个密码散列函数家族,是FIPS所认证的安全散列算法。能计算出一个数字消息所对应到的,长度固定的字符串(又称消息摘要)的算法...