Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 220 additions & 1 deletion src/ctapipe_io_nectarcam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
"""

import glob
import os
import re
import struct
from collections.abc import Iterable
from enum import IntFlag, auto

import numpy as np
Expand Down Expand Up @@ -38,6 +40,7 @@
)
from ctapipe.io import DataLevel, EventSource
from pkg_resources import resource_filename
from protozfits import File
from traitlets.config import Config

from .anyarray_dtypes import CDTS_AFTER_37201_DTYPE, CDTS_BEFORE_37201_DTYPE, TIB_DTYPE
Expand All @@ -51,7 +54,12 @@
)
from .version import __version__

__all__ = ["LightNectarCAMEventSource", "NectarCAMEventSource", "__version__"]
__all__ = [
"LightNectarCAMEventSource",
"NectarCAMEventSource",
"BlockNectarCAMEventSource",
"__version__",
]

S_TO_NS = np.uint64(1e9)

Expand Down Expand Up @@ -1287,6 +1295,217 @@
return False


class BlockNectarCAMEventSource:
"""
EventSource for long NectarCAMObservations or read specific part of the run.
The grouping is only done if the number of files is a multiple of the block_size.
It is also possible to analyse only certain blocks via the allowed_blocks argument.

The grouping has the advantage of not opening all files at the same time.

At the moment, it's a standalone class to have better control on what is done.
Could be made the default behavior of NectarCAMEventSource but need some rewriting.

Input:
block_size: The number of file per group.
default: 4

allowed_blocks : id or list of id of block to analyse
default: None (all analysed)

"""

def __init__(self, block_size=4, allowed_blocks=None, **kwargs):
self._arguments = kwargs # blocks
self._file_names = None
self._block_file_names = list()
self._current_source = None
self._current_block = None
self._current_generator = None
self._total_entries = 0
self._current_counts = 0
self.block_size = block_size
self.allowed_blocks = None
self.max_events = None
self.empty_entries = 0
self.show_empty_stats = False

if isinstance(allowed_blocks, int):
self.allowed_blocks = [
allowed_blocks,
]
elif isinstance(allowed_blocks, Iterable):
self.allowed_blocks = list(set([int(e) for e in allowed_blocks]))

Check warning on line 1338 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1338

Added line #L1338 was not covered by tests
else:
self.allowed_blocks = None

if "input_url" in self._arguments.keys():

Check warning on line 1342 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1342

Added line #L1342 was not covered by tests
# give list to NectarCAMEventSource so remove it from arguments
self._file_names = glob.glob(str(kwargs["input_url"]))
self._file_names.sort()
del self._arguments["input_url"]
elif "input_filelist" in self._arguments.keys():
# give list to NectarCAMEventSource so remove it from arguments
self._file_names = kwargs["input_filelist"]
self._file_names.sort()
del self._arguments["input_filelist"]

Check warning on line 1351 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1351

Added line #L1351 was not covered by tests
else:
raise ValueError("No input_irl or input_filelist given !")

if "max_events" in self._arguments.keys():

Check warning on line 1355 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1353-L1355

Added lines #L1353 - L1355 were not covered by tests
# treating option here, don't forward it to NectarCAMEventSource
self.max_events = int(kwargs["max_events"])

Check warning on line 1357 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1357

Added line #L1357 was not covered by tests
del self._arguments["max_events"]

if "show_empty_stats" in self._arguments.keys():
# treating option here, don't forward it to NectarCAMEventSource
self.show_empty_stats = bool(kwargs["show_empty_stats"])
del self._arguments["show_empty_stats"]

self._create_blocks()
self._switch_block()

Check warning on line 1367 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1366-L1367

Added lines #L1366 - L1367 were not covered by tests
@staticmethod
def is_compatible(file_path):
"""
This version should only be called directly, so return False
such that it is not used when using EventSource.
Nevertheless, in principle it should work as NectarCAMEventSource by default.
"""
return False

def __getattr__(self, attr):
# Forward unknown methods to the current NectarCAMEventSource, if it exist
# More careful checks are needed to know if this truly works...

Check warning on line 1379 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1379

Added line #L1379 was not covered by tests
if hasattr(self._current_source, attr):
attr_val = getattr(self._current_source, attr)
if callable(attr_val):

def call_wrapper(*args, **kwargs):
return getattr(self._current_source, attr)(*args, **kwargs)

return call_wrapper
else:
return attr_val

Check warning on line 1389 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1388-L1389

Added lines #L1388 - L1389 were not covered by tests

def _rewind(self):

Check warning on line 1391 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1391

Added line #L1391 was not covered by tests
self._current_block = None
self._switch_block()

def get_entries(self):
if self._total_entries == 0:
for filename in self._file_names:
self._total_entries += len(File(str(filename)).Events)
return (
self._total_entries
if self.max_events is None
else min(self._total_entries, self.max_events)
)

def _switch_block(self):
if self._current_block is None:
self._current_block = 0
else:
self._current_block += 1

valid = False
if self._current_block < len(self._block_file_names):
self._current_source = NectarCAMEventSource(

Check warning on line 1413 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1413

Added line #L1413 was not covered by tests
input_filelist=self._block_file_names[self._current_block],
**self._arguments,
)
self._current_generator = self._current_source._generator()
valid = True
return valid

def __len__(self):
return self.get_entries()

def _create_blocks(self):
if len(self._file_names) % self.block_size != 0 or not self.consecutive_files(
self._file_names
):
print("Not possible to block --> Read everything")
block_list = list()
block_list.append(list(self._file_names))
else:
block_list = list()
nBlocks = len(self._file_names) // self.block_size
for i in range(nBlocks):
imin = i * self.block_size
imax = (i + 1) * self.block_size
block_list.append(self._file_names[imin:imax])
if self.allowed_blocks is not None:
# going to only take the selected blocks
filtered_blocks = list()
for block in self.allowed_blocks:
if block < len(block_list):

Check warning on line 1442 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1436-L1442

Added lines #L1436 - L1442 were not covered by tests
filtered_blocks.append(block_list[block])
# Sanity check --> Remove duplicated entries
filtered_blocks = [
x
for n, x in enumerate(filtered_blocks)

Check warning on line 1447 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1444-L1447

Added lines #L1444 - L1447 were not covered by tests
if x not in filtered_blocks[:n]
]

Check warning on line 1449 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1449

Added line #L1449 was not covered by tests
filtered_blocks.sort() # just in case
block_list = filtered_blocks
# Erase the input list to keep only the selected files
self._file_names = [file for block in filtered_blocks for file in block]

self._block_file_names = block_list

Check warning on line 1455 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1454-L1455

Added lines #L1454 - L1455 were not covered by tests

def consecutive_files(self, file_list=None):

Check warning on line 1457 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1457

Added line #L1457 was not covered by tests
if file_list is None:
file_list = self._file_names
# assume files are of type: 'NectarCAM.Run5665.0246.fits.fz'
consecutive = False
try:
numbers = np.array(

Check warning on line 1463 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1462-L1463

Added lines #L1462 - L1463 were not covered by tests
[
int(os.path.basename(f).split(".fits.fz")[0].split(".")[-1])
for f in file_list
]

Check warning on line 1467 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1465-L1467

Added lines #L1465 - L1467 were not covered by tests
)
delta_numbers = numbers[1:] - numbers[:-1]
consecutive = np.all(delta_numbers == 1) and numbers[0] == 0
except ValueError:
consecutive = False
return consecutive

def __iter__(self):
self._rewind()
return self

Check warning on line 1477 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1473-L1477

Added lines #L1473 - L1477 were not covered by tests

def __next__(self):
if self.max_events is not None and self._current_counts >= self.max_events:
raise StopIteration
try:
next_entry = next(self._current_generator)
except StopIteration:
# End of current block, try if there is a next one
self.empty_entries += self._current_source.get_empty_entries()
if self._switch_block():
next_entry = next(self._current_generator)

Check warning on line 1488 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1488

Added line #L1488 was not covered by tests
else:
if self.show_empty_stats:
self.print_empty_stats()
raise StopIteration

Check warning on line 1492 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1490-L1492

Added lines #L1490 - L1492 were not covered by tests
self._current_counts += 1
return next_entry

def get_empty_entries(self):

Check warning on line 1496 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1494-L1496

Added lines #L1494 - L1496 were not covered by tests
return self.empty_entries

def print_empty_stats(self):
if self.empty_entries > 0:
print(

Check warning on line 1501 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1501

Added line #L1501 was not covered by tests
f"WARNING> Empty events :"
f" {self.empty_entries}/{self.get_entries()}"
f" --> "
f"{100.*self.empty_entries/self.get_entries():.2f} %"

Check warning on line 1505 in src/ctapipe_io_nectarcam/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/ctapipe_io_nectarcam/__init__.py#L1504-L1505

Added lines #L1504 - L1505 were not covered by tests
)


class MultiFiles:
"""
This class open all the files in file_list and read the events following
Expand Down
Loading
Loading