Logstash – 快速指南

Logstash – 快速指南


Logstash – 简介

Logstash 是一种基于过滤器/管道模式的工具,用于收集、处理和生成日志或事件。它有助于集中和实时分析来自不同来源的日志和事件。

Logstash 是用运行在 JVM 上的 JRuby 编程语言编写的,因此您可以在不同的平台上运行 Logstash。它从几乎所有类型的来源收集不同类型的数据,如日志、数据包、事件、事务、时间戳数据等。数据源可以是社交数据、电子商务、新闻文章、CRM、游戏数据、网络趋势、金融数据、物联网、移动设备等。

Logstash 一般特性

Logstash 的一般功能如下 –

  • Logstash 可以从不同来源收集数据并发送到多个目的地。

  • Logstash 可以处理所有类型的日志数据,如 Apache 日志、Windows 事件日志、网络协议数据、标准输入数据等等。

  • Logstash 还可以处理 http 请求和响应数据。

  • Logstash 提供了多种过滤器,可以帮助用户通过解析和转换数据来发现数据中的更多含义。

  • Logstash 还可用于处理物联网中的传感器数据。

  • Logstash 是开源的,在 Apache 许可版本 2.0 下可用。

Logstash 关键概念

Logstash 的关键概念如下 –

事件对象

它是 Logstash 中的主要对象,它封装了 Logstash 管道中的数据流。Logstash 使用此对象来存储输入数据并添加在过滤阶段创建的额外字段。

Logstash 为开发人员提供了一个事件 API 来操作事件。在本教程中,此事件被称为记录数据事件、日志事件、日志数据、输入日志数据、输出日志数据等各种名称。

管道

它由 Logstash 中从输入到输出的数据流阶段组成。输入数据进入管道并以事件的形式进行处理。然后以用户或终端系统所需的格式发送到输出目的地。

输入

这是 Logstash 管道中的第一阶段,用于获取 Logstash 中的数据以进行进一步处理。Logstash 提供了各种插件来从不同平台获取数据。一些最常用的插件是——文件、系统日志、Redis 和 Beats。

筛选

这是 Logstash 的中间阶段,事件的实际处理发生在这里。开发人员可以使用 Logstash 预定义的正则表达式模式来创建序列,以区分事件中的字段和接受的输入事件的标准。

Logstash 提供了各种插件来帮助开发人员解析事件并将其转换为理想的结构。一些最常用的过滤器插件是 – Grok、Mutate、Drop、Clone 和 Geoip。

输出

这是 Logstash 管道的最后一个阶段,在此阶段可以将输出事件格式化为目标系统所需的结构。最后,它使用插件将完成处理后的输出事件发送到目的地。一些最常用的插件是 – Elasticsearch、File、Graphite、Statsd 等。

Logstash 优势

以下几点说明了Logstash的各种优势。

  • Logstash 提供正则表达式模式序列来识别和解析任何输入事件中的各种字段。

  • Logstash 支持各种用于提取日志数据的 Web 服务器和数据源。

  • Logstash 提供了多个插件来解析日志数据并将其转换为任何用户所需的格式。

  • Logstash 是集中式的,这使得处理和收集来自不同服务器的数据变得容易。

  • Logstash 支持许多数据库、网络协议和其他服务作为日志事件的目标源。

  • Logstash 使用 HTTP 协议,这使得用户可以升级 Elasticsearch 版本,而无需在锁定步骤中升级 Logstash。

Logstash 的缺点

以下几点解释了Logstash的各种缺点。

  • Logstash 使用 http,这会对日志数据的处理产生负面影响。

  • 使用 Logstash 有时可能有点复杂,因为它需要对输入日志数据有很好的理解和分析。

  • 过滤器插件不是通用的,因此,用户可能需要找到正确的模式序列以避免解析错误。

在下一章中,我们将了解 ELK Stack 是什么以及它如何帮助 Logstash。

Logstash – ELK 堆栈

ELK 代表Elasticsearch、LogstashKibana在 ELK 堆栈中,Logstash 从不同的输入源中提取日志数据或其他事件。它处理事件,然后将其存储在 Elasticsearch 中。Kibana 是一个 Web 界面,它访问来自 Elasticsearch 的日志数据并将其可视化。

麋鹿

Logstash 和 Elasticsearch

Logstash 提供了输入输出 Elasticsearch 插件来读写日志事件到 Elasticsearch。Elasticsearch 公司也推荐将 Elasticsearch 作为输出目的地,因为它与 Kibana 兼容。Logstash 通过 http 协议将数据发送到 Elasticsearch。

Elasticsearch 提供批量上传功能,有助于将来自不同来源或 Logstash 实例的数据上传到集中式 Elasticsearch 引擎。ELK 与其他 DevOps 解决方案相比具有以下优势 –

  • ELK 堆栈更易于管理,并且可以扩展以处理 PB 级的事件。

  • ELK 堆栈架构非常灵活,它提供了与 Hadoop 的集成。Hadoop 主要用于归档目的。Logstash 可以使用flume 直接连接Hadoop,Elasticsearch 提供了一个名为es-hadoop的连接来连接Hadoop。

  • ELK 拥有的总成本远低于其替代品。

Logstash 和 Kibana

Kibana 不直接与 Logstash 交互,而是通过数据源,即 ELK 堆栈中的 Elasticsearch。Logstash 从每个来源收集数据,Elasticsearch 以非常快的速度对其进行分析,然后 Kibana 提供有关该数据的可操作见解。

Kibana 是一个基于 Web 的可视化工具,它可以帮助开发人员和其他人在 Elasticsearch 引擎中分析 Logstash 收集的大量事件的变化。这种可视化使预测或查看输入源的错误或其他重要事件趋势的变化变得容易。

Logstash – 安装

要在系统上安装 Logstash,我们应该按照以下步骤操作 –

步骤 1 – 检查计算机中安装的 Java 版本;它应该是 Java 8,因为它与 Java 9 不兼容。您可以通过以下方式检查:

在 Windows 操作系统 (OS) 中(使用命令提示符) –

> java -version 

在 UNIX 操作系统中(使用终端) –

$ echo $JAVA_HOME

第 2 步– 从以下位置下载 Logstash –

https://www.elastic.co/downloads/logstash

  • 对于 Windows 操作系统,下载 ZIP 文件。

  • 对于 UNIX 操作系统,下载 TAR 文件。

  • 对于 Debian 操作系统,请下载 DEB 文件。

  • 对于 Red Hat 和其他 Linux 发行版,请下载 RPN 文件。

  • APT 和 Yum 实用程序也可用于在许多 Linux 发行版中安装 Logstash。

步骤 3 – Logstash 的安装过程非常简单。让我们看看如何在不同平台上安装 Logstash。

注意– 不要在安装文件夹中放置任何空格或冒号。

  • Windows 操作系统– 解压缩 zip 包并安装 Logstash。

  • UNIX OS – 在任何位置提取 tar 文件并安装 Logstash。

$tar –xvf logstash-5.0.2.tar.gz

为 Linux 操作系统使用 APT 实用程序 –

  • 下载并安装公共签名密钥 –
$ wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
  • 保存存储库定义 –
$ echo "deb https://artifacts.elastic.co/packages/5.x/apt stable main" | sudo
   tee -a /etc/apt/sources.list.d/elastic-5.x.list
  • 运行更新 –
$ sudo apt-get update
  • 现在您可以使用以下命令进行安装 –
$ sudo apt-get install logstash

为 Debian Linux 操作系统使用 YUM 实用程序

  • 下载并安装公共签名密钥 –
$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
  • 在您的“/etc/yum.repos.d/”目录中的带有 .repo 后缀的文件中添加以下文本。例如,logstash.repo

[logstash-5.x]
name = Elastic repository for 5.x packages
baseurl = https://artifacts.elastic.co/packages/5.x/yum
gpgcheck = 1
gpgkey = https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled = 1
autorefresh = 1
type = rpm-md
  • 您现在可以使用以下命令安装 Logstash –
$ sudo yum install logstash

步骤 4 – 转到 Logstash 主目录。在 bin 文件夹内,在 Windows 的情况下运行elasticsearch.bat文件,或者您可以使用命令提示符和通过终端执行相同操作。在 UNIX 中,运行 Logstash 文件。

我们需要指定输入源、输出源和可选过滤器。为了验证安装,您可以通过使用标准输入流 (stdin) 作为输入源和标准输出流 (stdout) 作为输出源,使用基本配置运行它。您也可以使用-e选项在命令行中指定配置

在 Windows 中 –

> cd logstash-5.0.1/bin
> Logstash -e 'input { stdin { } } output { stdout {} }'

在 Linux 中 –

$ cd logstash-5.0.1/bin
$ ./logstash -e 'input { stdin { } } output { stdout {} }'

注意– 在 Windows 的情况下,您可能会收到一个错误,指出 JAVA_HOME 未设置。为此,请将其在环境变量中设置为“C:\Program Files\Java\jre1.8.0_111”或您安装 java 的位置。

第 5 步– Logstash Web 界面的默认端口是 9600 到 9700,在logstash-5.0.1\config\logstash.yml 中定义为http.port,它将选择给定范围内的第一个可用端口。

我们可以通过浏览http://localhost:9600或端口是否不同来检查 Logstash 服务器是否已启动并运行,然后请检查命令提示符或终端。我们可以看到分配的端口为“成功启动 Logstash API 端点 {:port ⇒ 9600}。它将以以下方式返回一个 JSON 对象,其中包含有关已安装 Logstash 的信息 –

{
   "host":"manu-PC", 
   "version":"5.0.1",
   "http_address":"127.0.0.1:9600",
   "build_date":"2016-11-11T22:28:04+00:00",
   "build_sha":"2d8d6263dd09417793f2a0c6d5ee702063b5fada",
   "build_snapshot":false
}

Logstash – 内部架构

在本章中,我们将讨论 Logstash 的内部架构和不同组件。

Logstash 服务架构

Logstash 处理来自不同服务器和数据源的日志,它充当托运人。托运人用于收集日志,这些日志安装在每个输入源中。Redis、KafkaRabbitMQ这样的代理是为索引器保存数据的缓冲区,可能有多个代理作为故障转移实例。

Lucene这样的索引器用于索引日志以获得更好的搜索性能,然后输出存储在 Elasticsearch 或其他输出目的地。输出存储中的数据可用于 Kibana 和其他可视化软件。

Logstash 服务架构

Logstash 内部架构

Logstash 管道由三个组件Input、FiltersOutput 组成输入部分负责指定和访问Apache Tomcat Server的日志文件夹等输入数据源

Logstash 内部架构

解释 Logstash 管道的示例

Logstash 配置文件包含有关 Logstash 三个组件的详细信息。在本例中,我们将创建一个名为Logstash.conf的文件名

以下配置从输入日志“inlog.log”中捕获数据并将其写入输出日志“outlog.log”,而没有任何过滤器。

配置文件

Logstash 配置文件只是使用输入插件inlog.log文件中复制数据,并使用输出插件将日志数据刷新到outlog.log文件中。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/inlog.log"
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/outlog.log"
   }
}

运行 Logstash

Logstash 使用–f选项指定配置文件。

C:\logstash\bin> logstash –f logstash.conf

日志文件

以下代码块显示了输入日志数据。

Hello tutorialspoint.com

输出日志

Logstash 输出包含消息字段中的输入数据。Logstash 还向输出添加了其他字段,如时间戳、输入源路径、版本、主机和标签。

{
   "path":"C:/tpwork/logstash/bin/log/inlog1.log",
   "@timestamp":"2016-12-13T02:28:38.763Z",
   "@version":"1", "host":"Dell-PC",
   "message":" Hello tutorialspoint.com", "tags":[]
}

尽你所能,Logstash 的输出包含的不仅仅是通过输入日志提供的数据。输出包含源路径、时间戳、版本、主机名和标签,用于表示错误等额外消息。

我们可以使用过滤器来处理数据并使其对我们的需求有用。在下一个示例中,我们使用过滤器来获取数据,这将输出限制为仅带有动词(如 GET 或 POST 后跟唯一资源标识符)的数据

配置文件

在这种Logstash配置中,我们将其命名添加过滤器神交滤除输入数据。与模式序列输入日志匹配的输入日志事件只会错误地到达输出目的地。Logstash 在输出事件中添加了一个名为“_grokparsefailure”的标签,该标签与 grok 过滤器模式序列不匹配。

Logstash 提供了许多内置的正则表达式模式来解析流行的服务器日志,比如 Apache。这里使用的模式需要一个动词,如 get、post 等,后跟一个统一的资源标识符。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/inlog2.log"
   }
}
filter {
   grok {
      match => {"message" => "%{WORD:verb} %{URIPATHPARAM:uri}"}
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/outlog2.log"
   }
}

运行 Logstash

我们可以使用以下命令运行 Logstash。

C:\logstash\bin> logstash –f  Logstash.conf

inlog2.log

我们的输入文件包含两个由默认分隔符分隔的事件,即新行分隔符。第一个事件与 GROk 中指定的模式匹配,第二个不匹配。

GET /tutorialspoint/Logstash
Input 1234

outlog2.log

我们可以看到第二个输出事件包含“_grokparsefailure”标签,因为它与 grok 过滤器模式不匹配。用户还可以使用输出插件中“if”条件在输出中删除这些不匹配的事件

{
   "path":"C:/tpwork/logstash/bin/log/inlog2.log",
   "@timestamp":"2016-12-13T02:47:10.352Z","@version":"1","host":"Dell-PC","verb":"GET",
   "message":"GET /tutorialspoint/logstash", "uri":"/tutorialspoint/logstash", "tags":[]
}
{
   "path":"C:/tpwork/logstash/bin/log/inlog2.log",
   "@timestamp":"2016-12-13T02:48:12.418Z", "@version":"1", "host":"Dell-PC",
   "message":"t 1234\r", "tags":["_grokparsefailure"]
}

Logstash – 收集日志

来自不同服务器或数据源的日志是使用托运人收集的。Shipper 是安装在服务器中的 Logstash 实例,它访问服务器日志并发送到特定的输出位置。

它主要将输出发送到 Elasticsearch 进行存储。Logstash 从以下来源获取输入 –

  • 标准输入
  • 系统日志
  • 文件
  • TCP/UDP
  • Microsoft Windows 事件日志
  • 网络套接字
  • 零米
  • 定制扩展

使用 Apache Tomcat 7 服务器收集日志

在这个例子中,我们使用文件输入插件收集安装在 windows 中的 Apache Tomcat 7 服务器的日志,并将它们发送到另一个日志。

日志文件

这里配置Logstash访问本地安装的Apache Tomcat 7的访问日志。在文件插件的路径设置中使用正则表达式模式从日志文件中获取数据。这在其名称中包含“访问”并添加了 apache 类型,这有助于将 apache 事件与集中目标源中的其他事件区分开来。最后,输出事件将显示在 output.log 中。

input {
   file {
      path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/*access*"
      type => "apache"
   }
} 
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

运行 Logstash

我们可以使用以下命令运行 Logstash。

C:\logstash\bin> logstash –f  Logstash.conf

Apache Tomcat 日志

访问 Apache Tomcat 服务器及其 Web 应用程序 ( http://localhost:8080 ) 以生成日志。日志中更新的数据由 Logstash 实时读取,并按照配置文件中的指定存储在 output.log 中。

Apache Tomcat 根据日期生成一个新的访问日志文件,并在那里记录访问事件。在我们的例子中,它是Apache Tomcat日志目录中的localhost_access_log.2016-12-24.txt

0:0:0:0:0:0:0:1 - - [
   25/Dec/2016:18:37:00 +0800] "GET / HTTP/1.1" 200 11418
0:0:0:0:0:0:0:1 - munish [
   25/Dec/2016:18:37:02 +0800] "GET /manager/html HTTP/1.1" 200 17472
0:0:0:0:0:0:0:1 - - [
   25/Dec/2016:18:37:08 +0800] "GET /docs/ HTTP/1.1" 200 19373
0:0:0:0:0:0:0:1 - - [
   25/Dec/2016:18:37:10 +0800] "GET /docs/introduction.html HTTP/1.1" 200 15399

输出日志

您可以在输出事件中看到,添加了一个类型字段,并且该事件存在于消息字段中。

{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt",
   "@timestamp":"2016-12-25T10:37:00.363Z","@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:00 +0800] \"GET /
   HTTP/1.1\" 200 11418\r","type":"apache","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:10.407Z",
   "@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - munish [25/Dec/2016:18:37:02 +0800] \"GET /
   manager/html HTTP/1.1\" 200 17472\r","type":"apache","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:10.407Z",
   "@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:08 +0800] \"GET /docs/
   HTTP/1.1\" 200 19373\r","type":"apache","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:20.436Z",
   "@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:10 +0800] \"GET /docs/
   introduction.html HTTP/1.1\" 200 15399\r","type":"apache","tags":[]
}

使用 STDIN 插件收集日志

在本节中,我们将讨论使用STDIN 插件收集日志的另一个示例

日志文件

这是一个非常简单的示例,其中 Logstash 正在读取用户在标准输入中输入的事件。在我们的例子中,它是命令提示符,它将事件存储在 output.log 文件中。

input {
   stdin{}
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

运行 Logstash

我们可以使用以下命令运行 Logstash。

C:\logstash\bin> logstash –f  Logstash.conf

在命令提示符中写入以下文本 –

用户输入了以下两行。Logstash 通过分隔符设置分隔事件,其默认值为 ‘\n’。用户可以通过更改文件插件中分隔符的值来更改。

Tutorialspoint.com welcomes you
Simply easy learning

输出日志

以下代码块显示了输出日志数据。

{
   "@timestamp":"2016-12-25T11:41:16.518Z","@version":"1","host":"Dell-PC",
   "message":"tutrialspoint.com welcomes you\r","tags":[]
}
{
   "@timestamp":"2016-12-25T11:41:53.396Z","@version":"1","host":"Dell-PC",
   "message":"simply easy learning\r","tags":[]
}

Logstash – 支持的输入

Logstash 支持来自不同来源的大量日志。它正在与著名的来源合作,如下所述。

从指标收集日志

系统事件和其他时间活动记录在指标中。Logstash 可以从系统指标访问日志并使用过滤器处理它们。这有助于以自定义方式向用户显示事件的实时提要。根据指标过滤器flush_interval设置和默认情况下刷新指标;设置为 5 秒。

我们通过收集和分析通过 Logstash 运行的事件并在命令提示符上显示实时提要来跟踪 Logstash 生成的测试指标。

日志文件

此配置包含一个生成器插件,由 Logstash 提供用于测试指标并将类型设置设置为“生成”以进行解析。在过滤阶段,我们仅使用“if”语句处理具有生成类型的行。然后,指标插件计算仪表设置中指定的字段。指标插件在flush_interval 中指定的每 5 秒后刷新计数

最后,使用编解码器插件将过滤器事件输出到标准输出,如命令提示符以进行格式化。Codec 插件使用 [ events ][ rate_1m ] 值在 1 分钟滑动窗口中输出每秒事件。

input {
   generator {
     	type => "generated"
   }
}
filter {
   if [type] == "generated" {
      metrics {
         meter => "events"
         add_tag => "metric"
      }
   }
}
output {
   # only emit events with the 'metric' tag
   if "metric" in [tags] {
      stdout {
         codec => line { format => "rate: %{[events][rate_1m]}"
      }
   }
}

运行 Logstash

我们可以使用以下命令运行 Logstash。

>logsaths –f logstash.conf

标准输出(命令提示符)

rate: 1308.4
rate: 1308.4
rate: 1368.654529135342
rate: 1416.4796003951449
rate: 1464.974293984808
rate: 1523.3119444107458
rate: 1564.1602979542715
rate: 1610.6496496890895
rate: 1645.2184750334154
rate: 1688.7768007612485
rate: 1714.652283095914
rate: 1752.5150680019278
rate: 1785.9432934744932
rate: 1806.912181962126
rate: 1836.0070454626025
rate: 1849.5669494173826
rate: 1871.3814756851832
rate: 1883.3443123790712
rate: 1906.4879113216743
rate: 1925.9420717997118
rate: 1934.166137658981
rate: 1954.3176526556897
rate: 1957.0107444542625

从 Web 服务器收集日志

Web 服务器会生成大量有关用户访问和错误的日志。Logstash 有助于使用输入插件从不同服务器提取日志并将它们存储在一个集中位置。

我们正在从本地 Apache Tomcat 服务器stderr 日志中提取数据并将其存储在 output.log 中。

日志文件

这个 Logstash 配置文件指示 Logstash 读取 apache 错误日志并添加一个名为“apache-error”的标签。我们可以简单地使用文件输出插件将它发送到 output.log。

input {
   file {
      path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0 /logs/*stderr*"
      type => "apache-error"  
   }
} 
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

运行 Logstash

我们可以使用以下命令运行 Logstash。

>Logstash –f Logstash.conf

输入日志示例

这是示例stderr 日志,它在 Apache Tomcat 中发生服务器事件时生成。

C:\Program Files\Apache Software Foundation\Tomcat 7.0\logs\ tomcat7-stderr.2016-12-25.log

Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler ["http-bio-9999"]
Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler ["ajp-bio-8009"]
Dec 25, 2016 7:05:14 PM org.apache.catalina.startup.Catalina start
INFO: Server startup in 823 ms

输出日志

{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start\r",
   "type":"apache-error","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"INFO: Starting ProtocolHandler [
      \"ajp-bio-8009\"]\r","type":"apache-error","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"Dec 25, 2016 7:05:14 PM org.apache.catalina.startup.Catalina start\r",
   "type":"apache-error","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"INFO: Server startup in 823 ms\r","type":"apache-error","tags":[]
}

从数据源收集日志

首先,让我们了解如何配置 MySQL 进行日志记录。在[mysqld]下MySQL数据库服务器的my.ini文件添加以下几行

在 Windows 中,它存在于 MySQL 的安装目录中,位于 –

C:\wamp\bin\mysql\mysql5.7.11

在 UNIX 中,您可以在 -/etc/mysql/my.cnf 中找到它

general_log_file   = "C:/wamp/logs/queries.log"
general_log = 1

日志文件

在这个配置文件中,file plugin 用于读取 MySQL 日志并将其写入 ouput.log。

input {
   file {
      path => "C:/wamp/logs/queries.log"
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

查询日志

这是在 MySQL 数据库中执行的查询生成的日志。

2016-12-25T13:05:36.854619Z   2 Query		select * from test1_users
2016-12-25T13:05:51.822475Z    2 Query	select count(*) from users
2016-12-25T13:05:59.998942Z    2 Query         select count(*) from test1_users

输出日志

{
   "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:05:37.905Z",
   "@version":"1","host":"Dell-PC",
   "message":"2016-12-25T13:05:36.854619Z    2 Query\tselect * from test1_users",
   "tags":[]
}
{
   "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:05:51.938Z",
   "@version":"1","host":"Dell-PC",
   "message":"2016-12-25T13:05:51.822475Z    2 Query\tselect count(*) from users",
   "tags":[]
}
{
   "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:06:00.950Z",
   "@version":"1","host":"Dell-PC",
   "message":"2016-12-25T13:05:59.998942Z    2 Query\tselect count(*) from test1_users",
   "tags":[]
}

Logstash – 解析日志

Logstash 使用输入插件接收日志,然后使用过滤器插件来解析和转换数据。日志的解析和转换是根据输出目的地中存在的系统执行的。Logstash 解析日志数据并仅转发必填字段。稍后,这些字段被转换为目标系统的兼容且可理解的形式。

如何解析日志?

日志的解析是使用GROK(知识的图形表示)模式执行的,您可以在 Github 中找到它们 –

https://github.com/elastic/logstash/tree/v1.4.2/patterns

Logstash 将日志数据与指定的 GROK Pattern 或 Pattern 序列匹配,用于解析日志,例如“%{COMBINEDAPACHELOG}”,通常用于 apache 日志。

解析后的数据更加结构化,易于搜索和执行查询。Logstash 在输入日志中搜索指定的 GROK 模式并从日志中提取匹配的行。您可以使用 GROK 调试器来测试您的 GROK 模式。

GROK 模式的语法是 %{SYNTAX:SEMANTIC}。Logstash GROK 过滤器采用以下形式编写 –

%{PATTERN:FieldName}

这里,PATTERN 代表 GROK 模式,fieldname 是字段的名称,它代表输出中解析的数据。

例如,使用在线 GROK 调试器https://grokdebug.herokuapp.com/

输入

日志中的示例错误行 –

[Wed Dec 07 21:54:54.048805 2016] [:error] [pid 1234:tid 3456829102]
   [client 192.168.1.1:25007] JSP Notice:  Undefined index: abc in
   /home/manu/tpworks/tutorialspoint.com/index.jsp on line 11

GROK 模式序列

此 GROK 模式序列与日志事件匹配,其中包含时间戳,后跟日志级别、进程 ID、事务 ID 和错误消息。

\[(%{DAY:day} %{MONTH:month} %{MONTHDAY} %{TIME} %{YEAR})\] \[.*:%{LOGLEVEL:loglevel}\]
   \[pid %{NUMBER:pid}:tid %{NUMBER:tid}\] \[client %{IP:clientip}:.*\]
   %{GREEDYDATA:errormsg}

输出

输出为 JSON 格式。

{
   "day": [
      "Wed"
   ],
   "month": [
      "Dec"
   ],
   "loglevel": [
      "error"
   ],
   "pid": [
      "1234"
   ],
   "tid": [
      "3456829102"
   ],
   "clientip": [
      "192.168.1.1"
   ],
   "errormsg": [
      "JSP Notice:  Undefined index: abc in
      /home/manu/tpworks/tutorialspoint.com/index.jsp on line 11"
   ]
}

Logstash – 过滤器

Logstash 在输入和输出之间的管道中间使用过滤器。Logstash 度量的过滤器操作和创建事件,如Apache-Access许多过滤器插件用于管理 Logstash 中的事件。在这里,在Logstash 聚合过滤器的示例中,我们过滤数据库中每个 SQL 事务的持续时间并计算总时间。

安装聚合过滤器插件

使用 Logstash-plugin 实用程序安装聚合过滤器插件。Logstash-plugin 是 Logstash 中bin 文件夹windows 的批处理文件

>logstash-plugin install logstash-filter-aggregate

日志文件

在此配置中,您可以看到三个用于初始化、递增生成事务总持续时间的“if”语句,即sql_duration聚合插件用于添加 sql_duration,存在于输入日志的每个事件中。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
} 
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} - 
            %{NOTSPACE:taskid} - %{NOTSPACE:logger} - 
            %{WORD:label}( - %{INT:duration:int})?" 
      ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"    
   }
}

运行 Logstash

我们可以使用以下命令运行 Logstash。

>logstash –f logstash.conf 

输入日志

以下代码块显示了输入日志数据。

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

输出日志

如配置文件中所指定,记录器所在的最后一个“if”语句 – TRANSACTION_END,它打印总事务时间或 sql_duration。这已在 output.log 中以黄色突出显示。

{
   "path":"C:/tpwork/logstash/bin/log/input.log","@timestamp": "2016-12-22T19:04:37.214Z",
   "loglevel":"INFO","logger":"TRANSACTION_START","@version": "1","host":"wcnlab-PC",
   "message":"8566 - TRANSACTION_START - start\r","tags":[]
}
{
   "duration":320,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.366Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 320\r","taskid":"48566","tags":[]
}
{
   "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.373Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 200\r","taskid":"48566","tags":[]
}
{
   "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.380Z","loglevel":"INFO","logger":"TRANSACTION_END",
   "@version":"1","host":"wcnlab-PC","label":"end",
   "message":" INFO - 48566 - TRANSACTION_END - end\r","taskid":"48566","tags":[]
}

Logstash – 转换日志

Logstash 提供了各种插件来转换解析的日志。这些插件可以在日志中添加、删除更新字段,以便在输出系统中更好地理解和查询。

我们正在使用Mutate 插件在输入日志的每一行中添加一个字段名称 user。

安装 Mutate 过滤器插件

安装 mutate 过滤器插件;我们可以使用以下命令。

>Logstash-plugin install Logstash-filter-mutate

日志文件

在这个配置文件中,Mutate Plugin 添加在 Aggregate Plugin 之后添加一个新字段。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
} 
filter {
   grok {
      match => [ "message", "%{LOGLEVEL:loglevel} -
         %{NOTSPACE:taskid} - %{NOTSPACE:logger} -
         %{WORD:label}( - %{INT:duration:int})?" ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ; 
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
   mutate {
      add_field => {"user" => "tutorialspoint.com"}
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

运行 Logstash

我们可以使用以下命令运行 Logstash。

>logstash –f logstash.conf

输入日志

以下代码块显示了输入日志数据。

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

输出日志

您可以看到输出事件中有一个名为“user”的新字段。

{
   "path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.383Z",
   "@version":"1",
   "host":"wcnlab-PC",
   "message":"NFO - 48566 - TRANSACTION_START - start\r",
   "user":"tutorialspoint.com","tags":["_grokparsefailure"]
}
{
   "duration":320,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.383Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 320\r",
   "user":"tutorialspoint.com","taskid":"48566","tags":[]
}
{
   "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.399Z","loglevel":"INFO",
   "logger":"SQL","@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 200\r",
   "user":"tutorialspoint.com","taskid":"48566","tags":[]
}
{
   "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.399Z","loglevel":"INFO",
   "logger":"TRANSACTION_END","@version":"1","host":"wcnlab-PC","label":"end",
   "message":" INFO - 48566 - TRANSACTION_END - end\r",
   "user":"tutorialspoint.com","taskid":"48566","tags":[]
}

Logstash – 输出阶段

输出是 Logstash 管道的最后一个阶段,它将过滤器数据从输入日志发送到指定的目的地。Logstash 提供多个输出插件,将过滤后的日志事件存储到各种不同的存储和搜索引擎。

存储日志

Logstash 可以将过滤后的日志存储在File、Elasticsearch Engine、stdout、AWS CloudWatch等中。TCP、UDP、Websocket等网络协议也可以在 Logstash 中用于将日志事件传输到远程存储系统。

在 ELK 堆栈中,用户使用 Elasticsearch 引擎来存储日志事件。在下面的示例中,我们将为本地 Elasticsearch 引擎生成日志事件。

安装 Elasticsearch 输出插件

我们可以使用以下命令安装 Elasticsearch 输出插件。

>logstash-plugin install Logstash-output-elasticsearch

日志文件

此配置文件包含一个 Elasticsearch 插件,该插件将输出事件存储在本地安装的 Elasticsearch 中。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
} 
filter {
   grok {
      match => [ "message", "%{LOGLEVEL:loglevel} -
      %{NOTSPACE:taskid} - %{NOTSPACE:logger} -  
      %{WORD:label}( - %{INT:duration:int})?" ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
   mutate {
      add_field => {"user" => "tutorialspoint.com"}
   }
}
output {
   elasticsearch {
      hosts => ["127.0.0.1:9200"]
   }
}

输入日志

以下代码块显示了输入日志数据。

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

在本地主机上启动 Elasticsearch

要在本地主机上启动 Elasticsearch,您应该使用以下命令。

C:\elasticsearch\bin> elasticsearch

Elasticsearch 准备就绪后,您可以通过在浏览器中输入以下 URL 来检查它。

http://本地主机:9200/

回复

以下代码块显示了 Elasticsearch 在 localhost 的响应。

{
   "name" : "Doctor Dorcas",
   "cluster_name" : "elasticsearch",
   "version" : {
      "number" : "2.1.1",
      "build_hash" : "40e2c53a6b6c2972b3d13846e450e66f4375bd71",
      "build_timestamp" : "2015-12-15T13:05:55Z",
      "build_snapshot" : false,
      "lucene_version" : "5.3.1"
   },
   "tagline" : "You Know, for Search"
}

注意– 有关 Elasticsearch 的更多信息,您可以单击以下链接。

https://www.tutorialspoint.com/elasticsearch/index.html

现在,使用上述 Logstash.conf 运行 Logstash

>Logstash –f Logstash.conf

在输出日志中粘贴上述文本后,该文本将被 Logstash 存储在 Elasticsearch 中。您可以通过在浏览器中输入以下 URL 来检查存储的数据。

http://localhost:9200/logstash-2017.01.01/_search?pretty

回复

是存放在索引Logstash-2017.01.01中的JSON格式的数据。

{
   "took" : 20,
   "timed_out" : false,
   "_shards" : {
      "total" : 5,
      "successful" : 5,
      "failed" : 0
   },
   "hits" : {
      "total" : 10,
      "max_score" : 1.0,
      "hits" : [ {
         "_index" : "logstash-2017.01.01",
         "_type" : "logs",
         "_id" : "AVlZ9vF8hshdrGm02KOs",
         "_score" : 1.0,
         "_source":{
            "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log", 
            "@timestamp":"2017-01-01T12:17:49.140Z","loglevel":"INFO",
            "logger":"SQL","@version":"1","host":"wcnlab-PC",
            "label":"transaction1",
            "message":" INFO - 48566 - SQL - transaction1 - 200\r",
            "user":"tutorialspoint.com","taskid":"48566","tags":[]
         }
      },
      {
         "_index" : "logstash-2017.01.01",
         "_type" : "logs",
         "_id" : "AVlZ9vF8hshdrGm02KOt",
         "_score" : 1.0,
         "_source":{
            "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log",
            "@timestamp":"2017-01-01T12:17:49.145Z","loglevel":"INFO",
            "logger":"TRANSACTION_END","@version":"1","host":"wcnlab-PC",
            "label":"end",
            "message":" INFO - 48566 - TRANSACTION_END - end\r",
            "user":"tutorialspoint.com","taskid":"48566","tags":[]
         }
      }
   }
}

Logstash – 支持的输出

Logstash 提供了多个插件来支持各种数据存储或搜索引擎。日志的输出事件可以发送到输出文件、标准输出或 Elasticsearch 等搜索引擎。Logstash 支持三种类型的输出,它们是 –

  • 标准输出
  • 文件输出
  • 空输出

现在让我们详细讨论其中的每一个。

标准输出 (stdout)

它用于将过滤后的日志事件作为数据流生成到命令行界面。这是将数据库事务的总持续时间生成到标准输出的示例。

日志文件

此配置文件包含一个 stdout 输出插件,用于将总 sql_duration 写入标准输出。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
} 
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid}
            - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" 
      ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
}
output {
   if [logger] == "TRANSACTION_END" {
      stdout {
         codec => line{format => "%{sql_duration}"}
      }
   }
}

注意– 如果尚未安装,请安装聚合过滤器。

>logstash-plugin install Logstash-filter-aggregate

运行 Logstash

我们可以使用以下命令运行 Logstash。

>logstash –f logsatsh.conf

输入日志

以下代码块显示了输入日志数据。

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END – end

stdout(它将是 Windows 中的命令提示符或 UNIX 中的终端)

这是总的 sql_duration 320 + 200 = 520。

520

文件输出

Logstash 还可以将过滤器日志事件存储到输出文件中。我们将使用上述示例并将输出存储在文件中,而不是 STDOUT。

日志文件

此 Logstash 配置文件指示 Logstash 将总 sql_duration 存储到输出日志文件。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input1.log"
   }
} 
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} -
            %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" 
      ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
}
output {
   if [logger] == "TRANSACTION_END" {
      file {
         path => "C:/tpwork/logstash/bin/log/output.log"
         codec => line{format => "%{sql_duration}"}
      }
   }
}

运行日志

我们可以使用以下命令运行 Logstash。

>logstash –f logsatsh.conf

输入日志

以下代码块显示了输入日志数据。

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END – end

输出日志

以下代码块显示了输出日志数据。

520

空输出

这是一个特殊的输出插件,用于分析输入和过滤插件的性能。

Logstash – 插件

Logstash 为其管道的所有三个阶段(输入、过滤器和输出)提供了各种插件。这些插件帮助用户从各种来源(如 Web 服务器、数据库、网络协议等)捕获日志。

捕获后,Logstash 可以将数据解析并转换为用户需要的有意义的信息。最后,Logstash 可以将有意义的信息发送或存储到各种目标源,如 Elasticsearch、AWS Cloudwatch 等。

输入插件

Logstash 中的输入插件可帮助用户从各种来源提取和接收日志。使用输入插件的语法如下 –

Input {
   Plugin name {
      Setting 1……
      Setting 2……..
   }
}

您可以使用以下命令下载输入插件 –

>Logstash-plugin install Logstash-input-<plugin name>

Logstash-plugin 实用程序位于Logstash 安装目录bin 文件夹中。下表列出了 Logstash 提供的输入插件。

Sr.No. 插件名称和说明
1

beats

从 elastic beats 框架中获取日志数据或事件。

2

cloudwatch

从 CloudWatch 中提取事件,这是 Amazon Web Services 提供的 API。

3

couchdb_changes

来自使用此插件提供的 couchdb 的 _chages URI 的事件。

4

drupal_dblog

使用启用的 DBLog 提取 drupal 的看门狗日志记录数据。

5

Elasticsearch

检索在 Elasticsearch 集群中执行的查询的结果。

6

eventlog

从 Windows 事件日志中获取事件。

7

exec

获取 shell 命令输出作为 Logstash 中的输入。

8

file

从输入文件中获取事件。当 Logstash 与输入源一起在本地安装并且可以访问输入源日志时,这很有用。

9

generator

它用于测试目的,这会创建随机事件。

10

github

从 GitHub webhook 捕获事件。

11

graphite

从石墨监控工具获取指标数据。

12

heartbeat

它还用于测试并产生类似心跳的事件

13

http

通过两种网络协议收集日志事件,即 http 和 https。

14

http_poller

它用于将 HTTP API 输出解码为事件。

15

jdbc

它将 JDBC 事务转换为 Logstash 中的事件。

16

jmx

使用 JMX 从远程 Java 应用程序中提取指标。

17

log4j

通过 TCP 套接字从 Log4j 的 socketAppender 对象捕获事件。

18

rss

将命令行工具的输出作为 Logstash 中的输入事件。

19

tcp

通过 TCP 套接字捕获事件。

20

twitter

从 Twitter 流 API 收集事件。

21

unix

通过 UNIX 套接字收集事件。

22

websocket

通过 websocket 协议捕获事件。

23

xmpp

通过 Jabber/xmpp 协议读取事件。

插件设置

所有插件都有其特定的设置,这有助于指定插件中的重要字段,如端口、路径等。我们将讨论一些输入插件的设置。

文件

此输入插件用于直接从输入源中存在的日志或文本文件中提取事件。它的工作方式类似于 UNIX 中的 tail 命令,保存上次读取的游标并仅从输入文件中读取新追加的数据,但可以使用 star_position 设置进行更改。以下是此输入插件的设置。

Setting Name 默认值 描述
add_field {} 将新字段附加到输入事件。
close_older 3600 上次读取时间(以秒为单位)超过此插件中指定的文件将被关闭。
codec “清楚的” 它用于在进入 Logstash 管道之前对数据进行解码。
delimiter “\n” 它用于指定新的行分隔符。
discover_interval 15 它是在指定路径中发现新文件之间的时间间隔(以秒为单位)。
enable_metric 真的 它用于启用或禁用指定插件的指标报告和收集。
exclude 它用于指定应从输入插件中排除的文件名或模式。
Id 为该插件实例指定唯一标识。
max_open_files 它随时指定 Logstash 的最大输入文件数。
path 指定文件的路径,它可以包含文件名的模式。
start_position “结尾” 如果需要,您可以更改为“开始”;最初 Logstash 应该从头开始读取文件,而不仅仅是新的日志事件。
start_interval 1 它以秒为单位指定时间间隔,在此之后 Logstash 检查已修改的文件。
tags 要添加任何其他信息,例如 Logstash,当任何日志事件未能符合指定的 grok 过滤器时,它会在标签中添加“_grokparsefailure”。
type 这是一个特殊字段,您可以将其添加到输入事件中,它在过滤器和 kibana 中很有用。

弹性搜索

这个特定的插件用于读取 Elasticsearch 集群中的搜索查询结果。以下是此插件中使用的设置 –

Setting Name 默认值 描述
add_field {} 与文件插件相同,用于在输入事件中附加字段。
ca_file 用于指定 SSL 证书颁发机构文件的路径。
codec “清楚的” 它用于在进入 Logstash 管道之前解码来自 Elasticsearch 的输入事件。
docinfo “错误的” 如果您想从 Elasticsearch 引擎中提取索引、类型和 id 等附加信息,您可以将其更改为 true。
docinfo_fields [“_index”, “_type”, “_id”] 您可以在 Logstash 输入中删除不需要的任何字段。
enable_metric 真的 它用于启用或禁用该插件实例的报告和指标收集。
hosts 它用于指定所有弹性搜索引擎的地址,这些引擎将作为该 Logstash 实例的输入源。语法是host:port 或IP:port。
Id 它用于为该特定输入插件实例提供唯一标识号。
index “logstash-*” 它用于指定索引名称或模式,Logstash 将监视 Logstash 以进行输入。
password 用于身份验证。
query “{ \”sort\”: [ \”_doc\” ] }” 查询执行。
ssl 错误的 启用或禁用安全套接字层。
tags 在输入事件中添加任何附加信息。
type 它用于对输入表单进行分类,以便在后期轻松搜索所有输入事件。
user 出于真实目的。

事件簿

这个输入插件从 windows 服务器的 win32 API 读取数据。以下是此插件的设置 –

Setting Name 默认值 描述
add_field {} 与文件插件相同,用于在输入事件中附加一个字段
codec “清楚的” 用于解码来自windows的输入事件;在进入 Logstash 管道之前
logfile [“应用”、“安全”、“系统”] 输入日志文件中所需的事件
interval 1000 它以毫秒为单位,定义了两次连续检查新事件日志之间的间隔
tags 在输入事件中添加任何附加信息
type 它用于将特定插件的输入表单分类为给定类型,以便在后期轻松搜索所有输入事件

推特

此输入插件用于从其 Streaming API 收集 twitter 的提要。下表描述了此插件的设置。

Setting Name 默认值 描述
add_field {} 与文件插件相同,用于在输入事件中附加一个字段
codec “清楚的” 用于解码来自windows的输入事件;在进入 Logstash 管道之前
consumer_key 它包含 Twitter 应用程序的使用者密钥。欲了解更多信息,请访问https://dev.twitter.com/apps/new
consumer_secret 它包含 Twitter 应用程序的消费者密钥。欲了解更多信息,请访问https://dev.twitter.com/apps/new
enable_metric 真的 它用于启用或禁用该插件实例的报告和指标收集
follows

它指定用逗号分隔的用户 ID,LogStash 会检查这些用户在 Twitter 中的状态。

欲了解更多信息,请访问

https://dev.twitter.com

full_tweet 错误的 如果您希望 Logstash 读取从 twitter API 返回的完整对象,您可以将其更改为 true
id 它用于为该特定输入插件实例提供唯一标识号
ignore_retweets 错误的 您可以更改设置为 true 以忽略输入推特提要中的转推
keywords 这是一组关键字,需要在 twitters 输入源中进行跟踪
language 它定义了 LogStash 需要的来自输入 twitter 提要的推文的语言。这是一个标识符数组,它定义了 Twitter 中的特定语言
locations 根据指定的位置从输入源中过滤掉推文。这是一个数组,其中包含位置的经度和纬度
oauth_token 这是一个必需的文件,其中包含用户 oauth 令牌。欲了解更多信息,请访问以下链接https://dev.twitter.com/apps
oauth_token_secret 这是一个必需的文件,其中包含用户 oauth 秘密令牌。欲了解更多信息,请访问以下链接https://dev.twitter.com/apps
tags 在输入事件中添加任何附加信息
type 它用于将特定插件的输入表单分类为给定类型,以便在后期轻松搜索所有输入事件

TCP

TCP 用于通过 TCP 套接字获取事件;它可以从模式设置中指定的用户连接或服务器读取。下表描述了此插件的设置 –

Setting Name 默认值 描述
add_field {} 与文件插件相同,用于在输入事件中附加一个字段
codec “清楚的” 用于解码来自windows的输入事件;在进入 Logstash 管道之前
enable_metric 真的 它用于启用或禁用该插件实例的报告和指标收集
host “0.0.0.0” 客户端依赖的服务器操作系统的地址
id 它包含 Twitter 应用程序的消费者密钥
mode “服务器” 它用于指定输入源是服务器还是客户端。
port 它定义了端口号
ssl_cert 用于指定SSL证书的路径
ssl_enable 错误的 启用或禁用 SSL
ssl_key 指定 SSL 密钥文件的路径
tags 在输入事件中添加任何附加信息
type 它用于将特定插件的输入表单分类为给定类型,以便在后期轻松搜索所有输入事件

Logstash – 输出插件

Logstash 支持各种输出源和不同的技术,如数据库、文件、电子邮件、标准输出等。

使用输出插件的语法如下 –

output {
   Plugin name {
      Setting 1……
      Setting 2……..
   }
}

您可以使用以下命令下载输出插件 –

>logstash-plugin install logstash-output-<plugin name>

Logstash-插件工具存在于Logstash安装目录下的bin文件夹。下表描述了 Logstash 提供的输出插件。

Sr.No. 插件名称和说明
1

CloudWatch

此插件用于将聚合指标数据发送到亚马逊 Web 服务的 CloudWatch。

2

csv

它用于以逗号分隔的方式编写输出事件。

3

Elasticsearch

用于将输出日志存储在 Elasticsearch 索引中。

4

email

它用于在生成输出时发送通知电子邮件。用户可以在电子邮件中添加有关输出的信息。

5

exec

它用于运行与输出事件匹配的命令。

6

ganglia

它将指标扭曲到 Gangila 的 gmond。

7

gelf

它用于以 GELF 格式为 Graylog2 生成输出。

8

google_bigquery

它将事件输出到 Google BigQuery。

9

google_cloud_storage

它将输出事件存储到 Google Cloud Storage。

10

graphite

它用于将输出事件存储到 Graphite。

11

graphtastic

它用于在 Windows 上编写输出指标。

12

hipchat

它用于将输出日志事件存储到 HipChat。

13

http

它用于将输出日志事件发送到 http 或 https 端点。

14

influxdb

它用于在 InfluxDB 中存储输出事件。

15

irc

它用于将输出事件写入 irc。

16

mongodb

它将输出数据存储在 MongoDB 中。

17

nagios

它用于将被动检查结果通知 Nagios。

18

nagios_nsca

它用于通过 NSCA 协议将被动检查结果通知 Nagios。

19

opentsdb

它将 Logstash 输出事件存储到 OpenTSDB。

20

pipe

它将输出事件流式传输到另一个程序的标准输入。

21

rackspace

用于将输出的日志事件发送到 Rackspace Cloud 的 Queue 服务。

22

redis

它使用 rpush 命令将输出的日志数据发送到 Redis 队列。

23

riak

它用于将输出事件存储到 Riak 分布式键/值对。

24

s3

它将输出日志数据存储到 Amazon Simple Storage Service。

25

sns

它用于将输出事件发送到 Amazon 的 Simple Notification Service。

26

solr_http

它在 Solr 中索引和存储输出日志数据。

27

sps

它用于将事件发送到 AWS 的 Simple Queue Service。

28

statsd

它用于将指标数据发送到 statsd 网络守护程序。

29

stdout

它用于在 CLI 的标准输出(如命令提示符)上显示输出事件。

30

syslog

它用于将输出事件发送到系统日志服务器。

31

tcp

它用于将输出事件发送到 TCP 套接字。

32

udp

它用于通过 UDP 推送输出事件。

33

websocket

它用于通过 WebSocket 协议推送输出事件。

34

xmpp

它用于通过 XMPP 协议推送输出事件。

所有插件都有其特定的设置,这有助于指定插件中的重要字段,如端口、路径等。我们将讨论一些输出插件的设置。

弹性搜索

Elasticsearch 输出插件使 Logstash 能够将输出存储在 Elasticsearch 引擎的特定集群中。这是用户的著名选择之一,因为它包含在 ELK Stack 包中,因此为 Devops 提供端到端解决方案。下表描述了此输出插件的设置。

Setting Name 默认值 描述
action 指数 它用于定义在 Elasticsearch 引擎中执行的操作。此设置的其他值是删除、创建、更新等。
cacert 它包含带有 .cer 或 .pem 的文件路径,用于服务器的证书验证。
codec “清楚的” 它用于在将输出日志数据发送到目标源之前对其进行编码。
doc_as_upset 错误的 此设置用于更新操作的情况。如果在输出插件中未指定文档 ID,它将在 Elasticsearch 引擎中创建一个文档。
document_type 它用于将相同类型的事件存储在相同的文档类型中。如果未指定,则事件类型用于相同。
flush_size 500 这用于提高 Elasticsearch 中批量上传的性能
hosts [“127.0.0.1”] 它是用于输出日志数据的目标地址数组
idle_flush_time 1 它定义了两次刷新之间的时间限制(秒),Logstash 在此设置中指定的时间限制后强制刷新
index “logstash-%{+YYYY.MM.dd}” 用于指定Elasticsearch引擎的索引
manage_temlpate 真的 用于在 Elasticsearch 中应用默认模板
parent 用于指定Elasticsearch中父文档的id
password 用于验证对 Elasticsearch 中安全集群的请求
path 用于指定 Elasticsearch 的 HTTP 路径。
pipeline 它用于设置摄取管道,用户希望为事件执行
proxy 用于指定 HTTP 代理
retry_initial_interval 2 它用于设置批量重试之间的初始时间间隔(秒)。每次重试后都会加倍,直到达到 retry_max_interval
retry_max_interval 64 用于设置 retry_initial_interval 的最大时间间隔
retry_on_conflict 1 Elasticsearch 更新文档的重试次数
ssl 启用或禁用保护到 Elasticsearch 的 SSL/TLS
template 包含自定义模板在 Elasticsearch 中的路径
template_name “日志存储” 这用于在 Elasticsearch 中命名模板
timeout 60 它是对 Elasticsearch 的网络请求的超时时间
upsert “” 它更新文档,或者如果 document_id 不存在,它会在 Elasticsearch 中创建一个新文档
user 它包含在安全的 Elasticsearch 集群中对 Logstash 请求进行身份验证的用户

电子邮件

当 Logstash 生成输出时,电子邮件输出插件用于通知用户。下表描述了此插件的设置。

Setting Name 默认值 描述
address “本地主机” 这是邮件服务器的地址
attachments [] 它包含附加文件的名称和位置
body “” 它包含电子邮件的正文,应该是纯文本
cc 它包含以逗号分隔的电子邮件地址,用于电子邮件抄送
codec “清楚的” 它用于在将输出日志数据发送到目标源之前对其进行编码。
contenttype “文本/html;字符集= UTF-8” 它用于电子邮件的内容类型
debug 错误的 用于在调试模式下执行邮件中继
domain “本地主机” 它用于设置发送电子邮件的域
from “logstash.alert@nowhere.com” 它用于指定发件人的电子邮件地址
htmlbody “” 用于指定 html 格式的邮件正文
password 它用于与邮件服务器进行身份验证
port 25 用于定义与邮件服务器通信的端口
replyto 用于指定email回复字段的email id
subject “” 它包含电子邮件的主题行
use_tls 错误的 启用或禁用与邮件服务器通信的 TSL
username 包含用于与服务器进行身份验证的用户名
via “smtp” 它定义了 Logstash 发送电子邮件的方法

网址

此设置用于通过 http 将输出事件发送到目的地。这个插件有以下设置 –

Setting Name 默认值 描述
automatic_retries 1 用于设置logstash的http请求重试次数
cacert 它包含服务器证书验证的文件路径
codec “清楚的” 它用于在将输出日志数据发送到目标源之前对其进行编码。
content_type I 指定对目的服务器的http请求的内容类型
cookies 真的 它用于启用或禁用 cookie
format “json” 用于设置http请求体的格式
headers 它包含http头的信息
http_method “” 用于指定logstash在请求中使用的http方法,取值可以是“put”、“post”、“patch”、“delete”、“get”、“head”
request_timeout 60 它用于与邮件服务器进行身份验证
url 指定 http 或 https 端点是此插件的必需设置

标准输出

stdout 输出插件用于在命令行界面的标准输出上写入输出事件。它是 Windows 中的命令提示符和 UNIX 中的终端。此插件具有以下设置 –

Setting Name 默认值 描述
codec “清楚的” 它用于在将输出日志数据发送到目标源之前对其进行编码。
workers 1 它用于指定输出的工人数量

统计数据

它是一个网络守护进程,用于通过 UDP 将矩阵数据发送到目标后端服务。它是 Windows 中的命令提示符和 UNIX 中的终端。这个插件有以下设置 –

Setting Name 默认值 描述
codec “清楚的” 它用于在将输出日志数据发送到目标源之前对其进行编码。
count {} 它用于定义要在指标中使用的计数
decrement [] 它用于指定递减指标名称
host “本地主机” 它包含 statsd 服务器的地址
increment [] 它用于指定增量度量名称
port 8125 它包含statsd服务器的端口
sample_rate 1 它用于指定指标的采样率
sender “%{主持人}” 它指定发件人的姓名
set {} 它用于指定一个集合度量
timing {} 它用于指定时间度量
workers 1 它用于指定输出的工人数量

过滤插件

Logstash 支持各种过滤器插件来解析输入日志并将其转换为更结构化且易于查询的格式。

使用过滤器插件的语法如下 –

filter {
   Plugin name {
      Setting 1……
      Setting 2……..
   }
}

您可以使用以下命令下载过滤器插件 –

>logstash-plugin install logstash-filter-<plugin name>

Logstash-plugin 实用程序位于 Logstash 安装目录的 bin 文件夹中。下表描述了 Logstash 提供的输出插件。

Sr.No. 插件名称和说明
1

aggregate

该插件从相同类型的各种事件中收集或聚合数据,并在最终事件中处理它们

2

alter

它允许用户更改日志事件的字段,而 mutate 过滤器不处理

3

anonymize

它用于用一致的哈希替换字段的值

4

cipher

它用于在将输出事件存储到目标源之前对其进行加密

5

clone

它用于在 Logstash 中创建输出事件的副本

6

collate

它按时间或计数合并来自不同日志的事件

7

csv

该插件根据分隔符解析输入日志中的数据

8

date

它解析事件中字段的日期并将其设置为事件的时间戳

9

dissect

该插件可帮助用户从非结构化数据中提取字段,并使 grok 过滤器可以轻松正确地解析它们

10

drop

它用于删除所有相同类型或任何其他相似的事件

11

elapsed

它用于计算开始和结束事件之间的时间

12

Elasticsearch

它用于将 Elasticsearch 中存在的先前日志事件的字段复制到 Logstash 中的当前字段

13

extractnumbers

它用于从日志事件中的字符串中提取数字

14

geoip

它在事件中添加了一个字段,其中包含日志事件中存在的 IP 位置的纬度和经度

15

grok

是常用的过滤器插件,用于解析事件获取字段

16

i18n

它从日志事件中的文件中删除特殊字符

17

json

它用于在事件中或在事件的特定字段中创建结构化的 Json 对象

18

kv

这个插件在匹配日志数据中的键值对时很有用

19

metrics

它用于聚合指标,例如计算每个事件的持续时间

20

multiline

它也是常用的过滤器插件之一,它可以帮助用户将多行日志数据转换为单个事件。

21

mutate

此插件用于重命名、删除、替换和修改事件中的字段

22

range

它用于根据预期范围和范围内的字符串长度检查事件中字段的数值。

23

ruby

它用于运行任意 Ruby 代码

24

sleep

这使 Logstash 休眠指定的时间

25

split

它用于拆分事件的字段并将所有拆分值放置在该事件的克隆中

26

xml

它用于通过解析日志中存在的 XML 数据来创建事件

编解码器插件

编解码器插件可以是输入或输出插件的一部分。这些插件用于更改或格式化日志数据表示。Logstash 提供多个编解码器插件,如下所示 –

Sr.No. 插件名称和说明
1

avro

此插件将 Logstash 事件序列化为 avro 数据或将 avro 记录解码为 Logstash 事件

2

cloudfront

此插件从 AWS cloudfront 读取编码数据

3

cloudtrail

该插件用于从 AWS cloudtrail 读取数据

4

collectd

这从称为通过 UDP 收集的二进制协议读取数据

5

compress_spooler

它用于将 Logstash 中的日志事件压缩为假脱机批次

6

dots

这是通过为每个事件设置一个点到标准输出来使用性能跟踪

7

es_bulk

这用于将来自 Elasticsearch 的批量数据转换为 Logstash 事件,包括 Elasticsearch 元数据

8

graphite

此编解码器将石墨中的数据读入事件并将事件转换为石墨格式的记录

9

gzip_lines

该插件用于处理 gzip 编码的数据

10

json

这用于将 Json 数组中的单个元素转换为单个 Logstash 事件

11

json_lines

它用于处理带有换行符的 Json 数据

12

line

它插件将在单个 live 中读取和写入事件,这意味着在换行分隔符之后会有一个新事件

13

multiline

它用于将多行日志数据转换为单个事件

14

netflow

此插件用于将 nertflow v5/v9 数据转换为 logstash 事件

15

nmap

它将 nmap 结果数据解析为 XML 格式

16

plain

这读取没有分隔符的文本

17

rubydebug

这个插件将使用 Ruby Awesome 打印库编写输出 Logstash 事件

构建你自己的插件

您还可以在 Logstash 中创建自己的插件,以满足您的需求。Logstash-plugin 实用程序用于创建自定义插件。在这里,我们将创建一个过滤器插件,它将在事件中添加自定义消息。

生成基础结构

用户可以使用 logstash-plugin 实用程序的 generate 选项生成必要的文件,也可以在 GitHub 上找到它。

>logstash-plugin generate --type filter --name myfilter --path c:/tpwork/logstash/lib

在这里,type选项用于指定插件是输入、输出或过滤器。在这个例子中,我们正在创建一个名为myfilter的过滤器插件path 选项用于指定路径,您希望在其中创建插件目录。执行上述命令后,您将看到创建了一个目录结构。

开发插件

您可以在插件目录\lib\logstash\filters文件夹中找到插件的代码文件文件扩展名为.rb

在我们的例子中,代码文件位于以下路径中 –

C:\tpwork\logstash\lib\logstash-filter-myfilter\lib\logstash\filters\myfilter.rb

我们将消息更改为 − 默认⇒“嗨,您正在tutorialspoint.com 上学习”并保存文件。

安装插件

安装这个插件需要修改Logstash的Gemfile。你可以在Logstash的安装目录中找到这个文件。在我们的例子中,它将在C:\tpwork\logstash 中使用任何文本编辑器编辑此文件并在其中添加以下文本。

gem "logstash-filter-myfilter",:path => "C:/tpwork/logstash/lib/logstash-filter-myfilter"

在上面的命令中,我们指定了插件的名称以及我们可以在哪里找到它进行安装。然后,运行 Logstash-plugin 实用程序来安装此插件。

>logstash-plugin install --no-verify

测试

在这里,我们在前面的示例之一中添加了myfilter

日志文件

此 Logstash 配置文件在 grok 过滤器插件之后的过滤器部分中包含 myfilter。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input1.log"
   } 
}
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} -
            %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]
   }
   myfilter{}
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output1.log"
      codec => rubydebug
   }
}

运行日志

我们可以使用以下命令运行 Logstash。

>logstash –f logsatsh.conf

输入日志

以下代码块显示了输入日志数据。

INFO - 48566 - TRANSACTION_START - start

输出日志

以下代码块显示了输出日志数据。

{
   "path" => "C:/tpwork/logstash/bin/log/input.log",
   "@timestamp" => 2017-01-07T06:25:25.484Z,
   "loglevel" => "INFO",
   "logger" => "TRANSACTION_END",
   "@version" => "1",
   "host" => "Dell-PC",
   "label" => "end",
   "message" => "Hi, You are learning this on tutorialspoint.com",
   "taskid" => "48566",
   "tags" => []
}

在 Logstash 上发布

开发人员还可以通过将其上传到 github 并遵循 Elasticsearch 公司定义的标准化步骤将他/她的自定义插件发布到 Logstash。

有关发布的更多信息,请参阅以下 URL –

https://www.elastic.co/guide/en/logstash/current/contributing-to-logstash.html

Logstash – 监控 API

Logstash 提供 API 来监控其性能。这些监控 API 提取有关 Logstash 的运行时指标。

节点信息 API

该API用于获取Logstash的节点信息。它以 JSON 格式返回 OS、Logstash 管道和 JVM 的信息。

您可以通过使用以下 URL 向 Logstash发送获取请求来提取信息

GET http://localhost:9600/_node?pretty

回复

以下是节点信息 API 的响应。

{
   "host" : "Dell-PC",
   "version" : "5.0.1",
   "http_address" : "127.0.0.1:9600",
   
   "pipeline" : {
      "workers" : 4,
      "batch_size" : 125,
      "batch_delay" : 5,
      "config_reload_automatic" : false,
      "config_reload_interval" : 3
   },
   "os" : {
      "name" : "Windows 7",
      "arch" : "x86",
      "version" : "6.1",
      "available_processors" : 4
   },
   "jvm" : {
      "pid" : 312,
      "version" : "1.8.0_111",
      "vm_name" : "Java HotSpot(TM) Client VM",
      "vm_version" : "1.8.0_111",
      "vm_vendor" : "Oracle Corporation",
      "start_time_in_millis" : 1483770315412,
      
      "mem" : {
         "heap_init_in_bytes" : 16777216,
         "heap_max_in_bytes" : 1046937600,
         "non_heap_init_in_bytes" : 163840,
         "non_heap_max_in_bytes" : 0
      },
      "gc_collectors" : [ "ParNew", "ConcurrentMarkSweep" ]
   }
}

您还可以通过在 URL 中添加它们的名称来获取 Pipeline、OS 和 JVM 的特定信息。

GET http://localhost:9600/_node/os?pretty
GET http://localhost:9600/_node/pipeline?pretty
GET http://localhost:9600/_node/jvm?pretty

插件信息 API

该 API 用于获取有关 Logstash 中已安装插件的信息。您可以通过向下面提到的 URL 发送获取请求来检索此信息 –

GET http://localhost:9600/_node/plugins?pretty

回复

以下是插件信息 API 的响应。

{
   "host" : "Dell-PC",
   "version" : "5.0.1",
   "http_address" : "127.0.0.1:9600",
   "total" : 95,
   "plugins" : [ {
      "name" : "logstash-codec-collectd",
      "version" : "3.0.2"
   },
   {
      "name" : "logstash-codec-dots",
      "version" : "3.0.2"
   },
   {
      "name" : "logstash-codec-edn",
      "version" : "3.0.2"
   },
   {
      "name" : "logstash-codec-edn_lines",
      "version" : "3.0.2"
   },
   ............
}

节点统计 API

该 API 用于提取 JSON 对象中的 Logstash(Memory、Process、JVM、Pipeline)的统计信息。您可以通过向下面提到的 URLS 发送获取请求来检索此信息 –

GET http://localhost:9600/_node/stats/?pretty
GET http://localhost:9600/_node/stats/process?pretty
GET http://localhost:9600/_node/stats/jvm?pretty
GET http://localhost:9600/_node/stats/pipeline?pretty

热线程 API

此 API 检索有关 Logstash 中的热线程的信息。热线程是 Java 线程,它具有较高的 CPU 使用率并且运行时间比正常执行时间长。您可以通过向下面提到的 URL 发送获取请求来检索此信息 –

GET http://localhost:9600/_node/hot_threads?pretty

用户可以使用以下 URL 以更具可读性的形式获取响应。

GET http://localhost:9600/_node/hot_threads?human = true

Logstash – 安全和监控

在本章中,我们将讨论 Logstash 的安全和监控方面。

监控

Logstash 是一个非常好的监控生产环境中的服务器和服务的工具。生产环境中的应用程序会产生不同类型的日志数据,如访问日志、错误日志等。Logstash 可以使用过滤器插件对错误、访问或其他事件的数量进行计数或分析。这种分析和计数可用于监控不同的服务器及其服务。

Logstash 提供了HTTP Poller 等插件来监控网站状态监控。在这里,我们正在监控一个名为mysite的网站,该网站托管在本地 Apache Tomcat 服务器上。

日志文件

在这个配置文件中,http_poller 插件用于在间隔设置中指定的时间间隔后点击插件中指定的站点。最后,它将站点的状态写入标准输出。

input {
   http_poller {
      urls => {
         site => "http://localhost:8080/mysite"
      }
      request_timeout => 20
      interval => 30
      metadata_target => "http_poller_metadata"
   }
}
output {
   if [http_poller_metadata][code] == 200 {
      stdout {
         codec => line{format => "%{http_poller_metadata[response_message]}"}
      }
   }
   if [http_poller_metadata][code] != 200 {
      stdout {
         codec => line{format => "down"}
      }
   }
}

运行日志

我们可以使用以下命令运行 Logstash。

>logstash –f logstash.conf

标准输出

如果站点已启动,则输出将是 –

Ok

如果我们使用TomcatManager App停止站点,输出将更改为 –

down

安全

Logstash 提供了大量用于与外部系统进行安全通信的功能,并支持身份验证机制。所有 Logstash 插件都支持通过 HTTP 连接进行身份验证和加密。

HTTP 协议的安全性

Logstash 提供的各种插件(如 Elasticsearch 插件)中都有用于身份验证的用户和密码等设置。

elasticsearch {
   user => <username>
   password => <password>
}

另一种身份验证是Elasticsearch 的PKI(公钥基础设施)开发者需要在 Elasticsearch 输出插件中定义两个设置来启用 PKI 身份验证。

elasticsearch {
   keystore => <string_value>
   keystore_password => <password>
}

在 HTTPS 协议中,开发者可以使用权威机构的证书进行 SSL/TLS。

elasticsearch {
   ssl => true
   cacert => <path to .pem file>
}

传输协议的安全性

要在 Elasticsearch 中使用传输协议,用户需要将协议设置设置为传输。这避免了 JSON 对象的解组并提高了效率。

基本身份验证与 Elasticsearch 输出协议中的 http 协议中执行的相同。

elasticsearch {
   protocol => “transport”
   user => <username>
   password => <password>
}

PKI 身份验证还需要 SSL 设置与 Elasticsearch 输出协议中的其他设置为真 –

elasticsearch {
   protocol => “transport”
   ssl => true
   keystore => <string_value>
   keystore_password => <password>
}

最后,SSL 安全需要比其他通信安全方法更多的设置。

elasticsearch {
   ssl => true
   ssl => true
   keystore => <string_value>
   keystore_password => <password>
   truststore => 
   truststore_password => <password>
}

Logstash 的其他安全优势

Logstash 可以帮助输入系统源以防止拒绝服务攻击等攻击。监控日志并分析这些日志中的不同事件可以帮助系统管理员检查传入连接和错误的变化。这些分析有助于查看攻击是否正在或将要发生在服务器上。

Elasticsearch 公司的其他产品,例如x-packfilebeat,提供了一些与 Logstash 安全通信的功能。

觉得文章有用?

点个广告表达一下你的爱意吧 !😁