# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults
{#
  Template layout note: every directive and property starts at column 0 on its
  own line. Do not indent properties inside Jinja blocks; the rendered keys
  would then carry leading whitespace.
#}
{% if quorum_info.using_kraft %}
# The role(s) of this server. Setting this puts us in KRaft metadata quorum mode
{% if node_quorum_info.has_combined_broker_and_controller_roles %}
process.roles=broker,controller
{% elif node_quorum_info.has_controller_role %}
process.roles=controller
{% else %}
process.roles=broker
{% endif %}
# The connect string for the controller quorum
controller.quorum.voters={{ controller_quorum_voters }}

controller.listener.names={{ controller_listener_names }}

{% endif %}

listeners={{ listeners }}

listener.security.protocol.map={{ listener_security_protocol_map }}

{# Broker-facing settings: emitted for ZooKeeper mode or when the quorum has brokers. #}
{% if quorum_info.using_zk or quorum_info.has_brokers %}
advertised.host.name={{ node.account.hostname }}
advertised.listeners={{ advertised_listeners }}

{# Pick the inter-broker setting according to what the node's version supports. #}
{% if node.version.supports_named_listeners() %}
inter.broker.listener.name={{ interbroker_listener.name }}
{% else %}
security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
{% endif %}
{% endif %}

{# Per-listener overrides for the client listener; some keys also need the SASL mechanism in the prefix. #}
{% for k, v in listener_security_config.client_listener_overrides.items() %}
{% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
listener.name.{{ security_protocol.lower() }}.{{ security_config.client_sasl_mechanism.lower() }}.{{ k }}={{ v }}
{% else %}
listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }}
{% endif %}
{% endfor %}

{# Same overrides for the inter-broker listener, skipped when its name equals security_protocol. #}
{% if quorum_info.using_zk or quorum_info.has_brokers %}
{% if interbroker_listener.name != security_protocol %}
{% for k, v in listener_security_config.interbroker_listener_overrides.items() %}
{% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
listener.name.{{ interbroker_listener.name.lower() }}.{{ security_config.interbroker_sasl_mechanism.lower() }}.{{ k }}={{ v }}
{% else %}
listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }}
{% endif %}
{% endfor %}
{% endif %}
{% endif %}

{# Pin the TLS protocol only when a version was explicitly configured. #}
{% if security_config.tls_version is not none %}
ssl.enabled.protocols={{ security_config.tls_version }}
ssl.protocol={{ security_config.tls_version }}
{% endif %}
ssl.keystore.location=/mnt/security/test.keystore.jks
ssl.keystore.password=test-ks-passwd
ssl.key.password=test-ks-passwd
ssl.keystore.type=JKS
ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS

{% if quorum_info.using_zk %}
# Zookeeper TLS settings
#
# Note that zookeeper.ssl.client.enable will be set to true or false elsewhere, as appropriate.
# If it is false then these ZK keystore/truststore settings will have no effect.  If it is true then
# zookeeper.clientCnxnSocket will also be set elsewhere (to org.apache.zookeeper.ClientCnxnSocketNetty)
{% if not zk.zk_tls_encrypt_only %}
zookeeper.ssl.keystore.location=/mnt/security/test.keystore.jks
zookeeper.ssl.keystore.password=test-ks-passwd
{% endif %}
zookeeper.ssl.truststore.location=/mnt/security/test.truststore.jks
zookeeper.ssl.truststore.password=test-ts-passwd
{% endif %}
#
{% if quorum_info.using_zk or quorum_info.has_brokers %}
sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
{% endif %}
{# In KRaft mode the controller SASL mechanism comes from a different variable depending on whether the quorum has brokers. #}
{% if quorum_info.using_kraft %}
{% if not quorum_info.has_brokers %}
sasl.mechanism.controller.protocol={{ intercontroller_sasl_mechanism }}
{% else %}
sasl.mechanism.controller.protocol={{ controller_quorum.controller_sasl_mechanism }}
{% endif %}
{% endif %}
sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
sasl.kerberos.service.name=kafka
{% if authorizer_class_name is not none %}
ssl.client.auth=required
authorizer.class.name={{ authorizer_class_name }}
{% endif %}

{% if quorum_info.using_zk %}
zookeeper.set.acl={{"true" if zk_set_acl else "false"}}

zookeeper.connection.timeout.ms={{ zk_connect_timeout }}
zookeeper.session.timeout.ms={{ zk_session_timeout }}
{% endif %}

{# Emit only for a real value: a variable that is defined but None would otherwise render as the literal text "None". #}
{% if replica_lag is defined and replica_lag is not none %}
replica.lag.time.max.ms={{replica_lag}}
{% endif %}

{% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %}
auto.create.topics.enable={{ auto_create_topics_enable }}
{% endif %}
offsets.topic.num.partitions={{ num_nodes }}
{# Replication factor is capped at 3 and never exceeds the number of nodes. #}
offsets.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }}
# Set to a low, but non-zero value to exercise this path without making tests much slower
group.initial.rebalance.delay.ms=100