-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
218 lines (165 loc) · 6.47 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import os, psutil
import subprocess, sys
from pyproj import Transformer
from osgeo import gdal, osr
import numpy as np
import datetime
import fiona, rasterio
import geopandas as gpd
from rasterio.mask import mask
import xml.etree.ElementTree as ET
from offset_tracking.util import numpy_array_to_raster
def execute(cmd):
subprocess.check_call(cmd, shell=True, stdout=sys.stdout, stderr=subprocess.STDOUT)
def read_vals(fn, nodat=None, band=1):
ds = gdal.Open(fn)
disp = ds.GetRasterBand(band).ReadAsArray()
nodata = np.isnan(disp)
disp[nodata] = -32767
if nodat is not None:
nodata = (nodat|nodata)
disp[nodat] = -32767
ds = None
return disp, nodata
def get_DT(date1, date2):
date1, date2 = str(date1), str(date2)
date1 = datetime.date(int(date1[:4]), int(date1[4:6]), int(date1[6:]))
date2 = datetime.date(int(date2[:4]), int(date2[4:6]), int(date2[6:]))
return (date2-date1).days
def get_deltaT(dates):
deltaT = []
for i in range(len(dates)-1):
deltaT.append(get_DT(dates[i], dates[i+1]))
return deltaT
def getazimuthAngle(filename):
from zipfile import ZipFile
print(filename)
with ZipFile(filename, 'r') as zipObj:
listOfiles = zipObj.namelist()
for l in listOfiles:
if l.endswith('xml') and l.split('/')[-2]=='annotation':
f = zipObj.open(l)
fl = str(f.read())
azimuth_angle = float(fl.split('platformHeading>')[1][:-2])
break
return azimuth_angle
def horn_gradient(z, geo):
"""calculate x and y gradients according to Horn (1981) method"""
nrows, ncols = z.shape
z_00 = z[0:nrows-2, 0:ncols-2]
z_10 = z[1:nrows-1, 0:ncols-2]
z_20 = z[2:nrows, 0:ncols-2]
z_02 = z[0:nrows-2, 2:ncols]
z_12 = z[1:nrows-1, 2:ncols]
z_22 = z[2:nrows, 2:ncols]
dz_dx_tmp = (z_02+2.0*z_12+z_22-z_00-2.0*z_10-z_20)/8.0
dz_dx = np.zeros((nrows, ncols))
dz_dx[1:-1,1:-1] = dz_dx_tmp
z_00 = z[0:nrows-2, 0:ncols-2]
z_01 = z[0:nrows-2, 1:ncols-1]
z_02 = z[0:nrows-2, 2:ncols]
z_20 = z[2:nrows, 0:ncols-2]
z_21 = z[2:nrows, 1:ncols-1]
z_22 = z[2:nrows, 2:ncols]
dz_dy_tmp = (z_20+2.0*z_21+z_22-z_00-2.0*z_01-z_02)/8.0
dz_dy = np.zeros((nrows, ncols))
dz_dy[1:-1,1:-1] = dz_dy_tmp
dz_dy = dz_dy/geo[1]
dz_dx = dz_dx/geo[5]
return (dz_dy, dz_dx)
def directional_slope(slopex, slopey, angle):
rad = (angle*np.pi)/180
dslope = (slopex*np.sin(rad)) + (slopey*np.cos(rad))
return dslope
def get_espg(lat, zone):
epsg_code = 32600
epsg_code += int(zone)
if (lat < 0): # South
epsg_code += 100
return epsg_code
def cropRaster(shape_filenm, raster_filenm, cropped_filenm):
# read imagery file
src = rasterio.open(raster_filenm)
crs_gt = src.crs
gdf = gpd.read_file(shape_filenm)
if gdf.crs != crs_gt:
gdf = gdf.to_crs(crs_gt)
print("Reprojected CRS: ", gdf.crs)
else:
print("No reprojection needed.")
# Save the GeoDataFrame as a shapefile
gdf.to_file(shape_filenm)
# Read Shape file
with fiona.open(shape_filenm, "r") as shapefile:
shapes = [feature["geometry"] for feature in shapefile]
out_image, out_transform = mask(src, shapes, crop=False)
out_meta = src.meta
# Save clipped imagery
out_meta.update({"driver": "GTiff",
"height": out_image.shape[1],
"width": out_image.shape[2],
"transform": out_transform})
with rasterio.open(cropped_filenm, "w", **out_meta) as dest:
dest.write(out_image)
return out_image
def generate_dem_products(dem_dir, bbox, config=None):
bbox = np.array(bbox[1:-1].replace(' ','').split(',')).astype('float')
zone = round((180+bbox[2])/6)
if not os.path.exists('dem.tif'):
execute(f'gdalwarp -s_srs "EPSG:4326" -t_srs "+proj=utm +zone={zone} +datum=WGS84 +units=m +no_defs" -of GTIFF {dem_dir} dem.tif')
# Assuming the entire region lies in the same UTM zone
epsg = get_espg(bbox[0], zone)
transformer = Transformer.from_crs("epsg:4326",f"epsg:{epsg}")
xl,yt = bbox[0], bbox[2]
xr, yb = bbox[1], bbox[3]
x2,y2 = transformer.transform(xl,yt)
x3,y3 = transformer.transform(xr,yb)
if not os.path.exists('dem_crop.tif'):
execute(f'gdalwarp -te {x2} {y2} {x3} {y3} dem.tif dem_crop.tif')
execute('rm dem.tif')
demvel = gdal.Open('dem_crop.tif')
dem = demvel.GetRasterBand(1).ReadAsArray().astype(float)
projs = demvel.GetProjection()
geo = demvel.GetGeoTransform()
demvel = None
# Get azimuth angle
try:
tree = ET.parse('reference.xml')
ref_dir = str(tree.getroot().findall(".//*[@name='safe']")[0].text[1:-1])
azimuth_angle = getazimuthAngle(ref_dir[1:-1])
except:
azimuth_angle = config['azimuthAngle']
dz_dy, dz_dx = horn_gradient(dem, geo)
slopey = directional_slope(dz_dx, dz_dy, float(azimuth_angle))
slopey = np.arctan(slopey)*(180.0)/np.pi
slopex = directional_slope(dz_dx, dz_dy, (float(azimuth_angle)+90))
slopex = np.arctan(slopex)*(180.0)/np.pi
slope = np.sqrt(dz_dx**2 + dz_dy**2)
D = np.arctan(slope)*(180.0)/np.pi
# Saving slope along X and Y directions
ds = numpy_array_to_raster('dem_x.tif', np.expand_dims(slopex, 0), projs, geo, nband=1)
ds.FlushCache()
ds = None
ds = numpy_array_to_raster('dem_y.tif', np.expand_dims(slopey, 0), projs, geo, nband=1)
ds.FlushCache()
ds = None
ds = numpy_array_to_raster('dem_slope.tif', np.expand_dims(D, 0), projs, geo, nband=1)
ds.FlushCache()
ds = None
def process_check_running(num):
check = ['config_reference', 'config_secondary', 'config_baseline', None, 'config_overlap_geo2rdr', 'config_overlap_resample',
'config_misreg', None, 'config_fullBurst_geo2rdr', 'config_fullBurst_resample', None, 'config_merge']
if check[num-1] is None:
return False
for proc in psutil.process_iter():
try:
processName = proc.cmdline()
if len(processName)!=0:
# if proc.status() == psutil.STATUS_RUNNING:
if 'python3' in processName[0]:
cmd = ' '.join(processName)
if check[num-1] in cmd:
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return False