Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 656afa0

Browse files
konstantinmiller authored and annatisch committed
Added support for storing the state of the Event Processor along the Checkpoint (#84)
* Updates for release 1.2.0 (#81)
* Made setup 2.7 compatible
* Separated async tests
* Support 2.7 types
* Bumped version
* Added non-ascii tests
* Fix CI
* Fix Py27 pylint
* Added iot sample
* Updated sender/receiver client opening
* bumped version
* Updated tests
* Fixed test name
* Fixed test env settings
* Skip eph test
* Added support for storing the state of the Event Processor along the Checkpoint. Both Checkpoint and the EP state are stored as pickled objects.
* Fixing pylint complaints.
* Switched from pickle back to JSON for lease persistence.
* Fixes bug when accessing leases that don't contain EP context. Also, minor renaming.
1 parent 7403e9f commit 656afa0

File tree

8 files changed

+50
-17
lines changed

8 files changed

+50
-17
lines changed

azure/eventprocessorhost/abstract_checkpoint_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ async def create_checkpoint_if_not_exists_async(self, partition_id):
5454
pass
5555

5656
@abstractmethod
57-
async def update_checkpoint_async(self, lease, checkpoint):
57+
async def update_checkpoint_async(self, lease, checkpoint, event_processor_context=None):
5858
"""
5959
Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint.
6060

azure/eventprocessorhost/abstract_event_processor.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from abc import ABC, abstractmethod
77

8+
89
class AbstractEventProcessor(ABC):
910
"""
1011
Abstract class that must be extended by event processor classes.
@@ -13,12 +14,14 @@ def __init__(self, params=None):
1314
pass
1415

1516
@abstractmethod
16-
async def open_async(self, context):
17+
async def open_async(self, context, event_processor_context=None):
1718
"""
1819
Called by processor host to initialize the event processor.
1920
2021
:param context: Information about the partition
2122
:type context: ~azure.eventprocessorhost.partition_context.PartitionContext
23+
:param event_processor_context: State of the Event Processor.
24+
:type event_processor_context: str
2225
"""
2326
pass
2427

azure/eventprocessorhost/azure_blob_lease.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# -----------------------------------------------------------------------------------
55

6-
import json
76
import asyncio
7+
import json
88

99
from azure.eventprocessorhost.lease import Lease
1010

@@ -48,6 +48,7 @@ def with_blob(self, blob):
4848
self.epoch = content["epoch"]
4949
self.offset = content["offset"]
5050
self.sequence_number = content["sequence_number"]
51+
self.event_processor_context = content.get("event_processor_context")
5152

5253
def with_source(self, lease):
5354
"""

azure/eventprocessorhost/azure_storage_checkpoint_manager.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import concurrent.futures
1111
import functools
1212
import asyncio
13-
1413
import requests
1514

1615
from azure.storage.blob import BlockBlobService
@@ -145,7 +144,7 @@ async def create_checkpoint_if_not_exists_async(self, partition_id):
145144
checkpoint = Checkpoint(partition_id)
146145
return checkpoint
147146

148-
async def update_checkpoint_async(self, lease, checkpoint):
147+
async def update_checkpoint_async(self, lease, checkpoint, event_processor_context=None):
149148
"""
150149
Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint
151150
checkpoint:offset/sequenceNumber to update the store with.
@@ -159,6 +158,7 @@ async def update_checkpoint_async(self, lease, checkpoint):
159158
new_lease.with_source(lease)
160159
new_lease.offset = checkpoint.offset
161160
new_lease.sequence_number = checkpoint.sequence_number
161+
new_lease.event_processor_context = event_processor_context
162162
return await self.update_lease_async(new_lease)
163163

164164
async def delete_checkpoint_async(self, partition_id):
@@ -269,11 +269,12 @@ async def create_lease_if_not_exists_async(self, partition_id):
269269
try:
270270
return_lease = AzureBlobLease()
271271
return_lease.partition_id = partition_id
272-
json_lease = json.dumps(return_lease.serializable())
272+
serializable_lease = return_lease.serializable()
273+
json_lease = json.dumps(serializable_lease)
273274
_logger.info("Creating Lease %r %r %r",
274275
self.lease_container_name,
275276
partition_id,
276-
json_lease)
277+
json.dumps({k:v for k, v in serializable_lease.items() if k != 'event_processor_context'}))
277278
await self.host.loop.run_in_executor(
278279
self.executor,
279280
functools.partial(

azure/eventprocessorhost/lease.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def __init__(self):
1515
self.owner = None
1616
self.token = None
1717
self.epoch = 0
18+
self.event_processor_context = None
1819

1920
def with_partition_id(self, partition_id):
2021
"""
@@ -27,6 +28,7 @@ def with_partition_id(self, partition_id):
2728
self.owner = None
2829
self.token = None
2930
self.epoch = 0
31+
self.event_processor_context = None
3032

3133
def with_source(self, lease):
3234
"""
@@ -39,6 +41,7 @@ def with_source(self, lease):
3941
self.epoch = lease.epoch
4042
self.owner = lease.owner
4143
self.token = lease.token
44+
self.event_processor_context = lease.event_processor_context
4245

4346
async def is_expired(self):
4447
"""

azure/eventprocessorhost/partition_context.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,21 +59,24 @@ async def get_initial_offset_async(self): # throws InterruptedException, Executi
5959
self.host.guid, self.partition_id, self.offset, self.sequence_number)
6060
return self.offset
6161

62-
async def checkpoint_async(self):
62+
async def checkpoint_async(self, event_processor_context=None):
6363
"""
6464
Generates a checkpoint for the partition using the current offset and sequenceNumber
6565
and persists to the checkpoint manager.
6666
"""
6767
captured_checkpoint = Checkpoint(self.partition_id, self.offset, self.sequence_number)
68-
await self.persist_checkpoint_async(captured_checkpoint)
68+
await self.persist_checkpoint_async(captured_checkpoint, event_processor_context)
6969

70-
async def checkpoint_async_event_data(self, event_data):
70+
async def checkpoint_async_event_data(self, event_data, event_processor_context=None):
7171
"""
7272
Stores the offset and sequenceNumber from the provided received EventData instance,
7373
then writes those values to the checkpoint store via the checkpoint manager.
74+
Optionally stores the state of the Event Processor alongside the checkpoint.
7475
7576
:param event_data: A received EventData with valid offset and sequenceNumber.
7677
:type event_data: ~azure.eventhub.common.EventData
78+
:param event_processor_context: State of the Event Processor.
79+
:type event_processor_context: str
7780
:raises: ValueError if supplied event_data is None.
7881
:raises: ValueError if the sequenceNumber is less than the last checkpointed value.
7982
"""
@@ -85,7 +88,8 @@ async def checkpoint_async_event_data(self, event_data):
8588

8689
await self.persist_checkpoint_async(Checkpoint(self.partition_id,
8790
event_data.offset.value,
88-
event_data.sequence_number))
91+
event_data.sequence_number),
92+
event_processor_context)
8993

9094
def to_string(self):
9195
"""
@@ -99,12 +103,14 @@ def to_string(self):
99103
self.partition_id,
100104
self.sequence_number)
101105

102-
async def persist_checkpoint_async(self, checkpoint):
106+
async def persist_checkpoint_async(self, checkpoint, event_processor_context=None):
103107
"""
104-
Persists the checkpoint.
108+
Persists the checkpoint, and - optionally - the state of the Event Processor.
105109
106110
:param checkpoint: The checkpoint to persist.
107111
:type checkpoint: ~azure.eventprocessorhost.checkpoint.Checkpoint
112+
:param event_processor_context: The state of the Event Processor.
113+
:type event_processor_context: str
108114
"""
109115
_logger.debug("PartitionPumpCheckpointStart %r %r %r %r",
110116
self.host.guid, checkpoint.partition_id, checkpoint.offset, checkpoint.sequence_number)
@@ -115,7 +121,8 @@ async def persist_checkpoint_async(self, checkpoint):
115121
_logger.info("persisting checkpoint %r", checkpoint.__dict__)
116122
await self.host.storage_manager.create_checkpoint_if_not_exists_async(checkpoint.partition_id)
117123

118-
if not await self.host.storage_manager.update_checkpoint_async(self.lease, checkpoint):
124+
if not await self.host.storage_manager.update_checkpoint_async(
125+
self.lease, checkpoint, event_processor_context):
119126
_logger.error("Failed to persist checkpoint for partition: %r", self.partition_id)
120127
raise Exception("failed to persist checkpoint")
121128
self.lease.offset = checkpoint.offset

azure/eventprocessorhost/partition_pump.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ async def open_async(self):
6161
self.partition_context.lease = self.lease
6262
self.processor = self.host.event_processor(self.host.event_processor_params)
6363
try:
64-
await self.processor.open_async(self.partition_context)
64+
event_processor_context = self.lease.event_processor_context
65+
await self.processor.open_async(self.partition_context, event_processor_context)
6566
except Exception as err: # pylint: disable=broad-except
6667
# If the processor won't create or open, only thing we can do here is pass the buck.
6768
# Null it out so we don't try to operate on it further.

tests/asynctests/test_checkpoint_manager.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# -----------------------------------------------------------------------------------
55

6-
import pytest
76
import asyncio
7+
import base64
8+
import pickle
9+
import pytest
810
import time
911
from azure.common import AzureException
1012

@@ -82,7 +84,19 @@ def test_checkpointing(eph, storage_clm):
8284
assert local_checkpoint.offset == "-1"
8385
lease = loop.run_until_complete(storage_clm.get_lease_async("1"))
8486
loop.run_until_complete(storage_clm.acquire_lease_async(lease))
85-
loop.run_until_complete(storage_clm.update_checkpoint_async(lease, local_checkpoint))
87+
event_processor_context = {'some_string_data': 'abc', 'some_int_data': 123, 'a_list': [42]}
88+
event_processor_context_pickled = pickle.dumps(event_processor_context)
89+
event_processor_context_asstring = base64.b64encode(event_processor_context_pickled)
90+
loop.run_until_complete(storage_clm.update_checkpoint_async(
91+
lease, local_checkpoint, event_processor_context_asstring))
92+
cloud_lease = loop.run_until_complete(storage_clm.get_lease_async("1"))
93+
cloud_event_processor_context_asstring = pickle.loads(cloud_lease.event_processor_context)
94+
cloud_event_processor_context_pickled = base64.b64decode(cloud_event_processor_context_asstring)
95+
cloud_event_processor_context = pickle.loads(cloud_event_processor_context_pickled)
96+
assert cloud_event_processor_context['some_string_data'] == 'abc'
97+
assert cloud_event_processor_context['some_int_data'] == '123'
98+
assert cloud_event_processor_context['a_list'] == [42]
99+
86100
cloud_checkpoint = loop.run_until_complete(storage_clm.get_checkpoint_async("1"))
87101
lease.offset = cloud_checkpoint.offset
88102
lease.sequence_number = cloud_checkpoint.sequence_number
@@ -93,6 +107,9 @@ def test_checkpointing(eph, storage_clm):
93107
modify_checkpoint.sequence_number = "32"
94108
time.sleep(35)
95109
loop.run_until_complete(storage_clm.update_checkpoint_async(lease, modify_checkpoint))
110+
cloud_lease = loop.run_until_complete(storage_clm.get_lease_async("1"))
111+
assert cloud_lease.event_processor_context is None
112+
96113
cloud_checkpoint = loop.run_until_complete(storage_clm.get_checkpoint_async("1"))
97114
assert cloud_checkpoint.partition_id == "1"
98115
assert cloud_checkpoint.offset == "512"

0 commit comments

Comments
 (0)