使用 Dropwizard Metrics 监控和测量反应性应用程序

位置:首页>文章>详情   分类: Java教程 > 编程技术   阅读(74)   2024-06-08 06:20:16

上一篇文章中,我们创建了一个简单的索引代码,该代码通过数千个并发请求对 ElasticSearch 进行攻击。监控我们系统性能的唯一方法是使用老式的日志记录语句:

.window(Duration.ofSeconds(1))
.flatMap(Flux::count)
.subscribe(winSize -> log.debug("Got {} responses in last second", winSize));

这很好,但在生产系统上,我们宁愿有一些集中监控和图表解决方案来收集各种指标。一旦您在数千个实例中拥有数百个不同的应用程序,这一点就变得尤为重要。拥有一个单一的图形仪表板,聚合所有重要信息,变得至关重要。我们需要两个组件来收集一些指标:

  • 发布指标
  • 收集并可视化它们

使用 Dropwizard Metrics 发布指标

在 Spring Boot 2 中,Dropwizard MetricsMicrometer 取代。本文采用前者,下一篇将在实践中展示后者的解决方案。为了利用 Dropwizard Metrics,我们必须将 MetricRegistry 或特定指标注入到我们的业务类中。

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
 
@Component
@RequiredArgsConstructor
class Indexer {
 
    private final PersonGenerator personGenerator;
    private final RestHighLevelClient client;
    private final Timer indexTimer;
    private final Counter indexConcurrent;
    private final Counter successes;
    private final Counter failures;
 
    public Indexer(PersonGenerator personGenerator, RestHighLevelClient client, MetricRegistry metricRegistry) {
        this.personGenerator = personGenerator;
        this.client = client;
        this.indexTimer = metricRegistry.timer(name("es", "index"));
        this.indexConcurrent = metricRegistry.counter(name("es", "concurrent"));
        this.successes = metricRegistry.counter(name("es", "successes"));
        this.failures = metricRegistry.counter(name("es", "failures"));
    }
 
    private Flux<IndexResponse> index(int count, int concurrency) {
        //....
    }
 
}

这么多的样板是为了添加一些指标!

  • indexTimer 测量索引请求的时间分布(均值、中值和各种百分位数)
  • indexConcurrent 衡量当前有多少请求待处理(已发送请求,尚未收到响应);指标随时间上下波动
  • successfailures 分别统计索引请求成功和失败的总数

我们将在一秒钟内摆脱样板文件,但首先,让我们看看它在我们的业务代码中是如何发挥作用的:

private Mono<IndexResponse> indexDocSwallowErrors(Doc doc) {
    return indexDoc(doc)
            .doOnSuccess(response -> successes.inc())
            .doOnError(e -> log.error("Unable to index {}", doc, e))
            .doOnError(e -> failures.inc())
            .onErrorResume(e -> Mono.empty());
}

每次请求完成时,上面的这个辅助方法都会增加成功和失败的次数。此外,它会记录并吞下错误,这样单个错误或超时就不会中断整个导入过程。

private <T> Mono<T> countConcurrent(Mono<T> input) {
    return input
            .doOnSubscribe(s -> indexConcurrent.inc())
            .doOnTerminate(indexConcurrent::dec);
}

上面的另一种方法在发送新请求时递增 indexConcurrent 指标,并在结果或错误到达时递减它。该指标不断上升和下降,显示进行中请求的数量。

private <T> Mono<T> measure(Mono<T> input) {
    return Mono
            .fromCallable(indexTimer::time)
            .flatMap(time ->
                    input.doOnSuccess(x -> time.stop())
            );
}

最后一个辅助方法是最复杂的。它测量索引的总时间,即发送请求和收到响应之间的时间。事实上,它非常通用,它只是计算订阅任意 Mono<T> 和订阅完成之间的总时间。为什么看起来这么奇怪?嗯,基本的 Timer API 非常简单

indexTimer.time(() -> someSlowCode())

它只需要一个 lambda 表达式并测量调用它所花费的时间。或者你可以创建一个小的 Timer.Context 对象来记住它的创建时间。当您调用 Context.stop() 时,它会报告此测量值:

final Timer.Context time = indexTimer.time();
someSlowCode();
time.stop();

使用异步流要困难得多。任务的开始(由订阅表示)和完成通常发生在代码中不同位置的线程边界。我们可以做的是(懒惰地)创建一个新的 Context 对象(参见:fromCallable(indexTimer::time)),当包装流完成时,完成 Context(参见: input.doOnSuccess(x -> time.stop())。这就是您组合所有这些方法的方式:

personGenerator
            .infinite()
            .take(count)
            .flatMap(doc -> 
                countConcurrent(measure(indexDocSwallowErrors(doc))), concurrency);

就是这样,但是用如此多的指标收集低级细节污染业务代码似乎很奇怪。让我们用一个专门的组件包装这些指标:

@RequiredArgsConstructor
class EsMetrics {
 
    private final Timer indexTimer;
    private final Counter indexConcurrent;
    private final Counter successes;
    private final Counter failures;
 
    void success() {
        successes.inc();
    }
 
    void failure() {
        failures.inc();
    }
 
    void concurrentStart() {
        indexConcurrent.inc();
    }
 
    void concurrentStop() {
        indexConcurrent.dec();
    }
 
    Timer.Context startTimer() {
        return indexTimer.time();
    }
 
}

现在我们可以使用更高级的抽象:

class Indexer {

    private final EsMetrics esMetrics;
 
    private <T> Mono<T> countConcurrent(Mono<T> input) {
        return input
                .doOnSubscribe(s -> esMetrics.concurrentStart())
                .doOnTerminate(esMetrics::concurrentStop);
    }
 
    //...
 
    private Mono<IndexResponse> indexDocSwallowErrors(Doc doc) {
        return indexDoc(doc)
                .doOnSuccess(response -> esMetrics.success())
                .doOnError(e -> log.error("Unable to index {}", doc, e))
                .doOnError(e -> esMetrics.failure())
                .onErrorResume(e -> Mono.empty());
    }
}

在下一篇文章中,我们将学习如何更好地组合所有这些方法。并避免一些样板。

发布和可视化指标

仅收集指标是不够的。我们必须定期发布聚合指标,以便其他系统可以使用、处理和可视化它们。 GraphiteGrafana 就是这样一种工具。但在我们深入配置它们之前,让我们先将指标发布到控制台。我发现这在对指标进行故障排除或开发期间特别有用。

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
 
@Bean
Slf4jReporter slf4jReporter(MetricRegistry metricRegistry) {
    final Slf4jReporter slf4jReporter = Slf4jReporter.forRegistry(metricRegistry.build();
    slf4jReporter.start(1, TimeUnit.SECONDS);
    return slf4jReporter;
}

这个简单的代码片段采用现有的 MetricRegistry 并注册 Slf4jReporter。每秒一次,您会看到所有指标打印到您的日志中(Logback 等):

type=COUNTER, name=es.concurrent, count=1
type=COUNTER, name=es.failures, count=0
type=COUNTER, name=es.successes, count=1653
type=TIMER, name=es.index, count=1653, min=1.104664, max=345.139385, mean=2.2166538118720576,
    stddev=11.208345077801448, median=1.455504, p75=1.660252, p95=2.7456, p98=5.625456, p99=9.69689, p999=85.062713,
    mean_rate=408.56403102372764, m1=0.0, m5=0.0, m15=0.0, rate_unit=events/second, duration_unit=milliseconds

但这只是故障排除,为了将我们的指标发布到外部 Graphite 实例,我们需要一个 GraphiteReporter

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
 
@Bean
GraphiteReporter graphiteReporter(MetricRegistry metricRegistry) {
    final Graphite graphite = new Graphite(new InetSocketAddress("localhost", 2003));
    final GraphiteReporter reporter = GraphiteReporter.forRegistry(metricRegistry)
            .prefixedWith("elastic-flux")
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .build(graphite);
    reporter.start(1, TimeUnit.SECONDS);
    return reporter;
}

在这里,我向 localhost:2003 报告我的 带有 Graphite + Grafana 的 Docker 映像 恰好所在。每秒一次,所有指标都会发送到该地址。我们稍后可以在 Grafana 上可视化所有这些指标:

上图显示索引时间分布(从第 50 个百分位到第 99.9 个百分位)。使用此图,您可以快速发现什么是典型性能 (P50) 以及(几乎)最坏情况下的性能 (P99.9)。对数刻度是不寻常的,但在这种情况下,我们可以看到低百分位数和高百分位数。下面的图更有趣。它结合了三个指标:

  • 成功索引操作的速率(每秒请求数)
  • 失败操作率(红色条,堆叠在绿色条之上)
  • 当前并发级别(右轴):进行中的请求数

此图显示了系统吞吐量 (RPS)、故障和并发性。太多的失败或异常高的并发级别(许多操作等待响应)可能是您的系统存在某些问题的迹象。 仪表板定义在 GitHub 存储库中可用。

在下一篇文章中,我们将学习如何从 Dropwizard Metrics 迁移到 Micrometer。非常愉快的一次体验!

标签2: Java教程
地址:https://www.cundage.com/article/jcg-monitoring-measuring-reactive-application-dropwizard-metrics.html

相关阅读

Java HashSet 教程展示了如何使用 Java HashSet 集合。 Java哈希集 HashSet 是一个不包含重复元素的集合。此类为基本操作(添加、删除、包含和大小)提供恒定时间性...
SpringApplicationBuilder 教程展示了如何使用 SpringApplicationBuilder 创建一个简单的 Spring Boot 应用程序。 春天 是用于创建企业应...
通道是继 buffers 之后 java.nio 的第二个主要新增内容,我们在之前的教程中已经详细了解了这一点。通道提供与 I/O 服务的直接连接。 通道是一种在字节缓冲区和通道另一端的实体(通...
课程大纲 Elasticsearch 是一个基于 Lucene 的搜索引擎。它提供了一个分布式的、支持多租户的全文搜索引擎,带有 HTTP Web 界面和无模式的 JSON 文档。 Elasti...
解析器是强大的工具,使用 ANTLR 可以编写可用于多种不同语言的各种解析器。 在这个完整的教程中,我们将: 解释基础:什么是解析器,它可以用来做什么 查看如何设置 ANTLR 以便在 Java...
Java 是用于开发各种桌面应用程序、Web 应用程序和移动应用程序的最流行的编程语言之一。以下文章将帮助您快速熟悉 Java 语言,并迈向 API 和云开发等更复杂的概念。 1. Java语言...
Java中的继承是指子类继承或获取父类的所有非私有属性和行为的能力。继承是面向对象编程的四大支柱之一,用于提高层次结构中类之间的代码可重用性。 在本教程中,我们将了解 Java 支持的继承类型,...
Java Message Service 是一种支持正式通信的 API,称为 网络上计算机之间的消息传递。 JMS 为支持 Java 程序的标准消息协议和消息服务提供了一个通用接口。 JMS 提...
之前,我介绍了spring 3 + hibernate 集成 示例和struts 2 hello world 示例。在本教程中,我将讨论在将 spring 框架与 struts 与 hibern...
Java 项目中的一项常见任务是将日期格式化或解析为字符串,反之亦然。解析日期意味着你有一个代表日期的字符串,例如“2017-08-3”,你想把它转换成一个代表 Java 中日期的对象,例如Ja...