启动broker时加上autoCreateTopicEnable=true, Windows下的命令

start mqbroker.cmd -n 192.168.3.70:9876 autoCreateTopicEnable=true

查看 broker 的配置参数
mqbroker.cmd -m

rocketmq服务端的版本和客户端的版本需要保持一致

在这里插入图片描述
在这里插入图片描述

前台页面监听心跳

INFO closeChannel: close the connection to remote address[127.0.01:9876] result: true
INFO closeChannel: close the connection to remote address[192.168.0.144:10911] result: true
192.168.0.144问题就是出在上面,
解决方式,用解压缩方式打开懒人rocketmq-console-ng-1.0.1包
修改
BOOT-INFclassesapplication.properties
将localhost:9876 改成 cmd下 ipconfig 对应自己的本地ip地址
保存,并重新启动
再次查看启动日志
INFO closeChannel: close the connection to remote address[192.168.0.144:9876] result: true
INFO closeChannel: close the connection to remote address[192.168.0.144:10911] result: true
再次请求接口返回200成功

rocketmq:
name-server: 192.168.0.144:9876

手动创建Topic

topic是broker中的一个个分组,如果设置topic自动创建的话如果创建多了,可能会影响电脑的运行速度,所以我们来手动创建。

在命令行中创建Topic时应使用updateTopic命令,其使用示例如下:
mqadmin.cmd updateTopic -n localhost:9876 -b localhost:10911 -t tx-mq-TOPIC

各参数含义如下:

mqadmin updateTopic [-b ] [-c ] [-h] [-n ] [-o ] [-p ] [-r ] [-s ]
-t [-u ] [-w ]
-b,–brokerAddr create topic to which broker
-c,–clusterName create topic to which cluster
-h,–help Print help

-n,–namesrvAddr Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-o,–order set topic’s order(true|false)
-p,–perm set topic’s permission(2|4|6), intro[2:W 4:R; 6:RW]
-r,–readQueueNums set read queue nums
-s,–hasUnitSub has unit sub (true|false)
-t,–topic topic name
-u,–unit is unit topic (true|false)
-w,–writeQueueNums set write queue nums

注意事项及存在的问题:
RocketMQ规定,在使用updateTopic命令创建topic时,-b或-c选项必须指定其中一个(都指定则处理-b参数,忽略-c参数),与此同时,-t参数也为必要参数,缺少这几个必要参数则topic创建失败!!!

使用Java代码手动创建topic的形式中,使用-b选项创建topic可以正常使用,此时RocketMQ直接使用指定的broker地址来找到broker并在对应broker上创建topic。

使用Java代码手动创建topic的形式中,直接使用-c选项创建topic无法创建成功,因为org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute方法需要一个DefaultMQAdminExt对象来连接到对应nameserver上以便获取对应集群下的所有broker信息,
DefaultMQAdminExt对象针对nameserver的处理代码为:private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses(),查看代码发现其实际实现为System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV))。此时问题出现了,这里的实现代码并不能get到namesrv地址的参数值,导致连接到namesrv失败,完整报错信息如下:

点击查看完整报错信息

org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed
at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:185)
at com.bayss.bws.common.utils.RocketMQUtil.createTopic(RocketMQUtil.java:54)
at com.bayss.bws.agent.internal.ConsumerManager.init(ConsumerManager.java:78)
at com.bayss.bws.agent.core.InitializeAgent.init(InitializeAgent.java:74)
at com.bayss.bws.agent.core.InitializeAgent.onApplicationEvent(InitializeAgent.java:184)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:402)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:359)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:896)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.bayss.bws.agent.core.BwsAgentApplication.main(BwsAgentApplication.java:24)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:392)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1193)
at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275)
at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222)
at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83)
at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:158)
… 17 more

于是决定采用手动set属性值的方式(对应属性值设值完成后不再需要-n参数即可创建topic),手动set属性值的代码如下:

@Configuration
public class RocketMQConfig implements InitializingBean {
// 必须保证这里能获取到正确的namesrv地址,否则再次gg
@Value("${rocketmq.name-server}")
private String rocketMQNamesrv;
@Override
public void afterPropertiesSet() throws Exception {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, rocketMQNamesrv);
}
}
在命令行中使用updateTopic命令帮助时,有-n选项,但是在RocketMQ源码中并没有发现该选项的处理逻辑,并且在只设置-n(namesrv)时程序会报错(因为缺少第一条所说的-b或-c参数)。
使用示例:
String[] subargs = new String[] {
“-b 10.1.4.231:10911”,
“-t unit-test-from-java-111”,
//"-r 8",
//"-w 8",
//"-p 6",
//"-o false",
//"-u false",
//"-s false"
};
boolean isTopicCreated = RocketMQUtil.createTopic(subargs);
//boolean isTopicCreated = RocketMQUtil.createTopic(“10.1.4.231:10911”, “”, “testttttt”);
if (isTopicCreated) {
System.err.println(“topic create success”);
}
Java工具类

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**

  • fixme 类中的方法不确定能否支持多namesrv的情况!

  • @author Zephyr

  • @date 2019/12/21.
    */
    public class RocketMQUtil {

    /**

    • 创建topic 可以自定义所有topic支持的参数

    • @param subargs updateTopic命名支持的所有参数选项

    • @return topic创建成功,返回 true

    • @throws SubCommandException
      */
      public static boolean createTopic(String[] subargs) throws SubCommandException {
      /String[] subargs = new String[] {
      “-b 10.1.4.231:10911”,
      “-t unit-test-from-java-1”,
      “-r 8”,
      “-w 8”,
      “-p 6”,
      “-o false”,
      “-u false”,
      “-s false”};
      /
      UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
      Options options = ServerUtil.buildCommandlineOptions(new Options());
      final Options updateTopicOptions = cmd.buildCommandlineOptions(options);
      final CommandLine commandLine = ServerUtil
      .parseCmdLine("mqadmin " cmd.commandName(),
      subargs, updateTopicOptions, new PosixParser());

      cmd.execute(commandLine, updateTopicOptions, null);
      return true;
      }

    /**

    • 根据 brokerAddr or clusterName 创建topic
    • @param brokerAddr 在指定 broker 上创建topic时,此参数为必填,否则传null
    • @param clusterName 在指定 cluster 上创建topic时,此参数为必填,否则传null
    • @param topic 要创建的topic
    • @return 创建成功,返回true
      */
      public static boolean createTopic(String brokerAddr, String clusterName, String topic) throws Exception {
      if (StringUtils.isBlank(topic)) {
      return false;
      }
      List argList = new LinkedList<>();
      argList.add("-t " topic);
      if (StringUtils.isNotBlank(brokerAddr)) {
      argList.add("-b " brokerAddr.trim());
      } else {
      argList.add("-c " clusterName.trim());
      }
      return createTopic(argList.toArray(new String [0]));
      }

    /**

    • 在指定name server下使用默认参数创建topic
    • @param namesrvAddr
    • @param topic
    • @return
      */
      public static boolean createTopic(String namesrvAddr, String topic) {
      try {
      Set clusterNames = RocketMQUtil.getClusterNames(namesrvAddr);
      for (String clusterName : clusterNames) {
      RocketMQUtil.createTopic(null, clusterName, topic);
      }
      return true;
      } catch (Exception e) {
      e.printStackTrace();
      return false;
      }
      }

    /**

    • 获取指定 namesrv下的集群信息

    • @param namesrvAddr

    • @return

    • @throws MQClientException

    • @throws InterruptedException

    • @throws MQBrokerException

    • @throws RemotingTimeoutException

    • @throws RemotingSendRequestException

    • @throws RemotingConnectException
      */
      public static ClusterInfo getClusterInfo(String namesrvAddr) throws MQClientException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
      if (StringUtils.isBlank(namesrvAddr)) {
      return new ClusterInfo();
      }
      DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(5000L);
      mqAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
      mqAdminExt.setNamesrvAddr(namesrvAddr);
      mqAdminExt.start();

      ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
      mqAdminExt.shutdown();
      return clusterInfo;
      }

    /**

    • 获取指定name server下的所有集群名称
    • @param namesrvAddr
    • @return
    • @throws MQClientException
    • @throws InterruptedException
    • @throws MQBrokerException
    • @throws RemotingTimeoutException
    • @throws RemotingSendRequestException
    • @throws RemotingConnectException
      */
      public static Set getClusterNames(String namesrvAddr) throws MQClientException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
      return getClusterInfo(namesrvAddr).getClusterAddrTable().keySet();
      }

    /**

    • 获取指定 namesrv 下的所有broker信息(多name server下不确定能否正常工作)
    • @param namesrvAddr namesrv地址
    • @return HashMap<String, BrokerData>
      */
      public static Map<String, BrokerData> getAllBrokerInfo(String namesrvAddr) throws MQClientException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
      return getClusterInfo(namesrvAddr).getBrokerAddrTable();
      }

    /**

    • 获取连接到指定 namesrv 下的所有broker地址
    • @param namesrvAddr
    • @return
      */
      public static Set getBrokerAddrs(String namesrvAddr) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
      Map<String, BrokerData> allBrokerInfo = getAllBrokerInfo(namesrvAddr);
      Set brokerAddrs = new HashSet<>();
      for (BrokerData brokerData : allBrokerInfo.values()) {
      brokerAddrs.addAll(brokerData.getBrokerAddrs().values());
      }
      return brokerAddrs;
      }
      }

 
import org.apache.rocketmq.client.producer.DefaultMQProducer;
 
public class TopicDemo {
    public static void main(String[] args) throws Exception {
        //分组名haoke这个可以任意设置
        DefaultMQProducer producer = new DefaultMQProducer("haoke");
 
        //设置nameserver的地址
        producer.setNamesrvAddr("192.168.62.132:9876");
 
        //启动生产者
        producer.start();
 
        /**
         * 创建topic,参数分别是:borker的名称,topic的名称,queue的数量
         * broker要和虚拟机broker.conf配置文件中brokername的名字一致
         * newTopic的名字随便起,queueNum8的意思是新建的消息队列数为8个
         */
        producer.createTopic("broker_haoke_im","my-topic",8);
        System.out.println("topic创建成功!");
        producer.shutdown();
    }
}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐