diff --git a/scripts/Comparison/Person_trc.py b/scripts/Comparison/Person_trc.py new file mode 100644 index 0000000000000000000000000000000000000000..628da5bc3b0a0be4cc43a2c9bd6bef99f803b012 --- /dev/null +++ b/scripts/Comparison/Person_trc.py @@ -0,0 +1,63 @@ +from dataclasses import dataclass +from typing import List + + +@dataclass +class Point(): + x: float + y: float + stereo_x: float + stereo_y: float + stereo_z: float + qual: int + col_x: float + col_y: float + color: str + markerID: int + + def __hash__(self) -> int: + return hash((self.x,self.y,self.markerID,self.stereo_x,self.stereo_y,self.color,self.qual)) + +# TODO Hash Funktion vereinfachen; performance +class Person: + def __init__(self, person_str: str, points_str: str, id: int, trc_version: int): + self.id = int(id) + self.points: List[Point] = [] + + # Version 4, sonst nichts unterstützt + person_values = person_str.split(' ') + self.nr = int(person_values[0]) + self.height = float(person_values[1]) + self.first_frame = int(person_values[2]) + self.last_frame = int(person_values[3]) + self.colCount = int(person_values[4]) + self.color = person_values[5] + " " + person_values[6] + " " + person_values[7] + if trc_version >= 4: + self.markerID = int(person_values[8]) + self.numTrackedPoints = int(person_values[9]) + else: + self.numTrackedPoints = int(person_values[8]) + + points_str = points_str.split("\n") + for line in points_str: + point_values = line.split(' ') + x = float(point_values[0]) + y = float(point_values[1]) + stereo_x = float(point_values[2]) + stereo_y = float(point_values[3]) + stereo_z = float(point_values[4]) + qual = int(point_values[5]) + col_x = float(point_values[6]) + col_y = float(point_values[7]) + color = point_values[8] + " " + point_values[9] + " " + point_values[10] + markerID = int(point_values[11]) + self.points.append(Point(x,y,stereo_x,stereo_y,stereo_z,qual,col_x,col_y,color,markerID)) + + def __hash__(self) -> int: + return hash((self.id, self.nr, self.height, self.first_frame, self.last_frame, self.colCount, self.color, self.numTrackedPoints)) + + def __str__(self) -> str: + return "ID: " + str(self.id) + + def __repr__(self) -> str: + return self.__str__() \ No newline at end of file diff --git a/scripts/Comparison/handmade_test.py b/scripts/Comparison/handmade_test.py new file mode 100644 index 0000000000000000000000000000000000000000..e24582dc2e4c0e166ae04deff1a8fa2ef56dbbe8 --- /dev/null +++ b/scripts/Comparison/handmade_test.py @@ -0,0 +1,332 @@ +import argparse +import queue +from copy import deepcopy +from typing import List, Tuple +from statistics import mean, median +from math import sqrt +from Person_trc import Person, Point + +args = None +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Compare two trc files and generate an error report." + ) + parser.add_argument( + "truth_filename", type=str, action="store", help="The path to the truth trc" + ) + parser.add_argument( + "test_filename", type=str, action="store", help="The path to the test trc" + ) + parser.add_argument( + "--epsilon", + type=float, + dest="eps", + action="store", + default=1, + help="The epsilon (in pixels) of the tolerated difference in pixel values (default=1)", + ) + parser.add_argument( + "--point_diff", + dest="warn_pointwise", + action="store_true", + default=False, + help="Print warning for every point difference bigger than epsilon", + ) + parser.add_argument( + "--frames", + dest="check_frames", + action="store_true", + default=False, + help="Print warning for every trajectory pair having different starting and/or ending frames", + ) + parser.add_argument( + "--counterpart", + "-c", + dest="check_counterpart", + action="store_true", + default=False, + help="Check if every trajectory has a counterpart in the other file", + ) + args = parser.parse_args() + +EPSILON = ( + 1 if args == None else args.eps +) # in pixels # default value 1, can be changed in CLI +error_message = [] +MAX_DIFF = 20 # in pixels +MIN_DIFF_START = 0.01 # Should this be a CLI parameter as well? +DIFF_STEP = 0.01 # Should this be a CLI parameter as well? + + +def parse_trc(filename: str) -> List[Person]: + # NOTE assumes Version 4 of trc; Should be fractored into a factory outside of constructor + content: List[Person] = [] + id = 0 + + with open(filename) as file: + version = int(file.readline()[-2]) # Read header + file.readline() # Read header + file = file.read() + file = file.split("\n\n") + file = file[:-1] # empty line at the end of file + + it = iter(file) + for person_str in it: + points_str = next(it) + person = Person(person_str, points_str, id, version) + id += 1 + if len(person.points) != person.numTrackedPoints: + print(f"Error when parsing trc-file {filename}.") + exit(1) + content.append(person) + + return content + + +def calc_diff(test: List[Point], truth: List[Point]) -> float: + """Calculates Median Euclidean Distance + + This function assumes the persons are already cropped to the same starting + frame!!! + + :param test: List of points from person from test trc + :type test: List[Point] + :param truth: List of Points from person from truth trc + :type truth: List[Point] + :return: sum of point distances in common frames + :rtype: float + """ + # Assumes the persons already are cropped to have the same starting frame + # Zip then crops to the same ending frame + diff = 0 + num_common_points = len(test) if len(test) < len(truth) else len(truth) + for test_point, truth_point in zip(test, truth): + diff += sqrt( + (test_point.x - truth_point.x) ** 2 + (test_point.y - truth_point.y) ** 2 + ) + return diff / num_common_points + + +class HandmadeComparison: + def __init__( + self, + check_frames: bool, + warn_pointwise: bool, + check_counterpart: bool, + truth_filename: str, + test_filename: str, + ): + self.point_diffs: List[float] = [] + self.check_frames = check_frames + self.warn_pointwise = warn_pointwise + self.check_counterpart = check_counterpart + self.truth_filename = truth_filename + self.test_filename = test_filename + + self.output: List[str] = [] + + def compare_person(self, test: Person, truth: Person): + """Crops persons to common frames and calcs the diff + + This function crops the persons to the common frames. Then it adds + the absolute differences and optionally outputs those with a diff. + over a (user-defined) epsilon. It also optionally outputs different + starting and ending frames. + + :param test: Person from test trc + :type test: Person + :param truth: Person from truth trc + :type truth: Person + """ + first_frame: int + if test.first_frame > truth.first_frame: + first_frame = test.first_frame + frame_diff = test.first_frame - truth.first_frame + test_points = test.points + truth_points = truth.points[frame_diff:] + elif test.first_frame < truth.first_frame: + first_frame = truth.first_frame + frame_diff = truth.first_frame - test.first_frame + test_points = test.points[frame_diff:] + truth_points = truth.points + else: + first_frame = test.first_frame + test_points = test.points + truth_points = truth.points + + if self.check_frames and test.first_frame != truth.first_frame: + self.output.append( + f"Different first frame: truth({truth.id+1}) {truth.first_frame} and test({test.id+1}) {test.first_frame}" + ) + if self.check_frames and test.last_frame != truth.last_frame: + self.output.append( + f"Different last frame: truth({truth.id+1}) {truth.last_frame} and test({test.id+1}) {test.last_frame}" + ) + + for i, (test_point, truth_point) in enumerate(zip(test_points, truth_points)): + diff = sqrt( + (test_point.x - truth_point.x) ** 2 + + (test_point.y - truth_point.y) ** 2 + ) + self.point_diffs.append(diff) # save for overall statistics + if self.warn_pointwise and diff > EPSILON: + self.output.append( + f"Diff of {diff} between test({test.id+1}) and truth({truth.id+1}) in frame {first_frame+i}" + ) + + def compare_files( + self, test: List[Person], truth: List[Person], indices: List[Tuple[int, int]] + ): + for truth_id, test_id in indices: + self.compare_person(test[test_id], truth[truth_id]) + + # NOTE: Maybe use an established algorithm for bipartite graph matching instead + def associate_trajectories( + self, test: List[Person], truth: List[Person] + ) -> List[Tuple[int, int]]: + """Finds pairs of the 'same' trajectories in both files + + This function iterated over all trajectories in truth as long as there are trajectories left + in either truth or test and and tries to find the associated trajectory. For this it compares + all each truth_trajectory with all left test trajectories, when trimmed to a common start and end + frame. If the best match has an error lower than the current threshold, the two trajectories + are a match and the trajectories are not evaluated anymore. If the best match has still a + difference higher than the current theshold, it is cached. + + Each time every queued truth_trajectory has been tested, the threshold for the maximum allowed + difference is increased. This ensures that a test trajectory is matches with the best fitting + truth trajectory, not with the first fitting one. + + :param test: List of Persons in the test trc (is changed; give a deep copy) + :type test: List[Person] + :param truth: List of Persons in the truth trc + :type truth: List[Person] + :return: List of tuples with the indices of the pairs in their respective list (truth_idx, test_idx) + :rtype: List[Tuple[int,int]] + """ + result = [] + truth_queue: queue.SimpleQueue[Person] = queue.SimpleQueue() + for person in truth: + truth_queue.put(person) + + min_diff = MIN_DIFF_START + cache = {} + queue_size = truth_queue.qsize() + i = 0 + while (not truth_queue.empty()) and (not (len(test) == 0)): + # One iteration through the queue/truth candidates? + i += 1 + if i > queue_size: + queue_size = truth_queue.qsize() + min_diff += DIFF_STEP + i = 1 + + truth_person = truth_queue.get() + best_fit_diff = MAX_DIFF + done = False + + # First test cached match; most of the time this already is the real one + old_match = cache.get(truth_person) + if old_match != None: + if old_match[1] < min_diff: + if old_match[0] in test: + result.append((truth_person.id, old_match[0].id)) + test.remove(old_match[0]) + done = True + continue + elif ( + old_match[1] == -1 + ): # Was this trajctory already closest to a deleted/already matched person last time? + continue + else: + cache.update({truth_person: (None, -1)}) + else: + truth_queue.put(truth_person) + continue + + for test_person in test: + # Calculate difference with common start frame + # Zip in compare_persons ends at common end frame + if ( + test_person.last_frame < truth_person.first_frame + or test_person.first_frame > truth_person.last_frame + ): + continue + if test_person.first_frame > truth_person.first_frame: + frame_diff = test_person.first_frame - truth_person.first_frame + diff = calc_diff( + test_person.points, truth_person.points[frame_diff:] + ) + elif test_person.first_frame < truth_person.first_frame: + frame_diff = truth_person.first_frame - test_person.first_frame + diff = calc_diff( + test_person.points[frame_diff:], truth_person.points + ) + else: + diff = calc_diff(test_person.points, truth_person.points) + + if diff < min_diff: + result.append((truth_person.id, test_person.id)) + test.remove(test_person) + done = True + break + elif diff < best_fit_diff: + best_fit_diff = diff + cache.update({truth_person: (test_person, diff)}) + if not done: + if best_fit_diff >= MAX_DIFF: + self.output.append( + f"The person {truth_person.id + 1} in the truth file has no counterpart in test!" + ) + continue + truth_queue.put(truth_person) + + # TODO mitschreiben fuer die grobe Statistik + if self.check_counterpart: + for person in test: + self.output.append( + f"The person {person.id +1} in the test file has no counterpart in truth!" + ) + while not truth_queue.empty(): + self.output.append( + f"The person {truth_queue.get().id +1} in the truth file has no counterpart in test!" + ) + return result + + def run(self): + truth = parse_trc(self.truth_filename) + test = parse_trc(self.test_filename) + indices = self.associate_trajectories(deepcopy(test), truth) + indices = sorted(indices, key=lambda x: x[0]) + self.compare_files(test, truth, indices) + + self.output.append( + "\nThe mean difference in point coordinates is: " + + str(mean(self.point_diffs)) + ) + self.output.append( + "The median difference in point coordinates is: " + + str(median(self.point_diffs)) + ) + point_diffs = list(filter(lambda x: x != 0, self.point_diffs)) + if len(point_diffs) > 0: + self.output.append( + "The mean difference with 0s filtered out is: " + str(mean(point_diffs)) + ) + self.output.append( + "The median with 0s filtered out is: " + str(median(point_diffs)) + ) + + return self.output + + +if __name__ == "__main__": + comp = HandmadeComparison( + args.check_frames, + args.warn_pointwise, + args.check_counterpart, + args.truth_filename, + args.test_filename, + ) + out = comp.run() + print("\n".join(out)) diff --git a/scripts/Comparison/video_comparison.py b/scripts/Comparison/video_comparison.py new file mode 100644 index 0000000000000000000000000000000000000000..e68e6cf2c24021c8aaa4f18f0bde6d60ec418419 --- /dev/null +++ b/scripts/Comparison/video_comparison.py @@ -0,0 +1,371 @@ +import argparse +from typing import Callable, List, Set, Tuple +try: + import cv2 +except ImportError: + print("You need to install OpenCV for Python (e.g. pip install opencv-python)") + exit(1) +from Person_trc import Person +import xml.etree.ElementTree as ET +import numpy as np +import re +from pathlib import Path +from handmade_test import MAX_DIFF, calc_diff, parse_trc, HandmadeComparison + + +class ComparisonVideoPlayer: + # TODO: Longterm; Make thread, so we can add UI-Elements via tkinter + # These could be e.g. looking at a number of user-chosen trajectories + # Which could be helpful for the 'no counterpart' case + def __init__(self, pet: str) -> None: + # super().__init__() + cv2.namedWindow("Comparison", cv2.WINDOW_NORMAL) + PET = Path(pet) + + # NOTE: will only work for tests this way. (Need this naming scheme) + self.truth_filename = PET.parent / (str(PET.stem) + "_truth.trc") + self.test_filename = PET.parent / (str(PET.stem) + "_test.trc") + + tree = ET.parse(PET) + root = tree.getroot() + + vid_path = PET.with_name(root.find("MAIN").attrib["SRC"].split(";")[-1]) + intrinsics = ( + root.find("CONTROL").find("CALIBRATION").find("INTRINSIC_PARAMETERS") + ) + self.border = int( + float( + root.find("CONTROL").find("CALIBRATION").find("BORDER").attrib["VALUE"] + ) + * 2 + ) + calib = intrinsics.attrib + + camera = np.array( + [ + [float(calib["FX"]), 0, float(calib["CX"])], + [0, float(calib["FY"]), float(calib["CY"])], + [0, 0, 1], + ] + ) + distortion = np.array( + [ + calib["R2"], + calib["R4"], + calib["TX"], + calib["TY"], + calib["R6"], + calib["K4"], + calib["K5"], + calib["K6"], + ], + np.float32, + ) + + self.stream: cv2.VideoCapture = cv2.VideoCapture(str(vid_path)) + _, frame = self.stream.read() + frame = cv2.copyMakeBorder( + frame, + self.border, + self.border, + self.border, + self.border, + cv2.BORDER_CONSTANT, + ) + + self.map1, self.map2 = cv2.initUndistortRectifyMap( + camera, distortion, np.eye(3), camera, frame.shape[:-1][::-1], cv2.CV_16SC2 + ) + + self.test_trajectories = parse_trc(str(self.test_filename)) + self.truth_trajectories = parse_trc(str(self.truth_filename)) + + def get_proximal_people(self, seed_person: Person): + """Gets the indices of the people which could have been a match, + according to the maximum error. + + This function finds people who could have been the match. It is used + for people who do not have a match. If you see one truth and one test + trajectory, it probably was just matched to the other trajectory and you + have a double tracked person. + + :param seed_person: Person to match to + :type seed_person: Person + :return: (truth, test) People who match to seed_person, split into test and truth + :rtype: Tuple[List[int], List[int]] + """ + nearby_people_test: List[int] = [] + nearby_people_truth: List[int] = [] + for person in self.test_trajectories: + if ( + seed_person.last_frame < person.first_frame + or seed_person.first_frame > person.last_frame + ): + continue + if seed_person.first_frame > person.first_frame: + frame_diff = seed_person.first_frame - person.first_frame + diff = calc_diff(seed_person.points, person.points[frame_diff:]) + elif seed_person.first_frame < person.first_frame: + frame_diff = person.first_frame - seed_person.first_frame + diff = calc_diff(seed_person.points[frame_diff:], person.points) + else: + diff = calc_diff(seed_person.points, person.points) + + if diff >= MAX_DIFF: + continue + nearby_people_test.append(person.id + 1) + for person in self.truth_trajectories: + if ( + seed_person.last_frame < person.first_frame + or seed_person.first_frame > person.last_frame + ): + continue + if seed_person.first_frame > person.first_frame: + frame_diff = seed_person.first_frame - person.first_frame + diff = calc_diff(seed_person.points, person.points[frame_diff:]) + elif seed_person.first_frame < person.first_frame: + frame_diff = person.first_frame - seed_person.first_frame + diff = calc_diff(seed_person.points[frame_diff:], person.points) + else: + diff = calc_diff(seed_person.points, person.points) + + if diff >= MAX_DIFF: + continue + nearby_people_truth.append(person.id + 1) + return nearby_people_truth, nearby_people_test + + def play_video( + self, start: int, end: int, draw_callback: Callable[[np.ndarray, int], None] + ): + """Plays the video from frame start to frame end, calling the draw_callback each frame + + This function plays the video. It uses distortion and border like PeTrack would. + To draw on the video, a callback is provided, which draws onto the frame it's given. + + :param stream: Videostream + :type stream: cv2.VideoCapture + :param start: Starting frame + :type start: int + :param end: Ending frame + :type end: int + :param draw_callback: Function to call for drawing on the frame + :type draw_callback: Callable[[np.ndarray, int], None] + """ + currFrame = start + self.stream.set(cv2.CAP_PROP_POS_FRAMES, start) + + while (k := cv2.waitKey(20)) != 110: + grabbed, frame = self.stream.read() + if not grabbed or currFrame > end: + self.stream.set(cv2.CAP_PROP_POS_FRAMES, start) + currFrame = start + _, frame = self.stream.read() + + frame = cv2.copyMakeBorder( + frame, + self.border, + self.border, + self.border, + self.border, + cv2.BORDER_CONSTANT, + ) + frame = cv2.remap( + frame, + self.map1, + self.map2, + cv2.INTER_LINEAR, + borderMode=cv2.BORDER_CONSTANT, + ) + + draw_callback(frame, currFrame) + + cv2.imshow("Comparison", frame) + + currFrame += 1 + + if k == ord('p'): + while cv2.waitKey(-1) != ord('p'): + pass + + def drawPoints( + self, + person: Person, + currFrame: int, + frame: np.ndarray, + color: Tuple[int], + thickness: int, + ): + if person.first_frame <= currFrame <= person.last_frame: + points = person.points + for i in range(min(10, currFrame - person.first_frame)): + frame = cv2.circle( + frame, + ( + int(points[currFrame - person.first_frame - i].x + self.border), + int(points[currFrame - person.first_frame - i].y + self.border), + ), + thickness, + color, + -1, + ) + for i in range(min(10, person.last_frame - currFrame)): + frame = cv2.circle( + frame, + ( + int(points[currFrame - person.first_frame + i].x + self.border), + int(points[currFrame - person.first_frame + i].y + self.border), + ), + thickness, + color, + -1, + ) + + def visualize_people(self, idx_truth: int, idx_test: int): + tr = self.truth_trajectories[idx_truth - 1] + te = self.test_trajectories[idx_test - 1] + start = min(tr.first_frame, te.first_frame) + end = max(tr.last_frame, te.last_frame) + + def draw(frame: np.ndarray, currFrame: int) -> None: + self.drawPoints( + self.test_trajectories[idx_test - 1], currFrame, frame, (255, 0, 0), 7 + ) + self.drawPoints( + self.truth_trajectories[idx_truth - 1], currFrame, frame, (0, 255, 0), 5 + ) + + self.play_video(start, end, draw) + + def visualize_many( + self, + idxs_truth: List[int], + idxs_test: List[int], + idx_seed: int, + is_seed_test: bool, + ): + # Idea: Try out taking the first first and last last frame, as in two people? + if is_seed_test: + start = self.test_trajectories[idx_seed - 1].first_frame + end = self.test_trajectories[idx_seed - 1].last_frame + else: + start = self.truth_trajectories[idx_seed - 1].first_frame + end = self.truth_trajectories[idx_seed - 1].last_frame + + def draw(frame: np.ndarray, currFrame: int): + for id in idxs_test: + self.drawPoints( + self.test_trajectories[id - 1], currFrame, frame, (255, 0, 0), 7 + ) + for id in idxs_truth: + self.drawPoints( + self.truth_trajectories[id - 1], currFrame, frame, (0, 255, 0), 5 + ) + if idx_seed != None: + if is_seed_test: + self.drawPoints( + self.test_trajectories[idx_seed - 1], + currFrame, + frame, + (255, 0, 255), + 3, + ) + else: + self.drawPoints( + self.truth_trajectories[idx_seed - 1], + currFrame, + frame, + (0, 255, 255), + 3, + ) + + self.play_video(start, end, draw) + + def visualize_difference(self): + """Takes output of handmade_test and starts a video + comparison for each of the listed differences + + :param comp_output: output from handmade_test + :type comp_output: str + """ + different_first_frame = re.compile( + r"Different first frame: truth\((\d+)\) \d+ and test\((\d+)\) \d+" + ) + different_last_frame = re.compile( + r"Different last frame: truth\((\d+)\) \d+ and test\((\d+)\) \d+" + ) + no_counterpart_given = re.compile( + r"The person (\d+) in the ([^\s]+) file has no counterpart in \w+!" + ) + point_difference = re.compile( + r"Diff of \d*\.\d* between test\((\d+)\) and truth\((\d+)\) in frame \d+" + ) + # Pairing is characterized by truth-countperpart's index + displayed_pairing: Set[int] = set() + comp = HandmadeComparison( + True, True, True, str(self.truth_filename), str(self.test_filename) + ) + comp_output = comp.run() + print("To go to the next comparison, press 'n'") + print("To pause/play press 'p'") + for line in comp_output: + print("\r" + " " * 80, end="", flush=True) + print("\r", end="", flush=True) + print(line, end="", flush=True) + match = different_first_frame.match(line) + if match: + idx_truth = int(match.group(1)) + idx_test = int(match.group(2)) + if idx_truth in displayed_pairing: + continue + displayed_pairing.add(idx_truth) + + self.visualize_people(idx_truth, idx_test) + continue + + match = different_last_frame.match(line) + if match: + idx_truth = int(match.group(1)) + idx_test = int(match.group(2)) + if idx_truth in displayed_pairing: + continue + displayed_pairing.add(idx_truth) + + self.visualize_people(idx_truth, idx_test) + continue + + # NOTE: Implementation satisfying? + match = no_counterpart_given.match(line) + if match: + if match.group(2) == "test": + idx_seed = int(match.group(1)) + near_truth, near_test = self.get_proximal_people( + self.test_trajectories[idx_seed - 1] + ) + is_seed_test = True + else: + idx_seed = int(match.group(1)) + near_truth, near_test = self.get_proximal_people( + self.truth_trajectories[idx_seed - 1] + ) + is_seed_test = False + self.visualize_many(near_truth, near_test, idx_seed, is_seed_test) + continue + + match = point_difference.match(line) + if match: + idx_truth = int(match.group(2)) + idx_test = int(match.group(1)) + if idx_truth in displayed_pairing: + continue + displayed_pairing.add(idx_truth) + + self.visualize_people(idx_truth, idx_test) + continue + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("PetPath", help="Path to the .pet-file of the test case") + args = parser.parse_args() + viz = ComparisonVideoPlayer(args.PetPath) + viz.visualize_difference()