rocketmq学习

摘要:
这里有些不错的介绍rocketmq的文章特性客户端实践

官网地址


安装name server和broker

git clone https://github.com/apache/incubator-rocketmq.git

cd incubator-rocketmq

mvn clean package install -Prelease-all assembly:assembly -U

然后target目录下的apache-rocketmq-all就是我们需要的

把apache-rocketmq-all抽出来,移到apache-rocketmq-all目录下

执行nohup sh bin/mqnamesrv & 启动name server

tail -f ~/logs/rocketmqlogs/namesrv.log 查看name server日志

执行nohup sh bin/mqbroker -n localhost:9876 & 启动Broker

tail -f ~/logs/rocketmqlogs/broker.log 查看Broker日志

demo

添加依赖

<!--rocketmq-->
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.0.0-incubating</version>
</dependency>

消费者

/**
 * @author fengzp
 * @date 2017/3/31下午5:10
 * @email fengzp@gzyitop.com
 * @company 广州易站通计算机科技有限公司
 */
public class ConsumerMQ {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        /**
         * ConsumerGroupName需要由应用来保证唯一
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupName");

        /**
         * 指定服务端和端口
         */
        consumer.setNamesrvAddr("localhost:9876");

        /**
         * 订阅指定topic下tags为TagName的消息; "TagA || TagB || TagC" 代表订阅TagA和TagB和TagC的消息; "*" 代表订阅所有消息
         */
        consumer.subscribe("TopicName", "TagName");


        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /**
         * * 默认msgs里只有一条消息,可以通过consumer.setConsumeMessageBatchMaxSize();来设置批量接收消息
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    /**
                     * msg.getMsgId(); //msg唯一id
                     * msg.getTopic();
                     * msg.getTags();
                     */
                    System.out.println(new String(msg.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

        System.out.println("Consumer Started.");
    }
}    

提供者

/**
* @author fengzp
* @date 2017/3/31下午5:07
* @email fengzp@gzyitop.com
* @company 广州易站通计算机科技有限公司
*/
public class ProducerMQ {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("GroupName");

        producer.setNamesrvAddr("localhost:9876");
        producer.setInstanceName("InstanceName");
        producer.start();

        try {
            for (int i = 0; i < 3; i++) {
                Message msg = new Message("TopicName", "TagName", (new Date() + " fengzp hao shuai a " + i).getBytes());

                SendResult sendResult = producer.send(msg);

                System.out.println(sendResult.getMsgId() + " : " + sendResult.getSendStatus().name());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 应用退出时,需要调用shutdown方法来在MetaQ服务器上注销自己
         */
        producer.shutdown();
    }
}

测试

先启动消费者,然后启动提供者

提供者:
rocketmq学习第1张

消费者:
rocketmq学习第2张

消息成功发送,并且触发了订阅。



这里有些不错的介绍rocketmq的文章

特性

客户端实践

免责声明:文章转载自《rocketmq学习》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Flink本地安装和创建Flink应用scan chain的原理和实现——8.AT SPEED Test &amp;amp; OCC下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

RocketMq 测试

向长辈致敬: 蠍: 卢兄  像RocketMq 这个怎么测试? 卢兄: 中间件的测试,要结合中间件的特点来测试 蠍: 特点什么意思   卢兄: 比如,这个工具是作为发布消息和消费消息的,具有解耦的特点 卢兄: 那么你可以通过做交易,去看他是否能够正常发布消息,消费消息 卢兄: 最重要的还是要去测试他的性能 蠍: 性能怎么测试 卢兄: 让其消息队列积累大量的...

rocketmq学习(二) rocketmq集群部署与图形化控制台安装

1.rocketmq图形化控制台安装   虽然rocketmq为用户提供了使用命令行管理主题、消费组以及broker配置的功能,但对于不够熟练的非运维人员来说,命令行的管理界面还是较难使用的。为此,我们可以使用图形化的管理界面来简化管理操作。   rocketmq官方推荐的图形化控制台目前还处在不成熟的孵化阶段。仓库地址为(https://github.c...

【RocketMQ】同一个项目中,同一个topic,可以存在多个消费者么?

一、问题答案 是不可以的 而且后注册的会替换前注册的,MqConsumer2会替换MqConsumer,并且只结束tag-2的消息 /** * @date 2019/05/28 */ @Component @Slf4j public class MqConsumer implementsMessageConsumer { @Overr...

RocketMQ消息至少一次(At least Once)投递和消费

至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。 生产者 在同步非顺序投递的时候,每次都是轮询到不同的队列: Message message = new Message("topic...

RocketMQ事务消费和顺序消费详解

一、RocketMq有3中消息类型 1.普通消费 2. 顺序消费 3.事务消费 顺序消费场景 在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ可以保证顺序消费。 rocketMq实现顺序消费的原理  produce在发送消息的时候,把消息...

rokectMq

目录 消息模型 集群部署 NameServer producer消息队列选择器MessageQueueSeleetor 消息存储 commitLog数据存储文件 ConsumeQueue索引文件 IndexFile索引文件 消息查找 根据偏移量查询 根据消息id查询 根据消息key查询 实时更新消息消费队列与索引文件 消息队列与索引文件恢复 过期...