2022-03-15 16:38:17 +08:00

145 lines
6.8 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the types of metadata quorums we support
zk = 'ZK' # ZooKeeper, used before/during the KIP-500 bridge release(s)
colocated_kraft = 'COLOCATED_KRAFT' # co-located Controllers in KRaft mode, used during/after the KIP-500 bridge release(s)
remote_kraft = 'REMOTE_KRAFT' # separate Controllers in KRaft mode, used during/after the KIP-500 bridge release(s)
# How we will parameterize tests that exercise all quorum styles
# [“ZK”, “REMOTE_KRAFT”, "COLOCATED_KRAFT"] during the KIP-500 bridge release(s)
# [“REMOTE_KRAFT”, "COLOCATED_KRAFT”] after the KIP-500 bridge release(s)
all = [zk, remote_kraft, colocated_kraft]
# How we will parameterize tests that exercise all KRaft quorum styles
all_kraft = [remote_kraft, colocated_kraft]
# How we will parameterize tests that are unrelated to upgrades:
# [“ZK”] before the KIP-500 bridge release(s)
# [“ZK”, “REMOTE_KRAFT”] during the KIP-500 bridge release(s) and in preview releases
# [“REMOTE_KRAFT”] after the KIP-500 bridge release(s)
all_non_upgrade = [zk, remote_kraft]
def for_test(test_context):
# A test uses ZooKeeper if it doesn't specify a metadata quorum or if it explicitly specifies ZooKeeper
default_quorum_type = zk
arg_name = 'metadata_quorum'
retval = default_quorum_type if not test_context.injected_args else test_context.injected_args.get(arg_name, default_quorum_type)
if retval not in all:
raise Exception("Unknown %s value provided for the test: %s" % (arg_name, retval))
return retval
class ServiceQuorumInfo:
"""
Exposes quorum-related information for a KafkaService
Kafka can use either ZooKeeper or a KRaft (Kafka Raft) Controller quorum for
its metadata. KRaft Controllers can either be co-located with Kafka in
the same JVM or remote in separate JVMs. The choice is made via
the 'metadata_quorum' parameter defined for the system test: if it
is not explicitly defined, or if it is set to 'ZK', then ZooKeeper
is used. If it is explicitly set to 'COLOCATED_KRAFT' then KRaft
controllers will be co-located with the brokers; the value
`REMOTE_KRAFT` indicates remote controllers.
Attributes
----------
kafka : KafkaService
The service for which this instance exposes quorum-related
information
quorum_type : str
COLOCATED_KRAFT, REMOTE_KRAFT, or ZK
using_zk : bool
True iff quorum_type==ZK
using_kraft : bool
False iff quorum_type==ZK
has_brokers : bool
Whether there is at least one node with process.roles
containing 'broker'. True iff using_kraft and the Kafka
service doesn't itself have a remote Kafka service (meaning
it is not a remote controller quorum).
has_controllers : bool
Whether there is at least one node with process.roles
containing 'controller'. True iff quorum_type ==
COLOCATED_KRAFT or the Kafka service itself has a remote Kafka
service (meaning it is a remote controller quorum).
has_brokers_and_controllers :
True iff quorum_type==COLOCATED_KRAFT
"""
def __init__(self, kafka, context):
"""
:param kafka : KafkaService
The service for which this instance exposes quorum-related
information
:param context : TestContext
The test context within which the this instance and the
given Kafka service is being instantiated
"""
quorum_type = for_test(context)
if quorum_type != zk and kafka.zk and not kafka.allow_zk_with_kraft:
raise Exception("Cannot use ZooKeeper while specifying a KRaft metadata quorum unless explicitly allowing it")
if kafka.remote_kafka and quorum_type != remote_kraft:
raise Exception("Cannot specify a remote Kafka service unless using a remote KRaft metadata quorum (should not happen)")
self.kafka = kafka
self.quorum_type = quorum_type
self.using_zk = quorum_type == zk
self.using_kraft = not self.using_zk
self.has_brokers = self.using_kraft and not kafka.remote_kafka
self.has_controllers = quorum_type == colocated_kraft or kafka.remote_kafka
self.has_brokers_and_controllers = quorum_type == colocated_kraft
class NodeQuorumInfo:
"""
Exposes quorum-related information for a node in a KafkaService
Attributes
----------
service_quorum_info : ServiceQuorumInfo
The quorum information about the service to which the node
belongs
has_broker_role : bool
True iff using_kraft and the Kafka service doesn't itself have
a remote Kafka service (meaning it is not a remote controller)
has_controller_role : bool
True iff quorum_type==COLOCATED_KRAFT and the node is one of
the first N in the cluster where N is the number of nodes
that have a controller role; or the Kafka service itself has a
remote Kafka service (meaning it is a remote controller
quorum).
has_combined_broker_and_controller_roles :
True iff has_broker_role==True and has_controller_role==true
"""
def __init__(self, service_quorum_info, node):
"""
:param service_quorum_info : ServiceQuorumInfo
The quorum information about the service to which the node
belongs
:param node : Node
The particular node for which this information applies.
In the co-located case, whether or not a node's broker's
process.roles contains 'controller' may vary based on the
particular node if the number of controller nodes is less
than the number of nodes in the service.
"""
self.service_quorum_info = service_quorum_info
self.has_broker_role = self.service_quorum_info.has_brokers
idx = self.service_quorum_info.kafka.nodes.index(node)
self.has_controller_role = self.service_quorum_info.kafka.num_nodes_controller_role > idx
self.has_combined_broker_and_controller_roles = self.has_broker_role and self.has_controller_role