背景
因为用到了 hbase,也算第一次纯 java 开发,所以做一下笔记,
小组内有个日志收集系统,最近一个月把日志的存储从 mysql 迁移到 hbase,这种写入后就不会再做修改的应用场景,非常适合切换到 hbase。
改造之前存在的问题:
- 每天能收集到500G+的日志,现有的方案只存储异常日志,当某种日志很多时,从mysql中查询时会超时
- 收集上来的日志裸露存在普通硬盘,单个磁盘最大2T。由一个脚本每个小时做一次rsync,平均分散到4个机器的某个磁盘,当该磁盘写满后,需要手工修改脚本切换另一个磁盘
- 存储冗余,磁盘裸露保存最近n天的日志,mysql里又保存最近m天的异常日志
- 需要额外维护脚本定时删除磁盘、mysql里的日志
- 未有存储所有日志
简单来说,需要定期人工维护;存储层不容易横向扩展,后者是关键点。
一个月前有新的需求,需要有一个大前提,要有最近n天的所有日志;快速读取日志查询日志。
后来决定把存储从 mysql 切换到 hbase,搜索组那边有一个 hbase 集群,可直接使用。
收集环节
这个小系统现在分为两个部分:收集、分析和存储。
收集环节,由 master 端把收集任务下发到分布式的 agent,agent 拿到任务后,依赖 inotify 机制读取指定的日志文件,提交到 master。
master 端实时接收agent的数据,每个小时创建新的文件,文件名带有「小时」段的时间戳。
现有方案是另有一个写db的进程,把master吐出的文件写入 mysql, 它会分析日志内容,只保存异常日志。
以上程序全部由C实现。
分析、存储
为了不影响现有的方案,实现平稳改造,继续依赖 inotify 。我另外用 java 写了个小程序,使用了一个叫 JNotify 的jar包,该包封装了 inotify。
- 在 java 进程中开一个 inoitfy 线程,检查 master 定时吐出的文件,当 inotify 检测到「新创建」的文件,回调 JNotifyListener 接口下的函数
- 回调函数把新创建的文件,按轮训的策略,平均分发给 n 个 worker 线程中的其中一个,存入该 worker 线程的文件队列里,inotify 线程的逻辑到此为止
- 每个 worker 线程通过检测文件位移 offset,判断是否有新的日志内容需要读取。这里没有继续依赖 inotify 了,有利有弊
- worker 线程把新增的日志内容 map 到内存中,一次最多读取 10M 内容
- worker 线程根据日志的种类,调用不同的分析器;为每种日志类型实现一个分析器 parser, 接口声明下的 parser 会做3个事情
- 分析后由 worker 写入hbase,value为每行日志内容,考虑到 value 的长度,现在是每1000条提交一次,同时记录当前位移 offset
- worker 线程定时从队列中删除1个小时前的文件
在 worker 的流程中,只是在调用分析器这里需要对日志种类做一下判断,其余的逻辑跟文件类型无关。以后若有其他种类的日志,为该类日志新增一个分析器,worker 再增加对该分析器的调用就可以了。
分析器 parser 做3个事情:
- 维护它自己的正则表达式,使用该表达式分析日志,表达式可为空,可配置
- 根据分析情况,返回一个字符串到 worker 线程,worker 线程使用这个字符串来拼接 hbase 的 row key,不能为空。后续从hbase查询时,会优先使用这个字符串,即为了方便查询
- 保存统计信息,分析日志的过程可以顺便做简单统计,worker 线程调用该函数提交统计信息;该函数也可以什么都不做。因为一些简单的分析、统计任务,合并在一起后,后续就没必要由 map reduce 再次从 hbase 日志中读取日志来一次分析、统计。好吧,其实是搜索组说机器的计算资源不够,目前也才几百G的日志文件,统计的过程对导入过程的效率影响不大。
hbase 的 row key 考虑
hbase 里的数据表预先创建好,设置了 TTL, 暂时日志保存3天,后续根据需要再做调整,减轻人工维护存储成本。
hbase(main):001:0> describe 'table_name_xxx'
DESCRIPTION ENABLED
'table_name_xxx', {NAME => 'l', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_ true
SCOPE => '0', VERSIONS => '1', COMPRESSION => 'SNAPPY', MIN_VERSIONS => '0', TTL => '259200 SECO
NDS (3 DAYS)', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKC
ACHE => 'true'}
在 row key 的固定位置,保存有上面提到的各个分析器生成的字符串,以及毫秒级别的时间戳,为了避免相同时间戳,还有固定长度的 index 最为整个 row key 的后缀,目的就是为了保证 row key 的唯一。
row key 的开始部分,则根据业务情况,让连续的日志尽可能多的连续写在一块,但不同的日志,则尽可能地分散到各个 region。
而从 hbase 查询时,指定 start row 提供给到 hbase 一个开始查询的位置,再通过前缀匹配过滤,比如,按我的row key 设计,对于字符串810552074_29_0
,表示某个服务器上的某种日志(这个串是属于已有的配置信息,查询之前是已知的),那我要查询这种日志在某个时间点之后的日志,就很简单了:
scan 'table_name_xxx', STARTROW=>'810552074_29_0_1409296887375', FILTER=>"PrefixFilter('810552074_29_0')", LIMIT=>10
查询后,记录下最后一行的 row key(完整的key,比如810552074_27_0_1409380765531_00000495),作为下次查询的start key, 第2、3、4…次查询所花费的时间,比第1次快了很多。
因为有一个查询日志的页面给到其他团队的开发同学,他们在页面上可以实现类似tail -f xx日志
或tail -n 100 xx日志
的功能,无需登录服务器就可以看到他们项目的日志了(没权限…)
查询样例:
String query_demo(String startKey, String prefixFilter, String qualifier, int limit)
{
Scan scan = new Scan();
// todo, tested later
scan.setCaching(100);
scan.setBatch(1);
String lastRowKey = null;
String rowValue;
try {
// start row key, important
scan.setStartRow(startKey.getBytes());
// set filter as needed
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(new PrefixFilter(prefixFilter.getBytes()));
if (qualifier != null && !"".equals(qualifier)) {
filterList.addFilter(new QualifierFilter(
CompareFilter.CompareOp.EQUAL,
new BinaryComparator(qualifier.getBytes()))
);
}
scan.setFilter(filterList);
// create a hbase connection
HTable htable = new HTable(XXXCLASS.getHbaseSharedConf(), XXXCLASS.getTable_name());
ResultScanner rs = htable.getScanner(scan);
int cnt = 0;
try {
for (Result r = rs.next(); r != null; r = rs.next()) {
for (Cell cell : r.rawCells()) {
lastRowKey = new String(CellUtil.cloneRow(cell));
rowValue = new String(CellUtil.cloneValue(cell));
// now we get a rowValue...
}
cnt++;
if (cnt >= limit) {
break;
}
}
} catch (Exception e) {
log.error("exp: ", e);
} finally {
rs.close();
htable.close();
}
} catch (Exception ex) {
log.error("exp", ex);
}
return lastRowKey;
}
效率
- 导入 hbase 时,平均 ?KB/ms
- 从 hbase 查询,第一次查询 ?ms,后续查询平均 ?ms
link:
- JNotify: http://jnotify.sourceforge.net
- The Apache HBase™ Reference Guide: http://hbase.apache.org/book.html
– EOF –