finally works

This commit is contained in:
Shautvast 2024-11-20 15:21:45 +01:00
parent 566814fa3e
commit 83000e8b41
4 changed files with 1307 additions and 179 deletions

View file

@ -1 +1,11 @@
https://blog.top-squad.nl/kafka TBD https://blog.top-squad.nl/kafka-met-rust2
depends on /etc/hosts:
```bash
127.0.0.1 kafka1
127.0.0.1 kafka2
127.0.0.1 kafka3
```
open to suggestions to leave this out, should be possible

1380
rust/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -4,10 +4,12 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
kafka = "0.10"
apache-avro = "0.17.0" apache-avro = "0.17.0"
anyhow = "1.0" anyhow = "1.0"
# serde_avro_derive = "0.3.1" serde = "1.0"
serde_avro_fast = "2.0.0" serde_derive = "1.0"
serde_derive = "1.0.215" rdkafka = "0.36"
serde = "1.0.215" tokio = { version = "1", features = ["full"] }
futures = "0.3.28"
schema_registry_converter = { version = "4.2", features = ["avro"] }
tokio-macros = "2.4.0"

View file

@ -1,62 +1,34 @@
use kafka::{client::GroupOffsetStorage, consumer::Consumer}; use apache_avro::types::Value;
use rdkafka::{
consumer::{CommitMode, Consumer, StreamConsumer},
ClientConfig, Message,
};
use schema_registry_converter::async_impl::{avro::AvroDecoder, schema_registry::SrSettings};
fn main() -> anyhow::Result<()> { #[tokio::main]
let schema: serde_avro_fast::Schema = r#" async fn main() -> anyhow::Result<()> {
{ let consumer: StreamConsumer = ClientConfig::new()
"namespace": "example.avro", .set("group.id", "mygroup")
"type" : "record", .set(
"name" : "Person", "bootstrap.servers",
"doc" : "Ape descendent creature dwelling on planet Earth", "localhost:39092,localhost:39093,localhost:39094",
"aliases": ["Human"], )
"fields" : [
{
"name" : "name",
"type" : "string"
},
{
"name" : "favorite_number",
"type" : "int"
},
{
"name" : "height",
"type" : "double"
}
]
}"#
.parse()
.expect("Failed to parse schema");
let mut consumer = Consumer::from_hosts(vec![
"localhost:39092".into(),
"localhost:39093".into(),
"localhost:39094".into(),
])
.with_topic_partitions("rustonomicon".to_owned(), &[0])
.with_group("mygroup".into())
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
.create() .create()
.unwrap(); .expect("Consumer creation error");
let avro_decoder = AvroDecoder::new(SrSettings::new("http://localhost:8081".into()));
loop { consumer
let message = consumer.poll()?; .subscribe(&["rustonomicon"])
.expect("Can't subscribe to specific topics");
for ms in message.iter() { while let Ok(message) = consumer.recv().await {
for m in ms.messages().iter() { let value_result = avro_decoder.decode(message.payload()).await?.value;
let person = serde_avro_fast::from_datum_slice::<Person>(&m.value[5..], &schema) if let Value::Record(value_result) = value_result {
.expect("Failed to deserialize"); println!("{:?}", value_result.get(0));
println!("{:?}", person.name);
consumer.consume_message("rustonomicon", ms.partition(), m.offset)?;
}
consumer.commit_consumed()?;
}
} }
// Ok(()) consumer.commit_message(&message, CommitMode::Async)?;
} }
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] Ok(())
struct Person<'a> {
name: &'a str,
favorite_number: u32,
height: f64,
} }