数据同步最高的软件工具是什么(DataX数据异构数据同步神器)
数据同步最高的软件工具是什么(DataX数据异构数据同步神器)Reader:数据采集模块,负责采集数据源的数据,将数据发给Framework。有了插件,DataX 可支持任意数据源到数据源,只要实现了 Reader/Writer Plugin,官方已经实现了主流的数据源插件,比如 MySQL、Oracle、SQLServer 等,当然我们也可以开发一个 DataX 插件。官网地址:https://github.com/alibaba/DataXDataX是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
需求背景需求一:前段时间,公司进行一个十年老系统重构,其中涉及到很多数据库、表的迁移,基本思路就是大表通过定时任务分片同步,小表通过rest接口的方式同步数据,所以其中开发了大量只使用一次的代码;
需求二:在做数据异构时,比如将mysql数据库表同步到ES或者ADB时,其中需要进行一定的数据清洗和过滤,所以增量数据通过订阅binglog放到MQ中进行监听处理,历史全量数据通过定时任务或者将历史数据灌到一个新的数据库,也通过订阅binglog放到MQ中进行相同的处理,如果涉及到ES索引重建,那么需要重复操作以上流程;
需求三:大表的数据归档,比如财务的清结算数据,业务方只关心最近几个月的数据,类似业务表数据量越来越大,很可能产生慢查询、锁表等一系列问题,所以需要将历史数据有规划的进行数据归档。但是以上看似很简单的操作如果不通过程序实现,那么就需要依赖DBA实施;
以上列举了工作中经常会遇到的场景,当然类似的需求还有很多很多。通过以上分析,我们不难发现目前的做法开发成本、资源成本较高。那么我们需要通过什么手段来解决以上问题呢?最近调研了一些资料,感觉DataX基本上可以非常简单完成以上功能。
官网地址:https://github.com/alibaba/DataX
设计理念DataX是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
架构设计大家在用过 IDEA 的时候都知道,IDEA 有很多非常棒的插件,用户可根据自身编程需求,下载相关的插件,DataX 也是使用这种可插拔的设计,采用了 Framework Plugin 的架构设计,如下图所示:
有了插件,DataX 可支持任意数据源到数据源,只要实现了 Reader/Writer Plugin,官方已经实现了主流的数据源插件,比如 MySQL、Oracle、SQLServer 等,当然我们也可以开发一个 DataX 插件。
Reader:数据采集模块,负责采集数据源的数据,将数据发给Framework。
Wiriter: 数据写入模块,负责不断向Framwork取数据,并将数据写入到目的端。
Framework:用于连接read和writer 作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等你核心技术问题。
运行原理DataX 核心主要由 Job、Task Group、Task、Schedule、Channel 等概念组成:
Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。
Task:由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作。
Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5。
TaskGroup:负责启动Task。
Channel:DataX 会单独启动一条线程运行运行一个 Task,而 Task 会持有一个 Channel,用作 Reader 与 Writer 的数据传输媒介,DataX 的数据流向都是按照 Reader—>Channel—>Writer 的方向流转。Channel 作为传输通道,即能充当缓冲层,同时还能对数据传输进行限流操作。
调度流程DataX 将用户的 job.json 同步作业配置解析成一个 Job,DataX 通过 JobContainer 完成全局切分、调度、前置语句和后置语句等工作,整体调度流程用如下图表示:
操作实战1.下载地址: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
2.解压
3.运行自检脚本 bin/datax.py job/job.json
出现以上命令框代表环境可用。
4.通过命令行测试mysql数据库表之间同步,表c_s同步到c_s1,首先编写同步脚本,vim /job/mysql-mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader"
"parameter": {
"column": [
"id"
"c_name"
]
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/mybd"
]
"table": [
"c_s"
]
}
]
"password": "root"
"username": "root"
}
}
"writer": {
"name": "mysqlwriter"
"parameter": {
"column": [
"id"
"cname"
]
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/mybd"
"table": [
"c_s1"
]
}
]
"password": "root"
"username": "root"
"writeMode": "replace"
}
}
}
]
"setting": {
"speed": {
"channel": "2"
}
}
}
}
然后执行脚本,bin/datax.py job/mysql-mysql.json
数据表中的3条数据同步完成。
5.通过命令行测试mysql数据同步ES,表c_s同步到索引c_s上,首先编写同步脚本,vim /job/mysql-es.json
{
"job": {
"setting": {
"speed": {
"channel": 1
"record": -1
"byte": -1
}
}
"content": [{
"reader": {
"name": "mysqlreader"
"parameter": {
"username": "root"
"password": "root"
"column": [
"id"
"c_name"
]
"splitPk": "id"
"connection": [{
"table": [
"c_s"
]
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/mybd"
]
}]
}
}
"writer": {
"name": "elasticsearchwriter"
"parameter": {
"endpoint": "http://es-cn-xxxxx.elasticsearch.aliyuncs.com:9200"
"index": "c_s"
"type": "default"
"accessId": "elastic"
"accessKey": "xxxxxx"
"settings": {"index" :{"number_of_shards": 1 "number_of_replicas": 0}}
"discovery": false
"batchSize": 5000
"splitter": " "
"column": [
{"name": "id" "type": "id"}
{ "name": "cname" "type": "text" }
]
}
}
}]
}
}
然后执行脚本,bin/datax.py job/mysql-es.json
Mysql数据同步ES完成,索引如下图:
以上是Datax实现的基本数据源同步插件功能,如果我们需要将Mysql的数据同步到MQ中,需要去开发插件,后期继续开发出统一的MQ Writer插件,这样就能实现复杂的业务处理数据异构场景啦。
操作界面以上是通过脚本实现数据异构的同步,但是这样是不够方便灵活的,阿里开源的时候并没有提供可视化界面,好在已经有大佬开源了一套datax-web:https://github.com/WeiYe-Jing/datax-web 前端界面,我们可以通过界面灵活的进行脚本配置和执行,界面如下:
不断分享开发过程用到的技术和面试经常被问到的问题,如果您也对IT技术比较感兴趣可以「关注」我