DataX Offline Data Synchronization

DataX

DataX is an offline data synchronization tool/platform that is widely used within Alibaba Group. It implements efficient data synchronization between a wide range of heterogeneous data sources, including MySQL, SQL Server, Oracle, PostgreSQL, HDFS, Hive, HBase, OTS, ODPS, and more.

Project: https://github.com/alibaba/DataX

Features

DataX itself is a data synchronization framework: it abstracts synchronization from different data sources into Reader plugins, which read data from the source, and Writer plugins, which write data to the target. In principle, the framework can therefore synchronize data between arbitrary types of data sources. The plugin system also forms an ecosystem: every newly added data source immediately becomes interoperable with all data sources already supported.

Why does DataX use a plugin mechanism?

From its initial design, DataX took heterogeneous data source synchronization as its mission. To cope with the differences between data sources while still providing consistent synchronization primitives and extensibility, DataX naturally adopted a framework + plugin model:

  • Plugins only care about reading data from, or writing data to, the data source itself.
  • Cross-cutting concerns of synchronization, such as type conversion, performance, and statistics, are handled by the framework.

A plugin developer therefore needs to focus on two things:

  1. Reading from and writing to the data source itself correctly.
  2. Communicating with the framework and using it properly.

System requirements

  • Linux
  • JDK (1.8 or later, 1.8 recommended)
  • Python (Python 2.6.x recommended)
  • Apache Maven 3.x (only needed to compile DataX from source)
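
To check that these prerequisites are in place, and to build DataX from source when needed, something along these lines can be run; the Maven build command in the last line is the one given in the DataX documentation and is only needed when compiling instead of downloading the release tarball:

$ java -version     # should report 1.8.x
$ python -V         # should report Python 2.6.x / 2.7.x
$ mvn -v            # only needed for source builds
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true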

Installation

After downloading, extract the archive to a local directory, change into the bin directory, and you can run synchronization jobs:

$ wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
$ tar xvf datax.tar.gz
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}

Self-check script: python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json

DataX plugins must follow a unified directory layout:

${DATAX_HOME}
|-- bin
|   `-- datax.py
|-- conf
|   |-- core.json
|   `-- logback.xml
|-- lib
|   `-- datax-core-dependencies.jar
`-- plugin
    |-- reader
    |   `-- mysqlreader
    |       |-- libs
    |       |   `-- mysql-reader-plugin-dependencies.jar
    |       |-- mysqlreader-0.0.1-SNAPSHOT.jar
    |       `-- plugin.json
    `-- writer
        |-- mysqlwriter
        |   |-- libs
        |   |   `-- mysql-writer-plugin-dependencies.jar
        |   |-- mysqlwriter-0.0.1-SNAPSHOT.jar
        |   `-- plugin.json
        |-- oceanbasewriter
        `-- odpswriter

  • ${DATAX_HOME}/bin: executable programs.
  • ${DATAX_HOME}/conf: framework configuration.
  • ${DATAX_HOME}/lib: framework dependency libraries.
  • ${DATAX_HOME}/plugin: plugins.

The plugin directory is split into reader and writer subdirectories, which hold read and write plugins respectively. Each plugin directory follows this layout:

  • ${PLUGIN_HOME}/libs: the plugin's dependency libraries.
  • ${PLUGIN_HOME}/plugin-name-version.jar: the plugin's own jar.
  • ${PLUGIN_HOME}/plugin.json: the plugin descriptor file.

Although the framework puts every jar under ${PLUGIN_HOME} on the classpath when loading a plugin, it is still recommended to keep the dependency jars separate from the plugin's own jar.

Note: the plugin's directory name must match the plugin name defined in plugin.json.

Reference: https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
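
For reference, the descriptor of the mysqlreader plugin in the layout above looks roughly like this; the field names follow the plugin development guide linked above, while the class and description values here are illustrative:

$ cat ${DATAX_HOME}/plugin/reader/mysqlreader/plugin.json
{
    "name": "mysqlreader",
    "class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",
    "description": "reads data from MySQL via JDBC",
    "developer": "alibaba"
}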

Synchronizing MySQL to MySQL

Reader plugin: https://github.com/alibaba/DataX/tree/master/mysqlreader

Writer plugin: https://github.com/alibaba/DataX/tree/master/mysqlwriter

Sync job JSON

[root@hjx01 bin]# cat ran.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "querySql": [
                                    "select * from user_1 where UpdateTime < '$bizdate';"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://1.1.1.10:3306/hjxdb"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "replace",
                        "username": "root",
                        "password": "123456",
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://1.1.1.10:3306/hjxdb?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "user_2"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}
  • The reader uses the querySql form, so column and table do not need to be configured.
  • The writer's column is set to *; if the number or types of the source table's columns change, the target table's DDL must be updated by hand, otherwise DataX reports a column-count mismatch.
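
Rather than writing the job JSON entirely by hand, datax.py can print an empty reader/writer configuration template to start from (see the -r/-w options in the help output below), for example:

$ python datax.py -r mysqlreader -w mysqlwriter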

Running the job

[root@hjx01 bin]# python datax.py ran.json -p "-Dbizdate='2018-06-27 15:16:17'"

[root@hjx01 bin]# python datax.py -h

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Usage: datax.py [options] job-url-or-path

Options:
  -h, --help            show this help message and exit

  Product Env Options:
    Normal user use these options to set jvm parameters, job runtime mode
    etc. Make sure these options can be used in Product Env.

    -j <jvm parameters>, --jvm=<jvm parameters>
                        Set jvm parameters if necessary.
    --jobid=<job unique id>
                        Set job unique id when running by Distribute/Local
                        Mode.
    -m <job runtime mode>, --mode=<job runtime mode>
                        Set job runtime mode such as: standalone, local,
                        distribute. Default mode is standalone.
    -p <parameter used in job config>, --params=<parameter used in job config>
                        Set job parameter, eg: the source tableName you want
                        to set it by command, then you can use like this:
                        -p"-DtableName=your-table-name", if you have mutiple
                        parameters: -p"-DtableName=your-table-name
                        -DcolumnName=your-column-name".Note: you should config
                        in you job tableName with ${tableName}.
    -r <parameter used in view job config[reader] template>, --reader=<parameter used in view job config[reader] template>
                        View job config[reader] template, eg:
                        mysqlreader,streamreader
    -w <parameter used in view job config[writer] template>, --writer=<parameter used in view job config[writer] template>
                        View job config[writer] template, eg:
                        mysqlwriter,streamwriter

  Develop/Debug Options:
    Developer use these options to trace more details of DataX.

    -d, --debug         Set to remote debug mode.
    --loglevel=<log level>
                        Set log level such as: debug, info, all etc.

Execution result

Only 13 of the 14 rows in user_1 are copied to user_2: the row with UpdateTime = 2018-06-27 15:16:17 is excluded by the UpdateTime < '$bizdate' condition, since $bizdate was passed as that same timestamp.

mysql> select * from user_1;
+--------+----------+--------------+---------------------+
| userid | username | userpassword | UpdateTime          |
+--------+----------+--------------+---------------------+
|      1 | apple    | apple        | 2018-06-27 15:14:04 |
|      2 | pera     | apple        | 2018-06-27 15:14:04 |
|      3 | per      | apple        | 2018-06-27 15:14:04 |
|      4 | er       | apple        | 2018-06-27 15:14:04 |
|      5 | er       | le           | 2018-06-27 15:14:04 |
|      6 | er       | lee          | 2018-06-27 15:14:04 |
|      7 | er       | leee         | 2018-06-27 15:14:04 |
|      8 | er       | lese         | 2018-06-27 15:14:04 |
|      9 | er       | lese         | 2018-06-27 15:14:04 |
|     10 | er       | lesse        | 2018-06-27 15:14:04 |
|     11 | er       | lesss        | 2018-06-27 15:14:04 |
|     12 | er       | eesss        | 2018-06-27 15:14:04 |
|     13 | ww       | dddd         | 2018-06-27 15:14:04 |
|     14 | apple    | apple        | 2018-06-27 15:16:17 |
+--------+----------+--------------+---------------------+
14 rows in set (0.00 sec)

mysql> select * from user_2;
+--------+----------+--------------+---------------------+
| userid | username | userpassword | UpdateTime          |
+--------+----------+--------------+---------------------+
|      1 | apple    | apple        | 2018-06-27 15:14:04 |
|      2 | pera     | apple        | 2018-06-27 15:14:04 |
|      3 | per      | apple        | 2018-06-27 15:14:04 |
|      4 | er       | apple        | 2018-06-27 15:14:04 |
|      5 | er       | le           | 2018-06-27 15:14:04 |
|      6 | er       | lee          | 2018-06-27 15:14:04 |
|      7 | er       | leee         | 2018-06-27 15:14:04 |
|      8 | er       | lese         | 2018-06-27 15:14:04 |
|      9 | er       | lese         | 2018-06-27 15:14:04 |
|     10 | er       | lesse        | 2018-06-27 15:14:04 |
|     11 | er       | lesss        | 2018-06-27 15:14:04 |
|     12 | er       | eesss        | 2018-06-27 15:14:04 |
|     13 | ww       | dddd         | 2018-06-27 15:14:04 |
+--------+----------+--------------+---------------------+
13 rows in set (0.00 sec)

2018-06-27 15:59:47.760 [job-0] INFO JobContainer -
任务启动时刻 : 2018-06-27 15:59:36
任务结束时刻 : 2018-06-27 15:59:47
任务总计耗时 : 11s
任务平均流量 : 20B/s
记录写入速度 : 1rec/s
读出记录总数 : 13
读写失败总数 : 0

Batch-generating sync job JSON

Replacing the table names and JDBC URLs in the job with placeholders turns it into a reusable template:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "querySql": [
                                    "select * from ${reader_table} where UpdateTime < '$bizdate';"
                                ],
                                "jdbcUrl": [
                                    "${reader_datasource}"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "replace",
                        "username": "root",
                        "password": "123456",
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "${write_datasource}",
                                "table": [
                                    "${write_table}"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}
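
As a rough sketch of how such jobs might be generated and executed in bulk (this helper script is not part of DataX; the file names template.json and tables.txt, the jobs/ directory, and the install path are assumptions for illustration):

#!/bin/bash
# Hypothetical helper: stamps out one job file per line of tables.txt from the
# template above (saved here as template.json), then runs the generated jobs.
# tables.txt holds one job per line:
#   <reader_table> <write_table> <reader_jdbc_url> <write_jdbc_url>

DATAX_HOME=/opt/datax            # assumption: adjust to your installation
BIZDATE="2018-06-27 15:16:17"    # value for the $bizdate job parameter
mkdir -p jobs

while read -r reader_table write_table reader_ds write_ds; do
    # '&' is special in sed replacement text, so escape it in the JDBC URLs
    reader_ds=$(printf '%s' "$reader_ds" | sed 's/&/\\\&/g')
    write_ds=$(printf '%s' "$write_ds" | sed 's/&/\\\&/g')

    # fill in the ${...} placeholders; '$bizdate' is left for DataX's -p substitution
    sed -e 's|\${reader_table}|'"$reader_table"'|g' \
        -e 's|\${write_table}|'"$write_table"'|g' \
        -e 's|\${reader_datasource}|'"$reader_ds"'|g' \
        -e 's|\${write_datasource}|'"$write_ds"'|g' \
        template.json > "jobs/${reader_table}_to_${write_table}.json"
done < tables.txt

# run every generated job, passing bizdate on the command line as before
for job in jobs/*.json; do
    python "$DATAX_HOME/bin/datax.py" "$job" -p "-Dbizdate='$BIZDATE'"
done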