Skip to content

Commit decb161

Browse files
Feature/395 prevent self deactivation (#397)
* Use the same heartbeat for updateHeartbeat and deactivation * Never deactivate own instance * Additional log output
1 parent 17d0b59 commit decb161

File tree

5 files changed

+29
-22
lines changed

5 files changed

+29
-22
lines changed

src/main/scala/za/co/absa/hyperdrive/trigger/persistance/SchedulerInstanceRepository.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ import scala.util.{Failure, Success}
2828
trait SchedulerInstanceRepository extends Repository {
2929
def insertInstance()(implicit ec: ExecutionContext): Future[Long]
3030

31-
def updateHeartbeat(id: Long)(implicit ec: ExecutionContext): Future[Int]
31+
def updateHeartbeat(id: Long, newHeartbeat: LocalDateTime)(implicit ec: ExecutionContext): Future[Int]
3232

33-
def deactivateLaggingInstances(currentHeartbeat: LocalDateTime, lagTolerance: Duration)(implicit ec: ExecutionContext): Future[Int]
33+
def deactivateLaggingInstances(instanceId: Long, currentHeartbeat: LocalDateTime, lagTolerance: Duration)(implicit ec: ExecutionContext): Future[Int]
3434

3535
def getAllInstances()(implicit ec: ExecutionContext): Future[Seq[SchedulerInstance]]
3636
}
@@ -55,16 +55,17 @@ class SchedulerInstanceRepositoryImpl extends SchedulerInstanceRepository {
5555
}
5656
}
5757

58-
override def updateHeartbeat(id: Long)(implicit ec: ExecutionContext): Future[Int] = db.run {
58+
override def updateHeartbeat(id: Long, newHeartbeat: LocalDateTime)(implicit ec: ExecutionContext): Future[Int] = db.run {
5959
schedulerInstanceTable.filter(_.id === id)
6060
.filter(_.status === LiteralColumn[SchedulerInstanceStatus](SchedulerInstanceStatuses.Active))
6161
.map(_.lastHeartbeat)
62-
.update(LocalDateTime.now())
62+
.update(newHeartbeat)
6363
}
6464

65-
override def deactivateLaggingInstances(currentHeartbeat: LocalDateTime, lagTolerance: Duration)(implicit ec: ExecutionContext): Future[Int] = db.run {
65+
override def deactivateLaggingInstances(instanceId: Long, currentHeartbeat: LocalDateTime, lagTolerance: Duration)(implicit ec: ExecutionContext): Future[Int] = db.run {
6666
schedulerInstanceTable.filter(i => i.lastHeartbeat < currentHeartbeat.minusSeconds(lagTolerance.getSeconds))
6767
.filter(_.status === LiteralColumn[SchedulerInstanceStatus](SchedulerInstanceStatuses.Active))
68+
.filter(_.id =!= instanceId)
6869
.map(_.status)
6970
.update(SchedulerInstanceStatuses.Deactivated)
7071
}

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/JobScheduler.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class JobScheduler @Inject()(sensors: Sensors, executors: Executors, dagInstance
9191
runningAssignWorkflows = workflowBalancer.getAssignedWorkflows(runningDags.keys.map(_.workflowId).toSeq)
9292
.recover {
9393
case e: SchedulerInstanceAlreadyDeactivatedException =>
94+
logger.error("Stopping scheduler because the instance has already been deactivated", e)
9495
stopManager()
9596
throw e
9697
}

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/SchedulerInstanceService.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515

1616
package za.co.absa.hyperdrive.trigger.scheduler.cluster
1717

18-
import java.time.{Duration, LocalDateTime}
18+
import org.slf4j.LoggerFactory
1919

20+
import java.time.{Duration, LocalDateTime}
2021
import javax.inject.Inject
2122
import org.springframework.stereotype.Service
2223
import za.co.absa.hyperdrive.trigger.models.SchedulerInstance
@@ -33,18 +34,21 @@ trait SchedulerInstanceService {
3334

3435
@Service
3536
class SchedulerInstanceServiceImpl @Inject()(schedulerInstanceRepository: SchedulerInstanceRepository) extends SchedulerInstanceService {
37+
private val logger = LoggerFactory.getLogger(this.getClass)
3638

3739
override def registerNewInstance()(implicit ec: ExecutionContext): Future[Long] = schedulerInstanceRepository.insertInstance()
3840

3941
override def updateSchedulerStatus(instanceId: Long, lagThreshold: Duration)(implicit ec: ExecutionContext): Future[Seq[SchedulerInstance]] = {
42+
val currentHeartbeat = LocalDateTime.now()
4043
for {
41-
updatedCount <- schedulerInstanceRepository.updateHeartbeat(instanceId)
44+
updatedCount <- schedulerInstanceRepository.updateHeartbeat(instanceId, currentHeartbeat)
4245
_ <- if (updatedCount == 0) {
4346
Future.failed(new SchedulerInstanceAlreadyDeactivatedException)
4447
} else {
4548
Future{}
4649
}
47-
_ <- schedulerInstanceRepository.deactivateLaggingInstances(LocalDateTime.now(), lagThreshold)
50+
deactivatedCount <- schedulerInstanceRepository.deactivateLaggingInstances(instanceId, currentHeartbeat, lagThreshold)
51+
_ = if (deactivatedCount != 0) logger.debug(s"Deactivated $deactivatedCount instances at current heartbeat $currentHeartbeat")
4852
allInstances <- schedulerInstanceRepository.getAllInstances()
4953
} yield allInstances
5054
}

src/test/scala/za/co/absa/hyperdrive/trigger/persistance/SchedulerInstanceRepositoryTest.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,30 +61,31 @@ class SchedulerInstanceRepositoryTest extends FlatSpec with Matchers with Before
6161
}
6262

6363
"updateHeartbeat" should "update the last heartbeat of an active instance" in {
64-
val nowMinusOne = LocalDateTime.now().minusNanos(1L)
65-
val result = await(schedulerInstanceRepository.updateHeartbeat(11))
64+
val newHeartbeat = LocalDateTime.now()
65+
val result = await(schedulerInstanceRepository.updateHeartbeat(11, newHeartbeat))
6666
val updatedInstance = await(db.run(schedulerInstanceTable.filter(_.id === 11L).result.head))
6767

6868
result shouldBe 1
69-
updatedInstance.lastHeartbeat.isAfter(nowMinusOne) shouldBe true
69+
updatedInstance.lastHeartbeat shouldBe newHeartbeat
7070
}
7171

7272
it should "not update a deactivated instance" in {
73-
val result = await(schedulerInstanceRepository.updateHeartbeat(31L))
73+
val newHeartbeat = LocalDateTime.now()
74+
val result = await(schedulerInstanceRepository.updateHeartbeat(31L, newHeartbeat))
7475
result shouldBe 0
7576
}
7677

77-
"deactivateLaggingInstances" should "deactivate lagging instances" in {
78+
"deactivateLaggingInstances" should "deactivate lagging instances, except own instance" in {
7879
val localTime = LocalDateTime.of(2020, 1, 1, 2, 30, 28)
7980
val lagTolerance = Duration.ofSeconds(20L)
8081

81-
val result = await(schedulerInstanceRepository.deactivateLaggingInstances(localTime, lagTolerance))
82+
val result = await(schedulerInstanceRepository.deactivateLaggingInstances(21L, localTime, lagTolerance))
8283
val allInstances = await(db.run(schedulerInstanceTable.result))
8384

84-
result shouldBe 2
85+
result shouldBe 1
8586
allInstances
8687
.filter(_.status == SchedulerInstanceStatuses.Deactivated)
87-
.map(_.id) should contain theSameElementsAs Seq(21L, 22L, 31L)
88+
.map(_.id) should contain theSameElementsAs Seq(22L, 31L)
8889
}
8990

9091
"getAllInstances" should "return all instances" in {

src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/SchedulerInstanceServiceTest.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,30 +61,30 @@ class SchedulerInstanceServiceTest extends AsyncFlatSpec with MockitoSugar with
6161
SchedulerInstance(23, SchedulerInstanceStatuses.Active, LocalDateTime.now()),
6262
SchedulerInstance(24, SchedulerInstanceStatuses.Active, LocalDateTime.now())
6363
)
64-
when(schedulerInstanceRepository.updateHeartbeat(any())(any[ExecutionContext])).thenReturn(Future{1})
64+
when(schedulerInstanceRepository.updateHeartbeat(any(), any())(any[ExecutionContext])).thenReturn(Future{1})
6565
when(schedulerInstanceRepository.getAllInstances()(any[ExecutionContext])).thenReturn(Future{instances})
66-
when(schedulerInstanceRepository.deactivateLaggingInstances(any(), any())(any[ExecutionContext])).thenReturn(Future{0})
66+
when(schedulerInstanceRepository.deactivateLaggingInstances(any(), any(), any())(any[ExecutionContext])).thenReturn(Future{0})
6767

6868
// when
6969
val result = await(underTest.updateSchedulerStatus(23L, lagThreshold))
7070

7171
// then
7272
result shouldBe instances
73-
verify(schedulerInstanceRepository, times(1)).updateHeartbeat(eqTo(23L))(any())
74-
verify(schedulerInstanceRepository, times(1)).deactivateLaggingInstances(any(), eqTo(lagThreshold))(any())
73+
verify(schedulerInstanceRepository, times(1)).updateHeartbeat(eqTo(23L), any())(any())
74+
verify(schedulerInstanceRepository, times(1)).deactivateLaggingInstances(eqTo(23L), any(), eqTo(lagThreshold))(any())
7575
succeed
7676
}
7777

7878
it should "throw an exception if the heartbeat could not be updated" in {
7979
// given
8080
val lagThreshold = Duration.ofSeconds(5L)
81-
when(schedulerInstanceRepository.updateHeartbeat(any())(any[ExecutionContext])).thenReturn(Future{0})
81+
when(schedulerInstanceRepository.updateHeartbeat(any(), any())(any[ExecutionContext])).thenReturn(Future{0})
8282

8383
// when
8484
the [SchedulerInstanceAlreadyDeactivatedException] thrownBy await(underTest.updateSchedulerStatus(23L, lagThreshold))
8585

8686
// then
87-
verify(schedulerInstanceRepository, never).deactivateLaggingInstances(any(), any())(any())
87+
verify(schedulerInstanceRepository, never).deactivateLaggingInstances(any(), any(), any())(any())
8888
verify(schedulerInstanceRepository, never).getAllInstances()(any())
8989
succeed
9090
}

0 commit comments

Comments
 (0)