LoongCollector

1758
下载
LoongCollector 源自阿里云可观测性团队所开源的 iLogtail 项目,在继承了 iLogtail 强大的日志采集与处理能力的基础上,进行了全面的功能升级与扩展。从原来单一日志场景,逐步扩展为可观测数据采集、本地计算、服务发现的统一体。 LoongCollector 是一款集卓越性能、超强稳定性和灵活可编程性于一身的数据采集器,专为构建下一代可观测 Pipeline 设计。愿景是:打造业界领先的“统一可观测 Agent(Unified Observability Agent)”与“端到端可观

如何开发扩展Aggregator插件


Aggregator 插件对输入数据进行打包,以下将从接口定义与案例2方面指导如何开发 Aggregator 插件

Aggregator 接口定义

Aggregator 插件的作用就是将一条条独立的 Log 根据一定规则聚合成 LogGroup,进而提交给下一级的 flusher 插件处理。

  • Add 接口供外部输入 Log
  • Flush 接口供外部获取聚合得到的 LogGroug
  • Reset 接口目前仅在内部使用,可以忽略
  • Init 接口,类似于 input 插件的 Init 接口,该接口返回值的第一个参数表示插件系统调用 Flush 接口的周期值,该值为 0 时使用全局参数,第二个参数表示一个初始化错误。 但不同于 input 插件,aggregator 插件的 Init 接口新增了一个 LogGroupQueue 类型的参数,该类型定义于 loggroup_queue.go 文件中,如下:
type LogGroupQueue interface {
// Returns errAggAdd immediately if queue is full.
Add(loggroup *LogGroup) error
// Wait at most @duration if queue is full and returns errAggAdd if timeout.
// Do not use this method if you are unsure.
AddWithWait(loggroup *LogGroup, duration time.Duration) error
}

该接口对应的实例实际上充当了一个队列,aggregator 插件实例可以将聚合得到的 LogGroup 对象立即通过 AddXXX 接口插入队列。 这是一个可选项,从之前的描述可以看到,Add->Flush 是一个周期性调用的数据链路,而 LogGroupQueue 可以提供一个实时性更高的链路,在 aggregator 一得到新聚合的 LogGroup 后就直接提交,不必等到 Flush 被调用。 但需要注意的是,在队列满的时候,Add 会返回错误,这一般意味着 flusher 发生了阻塞,比如网络异常。

// Aggregator is an interface for implementing an Aggregator plugin.
// the RunningAggregator wraps this interface and guarantees that
// Add, Push, and Reset can not be called concurrently, so locking is not
// required when implementing an Aggregator plugin.
type Aggregator interface {
// Init called for init some system resources, like socket, mutex...
// return flush interval(ms) and error flag, if interval is 0, use default interval
Init(Context, LogGroupQueue) (int, error)
// Description returns a one-sentence description on the Input.
Description() string
// Add the metric to the aggregator.
Add(log *protocol.Log) error
// Flush pushes the current aggregates to the accumulator.
Flush() []*protocol.LogGroup
// Reset resets the aggregators caches and aggregates.
Reset()
}

Aggregator 开发

Aggregator 的开发分为以下步骤:

  1. 创建Issue,描述开发插件功能,会有社区同学参与讨论插件开发的可行性,如果社区review 通过,请参考步骤2继续进行。
  2. 实现 Aggregator 接口,这里我们使用样例模式进行介绍,详细样例请查看aggregator/defaultone
  3. 通过init将插件注册到Aggregators,Aggregator插件的注册名(即json配置中的plugin_type)必须以”aggregator_“开头,详细样例请查看aggregator/defaultone
  4. 将插件加入插件引用配置文件common配置节, 如果仅运行于指定系统,请添加到linuxwindows配置节.
  5. 进行单测或者E2E测试,请参考如何使用单测如何使用E2E测试.
  6. 使用 make lint 检查代码规范。
  7. 提交Pull Request。

observability.cn Authors 2024 | Documentation Distributed under CC-BY-4.0
Copyright © 2017-2024, Alibaba. All rights reserved. Alibaba has registered trademarks and uses trademarks.
浙ICP备2021005855号-32