快捷搜索:  汽车  科技

zookeeperdubbo原理图(模拟Dubbo的zookeeper一致性Hash发现)

zookeeperdubbo原理图(模拟Dubbo的zookeeper一致性Hash发现)

接之前一篇<手写zookeeper来模拟dubbo的注册/发现> 使用一致性Hash来进行查找需要寻找的服务.

Hash处理接口

public interface HashFunc { public Long hash(Object key); }

一致性Hash类

public class ConsistentHash<T> { /** * Hash计算对象,用于自定义hash算法 */ hashFunc hashFunc; /** * 复制的节点个数 */ private int numberOfReplicas; /** * 一致性Hash环 */ private SortedMap<long T> circle = new TreeMap(); /** * 构造,使用Java默认的Hash算法 * @param numberOfReplicas 复制的节点个数,增加每个节点的复制节点有利于负载均衡 * @param nodes 节点对象 */ public ConsistentHash(int numberOfReplicas Collection<T> nodes) { this.numberOfReplicas = numberOfReplicas; this.hashFunc = new HashFunc() { public Long hash(Object key) { // return fnv1HashingAlg(key.toString()); return MD5HashingAlg(key.toString()); } }; //初始化节点 for (T node : nodes) { add(node); } } /** * 构造 * @param hashFunc hash算法对象 * @param numberOfReplicas 复制的节点个数,增加每个节点的复制节点有利于负载均衡 * @param nodes 节点对象 */ public ConsistentHash(HashFunc hashFunc int numberOfReplicas Collection<T> nodes) { this.numberOfReplicas = numberOfReplicas; this.hashFunc = hashFunc; //初始化节点 for (T node : nodes) { add(node); } } /** * 增加节点<br> * 每增加一个节点,就会在闭环上增加给定复制节点数<br> * 例如复制节点数是2,则每调用此方法一次,增加两个虚拟节点,这两个节点指向同一Node * 由于hash算法会调用node的toString方法,故按照toString去重 * * @param node 节点对象 */ public void add(T node) { for (int i = 0; i < numberOfReplicas; i ) { circle.put(hashFunc.hash(node.toString() i) node); } } /** * 移除节点的同时移除相应的虚拟节点 * * @param node 节点对象 */ public void remove(T node) { for (int i = 0; i < numberOfReplicas; i ) { circle.remove(hashFunc.hash(node.toString() i)); } } /** * 获得一个最近的顺时针节点 * * @param key 为给定键取Hash,取得顺时针方向上最近的一个虚拟节点对应的实际节点 * @return 节点对象 */ public T get(Object key) { if (circle.isEmpty()) { return null; } long hash = hashFunc.hash(key); if (!circle.containsKey(hash)) { SortedMap<Long T> tailMap = circle.tailMap(hash); //返回此映射的部分视图,其键大于等于 hash hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); } //正好命中 return circle.get(hash); } /** * 使用MD5算法 * @param key * @return */ private static long md5HashingAlg(String key) { MessageDigest md5 = null; try { md5 = MessageDigest.getInstance("MD5"); md5.reset(); md5.update(key.getbytes()); byte[] bKey = md5.digest(); long res = ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8)| (long) (bKey[0] & 0xFF); return res; } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } return 0l; } /** * 使用FNV1hash算法 * @param key * @return */ private static long fnv1HashingAlg(String key) { final int p = 16777619; int hash = (int) 2166136261L; for (int i = 0; i < key.length(); i ) hash = (hash ^ key.charAt(i)) * p; hash = hash << 13; hash ^= hash >> 7; hash = hash << 3; hash ^= hash >> 17; hash = hash << 5; return hash; } }

发现代码

@Component public class ClientComsumer implements Watcher { //本地缓存服务列表 private static Map<String List<String>> servermap; @Autowired private ZookeeperServer zkServer ; private ZooKeeper zk; private ConsistentHash consistentHash; @Autowired Environment env; @PostConstruct private void init() throws IOException { String address = env.getProperty("zookeeper.address"); this.zk = zkServer.getConnection(address this); } private List<String> getNodeList(String serverName) throws KeeperException InterruptedException IOException { if (servermap == null) { servermap = new HashMap<>(); } Stat exists = null; try { String s = "/guanjian/" serverName; exists = zk.exists(s this); } catch (Exception e) { } //判断是否存在该服务 if (exists == null) return null; List<String> serverList = servermap.get(serverName); if (serverList != null && serverList.size() > 0) { //将已存在的serverList放入一致性Hash环 this.consistentHash = new ConsistentHash(serverList.size() serverList); return serverList; } List<String> children = zk.getChildren("/guanjian/" serverName this); List<String> list = new ArrayList<>(); for (String s : children) { byte[] data = zk.getData("/guanjian/" serverName "/" s this null); String datas = new String(data); NodeStat nodeStat = JSONObject.parseObject(datas NodeStat.class); if (!Status.stop.equals(nodeStat.getStatus())) { list.add(datas); } } //将list放入一致性Hash环 this.consistentHash = new ConsistentHash(list.size() list); servermap.put(serverName list); return list; } public String getServerinfo(String serverName) throws KeeperException InterruptedException IOException { try { List<String> nodeList = getNodeList(serverName); if (nodeList == null|| nodeList.size()< 1) { return null; } //这里使用得随机负载策略,如需需要自己可以实现其他得负载策略 //String snode = nodeList.get((int) (Math.random() * nodeList.size())); //从一致性Hash环的第一个服务开始查找 String snode = (String) this.consistentHash.get(nodeList.get(0)); NodeStat nodeStat = JSONObject.parseObject(snode NodeStat.class); List<String> children = zk.getChildren("/guanjian/" serverName this); //随机负载后 将随机取得节点后的状态更新为run for (String s : children) { byte[] data = zk.getData("/guanjian/" serverName "/" s this null); String datas = new String(data); if (snode.equals(datas)) { nodeStat.setStatus(Status.run); zk.setData("/guanjian/" serverName "/" s JSONObject.toJSONString(nodeStat).getBytes() 0); break; } } return JSONObject.toJSONString(nodeStat); } catch (Exception e) { e.printStackTrace(); return null; } } @Override public void process(WatchedEvent watchedEvent) { //如果服务节点数据发生变化则清空本地缓存 if (watchedEvent.getType().equals(Event.EventType.NodeChildrenChanged)) { servermap = null; } } }

zookeeperdubbo原理图(模拟Dubbo的zookeeper一致性Hash发现)(1)

猜您喜欢: