I'm using Confluent Schema Registry with Avro and I'd like to use multiple schemas in one Kafka Topic.
The default subject naming strategy TopicNameStrategy does not allow for this because it couples the schema subject name to the topic name. It is apparently possible to override this and set the subject naming strategy to RecordNameStrategy or TopicRecordNameStrategy.
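For reference, this is how I understand the three strategies to derive the subject name. It is only a minimal sketch: the `subject_name` helper, its arguments, and the `com.example.OrderCreated` record name are made up for illustration and are not part of any Confluent tool.

```shell
#!/bin/sh
# Hypothetical helper: prints the subject name a given strategy would produce.
# Usage: subject_name <strategy> <topic> <fully.qualified.RecordName> <key|value>
subject_name() {
  strategy=$1
  topic=$2
  record=$3
  kind=$4
  case "$strategy" in
    # Subject depends only on the topic, so one topic maps to one value schema subject.
    TopicNameStrategy)       printf '%s-%s\n' "$topic" "$kind" ;;
    # Subject depends only on the record type, regardless of topic.
    RecordNameStrategy)      printf '%s\n' "$record" ;;
    # Subject combines topic and record type.
    TopicRecordNameStrategy) printf '%s-%s\n' "$topic" "$record" ;;
    *) echo "unknown strategy: $strategy" >&2; return 1 ;;
  esac
}

subject_name TopicNameStrategy       mytopic com.example.OrderCreated value
subject_name RecordNameStrategy      mytopic com.example.OrderCreated value
subject_name TopicRecordNameStrategy mytopic com.example.OrderCreated value
```

Only the last two let several record types share one topic, because the subject is no longer tied to the topic name alone.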
Unfortunately, the documentation is not very clear on how or where the subject naming strategy can be overridden.
This document suggests that you can provide the configuration when creating or modifying a topic:
Starting with Confluent Platform 5.5.0, the naming strategy is associated with the topics. Therefore, you now have the option to configure a naming strategy to something other than the default on a per-topic basis for both the schema subject key and value with confluent.key.subject.name.strategy and confluent.value.subject.name.strategy.
From the Confluent CLI, use the --config option to create or modify a topic with the specified naming strategy. For example:
To create a topic that uses RecordNameStrategy for the value:
./bin/kafka-topics --create --bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 1 --topic my-other-cool-topic \
--config confluent.value.schema.validation=true \
--config confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
When I try this, the kafka-topics application returns the following error:
Error while executing topic command : Unknown topic config name: confluent.value.subject.name.strategy
[2020-10-31 10:17:00,947] ERROR org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: confluent.value.subject.name.strategy
(kafka.admin.TopicCommand$)
I've also tried configuring the subject name strategy when writing records to Kafka using kafka-avro-console-producer via property value.subject.name.strategy as this document suggests. However, the console producer ignores this property and creates a new subject for the topic using the default strategy.
kafka-avro-console-producer \
--broker-list kafka-broker-0.kafka-broker:9092 \
--property schema.registry.url='http://kafka-schema-registry:8081' \
--property value.schema='<MYSCHEMA>' \
--property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
--topic mytopic
Apparently there used to be a way to configure the strategy on the broker itself, but I can't find any documentation that states how to do that either.
What is the correct way to configure the naming strategy? Where should it be configured -- on the topic, on the topic producer (kafka-avro-console-producer in this case), or somewhere else?
Additional context:
confluentinc/cp-kafka:6.0.0, confluentinc/cp-schema-registry:6.0.0, and confluentinc/cp-zookeeper:6.0.0
kafka-topics and kafka-avro-console-producer versions are also set to 6.0.0

You can submit the naming strategy while running the producer from the console.
kafka-avro-console-producer \
--bootstrap-server bootstrap-server-url:port \
--topic my-kafka-topic \
--property key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy \
--property value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy \
--property schema.registry.url=https://schemaregistryurl:port \
--producer.config <locationOfTheConfig> \
--property value.schema='{"namespace":"com.demo.user","name":"UserCreatedEvent","type":"record","fields":[{"name":"username","type":"string"},{"name":"phone","type":"long"},{"name":"age","type":"int"}]}' \
--property parse.key=true \
--property key.schema='{"name":"UserCreatedEventKey","type":"record","doc":"Key for user creation event","namespace":"com.demo.user","fields":[{"name":"userId","type":"string","doc":"Unique identifier for the user"}]}' \
--property key.separator=" " \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info=username:password
As you can see, we override the default naming strategy for both the key and the value by providing:
--property key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy \
--property value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
This registers the schemas under the following subject names, each made up of the topic name followed by the fully qualified record name:
my-kafka-topic-com.demo.user.UserCreatedEventKey for the key schema
my-kafka-topic-com.demo.user.UserCreatedEvent for the value schema
These schemas can be seen at https://schemaregistryurl:port/schemas
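To check which subjects were actually registered, you can also query the registry's `/subjects` endpoint, which returns a JSON array of subject names. The sketch below assumes the same placeholder registry URL and basic auth credentials as the producer command; the helper names `subjects_endpoint` and `list_subjects` are made up for illustration.

```shell
#!/bin/sh
# Hypothetical helper: builds the /subjects URL from a registry base URL,
# stripping one trailing slash if present.
subjects_endpoint() {
  printf '%s/subjects\n' "${1%/}"
}

# Hypothetical helper: prints the JSON array of registered subject names.
# $1 = registry base URL, $2 = optional "user:password" for basic auth.
list_subjects() {
  if [ -n "${2:-}" ]; then
    curl -sS -u "$2" "$(subjects_endpoint "$1")"
  else
    curl -sS "$(subjects_endpoint "$1")"
  fi
}
```

Calling `list_subjects https://schemaregistryurl:port username:password` should then show the two subjects listed above. If it shows my-kafka-topic-key and my-kafka-topic-value instead, the producer fell back to the default TopicNameStrategy.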