Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do you configure the subject naming strategy for Confluent Schema Registry?

I'm using Confluent Schema Registry with Avro and I'd like to use multiple schemas in one Kafka Topic.

The default subject naming strategy TopicNameStrategy does not allow for this because it couples the schema subject name to the topic name. It is apparently possible to override this and set the subject naming strategy to RecordNameStrategy or TopicRecordNameStrategy.

Unfortunately, the documentation is not very clear on how or where the subject naming strategy can be overridden.

This document suggests that you can provide the configuration when creating or modifying a topic:

Starting with Confluent Platform 5.5.0, the naming strategy is associated with the topics. Therefore, you now have the option to configure a naming strategy to something other than the default on a per-topic basis for both the schema subject key and value with confluent.key.subject.name.strategy and confluent.value.subject.name.strategy.

From the Confluent CLI, use the --config option to create or modify a topic with the specified naming strategy. For example:

To create a topic that uses RecordNameStrategy for the value:

./bin/kafka-topics --create --bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 1 --topic my-other-cool-topic \
--config confluent.value.schema.validation=true --config confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

When I try this, the kafka-topics application returns the following error:

Error while executing topic command : Unknown topic config name: confluent.value.subject.name.strategy
[2020-10-31 10:17:00,947] ERROR org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: confluent.value.subject.name.strategy
 (kafka.admin.TopicCommand$)

I've also tried configuring the subject name strategy when writing records to Kafka using kafka-avro-console-producer via property value.subject.name.strategy as this document suggests. However, the console producer ignores this property and creates a new subject for the topic using the default strategy.

kafka-avro-console-producer \
    --broker-list kafka-broker-0.kafka-broker:9092 \
    --property schema.registry.url='http://kafka-schema-registry:8081' \
    --property value.schema='<MYSCHEMA>' \
    --property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
    --topic mytopic

Apparently there used to be a way to configure the strategy on the broker itself, but I can't find any documentation states how to do that either.

What is the correct way to configure the naming strategy? Where should it be configured -- on the topic, on the topic producer (kafka-avro-console-producer in this case), or somewhere else?

Additional context:

  • I'm running everything in Docker using images confluentinc/cp-kafka:6.0.0, confluentinc/cp-schema-registry:6.0.0, and confluentinc/cp-zookeeper:6.0.0
  • The kafka-topics and kafka-avro-console-producer versions are also set to 6.0.0
like image 207
Jared Petersen Avatar asked Jan 18 '26 15:01

Jared Petersen


1 Answers

You can submit the naming strategy while running the producer from the console.

kafka-avro-console-producer \
--bootstrap-server bootstrap-server-url:port \
--topic my-kafka-topic \
--property key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy  \
--property value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy \
--property schema.registry.url=https://schemaregistryurl:port \
--producer.config <locationOfTheConfig> \
--property value.schema='{ \"namespace\": \"com.demo.user\", \"name\": \"UserCreatedEvent\", \"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"username\",\"type\":\"string\"},{\"name\":\"phone\",\"type\":\"long\"},{\"name\":\"age\",\"type\":\"int\"}]}' \
--property parse.key=true \
--property key.schema='{\r\n  \"name\": \"UserCreatedEventKey\",\r\n  \"type\": \"record\",\r\n  \"doc\": \"Key for user creation event,\r\n  \"namespace\": \"com.demo.user\",\r\n  \"fields\": [\r\n    {\r\n      \"name\": \"userId\",\r\n      \"type\": \"string\",\r\n      \"doc\": \"Unique identifier for the user\"\r\n    }\r\n  ]\r\n}' \
--property key.separator=" " \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info= username:password

As you can see we are overriding the default name strategy for both Key and Value by providing

--property key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy  \
--property value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy \

This will generate the schema under the subject names

my-kafka-topic-com.demo.UserCreatedEventKey for key schema and
my-kafka-topic-com.demo.UserCreatedEvent for value schema

These schemas can be seen at the https://schemaregistryurl:port/schemas

like image 180
Sanjay Bharwani Avatar answered Jan 21 '26 07:01

Sanjay Bharwani