Add IMAP server (step 6) — all 9 commands implemented

Plain-TCP listener on 0.0.0.0 (handles both localhost and 127.0.0.1).
LOGIN, NOOP, SELECT (reloads inbox), FETCH header+body, SEARCH, STORE,
EXPUNGE (deletes on ProtonMail), LOGOUT.

FETCH body decrypts messages on demand: brief lock for ID lookup, API call
without lock, brief lock again for crypto. RFC 3501 literal format with
exact byte counts for imap-crate compatibility.

Also: update store.expunge() to return (ids, seqs) in descending order for
correct IMAP EXPUNGE response ordering; add chrono for RFC 2822 dates.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Shautvast 2026-02-22 19:13:20 +01:00
parent 69fe7994c8
commit fa1d1a6cb7
5 changed files with 426 additions and 11 deletions

1
Cargo.lock generated
View file

@ -2953,6 +2953,7 @@ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bcrypt", "bcrypt",
"cfb-mode", "cfb-mode",
"chrono",
"env_logger", "env_logger",
"num-bigint", "num-bigint",
"pgp", "pgp",

View file

@ -16,6 +16,7 @@ rand = "0.8"
pwhash = "0.3" # bcrypt with caller-supplied salt (used for SRP) pwhash = "0.3" # bcrypt with caller-supplied salt (used for SRP)
bcrypt = "0.15" # reference bcrypt impl for key passphrase derivation bcrypt = "0.15" # reference bcrypt impl for key passphrase derivation
pgp = { version = "0.14", default-features = false } # rpgp — OpenPGP decrypt pgp = { version = "0.14", default-features = false } # rpgp — OpenPGP decrypt
chrono = "0.4"
env_logger = "0.11" env_logger = "0.11"
aes = "0.8" aes = "0.8"
cfb-mode = "0.8" cfb-mode = "0.8"

View file

@ -0,0 +1,398 @@
/// Step 6: Local IMAP server.
///
/// Implements the nine commands skim uses (LOGIN, NOOP, SELECT, FETCH ×2,
/// SEARCH, STORE, EXPUNGE, LOGOUT) on a plain-TCP listener on localhost.
/// Each accepted connection is handled in its own tokio task.
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use crate::api::{ApiClient, LABEL_INBOX};
use crate::{crypto, SharedState};
// ── Public entry point ────────────────────────────────────────────────────────
pub async fn run(state: SharedState, port: u16) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?;
println!("IMAP listening on port {port}");
loop {
let (socket, addr) = listener.accept().await?;
eprintln!("IMAP: connection from {addr}");
let state = Arc::clone(&state);
tokio::spawn(async move {
if let Err(e) = handle_connection(socket, state).await {
eprintln!("IMAP connection error: {e}");
}
});
}
}
// ── Per-connection handler ────────────────────────────────────────────────────
async fn handle_connection(
socket: TcpStream,
state: SharedState,
) -> Result<(), Box<dyn std::error::Error>> {
let (reader, mut writer) = socket.into_split();
let mut lines = BufReader::new(reader).lines();
let mut authenticated = false;
writer
.write_all(b"* OK IMAP4rev1 ProtonBridge ready\r\n")
.await?;
while let Some(raw) = lines.next_line().await? {
let line = raw.trim_end();
if line.is_empty() {
continue;
}
eprintln!("IMAP < {line}");
// Every IMAP command is "TAG COMMAND [rest…]"
let mut parts = line.splitn(3, ' ');
let tag = parts.next().unwrap_or("*").to_string();
let cmd = parts.next().unwrap_or("").to_ascii_uppercase();
let rest = parts.next().unwrap_or("").trim().to_string();
let logout = cmd == "LOGOUT";
let response =
dispatch(&tag, &cmd, &rest, &state, &mut authenticated).await;
for log_line in response.lines() {
eprintln!("IMAP > {log_line}");
}
writer.write_all(response.as_bytes()).await?;
if logout {
break;
}
}
Ok(())
}
// ── Command dispatcher ────────────────────────────────────────────────────────
async fn dispatch(
tag: &str,
cmd: &str,
rest: &str,
state: &SharedState,
authenticated: &mut bool,
) -> String {
match cmd {
"CAPABILITY" => {
format!(
"* CAPABILITY IMAP4rev1 AUTH=PLAIN\r\n\
{tag} OK CAPABILITY completed\r\n"
)
}
"LOGIN" => {
let mut p = rest.splitn(2, ' ');
let _user = unquote(p.next().unwrap_or(""));
let pass = unquote(p.next().unwrap_or(""));
let local_pw = state.lock().await.config.bridge.local_password.clone();
if pass == local_pw {
*authenticated = true;
format!("{tag} OK LOGIN completed\r\n")
} else {
format!("{tag} NO [AUTHENTICATIONFAILED] Invalid credentials\r\n")
}
}
"NOOP" => format!("{tag} OK NOOP completed\r\n"),
"LOGOUT" => format!("* BYE Logging out\r\n{tag} OK LOGOUT completed\r\n"),
// All remaining commands require authentication.
_ if !*authenticated => {
format!("{tag} NO Not authenticated\r\n")
}
"SELECT" => cmd_select(tag, state).await,
"EXAMINE" => cmd_select(tag, state).await, // read-only alias
"FETCH" => cmd_fetch(tag, rest, state).await,
"SEARCH" => cmd_search(tag, rest, state).await,
"STORE" => cmd_store(tag, rest, state).await,
"EXPUNGE" => cmd_expunge(tag, state).await,
_ => format!("{tag} BAD Unknown or unimplemented command\r\n"),
}
}
// ── SELECT ────────────────────────────────────────────────────────────────────
async fn cmd_select(tag: &str, state: &SharedState) -> String {
let (http_client, session) = {
let st = state.lock().await;
(st.http_client.clone(), st.session.clone())
};
let api = ApiClient::new(&http_client, &session);
let messages = match api.list_messages(LABEL_INBOX, 0, 50).await {
Ok((msgs, _)) => msgs,
Err(e) => {
eprintln!("SELECT list_messages failed: {e}");
return format!("{tag} NO SELECT failed\r\n");
}
};
let count = {
let mut st = state.lock().await;
st.store.load_all(messages);
st.store.count()
};
format!(
"* {count} EXISTS\r\n\
* 0 RECENT\r\n\
* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\
* OK [PERMANENTFLAGS (\\Deleted \\Seen \\*)] Limited\r\n\
* OK [UIDVALIDITY 1] UIDs valid\r\n\
* OK [UIDNEXT {next}] Predicted next UID\r\n\
{tag} OK [READ-WRITE] SELECT completed\r\n",
next = count + 1,
)
}
// ── FETCH ─────────────────────────────────────────────────────────────────────
async fn cmd_fetch(tag: &str, rest: &str, state: &SharedState) -> String {
// rest = "seq_set ITEMS" or "seq_set (ITEMS)"
let mut parts = rest.splitn(2, ' ');
let seq_set = parts.next().unwrap_or("").to_string();
let items = parts
.next()
.unwrap_or("")
.trim()
.trim_matches(|c| c == '(' || c == ')')
.to_ascii_uppercase();
if items.contains("HEADER.FIELDS") {
fetch_headers(tag, &seq_set, state).await
} else {
fetch_body(tag, &seq_set, state).await
}
}
async fn fetch_headers(tag: &str, seq_set: &str, state: &SharedState) -> String {
// Collect all metadata under a single short lock — no I/O while locked.
let rows: Vec<(u32, String, String, String, i64)> = {
let st = state.lock().await;
let count = st.store.count();
parse_seq_set(seq_set, count)
.into_iter()
.filter_map(|seq| {
st.store.get_meta(seq).map(|m| {
(
seq,
m.subject.clone(),
m.sender.name.clone(),
m.sender.address.clone(),
m.time,
)
})
})
.collect()
};
let mut out = Vec::new();
for (seq, subject, name, addr, time) in &rows {
let block = format!(
"Subject: {subject}\r\nFrom: {name} <{addr}>\r\nDate: {date}\r\n\r\n",
date = rfc2822(*time),
);
let n = block.as_bytes().len();
out.extend_from_slice(
format!(
"* {seq} FETCH (BODY[HEADER.FIELDS (SUBJECT FROM DATE)] {{{n}}}\r\n"
)
.as_bytes(),
);
out.extend_from_slice(block.as_bytes());
out.extend_from_slice(b")\r\n");
}
out.extend_from_slice(format!("{tag} OK FETCH completed\r\n").as_bytes());
String::from_utf8_lossy(&out).into_owned()
}
async fn fetch_body(tag: &str, seq_set: &str, state: &SharedState) -> String {
let count = state.lock().await.store.count();
let seqs = parse_seq_set(seq_set, count);
let seq = match seqs.first() {
Some(&s) => s,
None => return format!("{tag} NO invalid sequence\r\n"),
};
// Grab proton_id + connection details under a brief lock.
let (proton_id, meta_snapshot, http_client, session) = {
let st = state.lock().await;
let m = match st.store.get_meta(seq) {
Some(m) => (
m.subject.clone(),
m.sender.name.clone(),
m.sender.address.clone(),
m.time,
),
None => return format!("{tag} NO no such message\r\n"),
};
let id = st.store.proton_id(seq).unwrap_or("").to_string();
(id, m, st.http_client.clone(), st.session.clone())
};
// API call — lock is released.
let api = ApiClient::new(&http_client, &session);
let full_msg = match api.get_message(&proton_id).await {
Ok(m) => m,
Err(e) => {
eprintln!("FETCH get_message failed: {e}");
return format!("{tag} NO fetch failed\r\n");
}
};
// Decrypt — re-acquire lock only for the synchronous crypto step.
let plaintext = {
let st = state.lock().await;
let keys: Vec<&crypto::PrivateKey> = st.key_pool.iter().collect();
match crypto::decrypt_body(&full_msg.body, &keys) {
Ok(p) => p,
Err(e) => {
eprintln!("FETCH decrypt_body failed: {e}");
return format!("{tag} NO decrypt failed\r\n");
}
}
};
let (subject, sender_name, sender_addr, time) = meta_snapshot;
let message = format!(
"From: {sender_name} <{sender_addr}>\r\n\
Subject: {subject}\r\n\
Date: {date}\r\n\
Content-Type: {mime}; charset=utf-8\r\n\
MIME-Version: 1.0\r\n\
\r\n\
{plaintext}",
date = rfc2822(time),
mime = full_msg.mime_type,
);
let n = message.as_bytes().len();
let mut out = Vec::new();
out.extend_from_slice(
format!("* {seq} FETCH (BODY[] {{{n}}}\r\n").as_bytes(),
);
out.extend_from_slice(message.as_bytes());
out.extend_from_slice(b")\r\n");
out.extend_from_slice(format!("{tag} OK FETCH completed\r\n").as_bytes());
String::from_utf8_lossy(&out).into_owned()
}
// ── SEARCH ────────────────────────────────────────────────────────────────────
async fn cmd_search(tag: &str, criteria: &str, state: &SharedState) -> String {
// skim sends: OR SUBJECT "query" FROM "query"
let query = extract_quoted(criteria).unwrap_or("");
let seqs = state
.lock()
.await
.store
.search_subject_or_from(query);
let nums: Vec<String> = seqs.iter().map(|n| n.to_string()).collect();
format!(
"* SEARCH {}\r\n{tag} OK SEARCH completed\r\n",
nums.join(" ")
)
}
// ── STORE ─────────────────────────────────────────────────────────────────────
async fn cmd_store(tag: &str, rest: &str, state: &SharedState) -> String {
// rest = "seq_set +FLAGS (\Deleted)" (or similar)
let mut parts = rest.splitn(2, ' ');
let seq_set = parts.next().unwrap_or("").to_string();
let flags = parts.next().unwrap_or("").to_ascii_uppercase();
if !flags.contains("DELETED") {
return format!("{tag} OK STORE completed\r\n");
}
let mut st = state.lock().await;
let count = st.store.count();
let seqs = parse_seq_set(&seq_set, count);
let mut out = String::new();
for seq in seqs {
st.store.mark_deleted(seq);
out.push_str(&format!("* {seq} FETCH (FLAGS (\\Deleted))\r\n"));
}
out.push_str(&format!("{tag} OK STORE completed\r\n"));
out
}
// ── EXPUNGE ───────────────────────────────────────────────────────────────────
async fn cmd_expunge(tag: &str, state: &SharedState) -> String {
let (proton_ids, removed_seqs, http_client, session) = {
let mut st = state.lock().await;
let (ids, seqs) = st.store.expunge();
(ids, seqs, st.http_client.clone(), st.session.clone())
};
if !proton_ids.is_empty() {
let api = ApiClient::new(&http_client, &session);
if let Err(e) = api.delete_messages(&proton_ids).await {
eprintln!("EXPUNGE delete_messages failed: {e}");
}
}
let mut out = String::new();
for seq in &removed_seqs {
out.push_str(&format!("* {seq} EXPUNGE\r\n"));
}
out.push_str(&format!("{tag} OK EXPUNGE completed\r\n"));
out
}
// ── Helpers ───────────────────────────────────────────────────────────────────
fn unquote(s: &str) -> &str {
s.strip_prefix('"')
.and_then(|s| s.strip_suffix('"'))
.unwrap_or(s)
}
/// Extract the first double-quoted string from `s`.
fn extract_quoted(s: &str) -> Option<&str> {
let start = s.find('"')? + 1;
let end = s[start..].find('"')? + start;
Some(&s[start..end])
}
/// Parse an IMAP sequence set (e.g. "1:50", "3,5,7", "42", "*") into a Vec of
/// seq numbers, clamped to [1, max].
fn parse_seq_set(set: &str, max: u32) -> Vec<u32> {
if max == 0 {
return vec![];
}
let mut result = Vec::new();
for part in set.split(',') {
let part = part.trim();
if let Some((a, b)) = part.split_once(':') {
let start: u32 = if a == "*" { max } else { a.parse().unwrap_or(1) };
let end: u32 = if b == "*" { max } else { b.parse().unwrap_or(max) };
let (lo, hi) = (start.min(end), start.max(end));
for seq in lo..=hi.min(max) {
result.push(seq);
}
} else if part == "*" {
result.push(max);
} else {
let seq: u32 = part.parse().unwrap_or(0);
if seq >= 1 && seq <= max {
result.push(seq);
}
}
}
result
}
/// Format a Unix timestamp as an RFC 2822 date string.
fn rfc2822(unix: i64) -> String {
use chrono::{DateTime, Utc};
DateTime::<Utc>::from_timestamp(unix, 0)
.unwrap_or_default()
.format("%a, %d %b %Y %H:%M:%S +0000")
.to_string()
}

View file

@ -2,6 +2,7 @@ mod api;
mod auth; mod auth;
mod config; mod config;
mod crypto; mod crypto;
mod imap_server;
mod srp; mod srp;
mod store; mod store;
@ -80,7 +81,7 @@ async fn main() {
store.load_all(messages); store.load_all(messages);
println!("Inbox: {} messages ({total} total)", store.count()); println!("Inbox: {} messages ({total} total)", store.count());
let _state: SharedState = Arc::new(Mutex::new(BridgeState { let state: SharedState = Arc::new(Mutex::new(BridgeState {
store, store,
key_pool, key_pool,
http_client: client, http_client: client,
@ -88,6 +89,15 @@ async fn main() {
config: config.clone(), config: config.clone(),
})); }));
// Spawn IMAP server.
let imap_state = Arc::clone(&state);
let imap_port = config.bridge.imap_port;
tokio::spawn(async move {
if let Err(e) = imap_server::run(imap_state, imap_port).await {
eprintln!("IMAP server error: {e}");
}
});
println!( println!(
"Bridge ready — IMAP :{}, SMTP :{} (Ctrl-C to stop)", "Bridge ready — IMAP :{}, SMTP :{} (Ctrl-C to stop)",
config.bridge.imap_port, config.bridge.smtp_port config.bridge.imap_port, config.bridge.smtp_port

View file

@ -71,19 +71,24 @@ impl MessageStore {
} }
} }
/// Remove all pending-deleted messages and return their ProtonMail IDs. /// Remove all pending-deleted messages.
/// After this call the Vec indices automatically provide new seq numbers. /// Returns `(proton_ids, seqs)` where `seqs` are the pre-expunge IMAP
pub fn expunge(&mut self) -> Vec<String> { /// sequence numbers in **descending** order (correct for IMAP EXPUNGE
let removed: Vec<String> = self /// responses — highest seq first so client renumbering stays consistent).
.messages pub fn expunge(&mut self) -> (Vec<String>, Vec<u32>) {
.iter() let mut removed_ids = Vec::new();
.filter(|m| self.deleted_pending.contains(&m.id)) let mut removed_seqs = Vec::new();
.map(|m| m.id.clone()) for (i, m) in self.messages.iter().enumerate() {
.collect(); if self.deleted_pending.contains(&m.id) {
removed_ids.push(m.id.clone());
removed_seqs.push(i as u32 + 1);
}
}
self.messages self.messages
.retain(|m| !self.deleted_pending.contains(&m.id)); .retain(|m| !self.deleted_pending.contains(&m.id));
self.deleted_pending.clear(); self.deleted_pending.clear();
removed removed_seqs.sort_unstable_by(|a, b| b.cmp(a)); // descending
(removed_ids, removed_seqs)
} }
/// Return the seq numbers of messages whose subject, sender name, or /// Return the seq numbers of messages whose subject, sender name, or