为了方便监控 ES 的慢查询日志, 采用方案:flume+elasticsearch+kibana方式, 但是我们ES版本是6.*, 而Flume官方版本只兼容ES1.7…
所以需要自定义flume 对接ES的 Sink代码
-
Flume 原理&架构
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;
Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。 Event 从 Source,流向 Channel,再到 Sink; Event本身为一个 byte 数组,并可携带 headers 信息。 Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去
Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是
1)Source: 完成对日志数据的收集,分成transtion 和 event 打入到channel之中
Flume提供了各种source的实现,包括Avro Source、 Exce Source、 SpoolingDirectory Source、 NetCat Source、 Syslog Source、 Syslog TCP Source、Syslog UDP Source、 HTTP Source、 HDFS Source, etc。
2)Channel: Flume Channel主要提供一个队列的功能,对source提供中的数据进行简单的缓存。
Flume对于Channel, 则提供了Memory Channel、 JDBC Chanel、 File Channel,etc
3)Sink: Flume Sink取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
包括HDFS sink、 Logger sink、 Avro sink、 File Roll sink、 Null sink、 HBasesink, etc。
Flume可靠性—事务机制
Flume 使用事务性的方式保证传送Event整个过程的可靠性。
Sink 必须在Event 被存入 Channel 后,或者,已经被传达到下一站agent里,又或者已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。
这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。
比如 Flume支持在本地保存一份文件 channel 作为备份,而memory channel 将event存在内存 queue 里,速度快,但丢失的话无法恢复。
自定义flume-elasticsearch-sink(面向ES 6.* )
在es入库场景上,在需要保证数据的准确性,rest方式并不能保证结果的准确性,rest方式采用的是restClient,基于http协议(百万级数量级下会有数据丢失)
BulkProcessor使用的是TransportClient,基于Tcp协议; 因此Flume 向ES导入数据也是用BulkProcessor
public class ElasticSearchSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSink.class);
private BulkProcessor bulkProcessor;
private IndexBuilder indexBuilder;
private Serializer serializer;
@Override
public void configure(Context context) {
String[] hosts = getHosts(context);
if(ArrayUtils.isNotEmpty(hosts)) {
TransportClient client = new ElasticsearchClientBuilder( //创建ES Client
context.getString(PREFIX + ES_CLUSTER_NAME, DEFAULT_ES_CLUSTER_NAME), hosts)
.setTransportSniff(context.getBoolean(
PREFIX + ES_TRANSPORT_SNIFF, false))
.setIgnoreClusterName(context.getBoolean(
PREFIX + ES_IGNORE_CLUSTER_NAME, false))
.setTransportPingTimeout(Util.getTimeValue(context.getString(
PREFIX + ES_TRANSPORT_PING_TIMEOUT), DEFAULT_ES_TIME))
.setNodeSamplerInterval(Util.getTimeValue(context.getString(
PREFIX + ES_TRANSPORT_NODE_SAMPLER_INTERVAL), DEFAULT_ES_TIME))
.build();
buildIndexBuilder(context);
buildSerializer(context);
bulkProcessor = new BulkProcessorBuilder().buildBulkProcessor(context, client); //创建ES的bulkProcesseor 实例(加载flume 配置文件中关于ES的配置信息,并通过ES client 批量发送数据)
} else {
logger.error("Could not create transport client, No host exist");
}
}
//处理主函数,发送消息到ES
@Override
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction txn = channel.getTransaction();//保证事务原子性
txn.begin();
try {
Event event = channel.take();//从Channel中获取Event
if (event != null) {
String body = new String(event.getBody(), Charsets.UTF_8);
if (!Strings.isNullOrEmpty(body)) {
logger.debug("start to sink event [{}].", body);
String index = indexBuilder.getIndex(event);
String type = indexBuilder.getType(event);
String id = indexBuilder.getId(event);
XContentBuilder xContentBuilder = serializer.serialize(event);
if(xContentBuilder != null) {
if (!StringUtil.isNullOrEmpty(id)) {
bulkProcessor.add(new IndexRequest(index, type, id)//添加event 中ES相关信息生成IndexRequest, 定义序列化函数, 并提交到bulkProcessor中处理
.source(xContentBuilder));
} else {
bulkProcessor.add(new IndexRequest(index, type)
.source(xContentBuilder));
}
} else {
logger.error("Could not serialize the event body [{}] for index [{}], type[{}] and id [{}] ",
new Object[]{body, index, type, id});
}
}
logger.debug("sink event [{}] successfully.", body);
}
txn.commit();
return Status.READY;
} catch (Throwable tx) {
try {
txn.rollback();
} catch (Exception ex) {
logger.error("exception in rollback.", ex);
}
logger.error("transaction rolled back.", tx);
return Status.BACKOFF;
} finally {
txn.close();
}
}
@Override
public void stop() {
if (bulkProcessor != null) {
bulkProcessor.close();
}
}
/**
* builds Index builder 根据Flume配置文件中的index Builder定义,生成一个ES index builder实例
*/
private void buildIndexBuilder(Context context) {
String indexBuilderClass = DEFAULT_ES_INDEX_BUILDER;
if (StringUtils.isNotBlank(context.getString(ES_INDEX_BUILDER))) {
indexBuilderClass = context.getString(ES_INDEX_BUILDER);
}
this.indexBuilder = instantiateClass(indexBuilderClass);
if (this.indexBuilder != null) {
this.indexBuilder.configure(context);
}
}
/**
* builds Serializer 定义序列化服务实例
*/
private void buildSerializer(Context context) {
String serializerClass = DEFAULT_ES_SERIALIZER;
if (StringUtils.isNotEmpty(context.getString(ES_SERIALIZER))) {
serializerClass = context.getString(ES_SERIALIZER);
}
this.serializer = instantiateClass(serializerClass);
if(this.serializer != null) {
this.serializer.configure(context);
}
}
//根据类名,找到类,并生成一个实例
private <T> T instantiateClass(String className) {
try {
@SuppressWarnings("unchecked")
Class<T> aClass = (Class<T>) Class.forName(className);
return aClass.newInstance();
} catch (Exception e) {
logger.error("Could not instantiate class " + className, e);
Throwables.propagate(e);
return null;
}
}
/**
* returns hosts 获取ES的 URL信息
*/
private String[] getHosts(Context context) {
String[] hosts = null;
if (StringUtils.isNotBlank(context.getString(ES_HOSTS))) {
hosts = context.getString(ES_HOSTS).split(",");
}
return hosts;
}
}
报表显示可以通过elasticsearch -head插件, 或者kibana 显示日志内容
参考https://github.com/cognitree/flume-elasticsearch-sink
没有评论