Read videos from HIM project files
__init__.py (Normal file, 0 lines)

(modified file; the file name is not shown in this extract)
@@ -1,17 +1,25 @@
 import numpy as np
 from typing import List
 import cv2
-from typing import List
+from data.library import ArkiteData, ArkiteDetectionHEF
+from cv2util import to_8_bit_image
 import time
 
 class BackgroundHeatmap:
-    def __init__(self, capture):
+    def __init__(self, capture, bg_teach_iters=50):
         self.heatmap = np.array([])
         self.cap = capture
-        self.backsub = cv2.createBackgroundSubtractorMOG2()
+        # self.backsub = cv2.createBackgroundSubtractorMOG2()
+        self.backsub = cv2.createBackgroundSubtractorKNN()
+        # self.backsub = cv2.bgsegm_BackgroundSubtractorGSOC()
+        # self.backsub = cv2.back()
 
         # Fill up with first frame
         ret, frame = self.cap.read()
+        # Suppose the first frame is background, teach it for a few iterations
+        for i in range(bg_teach_iters):
+            ret, frame = self.cap.read()
+            self.backsub.apply(cv2.blur(frame,(5,5)))
         self.lastframe = self.backsub.apply(frame)
         self.lastsum = self.to_floats(self.lastframe)
 
@@ -33,7 +41,7 @@ class BackgroundHeatmap:
 
     def update(self):
         ret, frame = self.cap.read()
-        self.lastframe = self.backsub.apply(frame)
+        self.lastframe = self.backsub.apply(cv2.blur(frame,(5,5)))
         self.lastsum += self.to_floats(self.lastframe)
         self.heatmap = self.gray_to_heat(
             self.float_to_gray(
@@ -41,26 +49,43 @@ class BackgroundHeatmap:
             )
         )
 
-cap = cv2.VideoCapture(0)
-diffsum = BackgroundHeatmap(cap)
+if __name__ == '__main__':
+    # cap = cv2.VideoCapture(0)
+    projects_path = "C:\\UntrackedGit\\opencv_test\\him_projects"
+    data = ArkiteData(projects_path, 1)
+    for uc in data.use_cases():
+        for detection in data.detections(uc):
+            for recording in detection.recordings():
+                print("New recording: " + str(recording.name))
+                with recording.ir() as ir:
+                    for frame in ir.frame_sequence():
+                        converted = to_8_bit_image(frame, display_min=100, display_max=8000)
+                        # converted = cv2.applyColorMap(frame, colormap=cv2.COLORMAP_JET)
+                        cv2.imshow("IR", converted)
+                        if cv2.waitKey(1) & 0xFF == ord('q'):
+                            break
+                        # print("Showing frame...")
+                        # time.sleep(0.1)
 
-# Load diffsum up with first
-first = True
-while(True):
-    # Update heatmap
-    diffsum.update()
+    cv2.destroyAllWindows()
 
-    # Display the resulting frame
-    cv2.imshow('Heatmap', diffsum.heatmap)
-    cv2.imshow('Backsub', diffsum.lastframe)
+    # # Load diffsum up with first
+    # first = True
+    # while(True):
+    #     # Update heatmap
+    #     diffsum.update()
 
-    if first:
-        cv2.moveWindow("Backsub", 1000, 100)
-        first = False
+    # # Display the resulting frame
+    # cv2.imshow('Heatmap', diffsum.heatmap)
+    # cv2.imshow('Backsub', diffsum.lastframe)
 
-    if cv2.waitKey(1) & 0xFF == ord('q'):
-        break
+    # if first:
+    #     cv2.moveWindow("Backsub", 1000, 100)
+    #     first = False
 
-# When everything done, release the capture
-diffsum.cap.release()
-cv2.destroyAllWindows()
+    # if cv2.waitKey(1) & 0xFF == ord('q'):
+    #     break
+
+    # # When everything done, release the capture
+    # diffsum.cap.release()
+    # cv2.destroyAllWindows()
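For reference, a minimal self-contained sketch of the background-teaching idea this hunk introduces. The webcam index, warm-up length, and blur kernel are illustrative, not taken from the project:

import cv2

# Teach a KNN background subtractor on an initial "empty scene" burst,
# mirroring the bg_teach_iters loop added above.
cap = cv2.VideoCapture(0)
backsub = cv2.createBackgroundSubtractorKNN()

for _ in range(50):  # assumed warm-up length, like bg_teach_iters=50
    ret, frame = cap.read()
    if not ret:
        break
    # Blurring before apply() suppresses per-pixel sensor noise
    backsub.apply(cv2.blur(frame, (5, 5)))

cap.release()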
cv2util.py (Normal file, 67 lines)
@@ -0,0 +1,67 @@
from typing import Optional, Iterable

import cv2
import numpy as np

import util as abu
from data.xhdfs import MIN_PIXEL_VALUE, MAX_PIXEL_VALUE

_WHITE = 255


def to_8_bit_image(src_image: np.ndarray,
                   display_min: Optional[int] = None,
                   display_max: Optional[int] = None) -> np.ndarray:
    """Copies the image and normalizes pixel values to [0; 255].

    See https://stackoverflow.com/questions/14464449/using-numpy-to-efficiently-convert-16-bit-image-data-to-8-bit-for-display-with"""

    if src_image.dtype == np.uint8:  # Do nothing with images that are already 8-bit
        return src_image

    copy = np.array(src_image, copy=True)

    # Clip pixel values if necessary
    need_to_clip = display_min is not None or display_max is not None

    if display_min is None:
        display_min = MIN_PIXEL_VALUE
    if display_max is None:
        display_max = MAX_PIXEL_VALUE

    if need_to_clip:
        copy.clip(display_min, display_max, out=copy)

    copy -= display_min
    np.floor_divide(copy, (display_max - display_min + 1) / 256, out=copy, casting='unsafe')

    return copy.astype(np.uint8)


def imshow_xhdfs_frame(frame: np.ndarray,
                       window_name: Optional[str] = None,
                       color_map: Optional[int] = None,
                       display_min: Optional[int] = None,
                       display_max: Optional[int] = None) -> None:
    frame_as_8_bit = to_8_bit_image(frame, display_min=display_min, display_max=display_max)
    if color_map:
        frame_as_8_bit = cv2.applyColorMap(frame_as_8_bit, color_map)

    if not window_name:
        window_name = __name__

    cv2.imshow(window_name, frame_as_8_bit)
    del frame_as_8_bit


def box_mask(box_points: Iterable[abu.FloatPoint],
             image_width: Optional[int] = None,
             image_height: Optional[int] = None) -> np.ndarray:
    image_width = image_width or abu.FRAME_WIDTH  # FIXME: hard-coded dimensions
    image_height = image_height or abu.FRAME_HEIGHT

    mask = np.zeros(shape=(image_height, image_width), dtype=np.uint8)
    box_corners = np.asarray(box_points, dtype=np.int32)
    cv2.fillConvexPoly(mask, box_corners, _WHITE)

    return mask
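A quick usage sketch for to_8_bit_image; the synthetic 16-bit frame and the 100..8000 window (the same one the demo script uses) are illustrative:

import numpy as np
from cv2util import to_8_bit_image

# Synthetic 16-bit "IR" frame with values well outside the 8-bit range
frame = np.random.randint(0, 8000, size=(424, 512), dtype=np.uint16)

# Window the 100..8000 range into [0; 255]
small = to_8_bit_image(frame, display_min=100, display_max=8000)
assert small.dtype == np.uint8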
data/__init__.py (Normal file, 0 lines)

data/box.py (Normal file, 93 lines)
@@ -0,0 +1,93 @@
import json
import os.path
import re
import zipfile
from typing import NamedTuple, Optional, Iterable, List

from numpy import ndarray

from arkite_booster.cv2util import box_mask
from arkite_booster.util import FloatPoint
from arkite_booster.util import bounding_box_centre

ArkiteBox = NamedTuple(typename='ArkiteBox',
                       fields=[('name', str),
                               ('points', List[FloatPoint]),
                               ('depth', int),
                               ('distance', int),
                               ('centre', FloatPoint),
                               ('mask', ndarray)])

HEF_BOX_LIST_FILE = 'data/Model.Entities.Box.json'

CALIBRATION_BOX_REGEX = re.compile('(Depth|IR) Cal ([0-9]+|- Center).*')
GRAB_BOX_PREFIX = 'grab_'


def load_boxes(model_entities_box_path: str,
               project_ids: Optional[Iterable[int]] = None,
               box_ids: Optional[Iterable[int]] = None,
               keep_grab_boxes: bool = True,
               keep_container_boxes: bool = True,
               skip_calibration_boxes: bool = True) -> Iterable[ArkiteBox]:
    model_entities_box_path = os.path.abspath(model_entities_box_path)

    if model_entities_box_path.endswith('.json'):
        with open(model_entities_box_path, 'r') as mebf:
            all_boxes = json.load(mebf)
    elif model_entities_box_path.endswith('.hef'):
        with zipfile.ZipFile(model_entities_box_path, mode='r') as hef:
            with hef.open(HEF_BOX_LIST_FILE) as mebf:
                all_boxes = json.loads(mebf.read().decode('utf-8'))
    else:
        raise ValueError('Unknown file format: ' + model_entities_box_path)

    boxes = []
    for box_json in all_boxes:
        if project_ids is not None and box_json['ProjectId'] not in project_ids:
            continue

        if box_ids is not None and box_json['Id'] not in box_ids:
            continue

        box_name = box_json['Name']
        if skip_calibration_boxes and CALIBRATION_BOX_REGEX.match(box_name) is not None:
            continue

        if not keep_grab_boxes and box_name.startswith(GRAB_BOX_PREFIX):
            continue

        if not keep_container_boxes and not box_name.startswith(GRAB_BOX_PREFIX):
            continue

        points = [(coordinates["X"], coordinates["Y"]) for coordinates in box_json["DataPoints"]]
        mask = box_mask(points)
        centre = bounding_box_centre(points)

        box = ArkiteBox(name=box_name,
                        depth=box_json['Depth'],
                        distance=box_json['Distance'],
                        points=points,
                        mask=mask,
                        centre=centre)
        boxes.append(box)

    return boxes


def boxes_for_use_case(box_metadata_root_dir: str,
                       use_case: str,
                       keep_grab_boxes: bool = True,
                       keep_container_boxes: bool = True,
                       skip_calibration_boxes: bool = True) -> Iterable[ArkiteBox]:
    """Ad-hoc method to match the detection library with additional `.hef` files provided by Stijn."""
    box_metadata_root_dir = os.path.abspath(box_metadata_root_dir)

    use_case_name = use_case.split('/')[-1]
    use_case_hef = os.path.join(box_metadata_root_dir, use_case_name + '.hef')
    use_case_boxes = load_boxes(use_case_hef,
                                keep_grab_boxes=keep_grab_boxes,
                                keep_container_boxes=keep_container_boxes,
                                skip_calibration_boxes=skip_calibration_boxes)

    return use_case_boxes
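A hedged usage sketch for load_boxes; the file name and project id are hypothetical, and the import path is an assumption (this module imports from arkite_booster.*, while others import the bare data package):

from data.box import load_boxes

boxes = load_boxes('Model.Entities.Box.json',
                   project_ids={1},               # hypothetical project id
                   keep_grab_boxes=False,         # drop 'grab_*' boxes
                   skip_calibration_boxes=True)   # drop 'Depth Cal' / 'IR Cal' boxes

for box in boxes:
    print(box.name, box.centre, box.mask.shape)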
data/hands.py (Normal file, 71 lines)
@@ -0,0 +1,71 @@
import os.path
from collections import OrderedDict
from glob import glob
from typing import Generator, Union, Tuple

import cv2
import numpy as np
import pandas as pd

HandMask = np.ndarray
HandMaskMaybeWithFileName = Union[HandMask, Tuple[HandMask, str]]

_CSV_FILE_NAME = 'data.csv'
_CSV_COLUMNS = OrderedDict([('frame_number', int),
                            ('hand_id', int),
                            ('x', float),
                            ('y', float),
                            ('direction_x', float),
                            ('direction_y', float),
                            ('distance', float),
                            ('detected', bool),
                            ('frames_without_detection', int)])

_HAND_MASK_FILE_GLOB = 'hand_labels_*.png'


def load_coordinates(data_csv_path: str) -> pd.DataFrame:
    data_csv_path = os.path.abspath(data_csv_path)
    hand_data = pd.read_csv(data_csv_path,
                            sep=',',
                            header=None,
                            names=_CSV_COLUMNS.keys(),
                            dtype=_CSV_COLUMNS)

    return hand_data


def load_hand_masks(data_dir_path: str,
                    return_file_names: bool = False) -> Generator[HandMaskMaybeWithFileName, None, None]:
    data_dir_path = os.path.abspath(data_dir_path)

    mask_files = glob(os.path.join(data_dir_path, _HAND_MASK_FILE_GLOB))
    mask_files = sorted(mask_files)

    for mf in mask_files:
        hand_mask = cv2.imread(mf, cv2.IMREAD_UNCHANGED)

        if return_file_names:
            yield hand_mask, mf
        else:
            yield hand_mask


class RecordingHandTracking:
    def __init__(self, path: str):
        self._path = os.path.abspath(path)

    def hand_coordinates(self) -> pd.DataFrame:
        return load_coordinates(os.path.join(self._path, _CSV_FILE_NAME))

    def hand_mask_sequence(self, return_file_names: bool = False) -> Generator[HandMaskMaybeWithFileName, None, None]:
        return load_hand_masks(self._path, return_file_names)
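A usage sketch for the hand-tracking loader; the directory path is illustrative and assumes the data.csv plus hand_labels_*.png layout the module globals describe:

from data.hands import RecordingHandTracking

tracking = RecordingHandTracking('recordings/Activity 1')

coords = tracking.hand_coordinates()  # DataFrame with the _CSV_COLUMNS fields
print(coords[['frame_number', 'hand_id', 'x', 'y']].head())

for mask, file_name in tracking.hand_mask_sequence(return_file_names=True):
    print(file_name, None if mask is None else mask.shape)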
data/library.py (Normal file, 214 lines)
@@ -0,0 +1,214 @@
import json
import os.path
import zipfile
from abc import ABC, abstractmethod
from contextlib import contextmanager
from glob import iglob
from os import listdir
from typing import Iterable, Optional, Union

from data.xhdfs import XHDFS

HEF_RECORDING_LIST_FILE = 'data/Model.Entities.Resource.json'

TYPES = ['Activity', 'Button', 'Container', 'Process', 'Tool']
TYPE_SHORTHANDS = {t[0]: t for t in TYPES}


def is_old_format_detection(path: str) -> bool:
    """Returns `True` if `path` points to a directory that contains a `.him` file and a recordings directory."""
    if not os.path.isdir(path):
        return False
    return os.path.isdir(os.path.join(path, 'resources', 'recordings')) \
        and any(f.endswith('.him') for f in listdir(path))


def is_new_format_detection(path: str) -> bool:
    """Returns `True` if `path` points to a directory that contains a `.hef` file."""
    if not os.path.isdir(path):
        return False
    return any(f.endswith('.hef') for f in listdir(path))


class ArkiteRecording(ABC):
    def __init__(self, full_name: str):
        self._full_name = full_name
        self._name = os.path.split(full_name)[-1]
        self._detection_type = TYPE_SHORTHANDS.get(self._name[0], None)

    @property
    def name(self) -> str:
        return self._name

    @property
    def full_name(self) -> str:
        return self._full_name

    @property
    def detection_type(self) -> str:
        return self._detection_type

    @abstractmethod
    def depth(self) -> XHDFS:
        raise NotImplementedError

    @abstractmethod
    def ir(self) -> XHDFS:
        raise NotImplementedError


class ArkiteRecordingDirectory(ArkiteRecording):
    def __init__(self, full_name: str):
        super().__init__(full_name)

    def _data_file_path(self, data_type: str) -> str:
        *location, name = os.path.split(self._full_name)
        return os.path.join(*location, 'resources', 'recordings', name, data_type + os.path.extsep + 'xhdfs')

    @contextmanager
    def depth(self):
        with XHDFS(open(self._data_file_path('Depth'), mode='rb')) as depth_xhdfs:
            yield depth_xhdfs

    @contextmanager
    def ir(self):
        with XHDFS(open(self._data_file_path('IR'), mode='rb')) as ir_xhdfs:
            yield ir_xhdfs


class ArkiteRecordingHEF(ArkiteRecording):
    def __init__(self, full_name: str, hef_file_name: str, depth_file_name: str, ir_file_name: str):
        super().__init__(full_name)
        self._hef = hef_file_name
        self._depth = depth_file_name
        self._ir = ir_file_name

    @staticmethod
    def _data_file_path(file_name: str) -> str:
        return "resources/" + file_name

    @contextmanager
    def depth(self) -> XHDFS:
        with zipfile.ZipFile(self._hef) as hef, \
                XHDFS(hef.open(self._data_file_path(self._depth))) as depth_xhdfs:
            yield depth_xhdfs

    @contextmanager
    def ir(self) -> XHDFS:
        with zipfile.ZipFile(self._hef) as hef, \
                XHDFS(hef.open(self._data_file_path(self._ir))) as ir_xhdfs:
            yield ir_xhdfs


class ArkiteDetection(ABC):
    def __init__(self, relative_path: str):
        self._path = relative_path
        self._full_name = relative_path
        self._name = os.path.split(relative_path)[-1]

    @property
    def full_name(self) -> str:
        return self._full_name

    @property
    def detection_name(self) -> str:
        return self._name

    @abstractmethod
    def recordings(self) -> Iterable[ArkiteRecording]:
        raise NotImplementedError


class ArkiteDetectionDirectory(ArkiteDetection):
    def __init__(self, relative_path: str):
        super().__init__(relative_path)

    def recordings(self) -> Iterable[ArkiteRecordingDirectory]:
        recordings_dir = os.path.join(self._path, 'resources', 'recordings')

        for rec_dir_name in listdir(recordings_dir):
            name = rec_dir_name
            full_name = os.path.join(self.full_name, name)
            rec = ArkiteRecordingDirectory(full_name)
            yield rec


class ArkiteDetectionHEF(ArkiteDetection):
    def __init__(self, relative_path: str):
        super().__init__(relative_path)

        hef_file_name = next(f for f in listdir(self._path) if f.endswith('.hef'))
        self._hef = os.path.join(self._path, hef_file_name)

    def recordings(self) -> Iterable[ArkiteRecordingHEF]:
        with zipfile.ZipFile(self._hef, mode='r') as hef:
            with hef.open(HEF_RECORDING_LIST_FILE) as recording_list:
                file_list = json.loads(recording_list.read().decode('utf-8'))

        # Depth & IR entries for the same recording arrive as consecutive pairs
        previous_metadata = None
        for file_metadata in file_list:
            if file_metadata['Extension'] != '.xhdfs' or file_metadata['RemovedId'] != 0:
                continue

            if previous_metadata is None:
                previous_metadata = file_metadata
                continue

            name, data_type = file_metadata['Name'].rsplit(' ', 1)
            prev_name, prev_type = previous_metadata['Name'].rsplit(' ', 1)

            assert name == prev_name and data_type != prev_type
            full_name = os.path.join(self._full_name, name)
            depth_name = (file_metadata if data_type == 'Depth' else previous_metadata)['FileName']
            ir_name = (file_metadata if data_type == 'IR' else previous_metadata)['FileName']

            previous_metadata = None

            rec = ArkiteRecordingHEF(full_name, self._hef, depth_file_name=depth_name, ir_file_name=ir_name)
            yield rec


class ArkiteData:
    def __init__(self, root_dir: str, use_case_depth: int):
        self._root = os.path.abspath(root_dir)
        self._use_case_depth = use_case_depth

    def use_cases(self) -> Iterable[str]:
        use_case_glob = os.path.sep.join('*' * self._use_case_depth)
        full_glob = os.path.join(self._root, use_case_glob)
        for d in iglob(full_glob):
            if os.path.isdir(d):
                # Strip the root prefix and the leading path separator
                yield d.replace(self._root, '')[1:]

    def detections(self, use_cases: Optional[Union[Iterable[str], str]] = None) -> Iterable[ArkiteDetection]:
        if isinstance(use_cases, str):
            use_cases = (use_cases,)
        if isinstance(use_cases, Iterable):
            use_cases = set(use_cases)

        for uc in self.use_cases():
            if use_cases is not None and uc not in use_cases:
                continue

            use_case_dir = os.path.join(self._root, uc)

            # Check if the directory itself is a detection
            detection = self._dir_to_detection(use_case_dir)
            if detection:
                yield detection

            # Check if subdirectories are detections themselves
            for dd in listdir(use_case_dir):
                detection_dir = os.path.join(use_case_dir, dd)
                detection = self._dir_to_detection(detection_dir)
                if detection:
                    yield detection

    @staticmethod
    def _dir_to_detection(directory: str) -> Optional[ArkiteDetection]:
        if is_old_format_detection(directory):
            return ArkiteDetectionDirectory(directory)
        elif is_new_format_detection(directory):
            return ArkiteDetectionHEF(directory)
        else:
            return None
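A sketch of how the pieces combine; the root path is illustrative, and use_case_depth=1 mirrors the demo script:

from data.library import ArkiteData

data = ArkiteData('him_projects', use_case_depth=1)

for uc in data.use_cases():
    for detection in data.detections(uc):
        print(detection.detection_name)
        for recording in detection.recordings():
            # depth() and ir() are context managers yielding XHDFS readers
            with recording.depth() as depth:
                for frame in depth.frame_sequence():
                    pass  # each frame is a (height, width) uint16 array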
data/xhdfs.py (Normal file, 92 lines)
@@ -0,0 +1,92 @@
from typing import NamedTuple, Generator, Any

import numpy as np

CAMERA_NODE_TYPES = ('unknown', 'depth', 'ir', 'rgb', 'rgb_luminance', 'rgb_mapped')

HEADER_BYTES = 32
BYTES_PER_SHORT = 2
TIMESTAMP_BYTES = 8
RAW_UNRELIABLE_PIXEL_VALUE = -11

ENDIANNESS = 'little'

# Pixels are stored as signed 16-bit integers with little endianness.
# However, the only valid negative value is `-11` (unreliable pixel), which should just be converted into the
# corresponding unsigned value, without any loss of information
SOURCE_NUMPY_DATATYPE = np.dtype('<i2')
TARGET_NUMPY_DATATYPE = np.dtype('=u2')  # TODO: is this cross-platform?
MIN_PIXEL_VALUE = np.iinfo(TARGET_NUMPY_DATATYPE).min
MAX_PIXEL_VALUE = np.iinfo(TARGET_NUMPY_DATATYPE).max
UNRELIABLE_PIXEL_VALUE = np.cast[TARGET_NUMPY_DATATYPE](RAW_UNRELIABLE_PIXEL_VALUE).min()

XHDFSHeader = NamedTuple(typename='XHDFSHeader',
                         fields=[('n_frames', int),
                                 ('width', int),
                                 ('height', int),
                                 ('version_number', int),
                                 ('camera_node_type', str)])


def read_unsigned_integer(fileobj, n_bytes: int = 4, signed=False) -> int:
    return int.from_bytes(fileobj.read(n_bytes), ENDIANNESS, signed=signed)


def bytes_to_image(frame_bytes: bytes, width: int, height: int) -> np.ndarray:
    frame_bytes = frame_bytes[8:]  # Cut off the timestamp

    # Height & width in that order match what OpenCV expects
    frame_image = np.frombuffer(frame_bytes, SOURCE_NUMPY_DATATYPE).reshape(height, width)
    frame_image = frame_image.astype(TARGET_NUMPY_DATATYPE, casting='unsafe', copy=False)
    frame_image[frame_image == UNRELIABLE_PIXEL_VALUE] = 0

    return frame_image


class XHDFS:
    def __init__(self, fileobj):
        self._fileobj = fileobj

        self._header = None
        self._pixels_per_frame = -1

    def __enter__(self):
        header = self._read_header()

        self._header = header
        # Actually bytes per frame: 2 bytes per pixel, plus the leading timestamp
        self._pixels_per_frame = header.width * header.height * BYTES_PER_SHORT + TIMESTAMP_BYTES

        return self

    def _read_header(self) -> XHDFSHeader:
        n_frames = read_unsigned_integer(self._fileobj)
        width = read_unsigned_integer(self._fileobj)
        height = read_unsigned_integer(self._fileobj)

        # Skip `CameraNodeId` & `WithBodies`
        _ = self._fileobj.read(4 + 1)

        version = read_unsigned_integer(self._fileobj)
        node_type = read_unsigned_integer(self._fileobj, n_bytes=2)
        node_type = CAMERA_NODE_TYPES[node_type]

        # Skip `CameraNodeSerial` & 3 "spare" bytes
        _ = self._fileobj.read(6 + 3)

        header = XHDFSHeader(n_frames, width, height, version, node_type)
        return header

    def _read_frame_bytes(self) -> bytes:
        return self._fileobj.read(self._pixels_per_frame)

    def frame_sequence(self) -> Generator[np.ndarray, None, None]:
        w, h = self._header.width, self._header.height
        for _ in range(self._header.n_frames):
            frame_bytes = self._read_frame_bytes()
            frame = bytes_to_image(frame_bytes, w, h)
            del frame_bytes
            yield frame

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._fileobj.close()
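Putting _read_header together, the fixed-size header decodes as below; the file name in the usage sketch is illustrative:

from data.xhdfs import XHDFS

# XHDFS header layout implied by _read_header (little-endian throughout):
#   bytes  0-3   n_frames          (uint32)
#   bytes  4-7   width             (uint32)
#   bytes  8-11  height            (uint32)
#   bytes 12-16  CameraNodeId + WithBodies   (skipped, 4 + 1 bytes)
#   bytes 17-20  version_number    (uint32)
#   bytes 21-22  camera_node_type  (uint16 index into CAMERA_NODE_TYPES)
#   bytes 23-31  CameraNodeSerial + spare    (skipped, 6 + 3 bytes)
# Total 32 bytes, matching HEADER_BYTES. Each frame record that follows is
# TIMESTAMP_BYTES + width * height * BYTES_PER_SHORT bytes long.

with XHDFS(open('IR.xhdfs', 'rb')) as ir:
    for frame in ir.frame_sequence():
        print(frame.shape, frame.dtype)  # (height, width), uint16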
util.py (Normal file, 26 lines)
@@ -0,0 +1,26 @@
from typing import Dict, Any, Tuple, Union, Iterable

IntPoint = Tuple[int, int]
FloatPoint = Tuple[float, float]
Point = Union[IntPoint, FloatPoint]

AlgorithmParams = Dict[str, Any]

FRAME_SHAPE = (512, 424)  # TODO: hardcoded
FRAME_WIDTH, FRAME_HEIGHT = FRAME_SHAPE


def bounding_box_centre(points: Iterable[FloatPoint]) -> FloatPoint:
    # Note: computes the centroid (mean) of the points, not strictly the
    # centre of their axis-aligned bounding box
    n = 0
    x_sum = 0.0
    y_sum = 0.0

    for x, y in points:
        n += 1
        x_sum += x
        y_sum += y

    x_centre = x_sum / n
    y_centre = y_sum / n

    return x_centre, y_centre
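A tiny worked example for bounding_box_centre:

from util import bounding_box_centre

# Corners of the unit square average out to its centre
centre = bounding_box_centre([(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)])
print(centre)  # (0.5, 0.5)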