Skip to content

Commit

Permalink
black format
Browse files Browse the repository at this point in the history
  • Loading branch information
jdebacker committed Oct 16, 2023
1 parent 25f665d commit c9672dd
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 34 deletions.
16 changes: 13 additions & 3 deletions iot/inverse_optimal_tax.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from statsmodels.nonparametric.kernel_regression import KernelReg
from sklearn.kernel_ridge import KernelRidge


class IOT:
"""
Constructor for the IOT class.
Expand Down Expand Up @@ -119,12 +120,19 @@ def compute_mtr_dist(
data = data.sort_values(by=income_measure)
if mtr_smoother == "spline":
spl = UnivariateSpline(
data[income_measure], data["mtr"], w=data[weight_var], k=mtr_smooth_param
data[income_measure],
data["mtr"],
w=data[weight_var],
k=mtr_smooth_param,
)
mtr = spl(self.z)
elif mtr_smoother == "kr":
krr = KernelRidge(alpha=1.0)
krr.fit(data[income_measure], data["mtr"], sample_weight=data[weight_var])
krr.fit(
data[income_measure],
data["mtr"],
sample_weight=data[weight_var],
)
mtr = krr.predict(self.z)
else:
pass
Expand Down Expand Up @@ -196,7 +204,9 @@ def compute_income_dist(
assert False
# normalize f
f = f / np.sum(f)
f_prime = np.gradient(f, edge_order=2) # this works a bit better than finite differences, but still not great
f_prime = np.gradient(
f, edge_order=2
) # this works a bit better than finite differences, but still not great

return z_line, f, f_prime

Expand Down
93 changes: 62 additions & 31 deletions iot/tests/test_inverse_optimal_tax.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,8 @@ def test_IOT_df():

assert isinstance(df_out, pd.DataFrame)

@pytest.mark.parametrize(
"mtr_smoother",
[("spline"), ("kr"), (None)
]
)

@pytest.mark.parametrize("mtr_smoother", [("spline"), ("kr"), (None)])
def test_IOT_compute_mtr_dist(mtr_smoother):
"""
Test computation of the mtr distribution
Expand Down Expand Up @@ -76,7 +73,8 @@ def test_IOT_compute_mtr_dist(mtr_smoother):
mtr, mtr_prime = iot1.compute_mtr_dist(data, weight_var, mtr_smoother, 3)
# np.savetxt(os.path.join(CUR_PATH, 'test_io_data', str(mtr_smoother) + '_mtr.csv'), mtr, delimiter=",")
expected_mtr = np.genfromtxt(
os.path.join(CUR_PATH, "test_io_data", str(mtr_smoother) + "_mtr.csv"), delimiter=","
os.path.join(CUR_PATH, "test_io_data", str(mtr_smoother) + "_mtr.csv"),
delimiter=",",
)

assert np.allclose(mtr, expected_mtr)
Expand All @@ -87,12 +85,15 @@ def test_IOT_compute_mtr_dist(mtr_smoother):
# may need to do something with bins with no observations in them
@pytest.mark.parametrize(
"income_measure,dist_type",
[("expanded_income", None), ("e00200", None),
("e00200", "log_normal"), ("e00200", "kde_full"),
("e00200", "kde_subset")
]
[
("expanded_income", None),
("e00200", None),
("e00200", "log_normal"),
("e00200", "kde_full"),
("e00200", "kde_subset"),
],
)
def test_IOT_compute_income_dist(income_measure,dist_type):
def test_IOT_compute_income_dist(income_measure, dist_type):
"""
Test the computation of the income distribution
"""
Expand Down Expand Up @@ -122,16 +123,22 @@ def test_IOT_compute_income_dist(income_measure,dist_type):
# create instance of IOT object
weight_var = "s006"
iot1 = IOT(data, income_measure=income_measure)
z, f, f_prime = iot1.compute_income_dist(data, income_measure, weight_var, dist_type=dist_type)
z, f, f_prime = iot1.compute_income_dist(
data, income_measure, weight_var, dist_type=dist_type
)
# np.savetxt(os.path.join(CUR_PATH, "test_io_data", income_measure + "_" + str(dist_type) + "_dist.csv"), f, delimiter=",")
expected_dist = np.genfromtxt(
os.path.join(CUR_PATH, "test_io_data", income_measure + "_" + str(dist_type) + "_dist.csv"), delimiter=","
os.path.join(
CUR_PATH,
"test_io_data",
income_measure + "_" + str(dist_type) + "_dist.csv",
),
delimiter=",",
)

assert np.allclose(f, expected_dist)



pol = taxcalc.policy.Policy()
rec = taxcalc.records.Records.cps_constructor()
calc = taxcalc.calculator.Calculator(policy=pol, records=rec)
Expand Down Expand Up @@ -188,57 +195,81 @@ def test_sw_weights_analytical():
if sim_dist_type == "exponential":
# generate income according to exponential distribution
z = np.random.exponential(beta, 100000)

def f_z_exp(z, beta):
return (1/beta) * np.exp(-z/beta)
return (1 / beta) * np.exp(-z / beta)

def f_prime_z_exp(z, beta):
return (-1/(beta ** 2)) * np.exp(-z/beta)
return (-1 / (beta**2)) * np.exp(-z / beta)

def theta_z_exp(z, beta):
return 1 - (z / beta)

def g_z_exp(z, beta, elasticity, mtr):
theta = theta_z_exp(z, beta)
g_z_exp = (1 + theta * elasticity * (mtr / (1 - mtr)))
g_z_exp = 1 + theta * elasticity * (mtr / (1 - mtr))
return g_z_exp

else:
# generate income according to lognormal distribution
z = np.random.lognormal(mu, sigma, 100000)

def f_z_exp(z, mu, sigma):
f = (
(1 / np.sqrt(2 * np.pi * sigma)) *
np.exp(-((np.log(z) - mu) ** 2) / (2 * sigma ** 2)) *
(1 / z)
(1 / np.sqrt(2 * np.pi * sigma))
* np.exp(-((np.log(z) - mu) ** 2) / (2 * sigma**2))
* (1 / z)
)
return f

def f_prime_z_exp(z, mu, sigma):
fp = (
# -((np.log(z) - mu) / (np.sqrt(2 * np.pi * sigma) * (sigma ** 2) * (z ** 2)))
# * np.exp(-((np.log(z) - mu) ** 2) / (2 * sigma ** 2))
-(np.exp(-(np.log(z)-mu)**2/(2*sigma**2))*(np.log(z)+sigma**2-mu))/(np.sqrt(2)*np.sqrt(np.pi)*sigma**(5/2)*z**2)
-(
np.exp(-((np.log(z) - mu) ** 2) / (2 * sigma**2))
* (np.log(z) + sigma**2 - mu)
)
/ (np.sqrt(2) * np.sqrt(np.pi) * sigma ** (5 / 2) * z**2)
)
return fp

def theta_z_exp(z, mu, sigma):
return 1 - ((np.log(z) + sigma ** 2 - mu) / sigma ** 2)
return 1 - ((np.log(z) + sigma**2 - mu) / sigma**2)

def g_z_exp(z, mu, sigma, elasticity, mtr):
theta = theta_z_exp(z, mu, sigma)
g_z_exp = (1 + theta * elasticity * (mtr / (1 - mtr)))
g_z_exp = 1 + theta * elasticity * (mtr / (1 - mtr))
return g_z_exp

# Find test value for g_z
# sort z -- not sure it matters
z.sort()
dict_in = {"e00200p": z, "e00200s": np.zeros_like(z), "e00200": z,
"s006": np.ones_like(z), "XTOT": np.ones_like(z), "MARS": np.ones_like(z),
"mtr": np.ones_like(z) * mtr}
dict_in = {
"e00200p": z,
"e00200s": np.zeros_like(z),
"e00200": z,
"s006": np.ones_like(z),
"XTOT": np.ones_like(z),
"MARS": np.ones_like(z),
"mtr": np.ones_like(z) * mtr,
}
df = pd.DataFrame(dict_in)
# create instance of IOT object
mtr_smoother = "spline"
mtr_smoother = "spline"
weight_var = "s006"
if sim_dist_type == "log_normal":
dist_type = "log_normal"
else:
dist_type = "kde_full"
iot_test = IOT(
df, income_measure="e00200", bandwidth=100, dist_type=dist_type,
mtr_smoother=mtr_smoother, mtr_smooth_param=3,
upper_bound=80000
df,
income_measure="e00200",
bandwidth=100,
dist_type=dist_type,
mtr_smoother=mtr_smoother,
mtr_smooth_param=3,
upper_bound=80000,
)
if sim_dist_type == "exponential":
g_z_test = iot_test.sw_weights()
Expand All @@ -261,4 +292,4 @@ def g_z_exp(z, mu, sigma, elasticity, mtr):
# numbers (1-12/13 at hight end of z... this then leads to significant
# diffs in theta_z)
# assert np.allclose(iot_test.theta_z[100:], theta_z_expected[100:], atol=1e-4)
# assert np.allclose(g_z_test, g_z_expected, atol=1e-4)
# assert np.allclose(g_z_test, g_z_expected, atol=1e-4)

0 comments on commit c9672dd

Please sign in to comment.