diff --git a/provider/service.py b/provider/service.py index fa8e1090..b8a272f4 100644 --- a/provider/service.py +++ b/provider/service.py @@ -68,11 +68,6 @@ def run(self): # check whether or not the feed is capable of detecting canary # documents if change != None: - # Record the sequence in case the changes feed needs to be - # restarted. This way the new feed can pick up right where - # the old one left off. - self.lastSequence = change['seq'] - if "deleted" in change and change["deleted"] == True: logging.info('[changes] Found a delete') consumer = self.consumers.getConsumerForTrigger(change['id']) @@ -109,7 +104,20 @@ def run(self): elif triggerIsAssignedToMe: logging.info('[{}] Found a change to an existing trigger'.format(change["id"])) - if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document): + if existingConsumer.desiredState() == Consumer.State.Dead and self.__isTriggerDocActive(document): + # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state, + # so we need to forcefully delete the process before recreating it. + logging.info('[{}] A create event occurred for a trigger that is shutting down'.format(change["id"])) + + if existingConsumer.process.is_alive(): + logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger)) + existingConsumer.process.join(1) + else: + logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger)) + + self.consumers.removeConsumerForTrigger(existingConsumer.trigger) + self.createAndRunConsumer(document) + elif existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document): # disabled trigger has become active logging.info('[{}] Existing disabled trigger should become active'.format(change["id"])) self.createAndRunConsumer(document) @@ -123,6 +131,11 @@ def run(self): self.lastCanaryTime = datetime.now() else: logging.debug('[changes] Found a change for a non-trigger document') + + # Record the sequence in case the changes feed needs to be + # restarted. This way the new feed can pick up right where + # the old one left off. + self.lastSequence = change['seq'] except Exception as e: logging.error('[canary] Exception caught from changes feed. Restarting changes feed...') logging.error(e) diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala index 0c05d635..9974bf5a 100644 --- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala +++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala @@ -16,10 +16,9 @@ */ package system.packages -import system.utils.KafkaUtils - import scala.concurrent.duration.DurationInt import scala.language.postfixOps +import system.utils.KafkaUtils import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterAll import org.scalatest.FlatSpec @@ -40,6 +39,8 @@ import common.TestUtils.NOT_FOUND import org.apache.openwhisk.utils.retry import org.apache.openwhisk.core.entity.Annotations import java.util.concurrent.ExecutionException +import common.ActivationResult +import common.TestUtils.SUCCESS_EXIT @RunWith(classOf[JUnitRunner]) class MessageHubFeedTests @@ -113,6 +114,62 @@ class MessageHubFeedTests runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false) } + it should "create a trigger, delete that trigger, and quickly create it again with successful trigger fires" in withAssetCleaner(wskprops) { + val currentTime = s"${System.currentTimeMillis}" + + (wp, assetHelper) => + val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" + val ruleName = s"dummyMessageHub-helloKafka-$currentTime" + val parameters = Map( + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"), + "topic" -> topic.toJson + ) + + val key = "TheKey" + val verificationName = s"trigger-$currentTime" + val defaultAction = Some("dat/createTriggerActions.js") + val defaultActionName = s"helloKafka-$currentTime" + + createTrigger(assetHelper, triggerName, parameters) + + assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) => + action.create(name, defaultAction, annotations = Map(Annotations.ProvideApiKeyAnnotationName -> JsBoolean(true))) + } + + assetHelper.withCleaner(wsk.rule, ruleName) { (rule, name) => + rule.create(name, trigger = triggerName, action = defaultActionName) + } + + assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) => + trigger.get(name, NOT_FOUND) + } + + produceMessage(topic, key, verificationName) + retry(wsk.trigger.get(verificationName), 60, Some(1.second)) + + wsk.trigger.delete(verificationName, expectedExitCode = SUCCESS_EXIT) + wsk.trigger.delete(triggerName, expectedExitCode = SUCCESS_EXIT) + + val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters) + val activation = wsk.parseJsonString(feedCreationResult.stdout.substring(0, feedCreationResult.stdout.indexOf("ok: created trigger"))).convertTo[ActivationResult] + activation.response.success shouldBe true + + wsk.rule.enable(ruleName, expectedExitCode = SUCCESS_EXIT) + + println("Giving the consumer a moment to get ready") + Thread.sleep(KafkaUtils.consumerInitTime) + + val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "") + consumerExists(uuid) + + produceMessage(topic, key, verificationName) + retry(wsk.trigger.get(verificationName), 60, Some(1.second)) + } + it should "fire multiple triggers for two large payloads" in withAssetCleaner(wskprops) { // payload size should be under the payload limit, but greater than 50% of the limit val testPayloadSize = 600000