caseNewReplica => val assignment = controllerContext.partitionFullReplicaAssignment(partition) if (!assignment.replicas.contains(replicaId)) { error(s"Adding replica ($replicaId) that is not part of the assignment $assignment") val newAssignment = assignment.copy(replicas = assignment.replicas :+ replicaId) controllerContext.updatePartitionFullReplicaAssignment(partition, newAssignment) }
caseNewReplica => validReplicas.foreach { replica => val partition = replica.topicPartition val currentState = controllerContext.replicaState(replica)
controllerContext.partitionLeadershipInfo.get(partition) match { caseSome(leaderIsrAndControllerEpoch) => if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) { val exception = newStateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader") logFailedStateChange(replica, currentState, OfflineReplica, exception) } else { controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId), replica.topicPartition, leaderIsrAndControllerEpoch, controllerContext.partitionFullReplicaAssignment(replica.topicPartition), isNew = true) logSuccessfulTransition(replicaId, partition, currentState, NewReplica) controllerContext.putReplicaState(replica, NewReplica) } caseNone => logSuccessfulTransition(replicaId, partition, currentState, NewReplica) controllerContext.putReplicaState(replica, NewReplica) } }
/** * Invoked on startup of the partition's state machine to set the initial state for all existing partitions in * zookeeper */ privatedefinitializePartitionState(): Unit = { for (topicPartition <- controllerContext.allPartitions) { // check if leader and isr path exists for partition. If not, then it is in NEW state //检查leader和isr路径是否存在 controllerContext.partitionLeadershipInfo.get(topicPartition) match { caseSome(currentLeaderIsrAndEpoch) => if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition)) // leader is alive controllerContext.putPartitionState(topicPartition, OnlinePartition) else controllerContext.putPartitionState(topicPartition, OfflinePartition) caseNone => controllerContext.putPartitionState(topicPartition, NewPartition) } } }
deftriggerOnlinePartitionStateChange(): Unit = { val partitions = controllerContext.partitionsInStates(Set(OfflinePartition, NewPartition)) triggerOnlineStateChangeForPartitions(partitions) }
privatedeftriggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Unit = { // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions // that belong to topics to be deleted val partitionsToTrigger = partitions.filter { partition => !controllerContext.isTopicQueuedUpForDeletion(partition.topic) }.toSeq
handleStateChanges(partitionsToTrigger, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false))) // TODO: If handleStateChanges catches an exception, it is not enough to bail out and log an error. // It is important to trigger leader election for those partitions. }