1、Linux 公社(LinuxIDC.com)于 2006 年 9 月 25 日注册并开通网站, Linux 现在已经成为一种广受关注和支持的一种操作系统,IDC 是互联网数据中心, LinuxIDC 就是关于 Linux 的数据中心。LinuxIDC.com 提供包括 Ubuntu,Fedora,SUSE 技术,以及最新 IT 资讯等 Linux 专业类网站。DataX插件开发指南版本号 修改内容 修改日期 修改人V0.1 创建 2011-09-08 何健超目录一、 概述 .4二、 Reader 插件开发(以 httpreader 为例) .51、 确定插件所需配置的参数 .52、 构建相应包
2、和类结构 .53、 实现重载方法 .74、 自定义 split 方法 .10三、 Writer 插件开发( 以 streamwriter 为例) .131、 确定插件参数、构建相应包和类结构 .132、 实现重载方法 .14四、 插件运行配置(以 httpreader 为例) .161、 注册插件 .162、 修改 build.xml 文件,打包 .16一、 概述DataX 是一个在不同类型的数据库( 文件系统)之间交换数据的工具,采用“框架+插件 ”的结构,框架相当于一个数据中转平台,而插件则为访问不同类型的数据库(文件系统)提供实现。DataX 插件分为 Reader 和 Writer 两
3、类。Reader 负责从数据源端读取数据到Storage(交换空间 ),Writer 负责将 Storage 中的数据写入到数据目的端。Storage可以适配不同种类的 Reader 和 Writer,从而实现数据同步。目前 DataX 版本已经提供的 Reader 插件如下:1、 hdfsreader : 支持从 hdfs 文件系统获取数据。2、 mysqlreader: 支持从 mysql 数据库获取数据。3、 sqlserverreader: 支持从 sqlserver 数据库获取数据。4、 oraclereader : 支持从 oracle 数据库获取数据。5、 streamreade
4、r: 支持从 stream 流获取数据(常用于测试)6、 httpreader : 支持从 http URL 获取数据。提供的 Writer 插件如下:1、 hdfswriter :支持向 hdbf 写入数据。2、 mysqlwriter :支持向 mysql 写入数据。3、 sqlserverwriter:支持向 sqlserver 写入数据。4、 oraclewriter :支持向 oracle 写入数据。5、 streamwriter :支持向 stream 流写入数据。 (常用于测试)用户可以根据需要开发自己的 Reader & Writer 插件。现在以 HttpReader 和St
5、reamWriter 插件为例,使用 eclipse 分别说明 Reader 和 Writer 插件开发过程。二、 Reader 插件开发(以 httpreader 为例)1、 确定插件所需配置的参数确定插件参数,并在 common.plugin.ParamsKey.java 中,创建静态类 HttpReader,尤其注意对参数的注释尽量参照源码规范,DataX 运行时,会根据此处声明的参数和注释生成对应的模板 Job_xml.此处参数设置非常重要,如图:图 12、 构建相应包和类结构在源码文件的 plugins.reader 包下构建 httpreader 包,再在httpreader 包下
6、创建类 HttpReader,并让之继承 common.plugin.Reader.图 2图 33、 实现重载方法获得图 3 所示效果:现在开始分别实现 init( ), connectToDb( ), startRead(LineSender sender), finish( ) 四个方法。(1) Init( ):通过从 Reader 间接继承自 DefaultPlugin 的 PluginParam类型的参数 param 获取配置 httpreader 插件的参数 (此处可以对参数进行检查和格式处理等操作),如图:图 4(2) connectToDb( ): 本插件不需要此操作,函数为空实
7、现。 (在数据库相关插件中,主要操作是通过 DbSource.getConnection(keyId)获取connection) ,如在 mysqlreader 中,该函数为:图 5(3) startRead(LineSender sender):根据 init( )初始化的参数,连接相应的 http URL, 获取其中数据并用 BufferedReader 封装,循环处理每行数据,调用 sender.createLine() 产生 line,并通过line.addField(fieldStr)把每行数据切分成字段后组装成 Line 中的 field,调用 sender.sendToWrite
8、r(line)将此行数据写入 Storage。如图:图 6(4) finish():本插件不需要此操作,函数实现为空。 (在数据库相关插件中,主要完成关闭 connection 操作。 )如在 mysqlreader 中,该函数为:图 74、 自定义 split 方法截止到目前,已经实现了一个简单的 httpreader(从一个 http URL读取数据) ,目前还未实现读取多个 http URL 的功能,为此,可以在common.plugin.ParamsKey.java 中的静态类 HttpReader 中,添加如下两项:图 8然后让 httpreader 插件覆盖默认的 split(PluginParam param)方法。操作如下:(1) split(PluginParam param)方法对 param 按照规则切分,生成List,DataX 框架会根据此 List 产生插件实例执行任务。图 9(2) 为了清晰结构或者切分规则复杂,建议再构建类 HttpURLSplitter 辅助 split 方法实现切分,该类需要继承 Splitter: