Skip to content

Commit dcd700c

Browse files
dongjoon-hyunsenthh
authored andcommitted
[SPARK-39688][K8S] getReusablePVCs should handle accounts with no PVC permission
### What changes were proposed in this pull request? This PR aims to handle `KubernetesClientException` in `getReusablePVCs` method to handle gracefully the cases where accounts has no PVC permission including `listing`. ### Why are the changes needed? To prevent a regression in Apache Spark 3.4. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs with the newly added test case. Closes apache#37095 from dongjoon-hyun/SPARK-39688. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 79f133b)
1 parent b34fee6 commit dcd700c

File tree

2 files changed

+25
-16
lines changed

2 files changed

+25
-16
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.collection.mutable
2525
import scala.util.control.NonFatal
2626

2727
import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod, PodBuilder}
28-
import io.fabric8.kubernetes.client.KubernetesClient
28+
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}
2929

3030
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
3131
import org.apache.spark.deploy.k8s.Config._
@@ -360,20 +360,22 @@ class ExecutorPodsAllocator(
360360
private def getReusablePVCs(applicationId: String, pvcsInUse: Seq[String]) = {
361361
if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && conf.get(KUBERNETES_DRIVER_REUSE_PVC) &&
362362
driverPod.nonEmpty) {
363-
val createdPVCs = kubernetesClient
364-
.persistentVolumeClaims
365-
.withLabel("spark-app-selector", applicationId)
366-
.list()
367-
.getItems
368-
.asScala
369-
370-
val now = Instant.now().toEpochMilli
371-
val reusablePVCs = createdPVCs
372-
.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
373-
.filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
374-
> podAllocationDelay)
375-
logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
376-
reusablePVCs
363+
try {
364+
val createdPVCs = kubernetesClient
365+
.persistentVolumeClaims
366+
.withLabel("spark-app-selector", applicationId)
367+
.list()
368+
.getItems
369+
.asScala
370+
371+
val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
372+
logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
373+
reusablePVCs
374+
} catch {
375+
case _: KubernetesClientException =>
376+
logInfo("Cannot list PVC resources. Please check account permissions.")
377+
mutable.Buffer.empty[PersistentVolumeClaim]
378+
}
377379
} else {
378380
mutable.Buffer.empty[PersistentVolumeClaim]
379381
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
2424
import scala.collection.mutable
2525

2626
import io.fabric8.kubernetes.api.model._
27-
import io.fabric8.kubernetes.client.KubernetesClient
27+
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}
2828
import io.fabric8.kubernetes.client.dsl.PodResource
2929
import org.mockito.{Mock, MockitoAnnotations}
3030
import org.mockito.ArgumentMatchers.{any, eq => meq}
@@ -765,6 +765,13 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
765765
" namespace default"))
766766
}
767767

768+
test("SPARK-39688: getReusablePVCs should handle accounts with no PVC permission") {
769+
val getReusablePVCs =
770+
PrivateMethod[mutable.Buffer[PersistentVolumeClaim]](Symbol("getReusablePVCs"))
771+
when(persistentVolumeClaimList.getItems).thenThrow(new KubernetesClientException("Error"))
772+
podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq.empty[String])
773+
}
774+
768775
test("SPARK-41388: getReusablePVCs should ignore recently created PVCs in the previous batch") {
769776
val getReusablePVCs =
770777
PrivateMethod[mutable.Buffer[PersistentVolumeClaim]](Symbol("getReusablePVCs"))

0 commit comments

Comments
 (0)