diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 3519efd3fcb10..9bdc30e446646 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -25,7 +25,7 @@ import scala.collection.mutable import scala.util.control.NonFatal import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod, PodBuilder} -import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException} import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.k8s.Config._ @@ -360,16 +360,22 @@ class ExecutorPodsAllocator( private def getReusablePVCs(applicationId: String, pvcsInUse: Seq[String]) = { if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && conf.get(KUBERNETES_DRIVER_REUSE_PVC) && driverPod.nonEmpty) { - val createdPVCs = kubernetesClient - .persistentVolumeClaims - .withLabel("spark-app-selector", applicationId) - .list() - .getItems - .asScala - - val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName)) - logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs") - reusablePVCs + try { + val createdPVCs = kubernetesClient + .persistentVolumeClaims + .withLabel("spark-app-selector", applicationId) + .list() + .getItems + .asScala + + val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName)) + logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs") + reusablePVCs + } catch { + case _: KubernetesClientException => + logInfo("Cannot list PVC resources. Please check account permissions.") + mutable.Buffer.empty[PersistentVolumeClaim] + } } else { mutable.Buffer.empty[PersistentVolumeClaim] } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 87bd8ef3d9dd0..7ce0b57d1e9f3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -20,9 +20,10 @@ import java.time.Instant import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ +import scala.collection.mutable import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException} import io.fabric8.kubernetes.client.dsl.PodResource import org.mockito.{Mock, MockitoAnnotations} import org.mockito.ArgumentMatchers.{any, eq => meq} @@ -762,6 +763,13 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { " namespace default")) } + test("SPARK-39688: getReusablePVCs should handle accounts with no PVC permission") { + val getReusablePVCs = + PrivateMethod[mutable.Buffer[PersistentVolumeClaim]](Symbol("getReusablePVCs")) + when(persistentVolumeClaimList.getItems).thenThrow(new KubernetesClientException("Error")) + podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq.empty[String]) + } + private def executorPodAnswer(): Answer[KubernetesExecutorSpec] = (invocation: InvocationOnMock) => { val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)