如何开发原生Flusher插件
接口定义
对于使用Http协议发送数据的Flusher,进一步定义了HttpFlusher,接口如下:
Flusher级组件
聚合(必选)
-
作用:将多个小的PipelineEventGroup根据tag异同合并成一个大的group,提升发送效率
-
参数:
可以在flusher的参数中配置Batch字段,该字段的类型为map,其中允许包含的字段如下:
名称 | 类型 | 默认值 | 说明 |
---|---|---|---|
MinCnt | uint | 每个Flusher自定义 | 每个聚合队列最少包含的event数量 |
MinSizeBytes | uint | 每个Flusher自定义 | 每个聚合队列最小的尺寸 |
TimeoutSecs | uint | 每个Flusher自定义 | 每个聚合队列在第一个event加入后,在被输出前最多等待的时间 |
- 类接口:
序列化(必选)
-
作用:对聚合模块的输出进行序列化,分为2个层级:
-
event级:对每一个event单独进行序列化
-
event group级:对多个event进行批量的序列化
-
-
类接口:
压缩(可选)
-
作用:对序列化后的结果进行压缩
-
类接口:
开发步骤
下面以开发一个HttpFlusher为例,说明整个开发步骤:
-
在plugin/flusher目录下新建一个Flusherxxx.h和Flusherxxx.cpp文件,用于派生HttpFlusher接口生成具体的插件类;
-
在Flusherxxx.h文件中定义新的输出插件类Flusherxxx,满足以下要求:
a. 所有的可配置参数的权限为public,其余参数的权限均为private
b. 新增一个聚合组件:
Batcher<> mBatcher;
c. 新增一个序列化组件:
std::unique_ptr<T> mSerializer;
,其中T为EventSerializer
和EventGroupSerializer
中的一种d. 如果需要压缩,则新增一个压缩组件:
std::unique_ptr<Compressor> mCompressor;
-
在pipeline/serializer目录下新建一个xxxSerializer.h和xxxSerializer.cpp文件,用于派生
Serializer
接口生成具体类; -
(可选)如果需要压缩组件,且现有压缩组件库中没有所需算法,则新增一个压缩组件:
a. 在common/compression/CompressType.h文件中,扩展CompressType类用以标识新的压缩算法;
b. 在common/compression目录下新建一个xxxCompressor.h和xxxCompressor.cpp文件,用于派生
Compressor
接口生成具体类;c. 在common/compression/CompressorFactory.cpp文件的各个函数中注册该压缩组件;
-
在Flusherxxx.cpp文件中实现插件类
a.
Init
函数:b.
SerializeAndPush(BatchedEventsList&&)
函数:c.
SerializeAndPush(vector<BatchedEventsList>&&)
函数:d.
Send
函数:e.
Flush
函数:f.
FlushAll
函数:g.
BuildRequest
函数:将待发送数据包装成一个Http请求,如果请求构建失败,使用keepItem
参数记录是否要保留数据供以后重试。h.
OnSendDone
函数:根据返回的http response进行相应的记录和操作。 -
在
PluginRegistry
类中注册该插件: -
在pipeline/plugin/PluginRegistry.cpp文件的头文件包含区新增如下行:
-
在
PluginRegistry
类的LoadStaticPlugins()
函数中新增如下行: