LoongCollector

1758
下载
LoongCollector 源自阿里云可观测性团队所开源的 iLogtail 项目,在继承了 iLogtail 强大的日志采集与处理能力的基础上,进行了全面的功能升级与扩展。从原来单一日志场景,逐步扩展为可观测数据采集、本地计算、服务发现的统一体。 LoongCollector 是一款集卓越性能、超强稳定性和灵活可编程性于一身的数据采集器,专为构建下一代可观测 Pipeline 设计。愿景是:打造业界领先的“统一可观测 Agent(Unified Observability Agent)”与“端到端可观

Pulsar


简介

flusher_pulsar flusher插件可以实现将采集到的数据,经过处理后,发送到Pulsar

版本

Alpha

配置参数

参数类型是否必选说明
TypeString插件类型
UrlStringPulsar url,多地址用逗号分隔,可以参考本文中的用例配置
TopicStringPulsar Topic,支持动态topic, 例如: test_%{contents.appname}
NameStringproducer名称,默认ilogtail
ConvertStructilogtail数据转换协议配置
Convert.ProtocolStringilogtail数据转换协议,kafka flusher 可选值:custom_single,custom_single_flatten,otlp_log_v1。默认值:custom_single
Convert.EncodingStringilogtail flusher数据转换编码,可选值:jsonnoneprotobuf,默认值:json
Convert.TagFieldsRenameMap对日志中tags中的json字段重命名
Convert.ProtocolFieldsRenameMapilogtail日志协议字段重命名,可当前可重命名的字段:contents,tagstime
EnableTLSBoolean是否启用TLS安全连接,对应采用TLS和Athenz两种认证模式都需要设置为true,默认值:false
TLSTrustCertsFilePathStringTLS CA根证书文件路径,对应采用TLS和Athenz认证时需要指定
AuthenticationStructPulsar连接访问认证配置
Authentication.TLS.CertFileStringTLS连接Pulsar证书文件路径
Authentication.TLS.KeyFileStringTLS连接Pulsar私钥文件路径
Authentication.Token.TokenString采用JWT 认证方式的token
Authentication.Athenz.ProviderDomainStringProvider domain name
Authentication.Athenz.TenantDomainString租户域
Authentication.Athenz.TenantServiceString租户服务
Authentication.Athenz.PrivateKeyStringTenant private key path
Authentication.Athenz.KeyIDStringKey id for the tenant private key
Authentication.Athenz.PrincipalHeaderString
Authentication.Athenz.ZtsURLStringZTS server的地址
Authentication.OAuth2.EnabledBoolean是否启用OAuth2认证
Authentication.OAuth2.IssuerURLString认证提供商的URL,OAuth2.Enabled开启时必填
Authentication.OAuth2.PrivateKeyStringJSON 凭据文件的 URL,OAuth2.Enabled开启时必填
Authentication.OAuth2.AudienceStringPulsar 集群的 OAuth 2.0 “资源服务” 的标识符
Authentication.OAuth2.ScopeString访问范围
CompressionTypeString压缩算法,NONE,LZ4,ZLIB,ZSTD,默认值NONE
BlockIfQueueFullBoolean队列满的时候是否阻塞,默认值:false
SendTimeoutInt发送超时时间,默认30s
OperationTimeoutIntpulsar producer创建、订阅、取消订阅的超时时间,默认30s
ConnectionTimeoutInttcp连接建立超时时间,默认5s
MaxConnectionsPerBrokerInt单个broker连接池保持的连接数,默认1
MaxReconnectToBrokerInt重连broker的最大重试次数,默认为无限
HashingSchemeInt消息push分区的分发方式:JavaStringHash,Murmur3_32Hash,默认值:JavaStringHash
BatchingMaxPublishDelayint提交时延,默认值:1ms
BatchingMaxMessagesint批量提交最大消息数,默认值:1000
MaxCacheProducersint动态topic情况下最大Producer数量 ,默认最大数量:8,使用动态topic的使用可以根据自己的情况调整。
PartitionKeysString数组指定消息分区分发的key。
ClientIDString写入Pulsar的Client ID,默认取值:iLogtail

样例

采集/home/test-log/路径下的所有文件名匹配*.log规则的文件,并将采集结果发送到Pulsar。

enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_pulsar
URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
Topic: PulsarTestTopic

进阶配置

以下面的一段日志为例,后来将展开介绍ilogtail pulsar flusher的一些高阶配置

2022-07-22 10:19:23.684 ERROR [springboot-docker] [http-nio-8080-exec-10] com.benchmark.springboot.controller.LogController : error log

以上面这行日志为例 , 我们通ilogtailprocessor_regex插件,将上面的日志提取处理后几个关键字段:

  • time
  • loglevel
  • appname
  • thread
  • class
  • message

最后推送到kafka的数据样例如下:

{
"contents": {
"class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
"application": "springboot-docker",
"level": "ERROR",
"message": "Completed initialization in 9 ms",
"thread": "http-nio-8080-exec-10",
"time": "2022-07-20 16:55:05.415"
},
"tags": {
"k8s.namespace.name":"java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log"
},
"time": 1664435098
}

动态topic

针对上面写入的这种日志格式,如果想根据application名称针对不用的应用推送到不通的topic
topic可以这样配置。

Topic: test_%{content.application}

最后ilogtail就自动将日志推送到test_springboot-docker这个topic中。

topic动态表达式规则:

  • %{content.fieldname}content代表从contents中取指定字段值
  • %{tag.fieldname},tag表示从tags中取指定字段值,例如:%{tag.k8s.namespace.name}
  • ${env_name}, 读取系统变量绑定到动态topic上,ilogtail 1.5.0开始支持。可以参考flusher-kafka_v2中的使用。
  • 其它方式暂不支持

TagFieldsRename

例如将tags中的host.name重命名为hostname,配置参考如下:

enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_pulsar
URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
Convert:
TagFieldsRename:
host.name: hostname
Topic: PulsarTestTopic

ProtocolFieldsRename

ilogtail协议字段重命名,在ilogtail的数据转换协议中,
最外层三个字段contents,tagstime属于协议字段。ProtocolFieldsRename只能对
contents,tagstime这个三个字段进行重命名。
例如在使用Elasticsearch你可能想直接将time重命名为@timestamp,则配置参考如下:

enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_pulsar
URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
Convert:
TagFieldsRename:
host.name: hostname
ProtocolFieldsRename:
time: '@timestamp'
Topic: PulsarTestTopic

指定分区分发

ilogtail flusher pulsar使用的官方SDK只支持hash方式分区投递,通过HashingScheme来选择不同的hash算法。
分发是可以指定PartitionKeysPartitionKeys的中配置的字段名只能是contents中的字段属性。

配置用例:

enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_pulsar
PartitionKeys:
- content.application
URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
Topic: PulsarTestTopic
  • content.application中表示从contents中取数据application字段数据,如果对contents协议字段做了重命名,
    例如重名为messege,则应该配置为messege.application

数据平铺

ilogtail 1.8.0新增数据平铺协议custom_single_flattencontentstagstime三个convert层的协议字段中数据做一级打平。
当前convert协议在单条数据处理仅支持json编码,因此custom_single_flatten需要配合json编码一起使用。

配置用例:

enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_pulsar
Convert:
Protocol: custom_single_flatten
Encoding: json
URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
Topic: PulsarTestTopic

非平铺前写入pulsar的消息格式

{
"contents": {
"class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
"application": "springboot-docker",
"level": "ERROR",
"message": "Completed initialization in 9 ms",
"thread": "http-nio-8080-exec-10",
"@time": "2022-07-20 16:55:05.415"
},
"tags": {
"k8s.namespace.name":"java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log"
},
"time": 1664435098
}

使用平铺协议后custom_single_flattenjson全部被一级平铺。

{
"class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
"application": "springboot-docker",
"level": "ERROR",
"message": "Completed initialization in 9 ms",
"thread": "http-nio-8080-exec-10",
"@time": "2022-07-20 16:55:05.415",
"k8s.namespace.name":"java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log",
"time": 1664435098
}

安全连接配置

flusher_pulsar支持多种安全认证连接pulsar服务端。

  • TLS认证;
  • TokenJWT Token认证;
  • Athenz pulsar租户域认证;
  • OAuth2认证;

JWT Token认证配置比较简单,参照前面的配置表配置即可,下面主要介绍下OAuth2,TLSAthenz两种认证的配置。

OAuth2认证配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_pulsar
URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
Authentication:
OAuth2:
Enabled: true
IssuerURL: https://accounts.google.com
PrivateKey: file:/path/to/file/credentials_file.json
Audience: https://broker.example.com
Scope: api://pulsar-cluster-1/.default
Topic: PulsarTestTopic

credentials_file.json配置内容样例

{
"type": "client_credentials",
"client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
"client_secret": "on1uJ...k6F6R",
"client_email": "1234567890-abcdefghijklmnopqrstuvwxyz@developer.gserviceaccount.com",
"issuer_url": "https://accounts.google.com"
}

TLS配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_pulsar
URL: "pulsar+ssl://192.168.6.128:6651,192.168.6.129:6651,192.168.6.130:6651"
EnableTLS: true
TLSTrustCertsFilePath: /data/cert/ca.crt
Authentication:
TLS:
CertFile: /data/cert/client.crt
KeyFile: /data/cert/client.key
Topic: PulsarTestTopic
  • EnableTLS 如果要启用TLS必须设置为true。开始TLS的情况下,URL头部为pulsar+ssl://
  • TLSTrustCertsFilePath根证书需要设置。
    注: 配置仅供参考,证书文件请自行生成后根据事情情况配置。

Athenz认证配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_pulsar
URL: "pulsar+ssl://192.168.6.128:6651,192.168.6.129:6651,192.168.6.130:6651"
EnableTLS: true
TLSTrustCertsFilePath: /data/cert/ca.crt
Authentication:
Athenz:
ProviderDomain: pulsar
TenantDomain: shopping
TenantService: some_app
PrivateKey: file:///path/to/client-key.pem
KeyID: v1
Topic: PulsarTestTopic
  • EnableTLS 如果要启用Athenz认证必须设置为true。开始TLS的情况下,URL头部为pulsar+ssl://
  • TLSTrustCertsFilePath根证书需要设置。

observability.cn Authors 2024 | Documentation Distributed under CC-BY-4.0
Copyright © 2017-2024, Alibaba. All rights reserved. Alibaba has registered trademarks and uses trademarks.
浙ICP备2021005855号-32