11 Commits

Author SHA1 Message Date
Bart Moyaers
ef4d419b20 fix unexisting import 2019-12-06 11:54:32 +01:00
Bart Moyaers
e2c7974f8a test blob detection 2019-12-06 11:48:04 +01:00
Bart Moyaers
6fcdc0f71c separate script from class 2019-11-29 11:27:11 +01:00
Bart Moyaers
c4170aa3b5 add separate video loader class 2019-11-29 10:32:37 +01:00
Bart Moyaers
8ac9d97398 ignore video files 2019-11-29 09:36:17 +01:00
Bart Moyaers
d2098810df correct blur and learning rate 2019-11-29 09:35:33 +01:00
Bart Moyaers
7b42ffef8a correct getrecordings method 2019-11-27 10:01:35 +01:00
Bart Moyaers
99ae058d01 Record combined video with all actions 2019-11-15 11:56:57 +01:00
Bart Moyaers
b8255e285d group recordings by name 2019-11-15 10:59:02 +01:00
Bart Moyaers
e4ba433a56 Read videos from HIM project files 2019-11-08 17:14:27 +01:00
Bart Moyaers
17360b2fd5 update gitignore 2019-11-08 17:13:58 +01:00
15 changed files with 776 additions and 66 deletions

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
*.pyc
__pycache__
him_projects
*.hef
*.avi

0
__init__.py Normal file
View File

52
background_heatmap.py Normal file
View File

@@ -0,0 +1,52 @@
import numpy as np
import cv2
import time
from typing import List, Iterable
from video_loader import VideoLoader
import numpy
from data.library import ArkiteData
from cv2util import to_8_bit_image
class BackgroundHeatmap:
def __init__(self, frames, append_blur=True, learning_rate=0.0003):
self.append_blur = append_blur
self.frames = frames
self.lastframe = None
self.learning_rate = learning_rate
self.heatmap = np.array([])
self.backsub = cv2.createBackgroundSubtractorMOG2()
self.teach_background()
self.lastsum = self.to_floats(self.lastframe)
@staticmethod
def to_grayscale(frame):
return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
@staticmethod
def to_floats(frame):
return cv2.normalize(frame, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
@staticmethod
def float_to_gray(frame):
return cv2.normalize(frame, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8UC1)
@staticmethod
def gray_to_heat(frame):
return cv2.applyColorMap(frame, cv2.COLORMAP_JET)
def update(self, frame):
if self.append_blur:
frame = cv2.blur(frame,(5,5))
self.backsub.apply(frame, self.lastframe, self.learning_rate)
self.lastsum += self.to_floats(self.lastframe)
self.heatmap = self.gray_to_heat(
self.float_to_gray(
self.to_floats(self.lastsum)
)
)
def teach_background(self):
for frame in self.frames:
if self.append_blur:
frame = cv2.blur(frame,(5,5))
self.lastframe = self.backsub.apply(frame, None, self.learning_rate)

View File

@@ -1,66 +0,0 @@
import numpy as np
from typing import List
import cv2
class BackgroundHeatmap:
def __init__(self, capture):
self.heatmap = np.array([])
self.cap = capture
self.backsub = cv2.createBackgroundSubtractorMOG2()
# self.backsub = cv2.bgsegm_BackgroundSubtractorGSOC()
# self.backsub = cv2.back()
# Fill up with first frame
ret, frame = self.cap.read()
self.lastframe = self.backsub.apply(frame)
self.lastsum = self.to_floats(self.lastframe)
@staticmethod
def to_grayscale(frame):
return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
@staticmethod
def to_floats(frame):
return cv2.normalize(frame, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
@staticmethod
def float_to_gray(frame):
return cv2.normalize(frame, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8UC1)
@staticmethod
def gray_to_heat(frame):
return cv2.applyColorMap(frame, cv2.COLORMAP_JET)
def update(self):
ret, frame = self.cap.read()
self.lastframe = self.backsub.apply(frame)
self.lastsum += self.to_floats(self.lastframe)
self.heatmap = self.gray_to_heat(
self.float_to_gray(
self.to_floats(self.lastsum)
)
)
cap = cv2.VideoCapture(0)
diffsum = BackgroundHeatmap(cap)
# Load diffsum up with first
first = True
while(True):
# Update heatmap
diffsum.update()
# Display the resulting frame
cv2.imshow('Heatmap', diffsum.heatmap)
cv2.imshow('Backsub', diffsum.lastframe)
if first:
cv2.moveWindow("Backsub", 1000, 100)
first = False
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# When everything done, release the capture
diffsum.cap.release()
cv2.destroyAllWindows()

67
cv2util.py Normal file
View File

@@ -0,0 +1,67 @@
from typing import Optional, Iterable
import cv2
import numpy as np
import util as abu
from data.xhdfs import MIN_PIXEL_VALUE, MAX_PIXEL_VALUE
_WHITE = 255
def to_8_bit_image(src_image: np.ndarray,
display_min: Optional[int] = None,
display_max: Optional[int] = None) -> np.ndarray:
"""Copies the image and normalizes pixel values to [0; 255].
See https://stackoverflow.com/questions/14464449/using-numpy-to-efficiently-convert-16-bit-image-data-to-8-bit-for-display-with"""
if src_image.dtype == np.uint8: # Do nothing with images that are already 8-bit
return src_image
copy = np.array(src_image, copy=True)
# Clip pixel values if necessary
need_to_clip = display_min is not None or display_max is not None
if display_min is None:
display_min = MIN_PIXEL_VALUE
if display_max is None:
display_max = MAX_PIXEL_VALUE
if need_to_clip:
copy.clip(display_min, display_max, out=copy)
copy -= display_min
np.floor_divide(copy, (display_max - display_min + 1) / 256, out=copy, casting='unsafe')
return copy.astype(np.uint8)
def imshow_xhdfs_frame(frame: np.ndarray,
window_name: Optional[str] = None,
color_map: Optional[int] = None,
display_min: Optional[int] = None,
display_max: Optional[int] = None) -> None:
frame_as_8_bit = to_8_bit_image(frame, display_min=display_min, display_max=display_max)
if color_map:
frame_as_8_bit = cv2.applyColorMap(frame_as_8_bit, color_map)
if not window_name:
window_name = __name__
cv2.imshow(window_name, frame_as_8_bit)
del frame_as_8_bit
def box_mask(box_points: Iterable[abu.FloatPoint],
image_width: Optional[int] = None,
image_height: Optional[int] = None) -> np.ndarray:
image_width = image_width or abu.FRAME_WIDTH # FIXME: hard-coded dimensions
image_height = image_height or abu.FRAME_HEIGHT
mask = np.zeros(shape=(image_height, image_width), dtype=np.uint8)
box_corners = np.asarray(box_points, dtype=np.int32)
cv2.fillConvexPoly(mask, box_corners, _WHITE)
return mask

0
data/__init__.py Normal file
View File

93
data/box.py Normal file
View File

@@ -0,0 +1,93 @@
import json
import os.path
import re
import zipfile
from typing import NamedTuple, Optional, Iterable, List
from numpy import ndarray
from arkite_booster.cv2util import box_mask
from arkite_booster.util import FloatPoint
from arkite_booster.util import bounding_box_centre
ArkiteBox = NamedTuple(typename='ArkiteBox',
fields=[('name', str),
('points', List[FloatPoint]),
('depth', int),
('distance', int),
('centre', FloatPoint),
('mask', ndarray)])
HEF_BOX_LIST_FILE = 'data/Model.Entities.Box.json'
CALIBRATION_BOX_REGEX = re.compile('(Depth|IR) Cal ([0-9]+|- Center).*')
GRAB_BOX_PREFIX = 'grab_'
def load_boxes(model_entities_box_path: str,
project_ids: Optional[Iterable[int]] = None,
box_ids: Optional[Iterable[int]] = None,
keep_grab_boxes: bool = True,
keep_container_boxes: bool = True,
skip_calibration_boxes: bool = True) -> Iterable[ArkiteBox]:
model_entities_box_path = os.path.abspath(model_entities_box_path)
if model_entities_box_path.endswith('.json'):
with open(model_entities_box_path, 'r') as mebf:
all_boxes = json.load(mebf)
elif model_entities_box_path.endswith('.hef'):
with zipfile.ZipFile(model_entities_box_path, mode='r') as hef:
with hef.open(HEF_BOX_LIST_FILE) as mebf:
all_boxes = json.loads(mebf.read().decode('utf-8'))
else:
raise ValueError('Unknown file format: ' + model_entities_box_path)
boxes = []
for box_json in all_boxes:
if project_ids is not None and box_json['ProjectId'] not in project_ids:
continue
if box_ids is not None and box_json['Id'] not in box_ids:
continue
box_name = box_json['Name']
if skip_calibration_boxes and CALIBRATION_BOX_REGEX.match(box_name) is not None:
continue
if not keep_grab_boxes and box_name.startswith(GRAB_BOX_PREFIX):
continue
if not keep_container_boxes and not box_name.startswith(GRAB_BOX_PREFIX):
continue
points = [(coordinates["X"], coordinates["Y"]) for coordinates in box_json["DataPoints"]]
mask = box_mask(points)
centre = bounding_box_centre(points)
box = ArkiteBox(name=box_name,
depth=box_json['Depth'],
distance=box_json['Distance'],
points=points,
mask=mask,
centre=centre)
boxes.append(box)
return boxes
def boxes_for_use_case(box_metadata_root_dir: str,
use_case: str,
keep_grab_boxes: bool = True,
keep_container_boxes: bool = True,
skip_calibration_boxes: bool = True) -> Iterable[ArkiteBox]:
"""Ad-hoc method to match the detection libraty with additional `.hef` files provided by Stijn."""
box_metadata_root_dir = os.path.abspath(box_metadata_root_dir)
use_case_name = use_case.split('/')[-1]
use_case_hef = os.path.join(box_metadata_root_dir, use_case_name + '.hef')
use_case_boxes = load_boxes(use_case_hef,
keep_grab_boxes=keep_grab_boxes,
keep_container_boxes=keep_container_boxes,
skip_calibration_boxes=skip_calibration_boxes)
return use_case_boxes

71
data/hands.py Normal file
View File

@@ -0,0 +1,71 @@
import os.path
from collections import OrderedDict
from glob import glob
from typing import Generator, Union, Tuple
import cv2
import numpy as np
import pandas as pd
HandMask = np.ndarray
HandMaskMaybeWithFileName = Union[HandMask, Tuple[HandMask, str]]
_CSV_FILE_NAME = 'data.csv'
_CSV_COLUMNS = OrderedDict([('frame_number', int),
('hand_id', int),
('x', float),
('y', float),
('direction_x', float),
('direction_y', float),
('distance', float),
('detected', bool),
('frames_without_detection', int)])
_HAND_MASK_FILE_GLOB = 'hand_labels_*.png'
def load_coordinates(data_csv_path: str) -> pd.DataFrame:
data_csv_path = os.path.abspath(data_csv_path)
hand_data = pd.read_csv(data_csv_path,
sep=',',
header=None,
names=_CSV_COLUMNS.keys(),
dtype=_CSV_COLUMNS)
return hand_data
def load_hand_masks(data_dir_path: str,
return_file_names: bool = False) -> Generator[HandMaskMaybeWithFileName, None, None]:
data_dir_path = os.path.abspath(data_dir_path)
mask_files = glob(os.path.join(data_dir_path, _HAND_MASK_FILE_GLOB))
mask_files = sorted(mask_files)
for mf in mask_files:
hand_mask = cv2.imread(mf, cv2.IMREAD_UNCHANGED)
if return_file_names:
yield hand_mask, mf
else:
yield hand_mask
class RecordingHandTracking:
def __init__(self, path: str):
self._path = os.path.abspath(path)
def hand_coordinates(self) -> pd.DataFrame:
return load_coordinates(os.path.join(self._path, _CSV_FILE_NAME))
def hand_mask_sequence(self, return_file_names: bool = False) -> Generator[HandMaskMaybeWithFileName, None, None]:
return load_hand_masks(self._path, return_file_names)
for mf in mask_files:
hand_mask = cv2.imread(mf, cv2.IMREAD_UNCHANGED)
if return_file_names:
yield hand_mask, mf
else:
yield hand_mask

214
data/library.py Normal file
View File

@@ -0,0 +1,214 @@
import json
import os.path
import zipfile
from abc import ABC, abstractmethod
from contextlib import contextmanager
from glob import iglob
from os import listdir
from typing import Iterable, Optional, Union
from data.xhdfs import XHDFS
HEF_RECORDING_LIST_FILE = 'data/Model.Entities.Resource.json'
TYPES = ['Activity', 'Button', 'Container', 'Process', 'Tool']
TYPE_SHORTHANDS = {t[0]: t for t in TYPES}
def is_old_format_detection(path: str) -> bool:
"""Returns `True` if `path` points to a directory that contains a `.him` file and a recordings directory."""
if not os.path.isdir(path):
return False
return os.path.isdir(os.path.join(path, 'resources', 'recordings')) \
and any(f.endswith('.him') for f in listdir(path))
def is_new_format_detection(path: str) -> bool:
"""Returns `True` if `path` points to a directory that contains a `.hef` file."""
if not os.path.isdir(path):
return False
return any(f.endswith('.hef') for f in listdir(path))
class ArkiteRecording(ABC):
def __init__(self, full_name: str):
self._full_name = full_name
self._name = os.path.split(full_name)[-1]
self._detection_type = TYPE_SHORTHANDS.get(self._name[0], None)
@property
def name(self) -> str:
return self._name
@property
def full_name(self) -> str:
return self._full_name
@property
def detection_type(self) -> str:
return self._detection_type
@abstractmethod
def depth(self) -> XHDFS:
raise NotImplementedError
@abstractmethod
def ir(self) -> XHDFS:
raise NotImplementedError
class ArkiteRecordingDirectory(ArkiteRecording):
def __init__(self, full_name: str):
super().__init__(full_name)
def _data_file_path(self, data_type: str) -> str:
*location, name = os.path.split(self._full_name)
return os.path.join(*location, 'resources', 'recordings', name, data_type + os.path.extsep + 'xhdfs')
@contextmanager
def depth(self):
with XHDFS(open(self._data_file_path('Depth'), mode='rb')) as depth_xhdfs:
yield depth_xhdfs
@contextmanager
def ir(self):
with XHDFS(open(self._data_file_path('IR'), mode='rb')) as ir_xhdfs:
yield ir_xhdfs
class ArkiteRecordingHEF(ArkiteRecording):
def __init__(self, full_name: str, hef_file_name: str, depth_file_name: str, ir_file_name: str):
super().__init__(full_name)
self._hef = hef_file_name
self._depth = depth_file_name
self._ir = ir_file_name
@staticmethod
def _data_file_path(file_name: str) -> str:
return "resources/" + file_name
@contextmanager
def depth(self) -> XHDFS:
with zipfile.ZipFile(self._hef) as hef, \
XHDFS(hef.open(self._data_file_path(self._depth))) as depth_xhdfs:
yield depth_xhdfs
@contextmanager
def ir(self) -> XHDFS:
with zipfile.ZipFile(self._hef) as hef, \
XHDFS(hef.open(self._data_file_path(self._ir))) as ir_xhdfs:
yield ir_xhdfs
class ArkiteDetection(ABC):
def __init__(self, relative_path: str):
self._path = relative_path
self._full_name = relative_path
self._name = os.path.split(relative_path)[-1]
@property
def full_name(self) -> str:
return self._full_name
@property
def detection_name(self) -> str:
return self._name
@abstractmethod
def recordings(self) -> Iterable[ArkiteRecording]:
raise NotImplementedError
class ArkiteDetectionDirectory(ArkiteDetection):
def __init__(self, relative_path: str):
super().__init__(relative_path)
def recordings(self) -> Iterable[ArkiteRecordingDirectory]:
recordings_dir = os.path.join(self._path, 'resources', 'recordings')
for rec_dir_name in listdir(recordings_dir):
name = rec_dir_name
full_name = os.path.join(self.full_name, name)
rec = ArkiteRecordingDirectory(full_name)
yield rec
class ArkiteDetectionHEF(ArkiteDetection):
def __init__(self, relative_path: str):
super().__init__(relative_path)
hef_file_name = next(f for f in listdir(self._path) if f.endswith('.hef'))
self._hef = os.path.join(self._path, hef_file_name)
def recordings(self) -> Iterable[ArkiteRecordingHEF]:
with zipfile.ZipFile(self._hef, mode='r') as hef:
with hef.open(HEF_RECORDING_LIST_FILE) as recording_list:
file_list = json.loads(recording_list.read().decode('utf-8'))
previous_metadata = None
for file_metadata in file_list:
if file_metadata['Extension'] != '.xhdfs' or file_metadata['RemovedId'] != 0:
continue
if previous_metadata is None:
previous_metadata = file_metadata
continue
name, data_type = file_metadata['Name'].rsplit(' ', 1)
prev_name, prev_type = previous_metadata['Name'].rsplit(' ', 1)
assert name == prev_name and data_type != prev_type
full_name = os.path.join(self._full_name, name)
depth_name = (file_metadata if data_type == 'Depth' else previous_metadata)['FileName']
ir_name = (file_metadata if data_type == 'IR' else previous_metadata)['FileName']
previous_metadata = None
rec = ArkiteRecordingHEF(full_name, self._hef, depth_file_name=depth_name, ir_file_name=ir_name)
yield rec
class ArkiteData:
def __init__(self, root_dir: str, use_case_depth: int):
self._root = os.path.abspath(root_dir)
self._use_case_depth = use_case_depth
def use_cases(self) -> Iterable[str]:
use_case_glob = os.path.sep.join('*' * self._use_case_depth)
full_glob = os.path.join(self._root, use_case_glob)
for d in iglob(full_glob):
if os.path.isdir(d):
yield d.replace(self._root, '')[1:]
def detections(self, use_cases: Optional[Union[Iterable[str], str]] = None) -> Iterable[ArkiteDetection]:
if isinstance(use_cases, str):
use_cases = (use_cases,)
if isinstance(use_cases, Iterable):
use_cases = set(use_cases)
for uc in self.use_cases():
if use_cases is not None and uc not in use_cases:
continue
use_case_dir = os.path.join(self._root, uc)
# Check if the directory itself is a detection
detection = self._dir_to_detection(use_case_dir)
if detection:
yield detection
# Check if subdirectories are detections themselves
for dd in listdir(use_case_dir):
detection_dir = os.path.join(use_case_dir, dd)
detection = self._dir_to_detection(detection_dir)
if detection:
yield detection
@staticmethod
def _dir_to_detection(directory: str) -> Optional[ArkiteDetection]:
if is_old_format_detection(directory):
return ArkiteDetectionDirectory(directory)
elif is_new_format_detection(directory):
return ArkiteDetectionHEF(directory)
else:
return None

92
data/xhdfs.py Normal file
View File

@@ -0,0 +1,92 @@
from typing import NamedTuple, Generator, Any
import numpy as np
CAMERA_NODE_TYPES = ('unknown', 'depth', 'ir', 'rgb', 'rgb_luminance', 'rgb_mapped')
HEADER_BYTES = 32
BYTES_PER_SHORT = 2
TIMESTAMP_BYTES = 8
RAW_UNRELIABLE_PIXEL_VALUE = -11
ENDIANNESS = 'little'
# Pixels are stored as signed 16-bit integers with little endianness.
# However, the only valid negative value is `-11` (unreliable pixel), which should just be converted into the
# corresponding unsigned value, without any loss of information
SOURCE_NUMPY_DATATYPE = np.dtype('<i2')
TARGET_NUMPY_DATATYPE = np.dtype('=u2') # TODO: is this cross-platform?
MIN_PIXEL_VALUE = np.iinfo(TARGET_NUMPY_DATATYPE).min
MAX_PIXEL_VALUE = np.iinfo(TARGET_NUMPY_DATATYPE).max
UNRELIABLE_PIXEL_VALUE = np.cast[TARGET_NUMPY_DATATYPE](RAW_UNRELIABLE_PIXEL_VALUE).min()
XHDFSHeader = NamedTuple(typename='XHDFSHeader',
fields=[('n_frames', int),
('width', int),
('height', int),
('version_number', int),
('camera_node_type', str)])
def read_unsigned_integer(fileobj, n_bytes: int = 4, signed=False) -> int:
return int.from_bytes(fileobj.read(n_bytes), ENDIANNESS, signed=signed)
def bytes_to_image(frame_bytes: bytes, width: int, height: int) -> np.ndarray:
frame_bytes = frame_bytes[8:] # Cut off the timestamp
# Height & width in that order match what OpenCV expects
frame_image = np.frombuffer(frame_bytes, SOURCE_NUMPY_DATATYPE).reshape(height, width)
frame_image = frame_image.astype(TARGET_NUMPY_DATATYPE, casting='unsafe', copy=False)
frame_image[frame_image == UNRELIABLE_PIXEL_VALUE] = 0
return frame_image
class XHDFS:
def __init__(self, fileobj):
self._fileobj = fileobj
self._header = None
self._pixels_per_frame = -1
def __enter__(self):
header = self._read_header()
self._header = header
self._pixels_per_frame = header.width * header.height * BYTES_PER_SHORT + TIMESTAMP_BYTES
return self
def _read_header(self) -> XHDFSHeader:
n_frames = read_unsigned_integer(self._fileobj)
width = read_unsigned_integer(self._fileobj)
height = read_unsigned_integer(self._fileobj)
# Skip `CameraNodeId` & `WithBodies`
_ = self._fileobj.read(4 + 1)
version = read_unsigned_integer(self._fileobj)
node_type = read_unsigned_integer(self._fileobj, n_bytes=2)
node_type = CAMERA_NODE_TYPES[node_type]
# Skip `CameraNodeSerial` & 3 "spare" bytes
_ = self._fileobj.read(6 + 3)
header = XHDFSHeader(n_frames, width, height, version, node_type)
return header
def _read_frame_bytes(self) -> bytes:
return self._fileobj.read(self._pixels_per_frame)
def frame_sequence(self) -> Generator[np.ndarray, None, None]:
w, h = self._header.width, self._header.height
for _ in range(self._header.n_frames):
frame_bytes = self._read_frame_bytes()
frame = bytes_to_image(frame_bytes, w, h)
del frame_bytes
yield frame
pass
def __exit__(self, exc_type, exc_val, exc_tb):
self._fileobj.close()

65
heatmap_test.py Normal file
View File

@@ -0,0 +1,65 @@
import cv2
from video_loader import VideoLoader
from background_heatmap import BackgroundHeatmap
import numpy as np
if __name__ == '__main__':
params = cv2.SimpleBlobDetector_Params()
params.minThreshold = 60
params.maxThreshold = 255
# Filter by Area.
params.filterByArea = True
params.minArea = 20
params.maxArea = 25000
params.filterByCircularity = False
params.filterByColor = False
params.filterByConvexity = False
params.filterByInertia = False
params.blobColor = 255
detector = cv2.SimpleBlobDetector_create(params)
projects_path = "C:\\UntrackedGit\\opencv_test\\him_projects"
loader = VideoLoader(projects_path)
groups = loader.get_recordings_grouped()
# out = cv2.VideoWriter('outpy.avi',cv2.VideoWriter_fourc\c('M','J','P','G'), 30, (512, 424), isColor=False)
for group in groups:
heatmaps = []
for recording in group[1]:
first = True
frames = VideoLoader.extract_frames(recording)
bgh = BackgroundHeatmap(VideoLoader.extract_frames(recording))
for frame in frames:
bgh.update(frame)
cv2.imshow("IR", frame)
# out.write(converted)
# cv2.imshow("Heatmap", heatmap.heatmap)
cv2.imshow("Background filter", bgh.lastframe)
diff = bgh.bgf_diff
# Erode dilate
kernel = np.ones((5,5),np.uint8)
diff = cv2.morphologyEx(diff, cv2.MORPH_OPEN, kernel)
keypoints = detector.detect(diff)
frame_with_keypoints = cv2.drawKeypoints(diff, keypoints, np.array([]), (0,0,255), cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
cv2.imshow("BGF diff", frame_with_keypoints)
cv2.moveWindow("Background filter", 600, 100)
cv2.moveWindow("BGF diff", 1200, 100)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
heatmaps.append(bgh.heatmap)
cv2.destroyAllWindows()
for i, bgh in enumerate(heatmaps):
imname = "Heatmap " + str(i)
cv2.imshow(imname, bgh)
cv2.moveWindow(imname, 500 * i, 0)
cv2.waitKey(2000)
cv2.destroyAllWindows()
# out.release()
cv2.destroyAllWindows()

27
recording_grouper.py Normal file
View File

@@ -0,0 +1,27 @@
from data.library import ArkiteData
from string import digits as DIGITS
class RecordingGrouper:
def __init__(self, data: ArkiteData):
self.data = data
def getRecordings(self):
for detection in self.data.detections(self.data.use_cases()):
for recording in detection.recordings():
yield recording
def getRecordingGroups(self):
groups = []
for recording in self.getRecordings():
# Check if last char is digit
name = recording.name
if name[-1] in DIGITS:
# Remove last two chars:
splitname = name[:-2]
group = next((x for x in groups if x[0] == splitname), None)
if group is not None:
group[1].append(recording)
else:
groups.append((splitname, [recording]))
return groups

26
util.py Normal file
View File

@@ -0,0 +1,26 @@
from typing import Dict, Any, Tuple, Union, Iterable
IntPoint = Tuple[int, int]
FloatPoint = Tuple[float, float]
Point = Union[IntPoint, FloatPoint]
AlgorithmParams = Dict[str, Any]
FRAME_SHAPE = (512, 424) # TODO: hardcoded
FRAME_WIDTH, FRAME_HEIGHT = FRAME_SHAPE
def bounding_box_centre(points: Iterable[FloatPoint]) -> FloatPoint:
n = 0
x_sum = 0.0
y_sum = 0.0
for x, y in points:
n += 1
x_sum += x
y_sum += y
x_centre = x_sum / n
y_centre = y_sum / n
return x_centre, y_centre

52
video_loader.py Normal file
View File

@@ -0,0 +1,52 @@
from enum import Enum
from data.library import ArkiteData, ArkiteDetection, ArkiteRecording
from cv2util import to_8_bit_image
from typing import Iterable
import numpy
from recording_grouper import RecordingGrouper
class VideoType(Enum):
IR = 0
DEPTH = 1
class VideoLoader:
def __init__(self, projects_path: str):
self.projects_path = projects_path
# Search folders with depth of 1:
self.data = ArkiteData(self.projects_path, 1)
def get_all_recordings(self) -> Iterable[ArkiteRecording]:
# Loop over projects
for detection in self.data.detections(self.data.use_cases()):
for recording in detection.recordings():
yield recording
def get_recordings_grouped(self):
grouper = RecordingGrouper(self.data)
return grouper.getRecordingGroups()
@staticmethod
def extract_frames(recording: ArkiteRecording, video_type: VideoType = None) -> Iterable[numpy.ndarray]:
if video_type == VideoType.IR or video_type == None:
disp_min = 100
disp_max = 8000
with recording.ir() as ir:
frames = ir.frame_sequence()
for frame in frames:
yield to_8_bit_image(frame, display_min=disp_min, display_max=disp_max)
elif video_type == VideoType.DEPTH:
# TODO: check if these values make sense for depth images.
disp_min = 100
disp_max = 8000
with recording.depth() as depth:
frames = depth.frame_sequence()
for frame in frames:
yield to_8_bit_image(frame, display_min=disp_min, display_max=disp_max)
else:
raise NotImplementedError("VideoType \'{}\' not implemented".format(video_type))
# Yielding in sub method after "with" statement in top method does not work!
# @staticmethod
# def frames_to_8_bit(frame_sequence, disp_min=None, disp_max=None):
# for frame in frame_sequence:
# yield to_8_bit_image(frame, display_min=disp_min, display_max=disp_max)

12
videoloader_test.py Normal file
View File

@@ -0,0 +1,12 @@
from video_loader import VideoLoader
import cv2
from cv2util import to_8_bit_image
projects_path = "C:\\UntrackedGit\\opencv_test\\him_projects"
loader = VideoLoader(projects_path)
for recording in loader.get_all_recordings():
frames = loader.extract_frames(recording)
for frame in frames:
cv2.imshow("test", frame)
cv2.waitKey(1)