Added REST API

This commit is contained in:
Shautvast 2024-07-03 13:23:57 +02:00
parent df98ef761d
commit 8af6881d3e
45 changed files with 3567 additions and 245 deletions

View file

@ -73,7 +73,7 @@ public class Agent {
ClassDesc.of(EXCEPTIONLOGGER), "log",
MethodTypeDesc.ofDescriptor("(Ljava/lang/Throwable;)V"));
}
builder.with(element); // leave any other element in place
builder.with(element); // leave every element in place
});
}));
}

1
api/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
.env

1434
api/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

34
api/Cargo.toml Normal file
View file

@ -0,0 +1,34 @@
[package]
name = "api"
version = "0.1.0"
edition = "2021"
[dependencies]
#serde_json = { version = "1.0", features = [] }
#serde = { version = "1.0", features = ["derive"] }
#anyhow = "1.0"
#postgres = "0.19"
#sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "any", "postgres", "json", "time"] }
#dotenvy = "0.15.0"
#chrono = "0.4"
#axum = "0.7"
#tokio = { version = "1", features = ["full"] }
#tokio-sqlite = "0.1"
#rusqlite = { version = "0.31", features = ["bundled"] }
#tracing-subscriber = { version = "0.3", features = ["env-filter"] }
#tracing = "0.1"
axum = { version = "0.6", features = ["macros"] }
axum-macros = "0.3"
chrono = { version = "0.4.26", features = ["serde"] }
diesel = { version = "2.1", features = ["postgres", "uuid", "serde_json"] }
diesel_migrations = "2"
deadpool-diesel = { version = "0.4", features = ["postgres"] }
dotenvy = "0.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["sync", "macros", "rt-multi-thread"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.4", features = ["fast-rng", "v4", "serde"] }

0
api/migrations/.keep Normal file
View file

View file

@ -0,0 +1,6 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
DROP FUNCTION IF EXISTS diesel_set_updated_at();

View file

@ -0,0 +1,36 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
-- Sets up a trigger for the given table to automatically set a column called
-- `updated_at` whenever the row is modified (unless `updated_at` was included
-- in the modified columns)
--
-- # Example
--
-- ```sql
-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW());
--
-- SELECT diesel_manage_updated_at('users');
-- ```
CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS
$$
BEGIN
EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS
$$
BEGIN
IF (
NEW IS DISTINCT FROM OLD AND
NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
) THEN
NEW.updated_at := current_timestamp;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

View file

@ -0,0 +1 @@
DROP TABLE stacktraces

View file

@ -0,0 +1,6 @@
create table stacktraces
(
id INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
timestamp timestamp,
stacktrace varchar
)

41
api/pom.xml Normal file
View file

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.github.shautvast.exceptional</groupId>
<artifactId>exceptional-parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>exceptional-api</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>Custom Command at Compile Phase</id>
<phase>compile</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>cargo</executable>
<workingDirectory>${project.basedir}/api</workingDirectory>
<arguments>
<argument>build</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

61
api/src/config.rs Normal file
View file

@ -0,0 +1,61 @@
use std::env;
use dotenvy::dotenv;
use tokio::sync::OnceCell;
/// Server bind settings, read from the environment in `init_config`.
#[derive(Debug)]
struct ServerConfig {
    host: String,
    port: u16,
}

/// Database connection settings (a single connection URL).
#[derive(Debug)]
struct DatabaseConfig {
    url: String,
}

/// Application configuration: HTTP server address plus database URL.
#[derive(Debug)]
pub struct Config {
    server: ServerConfig,
    db: DatabaseConfig,
}

impl Config {
    /// Database connection URL (as given by `DATABASE_URL`).
    pub fn db_url(&self) -> &str {
        &self.db.url
    }

    /// Host/interface the HTTP server binds to.
    pub fn server_host(&self) -> &str {
        &self.server.host
    }

    /// TCP port the HTTP server listens on.
    pub fn server_port(&self) -> u16 {
        self.server.port
    }
}
/// Lazily-initialised global configuration; populated once by `config()`.
pub static CONFIG: OnceCell<Config> = OnceCell::const_new();

/// Builds the configuration from the process environment.
///
/// Loads `.env` first (a missing file is ignored), then reads:
/// - `HOST` (default `127.0.0.1`)
/// - `PORT` (default `3000`) — must parse as a `u16`
/// - `DATABASE_URL` — required; panics with a message when missing
async fn init_config() -> Config {
    // A missing .env file is fine; real environment variables still apply.
    dotenv().ok();
    let server_config = ServerConfig {
        host: env::var("HOST").unwrap_or_else(|_| String::from("127.0.0.1")),
        port: env::var("PORT")
            .unwrap_or_else(|_| String::from("3000"))
            .parse::<u16>()
            // Fail fast with a clear message instead of a bare unwrap panic.
            .expect("PORT must be a valid u16"),
    };
    let database_config = DatabaseConfig {
        url: env::var("DATABASE_URL").expect("DATABASE_URL must be set"),
    };
    Config {
        server: server_config,
        db: database_config,
    }
}

/// Returns the process-wide configuration, initialising it on first call.
pub async fn config() -> &'static Config {
    CONFIG.get_or_init(init_config).await
}

1
api/src/domain/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod models;

View file

@ -0,0 +1 @@
pub mod post;

View file

@ -0,0 +1,54 @@
use axum::http::StatusCode;
use axum::Json;
use axum::response::IntoResponse;
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::infra::errors::InfraError;
// #[derive(Clone, Debug, PartialEq)]
// pub struct PostModel {
// pub id: Uuid,
// pub title: String,
// pub body: String,
// pub published: bool,
// }
/// Minimal domain view of a stored stacktrace row.
#[derive(Clone, Debug, PartialEq)]
pub struct SimpleStacktraceModel {
    pub id: i32,
    pub stacktrace: String,
}

/// Errors surfaced by the stacktrace handlers.
#[derive(Debug)]
pub enum StacktraceError {
    InternalServerError,
    /// No stacktrace with the given id exists.
    NotFound(i32),
    /// Wraps a lower-level infrastructure/database error.
    InfraError(InfraError),
}

impl IntoResponse for StacktraceError {
    /// Maps each error variant to an HTTP status plus a JSON error body
    /// carrying the resource name, a message, and a timestamp.
    fn into_response(self) -> axum::response::Response {
        let (status, err_msg) = match self {
            Self::NotFound(id) => (
                StatusCode::NOT_FOUND,
                format!("StacktraceModel with id {} has not been found", id),
            ),
            Self::InfraError(db_error) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Internal server error: {}", db_error),
            ),
            // Any remaining variant (InternalServerError) is a generic 500.
            _ => (
                StatusCode::INTERNAL_SERVER_ERROR,
                String::from("Internal server error"),
            ),
        };
        (
            status,
            Json(
                json!({"resource":"StacktraceModel", "message": err_msg, "happened_at" : chrono::Utc::now() }),
            ),
        )
            .into_response()
    }
}

30
api/src/errors.rs Normal file
View file

@ -0,0 +1,30 @@
use axum::http::StatusCode;
use axum::Json;
use axum::response::IntoResponse;
use serde_json::json;
/// Application-level errors produced by the custom extractors and handlers.
#[derive(Debug)]
pub enum AppError {
    InternalServerError,
    BodyParsingError(String),
}

/// Collapses any error into `AppError::InternalServerError`, discarding detail.
pub fn internal_error<E>(_err: E) -> AppError {
    AppError::InternalServerError
}

impl IntoResponse for AppError {
    /// Renders the error as an HTTP status plus a `{"message": ...}` JSON body.
    fn into_response(self) -> axum::response::Response {
        let status;
        let err_msg;
        match self {
            Self::InternalServerError => {
                status = StatusCode::INTERNAL_SERVER_ERROR;
                err_msg = String::from("Internal Server Error");
            }
            Self::BodyParsingError(message) => {
                status = StatusCode::BAD_REQUEST;
                err_msg = format!("Bad request error: {}", message);
            }
        }
        let body = Json(json!({ "message": err_msg }));
        (status, body).into_response()
    }
}

1
api/src/handlers/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod stacktraces;

View file

@ -0,0 +1,33 @@
use axum::extract::State;
use axum::Json;
use tracing::info;
use crate::AppState;
use crate::domain::models::post::StacktraceError;
use crate::handlers::stacktraces::StacktraceResponse;
use crate::infra::repositories::stacktrace_repository;
// Monotonic count of create requests handled by this process.
// AtomicUsize replaces the previous `static mut` + `unsafe` counter,
// which risked undefined behaviour under concurrent handler invocations
// and violated the SCREAMING_SNAKE_CASE static naming convention.
static COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

/// POST handler: stores the raw request body as a new stacktrace row
/// and returns the created record (with its generated id) as JSON.
pub async fn create_stacktrace(
    State(state): State<AppState>,
    data: String,
) -> Result<Json<StacktraceResponse>, StacktraceError> {
    // Relaxed is sufficient: the counter is a statistic, not a
    // synchronisation point.
    COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let new_post_db = stacktrace_repository::NewPostDb {
        stacktrace: data,
    };
    let created_stacktrace = stacktrace_repository::insert(&state.pool, new_post_db)
        .await
        .map_err(StacktraceError::InfraError)?;
    let post_response = StacktraceResponse {
        id: created_stacktrace.id,
        stacktrace: created_stacktrace.stacktrace,
    };
    Ok(Json(post_response))
}

View file

@ -0,0 +1,31 @@
use axum::extract::State;
use axum::Json;
use crate::domain::models::post::{StacktraceError, SimpleStacktraceModel};
use crate::handlers::stacktraces::StacktraceResponse;
use crate::infra::errors::InfraError;
use crate::infra::repositories::stacktrace_repository;
use crate::utils::PathExtractor;
use crate::AppState;
/// GET handler: fetches a single stacktrace by its path id.
///
/// A missing row becomes `StacktraceError::NotFound(id)`; any other
/// infrastructure failure becomes `InternalServerError`.
pub async fn get_stacktrace(
    State(state): State<AppState>,
    PathExtractor(post_id): PathExtractor<i32>,
) -> Result<Json<StacktraceResponse>, StacktraceError> {
    let lookup = stacktrace_repository::get(&state.pool, post_id).await;
    match lookup {
        Ok(model) => Ok(Json(adapt_post_to_post_response(model))),
        Err(InfraError::NotFound) => Err(StacktraceError::NotFound(post_id)),
        Err(InfraError::InternalServerError) => Err(StacktraceError::InternalServerError),
    }
}

/// Converts the domain model into the JSON response shape.
fn adapt_post_to_post_response(post: SimpleStacktraceModel) -> StacktraceResponse {
    StacktraceResponse {
        id: post.id,
        stacktrace: post.stacktrace,
    }
}

View file

@ -0,0 +1,33 @@
use axum::extract::{Query, State};
use axum::Json;
use crate::domain::models::post::{StacktraceError, SimpleStacktraceModel};
use crate::handlers::stacktraces::{ListStacktracesResponse, StacktraceResponse};
use crate::infra::repositories::stacktrace_repository::{get_all};
use crate::AppState;
/// GET handler: returns every stored stacktrace as a JSON list.
pub async fn list_stacktraces(
    State(state): State<AppState>,
) -> Result<Json<ListStacktracesResponse>, StacktraceError> {
    match get_all(&state.pool).await {
        Ok(rows) => Ok(Json(adapt_posts_to_list_posts_response(rows))),
        // Every repository failure is reported as a generic 500.
        Err(_) => Err(StacktraceError::InternalServerError),
    }
}

/// Converts one domain model into its response form.
fn adapt_post_to_post_response(post: SimpleStacktraceModel) -> StacktraceResponse {
    StacktraceResponse {
        id: post.id,
        stacktrace: post.stacktrace,
    }
}

/// Wraps the converted models in the list response envelope.
fn adapt_posts_to_list_posts_response(posts: Vec<SimpleStacktraceModel>) -> ListStacktracesResponse {
    let stacktraces = posts
        .into_iter()
        .map(adapt_post_to_post_response)
        .collect();
    ListStacktracesResponse { stacktraces }
}

View file

@ -0,0 +1,25 @@
use serde::{Deserialize, Serialize};
pub use create_stacktrace::create_stacktrace;
pub use get_stacktrace::get_stacktrace;
pub use list_stacktraces::list_stacktraces;
mod create_stacktrace;
mod get_stacktrace;
mod list_stacktraces;
/// Request body for creating a stacktrace.
/// NOTE(review): the create handler currently takes the raw body `String`
/// instead of this struct — confirm whether this type is still needed.
#[derive(Debug, Deserialize)]
pub struct CreatePostRequest {
    stacktrace: String,
}

/// JSON shape of a single stacktrace in responses.
#[derive(Debug, Serialize, Deserialize)]
pub struct StacktraceResponse {
    id: i32,
    stacktrace: String,
}

/// JSON envelope for the list endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct ListStacktracesResponse {
    stacktraces: Vec<StacktraceResponse>,
}

1
api/src/infra/db/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod schema;

View file

@ -0,0 +1,6 @@
// Diesel schema for the `stacktraces` table: integer primary key plus
// the raw stacktrace text. NOTE(review): the SQL migration also creates
// a `timestamp` column that is not mapped here (Diesel ignores unmapped
// columns) — confirm this omission is intended.
diesel::table! {
    stacktraces (id) {
        id -> Integer,
        stacktrace -> Varchar,
    }
}

47
api/src/infra/errors.rs Normal file
View file

@ -0,0 +1,47 @@
use std::fmt;
use deadpool_diesel::InteractError;
/// Infrastructure-level errors (database and connection pool).
#[derive(Debug)]
pub enum InfraError {
    InternalServerError,
    NotFound,
}

/// Adapter for `map_err`: folds any lower-level error implementing
/// [`Error`] into an `InfraError`.
pub fn adapt_infra_error<T: Error>(error: T) -> InfraError {
    error.as_infra_error()
}

impl fmt::Display for InfraError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            InfraError::NotFound => write!(f, "Not found"),
            InfraError::InternalServerError => write!(f, "Internal server error"),
        }
    }
}

/// Conversion trait implemented by each third-party error type we handle.
pub trait Error {
    fn as_infra_error(&self) -> InfraError;
}

impl Error for diesel::result::Error {
    fn as_infra_error(&self) -> InfraError {
        match self {
            // A missing row is the only case callers distinguish.
            diesel::result::Error::NotFound => InfraError::NotFound,
            _ => InfraError::InternalServerError,
        }
    }
}

impl Error for deadpool_diesel::PoolError {
    fn as_infra_error(&self) -> InfraError {
        InfraError::InternalServerError
    }
}

impl Error for InteractError {
    fn as_infra_error(&self) -> InfraError {
        InfraError::InternalServerError
    }
}

3
api/src/infra/mod.rs Normal file
View file

@ -0,0 +1,3 @@
pub mod db;
pub mod errors;
pub mod repositories;

View file

@ -0,0 +1 @@
pub mod stacktrace_repository;

View file

@ -0,0 +1,90 @@
use diesel::{
ExpressionMethods, Insertable, QueryDsl, Queryable, RunQueryDsl,
Selectable, SelectableHelper,
};
use serde::{Deserialize, Serialize};
use crate::domain::models::post::{SimpleStacktraceModel};
use crate::infra::db::schema::stacktraces;
use crate::infra::errors::{adapt_infra_error, InfraError};
/// Row shape selected from the `stacktraces` table.
#[derive(Serialize, Queryable, Selectable)]
#[diesel(table_name = stacktraces)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct StacktraceDb {
    pub id: i32,
    pub stacktrace: String,
}

/// Insertable form — `id` is generated by the database.
#[derive(Insertable)]
#[diesel(table_name = stacktraces)]
pub struct NewPostDb {
    pub stacktrace: String,
}

/// Inserts a new stacktrace row and returns the stored row as a domain model.
///
/// The two chained `map_err` calls cover the two failure layers: the
/// pool's `interact` call itself, then the Diesel query it executed.
pub async fn insert(
    pool: &deadpool_diesel::postgres::Pool,
    new_post: NewPostDb,
) -> Result<SimpleStacktraceModel, InfraError> {
    let conn = pool.get().await.map_err(adapt_infra_error)?;
    let res = conn
        .interact(|conn| {
            diesel::insert_into(stacktraces::table)
                .values(new_post)
                .returning(StacktraceDb::as_returning())
                .get_result(conn)
        })
        .await
        .map_err(adapt_infra_error)?
        .map_err(adapt_infra_error)?;
    Ok(adapt_stacktracedb_to_stacktracemodel(res))
}

/// Fetches a single stacktrace by primary key.
/// Yields `InfraError::NotFound` when no row matches.
pub async fn get(
    pool: &deadpool_diesel::postgres::Pool,
    id: i32,
) -> Result<SimpleStacktraceModel, InfraError> {
    let conn = pool.get().await.map_err(adapt_infra_error)?;
    let res = conn
        .interact(move |conn| {
            stacktraces::table
                .filter(stacktraces::id.eq(id))
                .select(StacktraceDb::as_select())
                .get_result(conn)
        })
        .await
        .map_err(adapt_infra_error)?
        .map_err(adapt_infra_error)?;
    Ok(adapt_stacktracedb_to_stacktracemodel(res))
}

/// Loads every stored stacktrace.
pub async fn get_all(
    pool: &deadpool_diesel::postgres::Pool,
) -> Result<Vec<SimpleStacktraceModel>, InfraError> {
    let conn = pool.get().await.map_err(adapt_infra_error)?;
    let res = conn
        .interact(move |conn| {
            // `into_boxed` keeps the query extensible at runtime
            // (e.g. future filters); no conditions are added today.
            let query = stacktraces::table.into_boxed::<diesel::pg::Pg>();
            query.select(StacktraceDb::as_select()).load::<StacktraceDb>(conn)
        })
        .await
        .map_err(adapt_infra_error)?
        .map_err(adapt_infra_error)?;
    let posts: Vec<SimpleStacktraceModel> = res
        .into_iter()
        .map(|post_db| adapt_stacktracedb_to_stacktracemodel(post_db))
        .collect();
    Ok(posts)
}

/// Converts a database row into the domain model.
fn adapt_stacktracedb_to_stacktracemodel(post_db: StacktraceDb) -> SimpleStacktraceModel {
    SimpleStacktraceModel {
        id: post_db.id,
        stacktrace: post_db.stacktrace,
    }
}

78
api/src/main.rs Normal file
View file

@ -0,0 +1,78 @@
use std::net::SocketAddr;
use deadpool_diesel::postgres::{Manager, Pool};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use crate::config::config;
use crate::errors::internal_error;
use crate::routes::app_router;
mod config;
mod domain;
mod errors;
mod handlers;
mod infra;
mod routes;
mod utils;
/// Diesel migrations compiled into the binary from `migrations/`.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/");

/// Shared application state handed to every handler: the Postgres pool.
#[derive(Clone)]
pub struct AppState {
    pool: Pool,
}

/// Entry point: installs tracing, loads config, builds the connection
/// pool, runs pending migrations, then serves the axum application.
#[tokio::main]
async fn main() {
    init_tracing();
    let config = config().await;
    let manager = Manager::new(
        config.db_url().to_string(),
        deadpool_diesel::Runtime::Tokio1,
    );
    // Aborting on a bad pool configuration is acceptable at startup.
    let pool = Pool::builder(manager).build().unwrap();
    {
        run_migrations(&pool).await;
    }
    let state = AppState { pool };
    let app = app_router(state.clone()).with_state(state);
    let host = config.server_host();
    let port = config.server_port();
    let address = format!("{}:{}", host, port);
    let socket_addr: SocketAddr = address.parse().unwrap();
    tracing::info!("listening on http://{}", socket_addr);
    axum::Server::bind(&socket_addr)
        .serve(app.into_make_service())
        .await
        .map_err(internal_error)
        .unwrap()
}

/// Installs the tracing subscriber: env-filter from the environment,
/// falling back to a debug default for the example target.
fn init_tracing() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "example_tokio_postgres=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
}

/// Applies any pending embedded migrations; panics on failure so the
/// server never starts against an out-of-date schema.
async fn run_migrations(pool: &Pool) {
    let conn = pool.get().await.unwrap();
    conn.interact(|conn| conn.run_pending_migrations(MIGRATIONS).map(|_| ()))
        .await
        .unwrap()
        .unwrap();
}

37
api/src/models.rs.z Normal file
View file

@ -0,0 +1,37 @@
use serde::Deserialize;
/// Body of a "simple" stacktrace POST: just the raw text.
#[derive(Deserialize, Debug)]
pub struct SimpleStacktrace {
    stacktrace: String,
}

/// JSON mirror of `java.lang.Throwable` as serialized on the Java side.
///
/// Container-level `rename_all = "camelCase"` replaces the previous
/// per-field `#[serde(rename(deserialize = ...))]` attributes; the
/// wire names (`stackTrace`, `localizedMessage`, ...) are unchanged.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Throwable {
    /// Nested cause chain, boxed to keep the type finite.
    cause: Option<Box<Throwable>>,
    stack_trace: Vec<Stacktrace>,
    message: Option<String>,
    suppressed: Vec<String>,
    localized_message: Option<String>,
}

/// JSON mirror of `java.lang.StackTraceElement`.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Stacktrace {
    class_loader_name: Option<String>,
    module_name: Option<String>,
    module_version: Option<String>,
    method_name: Option<String>,
    file_name: Option<String>,
    line_number: Option<u32>,
    class_name: Option<String>,
    native_method: Option<bool>,
}

33
api/src/routes.rs Normal file
View file

@ -0,0 +1,33 @@
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::Router;
use crate::handlers::stacktraces::{create_stacktrace, get_stacktrace, list_stacktraces};
use crate::AppState;
/// Builds the application router: a health-check root, the stacktrace
/// API nested under `/api/stacktraces`, and a 404 fallback.
pub fn app_router(state: AppState) -> Router<AppState> {
    Router::new()
        .route("/", get(root))
        .nest("/api/stacktraces", stacktraces_routes(state.clone()))
        .fallback(handler_404)
}

/// Liveness endpoint.
async fn root() -> &'static str {
    "Server is running!"
}

/// Fallback for unmatched routes.
async fn handler_404() -> impl IntoResponse {
    (
        StatusCode::NOT_FOUND,
        "The requested resource was not found",
    )
}

/// Routes for the stacktrace resource:
/// create (POST /), list (GET /), fetch one (GET /:id).
fn stacktraces_routes(state: AppState) -> Router<AppState> {
    Router::new()
        .route("/", post(create_stacktrace))
        .route("/", get(list_stacktraces))
        .route("/:id", get(get_stacktrace))
        .with_state(state)
}

View file

@ -0,0 +1,15 @@
use axum::extract::rejection::JsonRejection;
use axum_macros::FromRequest;
use crate::errors::AppError;
/// JSON body extractor whose rejection is converted into our `AppError`,
/// so malformed bodies produce a 400 with a JSON error message.
#[derive(FromRequest)]
#[from_request(via(axum::Json), rejection(AppError))]
pub struct JsonExtractor<T>(pub T);

impl From<JsonRejection> for AppError {
    fn from(rejection: JsonRejection) -> Self {
        // Preserve axum's human-readable rejection text in the response.
        AppError::BodyParsingError(rejection.to_string())
    }
}

View file

@ -0,0 +1,2 @@
pub mod json_extractor;
pub mod path_extractor;

View file

@ -0,0 +1,14 @@
use axum::extract::rejection::PathRejection;
use axum_macros::FromRequestParts;
use crate::errors::AppError;
/// Path-parameter extractor whose rejection is converted into our
/// `AppError`, so an unparsable `:id` produces a 400 JSON error.
#[derive(FromRequestParts, Debug)]
#[from_request(via(axum::extract::Path), rejection(AppError))]
pub struct PathExtractor<T>(pub T);

impl From<PathRejection> for AppError {
    fn from(rejection: PathRejection) -> Self {
        // Preserve axum's human-readable rejection text in the response.
        AppError::BodyParsingError(rejection.to_string())
    }
}

5
api/src/utils/mod.rs Normal file
View file

@ -0,0 +1,5 @@
pub use custom_extractors::json_extractor::JsonExtractor;
pub use custom_extractors::path_extractor::PathExtractor;
mod custom_extractors;

View file

@ -30,6 +30,11 @@
<version>RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.2</version>
</dependency>
</dependencies>
<build>

View file

@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
@SuppressWarnings("unused") // this code is called from the instrumented code
public class ExceptionLogger {
private final static ObjectMapper objectMapper = new ObjectMapper();;
private final static MPSCBufferWriter bufferWriter=new MPSCBufferWriter();
private final static ObjectMapper objectMapper = new ObjectMapper();
private final static MPSCBufferWriter bufferWriter = new MPSCBufferWriter();
public static void log(Throwable throwable) {
try {

View file

@ -1,7 +1,6 @@
package com.github.shautvast.exceptional;
import java.lang.foreign.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
@ -14,10 +13,15 @@ public class MPSCBufferWriter implements AutoCloseable {
private static Linker linker;
private static SymbolLookup rustlib;
private static final LinkedBlockingDeque<byte[]> writeQueue = new LinkedBlockingDeque<>(); // unbounded
private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
private static final ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
});
private final AtomicBoolean active = new AtomicBoolean(false);
public MPSCBufferWriter() {
// DatabaseInit.initializeDatabase();
startWriteQueueListener();
}
@ -58,8 +62,8 @@ public class MPSCBufferWriter implements AutoCloseable {
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
System.out.println("Shutting down");
});
}

View file

@ -1,179 +0,0 @@
package com.github.shautvast.exceptional;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
//circular MPSC buffer
//TODO REMOVE
public class RingBuffer implements AutoCloseable {
private final ByteBuffer memory;
private final AtomicInteger readPointer;
private final AtomicInteger writePointer;
private final AtomicBoolean writerThreadRunning = new AtomicBoolean(true);
private final AtomicBoolean readerThreadRunning = new AtomicBoolean(false);
private final ReentrantLock lock = new ReentrantLock();
private static final LinkedBlockingDeque<byte[]> writeQueue = new LinkedBlockingDeque<>();
private final ExecutorService writerThread;
private ExecutorService readerThread;
void setReadPointer(int readPointer) {
this.readPointer.set(readPointer);
}
public RingBuffer(MemorySegment memory) {
if (memory.byteSize() > 0xfffd) {
throw new IllegalArgumentException("Max memory size is 65533"); //TODO probably ffff
}
this.memory = memory.asByteBuffer();
memory.asByteBuffer();
readPointer = new AtomicInteger(0);
writePointer = new AtomicInteger(0);
writerThread = Executors.newSingleThreadExecutor();
writerThread.submit(() -> {
while (writerThreadRunning.get()) {
try {
byte[] data = writeQueue.poll(5, TimeUnit.SECONDS);
if (data != null) {
while (!writeBytes(data)) {
Thread.sleep(5000);
}
;
}
} catch (InterruptedException _) {
// honor the interrupt
writerThreadRunning.set(false);
}
}
});
}
/**
* Writes a byte array to the ring buffer.
* <p>
* If there is enough space in the buffer, the method writes the byte array to the buffer
* and returns true. If there is not enough space, the method does not write the byte array
* and returns false.
*
* @param data the byte array to write to the buffer
* @return true if the byte array was written successfully, false otherwise
*/
private boolean writeBytes(byte[] data) {
if (writePointer.get() > memory.capacity()) {
System.out.println("blocked");
return false;//signal retry
}
System.out.println("write " + new String(data));
int allocSize = data.length + 2;
int pos = writePointer.getAndAdd(allocSize);
if (writePointer.get() > (memory.capacity() - 2)) {
int max = memory.capacity() - (pos + 4);
if (data.length - max < readPointer.get()) {
System.out.println("wrap");
memory.putShort(pos, (short) data.length);
memory.position(pos + 2);
memory.put(data, 0, max);
memory.position(0);
memory.put(data, max, data.length - max);
writePointer.set(data.length - max);
memory.putShort((short) 0);
return true;
} else {
return false;
}
} else {
memory.putShort(pos, (short) data.length);
memory.position(pos + 2);
memory.put(data);
memory.putShort((short) 0);
return true;
}
}
/**
* Reads a byte array from the ring buffer with a specified timeout.
* <p>
* Blocks until there is data available to read or the timeout is reached.
* If the timeout is reached and there is still no data, the resul is empty.
*
* @param timeout the maximum time to wait for data to be available in the buffer
* @return the byte array read from the buffer
* @throws InterruptedException if the thread is interrupted while waiting for data
*/
private Optional<byte[]> read(Duration timeout) throws InterruptedException {
if (memory.getShort(readPointer.get()) == 0 || readPointer.get() >= memory.capacity()) {
return Optional.empty();
}
return Optional.ofNullable(getBytes());
}
private byte[] getBytes() {
int currentReadPointerValue = readPointer.get();
int lenToread = memory.getShort(currentReadPointerValue);
System.out.println(lenToread + " bytes");
if (lenToread <= 0) {
return null;
}
currentReadPointerValue = readPointer.addAndGet(2);
byte[] data = new byte[lenToread];
int bytesTilEnd = memory.capacity() - currentReadPointerValue;
if (lenToread > bytesTilEnd) {
memory.get(currentReadPointerValue, data, 0, bytesTilEnd);
memory.get(0, data, bytesTilEnd, lenToread - bytesTilEnd);
readPointer.set(lenToread - bytesTilEnd);
} else {
memory.get(currentReadPointerValue, data);
System.out.println("set "+readPointer.addAndGet(lenToread));
}
return data;
}
public void write(byte[] bytes) {
while (!writeQueue.offer(bytes)) ;
}
public void startReader(Consumer<byte[]> consumer) {
readerThreadRunning.set(true);
readerThread = Executors.newSingleThreadExecutor();
readerThread.submit(() -> {
while (readerThreadRunning.get()) {
try {
System.out.println("read");
read(Duration.ofSeconds(5)).ifPresent(consumer);
Thread.sleep(5000);
} catch (InterruptedException _) {
readerThreadRunning.set(false);
}
}
});
}
public void close() {
System.out.println("close");
writerThreadRunning.set(false);
readerThreadRunning.set(false);
writerThread.close();
readerThread.close();
}
public void drain() {
while (!writeQueue.isEmpty()) ;
close();
}
}

View file

@ -8,8 +8,13 @@ class ExceptionLoggerTest {
@Test
void test() throws InterruptedException {
long t0 = System.currentTimeMillis();
for (int i = 0; i < 1_000; i++) {
ExceptionLogger.log(new Throwable());
TimeUnit.SECONDS.sleep(1);
TimeUnit.MILLISECONDS.sleep(1);
}
System.out.println(System.currentTimeMillis() - t0);
Thread.sleep(10000);
}
}

View file

@ -11,6 +11,7 @@
<modules>
<module>agent</module>
<module>rustlib</module>
<module>lib</module>
</modules>

1309
rustlib/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -8,6 +8,7 @@ crate-type = ["cdylib"]
bench = false
[dependencies]
serde_json = { version = "1.0", features = [] }
serde = { version = "1.0", features = ["derive"] }
anyhow = "1.0"
chrono = "0.4"
reqwest = { version = "0.12", features = ["blocking"]}
crossbeam-deque = "0.8"

5
rustlib/bodies Normal file
View file

@ -0,0 +1,5 @@
HTTP/1.1 200 OK
content-type: text/plain; charset=utf-8
content-length: 19
date: Tue, 02 Jul 2024 06:20:13 GMT

41
rustlib/pom.xml Normal file
View file

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.github.shautvast.exceptional</groupId>
<artifactId>exceptional-parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>exceptional-rustlib</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>Custom Command at Compile Phase</id>
<phase>compile</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>cargo</executable>
<workingDirectory>${project.basedir}/rustlib</workingDirectory>
<arguments>
<argument>build</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -1,32 +0,0 @@
use serde::Deserialize;
#[derive(Deserialize, Debug)]
pub struct Throwable {
cause: Option<Box<Throwable>>,
#[serde(rename (deserialize = "stackTrace"))]
stack_trace: Vec<Stacktrace>,
message: Option<String>,
suppressed: Vec<String>,
#[serde(rename (deserialize = "localizedMessage"))]
localized_message: Option<String>,
}
#[derive(Deserialize, Debug)]
pub struct Stacktrace{
#[serde(rename (deserialize = "classLoaderName"))]
class_loader_name: Option<String>,
#[serde(rename (deserialize = "moduleName"))]
module_name: Option<String>,
#[serde(rename (deserialize = "moduleVersion"))]
module_version: Option<String>,
#[serde(rename (deserialize = "methodName"))]
method_name: Option<String>,
#[serde(rename (deserialize = "fileName"))]
file_name: Option<String>,
#[serde(rename (deserialize = "lineNumber"))]
line_number: Option<u32>,
#[serde(rename (deserialize = "className"))]
class_name: Option<String>,
#[serde(rename (deserialize = "nativeMethod"))]
native_method: Option<bool>,
}

View file

@ -1,18 +1,23 @@
use std::cell::OnceCell;
use std::ffi::c_char;
use std::slice;
mod throwable;
use reqwest::blocking::Client;
// same value, but different meanings
// TODO find a way to set the buffer size from java.
// why not just add it to the function
const CAPACITY: isize = 32760;
const READ: isize = 32760;
const CLIENT: OnceCell<Client> = OnceCell::new();
#[no_mangle]
pub extern "C" fn buffer_updated(buffer: *mut c_char) {
let client = CLIENT;
let client = client.get_or_init(|| Client::new());
let mut read_pos = get_u32(buffer, READ) as isize;
let mut remaining = CAPACITY - read_pos;
let mut remaining = CAPACITY - read_pos; // nr of bytes to read before end of buffer
let len = if remaining == 1 {
let byte_high = get_u8(buffer, read_pos);
read_pos = 0;
@ -33,37 +38,37 @@ pub extern "C" fn buffer_updated(buffer: *mut c_char) {
l
} as isize;
let mut result = Vec::with_capacity(len as usize);
// copy only when needed
if len <= remaining {
for i in 0..len {
unsafe { result.push(*buffer.offset(read_pos + i) as u8); }
unsafe {
let result = std::str::from_utf8_unchecked(slice::from_raw_parts(buffer.offset(read_pos).cast::<u8>(), len as usize));
client.post("http://localhost:3000/api/stacktraces")
.body(result)
.send()
.unwrap();
}
read_pos += len;
} else {
for i in 0..remaining {
unsafe { result.push(*buffer.offset(read_pos + i) as u8); }
unsafe {
let s1 = std::str::from_utf8_unchecked(slice::from_raw_parts(buffer.offset(read_pos).cast::<u8>(), remaining as usize));
let s2 = std::str::from_utf8_unchecked(slice::from_raw_parts(buffer.cast::<u8>(), (len - remaining) as usize));
let mut s = String::with_capacity(len as usize);
s.push_str(s1);
s.push_str(s2);
client.post("http://localhost:3000/api/stacktraces")
.body(s)
.send()
.unwrap();
}
read_pos = 0;
for i in 0..len - remaining {
unsafe { result.push(*buffer.offset(i) as u8); }
}
read_pos += len - remaining;
read_pos = len - remaining;
}
put_u32(buffer, READ, read_pos as u32);
let string = String::from_utf8(result);
if let Ok(json) = string {
println!("receiving {}", json);
} else {
println!("not ok");
}
}
fn get_u8(s: *const c_char, pos: isize) -> u8 {
unsafe { *s.offset(pos) as u8 }
}
fn get_u16(s: *const c_char, pos: isize) -> u16 {
let mut b: [u8; 2] = [0; 2];
unsafe {