Skip to content

Commit

Permalink
Tonic Interceptor (#901)
Browse files Browse the repository at this point in the history
  • Loading branch information
blogle committed Mar 19, 2023
1 parent 57147b1 commit 675de34
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 26 deletions.
1 change: 1 addition & 0 deletions opentelemetry-otlp/src/exporter/grpcio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::default_headers;

/// Configuration of grpcio
#[derive(Debug)]
#[non_exhaustive]
pub struct GrpcioConfig {
/// The credentials to use when communicating with the collector.
pub credentials: Option<Credentials>,
Expand Down
28 changes: 28 additions & 0 deletions opentelemetry-otlp/src/exporter/tonic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::ExportConfig;
use std::fmt::{Debug, Formatter};
use tonic::metadata::MetadataMap;
#[cfg(feature = "tls")]
use tonic::transport::ClientTlsConfig;
Expand All @@ -9,6 +10,7 @@ use super::default_headers;
///
/// [tonic]: https://github.com/hyperium/tonic
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct TonicConfig {
/// Custom metadata entries to send to the collector.
pub metadata: Option<MetadataMap>,
Expand All @@ -32,6 +34,20 @@ pub struct TonicExporterBuilder {
pub(crate) exporter_config: ExportConfig,
pub(crate) tonic_config: TonicConfig,
pub(crate) channel: Option<tonic::transport::Channel>,
pub(crate) interceptor: Option<BoxInterceptor>,
}

pub(crate) struct BoxInterceptor(Box<dyn tonic::service::Interceptor + Send>);
impl tonic::service::Interceptor for BoxInterceptor {
fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
self.0.call(request)
}
}

impl Debug for BoxInterceptor {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "BoxInterceptor(..)")
}
}

impl Default for TonicExporterBuilder {
Expand All @@ -50,6 +66,7 @@ impl Default for TonicExporterBuilder {
exporter_config: ExportConfig::default(),
tonic_config,
channel: Option::default(),
interceptor: Option::default(),
}
}
}
Expand Down Expand Up @@ -87,6 +104,17 @@ impl TonicExporterBuilder {
self.channel = Some(channel);
self
}

/// Use a custom `interceptor` to modify each outbound request.
/// this can be used to modify the grpc metadata, for example
/// to inject auth tokens.
pub fn with_interceptor<I>(mut self, interceptor: I) -> Self
where
I: tonic::service::Interceptor + Send + 'static,
{
self.interceptor = Some(BoxInterceptor(Box::new(interceptor)));
self
}
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,11 @@ use opentelemetry::sdk::export::ExportError;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[cfg(feature = "grpc-sys")]
pub use crate::exporter::grpcio::{Compression, Credentials, GrpcioExporterBuilder};
pub use crate::exporter::grpcio::{Compression, Credentials, GrpcioConfig, GrpcioExporterBuilder};
#[cfg(feature = "http-proto")]
pub use crate::exporter::http::HttpExporterBuilder;
#[cfg(feature = "grpc-tonic")]
pub use crate::exporter::tonic::TonicExporterBuilder;
pub use crate::exporter::tonic::{TonicConfig, TonicExporterBuilder};

#[cfg(feature = "serialize")]
use serde::{Deserialize, Serialize};
Expand Down
65 changes: 41 additions & 24 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
//!
//! Currently, OTEL metrics exporter only support GRPC connection via tonic on tokio runtime.

use crate::exporter::{
tonic::{TonicConfig, TonicExporterBuilder},
ExportConfig,
};
use crate::exporter::tonic::TonicExporterBuilder;
use crate::transform::{record_to_metric, sink, CheckpointedMetrics};
use crate::{Error, OtlpPipeline};
use core::fmt;
Expand Down Expand Up @@ -93,11 +90,9 @@ impl MetricsExporterBuilder {
) -> Result<MetricsExporter> {
match self {
#[cfg(feature = "grpc-tonic")]
MetricsExporterBuilder::Tonic(builder) => Ok(MetricsExporter::new(
builder.exporter_config,
builder.tonic_config,
temporality_selector,
)?),
MetricsExporterBuilder::Tonic(builder) => {
Ok(MetricsExporter::new(builder, temporality_selector)?)
}
}
}
}
Expand Down Expand Up @@ -235,10 +230,12 @@ impl MetricsExporter {
/// Create a new OTLP metrics exporter.
#[cfg(feature = "grpc-tonic")]
pub fn new(
config: ExportConfig,
mut tonic_config: TonicConfig,
export_builder: TonicExporterBuilder,
temporality_selector: Box<dyn TemporalitySelector + Send + Sync>,
) -> Result<MetricsExporter> {
let config = export_builder.exporter_config;
let mut tonic_config = export_builder.tonic_config;

let endpoint = match std::env::var(OTEL_EXPORTER_OTLP_METRICS_ENDPOINT) {
Ok(val) => val,
Err(_) => format!("{}{}", config.endpoint, "/v1/metrics"),
Expand Down Expand Up @@ -267,21 +264,19 @@ impl MetricsExporter {
#[cfg(not(feature = "tls"))]
let channel = endpoint.timeout(config.timeout).connect_lazy();

let client = MetricsServiceClient::new(channel);

let (sender, mut receiver) = tokio::sync::mpsc::channel::<ExportMsg>(2);
tokio::spawn(Box::pin(async move {
while let Some(msg) = receiver.recv().await {
match msg {
ExportMsg::Shutdown => {
break;
}
ExportMsg::Export(req) => {
let _ = client.to_owned().export(req).await;
}
let (sender, receiver) = tokio::sync::mpsc::channel::<ExportMsg>(2);
tokio::spawn(async move {
match export_builder.interceptor {
Some(interceptor) => {
let client = MetricsServiceClient::with_interceptor(channel, interceptor);
export_sink(client, receiver).await
}
None => {
let client = MetricsServiceClient::new(channel);
export_sink(client, receiver).await
}
}
}));
});

Ok(MetricsExporter {
sender: Mutex::new(sender),
Expand All @@ -291,6 +286,28 @@ impl MetricsExporter {
}
}

use tonic::codegen::{Body, Bytes, StdError};
async fn export_sink<T>(
mut client: MetricsServiceClient<T>,
mut receiver: tokio::sync::mpsc::Receiver<ExportMsg>,
) where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
while let Some(msg) = receiver.recv().await {
match msg {
ExportMsg::Shutdown => {
break;
}
ExportMsg::Export(req) => {
let _r = client.export(req).await;
}
}
}
}

impl metrics::MetricsExporter for MetricsExporter {
fn export(
&self,
Expand Down

0 comments on commit 675de34

Please sign in to comment.