diff --git a/dockercompose-cluster.yaml b/dockercompose-cluster.yaml index 1ad893b..f1b27a2 100644 --- a/dockercompose-cluster.yaml +++ b/dockercompose-cluster.yaml @@ -1,4 +1,4 @@ -version: '3.5' +version: "3.5" services: schema-registry: @@ -15,7 +15,7 @@ services: SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081 SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092,PLAINTEXT://kafka2:19093,PLAINTEXT://kafka3:19094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT - SCHEMA_REGISTRY_DEBUG: 'true' + SCHEMA_REGISTRY_DEBUG: "true" kafka1: image: confluentinc/cp-kafka:latest @@ -25,19 +25,19 @@ services: - "39092:39092" environment: KAFKA_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONTROLLER://kafka1:9093 - KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://localhost:39092 + KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092 KAFKA_INTER_BROKER_LISTENER_NAME: BROKER KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_PROCESS_ROLES: 'controller,broker' + KAFKA_PROCESS_ROLES: "controller,broker" KAFKA_NODE_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093' + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093" KAFKA_METADATA_LOG_SEGMENT_MS: 15000 KAFKA_METADATA_MAX_RETENTION_MS: 1200000 KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800 - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - CLUSTER_ID: 'mycluster' + KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs" + CLUSTER_ID: "mycluster" volumes: - kafka1-data:/var/lib/kafka/data @@ -54,14 +54,14 @@ services: KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_PROCESS_ROLES: 'controller,broker' + KAFKA_PROCESS_ROLES: "controller,broker" KAFKA_NODE_ID: 2 - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093' + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093" KAFKA_METADATA_LOG_SEGMENT_MS: 15000 KAFKA_METADATA_MAX_RETENTION_MS: 1200000 KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800 - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - CLUSTER_ID: 'mycluster' + KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs" + CLUSTER_ID: "mycluster" volumes: - kafka2-data:/var/lib/kafka/data @@ -78,18 +78,18 @@ services: KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_PROCESS_ROLES: 'controller,broker' + KAFKA_PROCESS_ROLES: "controller,broker" KAFKA_NODE_ID: 3 - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093' + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093" KAFKA_METADATA_LOG_SEGMENT_MS: 15000 KAFKA_METADATA_MAX_RETENTION_MS: 1200000 KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800 - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - CLUSTER_ID: 'mycluster' + KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs" + CLUSTER_ID: "mycluster" volumes: - kafka3-data:/var/lib/kafka/data volumes: kafka1-data: kafka2-data: - kafka3-data: \ No newline at end of file + kafka3-data: diff --git a/java/pom.xml b/java/pom.xml index 42ebdd4..c35d01b 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -40,6 +40,16 @@ kafka-avro-serializer ${kafka.serializer.version} + + org.slf4j + slf4j-api + 2.0.16 + + + ch.qos.logback + logback-classic + 1.5.11 + diff --git a/java/src/main/java/org/github/shautvast/avro/Main.java b/java/src/main/java/org/github/shautvast/avro/Main.java index 3c810d4..a66316f 100644 --- a/java/src/main/java/org/github/shautvast/avro/Main.java +++ b/java/src/main/java/org/github/shautvast/avro/Main.java @@ -7,14 +7,14 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; - import java.util.Properties; import java.util.concurrent.Future; + public class Main { public static void main(String[] args) { Properties properties = new Properties(); - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094"); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:39092,localhost:39093,localhost:39094"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); properties.put("schema.registry.url", "http://localhost:8081"); @@ -24,17 +24,11 @@ public class Main { person.setFavoriteNumber(42); person.setHeight(1.90); - // Prepare a message to send ProducerRecord record = new ProducerRecord<>("rustonomicon", "arthur", person); try (KafkaProducer producer = new KafkaProducer<>(properties)) { - // Send the message; note that this is asynchronous Future future = producer.send(record); - - // For this example, we're going to block and get the response; don't do this in production! RecordMetadata metadata = future.get(); - - // Print the offset and partition of the message System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset()); } catch (Exception e) { diff --git a/rust/src/main.rs b/rust/src/main.rs index b562989..5cf3a7e 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -26,12 +26,16 @@ fn main() -> anyhow::Result<()> { .parse() .expect("Failed to parse schema"); - let mut consumer = Consumer::from_hosts(vec!["localhost:19092".into()]) - .with_topic_partitions("rustonomicon".to_owned(), &[0]) - .with_group("mygroup".into()) - .with_offset_storage(Some(GroupOffsetStorage::Kafka)) - .create() - .unwrap(); + let mut consumer = Consumer::from_hosts(vec![ + "localhost:39092".into(), + "localhost:39093".into(), + "localhost:39094".into(), + ]) + .with_topic_partitions("rustonomicon".to_owned(), &[0]) + .with_group("mygroup".into()) + .with_offset_storage(Some(GroupOffsetStorage::Kafka)) + .create() + .unwrap(); loop { let message = consumer.poll()?;