Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,12 @@ def from_stream_client(cls, stream_client, metadata=None):

@functools.cached_property
def descriptors(self):
# Go back to the BlueskyRun node and requests the documents
bs_run_node = self["data"].parent.parent # the path is: bs_run_node/streams/current_stream
# Go back to the BlueskyRun node and request the documents
# the path is: bs_run_node/streams/current_stream (old) or bs_run_node/current_stream (new)
bs_run_node = self["data"].parent
if bs_run_node.item["id"] == "streams" and ("BlueskyRun" not in {s.name for s in bs_run_node.specs}):
# The parent is the old "streams" node, go up one more level
bs_run_node = bs_run_node.parent
stream_name = self.metadata.get("stream_name") or self["data"].item["id"]
return [
doc for name, doc in bs_run_node.documents() if name == "descriptor" and doc["name"] == stream_name
Expand Down Expand Up @@ -236,10 +240,10 @@ def __repr__(self):
return node_repr(self, self._keys).replace(type(self).__name__, "DatasetClient")

def _keys_slice(self, start, stop, direction, page_size: Optional[int] = None, **kwargs):
yield from self._keys[start : stop : -1 if direction < 0 else 1] # noqa: #203
yield from self._keys[start : stop : -1 if direction < 0 else 1] # noqa: 203

def _items_slice(self, start, stop, direction, page_size: Optional[int] = None, **kwargs):
for key in self._keys[start : stop : -1 if direction < 0 else 1]: # noqa: #203
for key in self._keys[start : stop : -1 if direction < 0 else 1]: # noqa: 203
yield key, self[key]

def __iter__(self):
Expand Down Expand Up @@ -343,7 +347,11 @@ def read(self, variables=(DATAVALUES,), dim0=None):
def descriptors(self):
# Go back to the BlueskyRun node and requests the documents
stream_name = self.metadata.get("stream_name") or self.item["id"]
bs_run_node = self.parent.parent # the path is: bs_run_node/streams/current_stream
# the path is: bs_run_node/streams/current_stream (old) or bs_run_node/current_stream (new)
bs_run_node = self.parent
if bs_run_node.item["id"] == "streams" and ("BlueskyRun" not in {s.name for s in bs_run_node.specs}):
# The parent is the old "streams" node, go up one more level
bs_run_node = bs_run_node.parent
return [
doc for name, doc in bs_run_node.documents() if name == "descriptor" and doc["name"] == stream_name
]
39 changes: 30 additions & 9 deletions bluesky-tiled-plugins/bluesky_tiled_plugins/bluesky_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,22 @@ def __getitem__(self, key):

return super().__getitem__(key)

@functools.cached_property
def _has_streams_namespace(self):
return ("streams" in self) and ("BlueskyEventStream" not in {s.name for s in self["streams"].specs})

@functools.cached_property
def _streams_node(self):
# Access to the "streams" namespace (possibly a separate container)
if self._has_streams_namespace:
return self["streams"]
else:
# No intermediate "streams" node, use the top-level node
return self

@functools.cached_property
def _stream_names(self):
return sorted(self.get("streams", ()))
return sorted(k for k in self._streams_node)

def documents(self, fill=False):
with io.BytesIO() as buffer:
Expand All @@ -254,22 +267,30 @@ def documents(self, fill=False):

class BlueskyRunV2SQL(BlueskyRunV2, _BlueskyRunSQL):
def _keys_slice(self, start, stop, direction, page_size: Optional[int] = None, **kwargs):
keys = reversed(self._stream_names) if direction < 0 else self._stream_names
return (yield from keys[start:stop])
if self._has_streams_namespace:
keys = reversed(self._stream_names) if direction < 0 else self._stream_names
return (yield from keys[start:stop])
else:
return (yield from super()._keys_slice(start, stop, direction, page_size=page_size, **kwargs))

def _items_slice(self, start, stop, direction, page_size: Optional[int] = None, **kwargs):
_streams_node = super().get("streams", {})
for key in reversed(self._stream_names) if direction < 0 else self._stream_names:
yield key, _streams_node.get(key)
return
if self._has_streams_namespace:
_streams_node = super().get("streams", {})
for key in reversed(self._stream_names) if direction < 0 else self._stream_names:
yield key, _streams_node.get(key)
return
else:
return (yield from super()._items_slice(start, stop, direction, page_size=page_size, **kwargs))

def __getitem__(self, key):
# For v3, we need to handle the streams and configs keys
if key in RESERVED_V3_KEYS:
return super().__getitem__(key)

if key in self._stream_names:
stream_container = super().get("streams", {}).get(key)
stream_container = (
super().get("streams", {}).get(key) if self._has_streams_namespace else super().get(key)
)
return BlueskyEventStreamV2SQL.from_stream_client(stream_container)

return super().__getitem__(key)
Expand All @@ -293,7 +314,7 @@ def __new__(cls, context, *, item, structure_clients, **kwargs):
def __getattr__(self, key):
if key in self._stream_names:
# A shortcut to the stream data
return self["streams"][key]
return self["streams"][key] if self._has_streams_namespace else self[key]

return super().__getattr__(key)

Expand Down
11 changes: 9 additions & 2 deletions bluesky-tiled-plugins/bluesky_tiled_plugins/exporters.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@ async def json_seq_exporter(mimetype, adapter, metadata, filter_for_access):
result = []

# Generate descriptors
stream_names = await (await adapter.lookup_adapter(["streams"])).keys_range(offset=0, limit=None)
stream_names = await adapter.keys_range(offset=0, limit=None)
if "streams" in stream_names:
# Check for backward compatibility with the old layout (with an intermediate "streams" node)
streams_adapter = await adapter.lookup_adapter(["streams"])
if "BlueskyEventStream" not in {s.name for s in streams_adapter.specs}:
adapter = streams_adapter
stream_names = await adapter.keys_range(offset=0, limit=None)

for desc_name in stream_names:
desc_node = await adapter.lookup_adapter(["streams", desc_name])
desc_node = await adapter.lookup_adapter([desc_name])
desc_meta = desc_node.metadata()
part_names = set(await desc_node.keys_range(offset=0, limit=None)) # Composite parts

Expand Down