初次使用hbase
Toc
  1. 背景
  2. 收集环节
  3. 分析、存储
  4. hbase 的 row key 考虑
  5. 效率


背景

因为用到了 hbase,也算第一次纯 java 开发,所以做一下笔记,

小组内有个日志收集系统,最近一个月把日志的存储从 mysql 迁移到 hbase,这种写入后就不会再做修改的应用场景,非常适合切换到 hbase。

改造之前存在的问题:

  1. 每天能收集到500G+的日志,现有的方案只存储异常日志,当某种日志很多时,从mysql中查询时会超时
  2. 收集上来的日志裸露存在普通硬盘,单个磁盘最大2T。由一个脚本每个小时做一次rsync,平均分散到4个机器的某个磁盘,当该磁盘写满后,需要手工修改脚本切换另一个磁盘
  3. 存储冗余,磁盘裸露保存最近n天的日志,mysql里又保存最近m天的异常日志
  4. 需要额外维护脚本定时删除磁盘、mysql里的日志
  5. 未有存储所有日志

简单来说,需要定期人工维护;存储层不容易横向扩展,后者是关键点。

一个月前有新的需求,需要有一个大前提,要有最近n天的所有日志;快速读取日志查询日志。

后来决定把存储从 mysql 切换到 hbase,搜索组那边有一个 hbase 集群,可直接使用。


收集环节

这个小系统现在分为两个部分:收集、分析和存储。

收集环节,由 master 端把收集任务下发到分布式的 agent,agent 拿到任务后,依赖 inotify 机制读取指定的日志文件,提交到 master。

master 端实时接收agent的数据,每个小时创建新的文件,文件名带有「小时」段的时间戳。

现有方案是另有一个写db的进程,把master吐出的文件写入 mysql, 它会分析日志内容,只保存异常日志。

以上程序全部由C实现。


分析、存储

为了不影响现有的方案,实现平稳改造,继续依赖 inotify 。我另外用 java 写了个小程序,使用了一个叫 JNotify 的jar包,该包封装了 inotify。

  1. 在 java 进程中开一个 inoitfy 线程,检查 master 定时吐出的文件,当 inotify 检测到「新创建」的文件,回调 JNotifyListener 接口下的函数
  2. 回调函数把新创建的文件,按轮训的策略,平均分发给 n 个 worker 线程中的其中一个,存入该 worker 线程的文件队列里,inotify 线程的逻辑到此为止
  3. 每个 worker 线程通过检测文件位移 offset,判断是否有新的日志内容需要读取。这里没有继续依赖 inotify 了,有利有弊
  4. worker 线程把新增的日志内容 map 到内存中,一次最多读取 10M 内容
  5. worker 线程根据日志的种类,调用不同的分析器;为每种日志类型实现一个分析器 parser, 接口声明下的 parser 会做3个事情
  6. 分析后由 worker 写入hbase,value为每行日志内容,考虑到 value 的长度,现在是每1000条提交一次,同时记录当前位移 offset
  7. worker 线程定时从队列中删除1个小时前的文件

在 worker 的流程中,只是在调用分析器这里需要对日志种类做一下判断,其余的逻辑跟文件类型无关。以后若有其他种类的日志,为该类日志新增一个分析器,worker 再增加对该分析器的调用就可以了。

分析器 parser 做3个事情:

  1. 维护它自己的正则表达式,使用该表达式分析日志,表达式可为空,可配置
  2. 根据分析情况,返回一个字符串到 worker 线程,worker 线程使用这个字符串来拼接 hbase 的 row key,不能为空。后续从hbase查询时,会优先使用这个字符串,即为了方便查询
  3. 保存统计信息,分析日志的过程可以顺便做简单统计,worker 线程调用该函数提交统计信息;该函数也可以什么都不做。因为一些简单的分析、统计任务,合并在一起后,后续就没必要由 map reduce 再次从 hbase 日志中读取日志来一次分析、统计。好吧,其实是搜索组说机器的计算资源不够,目前也才几百G的日志文件,统计的过程对导入过程的效率影响不大。


hbase 的 row key 考虑

hbase 里的数据表预先创建好,设置了 TTL, 暂时日志保存3天,后续根据需要再做调整,减轻人工维护存储成本。

hbase(main):001:0> describe 'table_name_xxx'
DESCRIPTION                                                                                       ENABLED
 'table_name_xxx', {NAME => 'l', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_ true
 SCOPE => '0', VERSIONS => '1', COMPRESSION => 'SNAPPY', MIN_VERSIONS => '0', TTL => '259200 SECO
 NDS (3 DAYS)', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKC
 ACHE => 'true'}

在 row key 的固定位置,保存有上面提到的各个分析器生成的字符串,以及毫秒级别的时间戳,为了避免相同时间戳,还有固定长度的 index 最为整个 row key 的后缀,目的就是为了保证 row key 的唯一。

row key 的开始部分,则根据业务情况,让连续的日志尽可能多的连续写在一块,但不同的日志,则尽可能地分散到各个 region。

而从 hbase 查询时,指定 start row 提供给到 hbase 一个开始查询的位置,再通过前缀匹配过滤,比如,按我的row key 设计,对于字符串810552074_29_0,表示某个服务器上的某种日志(这个串是属于已有的配置信息,查询之前是已知的),那我要查询这种日志在某个时间点之后的日志,就很简单了:

scan 'table_name_xxx', STARTROW=>'810552074_29_0_1409296887375', FILTER=>"PrefixFilter('810552074_29_0')", LIMIT=>10

查询后,记录下最后一行的 row key(完整的key,比如810552074_27_0_1409380765531_00000495),作为下次查询的start key, 第2、3、4…次查询所花费的时间,比第1次快了很多。

因为有一个查询日志的页面给到其他团队的开发同学,他们在页面上可以实现类似tail -f xx日志tail -n 100 xx日志的功能,无需登录服务器就可以看到他们项目的日志了(没权限…)

查询样例:

String query_demo(String startKey, String prefixFilter, String qualifier, int limit)
{
    Scan scan = new Scan();
    // todo, tested later
    scan.setCaching(100);
    scan.setBatch(1);

    String lastRowKey = null;
    String rowValue;
    try {
        // start row key, important
        scan.setStartRow(startKey.getBytes());
        // set filter as needed
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        filterList.addFilter(new PrefixFilter(prefixFilter.getBytes()));
        if (qualifier != null && !"".equals(qualifier)) {
            filterList.addFilter(new QualifierFilter(
                            CompareFilter.CompareOp.EQUAL,
                            new BinaryComparator(qualifier.getBytes()))
            );
        }
        scan.setFilter(filterList);

        // create a hbase connection
        HTable htable = new HTable(XXXCLASS.getHbaseSharedConf(), XXXCLASS.getTable_name());
        ResultScanner rs = htable.getScanner(scan);
        int cnt = 0;

        try {
            for (Result r = rs.next(); r != null; r = rs.next()) {
                for (Cell cell : r.rawCells()) {
                    lastRowKey = new String(CellUtil.cloneRow(cell));
                    rowValue = new String(CellUtil.cloneValue(cell));

                    // now we get a rowValue...
                }
                cnt++;
                if (cnt >= limit) {
                    break;
                }
            }
        } catch (Exception e) {
            log.error("exp: ", e);
        } finally {
            rs.close();
            htable.close();
        }
    } catch (Exception ex) {
        log.error("exp", ex);
    }

    return lastRowKey;
}


效率

  1. 导入 hbase 时,平均 ?KB/ms
  2. 从 hbase 查询,第一次查询 ?ms,后续查询平均 ?ms


link:

  1. JNotify: http://jnotify.sourceforge.net
  2. The Apache HBase™ Reference Guide: http://hbase.apache.org/book.html



– EOF –

Categories: java
Tags: java