working setup

This commit is contained in:
Shautvast 2024-11-18 21:49:11 +01:00
parent 81a7a63802
commit 566814fa3e
4 changed files with 38 additions and 30 deletions

View file

@ -1,4 +1,4 @@
version: '3.5' version: "3.5"
services: services:
schema-registry: schema-registry:
@ -15,7 +15,7 @@ services:
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081 SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092,PLAINTEXT://kafka2:19093,PLAINTEXT://kafka3:19094 SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092,PLAINTEXT://kafka2:19093,PLAINTEXT://kafka3:19094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
SCHEMA_REGISTRY_DEBUG: 'true' SCHEMA_REGISTRY_DEBUG: "true"
kafka1: kafka1:
image: confluentinc/cp-kafka:latest image: confluentinc/cp-kafka:latest
@ -25,19 +25,19 @@ services:
- "39092:39092" - "39092:39092"
environment: environment:
KAFKA_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONTROLLER://kafka1:9093 KAFKA_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONTROLLER://kafka1:9093
KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://localhost:39092 KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092
KAFKA_INTER_BROKER_LISTENER_NAME: BROKER KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_PROCESS_ROLES: 'controller,broker' KAFKA_PROCESS_ROLES: "controller,broker"
KAFKA_NODE_ID: 1 KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093' KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
KAFKA_METADATA_LOG_SEGMENT_MS: 15000 KAFKA_METADATA_LOG_SEGMENT_MS: 15000
KAFKA_METADATA_MAX_RETENTION_MS: 1200000 KAFKA_METADATA_MAX_RETENTION_MS: 1200000
KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800 KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
CLUSTER_ID: 'mycluster' CLUSTER_ID: "mycluster"
volumes: volumes:
- kafka1-data:/var/lib/kafka/data - kafka1-data:/var/lib/kafka/data
@ -54,14 +54,14 @@ services:
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_PROCESS_ROLES: 'controller,broker' KAFKA_PROCESS_ROLES: "controller,broker"
KAFKA_NODE_ID: 2 KAFKA_NODE_ID: 2
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093' KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
KAFKA_METADATA_LOG_SEGMENT_MS: 15000 KAFKA_METADATA_LOG_SEGMENT_MS: 15000
KAFKA_METADATA_MAX_RETENTION_MS: 1200000 KAFKA_METADATA_MAX_RETENTION_MS: 1200000
KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800 KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
CLUSTER_ID: 'mycluster' CLUSTER_ID: "mycluster"
volumes: volumes:
- kafka2-data:/var/lib/kafka/data - kafka2-data:/var/lib/kafka/data
@ -78,18 +78,18 @@ services:
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_PROCESS_ROLES: 'controller,broker' KAFKA_PROCESS_ROLES: "controller,broker"
KAFKA_NODE_ID: 3 KAFKA_NODE_ID: 3
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093' KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
KAFKA_METADATA_LOG_SEGMENT_MS: 15000 KAFKA_METADATA_LOG_SEGMENT_MS: 15000
KAFKA_METADATA_MAX_RETENTION_MS: 1200000 KAFKA_METADATA_MAX_RETENTION_MS: 1200000
KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800 KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
CLUSTER_ID: 'mycluster' CLUSTER_ID: "mycluster"
volumes: volumes:
- kafka3-data:/var/lib/kafka/data - kafka3-data:/var/lib/kafka/data
volumes: volumes:
kafka1-data: kafka1-data:
kafka2-data: kafka2-data:
kafka3-data: kafka3-data:

View file

@ -40,6 +40,16 @@
<artifactId>kafka-avro-serializer</artifactId> <artifactId>kafka-avro-serializer</artifactId>
<version>${kafka.serializer.version}</version> <version>${kafka.serializer.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.16</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.11</version>
</dependency>
</dependencies> </dependencies>

View file

@ -7,14 +7,14 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Future; import java.util.concurrent.Future;
public class Main { public class Main {
public static void main(String[] args) { public static void main(String[] args) {
Properties properties = new Properties(); Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094"); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:39092,localhost:39093,localhost:39094");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put("schema.registry.url", "http://localhost:8081"); properties.put("schema.registry.url", "http://localhost:8081");
@ -24,17 +24,11 @@ public class Main {
person.setFavoriteNumber(42); person.setFavoriteNumber(42);
person.setHeight(1.90); person.setHeight(1.90);
// Prepare a message to send
ProducerRecord<String, Person> record = new ProducerRecord<>("rustonomicon", "arthur", person); ProducerRecord<String, Person> record = new ProducerRecord<>("rustonomicon", "arthur", person);
try (KafkaProducer<String, Person> producer = new KafkaProducer<>(properties)) { try (KafkaProducer<String, Person> producer = new KafkaProducer<>(properties)) {
// Send the message; note that this is asynchronous
Future<RecordMetadata> future = producer.send(record); Future<RecordMetadata> future = producer.send(record);
// For this example, we're going to block and get the response; don't do this in production!
RecordMetadata metadata = future.get(); RecordMetadata metadata = future.get();
// Print the offset and partition of the message
System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset()); System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());
} catch (Exception e) { } catch (Exception e) {

View file

@ -26,12 +26,16 @@ fn main() -> anyhow::Result<()> {
.parse() .parse()
.expect("Failed to parse schema"); .expect("Failed to parse schema");
let mut consumer = Consumer::from_hosts(vec!["localhost:19092".into()]) let mut consumer = Consumer::from_hosts(vec![
.with_topic_partitions("rustonomicon".to_owned(), &[0]) "localhost:39092".into(),
.with_group("mygroup".into()) "localhost:39093".into(),
.with_offset_storage(Some(GroupOffsetStorage::Kafka)) "localhost:39094".into(),
.create() ])
.unwrap(); .with_topic_partitions("rustonomicon".to_owned(), &[0])
.with_group("mygroup".into())
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
.create()
.unwrap();
loop { loop {
let message = consumer.poll()?; let message = consumer.poll()?;