• 从学生到士兵:携笔从戎的青春不后悔 2019-11-10
  • 计划经济不是计划两字的意思。计划经济是按照公有经济规律进行搞经济,其核心价值是强大公有制,实现公有(共有)资本在其所有者们进行分配。即谁的资本谁受益,谁投资谁受 2019-11-05
  • 加强党对意识形态工作的领导 2019-11-05
  • 纠正事业企业化腐败,中国事业要对中国人民生存风险无偿负起责任来。 2019-10-21
  • 商务部公布对原产美国等进口氢碘酸反倾销调查初裁 2019-10-16
  • 跟上孩子的“数字步伐” 2019-10-15
  • 夏天多吃点凉拌菜 这么吃,让你胃口大开! 2019-10-15
  • 山东高考评卷过半 作文严格执行双、三及四评制 2019-09-08
  • 网络餐饮新规明年起实施 鼓励外卖使用安全无害包装盒 2019-08-09
  • 农历五月初二 唐代高僧雪峰义存禅师圆寂纪念日 2019-08-02
  • 虾仁怎么炒最下饭?答案是宫保-美食资讯 2019-08-02
  • 青海学习十九大精神嵌套页面--青海频道--人民网 2019-06-01
  • 天津举办改善营商环境专题讲座 2019-05-29
  • 法国红酒法国思慕干红葡萄酒750ml【价格 品牌 图片 评论】 2019-05-29
  • 就在今晚!提前预测世界杯开幕式会请几个超模! 2019-05-13
  • 气排球发球手势图解法: RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    排球比赛多少局 www.mykud.com 作者: 时间: 2018-11-28 分类: 技术文章 | 0条评论 |

    Rocketmq 消费者默认是集群的方式消费的,消费者还可以用广播的模式进行消费。广播模式消费就是所有订阅同一个主题的消费者都会收到消息,这个广播模式场景,适用于分布式服务器更新缓存或配置等场景。代码实现上其实很简单,就是在消费端添加:

     Java Code By www.mykud.com
    1
    consumer.setMessageModel(MessageModel.BROADCASTING);


    就可以了。我们看实验步骤:

    一、启动ConsumerBroadCastMember1

    二、启动ConsumerBroadCastMember2

    三、运行ProducerBraodCast

    四、我们可以看到两个Consumer都收到了同样的消息。

    Producer端:

     Java Code By www.mykud.com
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    package org.hope.lee.producer;



    import com.alibaba.rocketmq.client.exception.MQBrokerException;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

    import com.alibaba.rocketmq.client.producer.SendCallback;

    import com.alibaba.rocketmq.client.producer.SendResult;

    import com.alibaba.rocketmq.common.message.Message;

    import com.alibaba.rocketmq.common.message.MessageQueue;

    import com.alibaba.rocketmq.remoting.exception.RemotingException;



    public class ProducerBroadCast {

        
    public static void main(String[] args) {

            DefaultMQProducer producer = 
    new DefaultMQProducer(“push_consumer”);

            producer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    try {

                
    // 设置实例名称

                producer.setInstanceName(“producer_broadcast”);

                
    // 设置重试次数

                producer.setRetryTimesWhenSendFailed(3);

                
    // 开启生产者

                producer.start();

                
    // 创建一条消息

                Message msg = new Message(“topic_broadcast”“TagA”“OrderID0034”“message_broadcast_test”.getBytes());

                SendResult send = producer.send(msg);

                System.out.println(
    “id:—>” + send.getMsgId() + “,result:—>” + send.getSendStatus());

                

            } 
    catch (MQClientException e) {

                e.printStackTrace();

            } 
    catch (RemotingException e) {

                e.printStackTrace();

            } 
    catch (MQBrokerException e) {

                e.printStackTrace();

            } 
    catch (InterruptedException e) {

                e.printStackTrace();

            } 

            producer.shutdown();

        }

    }


    Consumer端:

     Java Code By www.mykud.com
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package org.hope.lee.consumer;



    import java.util.List;



    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

    import com.alibaba.rocketmq.common.message.MessageExt;

    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



    public class ConsumerBroadCastMember1 {

        
    public static void main(String[] args) throws MQClientException {

            DefaultMQPushConsumer consumer = 
    new DefaultMQPushConsumer(“consumer_broadcast”);

            consumer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    // 批量消费,每次拉取10条

            consumer.setConsumeMessageBatchMaxSize(10);

            
    //设置广播消费

            consumer.setMessageModel(MessageModel.BROADCASTING);

            
    //设置集群消费

    //        consumer.setMessageModel(MessageModel.CLUSTERING);

            // 如果非第一次启动,那么按照上次消费的位置继续消费

            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            
    // 订阅PushTopic下Tag为push的消息

            consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

            consumer.registerMessageListener(
    new MqBroadCastListener());

            consumer.start();

            System.out.println(
    “Consumer1 Started.”);



        }

    }

    class MqBroadCastListener implements MessageListenerConcurrently{

        
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

            
    try {

                MessageExt msg = msgs.get(
    0);

                
    String msgBody = new String(msg.getBody(), “utf-8”);

                System.out.println(
    “msgBody:” + msgBody);

            } 
    catch(Exception e) {

                e.printStackTrace();

                
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

            }

            
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        }

        

    }


     Java Code By www.mykud.com
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package org.hope.lee.consumer;



    import java.util.List;



    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

    import com.alibaba.rocketmq.common.message.MessageExt;

    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



    public class ConsumerBroadCastMember2 {

        
    public static void main(String[] args) throws MQClientException {

            DefaultMQPushConsumer consumer = 
    new DefaultMQPushConsumer(“consumer_broadcast”);

            consumer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    // 批量消费,每次拉取10条

            consumer.setConsumeMessageBatchMaxSize(10);

            
    //设置广播消费

            consumer.setMessageModel(MessageModel.BROADCASTING);

            
    //设置集群消费

    //        consumer.setMessageModel(MessageModel.CLUSTERING);

            // 如果非第一次启动,那么按照上次消费的位置继续消费

            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            
    // 订阅PushTopic下Tag为push的消息

            consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

            consumer.registerMessageListener(
    new MqBroadCastListener());

            consumer.start();

            System.out.println(
    “Consumer2 Started.”);



        }

    }


    结果:

    RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    吾乐吧软件站提醒大家:

    上面的代码是转载的,下面才是吾乐吧要说的重点,小编按照上面代码整合到自己的项目之后发现了几个大坑,一直没跑起来,现在分享下解决方法:

    1、上面的Lisener部分,可以改成这样的写法:

     Java Code By www.mykud.com
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    //设置一个Listener,主要进行消息的逻辑处理  

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override  

        
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  

                                                        ConsumeConcurrentlyContext context) {  

            
    for (MessageExt messageExt : msgs) {    

               
    String messageBody = new String(messageExt.getBody());   

                System.out.println(
    new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”).format(

                     
    new Date())+“消费响应:msgId : “ + messageExt.getMsgId() + “,  msgBody : “ + messageBody);//输出消息内容    

            }    

              

            
    //返回消费状态  

            //CONSUME_SUCCESS 消费成功  

            //RECONSUME_LATER 消费失败,需要稍后重新消费  

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  

        }  

    }); 


    2、RocketMQ的消费者一直提示“readLocalOffset Exception, maybe fastjson version too low”的解决方法:

    一开始,还以为是fastjson版本不正确,换了最新版也是不行,后面调试进去mq代码才发现,广播模式会在本地生成一个一些文件,然后里面的文件出问题了(内容为空,然后强制转JSON。。。你懂的。。。),你把里面的东西删除,就可以正常了,删除以下2个文件:
    C:\Users\改成你的用户名\.rocketmq_offsets\XXX\XXX\offsets.json
    C:\Users\改成你的用户名\.rocketmq_offsets\XXX\XXX\offsets.json.bak

    3、广播模式下,RocketMQ不会更新已消费的状态,依然是 NOT_ONLINE 状态。

    RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    所以如果你发现没有变成CONSUMED状态,完全不用担心,详情请看这里: https://github.com/apache/rocketmq/issues/296#issuecomment-384849461

    All by flydoos 2018-11-28

    本文采用 CC协议 发布,转载请注明:转载自 排球比赛多少局

    本文链接地址://www.mykud.com/?p=29968

    发表评论

    ?
    微软MSDN资源免费订阅,MSDN 我告诉你
  • 从学生到士兵:携笔从戎的青春不后悔 2019-11-10
  • 计划经济不是计划两字的意思。计划经济是按照公有经济规律进行搞经济,其核心价值是强大公有制,实现公有(共有)资本在其所有者们进行分配。即谁的资本谁受益,谁投资谁受 2019-11-05
  • 加强党对意识形态工作的领导 2019-11-05
  • 纠正事业企业化腐败,中国事业要对中国人民生存风险无偿负起责任来。 2019-10-21
  • 商务部公布对原产美国等进口氢碘酸反倾销调查初裁 2019-10-16
  • 跟上孩子的“数字步伐” 2019-10-15
  • 夏天多吃点凉拌菜 这么吃,让你胃口大开! 2019-10-15
  • 山东高考评卷过半 作文严格执行双、三及四评制 2019-09-08
  • 网络餐饮新规明年起实施 鼓励外卖使用安全无害包装盒 2019-08-09
  • 农历五月初二 唐代高僧雪峰义存禅师圆寂纪念日 2019-08-02
  • 虾仁怎么炒最下饭?答案是宫保-美食资讯 2019-08-02
  • 青海学习十九大精神嵌套页面--青海频道--人民网 2019-06-01
  • 天津举办改善营商环境专题讲座 2019-05-29
  • 法国红酒法国思慕干红葡萄酒750ml【价格 品牌 图片 评论】 2019-05-29
  • 就在今晚!提前预测世界杯开幕式会请几个超模! 2019-05-13
  • 福建快三选号技巧 时时彩后三包一胆技巧 一级日本黄色片 股票行情实时查询002027 万国娱乐手机版 贵州11选5开奖查询 线上赚钱的方式有哪些 时彩官网 老11选5开奖公告 学术会议服务怎么赚钱 四川时时网 吉林十一选五任三技巧 交易员在熊市赚钱 吉林时时网上购买火车票 吉林11选5计划 淘宝快3技巧稳赚