tonic/transport/service/
discover.rs

1use super::super::service;
2use super::connection::Connection;
3use crate::transport::Endpoint;
4
5use std::{
6    hash::Hash,
7    pin::Pin,
8    task::{Context, Poll},
9};
10use tokio::sync::mpsc::Receiver;
11
12use tokio_stream::Stream;
13use tower::discover::Change;
14
15type DiscoverResult<K, S, E> = Result<Change<K, S>, E>;
16
17pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
18    changes: Receiver<Change<K, Endpoint>>,
19}
20
21impl<K: Hash + Eq + Clone> DynamicServiceStream<K> {
22    pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self {
23        Self { changes }
24    }
25}
26
27impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> {
28    type Item = DiscoverResult<K, Connection, crate::Error>;
29
30    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31        let c = &mut self.changes;
32        match Pin::new(&mut *c).poll_recv(cx) {
33            Poll::Pending | Poll::Ready(None) => Poll::Pending,
34            Poll::Ready(Some(change)) => match change {
35                Change::Insert(k, endpoint) => {
36                    let mut http = hyper::client::connect::HttpConnector::new();
37                    http.set_nodelay(endpoint.tcp_nodelay);
38                    http.set_keepalive(endpoint.tcp_keepalive);
39                    http.enforce_http(false);
40                    #[cfg(feature = "tls")]
41                    let connector = service::connector(http, endpoint.tls.clone());
42
43                    #[cfg(not(feature = "tls"))]
44                    let connector = service::connector(http);
45                    let connection = Connection::lazy(connector, endpoint);
46                    let change = Ok(Change::Insert(k, connection));
47                    Poll::Ready(Some(change))
48                }
49                Change::Remove(k) => Poll::Ready(Some(Ok(Change::Remove(k)))),
50            },
51        }
52    }
53}
54
55impl<K: Hash + Eq + Clone> Unpin for DynamicServiceStream<K> {}