"""Access to Arkite detection data, supporting both the old directory-based
detection format and the new `.hef` archive format."""

import json
import os.path
import zipfile
from abc import ABC, abstractmethod
from contextlib import contextmanager
from glob import iglob
from os import listdir
from typing import Iterable, Optional, Union

from data.xhdfs import XHDFS

# Path of the resource index inside a `.hef` archive.
HEF_RECORDING_LIST_FILE = 'data/Model.Entities.Resource.json'

TYPES = ['Activity', 'Button', 'Container', 'Process', 'Tool']
# Maps the first letter of a recording name to its full detection type.
TYPE_SHORTHANDS = {t[0]: t for t in TYPES}


def is_old_format_detection(path: str) -> bool:
    """Returns `True` if `path` points to a directory that contains a `.him` file and a recordings directory."""
    if not os.path.isdir(path):
        return False
    return os.path.isdir(os.path.join(path, 'resources', 'recordings')) \
        and any(f.endswith('.him') for f in listdir(path))


def is_new_format_detection(path: str) -> bool:
    """Returns `True` if `path` points to a directory that contains a `.hef` file."""
    if not os.path.isdir(path):
        return False
    return any(f.endswith('.hef') for f in listdir(path))


class ArkiteRecording(ABC):
    """A single recording, exposing its depth and IR streams as XHDFS context managers."""

    def __init__(self, full_name: str):
        self._full_name = full_name
        self._name = os.path.split(full_name)[-1]
        self._detection_type = TYPE_SHORTHANDS.get(self._name[0], None)

    @property
    def name(self) -> str:
        return self._name

    @property
    def full_name(self) -> str:
        return self._full_name

    @property
    def detection_type(self) -> Optional[str]:
        return self._detection_type

    @abstractmethod
    def depth(self) -> XHDFS:
        raise NotImplementedError

    @abstractmethod
    def ir(self) -> XHDFS:
        raise NotImplementedError


class ArkiteRecordingDirectory(ArkiteRecording):
    """Recording stored as loose `.xhdfs` files in an old-format detection directory."""

    def __init__(self, full_name: str):
        super().__init__(full_name)

    def _data_file_path(self, data_type: str) -> str:
        # `full_name` is `<detection dir>/<recording name>`; the data files live under
        # `<detection dir>/resources/recordings/<recording name>/<data type>.xhdfs`.
        *location, name = os.path.split(self._full_name)
        return os.path.join(*location, 'resources', 'recordings', name,
                            data_type + os.path.extsep + 'xhdfs')

    @contextmanager
    def depth(self):
        with XHDFS(open(self._data_file_path('Depth'), mode='rb')) as depth_xhdfs:
            yield depth_xhdfs

    @contextmanager
    def ir(self):
        with XHDFS(open(self._data_file_path('IR'), mode='rb')) as ir_xhdfs:
            yield ir_xhdfs


class ArkiteRecordingHEF(ArkiteRecording):
    """Recording stored inside a new-format `.hef` archive (a zip file)."""

    def __init__(self, full_name: str, hef_file_name: str, depth_file_name: str, ir_file_name: str):
        super().__init__(full_name)
        self._hef = hef_file_name
        self._depth = depth_file_name
        self._ir = ir_file_name

    @staticmethod
    def _data_file_path(file_name: str) -> str:
        return "resources/" + file_name

    @contextmanager
    def depth(self) -> XHDFS:
        with zipfile.ZipFile(self._hef) as hef, \
                XHDFS(hef.open(self._data_file_path(self._depth))) as depth_xhdfs:
            yield depth_xhdfs

    @contextmanager
    def ir(self) -> XHDFS:
        with zipfile.ZipFile(self._hef) as hef, \
                XHDFS(hef.open(self._data_file_path(self._ir))) as ir_xhdfs:
            yield ir_xhdfs


class ArkiteDetection(ABC):
    """A detection, i.e. a collection of recordings belonging to one use case."""

    def __init__(self, relative_path: str):
        self._path = relative_path
        self._full_name = relative_path
        self._name = os.path.split(relative_path)[-1]

    @property
    def full_name(self) -> str:
        return self._full_name

    @property
    def detection_name(self) -> str:
        return self._name

    @abstractmethod
    def recordings(self) -> Iterable[ArkiteRecording]:
        raise NotImplementedError


class ArkiteDetectionDirectory(ArkiteDetection):
    """Old-format detection: a directory with a `.him` file and a `resources/recordings` subdirectory."""

    def __init__(self, relative_path: str):
        super().__init__(relative_path)

    def recordings(self) -> Iterable[ArkiteRecordingDirectory]:
        recordings_dir = os.path.join(self._path, 'resources', 'recordings')
        for rec_dir_name in listdir(recordings_dir):
            full_name = os.path.join(self.full_name, rec_dir_name)
            yield ArkiteRecordingDirectory(full_name)


class ArkiteDetectionHEF(ArkiteDetection):
    """New-format detection: a directory containing a `.hef` archive."""

    def __init__(self, relative_path: str):
        super().__init__(relative_path)
        hef_file_name = next(f for f in listdir(self._path) if f.endswith('.hef'))
        self._hef = os.path.join(self._path, hef_file_name)

    def recordings(self) -> Iterable[ArkiteRecordingHEF]:
        with zipfile.ZipFile(self._hef, mode='r') as hef:
            with hef.open(HEF_RECORDING_LIST_FILE) as recording_list:
                file_list = json.loads(recording_list.read().decode('utf-8'))

        # The resource list holds one entry per data file; the depth and IR files of a
        # recording appear as consecutive `.xhdfs` entries, so they are paired up here.
        previous_metadata = None
        for file_metadata in file_list:
            if file_metadata['Extension'] != '.xhdfs' or file_metadata['RemovedId'] != 0:
                continue
            if previous_metadata is None:
                previous_metadata = file_metadata
                continue

            name, data_type = file_metadata['Name'].rsplit(' ', 1)
            prev_name, prev_type = previous_metadata['Name'].rsplit(' ', 1)
            assert name == prev_name and data_type != prev_type

            full_name = os.path.join(self._full_name, name)
            depth_name = (file_metadata if data_type == 'Depth' else previous_metadata)['FileName']
            ir_name = (file_metadata if data_type == 'IR' else previous_metadata)['FileName']
            previous_metadata = None

            yield ArkiteRecordingHEF(full_name, self._hef,
                                     depth_file_name=depth_name, ir_file_name=ir_name)


class ArkiteData:
    """Entry point that walks a data root and yields the detections of its use cases, in either format."""

    def __init__(self, root_dir: str, use_case_depth: int):
        self._root = os.path.abspath(root_dir)
        self._use_case_depth = use_case_depth

    def use_cases(self) -> Iterable[str]:
        # A use case sits `use_case_depth` directory levels below the root.
        use_case_glob = os.path.sep.join(['*'] * self._use_case_depth)
        full_glob = os.path.join(self._root, use_case_glob)
        for d in iglob(full_glob):
            if os.path.isdir(d):
                # Yield the path relative to the root (strip the root prefix and separator).
                yield d.replace(self._root, '')[1:]

    def detections(self, use_cases: Optional[Union[Iterable[str], str]] = None) -> Iterable[ArkiteDetection]:
        if isinstance(use_cases, str):
            use_cases = (use_cases,)
        if isinstance(use_cases, Iterable):
            use_cases = set(use_cases)

        for uc in self.use_cases():
            if use_cases is not None and uc not in use_cases:
                continue
            use_case_dir = os.path.join(self._root, uc)

            # Check if the directory itself is a detection
            detection = self._dir_to_detection(use_case_dir)
            if detection:
                yield detection

            # Check if subdirectories are detections themselves
            for dd in listdir(use_case_dir):
                detection_dir = os.path.join(use_case_dir, dd)
                detection = self._dir_to_detection(detection_dir)
                if detection:
                    yield detection

    @staticmethod
    def _dir_to_detection(directory: str) -> Optional[ArkiteDetection]:
        if is_old_format_detection(directory):
            return ArkiteDetectionDirectory(directory)
        elif is_new_format_detection(directory):
            return ArkiteDetectionHEF(directory)
        else:
            return None
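

# --- Usage sketch ----------------------------------------------------------
# A minimal example of how this module is meant to be driven. The data root
# ('/data/arkite') and the use-case depth (2) below are hypothetical values,
# not paths shipped with this code.
if __name__ == '__main__':
    data = ArkiteData('/data/arkite', use_case_depth=2)
    for detection in data.detections():
        print(detection.full_name)
        for recording in detection.recordings():
            # Both recording formats expose their depth (and IR) streams as
            # context managers yielding an XHDFS reader; the reader would be
            # consumed here (its API is defined in data.xhdfs, not shown).
            with recording.depth() as depth_stream:
                print(' ', recording.name, recording.detection_type)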