Skip to content

Commit 5cea56b

Browse files
vijayvammiclaude
andauthored
feat: Remove global context (#244)
* test: add context isolation tests (failing) * fix: add missing set_run_context call in async test The async test was creating a PipelineContext but never calling set_run_context(context), making it inconsistent with the sync test logic. This fix adds the missing call to properly set up the context before assertions. Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com> * feat: implement contextvars for run_context isolation * fix: remove broken backward compatibility property for run_context * feat: update tasks.py to use contextvars run_context 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * feat: update tasks.py to use contextvars run_context * feat: update executor.py to use contextvars run_context Updated the _context property in BaseExecutor to use get_run_context() with proper error handling. Updated tests to mock the new API. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * feat: update datastore.py to use contextvars run_context Updated all references from context.run_context to context.get_run_context() with proper error handling. Also updated corresponding tests to mock the new get_run_context() function. Changes: - ObjectParameter.description: Added get_run_context() with None check - ObjectParameter.file_name: Added get_run_context() with RuntimeError - ObjectParameter.get_value(): Added get_run_context() with RuntimeError - ObjectParameter.put_object(): Added get_run_context() with RuntimeError - RunLog.get_summary(): Added get_run_context() with RuntimeError - BaseRunLogStore._context: Added get_run_context() with RuntimeError - Updated test mocks to patch get_run_context() instead of run_context 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * feat: update remaining files to use contextvars run_context Updated all remaining production and test files to use the new contextvars-based API: Production files: - runnable/catalog.py: Updated _context property to use get_run_context() - runnable/nodes.py: Updated _context property with type checking - runnable/sdk.py: Changed context.run_context assignment to set_run_context() and updated import - runnable/utils.py: Updated get_git_code_identity to use get_run_context() - extensions/pipeline_executor/__init__.py: Updated _context property with type checking - extensions/job_executor/__init__.py: Updated _context property with type checking Test files: - tests/assertions.py: Updated load_run_log to use get_run_context() - tests/test_job_examples.py: Updated cleanup to use set_run_context(None) - tests/test_pipeline_examples.py: Updated cleanup to use set_run_context(None) - tests/test_retries.py: Updated cleanup and load_run_log to use new API - tests/runnable/test_catalog.py: Updated mock to patch get_run_context - tests/runnable/test_utils.py: Updated mocks to patch get_run_context - tests/extensions/catalog/test_any_path.py: Updated mock to patch get_run_context - tests/extensions/catalog/test_file_system.py: Updated mock to patch get_run_context - tests/extensions/job_executor/test_k8s_scheduling.py: Updated all mocks to patch get_run_context All context.run_context direct access replaced with proper API calls. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * test: fix async test to use asyncio.run() * docs: add context isolation architecture documentation * fix: update test to use get_run_context mock * feat: Ading agentic capabilities --------- Co-authored-by: Claude Sonnet 4 <noreply@anthropic.com>
1 parent 756d75c commit 5cea56b

29 files changed

Lines changed: 1282 additions & 202 deletions

argo-pipeline.yaml

Lines changed: 216 additions & 97 deletions
Large diffs are not rendered by default.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Context Isolation in Runnable
2+
3+
## Problem Solved
4+
5+
Previously, `run_context` was a global variable that caused issues when multiple pipelines ran concurrently (e.g., in FastAPI endpoints). All pipelines would share the same context, leading to:
6+
7+
- Data leakage between pipelines
8+
- Incorrect run IDs in logs
9+
- Configuration mix-ups
10+
- Resource conflicts
11+
12+
## Solution
13+
14+
Replaced the global variable with Python's `contextvars` module, providing:
15+
16+
- **Request isolation**: Each execution context maintains its own run context
17+
- **Async safety**: Contexts automatically propagate through async/await chains
18+
- **Thread safety**: Works correctly with thread pools and concurrent execution
19+
- **Explicit error handling**: Clear errors when no context is available
20+
21+
## Usage
22+
23+
```python
24+
from runnable.context import get_run_context, set_run_context
25+
26+
# Get current context (returns None if no context)
27+
current_context = get_run_context()
28+
29+
# Set context (automatically isolated per request/task)
30+
set_run_context(my_context)
31+
32+
# Context automatically propagates through async chains
33+
async def my_async_function():
34+
context = get_run_context() # Same context as caller
35+
await some_other_async_function()
36+
```
37+
38+
## Migration Notes
39+
40+
- The global `context.run_context` variable has been removed
41+
- New code must use `get_run_context()` to access the context
42+
- Error handling is now explicit - functions raise `RuntimeError` if no context available
43+
- No changes needed for FastAPI or async usage - isolation happens automatically
44+
45+
## Implementation Details
46+
47+
The implementation uses Python's `contextvars.ContextVar`:
48+
49+
```python
50+
_run_context_var: contextvars.ContextVar[Optional[RunnableContextType]] = contextvars.ContextVar(
51+
'run_context',
52+
default=None
53+
)
54+
55+
def get_run_context() -> Optional[RunnableContextType]:
56+
"""Get the current run context for this execution context."""
57+
return _run_context_var.get()
58+
59+
def set_run_context(context: RunnableContextType) -> None:
60+
"""Set the run context for this execution context."""
61+
_run_context_var.set(context)
62+
```
63+
64+
This ensures each async task, thread, or request maintains its own isolated context.

0 commit comments

Comments
 (0)