# opencv_python_tests/data/library.py
import json
import os.path
import zipfile
from abc import ABC, abstractmethod
from contextlib import contextmanager
from glob import iglob
from os import listdir
from typing import Iterable, Optional, Union
from data.xhdfs import XHDFS
# Path (inside a .hef archive, which is a zip file) of the JSON manifest
# listing all resources contained in the archive.
HEF_RECORDING_LIST_FILE = 'data/Model.Entities.Resource.json'
# Known detection types; recording names are expected to start with one of these.
TYPES = ['Activity', 'Button', 'Container', 'Process', 'Tool']
# Maps the first letter of each type to the full type name, e.g. 'A' -> 'Activity'.
TYPE_SHORTHANDS = {t[0]: t for t in TYPES}
def is_old_format_detection(path: str) -> bool:
    """Returns `True` if `path` points to a directory that contains a `.him` file and a recordings directory."""
    if not os.path.isdir(path):
        return False
    # Old-format detections keep their data under resources/recordings
    # next to a top-level .him model file.
    has_recordings_dir = os.path.isdir(os.path.join(path, 'resources', 'recordings'))
    has_him_file = any(entry.endswith('.him') for entry in listdir(path))
    return has_recordings_dir and has_him_file
def is_new_format_detection(path: str) -> bool:
    """Returns `True` if `path` points to a directory that contains a `.hef` file."""
    if not os.path.isdir(path):
        return False
    # New-format detections pack everything into a single .hef archive.
    for entry in listdir(path):
        if entry.endswith('.hef'):
            return True
    return False
class ArkiteRecording(ABC):
    """Base class for a single recording, consisting of a depth and an IR stream.

    Subclasses provide context-manager access to the underlying XHDFS data.
    """
    def __init__(self, full_name: str):
        self._full_name = full_name
        # Last path component is the recording's own name.
        self._name = os.path.split(full_name)[-1]
        # The first letter of the name encodes the detection type (see
        # TYPE_SHORTHANDS). Using `[:1]` instead of `[0]` avoids an
        # IndexError when the name is empty; the lookup then yields None.
        self._detection_type = TYPE_SHORTHANDS.get(self._name[:1], None)
    @property
    def name(self) -> str:
        return self._name
    @property
    def full_name(self) -> str:
        return self._full_name
    @property
    def detection_type(self) -> Optional[str]:
        # None when the name does not start with a known type shorthand.
        return self._detection_type
    @abstractmethod
    def depth(self) -> XHDFS:
        """Context manager yielding the recording's depth stream as an XHDFS."""
        raise NotImplementedError
    @abstractmethod
    def ir(self) -> XHDFS:
        """Context manager yielding the recording's IR stream as an XHDFS."""
        raise NotImplementedError
class ArkiteRecordingDirectory(ArkiteRecording):
    """A recording stored as loose `.xhdfs` files in the old directory layout."""
    def __init__(self, full_name: str):
        super().__init__(full_name)
    def _data_file_path(self, data_type: str) -> str:
        # Data lives at <detection dir>/resources/recordings/<name>/<type>.xhdfs
        parent, recording_name = os.path.split(self._full_name)
        file_name = data_type + os.path.extsep + 'xhdfs'
        return os.path.join(parent, 'resources', 'recordings', recording_name, file_name)
    @contextmanager
    def depth(self):
        stream = open(self._data_file_path('Depth'), mode='rb')
        with XHDFS(stream) as depth_xhdfs:
            yield depth_xhdfs
    @contextmanager
    def ir(self):
        stream = open(self._data_file_path('IR'), mode='rb')
        with XHDFS(stream) as ir_xhdfs:
            yield ir_xhdfs
class ArkiteRecordingHEF(ArkiteRecording):
    """A recording packed inside a `.hef` archive (a zip file)."""
    def __init__(self, full_name: str, hef_file_name: str, depth_file_name: str, ir_file_name: str):
        super().__init__(full_name)
        self._hef = hef_file_name
        self._depth = depth_file_name
        self._ir = ir_file_name
    @staticmethod
    def _data_file_path(file_name: str) -> str:
        # Zip member names always use forward slashes, independent of the OS.
        return "resources/" + file_name
    @contextmanager
    def _open_stream(self, file_name: str):
        # Open the archive, then the named member inside it, as an XHDFS stream.
        with zipfile.ZipFile(self._hef) as hef:
            with XHDFS(hef.open(self._data_file_path(file_name))) as stream:
                yield stream
    @contextmanager
    def depth(self) -> XHDFS:
        with self._open_stream(self._depth) as depth_xhdfs:
            yield depth_xhdfs
    @contextmanager
    def ir(self) -> XHDFS:
        with self._open_stream(self._ir) as ir_xhdfs:
            yield ir_xhdfs
class ArkiteDetection(ABC):
    """Base class for a detection: a named container of recordings."""
    def __init__(self, relative_path: str):
        self._path = relative_path
        self._full_name = relative_path
        self._name = os.path.basename(relative_path)
    @property
    def full_name(self) -> str:
        return self._full_name
    @property
    def detection_name(self) -> str:
        return self._name
    @abstractmethod
    def recordings(self) -> Iterable[ArkiteRecording]:
        """Yields every recording belonging to this detection."""
        raise NotImplementedError
class ArkiteDetectionDirectory(ArkiteDetection):
    """Old-format detection: each recording is a subdirectory of resources/recordings."""
    def __init__(self, relative_path: str):
        super().__init__(relative_path)
    def recordings(self) -> Iterable[ArkiteRecordingDirectory]:
        recordings_dir = os.path.join(self._path, 'resources', 'recordings')
        for entry in listdir(recordings_dir):
            yield ArkiteRecordingDirectory(os.path.join(self.full_name, entry))
class ArkiteDetectionHEF(ArkiteDetection):
    """New-format detection: all recordings are packed in a single `.hef` zip archive."""
    def __init__(self, relative_path: str):
        super().__init__(relative_path)
        # Pick the first .hef file in the directory. Assumes one exists
        # (guaranteed when constructed via is_new_format_detection);
        # raises StopIteration otherwise.
        hef_file_name = next(f for f in listdir(self._path) if f.endswith('.hef'))
        self._hef = os.path.join(self._path, hef_file_name)
    def recordings(self) -> Iterable[ArkiteRecordingHEF]:
        """Yields one ArkiteRecordingHEF per Depth/IR pair in the archive manifest.

        The JSON manifest lists resource entries; the relevant ones are the
        non-removed '.xhdfs' files, which are assumed to appear as adjacent
        pairs named '<name> Depth' / '<name> IR' (in either order).
        """
        with zipfile.ZipFile(self._hef, mode='r') as hef:
            with hef.open(HEF_RECORDING_LIST_FILE) as recording_list:
                file_list = json.loads(recording_list.read().decode('utf-8'))
        # Pairing state: the first member of a pair is held here until its
        # partner entry arrives.
        previous_metadata = None
        for file_metadata in file_list:
            # Skip non-recording resources and entries marked as removed.
            if file_metadata['Extension'] != '.xhdfs' or file_metadata['RemovedId'] != 0:
                continue
            if previous_metadata is None:
                previous_metadata = file_metadata
                continue
            # 'Name' is '<recording name> <data type>', e.g. 'A01 Depth'.
            name, data_type = file_metadata['Name'].rsplit(' ', 1)
            prev_name, prev_type = previous_metadata['Name'].rsplit(' ', 1)
            # Both halves of a pair must belong to the same recording and
            # carry different data types (one Depth, one IR).
            assert name == prev_name and data_type != prev_type
            full_name = os.path.join(self._full_name, name)
            # Which of the two entries is Depth vs IR is not fixed; select by type.
            depth_name = (file_metadata if data_type == 'Depth' else previous_metadata)['FileName']
            ir_name = (file_metadata if data_type == 'IR' else previous_metadata)['FileName']
            previous_metadata = None
            rec = ArkiteRecordingHEF(full_name, self._hef, depth_file_name=depth_name, ir_file_name=ir_name)
            yield rec
class ArkiteData:
    """Entry point for enumerating use cases and detections under a data root.

    `use_case_depth` is the number of directory levels below `root_dir`
    at which the use-case directories live.
    """
    def __init__(self, root_dir: str, use_case_depth: int):
        self._root = os.path.abspath(root_dir)
        self._use_case_depth = use_case_depth
    def use_cases(self) -> Iterable[str]:
        """Yields use-case directory paths relative to the root directory."""
        # One '*' per directory level, e.g. depth 3 -> '*/*/*'.
        use_case_glob = os.path.sep.join(['*'] * self._use_case_depth)
        full_glob = os.path.join(self._root, use_case_glob)
        for d in iglob(full_glob):
            if os.path.isdir(d):
                # relpath is robust even if the root string happens to
                # reappear later in the path (str.replace would mangle it).
                yield os.path.relpath(d, self._root)
    def detections(self, use_cases: Optional[Union[Iterable[str], str]] = None) -> Iterable[ArkiteDetection]:
        """Yields all detections, optionally restricted to the given use case(s).

        `use_cases` may be a single use-case path, an iterable of them,
        or `None` for no filtering.
        """
        if isinstance(use_cases, str):
            use_cases = (use_cases,)
        if isinstance(use_cases, Iterable):
            # Materialize as a set for O(1) membership tests below.
            use_cases = set(use_cases)
        for uc in self.use_cases():
            if use_cases is not None and uc not in use_cases:
                continue
            use_case_dir = os.path.join(self._root, uc)
            # The use-case directory itself may be a detection ...
            detection = self._dir_to_detection(use_case_dir)
            if detection:
                yield detection
            # ... and so may any of its direct subdirectories.
            for dd in listdir(use_case_dir):
                detection = self._dir_to_detection(os.path.join(use_case_dir, dd))
                if detection:
                    yield detection
    @staticmethod
    def _dir_to_detection(directory: str) -> Optional[ArkiteDetection]:
        """Returns a detection wrapper matching `directory`'s format, or `None`."""
        if is_old_format_detection(directory):
            return ArkiteDetectionDirectory(directory)
        elif is_new_format_detection(directory):
            return ArkiteDetectionHEF(directory)
        else:
            return None