@@ -5,6 +5,9 @@ use std::{
55 time:: { Duration , Instant } ,
66} ;
77
8+ #[ cfg( debug_assertions) ]
9+ use std:: sync:: Arc ;
10+
811use futures:: { Stream , StreamExt } ;
912use metrics:: Gauge ;
1013use pin_project:: pin_project;
@@ -17,6 +20,8 @@ use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal};
1720
1821use crate :: stats;
1922
23+ const UTILIZATION_EMITTER_DURATION : Duration = Duration :: from_secs ( 5 ) ;
24+
2025#[ pin_project]
2126pub ( crate ) struct Utilization < S > {
2227 intervals : IntervalStream ,
@@ -66,6 +71,10 @@ pub(crate) struct Timer {
6671 total_wait : Duration ,
6772 ewma : stats:: Ewma ,
6873 gauge : Gauge ,
74+ #[ cfg( debug_assertions) ]
75+ report_count : u32 ,
76+ #[ cfg( debug_assertions) ]
77+ component_id : Arc < str > ,
6978}
7079
7180/// A simple, specialized timer for tracking spans of waiting vs not-waiting
@@ -77,14 +86,18 @@ pub(crate) struct Timer {
7786/// to be of uniform length and used to aggregate span data into time-weighted
7887/// averages.
7988impl Timer {
80- pub ( crate ) fn new ( gauge : Gauge ) -> Self {
89+ pub ( crate ) fn new ( gauge : Gauge , # [ cfg ( debug_assertions ) ] component_id : Arc < str > ) -> Self {
8190 Self {
8291 overall_start : Instant :: now ( ) ,
8392 span_start : Instant :: now ( ) ,
8493 waiting : false ,
8594 total_wait : Duration :: new ( 0 , 0 ) ,
8695 ewma : stats:: Ewma :: new ( 0.9 ) ,
8796 gauge,
97+ #[ cfg( debug_assertions) ]
98+ report_count : 0 ,
99+ #[ cfg( debug_assertions) ]
100+ component_id,
88101 }
89102 }
90103
@@ -122,8 +135,19 @@ impl Timer {
122135
123136 self . ewma . update ( utilization) ;
124137 let avg = self . ewma . average ( ) . unwrap_or ( f64:: NAN ) ;
125- debug ! ( utilization = %avg) ;
126- self . gauge . set ( avg) ;
138+ let avg_rounded = ( avg * 10000.0 ) . round ( ) / 10000.0 ; // 4 digit precision
139+
140+ #[ cfg( debug_assertions) ]
141+ {
142+ // Note that changing the reporting interval would also affect the actual metric reporting frequency.
143+ // This check reduces debug log spamming.
144+ if self . report_count . is_multiple_of ( 5 ) {
145+ debug ! ( component_id = %self . component_id, utilization = %avg_rounded) ;
146+ }
147+ self . report_count = self . report_count . wrapping_add ( 1 ) ;
148+ }
149+
150+ self . gauge . set ( avg_rounded) ;
127151
128152 // Reset overall statistics for the next reporting period.
129153 self . overall_start = self . span_start ;
@@ -182,7 +206,7 @@ impl UtilizationEmitter {
182206 let ( timer_tx, timer_rx) = channel ( 4096 ) ;
183207 Self {
184208 timers : HashMap :: default ( ) ,
185- intervals : IntervalStream :: new ( interval ( Duration :: from_secs ( 5 ) ) ) ,
209+ intervals : IntervalStream :: new ( interval ( UTILIZATION_EMITTER_DURATION ) ) ,
186210 timer_tx,
187211 timer_rx,
188212 }
@@ -196,7 +220,14 @@ impl UtilizationEmitter {
196220 key : ComponentKey ,
197221 gauge : Gauge ,
198222 ) -> UtilizationComponentSender {
199- self . timers . insert ( key. clone ( ) , Timer :: new ( gauge) ) ;
223+ self . timers . insert (
224+ key. clone ( ) ,
225+ Timer :: new (
226+ gauge,
227+ #[ cfg( debug_assertions) ]
228+ key. id ( ) . into ( ) ,
229+ ) ,
230+ ) ;
200231 UtilizationComponentSender {
201232 timer_tx : self . timer_tx . clone ( ) ,
202233 component_key : key,
0 commit comments