博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊rocketmq的KVConfigManager
阅读量:6914 次
发布时间:2019-06-27

本文共 8587 字,大约阅读时间需要 28 分钟。

本文主要研究一下rocketmq的KVConfigManager

KVConfigManager

org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java

public class KVConfigManager {    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);    private final NamesrvController namesrvController;    private final ReadWriteLock lock = new ReentrantReadWriteLock();    private final HashMap
> configTable = new HashMap
>(); public KVConfigManager(NamesrvController namesrvController) { this.namesrvController = namesrvController; } public void load() { String content = null; try { content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath()); } catch (IOException e) { log.warn("Load KV config table exception", e); } if (content != null) { KVConfigSerializeWrapper kvConfigSerializeWrapper = KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class); if (null != kvConfigSerializeWrapper) { this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable()); log.info("load KV config table OK"); } } } public void putKVConfig(final String namespace, final String key, final String value) { try { this.lock.writeLock().lockInterruptibly(); try { HashMap
kvTable = this.configTable.get(namespace); if (null == kvTable) { kvTable = new HashMap
(); this.configTable.put(namespace, kvTable); log.info("putKVConfig create new Namespace {}", namespace); } final String prev = kvTable.put(key, value); if (null != prev) { log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", namespace, key, value); } else { log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", namespace, key, value); } } finally { this.lock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("putKVConfig InterruptedException", e); } this.persist(); } public void persist() { try { this.lock.readLock().lockInterruptibly(); try { KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper(); kvConfigSerializeWrapper.setConfigTable(this.configTable); String content = kvConfigSerializeWrapper.toJson(); if (null != content) { MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath()); } } catch (IOException e) { log.error("persist kvconfig Exception, " + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e); } finally { this.lock.readLock().unlock(); } } catch (InterruptedException e) { log.error("persist InterruptedException", e); } } public void deleteKVConfig(final String namespace, final String key) { try { this.lock.writeLock().lockInterruptibly(); try { HashMap
kvTable = this.configTable.get(namespace); if (null != kvTable) { String value = kvTable.remove(key); log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", namespace, key, value); } } finally { this.lock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("deleteKVConfig InterruptedException", e); } this.persist(); } public byte[] getKVListByNamespace(final String namespace) { try { this.lock.readLock().lockInterruptibly(); try { HashMap
kvTable = this.configTable.get(namespace); if (null != kvTable) { KVTable table = new KVTable(); table.setTable(kvTable); return table.encode(); } } finally { this.lock.readLock().unlock(); } } catch (InterruptedException e) { log.error("getKVListByNamespace InterruptedException", e); } return null; } public String getKVConfig(final String namespace, final String key) { try { this.lock.readLock().lockInterruptibly(); try { HashMap
kvTable = this.configTable.get(namespace); if (null != kvTable) { return kvTable.get(key); } } finally { this.lock.readLock().unlock(); } } catch (InterruptedException e) { log.error("getKVConfig InterruptedException", e); } return null; } public void printAllPeriodically() { try { this.lock.readLock().lockInterruptibly(); try { log.info("--------------------------------------------------------"); { log.info("configTable SIZE: {}", this.configTable.size()); Iterator
>> it = this.configTable.entrySet().iterator(); while (it.hasNext()) { Entry
> next = it.next(); Iterator
> itSub = next.getValue().entrySet().iterator(); while (itSub.hasNext()) { Entry
nextSub = itSub.next(); log.info("configTable NS: {} Key: {} Value: {}", next.getKey(), nextSub.getKey(), nextSub.getValue()); } } } } finally { this.lock.readLock().unlock(); } } catch (InterruptedException e) { log.error("printAllPeriodically InterruptedException", e); } }}复制代码
  • 这里使用HashMap,然后通过ReentrantReadWriteLock进行并发控制,map的key是namespace,而value是一个HashMap
  • putKVConfig及deleteKVConfig使用的是写锁
  • persist、getKVListByNamespace、getKVConfig、printAllPeriodically使用的是读锁

MixAll.string2File

org/apache/rocketmq/common/MixAll.java

public static void string2File(final String str, final String fileName) throws IOException {        String tmpFile = fileName + ".tmp";        string2FileNotSafe(str, tmpFile);        String bakFile = fileName + ".bak";        String prevContent = file2String(fileName);        if (prevContent != null) {            string2FileNotSafe(prevContent, bakFile);        }        File file = new File(fileName);        file.delete();        file = new File(tmpFile);        file.renameTo(new File(fileName));    }    public static void string2FileNotSafe(final String str, final String fileName) throws IOException {        File file = new File(fileName);        File fileParent = file.getParentFile();        if (fileParent != null) {            fileParent.mkdirs();        }        FileWriter fileWriter = null;        try {            fileWriter = new FileWriter(file);            fileWriter.write(str);        } catch (IOException e) {            throw e;        } finally {            if (fileWriter != null) {                fileWriter.close();            }        }    }复制代码
  • 将文本内容写到指定路径的文件
  • 这里先写到.tmp文件,然后将上一个版本的内容备份到.bak文件,再删除上一个版本的文件,最后将tmp文件重命名为正式的文件名

RemotingSerializable

org/apache/rocketmq/remoting/protocol/RemotingSerializable.java

public abstract class RemotingSerializable {    private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");    public static byte[] encode(final Object obj) {        final String json = toJson(obj, false);        if (json != null) {            return json.getBytes(CHARSET_UTF8);        }        return null;    }    public static String toJson(final Object obj, boolean prettyFormat) {        return JSON.toJSONString(obj, prettyFormat);    }    public static 
T decode(final byte[] data, Class
classOfT) { final String json = new String(data, CHARSET_UTF8); return fromJson(json, classOfT); } public static
T fromJson(String json, Class
classOfT) { return JSON.parseObject(json, classOfT); } public byte[] encode() { final String json = this.toJson(); if (json != null) { return json.getBytes(CHARSET_UTF8); } return null; } public String toJson() { return toJson(false); } public String toJson(final boolean prettyFormat) { return toJson(this, prettyFormat); }}复制代码
  • 这里toJson使用的是fastjson的方法

小结

  • rocketmq的KVConfigManager采用的是HashMap来存配置项,key为namespace,value为HashMap,存储的值采用的是String
  • 采用ReentrantReadWriteLock进行并发控制,支持将配置序列化为JSON并持久化到磁盘,也支持从磁盘文件加载到内存

doc

转载地址:http://eiacl.baihongyu.com/

你可能感兴趣的文章
使用Cobbler2.4.0批量自动安装Esxi5.5
查看>>
我的友情链接
查看>>
Nagios 系统监控
查看>>
Python-w3
查看>>
解决Python开发过程中依赖库打包问题的方法
查看>>
jpeg note
查看>>
一个例子告诉你什么是CLR(JVM同理),以及版本兼容
查看>>
文章记录
查看>>
springAop
查看>>
AJAX入门学习-1:理解ajax
查看>>
ESXi中的虚拟机如何使用U盘
查看>>
把别人的Tcl/Tk代码加入到Go语言里13 游戏6 消除方块
查看>>
关于linux hrtimer高精度定时器的使用注意事项
查看>>
高清视频教程网站的搭建和分享
查看>>
Android 混淆代码总结
查看>>
Spark 二次排序
查看>>
zend studio中vim的安装
查看>>
实施微服务,我们需要哪些基础框架
查看>>
Linux 简单创建用户并指定文件夹权限
查看>>
openstack cobbler Icehouse ks 配置文件
查看>>