From 51dcc1bf2757ca31e76aa38e3d8a48f367a8994d Mon Sep 17 00:00:00 2001 From: Shautvast Date: Mon, 18 Nov 2024 17:31:39 +0100 Subject: [PATCH] first commit --- .gitignore | 38 + README.md | 1 + java/dockercompose-cluster.yaml | 95 +++ java/pom.xml | 88 ++ .../java/org/github/shautvast/avro/Main.java | 44 + java/src/main/resources/avro/Person.avsc | 21 + rust/Cargo.lock | 806 ++++++++++++++++++ rust/Cargo.toml | 13 + rust/src/main.rs | 58 ++ 9 files changed, 1164 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 java/dockercompose-cluster.yaml create mode 100644 java/pom.xml create mode 100644 java/src/main/java/org/github/shautvast/avro/Main.java create mode 100644 java/src/main/resources/avro/Person.avsc create mode 100644 rust/Cargo.lock create mode 100644 rust/Cargo.toml create mode 100644 rust/src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..1172bc3 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +https://blog.top-squad.nl/kafka TBD diff --git a/java/dockercompose-cluster.yaml b/java/dockercompose-cluster.yaml new file mode 100644 index 0000000..1ad893b --- /dev/null +++ b/java/dockercompose-cluster.yaml @@ -0,0 +1,95 @@ +version: '3.5' + +services: + schema-registry: + image: confluentinc/cp-schema-registry:latest + hostname: schema-registry + depends_on: + - kafka1 + - kafka2 + - kafka3 + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081 + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092,PLAINTEXT://kafka2:19093,PLAINTEXT://kafka3:19094 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT + SCHEMA_REGISTRY_DEBUG: 'true' + + kafka1: + image: confluentinc/cp-kafka:latest + hostname: kafka1 + container_name: kafka1 + ports: + - "39092:39092" + environment: + KAFKA_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONTROLLER://kafka1:9093 + KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://localhost:39092 + KAFKA_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_PROCESS_ROLES: 'controller,broker' + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093' + KAFKA_METADATA_LOG_SEGMENT_MS: 15000 + KAFKA_METADATA_MAX_RETENTION_MS: 1200000 + KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + CLUSTER_ID: 'mycluster' + volumes: + - kafka1-data:/var/lib/kafka/data + + kafka2: + image: confluentinc/cp-kafka:latest + hostname: kafka2 + container_name: kafka2 + ports: + - "39093:39093" + environment: + KAFKA_LISTENERS: BROKER://kafka2:19093,EXTERNAL://kafka2:39093,CONTROLLER://kafka2:9093 + KAFKA_ADVERTISED_LISTENERS: BROKER://kafka2:19093,EXTERNAL://kafka2:39093 + KAFKA_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_PROCESS_ROLES: 'controller,broker' + KAFKA_NODE_ID: 2 + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093' + KAFKA_METADATA_LOG_SEGMENT_MS: 15000 + KAFKA_METADATA_MAX_RETENTION_MS: 1200000 + KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + CLUSTER_ID: 'mycluster' + volumes: + - kafka2-data:/var/lib/kafka/data + + kafka3: + image: confluentinc/cp-kafka:latest + hostname: kafka3 + container_name: kafka3 + ports: + - "39094:39094" + environment: + KAFKA_LISTENERS: BROKER://kafka3:19094,EXTERNAL://kafka3:39094,CONTROLLER://kafka3:9093 + KAFKA_ADVERTISED_LISTENERS: BROKER://kafka3:19094,EXTERNAL://kafka3:39094 + KAFKA_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_PROCESS_ROLES: 'controller,broker' + KAFKA_NODE_ID: 3 + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093' + KAFKA_METADATA_LOG_SEGMENT_MS: 15000 + KAFKA_METADATA_MAX_RETENTION_MS: 1200000 + KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + CLUSTER_ID: 'mycluster' + volumes: + - kafka3-data:/var/lib/kafka/data + +volumes: + kafka1-data: + kafka2-data: + kafka3-data: \ No newline at end of file diff --git a/java/pom.xml b/java/pom.xml new file mode 100644 index 0000000..42ebdd4 --- /dev/null +++ b/java/pom.xml @@ -0,0 +1,88 @@ + + + 4.0.0 + + org.github.shautvast + avro-example + 1.0-SNAPSHOT + + + 22 + 22 + UTF-8 + 1.11.4 + 3.8.0 + 7.7.1 + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + org.apache.avro + avro + ${avro.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + io.confluent + kafka-avro-serializer + ${kafka.serializer.version} + + + + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + String + ${project.basedir}/src/main/resources/avro/ + ${project.build.directory}/generated/java/ + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.5.0 + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated/java + + + + + + + + \ No newline at end of file diff --git a/java/src/main/java/org/github/shautvast/avro/Main.java b/java/src/main/java/org/github/shautvast/avro/Main.java new file mode 100644 index 0000000..3c810d4 --- /dev/null +++ b/java/src/main/java/org/github/shautvast/avro/Main.java @@ -0,0 +1,44 @@ +package org.github.shautvast.avro; + +import example.avro.Person; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; +import java.util.concurrent.Future; + +public class Main { + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); + properties.put("schema.registry.url", "http://localhost:8081"); + + Person person = new Person(); + person.setName("Arthur Dent"); + person.setFavoriteNumber(42); + person.setHeight(1.90); + + // Prepare a message to send + ProducerRecord record = new ProducerRecord<>("rustonomicon", "arthur", person); + + try (KafkaProducer producer = new KafkaProducer<>(properties)) { + // Send the message; note that this is asynchronous + Future future = producer.send(record); + + // For this example, we're going to block and get the response; don't do this in production! + RecordMetadata metadata = future.get(); + + // Print the offset and partition of the message + System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset()); + + } catch (Exception e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/java/src/main/resources/avro/Person.avsc b/java/src/main/resources/avro/Person.avsc new file mode 100644 index 0000000..fecd678 --- /dev/null +++ b/java/src/main/resources/avro/Person.avsc @@ -0,0 +1,21 @@ +{ + "namespace": "example.avro", + "type" : "record", + "name" : "Person", + "doc" : "Ape descendent creature dwelling on planet Earth", + "aliases": ["Human"], + "fields" : [ + { + "name" : "name", + "type" : "string" + }, + { + "name" : "favorite_number", + "type" : "int" + }, + { + "name" : "height", + "type" : "double" + } + ] +} \ No newline at end of file diff --git a/rust/Cargo.lock b/rust/Cargo.lock new file mode 100644 index 0000000..d812799 --- /dev/null +++ b/rust/Cargo.lock @@ -0,0 +1,806 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "allocator-api2" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" + +[[package]] +name = "anyhow" +version = "1.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" + +[[package]] +name = "apache-avro" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aef82843a0ec9f8b19567445ad2421ceeb1d711514384bdd3d49fe37102ee13" +dependencies = [ + "bigdecimal", + "digest", + "libflate", + "log", + "num-bigint", + "quad-rand", + "rand", + "regex-lite", + "serde", + "serde_bytes", + "serde_json", + "strum", + "strum_macros", + "thiserror", + "typed-builder", + "uuid", +] + +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "bigdecimal" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f850665a0385e070b64c38d2354e6c104c8479c59868d1e48a0c13ee2c7a1c1" +dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", + "serde", +] + +[[package]] +name = "bitflags" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" + +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "cc" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aeb932158bd710538c73702db6945cb68a8fb08c519e6e12706b94263b36db8" +dependencies = [ + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + +[[package]] +name = "crc" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "dary_heap" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04d2cd9c18b9f454ed67da600630b021a8a80bf33f8c95896ab33aaf1c26b728" + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + +[[package]] +name = "flate2" +version = "1.0.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "integer-encoding" +version = "4.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d762194228a2f1c11063e46e32e5acb96e66e906382b9eb5441f2e0504bbd5a" + +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + +[[package]] +name = "kafka" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2054ba4edcb4dcda4209e138c7e88caf26d4a325b3db76fbdb6ca5eecc23e426" +dependencies = [ + "byteorder", + "crc", + "flate2", + "fnv", + "openssl", + "openssl-sys", + "ref_slice", + "snap", + "thiserror", + "tracing", + "twox-hash", +] + +[[package]] +name = "libc" +version = "0.2.162" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" + +[[package]] +name = "libflate" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45d9dfdc14ea4ef0900c1cddbc8dcd553fbaacd8a4a282cf4018ae9dd04fb21e" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" +dependencies = [ + "core2", + "hashbrown", + "rle-decode-fast", +] + +[[package]] +name = "libm" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" + +[[package]] +name = "log" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", + "serde", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" + +[[package]] +name = "openssl" +version = "0.10.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-sys" +version = "0.9.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" + +[[package]] +name = "pkg-config" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" + +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quad-rand" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" + +[[package]] +name = "quote" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "ref_slice" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" + +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + +[[package]] +name = "rust" +version = "0.1.0" +dependencies = [ + "anyhow", + "apache-avro", + "kafka", + "serde", + "serde_avro_fast", + "serde_derive", +] + +[[package]] +name = "rust_decimal" +version = "1.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b082d80e3e3cc52b2ed634388d436fe1f4de6af5786cc2de9ba9737527bdf555" +dependencies = [ + "arrayvec", + "num-traits", + "serde", +] + +[[package]] +name = "rustversion" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" + +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "serde" +version = "1.0.215" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde-transcode" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "590c0e25c2a5bb6e85bf5c1bce768ceb86b316e7a01bdf07d2cb4ec2271990e2" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_avro_fast" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed59cbaf90bc3d5f02bace18d57b46f41366c176b1be6da28fddf4d1960adc6" +dependencies = [ + "flate2", + "integer-encoding", + "num-traits", + "rand", + "rust_decimal", + "serde", + "serde-transcode", + "serde_derive", + "serde_json", + "serde_serializer_quick_unsupported", + "thiserror", +] + +[[package]] +name = "serde_bytes" +version = "0.11.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_derive" +version = "1.0.215" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.132" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "serde_serializer_quick_unsupported" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb65c5dbc9079c6913d2120b1b4112cce4e2295b7f152ca52d7f48e19ad5396" + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + +[[package]] +name = "syn" +version = "2.0.87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "rand", + "static_assertions", +] + +[[package]] +name = "typed-builder" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06fbd5b8de54c5f7c91f6fe4cebb949be2125d7758e630bb58b1d831dbce600" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9534daa9fd3ed0bd911d462a37f172228077e7abf18c18a5f67199d959205f8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "typenum" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" + +[[package]] +name = "unicode-ident" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" + +[[package]] +name = "uuid" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +dependencies = [ + "serde", +] + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/rust/Cargo.toml b/rust/Cargo.toml new file mode 100644 index 0000000..a3f4c97 --- /dev/null +++ b/rust/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "rust" +version = "0.1.0" +edition = "2021" + +[dependencies] +kafka = "0.10" +apache-avro = "0.17.0" +anyhow = "1.0" +# serde_avro_derive = "0.3.1" +serde_avro_fast = "2.0.0" +serde_derive = "1.0.215" +serde = "1.0.215" diff --git a/rust/src/main.rs b/rust/src/main.rs new file mode 100644 index 0000000..b562989 --- /dev/null +++ b/rust/src/main.rs @@ -0,0 +1,58 @@ +use kafka::{client::GroupOffsetStorage, consumer::Consumer}; + +fn main() -> anyhow::Result<()> { + let schema: serde_avro_fast::Schema = r#" + { + "namespace": "example.avro", + "type" : "record", + "name" : "Person", + "doc" : "Ape descendent creature dwelling on planet Earth", + "aliases": ["Human"], + "fields" : [ + { + "name" : "name", + "type" : "string" + }, + { + "name" : "favorite_number", + "type" : "int" + }, + { + "name" : "height", + "type" : "double" + } + ] + }"# + .parse() + .expect("Failed to parse schema"); + + let mut consumer = Consumer::from_hosts(vec!["localhost:19092".into()]) + .with_topic_partitions("rustonomicon".to_owned(), &[0]) + .with_group("mygroup".into()) + .with_offset_storage(Some(GroupOffsetStorage::Kafka)) + .create() + .unwrap(); + + loop { + let message = consumer.poll()?; + + for ms in message.iter() { + for m in ms.messages().iter() { + let person = serde_avro_fast::from_datum_slice::(&m.value[5..], &schema) + .expect("Failed to deserialize"); + println!("{:?}", person.name); + consumer.consume_message("rustonomicon", ms.partition(), m.offset)?; + } + consumer.commit_consumed()?; + } + } + + // Ok(()) +} + +#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)] +struct Person<'a> { + name: &'a str, + favorite_number: u32, + height: f64, +}