Skip to content

"RuntimeError: is bound to a different event loop" when using prefect client from sync flow in kubernetes #13181

@carlosjourdan

Description

@carlosjourdan

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When tying to run prefect client async methods from a sync context on kubernetes, I get a "RuntimeError: is bound to a different event loop" error. I've tried using prefect.utilities.asyncutils.run_sync, asyncio.run and asyncio.get_event_loop().run_util_complete.

The code works fine locally, and calling a simple async on the k8s deployment also works. I'm only able to reproduce the error in the k8s deployment with the prefect.client methods.

Reproduction

from prefect.utilities.asyncutils import run_sync
from prefect.context import get_run_context
from prefect import flow
from prefect.deployments import Deployment
import asyncio

@flow()
def flow2():
    #This method works fine locally, but crashes on k8s
    context = get_run_context()
    prefect_client = context.client
    flow_run_id = context.flow_run.id
    
    flow_run_state = run_sync(prefect_client.read_flow_run(flow_run_id))
    print(flow_run_state)


@flow()
def flow1():
    #This works fine locally and on k8s
    run_sync(asyncio.sleep(10))

if __name__ == "__main__":
    Deployment.build_from_flow(flow=flow1, name="flow1", path='/source', work_pool_name="my-workpool", job_variables={"image": "my-image"}).apply()
    Deployment.build_from_flow(flow=flow2, name="flow2", path='/source', work_pool_name="my-workpool", job_variables={"image": "my-image"}).apply()

The image basically builds from python:3.11-buster, installs some packages and copies the source files to the /source dir.

This is the result of a pip freeze:

aiosqlite==0.20.0
alembic==1.13.1
altair==4.2.2
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.6
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
arrow==1.3.0
asgi-lifespan==2.1.0
asn1crypto==1.5.1
asttokens==2.4.1
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
azure-common==1.1.28
azure-core==1.30.1
azure-identity==1.14.1
azure-keyvault-secrets==4.7.0
beautifulsoup4==4.12.3
bleach==6.1.0
boto3==1.34.95
botocore==1.34.95
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
comm==0.2.2
coolname==2.2.0
croniter==2.0.5
cryptography==42.0.5
dateparser==1.2.0
debugpy==1.8.1
decorator==5.1.1
defusedxml==0.7.1
dependency-injector==4.41.0
dnspython==2.6.1
docker==6.1.3
elastic-transport==8.1.2
elasticsearch==8.2.3
email-validator==2.1.1
entrypoints==0.4
executing==2.0.1
fastjsonschema==2.19.1
filelock==3.14.0
fqdn==1.5.1
fsspec==2024.3.1
fuzzywuzzy==0.18.0
geographiclib==2.0
geopy==2.4.1
google-auth==2.29.0
graphviz==0.20.3
great-expectations==0.15.32
greenlet==3.0.3
griffe==0.44.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.5
httpx==0.27.0
humanize==4.9.0
hyperframe==6.0.1
idna==3.7
ijson==3.2.3
importlib-metadata==7.1.0
importlib-resources==6.1.3
ipykernel==6.29.4
ipython==8.24.0
ipython-genutils==0.2.0
ipywidgets==8.1.2
isodate==0.6.1
isoduration==20.11.0
itsdangerous==2.2.0
jedi==0.19.1
jinja2==3.1.3
jinja2-humanize-extension==0.4.0
jmespath==1.0.1
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.7.2
jupyter-client==7.4.9
jupyter-core==5.7.2
jupyter-events==0.6.3
jupyter-server==2.10.0
jupyter-server-terminals==0.5.3
jupyterlab-pygments==0.3.0
jupyterlab-widgets==3.0.10
kubernetes==29.0.0
lazy-import==0.2.2
mailbits==0.2.1
makefun==1.15.2
mako==1.3.3
markdown==3.6
markdown-it-py==3.0.0
markupsafe==2.1.5
marshmallow==3.21.1
matplotlib-inline==0.1.7
mdurl==0.1.2
mistune==3.0.2
msal==1.23.0
msal-extensions==1.1.0
nbclassic==1.0.0
nbclient==0.10.0
nbconvert==7.16.4
nbformat==5.10.4
nest-asyncio==1.6.0
notebook==6.5.6
notebook-shim==0.2.4
nslookup==1.7.0
numpy==1.24.4
oauthlib==3.2.2
orjson==3.10.1
overrides==7.7.0
packaging==24.0
pandas==2.1.4
pandocfilters==1.5.1
parso==0.8.4
pathspec==0.12.1
pendulum==2.1.2
platformdirs==4.2.1
portalocker==2.8.2
prefect==2.18.1
prometheus-client==0.20.0
prompt-toolkit==3.0.43
psutil==5.9.8
pure-eval==0.2.2
pyasn1==0.6.0
pyasn1-modules==0.4.0
pycparser==2.22
pydantic==2.7.1
pydantic-core==2.18.2
pygments==2.17.2
pyjwt==2.8.0
pyopenssl==20.0.1
pyparsing==3.1.2
pypi-simple==1.5.0
pypyodbc==1.3.6
pyrsistent==0.20.0
pysmbclient==0.1.5
pyspnego==0.10.2
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-json-logger==2.0.7
python-multipart==0.0.9
python-slugify==8.0.4
pytz==2024.1
pytzdata==2020.1
pywin32==306
pywinpty==2.0.13
pyyaml==6.0.1
pyzmq==24.0.1
readchar==4.0.6
regex==2024.4.28
requests==2.31.0
requests-oauthlib==2.0.0
rfc3339-validator==0.1.4
rfc3986-validator==0.1.1
rich==13.7.1
rsa==4.9
ruamel-yaml==0.17.17
s3transfer==0.10.1
scipy==1.13.0
selenium==3.141.0
send2trash==1.8.3
setuptools==69.5.1
shellingham==1.5.4
six==1.14.0
smbprotocol==1.6.2
sniffio==1.3.1
snowflake-connector-python==3.10.0
snowflake-sqlalchemy==1.5.3
sortedcontainers==2.4.0
soupsieve==2.5
sqlalchemy==1.4.52
sspilib==0.1.0
stack-data==0.6.3
termcolor==2.0.1
terminado==0.18.1
text-unidecode==1.3
tinycss2==1.3.0
toml==0.10.2
tomlkit==0.12.4
toolz==0.12.1
tornado==6.4
tqdm==4.66.2
traitlets==5.14.3
truststore==0.7.0
typer==0.12.3
types-python-dateutil==2.9.0.20240316
typing-extensions==4.11.0
tzdata==2024.1
tzlocal==5.2
ujson==5.9.0
unidecode==1.3.8
uri-template==1.3.0
urllib3==1.26.18
uvicorn==0.28.1
vertica-python==1.0.5
wcwidth==0.2.13
webcolors==1.13
webdriver-manager==4.0.1
webencodings==0.5.1
websocket-client==1.8.0
websockets==12.0
widgetsnbextension==4.0.10
zipp==3.18.1


### Error

```python3
Encountered exception during execution:
Traceback (most recent call last):
  File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 875, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/flows/my_flows.py", line 16, in flow2
    flow_run_state = run_sync(prefect_client.read_flow_run(flow_run_id))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 115, in run_sync
    return asyncio.run(coroutine)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/client/orchestration.py", line 2007, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1801, in get
    return await self.request(
           ^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/client/base.py", line 325, in send
    response = await self._send_with_retry(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/client/base.py", line 249, in _send_with_retry
    response = await send(request, *send_args, **send_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1661, in send
    response = await self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1689, in _send_handling_auth
    response = await self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1726, in _send_handling_redirects
    response = await self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1763, in _send_single_request
    response = await transport.handle_async_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 373, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
    response = await connection.handle_async_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/connection.py", line 101, in handle_async_request
    return await self._connection.handle_async_request(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 185, in handle_async_request
    raise exc
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 148, in handle_async_request
    status, headers = await self._receive_response(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 292, in _receive_response
    event = await self._receive_stream_event(request, stream_id)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 333, in _receive_stream_event
    await self._receive_events(request, stream_id)
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 361, in _receive_events
    events = await self._read_incoming_data(request)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 452, in _read_incoming_data
    raise exc
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 438, in _read_incoming_data
    data = await self._network_stream.read(self.READ_NUM_BYTES, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 35, in read
    return await self._stream.receive(max_bytes=max_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/anyio/streams/tls.py", line 196, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/anyio/streams/tls.py", line 138, in _call_sslobject_method
    data = await self.transport_stream.receive()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1203, in receive
    await self._protocol.read_event.wait()
  File "/usr/local/lib/python3.11/asyncio/locks.py", line 210, in wait
    fut = self._get_loop().create_future()
          ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/mixins.py", line 20, in _get_loop
    raise RuntimeError(f'{self!r} is bound to a different event loop')
RuntimeError:  is bound to a different event loop

Versions

# Locally

Version:             2.18.1
API version:         0.8.4
Python version:      3.11.8
Git commit:          8cff545a
Built:               Thu, Apr 25, 2024 3:40 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         cloud

# On k8s

Version:             2.18.1
API version:         0.8.4
Python version:      3.11.4
Git commit:          8cff545a
Built:               Thu, Apr 25, 2024 3:40 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions