Use BufStream for better I/O handling (#39)

* Use bufstream for better read/write

* Read with length 0 == EOF

* Adapt read_delay test to write one chat at a time

* Add test for eof reads

* Neater interface for MockStream
This commit is contained in:
Jon Gjengset 2017-09-27 17:38:51 -04:00 committed by Matt McCoy
parent 3e017da4af
commit 8383b47f35
5 changed files with 92 additions and 61 deletions

View file

@ -18,6 +18,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
openssl = "0.9" openssl = "0.9"
regex = "0.2" regex = "0.2"
bufstream = "0.1"
[dev-dependencies] [dev-dependencies]
base64 = "0.2" base64 = "0.2"

View file

@ -2,6 +2,7 @@ use std::net::{TcpStream, ToSocketAddrs};
use openssl::ssl::{SslConnector, SslStream}; use openssl::ssl::{SslConnector, SslStream};
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::time::Duration; use std::time::Duration;
use bufstream::BufStream;
use super::mailbox::Mailbox; use super::mailbox::Mailbox;
use super::authenticator::Authenticator; use super::authenticator::Authenticator;
@ -15,8 +16,8 @@ const CR: u8 = 0x0d;
const LF: u8 = 0x0a; const LF: u8 = 0x0a;
/// Stream to interface with the IMAP server. This interface is only for the command stream. /// Stream to interface with the IMAP server. This interface is only for the command stream.
pub struct Client<T> { pub struct Client<T: Read + Write> {
stream: T, stream: BufStream<T>,
tag: u32, tag: u32,
pub debug: bool, pub debug: bool,
} }
@ -121,7 +122,7 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> IdleHandle<'a, T> {
// re-issue it at least every 29 minutes to avoid being logged off. // re-issue it at least every 29 minutes to avoid being logged off.
// This still allows a client to receive immediate mailbox updates even // This still allows a client to receive immediate mailbox updates even
// though it need only "poll" at half hour intervals. // though it need only "poll" at half hour intervals.
try!(self.client.stream.set_read_timeout(Some(self.keepalive))); try!(self.client.stream.get_mut().set_read_timeout(Some(self.keepalive)));
match self.wait() { match self.wait() {
Err(Error::Io(ref e)) Err(Error::Io(ref e))
if e.kind() == io::ErrorKind::TimedOut || e.kind() == io::ErrorKind::WouldBlock => { if e.kind() == io::ErrorKind::TimedOut || e.kind() == io::ErrorKind::WouldBlock => {
@ -131,7 +132,7 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> IdleHandle<'a, T> {
self.wait_keepalive() self.wait_keepalive()
} }
r => { r => {
try!(self.client.stream.set_read_timeout(None)); try!(self.client.stream.get_mut().set_read_timeout(None));
r r
} }
} }
@ -139,9 +140,9 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> IdleHandle<'a, T> {
/// Block until the selected mailbox changes, or until the given amount of time has expired. /// Block until the selected mailbox changes, or until the given amount of time has expired.
pub fn wait_timeout(&mut self, timeout: Duration) -> Result<()> { pub fn wait_timeout(&mut self, timeout: Duration) -> Result<()> {
try!(self.client.stream.set_read_timeout(Some(timeout))); try!(self.client.stream.get_mut().set_read_timeout(Some(timeout)));
let res = self.wait(); let res = self.wait();
try!(self.client.stream.set_read_timeout(None)); try!(self.client.stream.get_mut().set_read_timeout(None));
res res
} }
} }
@ -191,7 +192,7 @@ impl Client<TcpStream> {
) -> Result<Client<SslStream<TcpStream>>> { ) -> Result<Client<SslStream<TcpStream>>> {
// TODO This needs to be tested // TODO This needs to be tested
self.run_command_and_check_ok("STARTTLS")?; self.run_command_and_check_ok("STARTTLS")?;
SslConnector::connect(&ssl_connector, domain, self.stream) SslConnector::connect(&ssl_connector, domain, try!(self.stream.into_inner()))
.map(Client::new) .map(Client::new)
.map_err(Error::Ssl) .map_err(Error::Ssl)
} }
@ -224,7 +225,7 @@ impl<T: Read + Write> Client<T> {
/// Creates a new client with the underlying stream. /// Creates a new client with the underlying stream.
pub fn new(stream: T) -> Client<T> { pub fn new(stream: T) -> Client<T> {
Client { Client {
stream: stream, stream: BufStream::new(stream),
tag: INITIAL_TAG, tag: INITIAL_TAG,
debug: false, debug: false,
} }
@ -428,6 +429,7 @@ impl<T: Read + Write> Client<T> {
} }
try!(self.stream.write_all(content)); try!(self.stream.write_all(content));
try!(self.stream.write_all(b"\r\n")); try!(self.stream.write_all(b"\r\n"));
try!(self.stream.flush());
self.read_response() self.read_response()
} }
@ -477,22 +479,17 @@ impl<T: Read + Write> Client<T> {
} }
fn readline(&mut self) -> Result<Vec<u8>> { fn readline(&mut self) -> Result<Vec<u8>> {
use std::io::BufRead;
let mut line_buffer: Vec<u8> = Vec::new(); let mut line_buffer: Vec<u8> = Vec::new();
while line_buffer.len() < 2 || if try!(self.stream.read_until(LF, &mut line_buffer)) == 0 {
(line_buffer[line_buffer.len() - 1] != LF && line_buffer[line_buffer.len() - 2] != CR) return Err(Error::ConnectionLost);
{
let byte_buffer: &mut [u8] = &mut [0];
let n = try!(self.stream.read(byte_buffer));
if n > 0 {
line_buffer.push(byte_buffer[0]);
}
} }
if self.debug { if self.debug {
let mut line = line_buffer.clone();
// Remove CRLF // Remove CRLF
line.truncate(line_buffer.len() - 2); let len = line_buffer.len();
print!("S: {}\n", String::from_utf8(line).unwrap()); let line = &line_buffer[..(len - 2)];
print!("S: {}\n", String::from_utf8_lossy(line));
} }
Ok(line_buffer) Ok(line_buffer)
@ -507,6 +504,7 @@ impl<T: Read + Write> Client<T> {
fn write_line(&mut self, buf: &[u8]) -> Result<()> { fn write_line(&mut self, buf: &[u8]) -> Result<()> {
try!(self.stream.write_all(buf)); try!(self.stream.write_all(buf));
try!(self.stream.write_all(&[CR, LF])); try!(self.stream.write_all(&[CR, LF]));
try!(self.stream.flush());
if self.debug { if self.debug {
print!("C: {}\n", String::from_utf8(buf.to_vec()).unwrap()); print!("C: {}\n", String::from_utf8(buf.to_vec()).unwrap());
} }
@ -547,17 +545,27 @@ mod tests {
fn readline_delay_read() { fn readline_delay_read() {
let greeting = "* OK Dovecot ready.\r\n"; let greeting = "* OK Dovecot ready.\r\n";
let expected_response: String = greeting.to_string(); let expected_response: String = greeting.to_string();
let mock_stream = MockStream::new_read_delay(greeting.as_bytes().to_vec()); let mock_stream = MockStream::default().with_buf(greeting.as_bytes().to_vec()).with_delay();
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
let actual_response = String::from_utf8(client.readline().unwrap()).unwrap(); let actual_response = String::from_utf8(client.readline().unwrap()).unwrap();
assert_eq!(expected_response, actual_response); assert_eq!(expected_response, actual_response);
} }
#[test]
fn readline_eof() {
let mock_stream = MockStream::default().with_eof();
let mut client = Client::new(mock_stream);
if let Err(Error::ConnectionLost) = client.readline() {
} else {
unreachable!("EOF read did not return connection lost");
}
}
#[test] #[test]
#[should_panic] #[should_panic]
fn readline_err() { fn readline_err() {
// TODO Check the error test // TODO Check the error test
let mock_stream = MockStream::new_err(); let mock_stream = MockStream::default().with_err();
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.readline().unwrap(); client.readline().unwrap();
} }
@ -565,7 +573,7 @@ mod tests {
#[test] #[test]
fn create_command() { fn create_command() {
let base_command = "CHECK"; let base_command = "CHECK";
let mock_stream = MockStream::new(Vec::new()); let mock_stream = MockStream::default();
let mut imap_stream = Client::new(mock_stream); let mut imap_stream = Client::new(mock_stream);
let expected_command = format!("a1 {}", base_command); let expected_command = format!("a1 {}", base_command);
@ -593,7 +601,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.login(username, password).unwrap(); client.login(username, password).unwrap();
assert!( assert!(
client.stream.written_buf == command.as_bytes().to_vec(), client.stream.get_ref().written_buf == command.as_bytes().to_vec(),
"Invalid login command" "Invalid login command"
); );
} }
@ -606,7 +614,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.logout().unwrap(); client.logout().unwrap();
assert!( assert!(
client.stream.written_buf == command.as_bytes().to_vec(), client.stream.get_ref().written_buf == command.as_bytes().to_vec(),
"Invalid logout command" "Invalid logout command"
); );
} }
@ -627,7 +635,7 @@ mod tests {
.rename(current_mailbox_name, new_mailbox_name) .rename(current_mailbox_name, new_mailbox_name)
.unwrap(); .unwrap();
assert!( assert!(
client.stream.written_buf == command.as_bytes().to_vec(), client.stream.get_ref().written_buf == command.as_bytes().to_vec(),
"Invalid rename command" "Invalid rename command"
); );
} }
@ -641,7 +649,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.subscribe(mailbox).unwrap(); client.subscribe(mailbox).unwrap();
assert!( assert!(
client.stream.written_buf == command.as_bytes().to_vec(), client.stream.get_ref().written_buf == command.as_bytes().to_vec(),
"Invalid subscribe command" "Invalid subscribe command"
); );
} }
@ -655,7 +663,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.unsubscribe(mailbox).unwrap(); client.unsubscribe(mailbox).unwrap();
assert!( assert!(
client.stream.written_buf == command.as_bytes().to_vec(), client.stream.get_ref().written_buf == command.as_bytes().to_vec(),
"Invalid unsubscribe command" "Invalid unsubscribe command"
); );
} }
@ -667,7 +675,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.expunge().unwrap(); client.expunge().unwrap();
assert!( assert!(
client.stream.written_buf == b"a1 EXPUNGE\r\n".to_vec(), client.stream.get_ref().written_buf == b"a1 EXPUNGE\r\n".to_vec(),
"Invalid expunge command" "Invalid expunge command"
); );
} }
@ -679,7 +687,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.check().unwrap(); client.check().unwrap();
assert!( assert!(
client.stream.written_buf == b"a1 CHECK\r\n".to_vec(), client.stream.get_ref().written_buf == b"a1 CHECK\r\n".to_vec(),
"Invalid check command" "Invalid check command"
); );
} }
@ -710,7 +718,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
let mailbox = client.examine(mailbox_name).unwrap(); let mailbox = client.examine(mailbox_name).unwrap();
assert!( assert!(
client.stream.written_buf == command.as_bytes().to_vec(), client.stream.get_ref().written_buf == command.as_bytes().to_vec(),
"Invalid examine command" "Invalid examine command"
); );
assert!(mailbox == expected_mailbox, "Unexpected mailbox returned"); assert!(mailbox == expected_mailbox, "Unexpected mailbox returned");
@ -744,7 +752,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
let mailbox = client.select(mailbox_name).unwrap(); let mailbox = client.select(mailbox_name).unwrap();
assert!( assert!(
client.stream.written_buf == command.as_bytes().to_vec(), client.stream.get_ref().written_buf == command.as_bytes().to_vec(),
"Invalid select command" "Invalid select command"
); );
assert!(mailbox == expected_mailbox, "Unexpected mailbox returned"); assert!(mailbox == expected_mailbox, "Unexpected mailbox returned");
@ -760,7 +768,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
let capabilities = client.capability().unwrap(); let capabilities = client.capability().unwrap();
assert!( assert!(
client.stream.written_buf == b"a1 CAPABILITY\r\n".to_vec(), client.stream.get_ref().written_buf == b"a1 CAPABILITY\r\n".to_vec(),
"Invalid capability command" "Invalid capability command"
); );
assert!( assert!(
@ -778,7 +786,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.create(mailbox_name).unwrap(); client.create(mailbox_name).unwrap();
assert!( assert!(
client.stream.written_buf == command.as_bytes().to_vec(), client.stream.get_ref().written_buf == command.as_bytes().to_vec(),
"Invalid create command" "Invalid create command"
); );
} }
@ -792,7 +800,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.delete(mailbox_name).unwrap(); client.delete(mailbox_name).unwrap();
assert!( assert!(
client.stream.written_buf == command.as_bytes().to_vec(), client.stream.get_ref().written_buf == command.as_bytes().to_vec(),
"Invalid delete command" "Invalid delete command"
); );
} }
@ -804,7 +812,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.noop().unwrap(); client.noop().unwrap();
assert!( assert!(
client.stream.written_buf == b"a1 NOOP\r\n".to_vec(), client.stream.get_ref().written_buf == b"a1 NOOP\r\n".to_vec(),
"Invalid noop command" "Invalid noop command"
); );
} }
@ -816,7 +824,7 @@ mod tests {
let mut client = Client::new(mock_stream); let mut client = Client::new(mock_stream);
client.close().unwrap(); client.close().unwrap();
assert!( assert!(
client.stream.written_buf == b"a1 CLOSE\r\n".to_vec(), client.stream.get_ref().written_buf == b"a1 CLOSE\r\n".to_vec(),
"Invalid close command" "Invalid close command"
); );
} }
@ -897,7 +905,7 @@ mod tests {
let mut client = Client::new(MockStream::new(resp)); let mut client = Client::new(MockStream::new(resp));
let _ = op(&mut client, seq, query); let _ = op(&mut client, seq, query);
assert!( assert!(
client.stream.written_buf == line.as_bytes().to_vec(), client.stream.get_ref().written_buf == line.as_bytes().to_vec(),
"Invalid command" "Invalid command"
); );
} }

View file

@ -5,6 +5,7 @@ use std::error::Error as StdError;
use std::net::TcpStream; use std::net::TcpStream;
use openssl::ssl::HandshakeError as SslError; use openssl::ssl::HandshakeError as SslError;
use bufstream::IntoInnerError as BufError;
pub type Result<T> = result::Result<T, Error>; pub type Result<T> = result::Result<T, Error>;
@ -19,6 +20,8 @@ pub enum Error {
BadResponse(Vec<String>), BadResponse(Vec<String>),
/// A NO response from the IMAP server. /// A NO response from the IMAP server.
NoResponse(Vec<String>), NoResponse(Vec<String>),
/// The connection was terminated unexpectedly.
ConnectionLost,
// Error parsing a server response. // Error parsing a server response.
Parse(ParseError), Parse(ParseError),
// Error appending a mail // Error appending a mail
@ -31,6 +34,12 @@ impl From<IoError> for Error {
} }
} }
impl<T> From<BufError<T>> for Error {
fn from(err: BufError<T>) -> Error {
Error::Io(err.into())
}
}
impl From<SslError<TcpStream>> for Error { impl From<SslError<TcpStream>> for Error {
fn from(err: SslError<TcpStream>) -> Error { fn from(err: SslError<TcpStream>) -> Error {
Error::Ssl(err) Error::Ssl(err)
@ -55,6 +64,7 @@ impl StdError for Error {
Error::Parse(ref e) => e.description(), Error::Parse(ref e) => e.description(),
Error::BadResponse(_) => "Bad Response", Error::BadResponse(_) => "Bad Response",
Error::NoResponse(_) => "No Response", Error::NoResponse(_) => "No Response",
Error::ConnectionLost => "Connection lost",
Error::Append => "Could not append mail to mailbox", Error::Append => "Could not append mail to mailbox",
} }
} }

View file

@ -4,6 +4,7 @@
//! imap is a IMAP client for Rust. //! imap is a IMAP client for Rust.
extern crate openssl; extern crate openssl;
extern crate bufstream;
extern crate regex; extern crate regex;
pub mod authenticator; pub mod authenticator;

View file

@ -6,45 +6,52 @@ pub struct MockStream {
read_pos: usize, read_pos: usize,
pub written_buf: Vec<u8>, pub written_buf: Vec<u8>,
err_on_read: bool, err_on_read: bool,
eof_on_read: bool,
read_delay: usize, read_delay: usize,
} }
impl MockStream { impl Default for MockStream {
pub fn new(read_buf: Vec<u8>) -> MockStream { fn default() -> Self {
MockStream {
read_buf: read_buf,
read_pos: 0,
written_buf: Vec::new(),
err_on_read: false,
read_delay: 0,
}
}
pub fn new_err() -> MockStream {
MockStream { MockStream {
read_buf: Vec::new(), read_buf: Vec::new(),
read_pos: 0, read_pos: 0,
written_buf: Vec::new(), written_buf: Vec::new(),
err_on_read: true, err_on_read: false,
eof_on_read: false,
read_delay: 0, read_delay: 0,
} }
} }
}
pub fn new_read_delay(read_buf: Vec<u8>) -> MockStream { impl MockStream {
MockStream { pub fn new(read_buf: Vec<u8>) -> MockStream {
read_buf: read_buf, MockStream::default().with_buf(read_buf)
read_pos: 0, }
written_buf: Vec::new(),
err_on_read: false, pub fn with_buf(mut self, read_buf: Vec<u8>) -> MockStream {
read_delay: 1, self.read_buf = read_buf;
} self
}
pub fn with_eof(mut self) -> MockStream {
self.eof_on_read = true;
self
}
pub fn with_err(mut self) -> MockStream {
self.err_on_read = true;
self
}
pub fn with_delay(mut self) -> MockStream {
self.read_delay = 1;
self
} }
} }
impl Read for MockStream { impl Read for MockStream {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> { fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
if self.read_delay > 0 { if self.eof_on_read {
self.read_delay -= 1;
return Ok(0); return Ok(0);
} }
if self.err_on_read { if self.err_on_read {
@ -53,7 +60,11 @@ impl Read for MockStream {
if self.read_pos >= self.read_buf.len() { if self.read_pos >= self.read_buf.len() {
return Err(Error::new(ErrorKind::UnexpectedEof, "EOF")); return Err(Error::new(ErrorKind::UnexpectedEof, "EOF"));
} }
let write_len = min(buf.len(), self.read_buf.len() - self.read_pos); let mut write_len = min(buf.len(), self.read_buf.len() - self.read_pos);
if self.read_delay > 0 {
self.read_delay -= 1;
write_len = min(write_len, 1);
}
let max_pos = self.read_pos + write_len; let max_pos = self.read_pos + write_len;
for x in self.read_pos..max_pos { for x in self.read_pos..max_pos {
buf[x - self.read_pos] = self.read_buf[x]; buf[x - self.read_pos] = self.read_buf[x];