tonic/request.rs
1use crate::metadata::{MetadataMap, MetadataValue};
2#[cfg(all(feature = "transport", feature = "tls"))]
3use crate::transport::server::TlsConnectInfo;
4#[cfg(feature = "transport")]
5use crate::transport::{server::TcpConnectInfo, Certificate};
6use crate::Extensions;
7use futures_core::Stream;
8#[cfg(feature = "transport")]
9use std::sync::Arc;
10use std::{net::SocketAddr, time::Duration};
11
12/// A gRPC request and metadata from an RPC call.
13#[derive(Debug)]
14pub struct Request<T> {
15 metadata: MetadataMap,
16 message: T,
17 extensions: Extensions,
18}
19
20/// Trait implemented by RPC request types.
21///
22/// Types implementing this trait can be used as arguments to client RPC
23/// methods without explicitly wrapping them into `tonic::Request`s. The purpose
24/// is to make client calls slightly more convenient to write.
25///
26/// Tonic's code generation and blanket implementations handle this for you,
27/// so it is not necessary to implement this trait directly.
28///
29/// # Example
30///
31/// Given the following gRPC method definition:
32/// ```proto
33/// rpc GetFeature(Point) returns (Feature) {}
34/// ```
35///
36/// we can call `get_feature` in two equivalent ways:
37/// ```rust
38/// # pub struct Point {}
39/// # pub struct Client {}
40/// # impl Client {
41/// # fn get_feature(&self, r: impl tonic::IntoRequest<Point>) {}
42/// # }
43/// # let client = Client {};
44/// use tonic::Request;
45///
46/// client.get_feature(Point {});
47/// client.get_feature(Request::new(Point {}));
48/// ```
49pub trait IntoRequest<T>: sealed::Sealed {
50 /// Wrap the input message `T` in a `tonic::Request`
51 fn into_request(self) -> Request<T>;
52}
53
54/// Trait implemented by RPC streaming request types.
55///
56/// Types implementing this trait can be used as arguments to client streaming
57/// RPC methods without explicitly wrapping them into `tonic::Request`s. The
58/// purpose is to make client calls slightly more convenient to write.
59///
60/// Tonic's code generation and blanket implementations handle this for you,
61/// so it is not necessary to implement this trait directly.
62///
63/// # Example
64///
65/// Given the following gRPC service method definition:
66/// ```proto
67/// rpc RecordRoute(stream Point) returns (RouteSummary) {}
68/// ```
69/// we can call `record_route` in two equivalent ways:
70///
71/// ```rust
72/// # #[derive(Clone)]
73/// # pub struct Point {};
74/// # pub struct Client {};
75/// # impl Client {
76/// # fn record_route(&self, r: impl tonic::IntoStreamingRequest<Message = Point>) {}
77/// # }
78/// # let client = Client {};
79/// use tonic::Request;
80/// use futures_util::stream;
81///
82/// let messages = vec![Point {}, Point {}];
83///
84/// client.record_route(Request::new(stream::iter(messages.clone())));
85/// client.record_route(stream::iter(messages));
86/// ```
87pub trait IntoStreamingRequest: sealed::Sealed {
88 /// The RPC request stream type
89 type Stream: Stream<Item = Self::Message> + Send + 'static;
90
91 /// The RPC request type
92 type Message;
93
94 /// Wrap the stream of messages in a `tonic::Request`
95 fn into_streaming_request(self) -> Request<Self::Stream>;
96}
97
98impl<T> Request<T> {
99 /// Create a new gRPC request.
100 ///
101 /// ```rust
102 /// # use tonic::Request;
103 /// # pub struct HelloRequest {
104 /// # pub name: String,
105 /// # }
106 /// Request::new(HelloRequest {
107 /// name: "Bob".into(),
108 /// });
109 /// ```
110 pub fn new(message: T) -> Self {
111 Request {
112 metadata: MetadataMap::new(),
113 message,
114 extensions: Extensions::new(),
115 }
116 }
117
118 /// Get a reference to the message
119 pub fn get_ref(&self) -> &T {
120 &self.message
121 }
122
123 /// Get a mutable reference to the message
124 pub fn get_mut(&mut self) -> &mut T {
125 &mut self.message
126 }
127
128 /// Get a reference to the custom request metadata.
129 pub fn metadata(&self) -> &MetadataMap {
130 &self.metadata
131 }
132
133 /// Get a mutable reference to the request metadata.
134 pub fn metadata_mut(&mut self) -> &mut MetadataMap {
135 &mut self.metadata
136 }
137
138 /// Consumes `self`, returning the message
139 pub fn into_inner(self) -> T {
140 self.message
141 }
142
143 pub(crate) fn into_parts(self) -> (MetadataMap, Extensions, T) {
144 (self.metadata, self.extensions, self.message)
145 }
146
147 pub(crate) fn from_parts(metadata: MetadataMap, extensions: Extensions, message: T) -> Self {
148 Self {
149 metadata,
150 extensions,
151 message,
152 }
153 }
154
155 pub(crate) fn from_http_parts(parts: http::request::Parts, message: T) -> Self {
156 Request {
157 metadata: MetadataMap::from_headers(parts.headers),
158 message,
159 extensions: Extensions::from_http(parts.extensions),
160 }
161 }
162
163 /// Convert an HTTP request to a gRPC request
164 pub fn from_http(http: http::Request<T>) -> Self {
165 let (parts, message) = http.into_parts();
166 Request::from_http_parts(parts, message)
167 }
168
169 pub(crate) fn into_http(
170 self,
171 uri: http::Uri,
172 sanitize_headers: SanitizeHeaders,
173 ) -> http::Request<T> {
174 let mut request = http::Request::new(self.message);
175
176 *request.version_mut() = http::Version::HTTP_2;
177 *request.method_mut() = http::Method::POST;
178 *request.uri_mut() = uri;
179 *request.headers_mut() = match sanitize_headers {
180 SanitizeHeaders::Yes => self.metadata.into_sanitized_headers(),
181 SanitizeHeaders::No => self.metadata.into_headers(),
182 };
183 *request.extensions_mut() = self.extensions.into_http();
184
185 request
186 }
187
188 #[doc(hidden)]
189 pub fn map<F, U>(self, f: F) -> Request<U>
190 where
191 F: FnOnce(T) -> U,
192 {
193 let message = f(self.message);
194
195 Request {
196 metadata: self.metadata,
197 message,
198 extensions: self.extensions,
199 }
200 }
201
202 /// Get the remote address of this connection.
203 ///
204 /// This will return `None` if the `IO` type used
205 /// does not implement `Connected`. This currently,
206 /// only works on the server side.
207 pub fn remote_addr(&self) -> Option<SocketAddr> {
208 #[cfg(feature = "transport")]
209 {
210 #[cfg(feature = "tls")]
211 {
212 self.extensions()
213 .get::<TcpConnectInfo>()
214 .and_then(|i| i.remote_addr())
215 .or_else(|| {
216 self.extensions()
217 .get::<TlsConnectInfo<TcpConnectInfo>>()
218 .and_then(|i| i.get_ref().remote_addr())
219 })
220 }
221
222 #[cfg(not(feature = "tls"))]
223 {
224 self.extensions()
225 .get::<TcpConnectInfo>()
226 .and_then(|i| i.remote_addr())
227 }
228 }
229
230 #[cfg(not(feature = "transport"))]
231 {
232 None
233 }
234 }
235
236 /// Get the peer certificates of the connected client.
237 ///
238 /// This is used to fetch the certificates from the TLS session
239 /// and is mostly used for mTLS. This currently only returns
240 /// `Some` on the server side of the `transport` server with
241 /// TLS enabled connections.
242 #[cfg(feature = "transport")]
243 #[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
244 pub fn peer_certs(&self) -> Option<Arc<Vec<Certificate>>> {
245 #[cfg(feature = "tls")]
246 {
247 self.extensions()
248 .get::<TlsConnectInfo<TcpConnectInfo>>()
249 .and_then(|i| i.peer_certs())
250 }
251
252 #[cfg(not(feature = "tls"))]
253 {
254 None
255 }
256 }
257
258 /// Set the max duration the request is allowed to take.
259 ///
260 /// Requires the server to support the `grpc-timeout` metadata, which Tonic does.
261 ///
262 /// The duration will be formatted according to [the spec] and use the most precise unit
263 /// possible.
264 ///
265 /// Example:
266 ///
267 /// ```rust
268 /// use std::time::Duration;
269 /// use tonic::Request;
270 ///
271 /// let mut request = Request::new(());
272 ///
273 /// request.set_timeout(Duration::from_secs(30));
274 ///
275 /// let value = request.metadata().get("grpc-timeout").unwrap();
276 ///
277 /// assert_eq!(
278 /// value,
279 /// // equivalent to 30 seconds
280 /// "30000000u"
281 /// );
282 /// ```
283 ///
284 /// [the spec]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
285 pub fn set_timeout(&mut self, deadline: Duration) {
286 let value = MetadataValue::from_str(&duration_to_grpc_timeout(deadline)).unwrap();
287 self.metadata_mut()
288 .insert(crate::metadata::GRPC_TIMEOUT_HEADER, value);
289 }
290
291 /// Returns a reference to the associated extensions.
292 pub fn extensions(&self) -> &Extensions {
293 &self.extensions
294 }
295
296 /// Returns a mutable reference to the associated extensions.
297 ///
298 /// # Example
299 ///
300 /// Extensions can be set in interceptors:
301 ///
302 /// ```no_run
303 /// use tonic::{Request, service::interceptor};
304 ///
305 /// struct MyExtension {
306 /// some_piece_of_data: String,
307 /// }
308 ///
309 /// interceptor(|mut request: Request<()>| {
310 /// request.extensions_mut().insert(MyExtension {
311 /// some_piece_of_data: "foo".to_string(),
312 /// });
313 ///
314 /// Ok(request)
315 /// });
316 /// ```
317 ///
318 /// And picked up by RPCs:
319 ///
320 /// ```no_run
321 /// use tonic::{async_trait, Status, Request, Response};
322 /// #
323 /// # struct Output {}
324 /// # struct Input;
325 /// # struct MyService;
326 /// # struct MyExtension;
327 /// # #[async_trait]
328 /// # trait TestService {
329 /// # async fn handler(&self, req: Request<Input>) -> Result<Response<Output>, Status>;
330 /// # }
331 ///
332 /// #[async_trait]
333 /// impl TestService for MyService {
334 /// async fn handler(&self, req: Request<Input>) -> Result<Response<Output>, Status> {
335 /// let value: &MyExtension = req.extensions().get::<MyExtension>().unwrap();
336 ///
337 /// Ok(Response::new(Output {}))
338 /// }
339 /// }
340 /// ```
341 pub fn extensions_mut(&mut self) -> &mut Extensions {
342 &mut self.extensions
343 }
344}
345
346impl<T> IntoRequest<T> for T {
347 fn into_request(self) -> Request<Self> {
348 Request::new(self)
349 }
350}
351
352impl<T> IntoRequest<T> for Request<T> {
353 fn into_request(self) -> Request<T> {
354 self
355 }
356}
357
358impl<T> IntoStreamingRequest for T
359where
360 T: Stream + Send + 'static,
361{
362 type Stream = T;
363 type Message = T::Item;
364
365 fn into_streaming_request(self) -> Request<Self> {
366 Request::new(self)
367 }
368}
369
370impl<T> IntoStreamingRequest for Request<T>
371where
372 T: Stream + Send + 'static,
373{
374 type Stream = T;
375 type Message = T::Item;
376
377 fn into_streaming_request(self) -> Self {
378 self
379 }
380}
381
// Private home of the `Sealed` marker trait that `IntoRequest` and
// `IntoStreamingRequest` use as a supertrait.
mod sealed {
    /// Marker trait with no methods.
    pub trait Sealed {}
}

// Every sized type gets the marker, so the supertrait bound never excludes a
// candidate type.
impl<T> sealed::Sealed for T {}
387
/// Encode `duration` as the value of a `grpc-timeout` header.
///
/// The result is an integer of at most eight digits followed by a one-letter
/// unit: `n` (nanoseconds), `u` (microseconds), `m` (milliseconds),
/// `S` (seconds), `M` (minutes) or `H` (hours). The most precise unit whose
/// value still fits in eight digits is used; any remainder below that unit is
/// truncated.
///
/// A duration longer than 99_999_999 hours (more than 11_415 years) cannot be
/// represented at all. Such values saturate at `99999999H` instead of
/// panicking, so passing something like `Duration::MAX` to mean "effectively
/// no timeout" is safe.
fn duration_to_grpc_timeout(duration: Duration) -> String {
    // The gRPC spec specifies that the timeout must be at most 8 digits, so
    // this is the largest a value can be before a bigger unit is needed.
    const MAX_VALUE: u128 = 99_999_999;
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;

    /// Formats `value` followed by `unit`, or yields `None` when `value`
    /// needs more than eight digits.
    fn try_format(value: impl Into<u128>, unit: char) -> Option<String> {
        let value = value.into();
        if value > MAX_VALUE {
            None
        } else {
            Some(format!("{}{}", value, unit))
        }
    }

    // Walk from the most precise unit to the coarsest and stop at the first
    // one that fits.
    try_format(duration.as_nanos(), 'n')
        .or_else(|| try_format(duration.as_micros(), 'u'))
        .or_else(|| try_format(duration.as_millis(), 'm'))
        .or_else(|| try_format(duration.as_secs(), 'S'))
        .or_else(|| try_format(duration.as_secs() / SECS_PER_MINUTE, 'M'))
        .or_else(|| try_format(duration.as_secs() / SECS_PER_HOUR, 'H'))
        // Not even hours fit: clamp to the largest encodable timeout.
        .unwrap_or_else(|| format!("{}H", MAX_VALUE))
}
421
/// Selects whether reserved headers are removed when a `tonic::Request` is
/// converted into a `http::Request`.
pub(crate) enum SanitizeHeaders {
    /// Convert the metadata through the sanitizing path.
    Yes,
    /// Convert the metadata as-is.
    No,
}
428
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::MetadataValue;
    use http::Uri;

    // Every gRPC-reserved header set on the request must be gone after a
    // sanitizing conversion to HTTP.
    #[test]
    fn reserved_headers_are_excluded() {
        let mut request = Request::new(1);

        for &reserved in MetadataMap::GRPC_RESERVED_HEADERS.iter() {
            request
                .metadata_mut()
                .insert(reserved, MetadataValue::from_static("invalid"));
        }

        let http_request = request.into_http(Uri::default(), SanitizeHeaders::Yes);
        assert!(http_request.headers().is_empty());
    }

    // Half a second does not fit in eight digits of nanoseconds, so
    // microseconds are used.
    #[test]
    fn duration_to_grpc_timeout_less_than_second() {
        let half_second = Duration::from_millis(500);
        let expected = format!("{}u", half_second.as_micros());

        let value = duration_to_grpc_timeout(half_second);
        assert_eq!(value, expected);
    }

    // Thirty seconds still fits in eight digits of microseconds.
    #[test]
    fn duration_to_grpc_timeout_more_than_second() {
        let thirty_seconds = Duration::from_secs(30);
        let expected = format!("{}u", thirty_seconds.as_micros());

        let value = duration_to_grpc_timeout(thirty_seconds);
        assert_eq!(value, expected);
    }

    // One hour is too long for microseconds and falls back to milliseconds.
    #[test]
    fn duration_to_grpc_timeout_a_very_long_time() {
        let one_hour = Duration::from_secs(60 * 60);
        let expected = format!("{}m", one_hour.as_millis());

        let value = duration_to_grpc_timeout(one_hour);
        assert_eq!(value, expected);
    }
}
468}