diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index af27d0a6f8fe9..a91383814a9a8 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -880,7 +880,11 @@ class KafkaController(val config: KafkaConfig, // between the moment this broker started and right now when it becomes controller again. loadMinIsrForTopics(controllerContext.allTopics) - rearrangePartitionReplicaAssignmentForNewTopics(controllerContext.allTopics.toSet) + // scan partitions of all topics and ensure they don't lie on partitionUnassignableBrokerIds + // the controllerContext.partitionAssignments is still not initialized yet + // thus every single partition will be checked inside rearrangePartitionReplicaAssignmentForNewPartitions + rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet) + registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq) getReplicaAssignmentPolicyCompliant(controllerContext.allTopics.toSet).foreach { case (topicPartition, replicaAssignment) => @@ -968,25 +972,28 @@ class KafkaController(val config: KafkaConfig, // Rearrange partition and replica assignment for new topics that get assigned to // maintenance brokers that do not take new partitions - private def rearrangePartitionReplicaAssignmentForNewTopics(topics: Set[String]) { + private def rearrangePartitionReplicaAssignmentForNewPartitions(topicsToCheck: Set[String]) { try { val noNewPartitionBrokers = partitionUnassignableBrokerIds if (noNewPartitionBrokers.nonEmpty) { - val newTopics = zkClient.getPartitionNodeNonExistsTopics(topics.toSet) - val newTopicsToBeArranged = zkClient.getPartitionAssignmentForTopics(newTopics).filter { - case (_, partitionMap) => - partitionMap.exists { + val topicsToBeRearranged = zkClient.getPartitionAssignmentForTopics(topicsToCheck.toSet).filter { + case (topic, partitionMap) => + val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty) + val newPartitions = partitionMap.filter{case (partitionId, _) => partitionId >= existingAssignment.size} + newPartitions.exists { case (_, assignedReplicas) => assignedReplicas.replicas.intersect(noNewPartitionBrokers).nonEmpty } } - newTopicsToBeArranged.foreach { + topicsToBeRearranged.foreach { case (topic, partitionMap) => val numPartitions = partitionMap.size val numReplica = partitionMap.head._2.replicas.size val brokers = controllerContext.liveOrShuttingDownBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }.toSeq - val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokers.toSet, numPartitions, numReplica) + val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty) + val partitionsToAdd = numPartitions - existingAssignment.size + val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokers.toSet, partitionsToAdd, numReplica, -1, existingAssignment.size) adminZkClient.writeTopicPartitionAssignment(topic, replicaAssignment.mapValues(ReplicaAssignment(_)).toMap, true) info(s"Rearrange partition and replica assignment for topic [$topic]") } @@ -1697,7 +1704,7 @@ class KafkaController(val config: KafkaConfig, val newTopics = topics -- controllerContext.allTopics val deletedTopics = controllerContext.allTopics -- topics controllerContext.allTopics = topics - rearrangePartitionReplicaAssignmentForNewTopics(newTopics) + rearrangePartitionReplicaAssignmentForNewPartitions(newTopics) registerPartitionModificationsHandlers(newTopics.toSeq) val addedPartitionReplicaAssignment = getReplicaAssignmentPolicyCompliant(newTopics) @@ -1755,6 +1762,7 @@ class KafkaController(val config: KafkaConfig, } if (!isActive) return + rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic)) val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)) val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) => controllerContext.partitionReplicaAssignment(topicPartition).isEmpty @@ -1872,9 +1880,9 @@ class KafkaController(val config: KafkaConfig, } else { val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] - + val noNewPartitionBrokers = partitionUnassignableBrokerIds.toSet reassignments.foreach { case (tp, targetReplicas) => - if (replicasAreValid(tp, targetReplicas)) { + if (replicasAreValid(tp, targetReplicas, noNewPartitionBrokers)) { maybeBuildReassignment(tp, targetReplicas) match { case Some(context) => partitionsToReassign.put(tp, context) case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) @@ -1893,7 +1901,8 @@ class KafkaController(val config: KafkaConfig, } } - private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = { + private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]], + noNewPartitionBrokers: Set[Int]): Boolean = { replicasOpt match { case Some(replicas) => val replicaSet = replicas.toSet @@ -1901,7 +1910,10 @@ class KafkaController(val config: KafkaConfig, false else if (replicas.exists(_ < 0)) false - else { + else if (!replicaSet.intersect(noNewPartitionBrokers).isEmpty) { + warn(s"reject reassignment of $topicPartition to unassignable hosts $noNewPartitionBrokers") + false + } else { // Ensure that any new replicas are among the live brokers val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) val newAssignment = currentAssignment.reassignTo(replicas) diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 1d338f3f5e9f0..3dbd8c11cbdcb 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -53,7 +53,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { topicConfig: Properties = new Properties, rackAwareMode: RackAwareMode = RackAwareMode.Enforced): Unit = { val brokerMetadatas = getBrokerMetadatas(rackAwareMode) - val noNewPartitionBrokerIds = getMaintenanceBrokerList() + val noNewPartitionBrokerIds = getMaintenanceBrokerList() ++ zkClient.getPreferredControllerList val replicaAssignment = assignReplicasToAvailableBrokers(brokerMetadatas, noNewPartitionBrokerIds.toSet, partitions, replicationFactor) createTopicWithAssignment(topic, topicConfig, replicaAssignment) } @@ -235,7 +235,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { numPartitions: Int = 1, replicaAssignment: Option[Map[Int, Seq[Int]]] = None, validateOnly: Boolean = false): Map[Int, Seq[Int]] = { - val noNewPartitionBrokerIds = getMaintenanceBrokerList() + val noNewPartitionBrokerIds = getMaintenanceBrokerList() ++ zkClient.getPreferredControllerList addPartitions(topic, existingAssignment, allBrokers, numPartitions, replicaAssignment, validateOnly, noNewPartitionBrokerIds.toSet) } diff --git a/core/src/test/scala/unit/kafka/server/MaintenanceBrokerTest.scala b/core/src/test/scala/unit/kafka/server/MaintenanceBrokerTest.scala index c97740136623b..336d053ce6443 100644 --- a/core/src/test/scala/unit/kafka/server/MaintenanceBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/MaintenanceBrokerTest.scala @@ -18,13 +18,14 @@ package kafka.server import java.util.{Optional, Properties} - import kafka.server.KafkaConfig.fromProps import kafka.utils.CoreUtils._ import kafka.utils.TestUtils import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.clients.admin._ +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.InvalidReplicaAssignmentException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol @@ -32,7 +33,8 @@ import scala.collection.JavaConverters._ import org.junit.Assert._ import org.junit.{After, Test} -import scala.collection.Map +import scala.collection.{Map, Seq} +import scala.concurrent.ExecutionException /** * This is the main test which ensure maintenance broker work correctly. @@ -172,6 +174,71 @@ class MaintenanceBrokerTest extends ZooKeeperTestHarness { client.close() } + @Test + def testAddPartitionByAdminZkClientShouldHonorMaintenanceBrokers(): Unit = { + brokers = (0 to 2).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) } + + TestUtils.waitUntilControllerElected(zkClient) + // setting broker 1 to not take new topic partitions + setMaintenanceBrokers(Seq(1)) + + // create topic using admin client + val topic = "topic1" + TestUtils.createTopic(zkClient, topic, 3, 2, brokers) + + assertTrue("topic1 should not be in broker 1", ensureTopicNotInBrokers("topic1", Set(1))) + + val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(Set(topic)).map { + case (topicPartition, assignment) => topicPartition.partition -> assignment + } + val allBrokers = adminZkClient.getBrokerMetadatas() + val newPartitionsCount = 5 + adminZkClient.addPartitions(topic, existingAssignment, allBrokers, 5) + (0 until newPartitionsCount).map { i => + TestUtils.waitUntilMetadataIsPropagated(brokers, topic, i) + i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i) + } + + assertTrue("topic1 should not be in broker 1 after increasing partition count", + ensureTopicNotInBrokers("topic1", Set(1))) + } + + @Test + def testPartitionReassignmentShouldHonorMaintenanceBrokers(): Unit = { + brokers = (0 to 2).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) } + + TestUtils.waitUntilControllerElected(zkClient) + // setting broker 1 to not take new topic partitions + setMaintenanceBrokers(Seq(1)) + + // create topic using admin client + val topic = "topic1" + TestUtils.createTopic(zkClient, topic, 1, 2, brokers) + assertTrue("topic1 should not be in broker 1", ensureTopicNotInBrokers("topic1", Set(1))) + + // get the admin client + val adminClientConfig = new Properties + val brokerList = TestUtils.bootstrapServers(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + val client = AdminClient.create(adminClientConfig) + + val reassignmentsResult = client.alterPartitionReassignments(Map(reassignmentEntry(new TopicPartition(topic, 0), Seq(0, 1))).asJava) + var reassignmentFailed = false + try { + reassignmentsResult.all().get() + } catch { + case e : ExecutionException => + assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) + reassignmentFailed = true + } + assertTrue("the partition reassignment should have failed", reassignmentFailed) + client.close() + } + + def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]): (TopicPartition, java.util.Optional[NewPartitionReassignment]) = + tp -> Optional.of(new NewPartitionReassignment((replicas.map(_.asInstanceOf[Integer]).asJava))) + + @Test def testTopicCreatedInZkShouldBeRearrangedForMaintenanceBrokers(): Unit = {