Skip to content

Commit

Permalink
simple ransac added
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhafizov committed Jul 15, 2023
1 parent 9fd081a commit df1008a
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 31 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.pyc
172 changes: 167 additions & 5 deletions fundamental.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from scipy.optimize import least_squares

# calculates error = x1^T*F*x2
def calc_err(kpts1, kpts2, F):
def calc_err_total(kpts1, kpts2, F):
total_err = 0.0
for i in range(kpts1.shape[0]):
kp1 = np.array([kpts1[i,0], kpts1[i,1], 1.0])
Expand All @@ -15,6 +15,167 @@ def calc_err(kpts1, kpts2, F):
return total_err


class Fundamental:
def __init__(self, min_samples):
self.min_samples_ = min_samples

def calc_err_total(self, kpts1, kpts2, F):
total_err = 0.0
for i in range(kpts1.shape[0]):
kp1 = np.array([kpts1[i,0], kpts1[i,1], 1.0])
kp2 = np.array([kpts2[i,0], kpts2[i,1], 1.0])
err = np.linalg.multi_dot([kp1.T,F,kp2])
total_err += err

return total_err

def calc_err(self, kp1, kp2, F):
kp1h = np.array([kp1[0], kp1[0], 1.0])
kp2h = np.array([kp2[0], kp2[0], 1.0])

return np.linalg.multi_dot([kp1h.T, F, kp2h])


# returns inlier/n_total_samples ratio
def calc_inlier_ratio(self, kpts1, kpts2, F, threshold):
n_inliers = 0

err_mean = 0
for i in range(kpts1.shape[0]):
err = self.calc_err(kpts1[i,:], kpts2[i,:], F)
err_mean += err
if(abs(err) < threshold):
n_inliers+=1

return n_inliers/kpts1.shape[0]

class EightPoint(Fundamental):
def __init__(self):
super().__init__(8)

def estimate(self, kpts1, kpts2):
assert(kpts1.shape == kpts2.shape )
assert(kpts1.shape[0] == self.min_samples_)

A = np.zeros((kpts1.shape[0],9))

for i in range(A.shape[0]):
A[i][0] = kpts1[i][0] * kpts2[i][0]
A[i][1] = kpts1[i][1] * kpts2[i][0]
A[i][2] = kpts2[i][0]
A[i][3] = kpts1[i][0] * kpts2[i][1]
A[i][4] = kpts1[i][1] * kpts2[i][1]
A[i][5] = kpts2[i][1]
A[i][6] = kpts1[i][0]
A[i][7] = kpts1[i][1]
A[i][8] = 1.0

# find nullspace to get solution for Af = 0
# u - [row_space, left null_space]
# v - [column_space, null_space]
u,s,vt = np.linalg.svd(A, full_matrices=True)

# only 1 vector in nullspace
null_vec = vt.T[:,8]
null_mat = null_vec.reshape((3,3))

# enforcing constraint on third singular value being equal to 0
u2,s2,vt2 = np.linalg.svd(null_mat, full_matrices=True)
s2[2] = 0.0

s2 = np.diag(s2)
fundamental = np.linalg.multi_dot([u2,s2,vt2])
# up-to-scale, dividing by last element to make it equal to 1
fundamental /= fundamental[2,2]

return fundamental

class SevenPoint(Fundamental):
def __init__(self):
super().__init__(7)

def estimate(self, kpts1, kpts2):
A = np.zeros((7,9))

for i in range(A.shape[0]):
A[i][0] = kpts1[i][0] * kpts2[i][0]
A[i][1] = kpts1[i][1] * kpts2[i][0]
A[i][2] = kpts2[i][0]
A[i][3] = kpts1[i][0] * kpts2[i][1]
A[i][4] = kpts1[i][1] * kpts2[i][1]
A[i][5] = kpts2[i][1]
A[i][6] = kpts1[i][0]
A[i][7] = kpts1[i][1]
A[i][8] = 1.0

u,s,vt = np.linalg.svd(A, full_matrices=True)

# 2 vecs in nullspace
null_vec1 = vt.T[:,8]
null_vec2 = vt.T[:,7]

null_mat1 = null_vec1.reshape((3,3))
null_mat2 = null_vec2.reshape((3,3))

# creating variables, to make code more readable
a = null_mat1[0,0]
b = null_mat1[0,1]
c = null_mat1[0,2]
d = null_mat1[1,0]
e = null_mat1[1,1]
f = null_mat1[1,2]
g = null_mat1[2,0]
h = null_mat1[2,1]
i = null_mat1[2,2]

j = null_mat2[0,0]
k = null_mat2[0,1]
l = null_mat2[0,2]
m = null_mat2[1,0]
n = null_mat2[1,1]
o = null_mat2[1,2]
p = null_mat2[2,0]
q = null_mat2[2,1]
r = null_mat2[2,2]

# here write the result of constrant = det(f_1 + \lambda*f_2)=0 -> as a result there is a cubic polynomial
coeffs =[
a*e*i + b*f*g + c*d*h - a*f*h - b*d*i - c*e*g,
a*e*r + a*i*n + b*f*p + b*g*o + c*d*q + c*h*m + d*h*l + e*i*j + f*g*k -
a*f*q - a*h*o - b*d*r - b*i*m - c*e*p - c*g*n - d*i*k - e*g*l - f*h*j,
a*n*r + b*o*p + c*m*q + d*l*q + e*j*r + f*k*p + g*k*o + h*l*m + i*j*n -
a*o*q - b*m*r - c*n*p - d*k*r - e*l*p - f*j*q - g*l*n - h*j*o - i*k*m,
j*n*r + k*o*p + l*m*q - j*o*q - k*m*r - l*n*p
]

# finding roots of polynomial A*x^3 + B*x^2 + C*x + D
# cubic polynomial -> 1 or 3 solutions
roots = np.roots([coeffs[3],coeffs[2],coeffs[1],coeffs[0]])

# in order to choose best root we calc error and choose solution which minimizes this error
fundamental_best = None
err_best = None
for root in roots:
fundamental_i = null_mat1 + root * null_mat2
err_i = self.calc_err_total(kpts1,kpts2,fundamental_i)

if(err_best is None or err_i < err_best):
err_best = err_i
fundamental_best = fundamental_i

return fundamental_best


class LevMarq(Fundamental):
def __status__(self):
print("status")

def __del__(self):
print("destructor")




# estimating fundamental matrix with 2 ways:
# 7 point + solve quadratic polynom
# 8 points
Expand Down Expand Up @@ -122,7 +283,7 @@ def seven_point(kpts1, kpts2):
err_best = None
for root in roots:
fundamental_i = null_mat1 + root * null_mat2
err_i = calc_err(kpts1,kpts2,fundamental_i)
err_i = calc_err_total(kpts1,kpts2,fundamental_i)

if(err_best == None or err_i < err_best):
err_best = err_i
Expand All @@ -131,7 +292,7 @@ def seven_point(kpts1, kpts2):
return fundamental_best


def levmarq(kpts1, kpts2):
def levmarq(kpts1, kpts2, F0 = None):
assert(kpts1.shape[0] == kpts2.shape[0] )
assert(kpts1.shape[0]>=7)
# here should be cost function of type:
Expand All @@ -156,8 +317,9 @@ def cost(F):
return total_loss

# get initial estimate(without initial estimate lev marquardt may fail)
F = seven_point(kpts1[:7], kpts2[:7])
F0 = np.ravel(F[:3,:3])
if(F0 is None):
F0 = seven_point(kpts1[:7], kpts2[:7])
F0 = np.ravel(F0[:3,:3])

res = least_squares(lambda x: cost(x), F0)

Expand Down
45 changes: 44 additions & 1 deletion ransac.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,53 @@
from fundamental import *
import random

"""
RANSAC overview
ransac has cycle, where:
1. sample N points
2. kernel.fit() - estimates model given sampled points
3. scorer.score() - compute number of inliers
4. if number of inliers > current -> rewrite current
"""



"""
Ransac class which does following:
1. samples N points from data
2. calculates some given parameter(for example fundamental or essential matrix)
3. checks proportion of inliers
4. if enough iterations done or enough inliers obtained -> return best results
"""
class Ransac:
def __init__(self, algo) -> None:
# algorithm to run
self.estimator = algo
self.n_iterations = 1024
self.min_samples = algo.min_samples_

# error = x'^T * F * x
self.threshold = 1

def run(self, kpts1, kpts2):
return self.estimator(kpts1, kpts2)
assert(kpts1.shape[0] >= self.min_samples)
assert(kpts2.shape[0] >= self.min_samples)
assert(kpts1.shape[0] == kpts2.shape[0])

F_best = None
ratio_best = 0.0
for i in range(self.n_iterations):
# sample kpts
curr_idxs = random.sample(range(0, kpts1.shape[0]), self.min_samples)

# estimate given parameter
F = self.estimator.estimate(kpts1[curr_idxs], kpts2[curr_idxs])
# calculate number of inliers
inlier_ratio = self.estimator.calc_inlier_ratio(kpts1, kpts2, F, 1.0)
if(inlier_ratio > ratio_best):
ratio_best = inlier_ratio
F_best = F
# print(f"inlier_ratio: {inlier_ratio}")
# err = self.estimator.calc_err(kpts1, kpts2, F)

return ratio_best, F_best
81 changes: 56 additions & 25 deletions run_phg.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
from feature_extractor import FeatureExtractor
from feature_matcher import FeatureMatcher

from fundamental import eight_point
from fundamental import seven_point
from fundamental import levmarq
from fundamental import calc_err
# from fundamental import eight_point
# from fundamental import seven_point
# from fundamental import levmarq
# from fundamental import calc_err_total

from ransac import Ransac

from fundamental import *

def read_img(path, grayscale = True):
return cv2.cvtColor(cv2.imread(path),cv2.COLOR_BGR2GRAY)

Expand Down Expand Up @@ -52,32 +54,61 @@ def draw_kpts(im, kpts):

kpts1_np.append(kpts1[matches[i].queryIdx].pt)
kpts2_np.append(kpts2[matches[i].trainIdx].pt)

kpts1_np = np.array(kpts1_np)
kpts2_np = np.array(kpts2_np)

f8 = eight_point(kpts1_np[:8,:], kpts2_np[:8,:])
f7 = seven_point(kpts1_np[:7,:], kpts2_np[:7,:])
f_levmarq = levmarq(kpts1_np[:20,:], kpts2_np[:20,:])

# f8 = eight_point(kpts1_np[:8,:], kpts2_np[:8,:])
# err = calc_err_total(kpts1_np,kpts2_np, f8)
# err_mean = err / kpts1_np.shape[0]
# print(f"8 | err: {err}, err_mean: {err_mean}")
# f7 = seven_point(kpts1_np[:7,:], kpts2_np[:7,:])
# err = calc_err_total(kpts1_np,kpts2_np, f7)
# err_mean = err / kpts1_np.shape[0]
# print(f"7 | err: {err}, err_mean: {err_mean}")
# # f_levmarq = levmarq(kpts1_np[:20,:], kpts2_np[:20,:])

# fCV, mask = cv2.findFundamentalMat(kpts1_np, kpts2_np,cv2.LMEDS)
# err = calc_err_total(kpts1_np,kpts2_np, fCV)
# err_mean = err / kpts1_np.shape[0]
# print(f"CV | err: {err}, err_mean: {err_mean}")

# calc error on all matched keypoints
err8 = calc_err(kpts1_np, kpts2_np, f8)
err7 = calc_err(kpts1_np, kpts2_np, f7)
err_levmarq = calc_err(kpts1_np, kpts2_np, f_levmarq)
print(f"err8: {err8}")
print(f"err7: {err7}")
print(f"err_levmarq: {err_levmarq}")

print(f"f8 : {f8}")
print(f"f7 : {f7}")
print(f"f_levmarq : {f_levmarq}")


ransac = Ransac(eight_point)
F_ransac = ransac.run(kpts1_np[:8,:], kpts2_np[:8,:])

print(f"F_ransac: {F_ransac}")
# err8 = calc_err_total(kpts1_np, kpts2_np, f8)
# err7 = calc_err_total(kpts1_np, kpts2_np, f7)
# err_levmarq = calc_err_total(kpts1_np, kpts2_np, f_levmarq)
# print(f"err8: {err8}")
# print(f"err7: {err7}")
# print(f"err_levmarq: {err_levmarq}")

# print(f"f8 : {f8}")
# print(f"f7 : {f7}")
# print(f"f_levmarq : {f_levmarq}")

eightPoint = EightPoint()
ransac = Ransac(eightPoint)
ratio, F = ransac.run(kpts1_np, kpts2_np)
err_total = calc_err_total(kpts1_np, kpts2_np, F)
err_mean = err_total/kpts1_np.shape[0]
print(f"EightPoint| inlier_ratio: {ratio} | err_total: {err_total} | err_mean: {err_mean}")

sevenPoint = SevenPoint()
ransac = Ransac(sevenPoint)
ratio, F = ransac.run(kpts1_np, kpts2_np)
err_total = calc_err_total(kpts1_np, kpts2_np, F)
err_mean = err_total/kpts1_np.shape[0]
print(f"SevenPoint| inlier_ratio: {ratio} | err_total: {err_total} | err_mean: {err_mean}")

# print(f"F_ransac: {F_ransac}")

# levmarq = LevMarq(7)
# levmarq.__status__()

# eightPoint = EightPoint()
# print(f"eightPoint.min_samples_: {eightPoint.min_samples_}")

# ransac_estimator = Ransac(eightPoint)

# ransac_estimator.run(kpts1_np, kpts2_np)



Expand Down

0 comments on commit df1008a

Please sign in to comment.