-
Notifications
You must be signed in to change notification settings - Fork 43
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #65 from FozzieBear/feat_geolocate
Refactoring geolocation to mirror Locast methods.
- Loading branch information
Showing
2 changed files
with
150 additions
and
118 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,42 @@ | ||
import json, urllib2, time, os, sys, string | ||
import m3u8 | ||
import re | ||
from functools import update_wrapper | ||
from datetime import datetime | ||
|
||
def handle_url_except(f): | ||
def wrapper_func(self, *args, **kwargs): | ||
try: | ||
return f(self, *args, **kwargs) | ||
except urllib2.URLError as urlError: | ||
print("Error in function {}: {}".format(f.__name__, str(urlError.reason))) | ||
return False | ||
except urllib2.HTTPError as httpError: | ||
print("Error in function {}: {}".format(f.__name__, str(httpError.reason))) | ||
return False | ||
except Exception as e: | ||
print("Error in function {}: {}".format(f.__name__, e.reason)) | ||
return False | ||
return update_wrapper(wrapper_func, f) | ||
|
||
|
||
class LocastService: | ||
|
||
current_token = None | ||
current_location = None | ||
current_dma = None | ||
current_city = None | ||
active_dma = None | ||
base_data_folder = None | ||
|
||
|
||
|
||
|
||
|
||
def __init__(self, base_folder, mock_location): | ||
def __init__(self, base_folder, mock_location, zipcode): | ||
self.base_data_folder = base_folder | ||
|
||
if not mock_location == None: | ||
self.current_location = mock_location | ||
|
||
self.mock_location = mock_location | ||
self.zipcode = zipcode | ||
|
||
|
||
@handle_url_except | ||
def login(self, username, password): | ||
|
||
# check environment vars | ||
|
@@ -40,135 +55,150 @@ def login(self, username, password): | |
# POST | ||
# {"username":"[email protected]","password":"xxxxxxxx"} | ||
|
||
try: | ||
loginReq = urllib2.Request('https://api.locastnet.org/api/user/login', | ||
'{"username":"' + username + '","password":"' + password + '"}', | ||
{'Content-Type': 'application/json'}) | ||
|
||
loginReq = urllib2.Request('https://api.locastnet.org/api/user/login', | ||
'{"username":"' + username + '","password":"' + password + '"}', | ||
{'Content-Type': 'application/json'}) | ||
|
||
loginOpn = urllib2.urlopen(loginReq) | ||
loginRes = json.load(loginOpn) | ||
loginOpn.close() | ||
except urllib2.URLError as urlError: | ||
print("Error during login: " + str(urlError.reason)) | ||
return False | ||
except urllib2.HTTPError as httpError: | ||
print("Error during login: " + str(httpError.reason)) | ||
return False | ||
except: | ||
loginErr = sys.exc_info()[0] | ||
print("Error during login: " + loginErr.message) | ||
return False | ||
loginOpn = urllib2.urlopen(loginReq) | ||
loginRes = json.load(loginOpn) | ||
loginOpn.close() | ||
|
||
self.current_token = loginRes["token"] | ||
return True | ||
|
||
|
||
|
||
|
||
|
||
@handle_url_except | ||
def validate_user(self): | ||
print("Validating User Info...") | ||
|
||
try: | ||
# get user info and make sure we donated | ||
userReq = urllib2.Request('https://api.locastnet.org/api/user/me', | ||
headers={'Content-Type': 'application/json', 'authorization': 'Bearer ' + self.current_token}) | ||
|
||
userOpn = urllib2.urlopen(userReq) | ||
userRes = json.load(userOpn) | ||
userOpn.close() | ||
except urllib2.URLError as urlError: | ||
print("Error during user info request: " + str(urlError.reason)) | ||
return False | ||
except urllib2.HTTPError as httpError: | ||
print("Error during user info request: " + str(httpError.reason)) | ||
return False | ||
except: | ||
userInfoErr = sys.exc_info()[0] | ||
print("Error during user info request: " + userInfoErr.message) | ||
return False | ||
# get user info and make sure we donated | ||
userReq = urllib2.Request('https://api.locastnet.org/api/user/me', | ||
headers={'Content-Type': 'application/json', 'authorization': 'Bearer ' + self.current_token}) | ||
|
||
userOpn = urllib2.urlopen(userReq) | ||
userRes = json.load(userOpn) | ||
userOpn.close() | ||
|
||
print("User Info obtained.") | ||
print("User didDonate: " + str(userRes['didDonate'])) | ||
|
||
# Check if donated | ||
if not userRes['didDonate']: | ||
print("User didDonate: {}".format(userRes['didDonate'])) | ||
# Check if the user has donated, and we got an actual expiration date. | ||
if userRes['didDonate'] and userRes['donationExpire']: | ||
# Check if donation has expired. | ||
donateExp = datetime.fromtimestamp(userRes['donationExpire'] / 1000) | ||
print("User donationExpire: {}".format(donateExp)) | ||
if datetime.now() > donateExp: | ||
print("Error! User's donation ad-free period has expired.") | ||
return False | ||
else: | ||
print("Error! User must donate for this to work.") | ||
return False | ||
|
||
print("User donationExpire: " + str(userRes['donationExpire'] / 1000)) | ||
|
||
# Check if donation has expired | ||
if ((userRes['donationExpire'] / 1000) < int(time.time())): | ||
print("Error! User's donation ad-free period has expired.") | ||
return False | ||
|
||
|
||
# Check for user's location | ||
print("Getting user location...") | ||
|
||
if self.current_location is None: | ||
try: | ||
# get current location | ||
geoReq = urllib2.Request('https://get.geojs.io/v1/ip/geo.json') | ||
geoOpn = urllib2.urlopen(geoReq) | ||
geoRes = json.load(geoOpn) | ||
geoOpn.close() | ||
except urllib2.URLError as urlError: | ||
print("Error during geo IP acquisition: " + str(urlError.reason)) | ||
return False | ||
except urllib2.HTTPError as httpError: | ||
print("Error during geo IP acquisition: " + str(httpError.reason)) | ||
return False | ||
except: | ||
geoIpErr = sys.exc_info()[0] | ||
print("Error during geo IP acquisition: " + geoIpErr.message) | ||
return False | ||
print("User location obtained as " + geoRes['latitude'] + '/' + geoRes['longitude']) | ||
self.current_location = geoRes | ||
|
||
|
||
# See if we have a market available to the user | ||
if self.current_dma is None: | ||
print("Getting user's media market (DMA)...") | ||
|
||
try: | ||
# https://api.locastnet.org/api/watch/dma/40.543034399999996/-75.42280769999999 | ||
# returns dma - local market | ||
dmaReq = urllib2.Request('https://api.locastnet.org/api/watch/dma/' + | ||
self.current_location['latitude'] + '/' + | ||
self.current_location['longitude'], | ||
headers={'Content-Type': 'application/json'}) | ||
|
||
dmaOpn = urllib2.urlopen(dmaReq) | ||
dmaRes = json.load(dmaOpn) | ||
dmaOpn.close() | ||
except urllib2.URLError as urlError: | ||
print("Error when getting the users's DMA: " + str(urlError.reason)) | ||
return False | ||
except urllib2.HTTPError as httpError: | ||
print("Error when getting the users's DMA: " + str(httpError.reason)) | ||
return False | ||
except: | ||
dmaErr = sys.exc_info()[0] | ||
print("Error when getting the users's DMA: " + dmaErr.message) | ||
return False | ||
|
||
print("DMA found as " + dmaRes['DMA'] + ": " + dmaRes['name']) | ||
|
||
if (dmaRes['active'] == False): | ||
print("DMA not available in Locast yet. Exiting...") | ||
return False | ||
|
||
self.current_dma = dmaRes['DMA'] | ||
|
||
# Find the users location via lat\long or zipcode if specified,(lat\lon | ||
# taking precedence if both are provided) otherwise use IP. Attempts to | ||
# mirror the geolocation found at locast.org\dma. Also allows for a | ||
# check that Locast reports the area as active. | ||
if self.find_location(): | ||
print("Got location as {} - DMA {} - Lat\Lon {}\{}".format(self.current_city, | ||
self.current_dma, | ||
self.current_location['latitude'], | ||
self.current_location['longitude']) | ||
) | ||
else: | ||
return False | ||
# Check that Locast reports this market is currently active and available. | ||
if not self.active_dma: | ||
print("Locast reports that this DMA\Market area is not currently active!") | ||
return False | ||
|
||
return True | ||
|
||
def find_location(self): | ||
''' | ||
Mirror the geolocation options found at locast.org/dma since we can't | ||
rely on browser geolocation. If the user provides override coords, or | ||
override_zipcode, resolve location based on that data. Otherwise check | ||
by external ip, (using ipinfo.io, as the site does). | ||
Calls to Locast return JSON in the following format: | ||
{ | ||
u'DMA': str (DMA Number), | ||
u'large_url': str, | ||
u'name': str, | ||
u'longitude': lon, | ||
u'latitude': lat, | ||
u'active': bool, | ||
u'announcements': list, | ||
u'small_url': str | ||
} | ||
''' | ||
zip_format = re.compile(r'^[0-9]{5}$') | ||
# Check if the user provided override coords. | ||
if self.mock_location: | ||
return self.get_coord_location() | ||
# Check if the user provided an override zipcode, and that it's valid. | ||
elif self.zipcode and zip_format.match(self.zipcode): | ||
return self.get_zip_location() | ||
else: | ||
# If no override zip, or not a valid ZIP, fallback to IP location. | ||
return self.get_ip_location() | ||
|
||
@handle_url_except | ||
def get_zip_location(self): | ||
print("Getting location via provided zipcode {}".format(self.zipcode)) | ||
# Get geolocation via Locast, based on user provided zipcode. | ||
req = urllib2.Request('https://api.locastnet.org/api/watch/dma/zip/{}'.format(self.zipcode)) | ||
resp = urllib2.urlopen(req) | ||
geoRes = json.load(resp) | ||
resp.close() | ||
self.current_location = {'latitude': str(geoRes['latitude']), 'longitude': str(geoRes['longitude'])} | ||
self.current_dma = str(geoRes['DMA']) | ||
self.active_dma = geoRes['active'] | ||
self.current_city = str(geoRes['name']) | ||
return True | ||
|
||
|
||
|
||
@handle_url_except | ||
def get_ip_location(self): | ||
print("Getting location via IP Address.") | ||
# Get geolocation via Locast. Mirror their website and use https://ipinfo.io/ip to get external IP. | ||
ip_resp = urllib2.urlopen('https://ipinfo.io/ip') | ||
ip = ip_resp.read().strip() | ||
ip_resp.close() | ||
|
||
print("Got external IP {}.".format(ip)) | ||
|
||
# Query Locast by IP, using a 'client_ip' header. | ||
req = urllib2.Request('https://api.locastnet.org/api/watch/dma/ip') | ||
req.add_header('client_ip', ip) | ||
resp = urllib2.urlopen(req) | ||
geoRes = json.load(resp) | ||
resp.close() | ||
self.current_location = {'latitude': str(geoRes['latitude']), 'longitude': str(geoRes['longitude'])} | ||
self.current_dma = str(geoRes['DMA']) | ||
self.active_dma = geoRes['active'] | ||
self.current_city = str(geoRes['name']) | ||
return True | ||
|
||
@handle_url_except | ||
def get_coord_location(self): | ||
print("Getting location via provided lat\lon coordinates.") | ||
# Get geolocation via Locast, using lat\lon coordinates. | ||
lat = self.mock_location['latitude'] | ||
lon = self.mock_location['longitude'] | ||
req = urllib2.Request('https://api.locastnet.org/api/watch/dma/{}/{}'.format(lat, lon)) | ||
req.add_header('Content-Type', 'application/json') | ||
resp = urllib2.urlopen(req) | ||
geoRes = json.load(resp) | ||
resp.close() | ||
self.current_location = {'latitude': str(geoRes['latitude']), 'longitude': str(geoRes['longitude'])} | ||
self.current_dma = str(geoRes['DMA']) | ||
self.active_dma = geoRes['active'] | ||
self.current_city = str(geoRes['name']) | ||
return True | ||
|
||
|
||
def get_stations(self): | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters