-
Notifications
You must be signed in to change notification settings - Fork 413
2.2_HBase Reader
HBase-Reader插件用于HBase增量日志解析和数据模型转换,适用于需要实时同步HBase数据的场景,例如:BigData、DB升级、迁移与备份等。
如上图所示,开启HBase主集群的数据复制功能之后,主集群的RegionServer会自动向其从集群随机推送HLog,因此,DataLink利用多个HBase-Reader模拟HBase从集群,每个HBase-Reader相当于一个ReplicateHRegionServer,用于执行复制,并将HLog解析为HRecord对象,供HBaseTask的各个Writer插件获取和写入目标端数据源。
- 【HBase-Reader工作流程】
-
初始化ReplicateHRegionServer
> 构建ReplicationConfig:根据HBaseReaderParameter得到ReplicationConfig相关参数,包括HbaseName、RegionServer的handler数量、ZnodeParent、ZkConfig。
> 初始化ReplicateHRegionServer:根据ReplicationConfig初始化ZkClient和Hbase从集群的Conf,并由该Conf得到RpcServer的socket address,进而初始化RpcServer。
> 实例化相应的ZooKeeper连接和Watcher通知机制。
注:初始化HRecordChunk队列大小设置为1,初始化RpcServer的handler-count设置为2,设置大了也没用,因为TaskReader<->TaskWriter是单线程模型。 -
启动ReplicateHRegionServer
> 启动RpcServer,提供远程服务接口,处理RPC请求。
> 初始化从集群ZnodeParent,并为其添加子节点hbaseid和rs。hbaseid节点数据为从集群HBaseName的序列化值,rs下面的子节点数据为各个RpcServer的ServerName,格式为"RpcServer的hostName,随机端口,当前时间戳"(RpcServer的hostName即为其所在DataLink中Worker的IP)。
-
复制并解析HLog
> 当发生数据更新时,主集群随机获取从集群的一台RegionServer执行复制。最终RpcServer的Handler通过调用AdminProtos的replicateWALEntry方法,来发送RPC请求执行复制。
> 解析复制的WALEntrys数据,得到每行的RowKey和Cells信息,并将其转换为HRecords放到缓存队列中,用于HBase-Reader去fetch HBase主集群推送的最新数据。
- 【HBase Replication配置流程】
-
开启HBase数据复制功能(在集群的hbase-site.xml配置文件中开启复制功能,并重启集群),如下所示:
xml hbase.replication true ]]> -
开启HTable数据复制功能,如下所示:
sql 'column_family_name_1',REPLICATION_SCOPE => '1'},{NAME => 'column_family_name_2', REPLICATION_SCOPE => '1'}]]> -
约定Slave集群名称(如:hbase_replication),并在主集群中进行add_peer操作,如下所示:
sql或者
sql -
(可选)按需增加要同步的表或列族名单
sql -
Replication参数配置
参数名称 所属文件 配置说明 replication.source.nb.capacity hbase-site.xml 主集群每次向从集群发送的entry最大的个数,可根据集群规模做出适当调整 replication.source.size.capacity hbase-site.xml 主集群每次向从集群发送的entry的包的最大值大小 replication.source.ratio hbase-site.xml 主集群使用的从集群的RS的数据百分比,默认为0.1,需调整为1,充分利用从集群的RS replication.sleep.before.failover hbase-site.xml 主集群在RS宕机多长时间后进行failover,默认为2秒,具体的sleep时间是:
sleepBeforeFailover + (long) (new Random().nextFloat() * sleepBeforeFailover)replication.executor.workers hbase-site.xml 从事replication的线程数,如果写入量大,可以适当调大 【附】Replication常用命令
名称 | 描述 |
---|---|
add_peer | 添加集群间复制关系 |
disable_peer | 禁用集群间复制关系,但仍然保持对新改动的跟踪 |
enable_peer | 启用集群间复制关系,从上次关闭的位置继续复制 |
list_peers | 显示所有的集群间复制关系 |
remove_peer | 删除某个集群间复制关系 |
list_replicated_tables | 列出所有启用复制功能的表 |
set_peer_tableCFs | 设置某个peer下的需要复制的表(列族) |
show_peer_tableCFs | 显示某个peer下参与复制的表(列族) |
enable_table_replication | 开启某张表的复制功能 |
disable_table_replication | 禁用某张表的复制功能 |
-
【WAL日志/HLog介绍】
> WAL(Write-Ahead-Log)是HBase的RegionServer在处理数据插入和删除的过程中用来记录操作内容的一种日志。Client在向RegionServer端提交数据的时候,会优先写WAL日志,即在put 和delete写入内存之前都会写入WAL,然后再放入到实际拥有记录的存储文件的MemStore中。防止机器崩溃时能恢复。
> WAL在创建Region的时候创建,默认每个RegionServer有1个WAL,在HBase1.0开始支持多个WALHBASE-5699,这样可以提高写入的吞吐量。配置参数为hbase.wal.provider=multiwal,支持的值还有defaultProvider和filesystem(这2个是同样的实现)。
> WAL的持久化的级别默认为SYNC_WAL,同步写入WAL日志文件,保证数据写入了DataNode节点。主要实现类是FSHLog,负责将数据写入HDFS文件系统。
> 对每个WAL的写入使用的是多生产者单消费者的模式,这里使用到了disruptor框架,将WALKey和WALEdit信息封装为FSWALEntry,然后通过RingBufferTruck放入RingBuffer中。
WALKey:WAL日志的key,包括 regionName: 日志所属的Region,tablename:日志所属的表,writeTime:日志写入时间,clusterIds:cluster的id,在数据复制的时候会用到。
WALEdit:在hbase的事务日志中记录一系列的修改的一条事务日志。另外WALEdit实现了Writable接口,可用于序列化处理。
-
【按月/按年分表】
针对HBase按月/按年分表的表,通过以下两种方法简化操作:
> 主集群在每个月/每年新建表的时候,会自动为新表增加Replication属性,并添加到需要的peer中;
> DataLink在配置映射的时候,引入通配符机制,不用单独为每个月份/年份的表做配置(详见MediaInfo.Mode的Yearly和Monthly选项)。 -
【LeaderTask机制】
由于DataLink的HBaseTask模拟是HBASE的从集群,即一个HbaseTask模拟的是一个RegionServer,同一个Slave-Cluster下的HbaseTask的映射配置都是一样的,因此,DataLink引入LeaderTask机制来简化Task配置,只需在LeaderTask上进行配置即可,,其它FollowerTask会复用LeaderTask的配置信息。
-
【串行消费HLog】
主集群的RegionServer是单线程随机推送HLog,即使一个TaskReader接收到了多个RegionServer推送的HLog,同时RpcServer开启了多个Handler并行执行复制,但是由于TaskReader到TaskWriter的消费是单线程模型,所以,HBase-Reader目前未实现并发消费HLog。我们创建HBaseTask的时候,会比源端集群的RegionServer数目多,所以,同一时刻,打到一个Task上的请求数不会太多。
- 【场景测试】
测试场景 | 测试结果(描述) |
---|---|
Slave-Cluster整体不可用时 |
Master-Cluster会一直保留Hlog,直至Slave-Cluster恢复运行,数据不会丢失 |
某个peer被disable | Master-Cluster仍然会保持对新改动的跟踪,待peer恢复时,从上次关闭的位置继续复制,期间产生的数据不会丢失 |
Slave-Region-Server抛异常给 |
Master-Cluster接收到某个RegionServer抛出的异常之后,会进行重试(重试间隔10s),重试三次之后如果仍然报错, |
truncate操作 |
和关系型数据库类似,truncate操作是一个类ddl操作,HLog中不会记录删除数据 Slave-Cluster也不会接收到truncate类型的事件 |
HTable一次commit多个put |
【测试方法】:com.ucar.datalink.reader.hbase.replicate.ReplicateHRegionServerTest.putDataBatch() 【测试结果】:ReplicateHRegionServer接收WALEntry的批次和Htable进行commit的批次没有任何关系,HTable进行一次commit, 但ReplicateHRegionServer分几十次获取到了WALEntry |
负载均衡测试 | 【测试方法】:com.ucar.datalink.reader.hbase.replicate.ReplicateHRegionServerTest.putDataBatch() 【测试结果】:启动多个HbaseTask,由putDataBatch()方法产生的put操作,是被多个HbaseTask平均消费的 |
-
【HBaseReaderParameter】
在继承Reader插件通用参数基类(PluginReaderParameter,详见深入Task)的基础上,HBaseReaderParameter还根据HBaseTask的特点扩展了自己的参数类,用户可以根据需求在页面更改其参数配置。
参数名称 | 参数描述 |
---|---|
replZkMediaSourceId |
Task所模拟的HRegionServer,注册时所需要的zookeeper数据源信息 |
replZnodeParent |
Task所模拟的HRegionServer所属的Hbase集群名称(zk上的根节点名称) 注:replZkMediaSourceId和replZnodeParent都相同的Task,属于同一个Replication-Group |
-
【HRecord】
HBase-Reader插件将每条HBase的变更数据抽象为HRecord。其主要参数如下:HRecord参数 参数描述 备注 rowKey
HBase中表的主键 类型:byte[]
namespace
表所属的namespace
tableName
HBase的数据表名
columns
数据表的列信息
类型:List<HColumn>
HColumn包含列族、列名、数值、类型、timestamp
RSI
Record资源标识符
-
【同步映射相关参数】
Mapping参数 参数描述 备注 taskId
LeaderTask的id
同步映射只需要在LeaderTask上配置 sourceMediaId
源端表的id
对于HBase中按年或月分表的,只需配置一个通配符即可:
YEARLY,//按年分表的表名模式(表+后缀"${yyyy}")
MONTHLY,//按月分表的表名模式(表+后缀"${yyyyMM}")
SINGLE,//正常单表模式
关联技术 | 稳定版本 | 待测版本 |
---|---|---|
hbase | 0.98.16.1-hadoop2 |
注:HBase复制特性在0.94.*版本并不成熟。
- 【数据缺失问题】
每次拿到的数据只是一次数据变更事件,严格意义上,往hdfs做同步的时候,按现有的处理方式有数据丢失的问题。 - 【delete事件还原】
需要将WALEntry进行深度还原,后期为了能支持hbase向hbase做数据同步,HbaseWriter插件需要参考HRegionServer的代码,将HRecord反解析成hbase操作。
- 参考资料:
http://www.cnblogs.com/ios123/p/6410986.html
http://blog.csdn.net/shenliang1985/article/details/51420112