From 92ba49e539ad5a3168fdfa1fe3c38ba07000ab29 Mon Sep 17 00:00:00 2001 From: Shautvast Date: Thu, 4 Jul 2024 16:57:25 +0200 Subject: [PATCH] found out why queuing on the rust side didn't work and fixed it --- rustlib/Cargo.lock | 16 +++++++++ rustlib/Cargo.toml | 1 + rustlib/src/lib.rs | 82 +++++++++++++++++++++++++++++++++------------- 3 files changed, 76 insertions(+), 23 deletions(-) diff --git a/rustlib/Cargo.lock b/rustlib/Cargo.lock index b64fd83..799aa93 100644 --- a/rustlib/Cargo.lock +++ b/rustlib/Cargo.lock @@ -146,6 +146,21 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "encoding_rs" version = "0.8.34" @@ -832,6 +847,7 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", + "crossbeam-channel", "reqwest", "tracing", "tracing-subscriber", diff --git a/rustlib/Cargo.toml b/rustlib/Cargo.toml index 0f0dcad..d74ccb8 100644 --- a/rustlib/Cargo.toml +++ b/rustlib/Cargo.toml @@ -13,3 +13,4 @@ chrono = "0.4" reqwest = { version = "0.12", features = ["blocking"]} tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +crossbeam-channel = "0.5" \ No newline at end of file diff --git a/rustlib/src/lib.rs b/rustlib/src/lib.rs index 33ca148..c179561 100644 --- a/rustlib/src/lib.rs +++ b/rustlib/src/lib.rs @@ -1,6 +1,8 @@ -use std::cell::OnceCell; +use crossbeam_channel::{unbounded, Receiver, Sender}; use std::ffi::c_char; -use std::slice; +use std::sync::OnceLock; +use std::thread::JoinHandle; +use std::{slice, thread}; use reqwest::blocking::Client; @@ -9,12 +11,41 @@ use reqwest::blocking::Client; // why not just add it to the function const CAPACITY: isize = 32760; const READ: isize = 32760; -const CLIENT: OnceCell = OnceCell::new(); +static CHANNEL: OnceLock<(Sender, Receiver)> = OnceLock::new(); +static HANDLE: OnceLock> = OnceLock::new(); +/// Reads the data from the bytebuffer in the caller thread and sends the data to a background +/// thread that updates the datastore +/// +/// # Safety +/// +/// The function is unsafe for skipped checks on UTF-8 and string length and because it reads from a +/// mutable raw pointer. +/// Still it's guranteed to be safe because +/// 1. We make sure the part that's read is not being mutated at the same time (happens in the same thread) +/// 2. don't need to check the length since it's calculated and stored within the byte buffer +/// 3. the bytes are guaranteed to be UTF-8 #[no_mangle] -pub extern "C" fn buffer_updated(buffer: *mut c_char) { - let client = CLIENT; - let client = client.get_or_init(|| Client::new()); +pub unsafe extern "C" fn buffer_updated(buffer: *mut c_char) { + println!(""); + // using a channel for the bytes read from the buffer + // this decouples the originating from the http request + let (sender, receiver) = CHANNEL.get_or_init(unbounded); + HANDLE.get_or_init(|| { + thread::spawn(move || { + let http_client = Client::new(); + loop { + let maybe_job = receiver.recv(); + if let Ok(data) = maybe_job { + _ = http_client + .post("http://localhost:3000/api/stacktraces") + .body(data) + .send(); + } + } + }) + }); + println!("thread started"); let mut read_pos = get_u32(buffer, READ) as isize; let mut remaining = CAPACITY - read_pos; // nr of bytes to read before end of buffer @@ -38,26 +69,31 @@ pub extern "C" fn buffer_updated(buffer: *mut c_char) { l } as isize; - // copy only when needed + // must copy to maintain it safely once read from the buffer + // can safely skip checks for len and utf8 if len <= remaining { - unsafe { - let result = std::str::from_utf8_unchecked(slice::from_raw_parts(buffer.offset(read_pos).cast::(), len as usize)); - _ = client.post("http://localhost:3000/api/stacktraces") - .body(result) - .send(); - } + let result = std::str::from_utf8_unchecked(slice::from_raw_parts( + buffer.offset(read_pos).cast::(), + len as usize, + )) + .to_owned(); + println!("{}", result); + _ = sender.send(result); read_pos += len; } else { - unsafe { - let s1 = std::str::from_utf8_unchecked(slice::from_raw_parts(buffer.offset(read_pos).cast::(), remaining as usize)); - let s2 = std::str::from_utf8_unchecked(slice::from_raw_parts(buffer.cast::(), (len - remaining) as usize)); - let mut s = String::with_capacity(len as usize); - s.push_str(s1); - s.push_str(s2); - _ = client.post("http://localhost:3000/api/stacktraces") - .body(s) - .send(); - } + let s1 = std::str::from_utf8_unchecked(slice::from_raw_parts( + buffer.offset(read_pos).cast::(), + remaining as usize, + )); + let s2 = std::str::from_utf8_unchecked(slice::from_raw_parts( + buffer.cast::(), + (len - remaining) as usize, + )); + let mut s = String::with_capacity(len as usize); + s.push_str(s1); + s.push_str(s2); + _ = sender.send(s); + read_pos = len - remaining; } put_u32(buffer, READ, read_pos as u32);