Pulsar
简介
flusher_pulsar
flusher
插件可以实现将采集到的数据,经过处理后,发送到Pulsar
。
版本
配置参数
参数 | 类型 | 是否必选 | 说明 |
---|---|---|---|
Type | String | 是 | 插件类型 |
Url | String | 是 | Pulsar url,多地址用逗号分隔,可以参考本文中的用例配置 |
Topic | String | 是 | Pulsar Topic,支持动态topic, 例如: test_%{contents.appname} |
Name | String | 否 | producer名称,默认ilogtail |
Convert | Struct | 否 | ilogtail数据转换协议配置 |
Convert.Protocol | String | 否 | ilogtail数据转换协议,kafka flusher 可选值:custom_single ,custom_single_flatten ,otlp_log_v1 。默认值:custom_single |
Convert.Encoding | String | 否 | ilogtail flusher数据转换编码,可选值:json 、none 、protobuf ,默认值:json |
Convert.TagFieldsRename | Map | 否 | 对日志中tags中的json字段重命名 |
Convert.ProtocolFieldsRename | Map | 否 | ilogtail日志协议字段重命名,可当前可重命名的字段:contents ,tags 和time |
EnableTLS | Boolean | 否 | 是否启用TLS安全连接,对应采用TLS和Athenz两种认证模式都需要设置为true,默认值:false |
TLSTrustCertsFilePath | String | 否 | TLS CA根证书文件路径,对应采用TLS和Athenz认证时需要指定 |
Authentication | Struct | 否 | Pulsar连接访问认证配置 |
Authentication.TLS.CertFile | String | 否 | TLS连接Pulsar 证书文件路径 |
Authentication.TLS.KeyFile | String | 否 | TLS连接Pulsar 私钥文件路径 |
Authentication.Token.Token | String | 否 | 采用JWT 认证方式的token |
Authentication.Athenz.ProviderDomain | String | 否 | Provider domain name |
Authentication.Athenz.TenantDomain | String | 否 | 租户域 |
Authentication.Athenz.TenantService | String | 否 | 租户服务 |
Authentication.Athenz.PrivateKey | String | 否 | Tenant private key path |
Authentication.Athenz.KeyID | String | 否 | Key id for the tenant private key |
Authentication.Athenz.PrincipalHeader | String | 否 | |
Authentication.Athenz.ZtsURL | String | 否 | ZTS server的地址 |
Authentication.OAuth2.Enabled | Boolean | 否 | 是否启用OAuth2认证 |
Authentication.OAuth2.IssuerURL | String | 是 | 认证提供商的URL,OAuth2.Enabled开启时必填 |
Authentication.OAuth2.PrivateKey | String | 是 | JSON 凭据文件的 URL,OAuth2.Enabled开启时必填 |
Authentication.OAuth2.Audience | String | 否 | Pulsar 集群的 OAuth 2.0 “资源服务” 的标识符 |
Authentication.OAuth2.Scope | String | 否 | 访问范围 |
CompressionType | String | 否 | 压缩算法,NONE,LZ4,ZLIB,ZSTD ,默认值NONE |
BlockIfQueueFull | Boolean | 否 | 队列满的时候是否阻塞,默认值:false |
SendTimeout | Int | 否 | 发送超时时间,默认30s |
OperationTimeout | Int | 否 | pulsar producer创建、订阅、取消订阅的超时时间,默认30s |
ConnectionTimeout | Int | 否 | tcp连接建立超时时间,默认5s |
MaxConnectionsPerBroker | Int | 否 | 单个broker连接池保持的连接数,默认1 |
MaxReconnectToBroker | Int | 否 | 重连broker的最大重试次数,默认为无限 |
HashingScheme | Int | 否 | 消息push分区的分发方式:JavaStringHash ,Murmur3_32Hash ,默认值:JavaStringHash |
BatchingMaxPublishDelay | int | 否 | 提交时延,默认值:1ms |
BatchingMaxMessages | int | 否 | 批量提交最大消息数,默认值:1000 |
MaxCacheProducers | int | 否 | 动态topic情况下最大Producer数量 ,默认最大数量:8,使用动态topic的使用可以根据自己的情况调整。 |
PartitionKeys | String数组 | 否 | 指定消息分区分发的key。 |
ClientID | String | 否 | 写入Pulsar的Client ID,默认取值:iLogtail 。 |
样例
采集/home/test-log/
路径下的所有文件名匹配*.log
规则的文件,并将采集结果发送到Pulsar。
进阶配置
以下面的一段日志为例,后来将展开介绍ilogtail pulsar flusher的一些高阶配置
以上面这行日志为例 , 我们通ilogtail
的processor_regex
插件,将上面的日志提取处理后几个关键字段:
- time
- loglevel
- appname
- thread
- class
- message
最后推送到kafka
的数据样例如下:
动态topic
针对上面写入的这种日志格式,如果想根据application
名称针对不用的应用推送到不通的topic
,
则topic
可以这样配置。
最后ilogtail
就自动将日志推送到test_springboot-docker
这个topic
中。
topic
动态表达式规则:
%{content.fieldname}
。content
代表从contents
中取指定字段值%{tag.fieldname}
,tag
表示从tags
中取指定字段值,例如:%{tag.k8s.namespace.name}
${env_name}
, 读取系统变量绑定到动态topic
上,ilogtail 1.5.0
开始支持。可以参考flusher-kafka_v2
中的使用。- 其它方式暂不支持
TagFieldsRename
例如将tags
中的host.name
重命名为hostname
,配置参考如下:
ProtocolFieldsRename
对ilogtail
协议字段重命名,在ilogtail
的数据转换协议中,
最外层三个字段contents
,tags
和time
属于协议字段。ProtocolFieldsRename
只能对
contents
,tags
和time
这个三个字段进行重命名。
例如在使用Elasticsearch
你可能想直接将time
重命名为@timestamp
,则配置参考如下:
指定分区分发
ilogtail flusher pulsar
使用的官方SDK
只支持hash
方式分区投递,通过HashingScheme
来选择不同的hash
算法。
分发是可以指定PartitionKeys
,PartitionKeys
的中配置的字段名只能是contents
中的字段属性。
配置用例:
content.application
中表示从contents
中取数据application
字段数据,如果对contents
协议字段做了重命名,
例如重名为messege
,则应该配置为messege.application
数据平铺
ilogtail 1.8.0
新增数据平铺协议custom_single_flatten
,contents
、tags
和time
三个convert
层的协议字段中数据做一级打平。
当前convert
协议在单条数据处理仅支持json
编码,因此custom_single_flatten
需要配合json
编码一起使用。
配置用例:
非平铺前写入pulsar
的消息格式
使用平铺协议后custom_single_flatten
,json
全部被一级平铺。
安全连接配置
flusher_pulsar
支持多种安全认证连接pulsar
服务端。
TLS
认证;Token
JWT Token认证;Athenz
pulsar租户域认证;OAuth2
认证;
JWT Token认证配置比较简单,参照前面的配置表配置即可,下面主要介绍下OAuth2
,TLS
和Athenz
两种认证的配置。
OAuth2认证配置参考(待验证)
下面配置仅供参考,请根据服务器实际部署情况配置
credentials_file.json配置内容样例
TLS配置参考(待验证)
下面配置仅供参考,请根据服务器实际部署情况配置
EnableTLS
如果要启用TLS
必须设置为true
。开始TLS
的情况下,URL头部为pulsar+ssl://
TLSTrustCertsFilePath
根证书需要设置。
注: 配置仅供参考,证书文件请自行生成后根据事情情况配置。
Athenz认证配置参考(待验证)
下面配置仅供参考,请根据服务器实际部署情况配置
EnableTLS
如果要启用Athenz
认证必须设置为true
。开始TLS
的情况下,URL
头部为pulsar+ssl://
TLSTrustCertsFilePath
根证书需要设置。