Metrics
Overview
1. Metrics Access Instructions
2. Metrics System Design
Dubbo's metrics system involves three parts: metrics collection, local aggregation, and metrics push.
- Metrics collection: push the metrics that Dubbo needs to monitor internally to a unified Collector for storage.
- Local aggregation: all collected metrics are basic metrics; some quantile metrics need to be computed through local aggregation.
- Metrics push: the collected and aggregated metrics are pushed to a third-party server in some way; currently only Prometheus is involved.
3. Structure Design
- Remove the original Metrics-related classes
- Create the new modules dubbo-metrics/dubbo-metrics-api and dubbo-metrics/dubbo-metrics-prometheus, with MetricsConfig as the configuration class of these modules
- Use micrometer: represent metrics with basic types such as Long and Double inside the Collector, introduce micrometer in dubbo-metrics-api, and use micrometer to convert the internal metrics (a sketch of this conversion follows below)
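As a rough sketch of that conversion, a value kept as a basic type inside the Collector can be exposed through a micrometer Gauge; the metric name, tags, and registry wiring below are illustrative, not the actual Dubbo classes:

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;

import java.util.concurrent.atomic.AtomicLong;

public class MicrometerBridgeSketch {
    public static void main(String[] args) {
        // a Prometheus-backed micrometer registry
        MeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
        // the Collector keeps plain values; micrometer only reads them on scrape
        AtomicLong totalRequests = new AtomicLong();
        Gauge.builder("requests_total", totalRequests, AtomicLong::get)
                .tag("interface", "org.example.DemoService")   // illustrative tag values
                .tag("method", "sayHello")
                .register(registry);
        totalRequests.incrementAndGet();
    }
}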
4. Data Flow
5. Goals
The metrics API will provide a MetricsService, which not only provides the interface-level data required by flexible services, but also provides query methods for all metrics. The interface for querying method-level metrics can be declared as follows:
public interface MetricsService {
/**
* Default {@link MetricsService} extension name.
*/
String DEFAULT_EXTENSION_NAME = "default";
/**
* The contract version of {@link MetricsService}, the future update must make sure compatible.
*/
String VERSION = "1.0.0";
/**
* Get metrics by categories
*
* @param categories categories
* @return metrics - key=MetricCategory value=MetricsEntityList
*/
Map<MetricsCategory, List<MetricsEntity>> getMetricsByCategories(List<MetricsCategory> categories);
/**
* Get metrics by interface and categories
*
* @param serviceUniqueName serviceUniqueName (eg.group/interfaceName:version)
* @param categories categories
* @return metrics - key=MetricCategory value=MetricsEntityList
*/
Map<MetricsCategory, List<MetricsEntity>> getMetricsByCategories(String serviceUniqueName, List<MetricsCategory> categories);
/**
* Get metrics by interface, method and categories
*
* @param serviceUniqueName serviceUniqueName (eg.group/interfaceName:version)
* @param methodName methodName
* @param parameterTypes method parameter types
* @param categories categories
* @return metrics - key=MetricCategory value=MetricsEntityList
*/
Map<MetricsCategory, List<MetricsEntity>> getMetricsByCategories(String serviceUniqueName, String methodName, Class<?>[] parameterTypes, List<MetricsCategory> categories);
}
Among them, MetricsCategory is designed as follows:
public enum MetricsCategory {
RT,
QPS,
REQUESTS,
}
MetricsEntity is designed as follows:
public class MetricsEntity {
private String name;
private Map<String, String> tags;
private MetricsCategory category;
private Object value;
}
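A hedged sketch of how a caller might query method-level metrics through this interface; the lookup via Dubbo's ExtensionLoader and the group/version in the service name are assumptions for illustration:

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.dubbo.rpc.model.ApplicationModel;

public class MetricsQueryExample {
    public static void main(String[] args) {
        // assumption: the default MetricsService implementation is resolved through Dubbo SPI
        MetricsService metricsService = ApplicationModel.defaultModel()
                .getExtensionLoader(MetricsService.class)
                .getExtension(MetricsService.DEFAULT_EXTENSION_NAME);

        // query QPS and RT for one method; the service unique name and parameter types are illustrative
        Map<MetricsCategory, List<MetricsEntity>> metrics = metricsService.getMetricsByCategories(
                "demoGroup/org.apache.dubbo.samples.metrics.prometheus.api.DemoService:1.0.0",
                "sayHello",
                new Class<?>[]{String.class},
                Arrays.asList(MetricsCategory.QPS, MetricsCategory.RT));

        metrics.forEach((category, entities) ->
                entities.forEach(entity -> System.out.println(category + " -> " + entity)));
    }
}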
Metrics Collection
1. Embedding Location
The Dubbo architecture diagram is as follows:
A MetricsFilter layer is added on the provider side. It overrides the invoke method to embed metrics collection into the invocation chain, handling it with try-catch-finally. The core code is as follows:
@Activate(group = PROVIDER, order = -1)
public class MetricsFilter implements Filter, ScopeModelAware {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// interfaceName, methodName, group and version are resolved from the invocation and invoker URL;
// collector is the DefaultMetricsCollector obtained through the application model
collector.increaseTotalRequests(interfaceName, methodName, group, version);
collector.increaseProcessingRequests(interfaceName, methodName, group, version);
long startTime = System.currentTimeMillis();
try {
Result invoke = invoker.invoke(invocation);
collector.increaseSucceedRequests(interfaceName, methodName, group, version);
return invoke;
} catch (RpcException e) {
collector.increaseFailedRequests(interfaceName, methodName, group, version);
throw e;
} finally {
long endTime = System.currentTimeMillis();
long rt = endTime - startTime;
collector.addRT(interfaceName, methodName, group, version, rt);
collector.decreaseProcessingRequests(interfaceName, methodName, group, version);
}
}
}
2. Metrics Identification
The following five attributes are used as the isolation level to distinguish and identify different methods; they also serve as the key of each ConcurrentHashMap:
public class MethodMetric {
private String applicationName;
private String interfaceName;
private String methodName;
private String group;
private String version;
}
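Because MethodMetric is used as a ConcurrentHashMap key, it needs equals and hashCode defined consistently over these five fields. A minimal sketch (the actual class may contain additional logic); it assumes java.util.Objects is imported:

// placed inside MethodMetric; equality is based on the five identifying fields
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof MethodMetric)) return false;
    MethodMetric that = (MethodMetric) o;
    return Objects.equals(applicationName, that.applicationName)
            && Objects.equals(interfaceName, that.interfaceName)
            && Objects.equals(methodName, that.methodName)
            && Objects.equals(group, that.group)
            && Objects.equals(version, that.version);
}

@Override
public int hashCode() {
    return Objects.hash(applicationName, interfaceName, methodName, group, version);
}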
3. Basic Metrics
Metrics stores all metric data through the MetricsCollector under the common module:
public class DefaultMetricsCollector implements MetricsCollector {
private Boolean collectEnabled = false;
private final List<MetricsListener> listeners = new ArrayList<>();
private final ApplicationModel applicationModel;
private final String applicationName;
private final Map<MethodMetric, AtomicLong> totalRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, AtomicLong> succeedRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, AtomicLong> failedRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, AtomicLong> processingRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, AtomicLong> lastRT = new ConcurrentHashMap<>();
private final Map<MethodMetric, LongAccumulator> minRT = new ConcurrentHashMap<>();
private final Map<MethodMetric, LongAccumulator> maxRT = new ConcurrentHashMap<>();
private final Map<MethodMetric, AtomicLong> avgRT = new ConcurrentHashMap<>();
private final Map<MethodMetric, AtomicLong> totalRT = new ConcurrentHashMap<>();
private final Map<MethodMetric, AtomicLong> rtCount = new ConcurrentHashMap<>();
}
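A hedged sketch of what one of the collector's write paths could look like, based on the fields above; the five-argument MethodMetric constructor is assumed, and the listener notification is only indicated by a comment:

public void increaseTotalRequests(String interfaceName, String methodName, String group, String version) {
    if (collectEnabled) {
        MethodMetric metric = new MethodMetric(applicationName, interfaceName, methodName, group, version);
        totalRequests.computeIfAbsent(metric, k -> new AtomicLong(0L)).incrementAndGet();
        // an event would also be published to the registered MetricsListeners here,
        // so that AggregateMetricsCollector can update its sliding-window counters
    }
}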
Local Aggregation
Local aggregation refers to the process of obtaining quantile metrics by computing a few simple metrics.
1. Parameter Design
When metrics are collected, only the basic metrics are collected by default. Some standalone aggregated metrics require enabling service flexibility, or starting a new thread for computation, before local aggregation can take place. If service flexibility is enabled, local aggregation is enabled by default.
1.1 How to enable local aggregation
<dubbo:metrics>
<dubbo:aggregation enable="true" />
</dubbo:metrics>
1.2 Metrics aggregation parameters
<dubbo:metrics>
<dubbo:aggregation enable="true" bucket-num="5" time-window-seconds="10"/>
</dubbo:metrics>
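Here bucket-num is the number of buckets in the sliding window and time-window-seconds is the overall window length; if they are not set, the collector falls back to the defaults shown later in this section (10 buckets, 120 seconds).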
2. Specific Metrics
There are four key monitoring metrics, often called the "Four Golden Signals": latency, traffic, errors, and saturation. Dubbo mainly covers the following monitoring metrics:
| | Infrastructure | Business Monitoring |
|---|---|---|
| Latency | IO wait; RPC latency | Interface-level average service time, TP90, TP99, TP999, etc. |
| Traffic | Network and disk IO | Service-level QPS |
| Errors | Downtime; disk issues (bad disk or file system errors); process or port hangs; network packet loss | Error logs; business status codes and error code trends |
| Saturation | System resource utilization: CPU, memory, disk, network, etc.; saturation indicators: number of waiting threads, queue backlog length | Mainly JVM, thread pools, etc. |
- qps: dynamic qps obtained based on the sliding window
- rt: dynamic rt obtained based on the sliding window
- Failed requests: the number of failed requests within the most recent window, based on the sliding window
- Successful requests: the number of successful requests within the most recent window, based on the sliding window
- Processing requests: simple counting added before and after the Filter
- The specific metrics rely on the sliding window and are additionally collected by the AggregateMetricsCollector
The metrics output to Prometheus look like the following:
# HELP jvm_gc_live_data_size_bytes Size of long-lived heap memory pool after reclamation
# TYPE jvm_gc_live_data_size_bytes gauge
jvm_gc_live_data_size_bytes 1.6086528E7
# HELP requests_succeed_aggregate Aggregated Succeed Requests
# TYPE requests_succeed_aggregate gauge
requests_succeed_aggregate{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 39.0
# HELP jvm_buffer_memory_used_bytes An estimate of the memory that the Java virtual machine is using for this buffer pool
# TYPE jvm_buffer_memory_used_bytes gauge
jvm_buffer_memory_used_bytes{id="direct",} 1.679975E7
jvm_buffer_memory_used_bytes{id="mapped",} 0.0
# HELP jvm_gc_memory_allocated_bytes_total Incremented for an increase in the size of the (young) heap memory pool after one GC to before the next
# TYPE jvm_gc_memory_allocated_bytes_total counter
jvm_gc_memory_allocated_bytes_total 2.9884416E9
# HELP requests_total_aggregate Aggregated Total Requests
# TYPE requests_total_aggregate gauge
requests_total_aggregate{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 39.0
# HELP system_load_average_1m The sum of the number of runnable entities queued to available processors and the number of runnable entities running on the available processors averaged over a period of time
# TYPE system_load_average_1m gauge
system_load_average_1m 0.0
# HELP system_cpu_usage The "recent cpu usage" for the whole system
# TYPE system_cpu_usage gauge
system_cpu_usage 0.015802269043760128
# HELP jvm_threads_peak_threads The peak live thread count since the Java virtual machine started or peak was reset
# TYPE jvm_threads_peak_threads gauge
jvm_threads_peak_threads 40.0
# HELP requests_processing Processing Requests
# TYPE requests_processing gauge
requests_processing{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 0.0
# HELP jvm_memory_max_bytes The maximum amount of memory in bytes that can be used for memory management
# TYPE jvm_memory_max_bytes gauge
jvm_memory_max_bytes{area="nonheap",id="CodeHeap 'profiled nmethods'",} 1.22912768E8
jvm_memory_max_bytes{area="heap",id="G1 Survivor Space",} -1.0
jvm_memory_max_bytes{area="heap",id="G1 Old Gen",} 9.52107008E8
jvm_memory_max_bytes{area="nonheap",id="Metaspace",} -1.0
jvm_memory_max_bytes{area="heap",id="G1 Eden Space",} -1.0
jvm_memory_max_bytes{area="nonheap",id="CodeHeap 'non-nmethods'",} 5828608.0
jvm_memory_max_bytes{area="nonheap",id="Compressed Class Space",} 1.073741824E9
jvm_memory_max_bytes{area="nonheap",id="CodeHeap 'non-profiled nmethods'",} 1.22916864E8
# HELP jvm_threads_states_threads The current number of threads having BLOCKED state
# TYPE jvm_threads_states_threads gauge
jvm_threads_states_threads{state="blocked",} 0.0
jvm_threads_states_threads{state="runnable",} 10.0
jvm_threads_states_threads{state="waiting",} 16.0
jvm_threads_states_threads{state="timed-waiting",} 13.0
jvm_threads_states_threads{state="new",} 0.0
jvm_threads_states_threads{state="terminated",} 0.0
# HELP jvm_buffer_total_capacity_bytes An estimate of the total capacity of the buffers in this pool
# TYPE jvm_buffer_total_capacity_bytes gauge
jvm_buffer_total_capacity_bytes{id="direct",} 1.6799749E7
jvm_buffer_total_capacity_bytes{id="mapped",} 0.0
# HELP rt_p99 Response Time P99
# TYPE rt_p99 gauge
rt_p99{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 1.0
# HELP jvm_memory_used_bytes The amount of used memory
# TYPE jvm_memory_used_bytes gauge
jvm_memory_used_bytes{area="heap",id="G1 Survivor Space",} 1048576.0
jvm_memory_used_bytes{area="nonheap",id="CodeHeap 'profiled nmethods'",} 1.462464E7
jvm_memory_used_bytes{area="heap",id="G1 Old Gen",} 1.6098728E7
jvm_memory_used_bytes{area="nonheap",id="Metaspace",} 4.0126952E7
jvm_memory_used_bytes{area="heap",id="G1 Eden Space",} 8.2837504E7
jvm_memory_used_bytes{area="nonheap",id="CodeHeap 'non-nmethods'",} 1372032.0
jvm_memory_used_bytes{area="nonheap",id="Compressed Class Space",} 4519248.0
jvm_memory_used_bytes{area="nonheap",id="CodeHeap 'non-profiled nmethods'",} 5697408.0
# HELP qps Query Per Seconds
# TYPE qps gauge
qps{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 0.3333333333333333
# HELP rt_min Min Response Time
# TYPE rt_min gauge
rt_min{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 0.0
# HELP jvm_buffer_count_buffers An estimate of the number of buffers in the pool
# TYPE jvm_buffer_count_buffers gauge
jvm_buffer_count_buffers{id="mapped",} 0.0
jvm_buffer_count_buffers{id="direct",} 10.0
# HELP system_cpu_count The number of processors available to the Java virtual machine
# TYPE system_cpu_count gauge
system_cpu_count 2.0
# HELP jvm_classes_loaded_classes The number of classes that are currently loaded in the Java virtual machine
# TYPE jvm_classes_loaded_classes gauge
jvm_classes_loaded_classes 7325.0
# HELP rt_total Total Response Time
# TYPE rt_total gauge
rt_total{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 2783.0
# HELP rt_last Last Response Time
# TYPE rt_last gauge
rt_last{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 0.0
# HELP jvm_gc_memory_promoted_bytes_total Count of positive increases in the size of the old generation memory pool before GC to after GC
# TYPE jvm_gc_memory_promoted_bytes_total counter
jvm_gc_memory_promoted_bytes_total 1.4450952E7
# HELP jvm_gc_pause_seconds Time spent in GC pause
# TYPE jvm_gc_pause_seconds summary
jvm_gc_pause_seconds_count{action="end of minor GC",cause="Metadata GC Threshold",} 2.0
jvm_gc_pause_seconds_sum{action="end of minor GC",cause="Metadata GC Threshold",} 0.026
jvm_gc_pause_seconds_count{action="end of minor GC",cause="G1 Evacuation Pause",} 37.0
jvm_gc_pause_seconds_sum{action="end of minor GC",cause="G1 Evacuation Pause",} 0.156
# HELP jvm_gc_pause_seconds_max Time spent in GC pause
# TYPE jvm_gc_pause_seconds_max gauge
jvm_gc_pause_seconds_max{action="end of minor GC",cause="Metadata GC Threshold",} 0.0
jvm_gc_pause_seconds_max{action="end of minor GC",cause="G1 Evacuation Pause",} 0.0
# HELP rt_p95 Response Time P95
# TYPE rt_p95 gauge
rt_p95{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 0.0
# HELP requests_total Total Requests
# TYPE requests_total gauge
requests_total{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 27738.0
# HELP process_cpu_usage The "recent cpu usage" for the Java Virtual Machine process
# TYPE process_cpu_usage gauge
process_cpu_usage 8.103727714748784E-4
# HELP rt_max Max Response Time
# TYPE rt_max gauge
rt_max{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 4.0
# HELP jvm_gc_max_data_size_bytes Max size of long-lived heap memory pool
# TYPE jvm_gc_max_data_size_bytes gauge
jvm_gc_max_data_size_bytes 9.52107008E8
# HELP jvm_threads_live_threads The current number of live threads including both daemon and non-daemon threads
# TYPE jvm_threads_live_threads gauge
jvm_threads_live_threads 39.0
# HELP jvm_threads_daemon_threads The current number of live daemon threads
# TYPE jvm_threads_daemon_threads gauge
jvm_threads_daemon_threads 36.0
# HELP jvm_classes_unloaded_classes_total The total number of classes unloaded since the Java virtual machine has started execution
# TYPE jvm_classes_unloaded_classes_total counter
jvm_classes_unloaded_classes_total 0.0
# HELP jvm_memory_committed_bytes The amount of memory in bytes that is committed for the Java virtual machine to use
# TYPE jvm_memory_committed_bytes gauge
jvm_memory_committed_bytes{area="nonheap",id="CodeHeap 'profiled nmethods'",} 1.4680064E7
jvm_memory_committed_bytes{area="heap",id="G1 Survivor Space",} 1048576.0
jvm_memory_committed_bytes{area="heap",id="G1 Old Gen",} 5.24288E7
jvm_memory_committed_bytes{area="nonheap",id="Metaspace",} 4.1623552E7
jvm_memory_committed_bytes{area="heap",id="G1 Eden Space",} 9.0177536E7
jvm_memory_committed_bytes{area="nonheap",id="CodeHeap 'non-nmethods'",} 2555904.0
jvm_memory_committed_bytes{area="nonheap",id="Compressed Class Space",} 5111808.0
jvm_memory_committed_bytes{area="nonheap",id="CodeHeap 'non-profiled nmethods'",} 5701632.0
# HELP requests_succeed Succeed Requests
# TYPE requests_succeed gauge
requests_succeed{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 27738.0
# HELP rt_avg Average Response Time
# TYPE rt_avg gauge
rt_avg{application_name="metrics-provider",group="",hostname="iZ8lgm9icspkthZ",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="172.28.236.104",method="sayHello",version="",} 0.0
The aggregation collector:
public class AggregateMetricsCollector implements MetricsCollector, MetricsListener {
private int bucketNum;
private int timeWindowSeconds;
private final Map<MethodMetric, TimeWindowCounter> totalRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> succeedRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> failedRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> qps = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowQuantile> rt = new ConcurrentHashMap<>();
private final ApplicationModel applicationModel;
private static final Integer DEFAULT_COMPRESSION = 100;
private static final Integer DEFAULT_BUCKET_NUM = 10;
private static final Integer DEFAULT_TIME_WINDOW_SECONDS = 120;
// parse the configuration in the constructor
public AggregateMetricsCollector(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
ConfigManager configManager = applicationModel.getApplicationConfigManager();
MetricsConfig config = configManager.getMetrics().orElse(null);
if (config != null && config.getAggregation() != null && Boolean.TRUE.equals(config.getAggregation().getEnabled())) {
// only registered when aggregation is enabled.
registerListener();
AggregationConfig aggregation = config.getAggregation();
this.bucketNum = aggregation.getBucketNum() == null ? DEFAULT_BUCKET_NUM : aggregation.getBucketNum();
this.timeWindowSeconds = aggregation.getTimeWindowSeconds() == null ? DEFAULT_TIME_WINDOW_SECONDS : aggregation.getTimeWindowSeconds();
}
}
}
If local aggregation is enabled, a listener is registered through the BeanFactory of the ApplicationModel, and the AggregateMetricsCollector is bound to the DefaultMetricsCollector to form a producer-consumer model. The DefaultMetricsCollector maintains a list of listeners, which makes it easy to extend:
private void registerListener() {
applicationModel.getBeanFactory().getBean(DefaultMetricsCollector.class).addListener(this);
}
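On the consuming side, the AggregateMetricsCollector updates its time-window structures when the DefaultMetricsCollector notifies its listeners. A hedged sketch of such a callback, assuming event types that carry the MethodMetric and, for RT events, the response time; the real listener signature and event classes may differ:

// hedged sketch: the event classes and accessors used here are assumptions
public void onEvent(MetricsEvent event) {
    MethodMetric metric = event.getMetric();
    if (event instanceof RTEvent) {
        long responseTime = ((RTEvent) event).getRt();
        TimeWindowQuantile quantile = rt.computeIfAbsent(metric,
                k -> new TimeWindowQuantile(DEFAULT_COMPRESSION, bucketNum, timeWindowSeconds));
        quantile.add(responseTime);
    } else if (event instanceof RequestEvent) {
        TimeWindowCounter counter = totalRequests.computeIfAbsent(metric,
                k -> new TimeWindowCounter(bucketNum, timeWindowSeconds));
        counter.increment();
    }
}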
3. Metrics Aggregation
Sliding window: suppose we start with 6 buckets and each window time is set to 2 minutes. Every time metric data is written, it is written into all 6 buckets; every two minutes the pointer moves forward by one bucket and the data in the expired bucket is cleared. When metrics are read, the bucket currently pointed to by current is read, which achieves the effect of a sliding window. Specifically, as shown in the figure below, the current bucket stores all data written within the bucket lifetime set in the configuration, i.e., the most recent data.
Within each bucket, the **TDigest algorithm** is used to compute the quantile metrics.
The **TDigest algorithm** offers high accuracy for extreme quantiles such as p1 and p99, and lower accuracy for middle quantiles such as p50. Related information:
- https://op8867555.github.io/posts/2018-04-09-tdigest.html
- https://blog.csdn.net/csdnnews/article/details/116246540
- Open-source implementation: https://github.com/tdunning/t-digest
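As a quick, standalone illustration of the t-digest library's API (independent of Dubbo's wrapper below):

import com.tdunning.math.stats.TDigest;

public class TDigestDemo {
    public static void main(String[] args) {
        TDigest digest = TDigest.createDigest(100);   // compression = 100, matching Dubbo's default below
        for (int i = 1; i <= 1000; i++) {
            digest.add(i);                            // record 1000 sample response times: 1..1000 ms
        }
        System.out.println(digest.quantile(0.99));    // roughly 990, the p99 of the recorded samples
    }
}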
The code implementation is as follows. In addition to TimeWindowQuantile, which computes the quantile metrics, a TimeWindowCounter is also provided to count the number of metric samples within a time interval.
import com.tdunning.math.stats.TDigest;

import java.util.concurrent.TimeUnit;

public class TimeWindowQuantile {
private final double compression;
private final TDigest[] ringBuffer;
private int currentBucket;
private long lastRotateTimestampMillis;
private final long durationBetweenRotatesMillis;
public TimeWindowQuantile(double compression, int bucketNum, int timeWindowSeconds) {
this.compression = compression;
this.ringBuffer = new TDigest[bucketNum];
for (int i = 0; i < bucketNum; i++) {
this.ringBuffer[i] = TDigest.createDigest(compression);
}
this.currentBucket = 0;
this.lastRotateTimestampMillis = System.currentTimeMillis();
this.durationBetweenRotatesMillis = TimeUnit.SECONDS.toMillis(timeWindowSeconds) / bucketNum;
}
public synchronized double quantile(double q) {
TDigest bucket = rotate();
return bucket.quantile(q);
}
public synchronized void add(double value) {
rotate();
for (TDigest bucket : ringBuffer) {
bucket.add(value);
}
}
private TDigest rotate() {
long timeSinceLastRotateMillis = System.currentTimeMillis() - lastRotateTimestampMillis;
while (timeSinceLastRotateMillis > durationBetweenRotatesMillis) {
ringBuffer[currentBucket] = TDigest.createDigest(compression);
if (++currentBucket >= ringBuffer.length) {
currentBucket = 0;
}
timeSinceLastRotateMillis -= durationBetweenRotatesMillis;
lastRotateTimestampMillis += durationBetweenRotatesMillis;
}
return ringBuffer[currentBucket];
}
}
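TimeWindowCounter itself is not shown above. The following is a hedged sketch of what such a counter could look like, reusing the same ring-buffer rotation idea; the actual class in Dubbo may differ in detail:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class TimeWindowCounter {
    private final LongAdder[] ringBuffer;
    private int currentBucket;
    private long lastRotateTimestampMillis;
    private final long durationBetweenRotatesMillis;

    public TimeWindowCounter(int bucketNum, int timeWindowSeconds) {
        this.ringBuffer = new LongAdder[bucketNum];
        for (int i = 0; i < bucketNum; i++) {
            this.ringBuffer[i] = new LongAdder();
        }
        this.currentBucket = 0;
        this.lastRotateTimestampMillis = System.currentTimeMillis();
        this.durationBetweenRotatesMillis = TimeUnit.SECONDS.toMillis(timeWindowSeconds) / bucketNum;
    }

    public synchronized long get() {
        return rotate().sum();
    }

    public synchronized void increment() {
        rotate();
        for (LongAdder bucket : ringBuffer) {
            bucket.increment();
        }
    }

    // same rotation idea as TimeWindowQuantile: expire one bucket per sub-window
    private LongAdder rotate() {
        long timeSinceLastRotateMillis = System.currentTimeMillis() - lastRotateTimestampMillis;
        while (timeSinceLastRotateMillis > durationBetweenRotatesMillis) {
            ringBuffer[currentBucket] = new LongAdder();
            if (++currentBucket >= ringBuffer.length) {
                currentBucket = 0;
            }
            timeSinceLastRotateMillis -= durationBetweenRotatesMillis;
            lastRotateTimestampMillis += durationBetweenRotatesMillis;
        }
        return ringBuffer[currentBucket];
    }
}

With such a counter, the QPS over the window can be approximated as counter.get() / timeWindowSeconds.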
Metrics Push
Metrics push is enabled only after the user has set the <dubbo:metrics /> configuration and configured the protocol parameter. If only metrics aggregation is enabled, metrics are not pushed by default.
1. Prometheus Pull with Service Discovery
Use dubbo-admin or a similar intermediate layer: at startup, push the local IP, port, and MetricsURL address information to dubbo-admin (or any intermediate layer) according to the configuration, and expose HTTP service discovery for Prometheus to read. The configuration looks like: <dubbo:metrics protocol="prometheus" mode="pull" address="${dubbo-admin.address}" port="20888" url="/metrics" />, where address is an optional parameter in pull mode; if it is not provided, the user needs to configure the address manually in the Prometheus configuration file.
private void exportHttpServer() {
boolean exporterEnabled = url.getParameter(PROMETHEUS_EXPORTER_ENABLED_KEY, false);
if (exporterEnabled) {
int port = url.getParameter(PROMETHEUS_EXPORTER_METRICS_PORT_KEY, PROMETHEUS_DEFAULT_METRICS_PORT);
String path = url.getParameter(PROMETHEUS_EXPORTER_METRICS_PATH_KEY, PROMETHEUS_DEFAULT_METRICS_PATH);
if (!path.startsWith("/")) {
path = "/" + path;
}
try {
prometheusExporterHttpServer = HttpServer.create(new InetSocketAddress(port), 0);
prometheusExporterHttpServer.createContext(path, httpExchange -> {
String response = prometheusRegistry.scrape();
httpExchange.sendResponseHeaders(200, response.getBytes().length);
try (OutputStream os = httpExchange.getResponseBody()) {
os.write(response.getBytes());
}
});
httpServerThread = new Thread(prometheusExporterHttpServer::start);
httpServerThread.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
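With the sample configuration above (port 20888, path /metrics), once the exporter is enabled Prometheus can scrape the provider directly at http://<provider-ip>:20888/metrics.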
2. Prometheus Push to Pushgateway
Users can configure the Prometheus Pushgateway address directly in the Dubbo configuration file, for example <dubbo:metrics protocol="prometheus" mode="push" address="${prometheus.pushgateway-url}" interval="5" />, where interval represents the push interval in seconds.
private void schedulePushJob() {
boolean pushEnabled = url.getParameter(PROMETHEUS_PUSHGATEWAY_ENABLED_KEY, false);
if (pushEnabled) {
String baseUrl = url.getParameter(PROMETHEUS_PUSHGATEWAY_BASE_URL_KEY);
String job = url.getParameter(PROMETHEUS_PUSHGATEWAY_JOB_KEY, PROMETHEUS_DEFAULT_JOB_NAME);
int pushInterval = url.getParameter(PROMETHEUS_PUSHGATEWAY_PUSH_INTERVAL_KEY, PROMETHEUS_DEFAULT_PUSH_INTERVAL);
String username = url.getParameter(PROMETHEUS_PUSHGATEWAY_USERNAME_KEY);
String password = url.getParameter(PROMETHEUS_PUSHGATEWAY_PASSWORD_KEY);
NamedThreadFactory threadFactory = new NamedThreadFactory("prometheus-push-job", true);
pushJobExecutor = Executors.newScheduledThreadPool(1, threadFactory);
PushGateway pushGateway = new PushGateway(baseUrl);
if (!StringUtils.isBlank(username)) {
pushGateway.setConnectionFactory(new BasicAuthHttpConnectionFactory(username, password));
}
pushJobExecutor.scheduleWithFixedDelay(() -> push(pushGateway, job), pushInterval, pushInterval, TimeUnit.SECONDS);
}
}
protected void push(PushGateway pushGateway, String job) {
try {
pushGateway.pushAdd(prometheusRegistry.getPrometheusRegistry(), job);
} catch (IOException e) {
logger.error("Error occurred when pushing metrics to prometheus: ", e);
}
}