From df1008add44bfbb6b4ee9e7bfab9f57b3163a86b Mon Sep 17 00:00:00 2001 From: rkhafizov Date: Sat, 15 Jul 2023 13:07:09 +0200 Subject: [PATCH] simple ransac added --- .gitignore | 1 + fundamental.py | 172 +++++++++++++++++++++++++++++++++++++++++++++++-- ransac.py | 45 ++++++++++++- run_phg.py | 81 ++++++++++++++++------- 4 files changed, 268 insertions(+), 31 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7e99e36 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc \ No newline at end of file diff --git a/fundamental.py b/fundamental.py index 123248c..aa1e179 100644 --- a/fundamental.py +++ b/fundamental.py @@ -4,7 +4,7 @@ from scipy.optimize import least_squares # calculates error = x1^T*F*x2 -def calc_err(kpts1, kpts2, F): +def calc_err_total(kpts1, kpts2, F): total_err = 0.0 for i in range(kpts1.shape[0]): kp1 = np.array([kpts1[i,0], kpts1[i,1], 1.0]) @@ -15,6 +15,167 @@ def calc_err(kpts1, kpts2, F): return total_err +class Fundamental: + def __init__(self, min_samples): + self.min_samples_ = min_samples + + def calc_err_total(self, kpts1, kpts2, F): + total_err = 0.0 + for i in range(kpts1.shape[0]): + kp1 = np.array([kpts1[i,0], kpts1[i,1], 1.0]) + kp2 = np.array([kpts2[i,0], kpts2[i,1], 1.0]) + err = np.linalg.multi_dot([kp1.T,F,kp2]) + total_err += err + + return total_err + + def calc_err(self, kp1, kp2, F): + kp1h = np.array([kp1[0], kp1[0], 1.0]) + kp2h = np.array([kp2[0], kp2[0], 1.0]) + + return np.linalg.multi_dot([kp1h.T, F, kp2h]) + + + # returns inlier/n_total_samples ratio + def calc_inlier_ratio(self, kpts1, kpts2, F, threshold): + n_inliers = 0 + + err_mean = 0 + for i in range(kpts1.shape[0]): + err = self.calc_err(kpts1[i,:], kpts2[i,:], F) + err_mean += err + if(abs(err) < threshold): + n_inliers+=1 + + return n_inliers/kpts1.shape[0] + +class EightPoint(Fundamental): + def __init__(self): + super().__init__(8) + + def estimate(self, kpts1, kpts2): + assert(kpts1.shape == kpts2.shape ) + assert(kpts1.shape[0] == self.min_samples_) + + A = np.zeros((kpts1.shape[0],9)) + + for i in range(A.shape[0]): + A[i][0] = kpts1[i][0] * kpts2[i][0] + A[i][1] = kpts1[i][1] * kpts2[i][0] + A[i][2] = kpts2[i][0] + A[i][3] = kpts1[i][0] * kpts2[i][1] + A[i][4] = kpts1[i][1] * kpts2[i][1] + A[i][5] = kpts2[i][1] + A[i][6] = kpts1[i][0] + A[i][7] = kpts1[i][1] + A[i][8] = 1.0 + + # find nullspace to get solution for Af = 0 + # u - [row_space, left null_space] + # v - [column_space, null_space] + u,s,vt = np.linalg.svd(A, full_matrices=True) + + # only 1 vector in nullspace + null_vec = vt.T[:,8] + null_mat = null_vec.reshape((3,3)) + + # enforcing constraint on third singular value being equal to 0 + u2,s2,vt2 = np.linalg.svd(null_mat, full_matrices=True) + s2[2] = 0.0 + + s2 = np.diag(s2) + fundamental = np.linalg.multi_dot([u2,s2,vt2]) + # up-to-scale, dividing by last element to make it equal to 1 + fundamental /= fundamental[2,2] + + return fundamental + +class SevenPoint(Fundamental): + def __init__(self): + super().__init__(7) + + def estimate(self, kpts1, kpts2): + A = np.zeros((7,9)) + + for i in range(A.shape[0]): + A[i][0] = kpts1[i][0] * kpts2[i][0] + A[i][1] = kpts1[i][1] * kpts2[i][0] + A[i][2] = kpts2[i][0] + A[i][3] = kpts1[i][0] * kpts2[i][1] + A[i][4] = kpts1[i][1] * kpts2[i][1] + A[i][5] = kpts2[i][1] + A[i][6] = kpts1[i][0] + A[i][7] = kpts1[i][1] + A[i][8] = 1.0 + + u,s,vt = np.linalg.svd(A, full_matrices=True) + + # 2 vecs in nullspace + null_vec1 = vt.T[:,8] + null_vec2 = vt.T[:,7] + + null_mat1 = null_vec1.reshape((3,3)) + null_mat2 = null_vec2.reshape((3,3)) + + # creating variables, to make code more readable + a = null_mat1[0,0] + b = null_mat1[0,1] + c = null_mat1[0,2] + d = null_mat1[1,0] + e = null_mat1[1,1] + f = null_mat1[1,2] + g = null_mat1[2,0] + h = null_mat1[2,1] + i = null_mat1[2,2] + + j = null_mat2[0,0] + k = null_mat2[0,1] + l = null_mat2[0,2] + m = null_mat2[1,0] + n = null_mat2[1,1] + o = null_mat2[1,2] + p = null_mat2[2,0] + q = null_mat2[2,1] + r = null_mat2[2,2] + + # here write the result of constrant = det(f_1 + \lambda*f_2)=0 -> as a result there is a cubic polynomial + coeffs =[ + a*e*i + b*f*g + c*d*h - a*f*h - b*d*i - c*e*g, + a*e*r + a*i*n + b*f*p + b*g*o + c*d*q + c*h*m + d*h*l + e*i*j + f*g*k - + a*f*q - a*h*o - b*d*r - b*i*m - c*e*p - c*g*n - d*i*k - e*g*l - f*h*j, + a*n*r + b*o*p + c*m*q + d*l*q + e*j*r + f*k*p + g*k*o + h*l*m + i*j*n - + a*o*q - b*m*r - c*n*p - d*k*r - e*l*p - f*j*q - g*l*n - h*j*o - i*k*m, + j*n*r + k*o*p + l*m*q - j*o*q - k*m*r - l*n*p + ] + + # finding roots of polynomial A*x^3 + B*x^2 + C*x + D + # cubic polynomial -> 1 or 3 solutions + roots = np.roots([coeffs[3],coeffs[2],coeffs[1],coeffs[0]]) + + # in order to choose best root we calc error and choose solution which minimizes this error + fundamental_best = None + err_best = None + for root in roots: + fundamental_i = null_mat1 + root * null_mat2 + err_i = self.calc_err_total(kpts1,kpts2,fundamental_i) + + if(err_best is None or err_i < err_best): + err_best = err_i + fundamental_best = fundamental_i + + return fundamental_best + + +class LevMarq(Fundamental): + def __status__(self): + print("status") + + def __del__(self): + print("destructor") + + + + # estimating fundamental matrix with 2 ways: # 7 point + solve quadratic polynom # 8 points @@ -122,7 +283,7 @@ def seven_point(kpts1, kpts2): err_best = None for root in roots: fundamental_i = null_mat1 + root * null_mat2 - err_i = calc_err(kpts1,kpts2,fundamental_i) + err_i = calc_err_total(kpts1,kpts2,fundamental_i) if(err_best == None or err_i < err_best): err_best = err_i @@ -131,7 +292,7 @@ def seven_point(kpts1, kpts2): return fundamental_best -def levmarq(kpts1, kpts2): +def levmarq(kpts1, kpts2, F0 = None): assert(kpts1.shape[0] == kpts2.shape[0] ) assert(kpts1.shape[0]>=7) # here should be cost function of type: @@ -156,8 +317,9 @@ def cost(F): return total_loss # get initial estimate(without initial estimate lev marquardt may fail) - F = seven_point(kpts1[:7], kpts2[:7]) - F0 = np.ravel(F[:3,:3]) + if(F0 is None): + F0 = seven_point(kpts1[:7], kpts2[:7]) + F0 = np.ravel(F0[:3,:3]) res = least_squares(lambda x: cost(x), F0) diff --git a/ransac.py b/ransac.py index d8c1590..475e82e 100644 --- a/ransac.py +++ b/ransac.py @@ -1,10 +1,53 @@ from fundamental import * +import random +""" +RANSAC overview +ransac has cycle, where: +1. sample N points +2. kernel.fit() - estimates model given sampled points +3. scorer.score() - compute number of inliers +4. if number of inliers > current -> rewrite current +""" + + + +""" +Ransac class which does following: +1. samples N points from data +2. calculates some given parameter(for example fundamental or essential matrix) +3. checks proportion of inliers +4. if enough iterations done or enough inliers obtained -> return best results +""" class Ransac: def __init__(self, algo) -> None: # algorithm to run self.estimator = algo self.n_iterations = 1024 + self.min_samples = algo.min_samples_ + + # error = x'^T * F * x + self.threshold = 1 def run(self, kpts1, kpts2): - return self.estimator(kpts1, kpts2) \ No newline at end of file + assert(kpts1.shape[0] >= self.min_samples) + assert(kpts2.shape[0] >= self.min_samples) + assert(kpts1.shape[0] == kpts2.shape[0]) + + F_best = None + ratio_best = 0.0 + for i in range(self.n_iterations): + # sample kpts + curr_idxs = random.sample(range(0, kpts1.shape[0]), self.min_samples) + + # estimate given parameter + F = self.estimator.estimate(kpts1[curr_idxs], kpts2[curr_idxs]) + # calculate number of inliers + inlier_ratio = self.estimator.calc_inlier_ratio(kpts1, kpts2, F, 1.0) + if(inlier_ratio > ratio_best): + ratio_best = inlier_ratio + F_best = F + # print(f"inlier_ratio: {inlier_ratio}") + # err = self.estimator.calc_err(kpts1, kpts2, F) + + return ratio_best, F_best \ No newline at end of file diff --git a/run_phg.py b/run_phg.py index c4aff68..0881e8c 100644 --- a/run_phg.py +++ b/run_phg.py @@ -6,13 +6,15 @@ from feature_extractor import FeatureExtractor from feature_matcher import FeatureMatcher -from fundamental import eight_point -from fundamental import seven_point -from fundamental import levmarq -from fundamental import calc_err +# from fundamental import eight_point +# from fundamental import seven_point +# from fundamental import levmarq +# from fundamental import calc_err_total from ransac import Ransac +from fundamental import * + def read_img(path, grayscale = True): return cv2.cvtColor(cv2.imread(path),cv2.COLOR_BGR2GRAY) @@ -52,32 +54,61 @@ def draw_kpts(im, kpts): kpts1_np.append(kpts1[matches[i].queryIdx].pt) kpts2_np.append(kpts2[matches[i].trainIdx].pt) - kpts1_np = np.array(kpts1_np) kpts2_np = np.array(kpts2_np) - f8 = eight_point(kpts1_np[:8,:], kpts2_np[:8,:]) - f7 = seven_point(kpts1_np[:7,:], kpts2_np[:7,:]) - f_levmarq = levmarq(kpts1_np[:20,:], kpts2_np[:20,:]) - + # f8 = eight_point(kpts1_np[:8,:], kpts2_np[:8,:]) + # err = calc_err_total(kpts1_np,kpts2_np, f8) + # err_mean = err / kpts1_np.shape[0] + # print(f"8 | err: {err}, err_mean: {err_mean}") + # f7 = seven_point(kpts1_np[:7,:], kpts2_np[:7,:]) + # err = calc_err_total(kpts1_np,kpts2_np, f7) + # err_mean = err / kpts1_np.shape[0] + # print(f"7 | err: {err}, err_mean: {err_mean}") + # # f_levmarq = levmarq(kpts1_np[:20,:], kpts2_np[:20,:]) + + # fCV, mask = cv2.findFundamentalMat(kpts1_np, kpts2_np,cv2.LMEDS) + # err = calc_err_total(kpts1_np,kpts2_np, fCV) + # err_mean = err / kpts1_np.shape[0] + # print(f"CV | err: {err}, err_mean: {err_mean}") # calc error on all matched keypoints - err8 = calc_err(kpts1_np, kpts2_np, f8) - err7 = calc_err(kpts1_np, kpts2_np, f7) - err_levmarq = calc_err(kpts1_np, kpts2_np, f_levmarq) - print(f"err8: {err8}") - print(f"err7: {err7}") - print(f"err_levmarq: {err_levmarq}") - - print(f"f8 : {f8}") - print(f"f7 : {f7}") - print(f"f_levmarq : {f_levmarq}") - - - ransac = Ransac(eight_point) - F_ransac = ransac.run(kpts1_np[:8,:], kpts2_np[:8,:]) - - print(f"F_ransac: {F_ransac}") + # err8 = calc_err_total(kpts1_np, kpts2_np, f8) + # err7 = calc_err_total(kpts1_np, kpts2_np, f7) + # err_levmarq = calc_err_total(kpts1_np, kpts2_np, f_levmarq) + # print(f"err8: {err8}") + # print(f"err7: {err7}") + # print(f"err_levmarq: {err_levmarq}") + + # print(f"f8 : {f8}") + # print(f"f7 : {f7}") + # print(f"f_levmarq : {f_levmarq}") + + eightPoint = EightPoint() + ransac = Ransac(eightPoint) + ratio, F = ransac.run(kpts1_np, kpts2_np) + err_total = calc_err_total(kpts1_np, kpts2_np, F) + err_mean = err_total/kpts1_np.shape[0] + print(f"EightPoint| inlier_ratio: {ratio} | err_total: {err_total} | err_mean: {err_mean}") + + sevenPoint = SevenPoint() + ransac = Ransac(sevenPoint) + ratio, F = ransac.run(kpts1_np, kpts2_np) + err_total = calc_err_total(kpts1_np, kpts2_np, F) + err_mean = err_total/kpts1_np.shape[0] + print(f"SevenPoint| inlier_ratio: {ratio} | err_total: {err_total} | err_mean: {err_mean}") + + # print(f"F_ransac: {F_ransac}") + + # levmarq = LevMarq(7) + # levmarq.__status__() + + # eightPoint = EightPoint() + # print(f"eightPoint.min_samples_: {eightPoint.min_samples_}") + + # ransac_estimator = Ransac(eightPoint) + + # ransac_estimator.run(kpts1_np, kpts2_np)