rocketmq 查看topic消息列表(RocketMQ源码分析三之删除Topic)
rocketmq 查看topic消息列表(RocketMQ源码分析三之删除Topic)-hcluster 名称,表示topic 建在该集群(集群可通过clusterList 查询)说明-c是
上一章中。我们分析了 RocketMQ 创建 Topic 的命令过程。本章,我们开始分析 删除 Topic 的过程。
命令参数用法:sh mqadmin deleteTopic -n 192.168.1.100:9876 
指令:deleteTopic 
代码入口:org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand
| 
     参数  | 
     是否必填  | 
     说明  | 
| 
     -c  | 
     是  | 
     cluster 名称,表示topic 建在该集群(集群可通过clusterList 查询)  | 
| 
     -h  | 
     否  | 
     打印帮助  | 
| 
     -n  | 
     是  | 
     nameserve 服务地址列表,格式ip:port;ip:port;…  | 
| 
     -t  | 
     是  | 
     Topic 名字  | 
// RocketMQ 配置了 命令行的执行 shell 脚本入口。就是下面的 mqadmin.sh 这个文件
mqadmin.sh
// 解析命令行入口
org.apache.rocketmq.tools.command.MQAdminStartup#main0
// 设置 namesrvAddr 为全局变量。
if (commandLine.hasOption('n')) {
    String namesrvAddr = commandLine.getOptionValue('n');
    System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY  namesrvAddr);
}
整体流程
    

RocketMQ 删除 Topic 大概流程
Broker 删除无用Topic代码片段@Override
    public int cleanUnusedTopic(Set<String> topics) {
        Iterator<Entry<String  ConcurrentMap<Integer  ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String  ConcurrentMap<Integer  ConsumeQueue>> next = it.next();
            String topic = next.getKey();
            // Topic可以删除
            if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
                ConcurrentMap<Integer  ConsumeQueue> queueTable = next.getValue();
                for (ConsumeQueue cq : queueTable.values()) {
                    // 消费者队列销毁,mapped 文件删除
                    cq.destroy();
                    log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned" 
                        cq.getTopic() 
                        cq.getQueueId()
                    );
                    this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic()  cq.getQueueId());
                }
                it.remove();
                log.info("cleanUnusedTopic: {} topic destroyed"  topic);
            }
        }
        return 0;
    }
Consume Queue 关闭&删除 
    
在 RocketMQ 的基础概念的一章中提到。Topic 是有 N 个queue,而且,每个 Consume queue 都是独立的一个 MappFile 文件。Consumer queue 就相当于 消息在 commitLog 的二级索引。
public void destroy() {
        this.maxPhysicOffset = -1;
        this.minLogicOffset = 0;
        this.mappedFileQueue.destroy();
        if (isExtReadEnable()) {
            this.consumeQueueExt.destroy();
        }
    }
public void destroy() {
        for (MappedFile mf : this.mappedFiles) {
            mf.destroy(1000 * 3);
        }
        // 移除所有的元素,让垃圾回收器回收垃圾
        this.mappedFiles.clear();
        this.flushedWhere = 0;
        // delete parent directory
        File file = new File(storePath);
        if (file.isDirectory()) {
            file.delete();
        }
    }
总结
- RocketMQ 删除 Topic 的时候,先删除 Broker ,在删除 Name Server 的 Topic 路由信息。
 - RocketMQ 中的 Topic 是一个逻辑概念,但是 queue 是一个实打实的。具体体现在Consum queue。
 - 在生产环境中,做好监控,切完别胡乱删除Topic。一般删除 Topic 先禁止写权限,在禁止读权限,观察一段时间后,再删除。
 
关于我
前 去哪儿网 技术专家!混迹中间件职场8 年!分享各种Java中间件知识!




