IDEA调试Apache RocketMQ源码

starlin 1,182 2019-09-27

最近打算看看RocketMQ的源码,首当其冲的肯定是搭建RocketMQ的调试环境,网上看了很多介绍,都是简单的介绍(大佬们可能觉得太简单了吧),于是乎就有了这一篇文章了

下载

点击此处跳转至github RocketMQ项目,利用git clone下来后,通过IDEA导入即可,如下图:

导入idea

准备环境

在D盘再建立一个rocketmq-env的文件夹(我的目录是D:\IDEAWorkSpace\private\rocketmq-env),将源码文件夹中的distribution/conf/broker.conf、distribution/conf/logback_broker.xml、distribution/conf/logback_namesrv.xml复制到D:\IDEAWorkSpace\private\rocketmq-env目录下
修改broker.conf,配置如下:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
#nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#存储路径
storePathRootDir=D:\\IDEAWorkSpace\\private\\rocketmq-env\\stores
#commitLog存储路径
storePathCommitLog=D:\\IDEAWorkSpace\\private\\rocketmq-env\\stores\\commitlog
#消费队列存储路径
storePathConsumeQueue=D:\\IDEAWorkSpace\\private\\rocketmq-env\\stores\\consumequeue
#消息索引存储路径
storePathIndex=D:\\IDEAWorkSpace\\private\\rocketmq-env\\stores\\index
#checkpoint文件存储路径
storeCheckpoint=D:\\IDEAWorkSpace\\private\\rocketmq-env\\stores\\checkpoint
#abort文件存储路径
abortFile=D:\\IDEAWorkSpace\\private\\rocketmq-env\\stores\\abort

启动NameServer

找到rocketmq\namesrv\NamesrvStartup.java,执行main函数即可,执行之前需要修改一下配置,增加Environment variables(ROCKETMQ_HOME=D:\IDEAWorkSpace\private\rocketmq-env),如下图所示:

nameserver

启动成功后,控制台输出如下:

The Name Server boot success. serializeType=JSON

启动Broker

找到 rocketmq\broker\BrokerStartup.java,执行main函数即可。执行之前先修改一下配置文件,同样的也要先增加下Environment variables(ROCKETMQ_HOME=D:\IDEAWorkSpace\private\rocketmq-env),若不配置,启动会报错找不到环境变量。还要修改Program arguments(-c D:\IDEAWorkSpace\private\rocketmq-env\conf\broker.conf),如下所示:

broker

启动成功后,控制台输出如下:

The broker[broker-a, 10.50.4.105:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

启动Producer

找到 example\quickstart\Producer.java ,修改 nameserver 的地址

producer.setNamesrvAddr("127.0.0.1:9876");

如果启动后报如下错误,则表示磁盘空间不够用,但是磁盘空间明明还剩余5G多,怎么会提示空间不够了

Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.94 CQ:  0.94 INDEX:  0.94, maybe your broker machine memory too small.
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:671)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:467)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:449)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:403)

查看源码DefaultMessageStore类后发现原来是按照使用磁盘比率来计算的,默认为75%,而我的磁盘使用率已经94%了,所以才会出现次错误提示,解决办法有2个:

  1. 清理磁盘空间
  2. 在broker.conf中配置属性diskMaxUsedSpaceRatio=95(源码中最大也只能设置为95,这种方式我没有测试过)

启动成功后,控制台会输出:

SendResult [sendStatus=SEND_OK, msgId=0A3204690D4818B4AAC28947C57803E2, offsetMsgId=0A32046900002A9F000000000006136E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=553]
SendResult [sendStatus=SEND_OK, msgId=0A3204690D4818B4AAC28947C57903E3, offsetMsgId=0A32046900002A9F0000000000061422, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=553]
SendResult [sendStatus=SEND_OK, msgId=0A3204690D4818B4AAC28947C57B03E4, offsetMsgId=0A32046900002A9F00000000000614D6, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=554]
SendResult [sendStatus=SEND_OK, msgId=0A3204690D4818B4AAC28947C57C03E5, offsetMsgId=0A32046900002A9F000000000006158A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=554]
SendResult [sendStatus=SEND_OK, msgId=0A3204690D4818B4AAC28947C57E03E6, offsetMsgId=0A32046900002A9F000000000006163E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=554]
SendResult [sendStatus=SEND_OK, msgId=0A3204690D4818B4AAC28947C57F03E7, offsetMsgId=0A32046900002A9F00000000000616F2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=554]
15:46:22.234 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[10.50.4.105:10911] result: true
15:46:22.336 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true

启动Consumer

找到 example\quickstart\Consumer.java ,修改 nameserver 的地址

consumer.setNamesrvAddr("127.0.0.1:9876");

启动成功后,控制台如下:

ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=553, sysFlag=0, bornTimestamp=1569570382201, bornHost=/10.50.4.105:1213, storeTimestamp=1569570382202, storeHost=/10.50.4.105:10911, msgId=0A32046900002A9F0000000000061422, commitLogOffset=398370, bodyCRC=801108784, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=554, CONSUME_START_TIME=1569570382203, UNIQ_KEY=0A3204690D4818B4AAC28947C57903E3, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 53], transactionId='null'}]] 
ConsumeMessageThread_11 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=554, sysFlag=0, bornTimestamp=1569570382203, bornHost=/10.50.4.105:1213, storeTimestamp=1569570382204, storeHost=/10.50.4.105:10911, msgId=0A32046900002A9F00000000000614D6, commitLogOffset=398550, bodyCRC=917938826, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=555, CONSUME_START_TIME=1569570382206, UNIQ_KEY=0A3204690D4818B4AAC28947C57B03E4, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 54], transactionId='null'}]] 
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=554, sysFlag=0, bornTimestamp=1569570382204, bornHost=/10.50.4.105:1213, storeTimestamp=1569570382205, storeHost=/10.50.4.105:10911, msgId=0A32046900002A9F000000000006158A, commitLogOffset=398730, bodyCRC=1102156316, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=555, CONSUME_START_TIME=1569570382208, UNIQ_KEY=0A3204690D4818B4AAC28947C57C03E5, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 55], transactionId='null'}]] 
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=1, storeSize=180, queueOffset=554, sysFlag=0, bornTimestamp=1569570382206, bornHost=/10.50.4.105:1213, storeTimestamp=1569570382206, storeHost=/10.50.4.105:10911, msgId=0A32046900002A9F000000000006163E, commitLogOffset=398910, bodyCRC=1359908749, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=555, CONSUME_START_TIME=1569570382209, UNIQ_KEY=0A3204690D4818B4AAC28947C57E03E6, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 56], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=554, sysFlag=0, bornTimestamp=1569570382207, bornHost=/10.50.4.105:1213, storeTimestamp=1569570382208, storeHost=/10.50.4.105:10911, msgId=0A32046900002A9F00000000000616F2, commitLogOffset=399090, bodyCRC=638172955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=555, CONSUME_START_TIME=1569570382212, UNIQ_KEY=0A3204690D4818B4AAC28947C57F03E7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 57], transactionId='null'}]] 

总结

  1. 启动 NameServer 和 Broker 要配置 ROCKETMQ_HOME 变量
  2. 注意是否开启自动创建Topic的配置,如果没有开启,需要主动创建Topic
  3. 配置文件(broker.conf,日志文件输出目录)的修改

end,感谢阅读!


# RocketMQ