flink metric库的使用和自定义metric-reporter

摘要:
简要介绍Flink在内部实现了一组度量数据收集库。同时,Flink系统有一些固定的度量数据,包括系统、CPU、内存、IO的一些指标,或各种任务运行的一些指标。具体指标请参考官方文件:Flink metric。同时,我们可以使用系统的度量库在我们自己的代码中收集度量数据。
简单介绍

flink内部实现了一套metric数据收集库。 同时flink自身系统有一些固定的metric数据, 包括系统的一些指标,CPU,内存, IO 或者各个task运行的一些指标。具体包含那些指标可以查看官方文档: flink-metric
同时我们也可以利用系统的metric库在自己的代码中进行打点收集metrics数据。此外, flink提供了外部接口,可以用来导出这些metrics数据.

flink-metric库的使用

在官方的文档中有介绍, 需要继承Richfunction 才能获得对应的metric对象, 用法如下:

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}
flink-metrics导出到外部系统

在flink中, 提供了方便的metric数据导出的库,通过实现自己的reporter,可以将metrics数据导出到不同的系统.
官方提供有多种reporter库,JMX, Graphite, Slf4j... 等等. 同时,我们可以自定义实现metric库,来导入到自己的系统.

自定义reporter类

实现MetricReporter类中的open,close, notifyOfAddedMetric, notifyOfRemovedMetric方法
实现Scheduled的report方法 ,在刚方法中实现写入到其他系统的逻辑
实现CharacterFilter的 filterCharacters方法, 用于对scope进行过滤.

public class FalconReporter implements MetricReporter, CharacterFilter, Scheduled {

  private static final Logger LOG =LoggerFactory.getLogger(FalconReporter.class);

  private final Map<Gauge<?>, MetricTag> gauges = new ConcurrentHashMap<>();
  private final Map<Counter, MetricTag> counters = new ConcurrentHashMap<>();
  private final Map<Histogram, MetricTag> histograms = new ConcurrentHashMap<>();
  private final Map<Meter, MetricTag> meters = new ConcurrentHashMap<>();

  @Override
  public String filterCharacters(String s) {
    return s;
  }

  @Override
  public void open(MetricConfig metricConfig) {
  }

  @Override
  public void close() {
  }

  @Override
  public void notifyOfAddedMetric(Metric metric, String s, MetricGroup metricGroup) {

  }

  @Override
  public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
  }

  @Override
  public void report() {

  }
}

配置reporter

在flink-conf.yaml中配置即可,配置如下

metrics.reporters: slf4j, jmx
metrics.reporter.slf4j.class: org.apache.flink.metrics.falcon.FalconReporter
metrics.reporter.slf4j.interval: 60 SECONDS

metrics.reporters 用于配置类型名, 自定义即可
metrics.reporter.slf4j.class: 配置对应类型的reporter类
metrics.reporter.slf4j.interval: 60 SECONDS  消息上报的间隔
metrics.reporter.slf4j.* 可以自定义配置, 可以在open(MetricConfig metricConfig) 中的获得对应的config

免责声明:文章转载自《flink metric库的使用和自定义metric-reporter》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇SVProgressHUD自定义组合控件SettingItemView的简单实现下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Flink实战(八十三):FLINK-SQL应用场景(一)维表join(五)Flink SQL之维表join之Temporal Table Join

https://zhuanlan.zhihu.com/p/165962937?utm_source=qq 维表是数仓中的一个概念,维表中的维度属性是观察数据的角度,在建设离线数仓的时候,通常是将维表与事实表进行关联构建星型模型。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常存储在kafka中,维表通常存储在外部设备中(比如MySQL,HBase)...

项目实战 从 0 到 1 学习之Flink (28)FlinkSql教程(二)

从kafka到mysql 新建Java项目 最简单的方式是按照官网的方法,命令行执行curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0,不过这种方法有些包还得自行添加,大家可以复制我的pom.xml,我已经将常用的包都放进去了,并且排除了冲突的包。注意的是,本地测试的时候,...

FLINK基础(107): DS算子与窗口(18)窗口 (3) window functions(二)ProcessWindowFunction

ProcessWindowFunction   一些业务场景,我们需要收集窗口内所有的数据进行计算,例如计算窗口数据的中位数,或者计算窗口数据中出现频率最高的值。这样的需求,使用ReduceFunction和AggregateFunction就无法实现了。这个时候就需要ProcessWindowFunction了。 先来看接口定义 public abstr...

flink连接hbase方法及遇到的问题

1、继承 RichSinkFunction 类   mvn配置: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hbase_2.12</artif...

对于维特比译码的理解(英文)

Performing Viterbi Decoding The Viterbi decoder itself is the primary focus of this tutorial. Perhaps the single most important concept to aid in understanding the Viterbi algorit...

时序数据库的选择?

作者:网易云链接:https://www.zhihu.com/question/50194483/answer/428449003来源:知乎著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 基于这个问题,推荐我厂范欣欣同学的一篇文章,这篇文章笔者将会分别针对OpenTSDB、Druid、InfluxDB以及Beringei这四个时序系统...