Elasticsearch日志监控–自定义flume-elasticsearch-sink

2018年11月27日

为了方便监控 ES 的慢查询日志, 采用方案:flume+elasticsearch+kibana方式, 但是我们ES版本是6.*, 而Flume官方版本只兼容ES1.7…

所以需要自定义flume 对接ES的 Sink代码

 

  1. Flume 原理&架构

flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;

 

Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。 Event 从 Source,流向 Channel,再到 Sink; Event本身为一个 byte 数组,并可携带 headers 信息。 Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去

 

Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是

1)Source: 完成对日志数据的收集,分成transtion 和 event 打入到channel之中
Flume提供了各种source的实现,包括Avro Source、 Exce Source、 SpoolingDirectory Source、 NetCat Source、 Syslog Source、 Syslog TCP Source、Syslog UDP Source、 HTTP Source、 HDFS Source, etc。

2)Channel: Flume Channel主要提供一个队列的功能,对source提供中的数据进行简单的缓存。
Flume对于Channel, 则提供了Memory Channel、 JDBC Chanel、 File Channel,etc

3)Sink: Flume Sink取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
包括HDFS sink、 Logger sink、 Avro sink、 File Roll sink、 Null sink、 HBasesink, etc。

 

 

Flume可靠性—事务机制

Flume 使用事务性的方式保证传送Event整个过程的可靠性。

Sink 必须在Event 被存入 Channel 后,或者,已经被传达到下一站agent里,又或者已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。

这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。

比如 Flume支持在本地保存一份文件 channel 作为备份,而memory channel 将event存在内存 queue 里,速度快,但丢失的话无法恢复。

 

 

自定义flume-elasticsearch-sink(面向ES 6.* )

在es入库场景上,在需要保证数据的准确性,rest方式并不能保证结果的准确性,rest方式采用的是restClient,基于http协议(百万级数量级下会有数据丢失)

BulkProcessor使用的是TransportClient,基于Tcp协议; 因此Flume 向ES导入数据也是用BulkProcessor

public class ElasticSearchSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSink.class);
    private BulkProcessor bulkProcessor;
    private IndexBuilder indexBuilder;
    private Serializer serializer;

    @Override
    public void configure(Context context) {
        String[] hosts = getHosts(context);
        if(ArrayUtils.isNotEmpty(hosts)) {
            TransportClient client = new ElasticsearchClientBuilder( //创建ES Client
                    context.getString(PREFIX + ES_CLUSTER_NAME, DEFAULT_ES_CLUSTER_NAME), hosts)
                    .setTransportSniff(context.getBoolean(
                            PREFIX + ES_TRANSPORT_SNIFF, false))
                    .setIgnoreClusterName(context.getBoolean(
                            PREFIX + ES_IGNORE_CLUSTER_NAME, false))
                    .setTransportPingTimeout(Util.getTimeValue(context.getString(
                            PREFIX + ES_TRANSPORT_PING_TIMEOUT), DEFAULT_ES_TIME))
                    .setNodeSamplerInterval(Util.getTimeValue(context.getString(
                            PREFIX + ES_TRANSPORT_NODE_SAMPLER_INTERVAL), DEFAULT_ES_TIME))
                    .build();
            buildIndexBuilder(context);
            buildSerializer(context);
            bulkProcessor = new BulkProcessorBuilder().buildBulkProcessor(context, client); //创建ES的bulkProcesseor 实例(加载flume 配置文件中关于ES的配置信息,并通过ES client 批量发送数据)
        } else {
            logger.error("Could not create transport client, No host exist");
        }
    }

    //处理主函数,发送消息到ES
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();//保证事务原子性
        txn.begin();
        try {
            Event event = channel.take();//从Channel中获取Event
            if (event != null) {
                String body = new String(event.getBody(), Charsets.UTF_8);
                if (!Strings.isNullOrEmpty(body)) {
                    logger.debug("start to sink event [{}].", body);
                    String index = indexBuilder.getIndex(event);
                    String type = indexBuilder.getType(event);
                    String id = indexBuilder.getId(event);
                    XContentBuilder xContentBuilder = serializer.serialize(event);
                    if(xContentBuilder != null) {
                        if (!StringUtil.isNullOrEmpty(id)) {
                            bulkProcessor.add(new IndexRequest(index, type, id)//添加event 中ES相关信息生成IndexRequest, 定义序列化函数, 并提交到bulkProcessor中处理
                                    .source(xContentBuilder));
                        } else {
                            bulkProcessor.add(new IndexRequest(index, type)
                                    .source(xContentBuilder));
                        }
                    } else {
                        logger.error("Could not serialize the event body [{}] for index [{}], type[{}] and id [{}] ",
                                new Object[]{body, index, type, id});
                    }
                }
                logger.debug("sink event [{}] successfully.", body);
            }
            txn.commit();
            return Status.READY;
        } catch (Throwable tx) {
            try {
                txn.rollback();
            } catch (Exception ex) {
                logger.error("exception in rollback.", ex);
            }
            logger.error("transaction rolled back.", tx);
            return Status.BACKOFF;
        } finally {
            txn.close();
        }
    }

    @Override
    public void stop() {
        if (bulkProcessor != null) {
            bulkProcessor.close();
        }
    }

    /**
     * builds Index builder  根据Flume配置文件中的index Builder定义,生成一个ES  index builder实例
     */
    private void buildIndexBuilder(Context context) {
        String indexBuilderClass = DEFAULT_ES_INDEX_BUILDER;
        if (StringUtils.isNotBlank(context.getString(ES_INDEX_BUILDER))) {
            indexBuilderClass = context.getString(ES_INDEX_BUILDER);
        }
        this.indexBuilder = instantiateClass(indexBuilderClass);
        if (this.indexBuilder != null) {
            this.indexBuilder.configure(context);
        }
    }

    /**
     * builds Serializer  定义序列化服务实例
     */
    private void buildSerializer(Context context) {
        String serializerClass = DEFAULT_ES_SERIALIZER;
        if (StringUtils.isNotEmpty(context.getString(ES_SERIALIZER))) {
            serializerClass = context.getString(ES_SERIALIZER);
        }
        this.serializer = instantiateClass(serializerClass);
        if(this.serializer != null) {
            this.serializer.configure(context);
        }
    }

    //根据类名,找到类,并生成一个实例
    private <T> T instantiateClass(String className) {
        try {
            @SuppressWarnings("unchecked")
            Class<T> aClass = (Class<T>) Class.forName(className);
            return aClass.newInstance();
        } catch (Exception e) {
            logger.error("Could not instantiate class " + className, e);
            Throwables.propagate(e);
            return null;
        }
    }

    /**
     * returns hosts  获取ES的 URL信息
     */
    private String[] getHosts(Context context) {
        String[] hosts = null;
        if (StringUtils.isNotBlank(context.getString(ES_HOSTS))) {
            hosts = context.getString(ES_HOSTS).split(",");
        }
        return hosts;
    }
}

 

报表显示可以通过elasticsearch -head插件, 或者kibana 显示日志内容

参考https://github.com/cognitree/flume-elasticsearch-sink

 

 

没有评论

发表评论

邮箱地址不会被公开。 必填项已用*标注