Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

spring-kafka application.properties configuration for JAAS/SASL not working

Use Case:
I am using Spring Boot 2.2.5.RELEASE and Kafka 2.4.1
JAAS/SASL configurations are done properly on Kafka/ZooKeeper as topics are created without issue with kafka-topics.bat

Issue:
When i start Spring Boot application, i immediately get the following errors:

kafka-server-start.bat console:
INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

IDE console:
WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=yyy] Bootstrap broker localhost:9093 (id: -3 rack: null) disconnected

My application.properties configuration:

spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="spring_bO0t" password="i_am_a_spring_bO0t_user";

kafka_server_jaas.conf:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345"
    user_admin="12345"
    user_spring_bO0t="i_am_a_spring_bO0t_user";
};

Am i missing something?

Thanks in advance.

like image 404
jumping_monkey Avatar asked Feb 01 '26 22:02

jumping_monkey


2 Answers

I defined the properties in the wrong place i.e in application.properties. As i have ProducerFactory & ConsumerFactory beans, those application.properties will be ignored by Spring Boot.

Configuring the same properties in the beans definitions resolved the issue, i.e move your properties from application.properties to where you define your beans.

Here's an example:

@Bean
public ProducerFactory<Object, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
            "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
    ));
        
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
        "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
    ));

    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    
    configs.put("security.protocol", "SASL_PLAINTEXT");
    configs.put("sasl.mechanism", "PLAIN");
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " + 
                                    "username=username" + 
                                    "password=password;");
    
    return new KafkaAdmin(configs);
}
like image 154
jumping_monkey Avatar answered Feb 03 '26 11:02

jumping_monkey


The answer provided by @jumping_monkey is correct, however I didn't know where to put those configurations in ProducerFactory & ConsumerFactory beans, so I'll leave an example below for those who want to know:

-In your ProducerConfig or ConsumerConfig Beans respectively (Mine is named generalMessageProducerFactory):

@Bean
public ProducerFactory<String, GeneralMessageDto> generalMessageProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put("sasl.mechanism", "PLAIN");
    configProps.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule   required username='YOUR_KAFKA_CLUSTER_USERNAME'   password='YOUR_KAFKA_CLUSTER_PASSWORD';");
    configProps.put("security.protocol", "SASL_SSL");
    return new DefaultKafkaProducerFactory<>(configProps);
}

And also in your TopicConfiguration Class in kafkaAdmin method:

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configs.put("sasl.mechanism", "PLAIN");
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule   required username='YOUR_KAFKA_CLUSTER_USERNAME'   password='YOUR_KAFKA_CLUSTER_PASSWORD';");
    configs.put("security.protocol", "SASL_SSL");
    return new KafkaAdmin(configs);
}

Hope this was helpful guys!

like image 43
Salvador Nava Avatar answered Feb 03 '26 11:02

Salvador Nava