• 就在今晚!提前预测世界杯开幕式会请几个超模! 2019-05-13
  • 来自报告里的大白话,让你百听不厌 2019-05-07
  • 全国取得专利代理人资格人数达3.7万 2019-05-07
  • VAR技术再抢镜 瑞典队1-0点杀韩国 2019-05-02
  • 井助国:我参与了审判日本战犯 2019-04-28
  • 昌都市图书馆首个全民阅读推广项目结题 2019-04-28
  • “一带一路”倡议下的跨境创业故事 2019-04-24
  • 回复@老老保老张工:计划不要批准?那不还是你自己做主?有必要走形式么? 2019-04-14
  • 端午节当天广州地铁延长1小时收车 2019-04-14
  • 【改革印记——看中国发展】从扫盲到留守儿童关爱:暑期社会实践的变迁 2019-04-10
  • 明星待遇!俄罗斯神猫预测2018世界杯揭幕战胜者 2019-04-09
  • 莫斯科1辆出租车冲进人群 造成7位墨西哥球迷受伤 2019-04-03
  • 神山冈仁波齐的转山之路文章中国国家地理网 2019-04-02
  • 惊艳卢浮宫小牛电动发布新款电动车惊艳卢浮宫小牛电动发布新款电动车-手机行情 2019-03-26
  • 《朱熹〈诗经〉解释学研究》简介 2019-03-26
  • 排球计分规则: RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    排球比赛多少局 www.mykud.com 作者: 时间: 2018-11-28 分类: 技术文章 | 0条评论 |

    Rocketmq 消费者默认是集群的方式消费的,消费者还可以用广播的模式进行消费。广播模式消费就是所有订阅同一个主题的消费者都会收到消息,这个广播模式场景,适用于分布式服务器更新缓存或配置等场景。代码实现上其实很简单,就是在消费端添加:

     Java Code By www.mykud.com
    1
    consumer.setMessageModel(MessageModel.BROADCASTING);


    就可以了。我们看实验步骤:

    一、启动ConsumerBroadCastMember1

    二、启动ConsumerBroadCastMember2

    三、运行ProducerBraodCast

    四、我们可以看到两个Consumer都收到了同样的消息。

    Producer端:

     Java Code By www.mykud.com
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    package org.hope.lee.producer;



    import com.alibaba.rocketmq.client.exception.MQBrokerException;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

    import com.alibaba.rocketmq.client.producer.SendCallback;

    import com.alibaba.rocketmq.client.producer.SendResult;

    import com.alibaba.rocketmq.common.message.Message;

    import com.alibaba.rocketmq.common.message.MessageQueue;

    import com.alibaba.rocketmq.remoting.exception.RemotingException;



    public class ProducerBroadCast {

        
    public static void main(String[] args) {

            DefaultMQProducer producer = 
    new DefaultMQProducer(“push_consumer”);

            producer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    try {

                
    // 设置实例名称

                producer.setInstanceName(“producer_broadcast”);

                
    // 设置重试次数

                producer.setRetryTimesWhenSendFailed(3);

                
    // 开启生产者

                producer.start();

                
    // 创建一条消息

                Message msg = new Message(“topic_broadcast”“TagA”“OrderID0034”“message_broadcast_test”.getBytes());

                SendResult send = producer.send(msg);

                System.out.println(
    “id:—>” + send.getMsgId() + “,result:—>” + send.getSendStatus());

                

            } 
    catch (MQClientException e) {

                e.printStackTrace();

            } 
    catch (RemotingException e) {

                e.printStackTrace();

            } 
    catch (MQBrokerException e) {

                e.printStackTrace();

            } 
    catch (InterruptedException e) {

                e.printStackTrace();

            } 

            producer.shutdown();

        }

    }


    Consumer端:

     Java Code By www.mykud.com
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package org.hope.lee.consumer;



    import java.util.List;



    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

    import com.alibaba.rocketmq.common.message.MessageExt;

    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



    public class ConsumerBroadCastMember1 {

        
    public static void main(String[] args) throws MQClientException {

            DefaultMQPushConsumer consumer = 
    new DefaultMQPushConsumer(“consumer_broadcast”);

            consumer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    // 批量消费,每次拉取10条

            consumer.setConsumeMessageBatchMaxSize(10);

            
    //设置广播消费

            consumer.setMessageModel(MessageModel.BROADCASTING);

            
    //设置集群消费

    //        consumer.setMessageModel(MessageModel.CLUSTERING);

            // 如果非第一次启动,那么按照上次消费的位置继续消费

            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            
    // 订阅PushTopic下Tag为push的消息

            consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

            consumer.registerMessageListener(
    new MqBroadCastListener());

            consumer.start();

            System.out.println(
    “Consumer1 Started.”);



        }

    }

    class MqBroadCastListener implements MessageListenerConcurrently{

        
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

            
    try {

                MessageExt msg = msgs.get(
    0);

                
    String msgBody = new String(msg.getBody(), “utf-8”);

                System.out.println(
    “msgBody:” + msgBody);

            } 
    catch(Exception e) {

                e.printStackTrace();

                
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

            }

            
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        }

        

    }


     Java Code By www.mykud.com
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package org.hope.lee.consumer;



    import java.util.List;



    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

    import com.alibaba.rocketmq.common.message.MessageExt;

    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;



    public class ConsumerBroadCastMember2 {

        
    public static void main(String[] args) throws MQClientException {

            DefaultMQPushConsumer consumer = 
    new DefaultMQPushConsumer(“consumer_broadcast”);

            consumer.setNamesrvAddr(
    “192.168.31.176:9876;192.168.31.165:9876”);

            
    // 批量消费,每次拉取10条

            consumer.setConsumeMessageBatchMaxSize(10);

            
    //设置广播消费

            consumer.setMessageModel(MessageModel.BROADCASTING);

            
    //设置集群消费

    //        consumer.setMessageModel(MessageModel.CLUSTERING);

            // 如果非第一次启动,那么按照上次消费的位置继续消费

            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            
    // 订阅PushTopic下Tag为push的消息

            consumer.subscribe(“topic_broadcast”“TagA || Tag B || Tage C”);

            consumer.registerMessageListener(
    new MqBroadCastListener());

            consumer.start();

            System.out.println(
    “Consumer2 Started.”);



        }

    }


    结果:

    RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    吾乐吧软件站提醒大家:

    上面的代码是转载的,下面才是吾乐吧要说的重点,小编按照上面代码整合到自己的项目之后发现了几个大坑,一直没跑起来,现在分享下解决方法:

    1、上面的Lisener部分,可以改成这样的写法:

     Java Code By www.mykud.com
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    //设置一个Listener,主要进行消息的逻辑处理  

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override  

        
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  

                                                        ConsumeConcurrentlyContext context) {  

            
    for (MessageExt messageExt : msgs) {    

               
    String messageBody = new String(messageExt.getBody());   

                System.out.println(
    new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”).format(

                     
    new Date())+“消费响应:msgId : “ + messageExt.getMsgId() + “,  msgBody : “ + messageBody);//输出消息内容    

            }    

              

            
    //返回消费状态  

            //CONSUME_SUCCESS 消费成功  

            //RECONSUME_LATER 消费失败,需要稍后重新消费  

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  

        }  

    }); 


    2、RocketMQ的消费者一直提示“readLocalOffset Exception, maybe fastjson version too low”的解决方法:

    一开始,还以为是fastjson版本不正确,换了最新版也是不行,后面调试进去mq代码才发现,广播模式会在本地生成一个一些文件,然后里面的文件出问题了(内容为空,然后强制转JSON。。。你懂的。。。),你把里面的东西删除,就可以正常了,删除以下2个文件:
    C:\Users\改成你的用户名\.rocketmq_offsets\XXX\XXX\offsets.json
    C:\Users\改成你的用户名\.rocketmq_offsets\XXX\XXX\offsets.json.bak

    3、广播模式下,RocketMQ不会更新已消费的状态,依然是 NOT_ONLINE 状态。

    RocketMQ 广播模式消费(适用于分布式服务器更新缓存或配置等场景)

    所以如果你发现没有变成CONSUMED状态,完全不用担心,详情请看这里: https://github.com/apache/rocketmq/issues/296#issuecomment-384849461

    All by flydoos 2018-11-28

    本文采用 CC协议 发布,转载请注明:转载自 排球比赛多少局

    本文链接地址://www.mykud.com/?p=29968

    发表评论

    ?
    微软MSDN资源免费订阅,MSDN 我告诉你
  • 就在今晚!提前预测世界杯开幕式会请几个超模! 2019-05-13
  • 来自报告里的大白话,让你百听不厌 2019-05-07
  • 全国取得专利代理人资格人数达3.7万 2019-05-07
  • VAR技术再抢镜 瑞典队1-0点杀韩国 2019-05-02
  • 井助国:我参与了审判日本战犯 2019-04-28
  • 昌都市图书馆首个全民阅读推广项目结题 2019-04-28
  • “一带一路”倡议下的跨境创业故事 2019-04-24
  • 回复@老老保老张工:计划不要批准?那不还是你自己做主?有必要走形式么? 2019-04-14
  • 端午节当天广州地铁延长1小时收车 2019-04-14
  • 【改革印记——看中国发展】从扫盲到留守儿童关爱:暑期社会实践的变迁 2019-04-10
  • 明星待遇!俄罗斯神猫预测2018世界杯揭幕战胜者 2019-04-09
  • 莫斯科1辆出租车冲进人群 造成7位墨西哥球迷受伤 2019-04-03
  • 神山冈仁波齐的转山之路文章中国国家地理网 2019-04-02
  • 惊艳卢浮宫小牛电动发布新款电动车惊艳卢浮宫小牛电动发布新款电动车-手机行情 2019-03-26
  • 《朱熹〈诗经〉解释学研究》简介 2019-03-26
  • 北京赛车pk10怎么分大小 足彩胜负彩18102期预测 2018海南环岛赛海口赛段路线图 麻将怎么打 重庆时时彩是官方的么 天津时时彩最快开奖网 重庆时时彩开奖直播 三d走势图 北京快乐8任一计划 快乐8官网 幸运28预测 金冠娱乐城 北京pk10八码2-9名算法 内蒙古时时彩形态走 河北时时彩网 五分彩群