scuffle_bootstrap_telemetry/
lib.rs1#![cfg_attr(feature = "docs", doc = "\n\nSee the [changelog][changelog] for a full release history.")]
7#![cfg_attr(feature = "docs", doc = "## Feature flags")]
8#![cfg_attr(feature = "docs", doc = document_features::document_features!())]
9#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
105#![cfg_attr(docsrs, feature(doc_auto_cfg))]
106#![deny(missing_docs)]
107#![deny(unsafe_code)]
108#![deny(unreachable_pub)]
109#![deny(clippy::mod_module_files)]
110
111use anyhow::Context;
112use bytes::Bytes;
113#[cfg(feature = "opentelemetry-logs")]
114pub use opentelemetry_appender_tracing;
115#[cfg(feature = "opentelemetry")]
116pub use opentelemetry_sdk;
117#[cfg(feature = "prometheus")]
118pub use prometheus_client;
119use scuffle_bootstrap::global::Global;
120use scuffle_bootstrap::service::Service;
121#[cfg(feature = "opentelemetry-traces")]
122pub use tracing_opentelemetry;
123
124#[cfg(feature = "opentelemetry")]
125pub mod opentelemetry;
126
/// The telemetry service.
///
/// Implements [`Service`] for any global that implements [`TelemetryConfig`].
/// When [`TelemetryConfig::bind_address`] returns an address, an HTTP server is
/// started with the following routes:
///
/// - `/health` — runs [`TelemetryConfig::health_check`].
/// - `/metrics` — encodes the Prometheus registry (feature `prometheus`).
/// - `/pprof/cpu` — captures a CPU profile (feature `pprof`, unix only). Query
///   parameters: `freq` (default `100`), `duration` in whole seconds
///   (default `5`) and `ignore` (may be repeated).
/// - `/opentelemetry/flush` — flushes the OpenTelemetry handle (feature
///   `opentelemetry`).
///
/// Any other path answers `404 Not Found`.
#[derive(Debug, Clone, Copy, Default)]
pub struct TelemetrySvc;
175
176pub trait TelemetryConfig: Global {
178 fn enabled(&self) -> bool {
180 true
181 }
182
183 fn bind_address(&self) -> Option<std::net::SocketAddr> {
185 None
186 }
187
188 fn http_server_name(&self) -> &str {
190 "scuffle-bootstrap-telemetry"
191 }
192
193 fn health_check(&self) -> impl std::future::Future<Output = Result<(), anyhow::Error>> + Send {
197 std::future::ready(Ok(()))
198 }
199
200 #[cfg(feature = "prometheus")]
207 fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
208 None
209 }
210
211 #[cfg(feature = "opentelemetry")]
218 fn opentelemetry(&self) -> Option<&opentelemetry::OpenTelemetry> {
219 None
220 }
221}
222
223impl<Global: TelemetryConfig> Service<Global> for TelemetrySvc {
224 async fn enabled(&self, global: &std::sync::Arc<Global>) -> anyhow::Result<bool> {
225 Ok(global.enabled())
226 }
227
228 async fn run(self, global: std::sync::Arc<Global>, ctx: scuffle_context::Context) -> anyhow::Result<()> {
229 if let Some(bind_addr) = global.bind_address() {
230 let global = global.clone();
231
232 let service = scuffle_http::service::fn_http_service(move |req| {
233 let global = global.clone();
234 async move {
235 match req.uri().path() {
236 "/health" => health_check(&global, req).await,
237 #[cfg(feature = "prometheus")]
238 "/metrics" => metrics(&global, req).await,
239 #[cfg(all(feature = "pprof", unix))]
240 "/pprof/cpu" => pprof(&global, req).await,
241 #[cfg(feature = "opentelemetry")]
242 "/opentelemetry/flush" => opentelemetry_flush(&global).await,
243 _ => Ok(http::Response::builder()
244 .status(http::StatusCode::NOT_FOUND)
245 .body(http_body_util::Full::new(Bytes::from_static(b"not found")))?),
246 }
247 }
248 });
249
250 scuffle_http::HttpServer::builder()
251 .bind(bind_addr)
252 .ctx(ctx)
253 .service_factory(scuffle_http::service::service_clone_factory(service))
254 .build()
255 .run()
256 .await
257 .context("server run")?;
258 } else {
259 ctx.done().await;
260 }
261
262 #[cfg(feature = "opentelemetry")]
263 if let Some(opentelemetry) = global.opentelemetry().cloned() {
264 if opentelemetry.is_enabled() {
265 tokio::task::spawn_blocking(move || opentelemetry.shutdown())
266 .await
267 .context("opentelemetry shutdown spawn")?
268 .context("opentelemetry shutdown")?;
269 }
270 }
271
272 Ok(())
273 }
274}
275
276async fn health_check<G: TelemetryConfig>(
277 global: &std::sync::Arc<G>,
278 _: http::Request<scuffle_http::body::IncomingBody>,
279) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
280 if let Err(err) = global.health_check().await {
281 tracing::error!("health check failed: {err}");
282 Ok(http::Response::builder()
283 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
284 .body(http_body_util::Full::new(format!("{err:#}").into()))?)
285 } else {
286 Ok(http::Response::builder()
287 .status(http::StatusCode::OK)
288 .body(http_body_util::Full::new(Bytes::from_static(b"ok")))?)
289 }
290}
291
/// Serves `/metrics` by encoding the registry from
/// [`TelemetryConfig::prometheus_metrics_registry`] with
/// `prometheus_client::encoding::text::encode`.
///
/// - No registry configured: `404 Not Found`.
/// - Encoding failure: `500 Internal Server Error`.
/// - Otherwise: `200 OK` with an OpenMetrics `Content-Type`. The encoded body
///   uses OpenMetrics-only syntax (e.g. the trailing `# EOF`), so scrapers must
///   not fall back to the classic Prometheus text parser.
#[cfg(feature = "prometheus")]
async fn metrics<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
    _: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    // Media type of the OpenMetrics text exposition format.
    const OPENMETRICS_CONTENT_TYPE: &str = "application/openmetrics-text; version=1.0.0; charset=utf-8";

    let Some(registry) = global.prometheus_metrics_registry() else {
        return http::Response::builder()
            .status(http::StatusCode::NOT_FOUND)
            .body(http_body_util::Full::new(Bytes::from_static(b"not found")));
    };

    let mut buf = String::new();
    if prometheus_client::encoding::text::encode(&mut buf, registry).is_err() {
        // `std::fmt::Error` carries no detail, so there is nothing more to log.
        tracing::error!("metrics encode failed");
        return http::Response::builder()
            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
            .body(http_body_util::Full::new(Bytes::from_static(b"metrics encode failed")));
    }

    http::Response::builder()
        .status(http::StatusCode::OK)
        .header(http::header::CONTENT_TYPE, OPENMETRICS_CONTENT_TYPE)
        .body(http_body_util::Full::new(Bytes::from(buf)))
}
315
/// Serves `/pprof/cpu` by capturing a CPU profile with `scuffle_pprof::Cpu`
/// and returning the captured bytes.
///
/// Query parameters (unknown keys are ignored):
/// - `freq`: sampling frequency passed to `scuffle_pprof::Cpu::new`, default
///   `100`.
/// - `duration`: capture length in whole seconds, default `5`.
/// - `ignore`: may repeat; each value is appended to the ignore list handed to
///   `scuffle_pprof::Cpu::new`.
///
/// An unparsable `freq` or `duration` yields `400 Bad Request`. A capture error
/// or a failure of the blocking task yields `500 Internal Server Error` with
/// the error text as the body.
#[cfg(unix)]
#[cfg(feature = "pprof")]
async fn pprof<G: TelemetryConfig>(
    _: &std::sync::Arc<G>,
    req: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let respond = |status: http::StatusCode, body: Bytes| {
        http::Response::builder()
            .status(status)
            .body(http_body_util::Full::new(body))
    };

    let mut freq = 100;
    let mut duration = std::time::Duration::from_secs(5);
    let mut ignore_list = Vec::new();

    // NOTE(review): values come straight from `querystring::querify`, which
    // presumably does not percent-decode them — confirm before relying on
    // encoded `ignore` entries.
    let params = req.uri().query().map(querystring::querify).unwrap_or_default();
    for (key, value) in params {
        match key {
            "freq" => match value.parse() {
                Ok(parsed) => freq = parsed,
                Err(err) => {
                    return respond(
                        http::StatusCode::BAD_REQUEST,
                        Bytes::from(format!("invalid freq: {err:#}")),
                    );
                }
            },
            "duration" => match value.parse() {
                Ok(secs) => duration = std::time::Duration::from_secs(secs),
                Err(err) => {
                    return respond(
                        http::StatusCode::BAD_REQUEST,
                        Bytes::from(format!("invalid duration: {err:#}")),
                    );
                }
            },
            "ignore" => ignore_list.push(value),
            _ => {}
        }
    }

    let cpu = scuffle_pprof::Cpu::new(freq, &ignore_list);

    // Capturing blocks for `duration`, so it runs off the async executor.
    let failure = match tokio::task::spawn_blocking(move || cpu.capture(duration)).await {
        Ok(Ok(data)) => return respond(http::StatusCode::OK, Bytes::from(data)),
        Ok(Err(err)) => format!("{err:#}"),
        Err(err) => format!("{err:#}"),
    };

    tracing::error!("cpu capture failed: {failure}");
    respond(http::StatusCode::INTERNAL_SERVER_ERROR, Bytes::from(failure))
}
373
/// Serves `/opentelemetry/flush`.
///
/// - No handle from [`TelemetryConfig::opentelemetry`]: `404 Not Found`.
/// - Handle present but not enabled: `200 OK` (nothing to flush).
/// - Otherwise the handle's `flush` runs on a blocking thread. Success gives
///   `200 OK`. A flush error or a failed blocking task is logged and returned
///   as `500 Internal Server Error` with the error text as the body.
#[cfg(feature = "opentelemetry")]
async fn opentelemetry_flush<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let respond = |status: http::StatusCode, body: Bytes| {
        http::Response::builder()
            .status(status)
            .body(http_body_util::Full::new(body))
    };

    let Some(opentelemetry) = global.opentelemetry().cloned() else {
        return respond(http::StatusCode::NOT_FOUND, Bytes::from_static(b"not found"));
    };

    if !opentelemetry.is_enabled() {
        return respond(http::StatusCode::OK, Bytes::from_static(b"ok"));
    }

    let failure = match tokio::task::spawn_blocking(move || opentelemetry.flush()).await {
        Ok(Ok(())) => return respond(http::StatusCode::OK, Bytes::from_static(b"ok")),
        Ok(Err(err)) => {
            tracing::error!("opentelemetry flush failed: {err:#}");
            format!("{err:#}")
        }
        Err(err) => {
            tracing::error!("opentelemetry flush spawn failed: {err:#}");
            format!("{err:#}")
        }
    };

    respond(http::StatusCode::INTERNAL_SERVER_ERROR, Bytes::from(failure))
}
408
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
#[cfg(all(
    feature = "opentelemetry-metrics",
    feature = "opentelemetry-traces",
    feature = "opentelemetry-logs"
))]
mod tests {
    //! End-to-end tests that run `TelemetrySvc` on a local port and talk to it
    //! over HTTP.
    //!
    //! Each test drives the service with its own context/handler pair from
    //! `scuffle_context::Context::new()` instead of the process-wide global
    //! handler. That way one test shutting down cannot cancel a server that
    //! another test in the same process is still using.

    use std::net::SocketAddr;
    use std::sync::Arc;

    #[cfg(all(feature = "pprof", unix))]
    use bytes::Bytes;
    use scuffle_bootstrap::{GlobalWithoutConfig, Service};

    use crate::{TelemetryConfig, TelemetrySvc};

    /// Installs the aws-lc rustls crypto provider once per process.
    fn install_provider() {
        static ONCE: std::sync::Once = std::sync::Once::new();

        ONCE.call_once(|| {
            rustls::crypto::aws_lc_rs::default_provider()
                .install_default()
                .expect("failed to install aws lc provider");
        });
    }

    /// GETs `/metrics`; connection errors and non-2xx statuses come back as `Err`.
    async fn request_metrics(addr: SocketAddr) -> reqwest::Result<String> {
        reqwest::get(format!("http://{addr}/metrics"))
            .await?
            .error_for_status()?
            .text()
            .await
    }

    /// GETs `/health` and returns the body, panicking on any failure.
    async fn request_health(addr: SocketAddr) -> String {
        reqwest::get(format!("http://{addr}/health"))
            .await
            .expect("health request")
            .error_for_status()
            .expect("health check failed")
            .text()
            .await
            .expect("health check text")
    }

    /// GETs `/pprof/cpu` with the given raw `freq`/`duration` query values.
    #[cfg(all(feature = "pprof", unix))]
    async fn request_pprof(addr: SocketAddr, freq: &str, duration: &str) -> reqwest::Result<Bytes> {
        reqwest::get(format!("http://{addr}/pprof/cpu?freq={freq}&duration={duration}"))
            .await?
            .error_for_status()?
            .bytes()
            .await
    }

    /// GETs `/opentelemetry/flush`; connection errors and non-2xx statuses come back as `Err`.
    async fn flush_opentelemetry(addr: SocketAddr) -> reqwest::Result<reqwest::Response> {
        reqwest::get(format!("http://{addr}/opentelemetry/flush"))
            .await?
            .error_for_status()
    }

    #[cfg(feature = "prometheus")]
    #[cfg(not(valgrind))]
    #[tokio::test]
    async fn telemetry_http_server() {
        use opentelemetry_sdk::logs::SdkLoggerProvider;
        use opentelemetry_sdk::metrics::SdkMeterProvider;
        use opentelemetry_sdk::trace::SdkTracerProvider;

        install_provider();

        struct TestGlobal {
            bind_addr: SocketAddr,
            prometheus: prometheus_client::registry::Registry,
            open_telemetry: crate::opentelemetry::OpenTelemetry,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                // Reserve a free port; the listener is dropped at the end of
                // `init` so the telemetry server can bind the same address.
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                let mut prometheus = prometheus_client::registry::Registry::default();

                let exporter = scuffle_metrics::prometheus::exporter().build();
                prometheus.register_collector(exporter.collector());

                let metrics = SdkMeterProvider::builder().with_reader(exporter).build();
                opentelemetry::global::set_meter_provider(metrics.clone());

                let tracer = SdkTracerProvider::default();
                opentelemetry::global::set_tracer_provider(tracer.clone());

                let logger = SdkLoggerProvider::builder().build();

                let open_telemetry = crate::opentelemetry::OpenTelemetry::new()
                    .with_metrics(metrics)
                    .with_traces(tracer)
                    .with_logs(logger);

                Ok(Arc::new(TestGlobal {
                    bind_addr,
                    prometheus,
                    open_telemetry,
                }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }

            fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
                Some(&self.prometheus)
            }

            fn opentelemetry(&self) -> Option<&crate::opentelemetry::OpenTelemetry> {
                Some(&self.open_telemetry)
            }
        }

        #[scuffle_metrics::metrics]
        mod example {
            use scuffle_metrics::{CounterU64, MetricEnum};

            #[derive(MetricEnum)]
            pub enum Kind {
                Http,
                Grpc,
            }

            #[metrics(unit = "requests")]
            pub fn request(kind: Kind) -> CounterU64;
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        let (ctx, handler) = scuffle_context::Context::new();
        let task_handle = tokio::spawn(TelemetrySvc.run(global, ctx));

        // Give the server a moment to bind before the first request.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.starts_with("# HELP target Information about the target\n"));
        assert!(metrics.contains("# TYPE target info\n"));
        assert!(metrics.contains("service_name=\"unknown_service\""));
        assert!(metrics.contains("telemetry_sdk_language=\"rust\""));
        assert!(metrics.contains("telemetry_sdk_name=\"opentelemetry\""));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
        assert!(metrics.contains("example_request_requests_total{"));
        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
        assert!(metrics.contains("kind=\"Http\""));
        assert!(metrics.contains("} 1\n"));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
        assert!(metrics.contains("example_request_requests_total{"));
        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
        assert!(metrics.contains("kind=\"Http\""));
        assert!(metrics.contains("} 2\n"));
        assert!(metrics.ends_with("# EOF\n"));

        // The `/pprof/cpu` route only exists with the `pprof` feature on unix.
        #[cfg(all(feature = "pprof", unix))]
        {
            let timer = std::time::Instant::now();
            assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
            assert!(timer.elapsed() > std::time::Duration::from_secs(2));

            let res = request_pprof(bind_addr, "invalid", "2").await.expect_err("error expected");
            assert!(res.is_status());
            assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));

            let res = request_pprof(bind_addr, "100", "invalid").await.expect_err("error expected");
            assert!(res.is_status());
            assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));
        }

        assert!(flush_opentelemetry(bind_addr).await.is_ok());

        let res = reqwest::get(format!("http://{bind_addr}/not_found")).await.unwrap();
        assert_eq!(res.status(), reqwest::StatusCode::NOT_FOUND);

        handler.shutdown().await;

        task_handle.await.unwrap().unwrap();
    }

    #[cfg(not(valgrind))]
    #[tokio::test]
    async fn empty_telemetry_http_server() {
        install_provider();

        struct TestGlobal {
            bind_addr: SocketAddr,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                // Reserve a free port; the listener is dropped at the end of
                // `init` so the telemetry server can bind the same address.
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                Ok(Arc::new(TestGlobal { bind_addr }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        let (ctx, handler) = scuffle_context::Context::new();
        let task_handle = tokio::spawn(TelemetrySvc.run(global, ctx));

        // Give the server a moment to bind before the first request.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        let res = request_metrics(bind_addr).await.expect_err("error expected");
        assert!(res.is_status());
        assert_eq!(res.status(), Some(reqwest::StatusCode::NOT_FOUND));

        // The `/pprof/cpu` route only exists with the `pprof` feature on unix.
        #[cfg(all(feature = "pprof", unix))]
        {
            let timer = std::time::Instant::now();
            assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
            assert!(timer.elapsed() > std::time::Duration::from_secs(2));
        }

        let err = flush_opentelemetry(bind_addr).await.expect_err("error expected");
        assert!(err.is_status());
        assert_eq!(err.status(), Some(reqwest::StatusCode::NOT_FOUND));

        handler.shutdown().await;

        task_handle.await.unwrap().unwrap();
    }
}
679
/// Release history of this crate. The empty module body is filled in by the
/// `scuffle_changelog::changelog` attribute. The module is only compiled when
/// the `docs` feature is enabled.
#[cfg(feature = "docs")]
#[scuffle_changelog::changelog]
pub mod changelog {}