bugfix when read_pos==capacity
This commit is contained in:
parent
f2ae6972b8
commit
f9afdf3f21
4 changed files with 36 additions and 19 deletions
|
|
@ -56,7 +56,6 @@ public class CircularByteBuffer {
|
||||||
|
|
||||||
this.data.putInt(readStartPos, 0);
|
this.data.putInt(readStartPos, 0);
|
||||||
this.data.putInt(writeStartPos, 0);
|
this.data.putInt(writeStartPos, 0);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean put(byte[] bytes) {
|
public boolean put(byte[] bytes) {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
package com.github.shautvast.exceptional;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
class ExceptionLoggerTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test() throws InterruptedException {
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
ExceptionLogger.log(new Throwable());
|
||||||
|
}
|
||||||
|
Thread.sleep(Duration.ofSeconds(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,21 +1,16 @@
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use axum::Json;
|
use axum::Json;
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
use crate::AppState;
|
use crate::AppState;
|
||||||
use crate::domain::models::post::StacktraceError;
|
use crate::domain::models::post::StacktraceError;
|
||||||
use crate::handlers::stacktraces::StacktraceResponse;
|
use crate::handlers::stacktraces::StacktraceResponse;
|
||||||
use crate::infra::repositories::stacktrace_repository;
|
use crate::infra::repositories::stacktrace_repository;
|
||||||
|
|
||||||
static mut counter: usize = 0;
|
|
||||||
|
|
||||||
pub async fn create_stacktrace(
|
pub async fn create_stacktrace(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
data: String,
|
data: String,
|
||||||
) -> Result<Json<StacktraceResponse>, StacktraceError> {
|
) -> Result<Json<StacktraceResponse>, StacktraceError> {
|
||||||
unsafe {
|
|
||||||
counter += 1;
|
|
||||||
}
|
|
||||||
let new_post_db = stacktrace_repository::NewPostDb {
|
let new_post_db = stacktrace_repository::NewPostDb {
|
||||||
stacktrace: data,
|
stacktrace: data,
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,11 @@
|
||||||
use crossbeam_channel::{unbounded, Receiver, Sender};
|
use crossbeam_channel::{Receiver, Sender, bounded};
|
||||||
use std::ffi::c_char;
|
use std::ffi::c_char;
|
||||||
use std::sync::OnceLock;
|
use std::sync::OnceLock;
|
||||||
use std::thread::JoinHandle;
|
use std::thread::JoinHandle;
|
||||||
use std::{slice, thread};
|
use std::{slice, thread};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use reqwest::blocking::Client;
|
use reqwest::blocking::Client; // can I use non-blocking here?
|
||||||
|
|
||||||
// same value, but different meanings
|
// same value, but different meanings
|
||||||
// TODO find a way to set the buffer size from java.
|
// TODO find a way to set the buffer size from java.
|
||||||
|
|
@ -21,16 +22,15 @@ static HANDLE: OnceLock<JoinHandle<()>> = OnceLock::new();
|
||||||
///
|
///
|
||||||
/// The function is unsafe for skipped checks on UTF-8 and string length and because it reads from a
|
/// The function is unsafe for skipped checks on UTF-8 and string length and because it reads from a
|
||||||
/// mutable raw pointer.
|
/// mutable raw pointer.
|
||||||
/// Still it's guranteed to be safe because
|
/// Still it's guaranteed to be safe because
|
||||||
/// 1. We make sure the part that's read is not being mutated at the same time (happens in the same thread)
|
/// 1. We make sure the part that's read is not being mutated at the same time (happens in the same thread)
|
||||||
/// 2. don't need to check the length since it's calculated and stored within the byte buffer
|
/// 2. don't need to check the length since it's calculated and stored within the byte buffer
|
||||||
/// 3. the bytes are guaranteed to be UTF-8
|
/// 3. the bytes are guaranteed to be UTF-8
|
||||||
#[no_mangle]
|
#[no_mangle]
|
||||||
pub unsafe extern "C" fn buffer_updated(buffer: *mut c_char) {
|
pub unsafe extern "C" fn buffer_updated(buffer: *mut c_char) {
|
||||||
println!("");
|
|
||||||
// using a channel for the bytes read from the buffer
|
// using a channel for the bytes read from the buffer
|
||||||
// this decouples the originating from the http request
|
// this decouples the originating from the http request
|
||||||
let (sender, receiver) = CHANNEL.get_or_init(unbounded);
|
let (sender, receiver) = CHANNEL.get_or_init(|| bounded(1000));
|
||||||
HANDLE.get_or_init(|| {
|
HANDLE.get_or_init(|| {
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let http_client = Client::new();
|
let http_client = Client::new();
|
||||||
|
|
@ -45,8 +45,10 @@ pub unsafe extern "C" fn buffer_updated(buffer: *mut c_char) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
println!("thread started");
|
|
||||||
let mut read_pos = get_u32(buffer, READ) as isize;
|
let mut read_pos = get_u32(buffer, READ) as isize;
|
||||||
|
if read_pos == CAPACITY {
|
||||||
|
read_pos = 0;
|
||||||
|
}
|
||||||
|
|
||||||
let mut remaining = CAPACITY - read_pos; // nr of bytes to read before end of buffer
|
let mut remaining = CAPACITY - read_pos; // nr of bytes to read before end of buffer
|
||||||
let len = if remaining == 1 {
|
let len = if remaining == 1 {
|
||||||
|
|
@ -72,13 +74,14 @@ pub unsafe extern "C" fn buffer_updated(buffer: *mut c_char) {
|
||||||
// must copy to maintain it safely once read from the buffer
|
// must copy to maintain it safely once read from the buffer
|
||||||
// can safely skip checks for len and utf8
|
// can safely skip checks for len and utf8
|
||||||
if len <= remaining {
|
if len <= remaining {
|
||||||
let result = std::str::from_utf8_unchecked(slice::from_raw_parts(
|
let s = std::str::from_utf8_unchecked(slice::from_raw_parts(
|
||||||
buffer.offset(read_pos).cast::<u8>(),
|
buffer.offset(read_pos).cast::<u8>(),
|
||||||
len as usize,
|
len as usize,
|
||||||
))
|
)).to_owned();
|
||||||
.to_owned();
|
let send_result = sender.send_timeout(s, Duration::from_secs(10));
|
||||||
println!("{}", result);
|
if send_result.is_err() {
|
||||||
_ = sender.send(result);
|
println!("overflow detected, discarding");
|
||||||
|
}
|
||||||
read_pos += len;
|
read_pos += len;
|
||||||
} else {
|
} else {
|
||||||
let s1 = std::str::from_utf8_unchecked(slice::from_raw_parts(
|
let s1 = std::str::from_utf8_unchecked(slice::from_raw_parts(
|
||||||
|
|
@ -92,7 +95,10 @@ pub unsafe extern "C" fn buffer_updated(buffer: *mut c_char) {
|
||||||
let mut s = String::with_capacity(len as usize);
|
let mut s = String::with_capacity(len as usize);
|
||||||
s.push_str(s1);
|
s.push_str(s1);
|
||||||
s.push_str(s2);
|
s.push_str(s2);
|
||||||
_ = sender.send(s);
|
let send_result = sender.send_timeout(s, Duration::from_secs(10));
|
||||||
|
if send_result.is_err() {
|
||||||
|
println!("overflow detected, discarding");
|
||||||
|
}
|
||||||
|
|
||||||
read_pos = len - remaining;
|
read_pos = len - remaining;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue