found out why queuing on the rust side didn't work and fixed it

This commit is contained in:
Shautvast 2024-07-04 16:57:25 +02:00
parent beb078d4de
commit 92ba49e539
3 changed files with 76 additions and 23 deletions

16
rustlib/Cargo.lock generated
View file

@ -146,6 +146,21 @@ version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]] [[package]]
name = "encoding_rs" name = "encoding_rs"
version = "0.8.34" version = "0.8.34"
@ -832,6 +847,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"chrono", "chrono",
"crossbeam-channel",
"reqwest", "reqwest",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",

View file

@ -13,3 +13,4 @@ chrono = "0.4"
reqwest = { version = "0.12", features = ["blocking"]} reqwest = { version = "0.12", features = ["blocking"]}
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
crossbeam-channel = "0.5"

View file

@ -1,6 +1,8 @@
use std::cell::OnceCell; use crossbeam_channel::{unbounded, Receiver, Sender};
use std::ffi::c_char; use std::ffi::c_char;
use std::slice; use std::sync::OnceLock;
use std::thread::JoinHandle;
use std::{slice, thread};
use reqwest::blocking::Client; use reqwest::blocking::Client;
@ -9,12 +11,41 @@ use reqwest::blocking::Client;
// why not just add it to the function // why not just add it to the function
const CAPACITY: isize = 32760; const CAPACITY: isize = 32760;
const READ: isize = 32760; const READ: isize = 32760;
const CLIENT: OnceCell<Client> = OnceCell::new(); static CHANNEL: OnceLock<(Sender<String>, Receiver<String>)> = OnceLock::new();
static HANDLE: OnceLock<JoinHandle<()>> = OnceLock::new();
/// Reads the data from the bytebuffer in the caller thread and sends the data to a background
/// thread that updates the datastore
///
/// # Safety
///
/// The function is unsafe for skipped checks on UTF-8 and string length and because it reads from a
/// mutable raw pointer.
/// Still it's guranteed to be safe because
/// 1. We make sure the part that's read is not being mutated at the same time (happens in the same thread)
/// 2. don't need to check the length since it's calculated and stored within the byte buffer
/// 3. the bytes are guaranteed to be UTF-8
#[no_mangle] #[no_mangle]
pub extern "C" fn buffer_updated(buffer: *mut c_char) { pub unsafe extern "C" fn buffer_updated(buffer: *mut c_char) {
let client = CLIENT; println!("");
let client = client.get_or_init(|| Client::new()); // using a channel for the bytes read from the buffer
// this decouples the originating from the http request
let (sender, receiver) = CHANNEL.get_or_init(unbounded);
HANDLE.get_or_init(|| {
thread::spawn(move || {
let http_client = Client::new();
loop {
let maybe_job = receiver.recv();
if let Ok(data) = maybe_job {
_ = http_client
.post("http://localhost:3000/api/stacktraces")
.body(data)
.send();
}
}
})
});
println!("thread started");
let mut read_pos = get_u32(buffer, READ) as isize; let mut read_pos = get_u32(buffer, READ) as isize;
let mut remaining = CAPACITY - read_pos; // nr of bytes to read before end of buffer let mut remaining = CAPACITY - read_pos; // nr of bytes to read before end of buffer
@ -38,26 +69,31 @@ pub extern "C" fn buffer_updated(buffer: *mut c_char) {
l l
} as isize; } as isize;
// copy only when needed // must copy to maintain it safely once read from the buffer
// can safely skip checks for len and utf8
if len <= remaining { if len <= remaining {
unsafe { let result = std::str::from_utf8_unchecked(slice::from_raw_parts(
let result = std::str::from_utf8_unchecked(slice::from_raw_parts(buffer.offset(read_pos).cast::<u8>(), len as usize)); buffer.offset(read_pos).cast::<u8>(),
_ = client.post("http://localhost:3000/api/stacktraces") len as usize,
.body(result) ))
.send(); .to_owned();
} println!("{}", result);
_ = sender.send(result);
read_pos += len; read_pos += len;
} else { } else {
unsafe { let s1 = std::str::from_utf8_unchecked(slice::from_raw_parts(
let s1 = std::str::from_utf8_unchecked(slice::from_raw_parts(buffer.offset(read_pos).cast::<u8>(), remaining as usize)); buffer.offset(read_pos).cast::<u8>(),
let s2 = std::str::from_utf8_unchecked(slice::from_raw_parts(buffer.cast::<u8>(), (len - remaining) as usize)); remaining as usize,
));
let s2 = std::str::from_utf8_unchecked(slice::from_raw_parts(
buffer.cast::<u8>(),
(len - remaining) as usize,
));
let mut s = String::with_capacity(len as usize); let mut s = String::with_capacity(len as usize);
s.push_str(s1); s.push_str(s1);
s.push_str(s2); s.push_str(s2);
_ = client.post("http://localhost:3000/api/stacktraces") _ = sender.send(s);
.body(s)
.send();
}
read_pos = len - remaining; read_pos = len - remaining;
} }
put_u32(buffer, READ, read_pos as u32); put_u32(buffer, READ, read_pos as u32);