tonic/transport/service/
discover.rs1use super::super::service;
2use super::connection::Connection;
3use crate::transport::Endpoint;
4
5use std::{
6 hash::Hash,
7 pin::Pin,
8 task::{Context, Poll},
9};
10use tokio::sync::mpsc::Receiver;
11
12use tokio_stream::Stream;
13use tower::discover::Change;
14
15type DiscoverResult<K, S, E> = Result<Change<K, S>, E>;
16
17pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
18 changes: Receiver<Change<K, Endpoint>>,
19}
20
21impl<K: Hash + Eq + Clone> DynamicServiceStream<K> {
22 pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self {
23 Self { changes }
24 }
25}
26
27impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> {
28 type Item = DiscoverResult<K, Connection, crate::Error>;
29
30 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31 let c = &mut self.changes;
32 match Pin::new(&mut *c).poll_recv(cx) {
33 Poll::Pending | Poll::Ready(None) => Poll::Pending,
34 Poll::Ready(Some(change)) => match change {
35 Change::Insert(k, endpoint) => {
36 let mut http = hyper::client::connect::HttpConnector::new();
37 http.set_nodelay(endpoint.tcp_nodelay);
38 http.set_keepalive(endpoint.tcp_keepalive);
39 http.enforce_http(false);
40 #[cfg(feature = "tls")]
41 let connector = service::connector(http, endpoint.tls.clone());
42
43 #[cfg(not(feature = "tls"))]
44 let connector = service::connector(http);
45 let connection = Connection::lazy(connector, endpoint);
46 let change = Ok(Change::Insert(k, connection));
47 Poll::Ready(Some(change))
48 }
49 Change::Remove(k) => Poll::Ready(Some(Ok(Change::Remove(k)))),
50 },
51 }
52 }
53}
54
55impl<K: Hash + Eq + Clone> Unpin for DynamicServiceStream<K> {}