Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.
2 changes: 1 addition & 1 deletion azure/eventprocessorhost/abstract_checkpoint_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async def create_checkpoint_if_not_exists_async(self, partition_id):
pass

@abstractmethod
async def update_checkpoint_async(self, lease, checkpoint):
async def update_checkpoint_async(self, lease, checkpoint, event_processor_context=None):
"""
Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint.

Expand Down
5 changes: 4 additions & 1 deletion azure/eventprocessorhost/abstract_event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from abc import ABC, abstractmethod


class AbstractEventProcessor(ABC):
"""
Abstract base class that must be extended by event processor classes.
Expand All @@ -13,12 +14,14 @@ def __init__(self, params=None):
pass

@abstractmethod
async def open_async(self, context):
async def open_async(self, context, event_processor_context=None):
"""
Called by processor host to initialize the event processor.

:param context: Information about the partition
:type context: ~azure.eventprocessorhost.partition_context.PartitionContext
:param event_processor_context: State of the Event Processor.
:type event_processor_context: str
"""
pass

Expand Down
3 changes: 2 additions & 1 deletion azure/eventprocessorhost/azure_blob_lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

import json
import asyncio
import json

from azure.eventprocessorhost.lease import Lease

Expand Down Expand Up @@ -48,6 +48,7 @@ def with_blob(self, blob):
self.epoch = content["epoch"]
self.offset = content["offset"]
self.sequence_number = content["sequence_number"]
self.event_processor_context = content.get("event_processor_context")

def with_source(self, lease):
"""
Expand Down
9 changes: 5 additions & 4 deletions azure/eventprocessorhost/azure_storage_checkpoint_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import concurrent.futures
import functools
import asyncio

import requests

from azure.storage.blob import BlockBlobService
Expand Down Expand Up @@ -145,7 +144,7 @@ async def create_checkpoint_if_not_exists_async(self, partition_id):
checkpoint = Checkpoint(partition_id)
return checkpoint

async def update_checkpoint_async(self, lease, checkpoint):
async def update_checkpoint_async(self, lease, checkpoint, event_processor_context=None):
"""
Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint
checkpoint:offset/sequenceNumber to update the store with.
Expand All @@ -159,6 +158,7 @@ async def update_checkpoint_async(self, lease, checkpoint):
new_lease.with_source(lease)
new_lease.offset = checkpoint.offset
new_lease.sequence_number = checkpoint.sequence_number
new_lease.event_processor_context = event_processor_context
return await self.update_lease_async(new_lease)

async def delete_checkpoint_async(self, partition_id):
Expand Down Expand Up @@ -269,11 +269,12 @@ async def create_lease_if_not_exists_async(self, partition_id):
try:
return_lease = AzureBlobLease()
return_lease.partition_id = partition_id
json_lease = json.dumps(return_lease.serializable())
serializable_lease = return_lease.serializable()
json_lease = json.dumps(serializable_lease)
_logger.info("Creating Lease %r %r %r",
self.lease_container_name,
partition_id,
json_lease)
json.dumps({k:v for k, v in serializable_lease.items() if k != 'event_processor_context'}))
await self.host.loop.run_in_executor(
self.executor,
functools.partial(
Expand Down
3 changes: 3 additions & 0 deletions azure/eventprocessorhost/lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self):
self.owner = None
self.token = None
self.epoch = 0
self.event_processor_context = None

def with_partition_id(self, partition_id):
"""
Expand All @@ -27,6 +28,7 @@ def with_partition_id(self, partition_id):
self.owner = None
self.token = None
self.epoch = 0
self.event_processor_context = None

def with_source(self, lease):
"""
Expand All @@ -39,6 +41,7 @@ def with_source(self, lease):
self.epoch = lease.epoch
self.owner = lease.owner
self.token = lease.token
self.event_processor_context = lease.event_processor_context

async def is_expired(self):
"""
Expand Down
21 changes: 14 additions & 7 deletions azure/eventprocessorhost/partition_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,24 @@ async def get_initial_offset_async(self): # throws InterruptedException, Executi
self.host.guid, self.partition_id, self.offset, self.sequence_number)
return self.offset

async def checkpoint_async(self):
async def checkpoint_async(self, event_processor_context=None):
"""
Generates a checkpoint for the partition using the current offset and sequenceNumber
and persists it to the checkpoint manager.
"""
captured_checkpoint = Checkpoint(self.partition_id, self.offset, self.sequence_number)
await self.persist_checkpoint_async(captured_checkpoint)
await self.persist_checkpoint_async(captured_checkpoint, event_processor_context)

async def checkpoint_async_event_data(self, event_data):
async def checkpoint_async_event_data(self, event_data, event_processor_context=None):
"""
Stores the offset and sequenceNumber from the provided received EventData instance,
then writes those values to the checkpoint store via the checkpoint manager.
Optionally stores the state of the Event Processor along with the checkpoint.

:param event_data: A received EventData with valid offset and sequenceNumber.
:type event_data: ~azure.eventhub.common.EventData
:param event_processor_context: State of the Event Processor.
:type event_processor_context: str
:raises: ValueError if supplied event_data is None.
:raises: ValueError if the sequenceNumber is less than the last checkpointed value.
"""
Expand All @@ -85,7 +88,8 @@ async def checkpoint_async_event_data(self, event_data):

await self.persist_checkpoint_async(Checkpoint(self.partition_id,
event_data.offset.value,
event_data.sequence_number))
event_data.sequence_number),
event_processor_context)

def to_string(self):
"""
Expand All @@ -99,12 +103,14 @@ def to_string(self):
self.partition_id,
self.sequence_number)

async def persist_checkpoint_async(self, checkpoint):
async def persist_checkpoint_async(self, checkpoint, event_processor_context=None):
"""
Persists the checkpoint.
Persists the checkpoint, and - optionally - the state of the Event Processor.

:param checkpoint: The checkpoint to persist.
:type checkpoint: ~azure.eventprocessorhost.checkpoint.Checkpoint
:param event_processor_context: The state of the Event Processor.
:type event_processor_context: str
"""
_logger.debug("PartitionPumpCheckpointStart %r %r %r %r",
self.host.guid, checkpoint.partition_id, checkpoint.offset, checkpoint.sequence_number)
Expand All @@ -115,7 +121,8 @@ async def persist_checkpoint_async(self, checkpoint):
_logger.info("persisting checkpoint %r", checkpoint.__dict__)
await self.host.storage_manager.create_checkpoint_if_not_exists_async(checkpoint.partition_id)

if not await self.host.storage_manager.update_checkpoint_async(self.lease, checkpoint):
if not await self.host.storage_manager.update_checkpoint_async(
self.lease, checkpoint, event_processor_context):
_logger.error("Failed to persist checkpoint for partition: %r", self.partition_id)
raise Exception("failed to persist checkpoint")
self.lease.offset = checkpoint.offset
Expand Down
3 changes: 2 additions & 1 deletion azure/eventprocessorhost/partition_pump.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ async def open_async(self):
self.partition_context.lease = self.lease
self.processor = self.host.event_processor(self.host.event_processor_params)
try:
await self.processor.open_async(self.partition_context)
event_processor_context = self.lease.event_processor_context
await self.processor.open_async(self.partition_context, event_processor_context)
except Exception as err: # pylint: disable=broad-except
# If the processor won't create or open, only thing we can do here is pass the buck.
# Null it out so we don't try to operate on it further.
Expand Down
21 changes: 19 additions & 2 deletions tests/asynctests/test_checkpoint_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

import pytest
import asyncio
import base64
import pickle
import pytest
import time
from azure.common import AzureException

Expand Down Expand Up @@ -82,7 +84,19 @@ def test_checkpointing(eph, storage_clm):
assert local_checkpoint.offset == "-1"
lease = loop.run_until_complete(storage_clm.get_lease_async("1"))
loop.run_until_complete(storage_clm.acquire_lease_async(lease))
loop.run_until_complete(storage_clm.update_checkpoint_async(lease, local_checkpoint))
event_processor_context = {'some_string_data': 'abc', 'some_int_data': 123, 'a_list': [42]}
event_processor_context_pickled = pickle.dumps(event_processor_context)
event_processor_context_asstring = base64.b64encode(event_processor_context_pickled)
loop.run_until_complete(storage_clm.update_checkpoint_async(
lease, local_checkpoint, event_processor_context_asstring))
cloud_lease = loop.run_until_complete(storage_clm.get_lease_async("1"))
cloud_event_processor_context_asstring = pickle.loads(cloud_lease.event_processor_context)
cloud_event_processor_context_pickled = base64.b64decode(cloud_event_processor_context_asstring)
cloud_event_processor_context = pickle.loads(cloud_event_processor_context_pickled)
assert cloud_event_processor_context['some_string_data'] == 'abc'
assert cloud_event_processor_context['some_int_data'] == '123'
assert cloud_event_processor_context['a_list'] == [42]

cloud_checkpoint = loop.run_until_complete(storage_clm.get_checkpoint_async("1"))
lease.offset = cloud_checkpoint.offset
lease.sequence_number = cloud_checkpoint.sequence_number
Expand All @@ -93,6 +107,9 @@ def test_checkpointing(eph, storage_clm):
modify_checkpoint.sequence_number = "32"
time.sleep(35)
loop.run_until_complete(storage_clm.update_checkpoint_async(lease, modify_checkpoint))
cloud_lease = loop.run_until_complete(storage_clm.get_lease_async("1"))
assert cloud_lease.event_processor_context is None

cloud_checkpoint = loop.run_until_complete(storage_clm.get_checkpoint_async("1"))
assert cloud_checkpoint.partition_id == "1"
assert cloud_checkpoint.offset == "512"
Expand Down