"""Build activity heatmaps from IR recordings using background subtraction."""

import time
from string import digits as DIGITS
from typing import Iterable, List, Tuple

import cv2
import numpy
import numpy as np

from cv2util import to_8_bit_image
from data.library import ArkiteData
from video_loader import VideoLoader


class BackgroundHeatmap:
    """Accumulate foreground masks from a MOG2 background subtractor.

    Every frame's foreground mask is min-max normalised to floats and added
    to a running sum (``lastsum``); ``heatmap`` holds that sum rendered with
    OpenCV's JET colour map after each call to :meth:`update`.
    """

    # Kernel size of the box blur applied before background subtraction.
    BLUR_KERNEL = (5, 5)

    def __init__(self, first_frame, append_blur=True, learning_rate=0.0003):
        """Create the subtractor and seed it with ``first_frame``.

        Args:
            first_frame: Image used to initialise the background model.
            append_blur: When true, frames are box-blurred before being fed
                to the subtractor.
            learning_rate: Learning rate handed to the subtractor's
                ``apply`` call.
        """
        self.append_blur = append_blur
        self.learning_rate = learning_rate
        self.heatmap = np.array([])
        # NOTE: alternatives that were tried here: createBackgroundSubtractorKNN
        # and bgsegm's BackgroundSubtractorGSOC.
        self.backsub = cv2.createBackgroundSubtractorMOG2()

        # Previously `frame` was only bound when blurring was enabled, so
        # append_blur=False raised NameError.
        frame = self._preprocess(first_frame)
        # learningRate must be passed by keyword: the second positional
        # parameter of apply() is the output mask, not the learning rate.
        self.lastframe = self.backsub.apply(
            frame, learningRate=self.learning_rate
        )
        self.lastsum = self.to_floats(self.lastframe)

    def _preprocess(self, frame):
        """Return ``frame`` blurred when blurring is enabled, else unchanged."""
        if self.append_blur:
            return cv2.blur(frame, self.BLUR_KERNEL)
        return frame

    @staticmethod
    def to_grayscale(frame):
        """Convert a BGR image to single-channel grayscale."""
        return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    @staticmethod
    def to_floats(frame):
        """Min-max normalise ``frame`` to 32-bit floats in the range 0..1."""
        return cv2.normalize(
            frame, None, alpha=0, beta=1,
            norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F,
        )

    @staticmethod
    def float_to_gray(frame):
        """Min-max normalise ``frame`` to 8-bit values in the range 0..255."""
        return cv2.normalize(
            frame, None, alpha=0, beta=255,
            norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8UC1,
        )

    @staticmethod
    def gray_to_heat(frame):
        """Render an 8-bit image with the JET colour map."""
        return cv2.applyColorMap(frame, cv2.COLORMAP_JET)

    def update(self, frame):
        """Feed one frame to the subtractor and refresh ``self.heatmap``.

        Args:
            frame: Next image of the sequence.
        """
        frame = self._preprocess(frame)
        # Keep the returned mask instead of relying on apply() writing into
        # the buffer we pass in.
        self.lastframe = self.backsub.apply(
            frame, fgmask=self.lastframe, learningRate=self.learning_rate
        )
        self.lastsum += self.to_floats(self.lastframe)
        normalised_sum = self.to_floats(self.lastsum)
        self.heatmap = self.gray_to_heat(self.float_to_gray(normalised_sum))


class RecordingGrouper:
    """Collect the recordings of an ``ArkiteData`` set and group them by name."""

    def __init__(self, data: ArkiteData):
        """Store the data set whose recordings will be grouped."""
        self.data = data

    def getRecordings(self) -> Iterable:
        """Yield every recording of every detection in the data set's use cases."""
        # Use the instance's data set; the bare name `data` only resolved when
        # this file ran as a script and happened to define a global `data`.
        for detection in self.data.detections(self.data.use_cases()):
            yield from detection.recordings()

    def getRecordingGroups(self) -> List[Tuple[str, list]]:
        """Group recordings whose name ends in a digit.

        The group key is the recording name with its last two characters
        removed. Recordings whose name is empty or does not end in a digit
        are skipped.

        Returns:
            ``(group_name, recordings)`` tuples in order of first appearance.
        """
        groups = {}
        for recording in self.getRecordings():
            name = recording.name
            # Only names ending in a digit belong to a group.
            if not name or name[-1] not in DIGITS:
                continue
            # Remove the last two characters to obtain the group name.
            groups.setdefault(name[:-2], []).append(recording)
        return list(groups.items())


def main(projects_path="C:\\UntrackedGit\\opencv_test\\him_projects"):
    """Show a live heatmap per recording, then each group's final heatmaps.

    Pressing ``q`` in an OpenCV window stops the current recording early.

    Args:
        projects_path: Directory handed to ``ArkiteData``.
    """
    data = ArkiteData(projects_path, 1)
    groups = RecordingGrouper(data).getRecordingGroups()

    out = cv2.VideoWriter(
        'outpy.avi', cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'),
        30, (512, 424), isColor=False,
    )
    try:
        for _, recordings in groups:
            heatmaps = []
            for recording in recordings:
                print("New recording: " + str(recording.name))
                # Reset per recording so an empty recording cannot reuse the
                # heatmap of the previous one.
                heatmap = None
                with recording.ir() as ir:
                    for frame in ir.frame_sequence():
                        converted = to_8_bit_image(
                            frame, display_min=100, display_max=8000
                        )
                        if heatmap is None:
                            # Seed with the converted frame so the subtractor
                            # sees the same kind of image as in update().
                            # NOTE(review): assumes to_8_bit_image output is
                            # what the model should be initialised with.
                            heatmap = BackgroundHeatmap(converted)
                        heatmap.update(converted)
                        cv2.imshow("IR", converted)
                        # out.write(converted)
                        cv2.imshow("Heatmap", heatmap.heatmap)
                        cv2.moveWindow("Heatmap", 600, 100)
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break
                if heatmap is not None:
                    heatmaps.append(heatmap.heatmap)

            cv2.destroyAllWindows()
            for i, image in enumerate(heatmaps):
                imname = "Heatmap " + str(i)
                cv2.imshow(imname, image)
                cv2.moveWindow(imname, 500 * i, 0)
            # Block until a key is pressed before moving to the next group.
            cv2.waitKey()
            cv2.destroyAllWindows()
    finally:
        # Always close the video file and any windows left open.
        out.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()