快捷搜索:  汽车  科技

12306逻辑算法(高性能分布式缓存计算框架)

12306逻辑算法(高性能分布式缓存计算框架)我们采用 3 locator 4 server 的结构。分别在各主机创建 Geode 工作目录 /opt/geode_work,并在该目录中进入 gfsh 命令行。编辑 /etc/sysctl.confnet.ipv4.tcp_syncookies = 0 将此值设置为零,将禁用 SYN Cookie。sysctl -p 重载服务器说明准备好主机:

什么是 Geode,有哪些特性

​官网定义

Apache Geode 是一个数据管理平台,可在广泛分布的云架构中提供对数据密集型应用程序的实时、一致的访问。Geode 跨多个进程汇集内存、CPU、网络资源和可选的本地磁盘,以管理应用程序对象和行为。它使用动态复制和数据分区技术来实现高可用性、改进的性能、可伸缩性和容错性。除了作为分布式数据容器之外,Geode 还是一个内存数据管理系统,可提供可靠的异步事件通知和有保证的消息传递。

主要组件概念

  • locator:locator 定位器,类似于 zk ,进行选举协调,服务发现等功能,我们的应用程序链接的是 locator 定位器
  • server:真正提供缓存服务的功能
  • region:对数据进行区域划分,类似数据库中表的概念
  • gfsh:Geode 的命令行控制台
  • client:链接 Geode 服务的客户端

Geode 特性

  • 高读写吞吐量
  • 低且可预测的延迟
  • 高可扩展性
  • 持续可用性
  • 可靠的事件通知
  • 数据存储上的并行应用程序行为
  • 无共享磁盘持久性
  • 降低拥有成本
  • 客户/服务器的单跳能力
  • 客户/服务器安全
  • 多站点数据分布
  • 连续查询
  • 异构数据共享

Geode 与 Redis

总体来说 Geode 的功能包含 Redis 的功能,但是还是有一些迥异点的。

  1. 定位不同:Geode 定位数据管理平台,强调实时一致性, Redis 高速缓存。
  2. 集群:Geode 天然支持集群,节点是对等的,Redis 集群去中心化,主从复制。
  3. 部署方式:Geode 有点对点方式、C/S 方式、WAN 多数据中心方式,而 Redis 是 C/S 主从方式、集群方式。
  4. 查询:Geode 支持 OQL 查询、函数计算、Redis KV 查询
  5. 发布订阅:Geode 支持稳定的时间订阅和连续查询, Redis 的发布订阅貌似用的并不多。
  6. 事务支持:Geode 支持的也是存内存的 ACID 事务,对落盘的事务支持也不行,Redis 支持的也是内存型事务,相对来说,ACID 更高级一些。
  7. Geode 支持 Redis 的协议模拟,有 Redis Adaper。

Geode 集群搭建——四台虚拟机

前置说明

  1. 请确保服务器事先安装 JDK 8 update > 121 的版本。
  2. 系统时间正确,可以使用 NTP 网络服务。
  3. 配置了正确的主机名。
  4. 禁用 TCP SYN cookie。大多数默认 Linux 安装,使用 SYN cookie 来保护系统免受泛滥 TCP SYN 数据包的恶意攻击,但此功能与稳定和繁忙的 Geode 集群不兼容。安全实现应该通过将 Geode 服务器集群置于高级防火墙保护之下来寻求防止攻击。

如何禁用 TCP SYN cookie:

编辑 /etc/sysctl.conf

net.ipv4.tcp_syncookies = 0

将此值设置为零,将禁用 SYN Cookie。

sysctl -p 重载

服务器说明

准备好主机:

  1. 主机(192-168-33-15):locator1 server1
  2. 主机(192-168-33-20):locator2 server2
  3. 主机(192-168-33-23):locator3 server3
  4. 主机(192-168-33-29):server4

分别在各主机创建 Geode 工作目录 /opt/geode_work,并在该目录中进入 gfsh 命令行。

我们采用 3 locator 4 server 的结构。

准备安装包

下载地址:

https://geode.apache.org/releases/

一般我们选择第一个 tgz 的包 ,版本我这里先选用 1.8 的,为了后续测试滚动升级使用。

12306逻辑算法(高性能分布式缓存计算框架)(1)

上传,解压,配置 /etc/profile

拷贝至:

cp /tmp/upload_dir/ddh/apache-geode-1.8.0.tgz /opt/apache-geode-1.8.0.tgz

tar 解压:

tar xvf apache-geode-1.8.0.tgz

删除包文件:

rm -rf apache-geode-1.8.0.tgz

当然上传目录不强制,但是我们需要使用的配置环境变量:

vim /etc/profile

最后一行加入:

export PATH=$JAVA_HOME/bin:/opt/apache-geode-1.8.0/bin:$PATH

退出使更改生效。

source /etc/profile gfsh version --full

成功会输出如下(我这是 1.9.2 输出的版本):

root@pts/0 # gfsh version --fullBuild-Date: 2019-10-15 06:08:13 -0700Build-Id: jdeppe 0Build-Java-Version: 1.8.0_221Build-Platform: Mac OS X 10.14.6 x86_64Product-Name: Apache GeodeProduct-Version: 1.9.2Source-Date: 2019-10-14 15:59:02 -0700Source-Repository: release/1.9.2Source-Revision: 63c8058f036316618b6cd78e6727106b7ac0a888Native version: native code unavailable Running on: /192.168.33.23 4 cpu(s) amd64 Linux 2.6.32-696.23.1.el6.x86_64

准备 locator 和 server 的启动脚本

locator 脚本 (以 33.15 的脚本为例子 ):

#! /bin/shbasedir=`dirname $0`echo "BASE DIR:$basedir"cd $basedirlocators=192.168.33.15[10334] 192.168.33.20[10334] 192.168.33.23[10334] locatorname=locator_33_15 gfsh start locator --name=${locatorname} --locators=${locators} --port=10334 \ --initial-heap=512M --max-heap=1G --bind-address=192.168.33.15 \ --J=-Dgemfire.max-num-reconnect-tries=100 --J=-Dgemfire.member-timeout=120000 \ --J=-Dgemfire.distributed-system-id=254 \ 2>&1 &

max-num-reconnect-tries=100 一定要设置,非常重要。

server 的启动脚本:

#! /bin/shbasedir=`dirname $0`echo "BASE DIR:$basedir"cd $basedirlocators=192.168.33.15[10334] 192.168.33.20[10334] servername=server_33_15 gfsh start server --name=${servername} --locators=${locators} --locator-wait-time=120 --server-port=40401  \ --lock-memory=true --initial-heap=2G --max-heap=2G --J=-Xmn1g --bind-address=192.168.33.15 \ --J=-Dgemfire.max-num-reconnect-tries=100  --J=-Dgemfire.member-timeout=120000 \ --J=-Dgemfire.distributed-system-id=254 \ 2>&1 &

其他的 locator、server 按照该脚本进行更改即可, 具体参数按需定制,比如端口、超时时间、Java 内存的配置。

启动

依次启动 locator、server。

确定 locator leader 是那台机器

1. 启动成功的时候观察日志:

root@pts/0 # sh start_locator_33_20.shLog File: /opt/geode_work18/locator_33_20/locator_33_20.log JVM Arguments: -Dgemfire.locators=192.168.33.15[10334] 192.168.33.20[10334] 192.168.33.23[10334] -Dgemfire.enable-cluster-configuration=true -Dgemfire.load-cluster-configuration-from-dir=false -Dgemfire.max-num-reconnect-tries=100 -Dgemfire.member-timeout=120000 -Dgemfire.distributed-system-id=254 -Xms512M -Xmx1G -XX: UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=60 -Dgemfire.launcher.registerSignalHandlers=true -Djava.awt.headless=true -Dsun.rmi.dgc.server.gcInterval=9223372036854775806Class-Path: /opt/apache-geode-1.9.2/lib/geode-core-1.9.2.jar:/opt/apache-geode-1.9.2/lib/geode-dependencies.jar//重点关注这行 Successfully connected to: JMX Manager [host=192.168.33.15 port=1099] Cluster configuration service is up and running.

我们可以看到当我们启动的时候,打印的日志会告诉我们链接到了谁。

2. connect 到 locator 后的日志:

gfsh>connect --locator=192.168.33.23[10334]Connecting to Locator at [host=192.168.33.23 port=10334] .. //请关注下边这行 Connecting to Manager at [host=192.168.33.15 port=1099] .. Successfully connected to: [host=192.168.33.15 port=1099]

我们可以看到,当我们链接 33.23 的时候,它会转到 manager 地址,这就是当前的 locator leader 了。

打开 pulse 控制台

确认了 leader 地址,我们就可以访问 Pulse 了。

http://192.168.33.15:7070/pulse

默认用户名密码 admin/admin。

先看个效果图:

12306逻辑算法(高性能分布式缓存计算框架)(2)

gfsh 常用命令介绍

gfsh 的作用:

  • 开启,停止 locator, server
  • 开启停止 gateway-sender 和 receiver
  • 创建和销毁应用程序
  • 执行函数
  • 管理磁盘存储
  • 导入导出数据
  • 监控应用

启动 gfsh

还记得我们安装的时候在环境变量中进行了配置 profile,因此我们只需要输入 gfsh 就可以调启命令行。

root@pts/0 # gfsh    _________________________     __   / _____/ ______/ ______/ /____/ /  / /  __/ /___  /_____  / _____  / / /__/ / ____/  _____/ / /    / /  /______/_/      /______/_/    /_/    1.9.2Monitor and Manage Apache Geode

注意:命令行可以 tab 键自动补全,及其方便操作。

connect

要管理 Goode 集群我们需要连接到主 locator 上, 有两种方式。

1. 直接使用 JMX 进行连接

当我们知道哪个是主的时候,就直接使用 JMX 进行连接即可。

gfsh>connect --jmx-manager=192.168.33.15Connecting to Manager at [host=192.168.33.15 port=1099] .. Successfully connected to: [host=192.168.33.15 port=1099] Cluster-254 gfsh>

2. 连接任意一个 locator

指定 locator 的 ip [端口] 就可以了。

注意看 shell 中的第三行

Connecting to Manager at [host=192.168.33.15 port=1099] ..

实际上 locator 也是会返回 leader 的 JMX 让本地进行连接。

gfsh>connect --locator=192.168.33.23[10334]Connecting to Locator at [host=192.168.33.23 port=10334] .. Connecting to Manager at [host=192.168.33.15 port=1099] .. Successfully connected to: [host=192.168.33.15 port=1099] Cluster-254 gfsh>

创建 locator、server

详见安装时候的脚本实例。

创建 region

gfsh>create region --name=test --type=PARTITION_REDUNDANT_PERSISTENT_OVERFLOW --redundant-copies=1   Member    | Status | Message ------------ | ------ | ---------------------------------------- server_33_15 | OK     | Region "/test" created style="box-sizing: border-box; -webkit-font-smoothing: antialiased; color: rgb(136 136 255); outline: 0px !important;">"server_33_15" server_33_20 | OK     | Region "/test" created style="box-sizing: border-box; -webkit-font-smoothing: antialiased; color: rgb(136 136 255); outline: 0px !important;">"server_33_20" server_33_23 | OK     | Region "/test" created style="box-sizing: border-box; -webkit-font-smoothing: antialiased; color: rgb(136 136 255); outline: 0px !important;">"server_33_23" server_33_29 | OK     | Region "/test" created style="box-sizing: border-box; -webkit-font-smoothing: antialiased; color: rgb(136 136 255); outline: 0px !important;">"server_33_29" Changes to configuration for group 'cluster' are persisted.

注意:--type 参数,我们可以看到是由四个单词组成,分区、复制、持久化、磁盘,基本所有的类型都是由这四个拼凑的。

  • PARTITION:分区
  • REPLICATE:复制
  • PERSISTENT:持久化
  • OVERFLOW:内存不足减少内存使用
  • REDUNDANT:冗余 -> 可配置冗余数量,但是不知道是否会参与 read 不清楚
  • HEAP_LRU:最近最少使用清除内存

LOCAL                                     LOCAL_HEAP_LRU                            LOCAL_OVERFLOW                            LOCAL_PERSISTENT                           LOCAL_PERSISTENT_OVERFLOW                 PARTITION                                 PARTITION_HEAP_LRU                        PARTITION_OVERFLOW                         PARTITION_PERSISTENT                      PARTITION_PERSISTENT_OVERFLOW             PARTITION_PROXY                           PARTITION_PROXY_REDUNDANT                 PARTITION_REDUNDANT                       PARTITION_REDUNDANT_HEAP_LRU              PARTITION_REDUNDANT_OVERFLOW              PARTITION_REDUNDANT_PERSISTENT             PARTITION_REDUNDANT_PERSISTENT_OVERFLOW   REPLICATE                                 REPLICATE_HEAP_LRU                        REPLICATE_OVERFLOW                         REPLICATE_PERSISTENT                      REPLICATE_PERSISTENT_OVERFLOW             REPLICATE_PROXY

get、put

Cluster-254 gfsh>put  --region=test --key=abc --value=abc_v Result      : trueKey Class   : java.lang.String Key         : abc Value Class : java.lang.String Old Value   : nullCluster-254 gfsh>get --region=test --key=abc Result      : trueKey Class   : java.lang.String Key         : abc Value Class : java.lang.String Value       : "abc_v"

list

list 的所有命令 Cluster-254 gfsh>list list async-event-queues   list clients              list data-source          list deployed             list disk-stores          list durable-cqs          list functions            list gateways             list indexes              list jdbc-mappings        list jndi-binding         list lucene               list members              list regions

我们常用的有:

  • list members:查看所有成员
  • list regions:查看所有 region
  • list clients:查看链接的客户端
  • list functions:查看函数(线上我们还未使用函数计算功能)

像 indexs、lucene、gateways 现在都未用到。

describe

所有的命令,我们可以看到,跟上个命令所能看的东西很像。

Cluster-254 gfsh>describe describe client               describe config               describe connection           describe data-source          describe disk-store           describe jdbc-mapping         describe jndi-binding         describe lucene               describe member               describe offline-disk-store   describe region

查看 region 的描述:

Cluster-254 gfsh>describe region --name=test Name            : test Data Policy     : persistent partition Hosting Members : server_33_29                  server_33_20                  server_33_23                  server_33_15 Non-Default Attributes Shared By Hosting Members    Type    |        Name        | Value --------- | ------------------ | -------------------- Region    | size               | 1          | data-policy        | PERSISTENT_PARTITION Eviction  | eviction-action    | overflow-to-disk          | eviction-algorithm | lru-heap-percentage Partition | redundant-copies   | 1

查看 member 的描述:

gfsh>describe member --name=server_33_15 Name        : server_33_15 Id          : 192.168.33.15(server_33_15:1530)<v85>:41001Host        : 192.168.33.15Regions     : userblackcount              region1              test              photo              advert              message              userspace              activeuser              userblack              exp10m              pay7              user              dimension              userinfo PID         : 1530Groups      : Used Heap   : 752M Max Heap    : 1945M Working Dir : /opt/geode_work18/server_33_15 Log file    : /opt/geode_work18/server_33_15/server_33_15.log Locators    : 192.168.33.15[10334] 192.168.33.20[10334] Cache Server Information Server Bind              : Server Port              : 40401Running                  : trueClient Connections       : 1

status

所有能查看的:

status cluster-config-service   status gateway-receiver         status gateway-sender           status locator                  status server

查看 locator 的状态:

gfsh>status locator --name=locator_33_15 Locator in /opt/geode_work18/locator_33_15 on 192.168.33.15[10334] as locator_33_15 is currently online. Process ID: 1184Uptime: 2 days 2 hours 25 minutes 33 seconds Geode Version: 1.9.2Java Version: 1.8.0_102 Log File: /opt/geode_work18/locator_33_15/locator_33_15.log JVM Arguments: -Dgemfire.locators=192.168.33.15[10334] 192.168.33.20[10334] 192.168.33.23[10334] -Dgemfire.enable-cluster-configuration=true -Dgemfire.load-cluster-configuration-from-dir=false -Dgemfire.max-num-reconnect-tries=100 -Dgemfire.member-timeout=120000 -Dgemfire.distributed-system-id=254 -Xms512M -Xmx1G -XX: UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=60 -Dgemfire.launcher.registerSignalHandlers=true -Djava.awt.headless=true -Dsun.rmi.dgc.server.gcInterval=9223372036854775806Class-Path: /opt/apache-geode-1.9.2/lib/geode-core-1.9.2.jar:/opt/apache-geode-1.9.2/lib/geode-dependencies.jar Cluster configuration service is up and running.

query

可以做类似 SQL 语法的查询,这点其他缓存框架是做不到的。

Cluster-254 gfsh>query --query="select * from /test"Result : trueLimit  : 100 Rows   : 1 Result ------ abc_v

query 更多实例我们在 OQL 一节去探讨。

export

导出有日志、配置、集群配置、数据,我们常用的就是导出集群配置。

Cluster-254 gfsh>export cluster-configuration --zip-file-name=./cluster-config-back.zip File saved to /opt/geode_work18/./cluster-config-back.zip

有导出就有导入:

Cluster-254 gfsh>import import cluster-configuration   import data                     Cluster-254 gfsh>import cluster-configuration -- --action           --group            --xml-file         --zip-file-name

version

Cluster-254 gfsh>version --fullBuild-Date: 2019-10-15 06:08:13 -0700Build-Id: jdeppe 0Build-Java-Version: 1.8.0_221Build-Platform: Mac OS X 10.14.6 x86_64Product-Name: Apache GeodeProduct-Version: 1.9.2Source-Date: 2019-10-14 15:59:02 -0700Source-Repository: release/1.9.2Source-Revision: 63c8058f036316618b6cd78e6727106b7ac0a888Native version: native code unavailable Running on: /192.168.33.15 4 cpu(s) amd64 Linux 2.6.32-696.23.1.el6.x86_64

Geode OQL

OQL(object query language),对象查询语言,类 SQL 语法。

请注意使用 OQL 查询的前提,你要将你的应用对象上传到服务器中,所以我们先学习一个命令 deploy。

Cluster-254 gfsh>deploy --jar /opt/geode-study.jar Deploying files: geode-study.jar Total file size is: 0.00MB Continue?  (Y/n): y   Member    |  Deployed JAR   | Deployed JAR Location ------------ | --------------- | ----------------------------------------------- server_33_15 | geode-study.jar | /opt/geode_work/server_33_15/geode-study.v2.jar server_33_20 | geode-study.jar | /opt/geode_work/server_33_20/geode-study.v2.jar server_33_23 | geode-study.jar | /opt/geode_work/server_33_23/geode-study.v2.jar server_33_29 | geode-study.jar | /opt/geode_work/server_33_29/geode-study.v2.jar

我们将应用的实体类打包上传到 Geode 集群中,然后通过 deploy 命令进行部署。

  • deploy 后不需要重启,直接生效
  • 只 deploy 一次即可 全站生效
  • 注意对象要实现序列化接口

实现序列化的三种方式:

  • PDX 反序列化快
  • Geode Serializable 序列化比 PDX 快 25%
  • Java 序列化效率一般

我们示例中使用的是 Java 序列化。关于序列化,Geode 官方文档中使用了一个章节来说明 请看这里。

基于我们的 user 对象我们尝试一下 OQL 的查询:

@Datapublic class User implements Serializable {    private long  id;    private String name;    private int age;    private Date createTime; }

类似 map 的查询

keySet、values、entries 跟我们的 map 中的属性是一样的。

Cluster-254 gfsh>query --query="select * from /user.keySet limit 1"Result ----------------------- test.user.1574821108359Cluster-254 gfsh>query --query="select * from /user.values limit 1"     id       |  name  | age | createTime ------------- | ------ | --- | -------------1574821108359 | "xy27" | 27  | 1574821108359Cluster-254 gfsh>query --query="select * from /user.entries limit 1"test.user.1574821108359---------------------------------------------------------------------- {"id":1574821108359 "name":"xy27" "age":27 "createTime":1574821108359} Cluster-254 gfsh>query --query="select * from /user limit 1 "     id       |  name  | age | createTime ------------- | ------ | --- | -------------1574821108359 | "xy27" | 27  | 1574821108359

条件查询和排序

Cluster-254 gfsh>query --query="select * from /user t where t.age > 90 and t.age < 95 order by t.age desc "     id       |  name  | age | createTime ------------- | ------ | --- | -------------1574821109489 | "xy94" | 94  | 1574821109489 1574821109437 | "xy93" | 93  | 15748211094371574821109422 | "xy92" | 92  | 1574821109422 1574821109409 | "xy91" | 91  | 1574821109409

联合查询

暂时未准备数据,不过估计查询效率不会高了。

SELECT portfolio1.ID portfolio2.status FROM /exampleRegion portfolio1 /exampleRegion2 portfolio2 WHERE portfolio1.status = portfolio2.status

方法调用

实例中使用了 Spring 的 endsWith 和 length 方法。

Cluster-254 gfsh>query --query="select name name.length from /user t where t.name.endsWith('99') "name | length---- | ------ xy99 | 4

聚合查询

Cluster-254 gfsh>query --query="select age count(*) max(id)  from /user t where t.age > 90 group by t.age "age | 0 | id --- | - | -------------96  | 1 | 157482110954795  | 1 | 157482110951591  | 1 | 157482110940994  | 1 | 157482110948999  | 1 | 157482110958693  | 1 | 157482110943792  | 1 | 157482110942298  | 1 | 157482110957397  | 1 | 1574821109557

查询小结

  • 注意对象的序列化
  • 注意使用的类要部署到服务端
  • 提高查询效率可以使用索引
  • 查询时保证不要查太多数据
  • 关于分页还未找到好的方式

Geode 连续查询

连续查询就是,当我们指定了一个查询语句的时候,比如 age > 99,那么会直接返回当前的查询结果,同时你可以注册一个监听,后续有 age > 99 的记录的创建、更新、删除,都会回调到你的这个监听中来,这个后续满足条件的回调就是所谓的连续了。

@Test    public void cqTest() throws CqException CqExistsException {        GeodeService geodeService = new GeodeService(str);        geodeService.init();        ClientCache clientCache = geodeService.geodeClient();        //获取查询的service        QueryService queryService = clientCache.getQueryService();        CqAttributesFactory cqf = new CqAttributesFactory();        CqListener tradeEventListener = new UserCqListener();        cqf.addCqListener(tradeEventListener);        //创建 cq 属性        CqAttributes cqa = cqf.create();        //命名        String cqName = "user-cq-test";        String queryStr = "SELECT * FROM /user t where t.age > 98";        //创建cq 查询        CqQuery userCq = queryService.newCq(cqName queryStr cqa);        SelectResults sResults = null;        try {            //执行查询            sResults = userCq.executeWithInitialResults();            for (Object o : sResults) {                Struct s = (Struct) o;                User to = (User) s.get("value");                System.out.println("Intial result includes: " to);            }            //写入一个数据            User user = new User();            user.setId(id());            user.setName("xy100");            user.setAge(100);            user.setCreateTime(new Date());            geodeService.setObject(GeodeRegion.user user.getId() "" user);            Thread.sleep(1000);            //更新一个数据            user.setName("xy100-1");            geodeService.setObject(GeodeRegion.user user.getId() "" user);            //销毁一个数据            Thread.sleep(1000);            geodeService.remove(GeodeRegion.user user.getId() "");        } catch (RegionNotFoundException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        }    }

监听器:

@Slf4jpublic class UserCqListener implements CqListener {    //事件接受    @Override    public void onEvent(CqEvent cqEvent) {        Operation queryOperation = cqEvent.getQueryOperation();        Object key = cqEvent.getKey();        User user = (User)cqEvent.getNewValue();        //更新        if (queryOperation.isUpdate()) {            log.info("cq update {} {}" key user);            //创建        } else if (queryOperation.isCreate()) {            log.info("cq create {} {}" key user);            //删除        } else if (queryOperation.isDestroy()) {            log.info("cq destroy {} {}" key user);        }    }    //错误事件    @Override    public void onError(CqEvent aCqEvent) {        log.info("error {}" aCqEvent);    } }

最终输出:

Intial result includes: User(id=1574821109586 name=xy99 age=99 createTime=Wed Nov 27 10:18:29 CST 2019) id=1574827945765 2019-11-27 12:12:25.830 INFO  [Cache Client Updater Thread  on 192.168.33.20(server_33_20:32746)<v6>:41001 port 40401] |UserCqListener|cq create test.user.1574827945765 User(id=1574827945765 name=xy100 age=100 createTime=Wed Nov 27 12:12:25 CST 2019)2019-11-27 12:12:26.857 INFO  [Cache Client Updater Thread  on 192.168.33.20(server_33_20:32746)<v6>:41001 port 40401] |UserCqListener|cq update test.user.1574827945765 User(id=1574827945765 name=xy100-1 age=100 createTime=Wed Nov 27 12:12:25 CST 2019)2019-11-27 12:12:27.875 INFO  [Cache Client Updater Thread  on 192.168.33.20(server_33_20:32746)<v6>:41001 port 40401] |UserCqListener|cq destroy test.user.1574827945765 null

那么我们思考一下这个连续查询能为我们做什么呢?

  • 类似 ZK 的通知?
  • 类似消息队列的订阅模型?

Goede 与 Spring 结合

与 Spring 结合有两种方式。

1. 使用 spring-data-geode

spring-data-geode 符合 data 系列的一贯作风,提供 template 和 repository 对象化操作。我们以 Spring Boot 为例。

POM 文件:

<dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>            <exclusions>                <exclusion>                    <artifactId>spring-boot-starter-logging</artifactId>                    <groupId>org.springframework.boot</groupId>                </exclusion>            </exclusions>        </dependency>        <dependency>            <groupId>org.springframework.geode</groupId>            <artifactId>spring-geode-starter</artifactId>            <version>1.2.1.RELEASE</version>        </dependency>        //geode 使用的是 log4j2 作为日志框架        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-log4j2</artifactId>        </dependency>

初始化 cacheClient:

@SpringBootApplication@ClientCacheApplication(        locators = {                @ClientCacheApplication.Locator(host = "192.168.33.15" port = 10334)                @ClientCacheApplication.Locator(host = "192.168.33.20" port = 10334)              //@ClientCacheApplication.Locator(host = "localhost" port = 10334)        } )public class GeodeStudyApplication {    public static void main(String[] args) {        SpringApplication.run(GeodeStudyApplication.class args);    } }

配置 region 和 template:

@Bean("userRegion")    public ClientRegionFactoryBean userRegion(GemFireCache gemFireCache) {        ClientRegionFactoryBean<String String> customers = new ClientRegionFactoryBean<>();        customers.setCache(gemFireCache);        customers.setClose(true);        customers.setName("user");        customers.setShortcut(ClientRegionShortcut.PROXY);        return customers;    }    @Resource(name = "userRegion")    private Region userRegion;    // template 是针对每个 region 的,因此每个 region 都需要 template    @Bean("userGemfireTemplate")    public GemfireTemplate userGemfireTemplate() {        GemfireTemplate gemfireTemplate = new GemfireTemplate();        log.info("user region==>"   userRegion);        gemfireTemplate.setRegion(userRegion);        return gemfireTemplate;    }

创建 UserService,更多 API 大家可以自行探索。

@Slf4j @Servicepublic class UserService {    @Resource(name = "userGemfireTemplate")    private GemfireTemplate userGemfireTemplate;    @Autowired    private AccountService accountService;    public void save(User user) {        /*           put  和 create 的区别           put 如果有会覆盖, create 如果有就EntryExistsException         */        String put = userGemfireTemplate.put(user.getId() JSON.toJSONString(user));      //  userGemfireTemplate.create(key value);        log.info("保存用户{}, 返回结果{}" user put);    }    public void delete(long id) {        Object remove = userGemfireTemplate.remove(id);        log.info("删除用户{}, 返回结果{}" id remove);    }    public User findById(long id) {        String userJson = userGemfireTemplate.get(id);        return JSONObject.parseObject(userJson User.class);    }    public void transaction(User user Account account) {        CacheTransactionManager txManager =             userGemfireTemplate.getRegion().getCache().getCacheTransactionManager();        try {            txManager.begin();            save(user);            int i = 10/0;            accountService.save(account);            txManager.commit();        } catch (Exception e) {            txManager.rollback();            e.printStackTrace();        } finally {        }    } }

2. 只用原生包

public enum GeodeRegion {    user("test.user.")    test1("test1.")    test2("test2.")    account("test.account.");    /**     * 前缀, 其实有没有都无所谓,因为前缀都一样     */    private String prefix;    private GeodeRegion(String prefix) {        this.prefix = prefix;    }    public String getPrefix() {        return this.prefix;    } }

@ConfigurationProperties@Servicepublic class GeodeService {    private static final Logger logger = LoggerFactory.getLogger(GeodeService.class);    private static ClientCache geodeClient = null;    private static final long PING_INTERVAL = 30000;    private static final int PING_ATTEMPT = 100;    private static Map<GeodeRegion Region> regionMap = new ConcurrentHashMap<GeodeRegion Region>();    public GeodeService() {    }    public GeodeService(String geodeLocateString) {        this.geodeLocateString = geodeLocateString;    }    @Value("${geode.locateString}")    private String geodeLocateString;    @PostConstruct    public void init() {        String[] locates = geodeLocateString.split(" ");        try {            ClientCacheFactory clientCacheFactory = new ClientCacheFactory();            clientCacheFactory.setPoolPingInterval(PING_INTERVAL);            clientCacheFactory.setPoolRetryAttempts(PING_ATTEMPT);            for (String host : locates) {                //默认端口10334 如果你的不是10334 ,那么你改造一下链接串就可以了                clientCacheFactory.addPoolLocator(host 10334);            }            //日志级别 生产可以设置warn  开发调试设置成info 或者debug            geodeClient = clientCacheFactory.set("log-level" "WARN").create();            //这个shotcut 要设置成proxy  本地不缓存状态 依据场景来设置            ClientRegionFactory regionFactory = geodeClient.createClientRegionFactory(ClientRegionShortcut.PROXY);            for (GeodeRegion key : GeodeRegion.values()) {                Region region = regionFactory.create(key.name());                logger.info("初始化region成功 regionName=" key.name());                regionMap.put(key region);            }            logger.info("初始化geode客户端成功");        } catch (Exception e) {            logger.error("初始化geode失败" e);            throw e;        }    }    public ClientCache geodeClient() {        return geodeClient;    }    @PreDestroy    public void destroy() {        geodeClient.close(false);    }    public void setObject(GeodeRegion regionName String key Object data) {        try {            regionMap.get(regionName).put(regionName.getPrefix() key data);        } catch (Exception e) {            logger.error("存储数据异常 region=" regionName " key=" regionName.getPrefix() key e);        }    }    public Object getObject(GeodeRegion regionName String key) {        Object obj = null;        try {            obj = regionMap.get(regionName).get(regionName.getPrefix() key);        } catch (Exception e) {            logger.error("获取数据异常 region=" regionName " key=" regionName.getPrefix() key e);        }        return obj;    }    public void remove(GeodeRegion regionName String key) {        try {            regionMap.get(regionName).remove(regionName.getPrefix() key);        } catch (Exception e) {            logger.error("删除数据异常 region=" regionName " key=" regionName.getPrefix() key e);        }    }    public Region getRegion(GeodeRegion regionName) {        return regionMap.get(regionName);    } }

这里如果需要什么功能按需添加就可以了,不像 spring-data 的模板把常用的操作都给我们封装好了。

通过努力发现,Geode 客户端无法再同一个 JVM 中链接两个集群,还望后续能找到解决办法。

Geode Acid 事务特性

解读 Acid 与传统数据库的不同,Geode 为了提高性能,使用了类似乐观锁的事务处理机制。

原子性

就是类似于乐观锁的冲突检查,如果有交叉事务就丢弃了。

一致性

事务操作前与操作后的状态一致,也就是符合逻辑性。比如:

  • 操作前 A:800,B:200
  • 操作后 A:600,B:400

隔离性

默认情况下,当一个事务正在进行时,它的更改只在运行该事务的线程中可见。同一进程中的其他线程和其他进程中的线程在提交操作开始之前不能看到更改。在开始提交之后,更改在缓存中是可见的,但是访问更改数据的其他线程可能会看到事务的部分结果,从而导致脏读。当然 Geode 提供严格的隔离性。

-Dgemfire.detectReadConflicts=true

this property causes read operations to succeed> CommitConflictException.

Geode 只会读取到一致性的结果,如果处在事务提交中的状态进行读取会抛出异常。

持久性

关系数据库通过使用磁盘存储进行恢复和事务日志记录来提供持久性。Geode 针对性能进行了优化,不支持事务的磁盘持久性。从测试中发现,确实不支持 persisternt 类型的 region。

Caused by: java.lang.UnsupportedOperationException: Operations style="box-sizing: border-box; -webkit-font-smoothing: antialiased; font-weight: bold; outline: 0px !important;">backup regions are not allowed because this thread has an active transaction

同样地 Geode 也提供了针对持久性的支持:

-Dgemfire.ALLOW_PERSISTENT_TRANSACTIONS=true

但是请注意:这个系统配置仅仅是消除了上边报出来的异常,是的程序能继续执行,但是对于磁盘写入的原子性并不能保证!

@Test    public void transaction() {        GeodeService geodeService = new GeodeService(str);        geodeService.init();        ClientCache clientCache = geodeService.geodeClient();        //获取事务管理器        CacheTransactionManager transactionManager = clientCache.getCacheTransactionManager();        try {            //开始事务            transactionManager.begin();            User user = new User();            user.setId(id());            user.setName("u1");            user.setCreateTime(new Date());            geodeService.setObject(GeodeRegion.test1 user.getId() "" JSON.toJSONString(user));            //提交事务            transactionManager.commit();        } catch (Exception e) {            e.printStackTrace();            //回滚事务            transactionManager.rollback();        }    }

Geode RESTful

要想使用 RESTful,首先需要再启动 server 的时候添加参数。

我们以 33.15 的 server 为例。

REST API 目前相对较弱,只能处理如下的需求:

  • 对 region 的基本操作
  • 对 query 的基本操作
  • 对 function 的基本操作

加入参数

basedir=`dirname $0`echo "BASE DIR:$basedir"cd $basedirlocators=192.168.33.15[10334] 192.168.33.20[10334] servername=server_33_15 gfsh start server --name=${servername} --locators=${locators} --locator-wait-time=120 --server-port=40401  \ --lock-memory=true --initial-heap=2G --max-heap=2G --J=-Xmn1g --start-rest-api=true --http-service-bind-address=192.168.33.15  --bind-address=192.168.33.15 \ --J=-Dgemfire.max-num-reconnect-tries=100  --J=-Dgemfire.member-timeout=120000 \ --J=-Dgemfire.distributed-system-id=254 --J=-Dgemfire.http-service-port=8888  \ 2>&1 &

我们加入下了如下参数:

--start-rest-api=true//启用 --http-service-bind-address=192.168.33.15 //配置 ip --J=-Dgemfire.http-service-port=8888 //配置端口

访问 Swagger

Geode 非常贴心地为我们继承了 Swagger 插件:

<http://192.168.33.15:8888/geode/swagger-ui.html#/>

12306逻辑算法(高性能分布式缓存计算框架)(3)

12306逻辑算法(高性能分布式缓存计算框架)(4)

12306逻辑算法(高性能分布式缓存计算框架)(5)

具体 API 可参考官方网页:

https://geode.apache.org/docs/guide/19/restapps/developrest_apps.html

Geode 部署模式

点对点

点对点的方式也叫对等方式,就是我们没有专门的 Geode 集群,服务是迁入到我们应用程序中的。

12306逻辑算法(高性能分布式缓存计算框架)(6)

C/S

该模式就是我们常用的集群模式,有一个 Geode 的中心集群,应用程序通过客户端去调用, 这个模式跟 ES 的集群, Redis 的集群非常相似。

12306逻辑算法(高性能分布式缓存计算框架)(7)

多站点

所谓的多站点,就是跨机房,跨地域进行数据同步,非常适合做备份之类的部署, Geode 能够保证数据的最终一致性。目前我们的生产环境通过多站点引入备份集群,以备容灾和其他非主要业务的访问。

12306逻辑算法(高性能分布式缓存计算框架)(8)



12306逻辑算法(高性能分布式缓存计算框架)(9)

猜您喜欢: