1use super::super::service;
2use super::Channel;
3#[cfg(feature = "tls")]
4use super::ClientTlsConfig;
5#[cfg(feature = "tls")]
6use crate::transport::service::TlsConnector;
7use crate::transport::Error;
8use bytes::Bytes;
9use http::{
10 uri::{InvalidUri, Uri},
11 HeaderValue,
12};
13use std::{
14 convert::{TryFrom, TryInto},
15 fmt,
16 str::FromStr,
17 time::Duration,
18};
19use tower::make::MakeConnection;
20
21#[derive(Clone)]
25pub struct Endpoint {
26 pub(crate) uri: Uri,
27 pub(crate) user_agent: Option<HeaderValue>,
28 pub(crate) timeout: Option<Duration>,
29 pub(crate) concurrency_limit: Option<usize>,
30 pub(crate) rate_limit: Option<(u64, Duration)>,
31 #[cfg(feature = "tls")]
32 pub(crate) tls: Option<TlsConnector>,
33 pub(crate) buffer_size: Option<usize>,
34 pub(crate) init_stream_window_size: Option<u32>,
35 pub(crate) init_connection_window_size: Option<u32>,
36 pub(crate) tcp_keepalive: Option<Duration>,
37 pub(crate) tcp_nodelay: bool,
38 pub(crate) http2_keep_alive_interval: Option<Duration>,
39 pub(crate) http2_keep_alive_timeout: Option<Duration>,
40 pub(crate) http2_keep_alive_while_idle: Option<bool>,
41 pub(crate) connect_timeout: Option<Duration>,
42 pub(crate) http2_adaptive_window: Option<bool>,
43}
44
45impl Endpoint {
46 #[doc(hidden)]
49 pub fn new<D>(dst: D) -> Result<Self, Error>
50 where
51 D: TryInto<Self>,
52 D::Error: Into<crate::Error>,
53 {
54 let me = dst.try_into().map_err(|e| Error::from_source(e.into()))?;
55 Ok(me)
56 }
57
58 pub fn from_static(s: &'static str) -> Self {
69 let uri = Uri::from_static(s);
70 Self::from(uri)
71 }
72
73 pub fn from_shared(s: impl Into<Bytes>) -> Result<Self, InvalidUri> {
80 let uri = Uri::from_maybe_shared(s.into())?;
81 Ok(Self::from(uri))
82 }
83
84 pub fn user_agent<T>(self, user_agent: T) -> Result<Self, Error>
96 where
97 T: TryInto<HeaderValue>,
98 {
99 user_agent
100 .try_into()
101 .map(|ua| Endpoint {
102 user_agent: Some(ua),
103 ..self
104 })
105 .map_err(|_| Error::new_invalid_user_agent())
106 }
107
108 pub fn timeout(self, dur: Duration) -> Self {
125 Endpoint {
126 timeout: Some(dur),
127 ..self
128 }
129 }
130
131 pub fn connect_timeout(self, dur: Duration) -> Self {
142 Endpoint {
143 connect_timeout: Some(dur),
144 ..self
145 }
146 }
147
148 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
157 Endpoint {
158 tcp_keepalive,
159 ..self
160 }
161 }
162
163 pub fn concurrency_limit(self, limit: usize) -> Self {
171 Endpoint {
172 concurrency_limit: Some(limit),
173 ..self
174 }
175 }
176
177 pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
186 Endpoint {
187 rate_limit: Some((limit, duration)),
188 ..self
189 }
190 }
191
192 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
199 Endpoint {
200 init_stream_window_size: sz.into(),
201 ..self
202 }
203 }
204
205 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
209 Endpoint {
210 init_connection_window_size: sz.into(),
211 ..self
212 }
213 }
214
215 #[cfg(feature = "tls")]
217 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
218 pub fn tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error> {
219 Ok(Endpoint {
220 tls: Some(
221 tls_config
222 .tls_connector(self.uri.clone())
223 .map_err(Error::from_source)?,
224 ),
225 ..self
226 })
227 }
228
229 pub fn tcp_nodelay(self, enabled: bool) -> Self {
231 Endpoint {
232 tcp_nodelay: enabled,
233 ..self
234 }
235 }
236
237 pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
239 Endpoint {
240 http2_keep_alive_interval: Some(interval),
241 ..self
242 }
243 }
244
245 pub fn keep_alive_timeout(self, duration: Duration) -> Self {
247 Endpoint {
248 http2_keep_alive_timeout: Some(duration),
249 ..self
250 }
251 }
252
253 pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
255 Endpoint {
256 http2_keep_alive_while_idle: Some(enabled),
257 ..self
258 }
259 }
260
261 pub fn http2_adaptive_window(self, enabled: bool) -> Self {
263 Endpoint {
264 http2_adaptive_window: Some(enabled),
265 ..self
266 }
267 }
268
269 pub async fn connect(&self) -> Result<Channel, Error> {
271 let mut http = hyper::client::connect::HttpConnector::new();
272 http.enforce_http(false);
273 http.set_nodelay(self.tcp_nodelay);
274 http.set_keepalive(self.tcp_keepalive);
275
276 #[cfg(feature = "tls")]
277 let connector = service::connector(http, self.tls.clone());
278
279 #[cfg(not(feature = "tls"))]
280 let connector = service::connector(http);
281
282 if let Some(connect_timeout) = self.connect_timeout {
283 let mut connector = hyper_timeout::TimeoutConnector::new(connector);
284 connector.set_connect_timeout(Some(connect_timeout));
285 Channel::connect(connector, self.clone()).await
286 } else {
287 Channel::connect(connector, self.clone()).await
288 }
289 }
290
291 pub fn connect_lazy(&self) -> Channel {
296 let mut http = hyper::client::connect::HttpConnector::new();
297 http.enforce_http(false);
298 http.set_nodelay(self.tcp_nodelay);
299 http.set_keepalive(self.tcp_keepalive);
300
301 #[cfg(feature = "tls")]
302 let connector = service::connector(http, self.tls.clone());
303
304 #[cfg(not(feature = "tls"))]
305 let connector = service::connector(http);
306
307 if let Some(connect_timeout) = self.connect_timeout {
308 let mut connector = hyper_timeout::TimeoutConnector::new(connector);
309 connector.set_connect_timeout(Some(connect_timeout));
310 Channel::new(connector, self.clone())
311 } else {
312 Channel::new(connector, self.clone())
313 }
314 }
315
316 pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
324 where
325 C: MakeConnection<Uri> + Send + 'static,
326 C::Connection: Unpin + Send + 'static,
327 C::Future: Send + 'static,
328 crate::Error: From<C::Error> + Send + 'static,
329 {
330 #[cfg(feature = "tls")]
331 let connector = service::connector(connector, self.tls.clone());
332
333 #[cfg(not(feature = "tls"))]
334 let connector = service::connector(connector);
335
336 if let Some(connect_timeout) = self.connect_timeout {
337 let mut connector = hyper_timeout::TimeoutConnector::new(connector);
338 connector.set_connect_timeout(Some(connect_timeout));
339 Channel::connect(connector, self.clone()).await
340 } else {
341 Channel::connect(connector, self.clone()).await
342 }
343 }
344
345 pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Result<Channel, Error>
353 where
354 C: MakeConnection<Uri> + Send + 'static,
355 C::Connection: Unpin + Send + 'static,
356 C::Future: Send + 'static,
357 crate::Error: From<C::Error> + Send + 'static,
358 {
359 #[cfg(feature = "tls")]
360 let connector = service::connector(connector, self.tls.clone());
361
362 #[cfg(not(feature = "tls"))]
363 let connector = service::connector(connector);
364
365 Ok(Channel::new(connector, self.clone()))
366 }
367
368 pub fn uri(&self) -> &Uri {
378 &self.uri
379 }
380}
381
382impl From<Uri> for Endpoint {
383 fn from(uri: Uri) -> Self {
384 Self {
385 uri,
386 user_agent: None,
387 concurrency_limit: None,
388 rate_limit: None,
389 timeout: None,
390 #[cfg(feature = "tls")]
391 tls: None,
392 buffer_size: None,
393 init_stream_window_size: None,
394 init_connection_window_size: None,
395 tcp_keepalive: None,
396 tcp_nodelay: true,
397 http2_keep_alive_interval: None,
398 http2_keep_alive_timeout: None,
399 http2_keep_alive_while_idle: None,
400 connect_timeout: None,
401 http2_adaptive_window: None,
402 }
403 }
404}
405
406impl TryFrom<Bytes> for Endpoint {
407 type Error = InvalidUri;
408
409 fn try_from(t: Bytes) -> Result<Self, Self::Error> {
410 Self::from_shared(t)
411 }
412}
413
414impl TryFrom<String> for Endpoint {
415 type Error = InvalidUri;
416
417 fn try_from(t: String) -> Result<Self, Self::Error> {
418 Self::from_shared(t.into_bytes())
419 }
420}
421
422impl TryFrom<&'static str> for Endpoint {
423 type Error = InvalidUri;
424
425 fn try_from(t: &'static str) -> Result<Self, Self::Error> {
426 Self::from_shared(t.as_bytes())
427 }
428}
429
430impl fmt::Debug for Endpoint {
431 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432 f.debug_struct("Endpoint").finish()
433 }
434}
435
436impl FromStr for Endpoint {
437 type Err = InvalidUri;
438
439 fn from_str(s: &str) -> Result<Self, Self::Err> {
440 Self::try_from(s.to_string())
441 }
442}