Support timing out wait on IDLE
This commit is contained in:
parent
22f9ba0ddf
commit
80a0aae69d
1 changed files with 34 additions and 12 deletions
|
|
@ -31,6 +31,15 @@ pub struct Handle<'a, T: Read + Write> {
|
||||||
done: bool,
|
done: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The result of a wait on a [`Handle`]
|
||||||
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
pub enum WaitOutcome {
|
||||||
|
/// The wait timed out
|
||||||
|
TimedOut,
|
||||||
|
/// The mailbox was modified
|
||||||
|
MailboxChanged,
|
||||||
|
}
|
||||||
|
|
||||||
/// Must be implemented for a transport in order for a `Session` using that transport to support
|
/// Must be implemented for a transport in order for a `Session` using that transport to support
|
||||||
/// operations with timeouts.
|
/// operations with timeouts.
|
||||||
///
|
///
|
||||||
|
|
@ -91,34 +100,37 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
|
||||||
/// Internal helper that doesn't consume self.
|
/// Internal helper that doesn't consume self.
|
||||||
///
|
///
|
||||||
/// This is necessary so that we can keep using the inner `Session` in `wait_keepalive`.
|
/// This is necessary so that we can keep using the inner `Session` in `wait_keepalive`.
|
||||||
fn wait_inner(&mut self) -> Result<()> {
|
fn wait_inner(&mut self, reconnect: bool) -> Result<WaitOutcome> {
|
||||||
let mut v = Vec::new();
|
let mut v = Vec::new();
|
||||||
loop {
|
loop {
|
||||||
match self.session.readline(&mut v).map(|_| ()) {
|
let result = match self.session.readline(&mut v).map(|_| ()) {
|
||||||
Err(Error::Io(ref e))
|
Err(Error::Io(ref e))
|
||||||
if e.kind() == io::ErrorKind::TimedOut
|
if e.kind() == io::ErrorKind::TimedOut
|
||||||
|| e.kind() == io::ErrorKind::WouldBlock =>
|
|| e.kind() == io::ErrorKind::WouldBlock =>
|
||||||
{
|
{
|
||||||
// we need to refresh the IDLE connection
|
if reconnect {
|
||||||
self.terminate()?;
|
self.terminate()?;
|
||||||
self.init()?;
|
self.init()?;
|
||||||
return self.wait_inner();
|
return self.wait_inner(reconnect);
|
||||||
|
}
|
||||||
|
Ok(WaitOutcome::TimedOut)
|
||||||
}
|
}
|
||||||
r => r,
|
Ok(()) => Ok(WaitOutcome::MailboxChanged),
|
||||||
|
Err(r) => Err(r),
|
||||||
}?;
|
}?;
|
||||||
|
|
||||||
// Handle Dovecot's imap_idle_notify_interval message
|
// Handle Dovecot's imap_idle_notify_interval message
|
||||||
if v.eq_ignore_ascii_case(b"* OK Still here\r\n") {
|
if v.eq_ignore_ascii_case(b"* OK Still here\r\n") {
|
||||||
v.clear();
|
v.clear();
|
||||||
} else {
|
} else {
|
||||||
break Ok(());
|
break Ok(result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Block until the selected mailbox changes.
|
/// Block until the selected mailbox changes.
|
||||||
pub fn wait(mut self) -> Result<()> {
|
pub fn wait(mut self) -> Result<()> {
|
||||||
self.wait_inner()
|
self.wait_inner(true).map(|_| ())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -147,16 +159,26 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
|
||||||
// This still allows a client to receive immediate mailbox updates even
|
// This still allows a client to receive immediate mailbox updates even
|
||||||
// though it need only "poll" at half hour intervals.
|
// though it need only "poll" at half hour intervals.
|
||||||
let keepalive = self.keepalive;
|
let keepalive = self.keepalive;
|
||||||
self.wait_timeout(keepalive)
|
self.timed_wait(keepalive, true).map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Block until the selected mailbox changes, or until the given amount of time has expired.
|
/// Block until the selected mailbox changes, or until the given amount of time has expired.
|
||||||
pub fn wait_timeout(mut self, timeout: Duration) -> Result<()> {
|
#[deprecated(note = "use wait_with_timeout instead")]
|
||||||
|
pub fn wait_timeout(self, timeout: Duration) -> Result<()> {
|
||||||
|
self.wait_with_timeout(timeout).map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Block until the selected mailbox changes, or until the given amount of time has expired.
|
||||||
|
pub fn wait_with_timeout(self, timeout: Duration) -> Result<WaitOutcome> {
|
||||||
|
self.timed_wait(timeout, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn timed_wait(mut self, timeout: Duration, reconnect: bool) -> Result<WaitOutcome> {
|
||||||
self.session
|
self.session
|
||||||
.stream
|
.stream
|
||||||
.get_mut()
|
.get_mut()
|
||||||
.set_read_timeout(Some(timeout))?;
|
.set_read_timeout(Some(timeout))?;
|
||||||
let res = self.wait_inner();
|
let res = self.wait_inner(reconnect);
|
||||||
let _ = self.session.stream.get_mut().set_read_timeout(None).is_ok();
|
let _ = self.session.stream.get_mut().set_read_timeout(None).is_ok();
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue