flume+hive处理日志

Hadoop Jenner 11892℃ 0评论

翻译自:http://www.lopakalogic.com/articles/hadoop-articles/log-files-flume-hive/

现在的情况是,你被告知需要设计一套方案,用来处理公司所有的日志文件。当然,业务用户希望他们可以通过可以想象到的方式对这些数据进行查询,但是他们不会定义任何用例。这听起来像是你需要弄清楚的问题吗?如果是,那么你来对地方了。

你首先要考虑的是,日志文件会不断的增长、增长。所以你需要采用一个经济可行的方式。对数据量和不是很明确的需求,数据仓库并不是最好的选择。当然,你会选择hadoop。非常好,这是第一步。那么现在,你要如何把那些日志文件写入hadoop呢。

当然,对于目前的场景来说,flume是一个完美的选择。毕竟,flume是围绕收集日志而设计的。所以,你开始在网上搜索一些方案,你会发现一些信息,但问题是,你找到的这些示例,并不是你所需要的。有一个推特的over-used示例,并且有一些其他的json示例,但是你所需要的只是简单明了的日志文件。那些对于我们来说并不是很好的方案。希望,这篇文章能够帮助到你。

如何用flume和hive管理日志文件?这就是这篇文建将要讲述的。这里会降到收集日志到HDFS,并且用HIVE管理他们。这个示例将使用Juniper Netscreen Firewalls的日志文件,但是这个一般的方式调整后可以处理任何你需要收集和处理的日志。

源代码

下面讨论的所有代码,都可以在github上找到:

flume-logs repository

先决条件

我希望这篇文章不会长的不能忍受,所以我将做一些假设:

你已经有了一个已经安装并配置好的集群。我正在使用CDH4.5,但是这并没有什么特殊的,它应该可以在任何hadoop发行版中使用。

你已经对flume有了一个基本的了解。我不会讲一些一般的概念和设置。有很多很好的文章,在那里学习这些更合适。

你已经对hive有了一个基本的了解。和flume类似,我想跟你说一些你还不知道的。

你已经了解正则表达式。

解决方案概述

这个方案以flume收集事件开始。在这个示例中,为了简单我将使用netcat,但是在现实生活中,你可能会使用系统日志的一些变种。时间将会被source接收,并且我们需要做的唯一一件事就是保证在header有一个时间戳,这样我们将使用时间拦截器。

下一步,我们将使用memory channel。同样的,这也是为了简单。在生产环境中,你可能会根据你的需要和情况作出不同的选择。否则,在channel中不会发生什么特别的事。

最后,我们开始配置sink,从这里开始,事情开始变得有趣。我们已经知道我们希望把数据写入到HDFS中,所以我们需要一个HDFS sink。然而,我们也知道我们会使用到Hive使数据可以访问。因此,在Hive中,我们会对我们的数据进行分区。

分区最明显的方式就是使用日期和时间,这将是有意义的,因为将分开的日志条目写入到基于时间的桶中,你可以使用HiveQL提取信息特定的列。这将影响到我们如何写入数据。在flume中,我们需要使用基于时间的转义序列写入条目,在hive中,我们需要定义匹配的分区。

然而,复杂度更深一点,因为Hive要求我们使用模式。所以重要的是,我们需要从日志条目中提取数据并且将他们替换到列中,这样Hive才可以读取他们。这里,有若干个可以使用的方案,但是这个实力,我们将提取一些准系统信息。

到这里,我们清楚的知道,有一些可能的方案。不同的firewalls可能有不同的格式等等。在这个简单的示例中,我们需要提取数据替换成列格式,并且输出他们。但是,还有可能是,我们需要使用列和模式重新排列已经提取的数据。例如,你可能发现,你提取到第一块数据是作为模式的最后一列,所以你需要保存这一块数据,在最后输出。

而在Hive这边需要考虑的是,写入HDFS的方式。默认情况下,flume将数据作为一个序列化的文件写入,这可能对你的场景来说不能正常工作。在我的示例中,我需要保证这些日志文件是简单的,并且访问和读取是可能的,也就是他们需要被保存成CSV文件。因此,创建一个自定义的序列化器是必要的,它将提取,重排和输出CSV格式数据。

flume配置

flume配置在下面会被展示出来,并且它同样在github项目的conf目录下。有几点值得提一下。netcat source并不是特别有趣和复杂的。它只是一个基本的source。

sink同样有几点需要指出。第一是它假设你已经为flume user创建了(/user/flume/logs)目录,你还需要放宽这个目录的权限,以允许访问。

在需要的写入HDFS的地方写上wirteFormat=Text和fileType=DataStream。此外,使用custom serializer,CSVSerializer,它将使用正则表达式提取数据,而在这种情况下,仅仅是基本的连接信息中指定的正则表达式提取数据。regexorder属性用于重新排序正则表达式组来匹配Hive模式期望的输出。请注意,有必要使用完全限定类名来定义的串行器及指定嵌入生成器类,这将在源代码段来解释。

最后,rolling相关的属性,旨在一小时在HDFS上创建一个文件,用来减少生成文件的数量。当然,这些都需要根据条件被改变,或粒度降低,然后转到每日分区。flume已经创建了很多文件,如果你不妥善管理这些配置,这绝对不是一个好习惯,那绝对是一个需要注意和警惕的。

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

tier1.sources.source1.type     = netcat
tier1.sources.source1.bind     = 127.0.0.1
tier1.sources.source1.port     = 9999
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.interceptors.i1.preserveExisting = true
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type   = memory
tier1.sinks.sink1.type         = hdfs
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.path = /user/flume/logs/year=%Y/month=%m/day=%d/hour=%H
tier1.sinks.sink1.hdfs.rollInterval = 3600
tier1.sinks.sink1.hdfs.rollCount = 10000
tier1.sinks.sink1.hdfs.batchSize = 10000
tier1.sinks.sink1.serializer = com.freitas.flume.serializer.CSVSerializer$Builder
tier1.sinks.sink1.serializer.format = CSV
tier1.sinks.sink1.serializer.regex = .* proto=(\\d+) .* src=(.*) dst=(.*) src_port=(\\d+) dst_port=(\\d+).*
tier1.sinks.sink1.serializer.regexorder = 5 1 2 3 4

tier1.sinks.sink1.channel      = channel1
tier1.channels.channel1.capacity = 100

 

自定义Serializer

自定义Serializer真的不是什么大不了的事。咋一看,好像很复杂,但只要你使用它,就会发现其实它很容易。我能很容易的跟随flume的源代码和其他东西。这个类需要实现EventSerializer接口。我真的只需要定义whrite方法,它可以做大量工作。我也需要定义一个内联构造器类,并且创建父类的实例对象。flume就是这样做的,我们实在无法跟它争辩。

在逻辑上,它其实是非常简单的。构造函数将检索配置属性和设置和创建将顺序的正则表达式组必须写出来索引的哈希正则表达式。然后再write方法中,它将会在每一行使用正则表达式,提取组,把他们的期望的输出顺序索引的哈希,最后,通过哈希读取,并且用逗号分隔符分隔后输出。

开发Serializer

开发自定义Serializer并不是完全明显,值得进行一些讨论。它需要被编译成一个Jar文件,然后这个Jar文件需要放到一个特定的位置,最后,flume将会找到和使用它。你需要考虑类的个数。这里只有一个雷,但是需要执行同样的步骤。我选择Moven项目,是的生成Jar文件很容易,但是你可能用的Ant或者其他生成方式。一旦你有了Jar文件,你需要做一些事情,例如:

cd /var/lib/flume-ng/plugins.d
mkdir -p plugins.d/flume-logs/lib
chown -R flume plugins.d
chgrp -R flume plugins.d
cp flume-logs-1.0.0.jar /var/lib/flume-ng/plugins.d/flume-logs/lib

Hive配置

在Hive这边,你需要运行Hive Shell并且使用与输出匹配的模式创建一个外部表。事实上,你大概会先确定所需要的模式开始,但本办法只是为了使我更容易组织文章。请注意,分区使用日期和时间分区,分区期望flume创建的目录。使用外部表的工作已经好了,因为你不需要做任何耗时的加载数据的工作。他只是在你需要的地方,并且Hive可以开始使用它了。

CREATE EXTERNAL TABLE networkData (
  action_time BIGINT,
  src_ip STRING,
  dest_ip STRING,
  src_port STRING,
  dest_port STRING,
  protocol STRING
)
PARTITIONED BY (year int, month int, day int, hour int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/flume/logs/';

 

现在,在这个方案中,这是我最喜欢的一件事。执行Alter Table添加一个新的分区是必要的,目的是为了让Hive可以看见数据。这个动作并不是做完之后就不需要了。它必须被定时任务或其他类似的东西执行。这只是Hive的工作方式。

ALTER TABLE networkData
ADD PARTITION (year = 2014, month = 01, day = 28, hour = 14);

生成事件

好了,现在所有的东西已经被配置好了。剩下的事情就是生成一些数据,并且开始做一些HiveQL查询。为了做这个,我写了一个python的脚本用来生成一些事件并且将他们写入到netcat中。如果你恰好有一个系统日志生成器,你可以随意使用它们。但是对我来说,我需要有一个可以生成一些数据的程序。

这个脚本可以在generator目录中找到,可以通过如下方式调用:

./gen_events.py 2>&1 | nc 127.0.0.1 9999

 

它还会创建一个重放日志文件,如果你因为一些原因需要重放日志的话。

未来的注意事项

正如你看到的,这个示例每个小时保存一个文件。这只是为了将来能够经常被引用的短期数据。这里的问题是文件的数量。我相信你已经知道,节点只能支持一定数量的文件。每个小时保存一个文件将会非常快的增长很多文件。在某些时候,汇总一些文件到一个文件是必要的,这只会使得它比较难引用一点。只是思考的问题。

一旦在Hive中有了数据,那么你就需要考虑下一个问题,怎么用这些有趣的数据做一些事情。根据谁会用这些数据,怎样用这些数据,你需要为你的用户设置一些东西。你可以期望你的用户不会是典型的Hadoop书呆子,喜欢自己。相反,他们将面向企业用户,您将需要使事情更容易为他们。当然,Hive允许做一些简单的处理,但是当你的用户需要到一个新的水平会发生什么?我冒昧地建议猪是一个很好的选择。Pig比写MR jobs更简单,但是依然提供了一套丰富的工具做了不少复杂的处理。你可能会发现,你需要开发的是少量的可以满足用户需求的Pig脚本。另外一些比较精明的用户将能够开始很快创建自己的Pig脚本。

结论

我希望你已经发现这篇文章是有用的,并且对于使用Flume和Hive管理日志文件是有用的。当你真的一步一步去做,会发现其实并不难,但是这些步骤就是我们的方法。

期望你在大数据的冒险中有好运。

原创文章,转载请注明: 转载自始终不够

本文链接地址: flume+hive处理日志

转载请注明:始终不够 » flume+hive处理日志

喜欢 (3)