Skip to content

Commit 94df974

Browse files
zmalikZain Malik
authored andcommitted
Add mesos primitives, Unavailability support
1 parent f47aa98 commit 94df974

File tree

3 files changed

+77
-3
lines changed

3 files changed

+77
-3
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.apache.mesos.chronos.scheduler.mesos
2+
3+
import java.util.logging.Logger
4+
5+
import org.apache.mesos.Protos
6+
7+
/**
8+
* Helper for checking availability using mesos primitives
9+
*/
10+
object AvailabilityChecker {
11+
12+
private[this] val log = Logger.getLogger(getClass.getName)
13+
14+
def checkAvailability(offer: Protos.Offer): Boolean = {
15+
val now = System.nanoTime()
16+
if (offer.hasUnavailability && offer.getUnavailability.hasStart) {
17+
val start = offer.getUnavailability.getStart.getNanoseconds
18+
if (now.>=(start)) {
19+
if (offer.getUnavailability.hasDuration) {
20+
return start.+(offer.getUnavailability.getDuration.getNanoseconds).<(now)
21+
} else {
22+
return false;
23+
}
24+
25+
}
26+
}
27+
return true
28+
}
29+
30+
}

src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFramework.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ class MesosJobFramework @Inject()(
131131
case None =>
132132
val neededResources = new Resources(job)
133133
offerResources.toIterator.find { ors =>
134-
ors._2.canSatisfy(neededResources) && ConstraintChecker.checkConstraints(ors._1, job.constraints)
134+
ors._2.canSatisfy(neededResources) && ConstraintChecker.checkConstraints(ors._1, job.constraints) && AvailabilityChecker.checkAvailability(ors._1)
135135
} match {
136136
case Some((offer, resources)) =>
137137
// Subtract this job's resource requirements from the remaining available resources in this offer.

src/test/scala/org/apache/mesos/chronos/scheduler/mesos/MesosJobFrameworkSpec.scala

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package org.apache.mesos.chronos.scheduler.mesos
22

3+
import java.util.concurrent.TimeUnit
4+
35
import mesosphere.mesos.protos._
46
import mesosphere.mesos.util.FrameworkIdUtil
5-
import org.apache.mesos.Protos.Offer
7+
import org.apache.mesos.Protos.{DurationInfo, Offer, TimeInfo, Unavailability}
68
import org.apache.mesos.chronos.ChronosTestHelper._
79
import org.apache.mesos.chronos.scheduler.jobs.{BaseJob, JobScheduler, MockJobUtils, TaskManager}
810
import org.apache.mesos.{Protos, SchedulerDriver}
@@ -76,6 +78,33 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {
7678
there was one(mockSchedulerDriver).declineOffer(OfferID("1"), Protos.Filters.getDefaultInstance)
7779
}
7880

81+
"Reject unavailable offer" in {
82+
import mesosphere.mesos.protos.Implicits._
83+
84+
import scala.collection.JavaConverters._
85+
86+
val mockDriverFactory = MockJobUtils.mockDriverFactory
87+
val mockSchedulerDriver = mockDriverFactory.get
88+
89+
val mesosJobFramework = spy(
90+
new MesosJobFramework(
91+
mockDriverFactory,
92+
mock[JobScheduler],
93+
mock[TaskManager],
94+
makeConfig(),
95+
mock[FrameworkIdUtil],
96+
mock[MesosTaskBuilder],
97+
mock[MesosOfferReviver]))
98+
99+
val tasks = mutable.Buffer[(String, BaseJob, Offer)]()
100+
doReturn(tasks).when(mesosJobFramework).generateLaunchableTasks(any)
101+
102+
val offer: Offer = makeUnavailableOffer
103+
mesosJobFramework.resourceOffers(mockSchedulerDriver, Seq[Protos.Offer](offer).asJava)
104+
105+
there was one(mockSchedulerDriver).declineOffer(OfferID("1"), Protos.Filters.getDefaultInstance)
106+
}
107+
79108
"Reject unused offers with default RefuseSeconds if --decline_offer_duration is not set" in {
80109
import mesosphere.mesos.protos.Implicits._
81110

@@ -176,6 +205,22 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {
176205
}
177206

178207
private[this] def makeBasicOffer: Offer = {
208+
209+
makeBasicOfferBuilder
210+
.build()
211+
}
212+
213+
private[this] def makeUnavailableOffer: Offer = {
214+
215+
makeBasicOfferBuilder.setUnavailability(
216+
Unavailability.newBuilder()
217+
.setStart(TimeInfo.newBuilder().setNanoseconds(System.nanoTime()))
218+
.setDuration(DurationInfo.newBuilder().setNanoseconds(TimeUnit.DAYS.toNanos(1)))
219+
.build())
220+
.build()
221+
}
222+
223+
private[this] def makeBasicOfferBuilder: Offer.Builder = {
179224
import mesosphere.mesos.protos.Implicits._
180225

181226
Protos.Offer.newBuilder()
@@ -186,7 +231,6 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {
186231
.addResources(ScalarResource(Resource.CPUS, 1, "*"))
187232
.addResources(ScalarResource(Resource.MEM, 100, "*"))
188233
.addResources(ScalarResource(Resource.DISK, 100, "*"))
189-
.build()
190234
}
191235

192236

0 commit comments

Comments
 (0)