-
Notifications
You must be signed in to change notification settings - Fork 41
Fix consumer dead state bug #364
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,11 +68,6 @@ def run(self): | |
| # check whether or not the feed is capable of detecting canary | ||
| # documents | ||
| if change != None: | ||
| # Record the sequence in case the changes feed needs to be | ||
| # restarted. This way the new feed can pick up right where | ||
| # the old one left off. | ||
| self.lastSequence = change['seq'] | ||
|
|
||
| if "deleted" in change and change["deleted"] == True: | ||
| logging.info('[changes] Found a delete') | ||
| consumer = self.consumers.getConsumerForTrigger(change['id']) | ||
|
|
@@ -109,7 +104,20 @@ def run(self): | |
| elif triggerIsAssignedToMe: | ||
| logging.info('[{}] Found a change to an existing trigger'.format(change["id"])) | ||
|
|
||
| if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document): | ||
| if existingConsumer.desiredState() == Consumer.State.Dead and self.__isTriggerDocActive(document): | ||
| # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state, | ||
| # so we need to forcefully delete the process before recreating it. | ||
| logging.info('[{}] A create event occurred for a trigger that is shutting down'.format(change["id"])) | ||
|
|
||
| if existingConsumer.process.is_alive(): | ||
| logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger)) | ||
| existingConsumer.process.join(1) | ||
| else: | ||
| logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger)) | ||
|
|
||
| self.consumers.removeConsumerForTrigger(existingConsumer.trigger) | ||
| self.createAndRunConsumer(document) | ||
| elif existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document): | ||
| # disabled trigger has become active | ||
| logging.info('[{}] Existing disabled trigger should become active'.format(change["id"])) | ||
| self.createAndRunConsumer(document) | ||
|
|
@@ -123,6 +131,11 @@ def run(self): | |
| self.lastCanaryTime = datetime.now() | ||
| else: | ||
| logging.debug('[changes] Found a change for a non-trigger document') | ||
|
|
||
| # Record the sequence in case the changes feed needs to be | ||
| # restarted. This way the new feed can pick up right where | ||
| # the old one left off. | ||
| self.lastSequence = change['seq'] | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Moved this in case of unexpected exceptions that could happen, allowing us to pick up where we left off. |
||
| except Exception as e: | ||
| logging.error('[canary] Exception caught from changes feed. Restarting changes feed...') | ||
| logging.error(e) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,10 +16,9 @@ | |
| */ | ||
| package system.packages | ||
|
|
||
| import system.utils.KafkaUtils | ||
|
|
||
| import scala.concurrent.duration.DurationInt | ||
| import scala.language.postfixOps | ||
| import system.utils.KafkaUtils | ||
| import org.junit.runner.RunWith | ||
| import org.scalatest.BeforeAndAfterAll | ||
| import org.scalatest.FlatSpec | ||
|
|
@@ -40,6 +39,8 @@ import common.TestUtils.NOT_FOUND | |
| import org.apache.openwhisk.utils.retry | ||
| import org.apache.openwhisk.core.entity.Annotations | ||
| import java.util.concurrent.ExecutionException | ||
| import common.ActivationResult | ||
| import common.TestUtils.SUCCESS_EXIT | ||
|
|
||
| @RunWith(classOf[JUnitRunner]) | ||
| class MessageHubFeedTests | ||
|
|
@@ -113,6 +114,62 @@ class MessageHubFeedTests | |
| runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false) | ||
| } | ||
|
|
||
| it should "create a trigger, delete that trigger, and quickly create it again with successful trigger fires" in withAssetCleaner(wskprops) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Test fails when used against a provider without the included updates. |
||
| val currentTime = s"${System.currentTimeMillis}" | ||
|
|
||
| (wp, assetHelper) => | ||
| val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" | ||
| val ruleName = s"dummyMessageHub-helloKafka-$currentTime" | ||
| val parameters = Map( | ||
| "user" -> getAsJson("user"), | ||
| "password" -> getAsJson("password"), | ||
| "api_key" -> getAsJson("api_key"), | ||
| "kafka_admin_url" -> getAsJson("kafka_admin_url"), | ||
| "kafka_brokers_sasl" -> getAsJson("brokers"), | ||
| "topic" -> topic.toJson | ||
| ) | ||
|
|
||
| val key = "TheKey" | ||
| val verificationName = s"trigger-$currentTime" | ||
| val defaultAction = Some("dat/createTriggerActions.js") | ||
| val defaultActionName = s"helloKafka-$currentTime" | ||
|
|
||
| createTrigger(assetHelper, triggerName, parameters) | ||
|
|
||
| assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) => | ||
| action.create(name, defaultAction, annotations = Map(Annotations.ProvideApiKeyAnnotationName -> JsBoolean(true))) | ||
| } | ||
|
|
||
| assetHelper.withCleaner(wsk.rule, ruleName) { (rule, name) => | ||
| rule.create(name, trigger = triggerName, action = defaultActionName) | ||
| } | ||
|
|
||
| assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) => | ||
| trigger.get(name, NOT_FOUND) | ||
| } | ||
|
|
||
| produceMessage(topic, key, verificationName) | ||
| retry(wsk.trigger.get(verificationName), 60, Some(1.second)) | ||
|
|
||
| wsk.trigger.delete(verificationName, expectedExitCode = SUCCESS_EXIT) | ||
| wsk.trigger.delete(triggerName, expectedExitCode = SUCCESS_EXIT) | ||
|
|
||
| val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters) | ||
| val activation = wsk.parseJsonString(feedCreationResult.stdout.substring(0, feedCreationResult.stdout.indexOf("ok: created trigger"))).convertTo[ActivationResult] | ||
| activation.response.success shouldBe true | ||
|
|
||
| wsk.rule.enable(ruleName, expectedExitCode = SUCCESS_EXIT) | ||
|
|
||
| println("Giving the consumer a moment to get ready") | ||
| Thread.sleep(KafkaUtils.consumerInitTime) | ||
|
|
||
| val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "") | ||
| consumerExists(uuid) | ||
|
|
||
| produceMessage(topic, key, verificationName) | ||
| retry(wsk.trigger.get(verificationName), 60, Some(1.second)) | ||
| } | ||
|
|
||
| it should "fire multiple triggers for two large payloads" in withAssetCleaner(wskprops) { | ||
| // payload size should be under the payload limit, but greater than 50% of the limit | ||
| val testPayloadSize = 600000 | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For background info, there is a nanny running periodically that deletes consumers that are in a dead state. Since the consumer is already in a dead state waiting for the nanny to close the process, a quick recreate of the consumer -- within a 2 second time period -- will be ignored. Instead of ignoring the recreate we will manually delete the consumer so that it can be recreated. Normally a graceful shutdown using the nanny is preferred, but we have no other choice here.