分布式事务10_最大努力通知形-copy

摘要:
实现业务活动的主动发送。完成业务处理后,向业务活动的被动方发送消息以允许消息丢失。业务活动的被动方根据定时策略向业务活动的主动发送方发送查询。丢失的业务消息的恢复限制了被动方的处理结果,不会影响主动方的处理效果。业务查询和校对系统的建设成本。应用范围对业务的最终一致性具有时间敏感性。可以查询整个企业业务活动中使用的服务模式。可以查询业务活动的特征,向业务活动的被动方发送通知消息(允许消息丢失)。可以设置主动发送

实现
业务活动的主动发,在完成业务处理后,向业务活动的被动方发送消息允许消息丢失
业务活动的被动方根据定时策略,向业务活动的主动发查询,恢复丢失的业务消息
约束
被动方的处理结果不影响主动方的处理结果
成本
业务查询与校对系统的建设成本
适用范围
对业务最终一致性的时间敏感度低
跨企业的业务活动
用到的服务模式
可查询操作
方案特点
业务活动的主动发在完成业务处理后,向业务活动被动方发送通知消息(允许消息丢失)
主动发可以设置时间梯度通知规则,在通知失败后按照规则重复通知,直到通知N次后不再通知
主动发提供校对查询接口给被动方按需校对查询,用于恢复丢失的业务消息
行业应用案例
银行通知、商户通知等(各大交易业务平台的商户通知:多次通知、查询校对、对账文件)
设计
数据库
rp_notify_record
DROP TABLE IF EXISTS `rp_notify_record`;
CREATE TABLE `rp_notify_record` (
`id` varchar(50) NOT NULL DEFAULT '' COMMENT '主键ID',
`version` int(11) NOT NULL DEFAULT '0' COMMENT '版本事情',
`create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
`edit_time` datetime DEFAULT NULL COMMENT '最后修改时间',
`notify_rule` varchar(255) DEFAULT NULL COMMENT '通知规则(单位:分钟)',
`notify_times` int(11) NOT NULL DEFAULT '0' COMMENT '已通知次数',
`limit_notify_times` int(11) NOT NULL DEFAULT '0' COMMENT '最大通知次数限制',
`url` varchar(2000) NOT NULL DEFAULT '' COMMENT '通知请求链接(包含通知内容)',
`merchant_order_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商户订单号',
`merchant_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商户编号',
`status` varchar(50) NOT NULL DEFAULT '' COMMENT '通知状态(对应枚举值)',
`notify_type` varchar(30) DEFAULT NULL COMMENT '通知类型',
PRIMARY KEY (`id`),
KEY `AK_KEY_2` (`merchant_order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='通知记录表 RP_NOTIFY_RECORD';
rp_notify_record_log
DROP TABLE IF EXISTS `rp_notify_record_log`;
CREATE TABLE `rp_notify_record_log` (
`id` varchar(50) NOT NULL DEFAULT '' COMMENT 'ID',
`version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',
`edit_time` datetime DEFAULT NULL COMMENT '最后修改时间',
`create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
`notify_id` varchar(50) NOT NULL DEFAULT '' COMMENT '通知记录ID',
`request` varchar(2000) NOT NULL DEFAULT '' COMMENT '请求内容',
`response` varchar(2000) NOT NULL DEFAULT '' COMMENT '响应内容',
`merchant_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商户编号',
`merchant_order_no` varchar(50) NOT NULL COMMENT '商户订单号',
`http_status` varchar(50) NOT NULL COMMENT 'HTTP状态',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='通知记录日志表 RP_NOTIFY_RECORD_LOG';
接口
RpNotifyService
public interface RpNotifyService {

/**
* 创建消息通知
* @param rpNotifyRecord
*/
public long createNotifyRecord(RpNotifyRecord rpNotifyRecord) throws NotifyBizException;

/**
* 修改消息通知
* @param rpNotifyRecord
*/
public void updateNotifyRecord(RpNotifyRecord rpNotifyRecord) throws NotifyBizException;

/**
* 创建消息通知记录
* @param rpNotifyRecordLog
* @return
*/
public long createNotifyRecordLog(RpNotifyRecordLog rpNotifyRecordLog) throws NotifyBizException;

/**
* 发送消息通知
* @param notifyUrl 通知地址
* @param merchantOrderNo 商户订单号
* @param merchantNo 商户编号
*/
public void notifySend(String notifyUrl,String merchantOrderNo,String merchantNo) throws NotifyBizException;


/**
* 通过ID获取通知记录
* @param id
* @return
*/
public RpNotifyRecord getNotifyRecordById(String id) throws NotifyBizException;

/**
* 根据商户编号,商户订单号,通知类型获取通知记录
* @param merchantNo 商户编号
* @param merchantOrderNo 商户订单号
* @param notifyType 消息类型
* @return
*/
public RpNotifyRecord getNotifyByMerchantNoAndMerchantOrderNoAndNotifyType(String merchantNo , String merchantOrderNo , String notifyType) throws NotifyBizException;

/**
* 按条件分页查询通知记录.
*/
public PageBean<RpNotifyRecord> queryNotifyRecordListPage(PageParam pageParam , Map<String, Object> paramMap) throws NotifyBizException;


}
RpNotifyServiceImpl
public class RpNotifyServiceImpl implements RpNotifyService {

@Autowired
private JmsTemplate notifyJmsTemplate;

@Autowired
private RpNotifyRecordDao rpNotifyRecordDao;

@Autowired
private RpNotifyRecordLogDao rpNotifyRecordLogDao;

/**
* 创建消息通知
*
* @param rpNotifyRecord
*/
@Override
public long createNotifyRecord(RpNotifyRecord rpNotifyRecord) {
return rpNotifyRecordDao.insert(rpNotifyRecord);
}

/**
* 修改消息通知
*
* @param rpNotifyRecord
*/
@Override
public void updateNotifyRecord(RpNotifyRecord rpNotifyRecord) {
rpNotifyRecordDao.update(rpNotifyRecord);
}

/**
* 创建消息通知记录
*
* @param rpNotifyRecordLog
* @return
*/
@Override
public long createNotifyRecordLog(RpNotifyRecordLog rpNotifyRecordLog) {
return rpNotifyRecordLogDao.insert(rpNotifyRecordLog);
}



/**
* 发送消息通知
*
* @param notifyUrl 通知地址
* @param merchantOrderNo 商户订单号
* @param merchantNo 商户编号
*/
@Override
public void notifySend(String notifyUrl, String merchantOrderNo, String merchantNo) {

RpNotifyRecord record = new RpNotifyRecord();
record.setNotifyTimes(0);
record.setLimitNotifyTimes(5);
record.setStatus(NotifyStatusEnum.CREATED.name());
record.setUrl(notifyUrl);
record.setMerchantOrderNo(merchantOrderNo);
record.setMerchantNo(merchantNo);
record.setNotifyType(NotifyTypeEnum.MERCHANT.name());

Object toJSON = JSONObject.toJSON(record);
final String str = toJSON.toString();

notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(str);
}
});
}

/**
* 通过ID获取通知记录
*
* @param id
* @return
*/
@Override
public RpNotifyRecord getNotifyRecordById(String id) {
return rpNotifyRecordDao.getById(id);
}

/**
* 根据商户编号,商户订单号,通知类型获取通知记录
*
* @param merchantNo 商户编号
* @param merchantOrderNo 商户订单号
* @param notifyType 消息类型
* @return
*/
@Override
public RpNotifyRecord getNotifyByMerchantNoAndMerchantOrderNoAndNotifyType(String merchantNo, String merchantOrderNo, String notifyType) {
return rpNotifyRecordDao.getNotifyByMerchantNoAndMerchantOrderNoAndNotifyType(merchantNo,merchantOrderNo,notifyType);
}

@SuppressWarnings("unchecked")
@Override
public PageBean<RpNotifyRecord> queryNotifyRecordListPage(PageParam pageParam, Map<String, Object> paramMap) {
return rpNotifyRecordDao.listPage(pageParam,paramMap);
}


}
延时执行
import java.util.concurrent.DelayQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.roncoo.pay.app.notify.core.NotifyPersist;
import com.roncoo.pay.app.notify.core.NotifyTask;
import com.roncoo.pay.service.notify.aip.RpNotifyService;

/**
* 商户通知应用启动类.
*
*/
public class App
{
private static final Log LOG = LogFactory.getLog(App.class);

/**
* 通知任务延时队列,对象只能在其到期时才能从队列中取走。
*/
public static DelayQueue<NotifyTask> tasks = new DelayQueue<NotifyTask>();

private static ClassPathXmlApplicationContext context;

private static ThreadPoolTaskExecutor threadPool;

public static RpNotifyService rpNotifyService;

public static NotifyPersist notifyPersist;

public static void main(String[] args) {
try {
context = new ClassPathXmlApplicationContext(new String[] { "spring/spring-context.xml" });
context.start();
threadPool = (ThreadPoolTaskExecutor) context.getBean("threadPool");
rpNotifyService = (RpNotifyService) context.getBean("rpNotifyService");

// 从数据库中取一次数据用来当系统启动时初始化
notifyPersist = (NotifyPersist) context.getBean("notifyPersist");
.initNotifyDataFromDB();

startThread(); // 启动任务处理线程

LOG.info("== context start");
} catch (Exception e) {
LOG.error("== application start error:", e);
return;
}
synchronized (App.class) {
while (true) {
try {
App.class.wait();
} catch (InterruptedException e) {
LOG.error("== synchronized error:", e);
}
}
}
}

private static void startThread() {
LOG.info("==>startThread");

threadPool.execute(new Runnable() {
public void run() {
try {
while (true) {
LOG.info("==>threadPool.getActiveCount():" + threadPool.getActiveCount());
LOG.info("==>threadPool.getMaxPoolSize():" + threadPool.getMaxPoolSize());
// 如果当前活动线程等于最大线程,那么不执行
if (threadPool.getActiveCount() < threadPool.getMaxPoolSize()) {
LOG.info("==>tasks.size():" + tasks.size());
//使用take方法获取过期任务,如果获取不到,就一直等待,知道获取到数据
final NotifyTask task = tasks.take();
if (task != null) {
threadPool.execute(new Runnable() {
public void run() {
tasks.remove(task);
task.run(); // 执行通知处理
LOG.info("==>tasks.size():" + tasks.size());
}
});
}
}
}
} catch (Exception e) {
LOG.error("系统异常;",e);
}
}
});
}

}
————————————————
版权声明:本文为CSDN博主「chenshiying007」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_27384769/article/details/79331027

免责声明:文章转载自《分布式事务10_最大努力通知形-copy》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇分布式事务八_可靠消息最终一致性方案-copy分布式事务九_基于可靠消息的最终一致性代码-copy下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

windows消息机制的有趣发现(二)

3.一个有窗口和窗口过程函数但没有消息循环的程序 一个程序,如果我们创建了窗口,也定义了窗口过程函数,但是没有建立消息循环会怎样呢?我们在win32控制台项目下编写如下代码: #include <windows.h> #define WM_TEST 10000 LRESULT CALLBACK WndProc(HWND, UINT, WPARA...

ubuntu下DNS原理及相关设置

1.DNS原理分析如下: 当 DNS 客户机需要查询程序中使用的名称时,它会查询本地DNS 服务器来解析该名称。客户机发送的每条查询消息都包括3条信息,以指定服务器应回答的问题。● 指定的 DNS 域名,表示为完全合格的域名 (FQDN) 。● 指定的查询类型,它可根据类型指定资源记录,或作为查询操作的专门类型。● DNS域名的指定类别。对于DNS 服务器,...

devops基础05--nexus

devops devops基础01--gitlab - omgasw - 博客园 (cnblogs.com) devops基础02--jenkins - omgasw - 博客园 (cnblogs.com) devops基础03--sonarqube - omgasw - 博客园 (cnblogs.com) devops基础04--maven - omgas...

18、进程通信方法(Linux和windows下),线程通信方法(Linux和 windows下)

名称及方式 管道(pipe):允许一个进程和另一个与它有共同祖先的进程之间进行通信 命名管道(FIFO):类似于管道,但是它可以用于任何两个进程之间的通信,命名管道在文件系统中有 对应的文件名。命名管道通过命令mkfifo或系统调用mkfifo来创建 消息队列(MQ):消息队列是消息的连接表,包括POSIX消息对和System V消息队列。有足够权限的...

Spring IOC 和 AOP概述

IoC(控制反转,(Inversion of Control):本来是由应用程序管理的对象之间的依赖关系,现在交给了容器管理,这就叫控制反转,即交给了IoC容器,Spring的IoC容器主要使用DI方式实现的。 不需要主动查找,对象的查找、定位和创建全部由容器管理 DI(Dependency Injection):IOC 的另一种表述方式:即组件以一些...

Socket的使用

最近这段时间,在开发中碰到一个核销的需求——手机公众号打开二维码页面后,第三方扫码器扫描出现的二维码,扫描后第三方在第三方后台处理数据并调用我司的接口,我方要主动将消息推送给前端,要求前后端配合,前端使用websocket后端使用socket,在进入二维码页面时前端就使用websocket,后台消息处理完后才从后端向前端主动推送消息。 公司的架构用的是spr...