Skip to content

Commit

Permalink
Feature abrp (flobz#79)
Browse files Browse the repository at this point in the history
* add ABRP connection

* update doc

* fix get_last_temp()

* add image

* update abrp

* fix abrpname

* fix no last postition

* add type point

* find model by vin

* add Spacetourer

* fix indent
  • Loading branch information
flobz authored Apr 13, 2021
1 parent e232776 commit 25b86c0
Show file tree
Hide file tree
Showing 14 changed files with 342 additions and 108 deletions.
88 changes: 36 additions & 52 deletions Car.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,66 +2,33 @@
from copy import copy

from MyLogger import logger
from libs.car_model import CarModel
from libs.car_status import CarStatus

ENERGY_CAPACITY = {'SUV 3008': {'BATTERY_POWER': 10.8, 'FUEL_CAPACITY': 43},
'C5 Aircross': {'BATTERY_POWER': 10.8, 'FUEL_CAPACITY': 43},
'e-208': {'BATTERY_POWER': 46, 'FUEL_CAPACITY': 0, "ABRP_NAME": "peugeot:e208:20:50"},
'e-2008': {'BATTERY_POWER': 46, 'FUEL_CAPACITY': 0, "ABRP_NAME": "peugeot:e2008:20:48"},
'corsa-e': {'BATTERY_POWER': 46, 'FUEL_CAPACITY': 0, "ABRP_NAME": "opel:corsae:20:50"}
}
DEFAULT_BATTERY_POWER = 46
DEFAULT_FUEL_CAPACITY = 0
DEFAULT_MAX_ELEC_CONSUMPTION = 70
DEFAULT_MAX_FUEL_CONSUMPTION = 30
DEFAULT_ABRP_NAME = "peugeot:e208:20:50"
CARS_FILE = "cars.json"


class Car:
def __init__(self, vin, vehicle_id, brand, label="unknown", battery_power=None, fuel_capacity=None,
max_elec_consumption=None, max_fuel_consumption=None):
def __init__(self, vin, vehicle_id, brand, label=None, battery_power=None, fuel_capacity=None,
max_elec_consumption=None, max_fuel_consumption=None, abrp_name=None):
self.vin = vin
if label is not None:
model = CarModel.find_model_by_name(label)
else:
model = CarModel.find_model_by_vin(self.vin)
label = model.name
self.vehicle_id = vehicle_id
self.label = label
self.brand = brand
self.battery_power = None
self.fuel_capacity = None
self.max_elec_consumption = 0 # kwh/100Km
self.max_fuel_consumption = 0 # L/100Km
self.set_energy_capacity(battery_power, fuel_capacity, max_elec_consumption, max_fuel_consumption)
self.status = None

def set_energy_capacity(self, battery_power=None, fuel_capacity=None, max_elec_consumption=None,
max_fuel_consumption=None):
if battery_power is not None and fuel_capacity is not None:
self.battery_power = battery_power
self.fuel_capacity = fuel_capacity
elif self.__get_model_name() is not None:
model_name = self.__get_model_name()
self.battery_power = ENERGY_CAPACITY[model_name]["BATTERY_POWER"]
self.fuel_capacity = ENERGY_CAPACITY[model_name]["FUEL_CAPACITY"]
else:
logger.warning("Can't get car model please check %s", CARS_FILE)
self.battery_power = DEFAULT_BATTERY_POWER
self.fuel_capacity = DEFAULT_FUEL_CAPACITY
if self.is_electric():
self.max_fuel_consumption = 0
else:
self.max_fuel_consumption = max_fuel_consumption or DEFAULT_MAX_FUEL_CONSUMPTION
if self.is_thermal():
self.max_elec_consumption = 0
else:
self.max_elec_consumption = max_elec_consumption or DEFAULT_MAX_ELEC_CONSUMPTION
self._status = None
self.abrp_name = abrp_name or model.abrp_name
self.battery_power = battery_power or model.battery_power
self.fuel_capacity = fuel_capacity or model.fuel_capacity
self.max_elec_consumption = max_elec_consumption or model.max_elec_consumption # kwh/100Km
self.max_fuel_consumption = max_fuel_consumption or model.max_fuel_consumption # L/100Km

def __get_model_name(self):
if self.label in ENERGY_CAPACITY:
return self.label
if self.__is_opel_corsa():
return "corsa-e"
return None

def __is_opel_corsa(self):
return self.brand == "C" and self.label is None
def set_model_name(self, name):
self.label = name

def is_electric(self) -> bool:
return self.fuel_capacity == 0 and self.battery_power > 0
Expand All @@ -84,12 +51,27 @@ def from_json(cls, data: dict):

def to_dict(self):
car_dict = copy(self.__dict__)
car_dict.pop("status")
car_dict.pop("_status")
return car_dict

def __str__(self):
return str(self.to_dict())

def get_abrp_name(self):
if self.abrp_name is not None:
return self.abrp_name
raise ValueError("ABRP model is not set")

@property
def status(self):
return self._status

@status.setter
def status(self, value: CarStatus):
self._status = value
if self._status is not None and self.status.__class__ != CarStatus:
self._status.__class__ = CarStatus
self._status.correct()

class Cars(list):
def __init__(self, *args):
Expand Down Expand Up @@ -129,7 +111,9 @@ def load_cars(name=CARS_FILE):
try:
with open(name, "r") as f:
json_str = f.read()
return Cars.from_json(json.loads(json_str))
cars=Cars.from_json(json.loads(json_str))
cars.save_cars()
return cars
except (FileNotFoundError, TypeError) as e:
logger.debug(e)
return Cars()
97 changes: 50 additions & 47 deletions MyPSACC.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import threading
import traceback
import uuid
from copy import copy
from datetime import datetime
from http import HTTPStatus
from json import JSONEncoder
Expand All @@ -24,7 +23,8 @@
from MyLogger import logger

from utils import get_temp, rate_limit
from web.db import get_db, clean_position
from web.abrp import Abrp
from web.db import get_db, clean_position, get_last_temp
from geojson import Feature, Point, FeatureCollection
from geojson import dumps as geo_dumps

Expand All @@ -48,17 +48,9 @@
MQTT_EVENT_TOPIC = "psa/RemoteServices/events/MPHRTServices/"
MQTT_TOKEN_TTL = 890
CARS_FILE = "cars.json"
DEFAULT_CONFIG_FILENAME = "config.json"


# add method to class Energy
def get_energy(self, energy_type):
for energy in self._energy:
if energy.type == energy_type:
return energy
return psac.models.energy.Energy(charging=psac.models.energy_charging.EnergyCharging())


psac.models.status.Status.get_energy = get_energy


class OpenIdCredentialManager(CredentialManager):
Expand Down Expand Up @@ -128,13 +120,11 @@ def gen_correlation_id(date):


class MyPSACC:
vehicles_url = "https://idpcvs.peugeot.com/api/connectedcar/v2/oauth/authorize"

def connect(self, user, password):
self.manager.init_with_user_credentials(user, password, self.realm)

def __init__(self, refresh_token, client_id, client_secret, remote_refresh_token, customer_id, realm, country_code,
proxies=None, weather_api=None):
proxies=None, weather_api=None, abrp=None):
self.realm = realm
self.service_information = ServiceInformation(authorize_service,
realm_info[self.realm]['oauth_url'],
Expand All @@ -149,7 +139,6 @@ def __init__(self, refresh_token, client_id, client_secret, remote_refresh_token
self.remote_refresh_token = remote_refresh_token
self.remote_access_token = None
self.vehicles_list = Cars.load_cars(CARS_FILE)
self.set_proxies(proxies)
self.customer_id = customer_id
self._config_hash = None
self.api_config.verify_ssl = False
Expand All @@ -169,12 +158,19 @@ def __init__(self, refresh_token, client_id, client_secret, remote_refresh_token
self.precond_programs = {}
self.info_callback = []
self.info_refresh_rate = 120
if abrp is None:
self.abrp = Abrp()
else:
self.abrp: Abrp = Abrp(**abrp)
self.set_proxies(proxies)
self.config_file = DEFAULT_CONFIG_FILENAME

def get_app_name(self):
return realm_info[self.realm]['app_name']

def refresh_token(self):
self.manager._refresh_token()
self.save_config()

def api(self) -> psac.VehiclesApi:
self.api_config.access_token = self.manager._access_token
Expand All @@ -188,6 +184,7 @@ def set_proxies(self, proxies):
else:
self._proxies = proxies
self.api_config.proxy = proxies['http']
self.abrp.proxies = proxies
self.manager.proxies = self._proxies
Otp.set_proxies(proxies)

Expand All @@ -198,9 +195,10 @@ def get_vehicle_info(self, vin):
try:
res = self.api().get_vehicle_status(car.vehicle_id, extension=["odometer"])
if res is not None:
car.status = res
if self._record_enabled:
self.record_info(vin, res)
break
self.record_info(car)
return res
except ApiException:
logger.error(traceback.format_exc())
car.status = res
Expand All @@ -210,7 +208,7 @@ def refresh_vehicle_info(self):
if self.info_refresh_rate is not None:
while True:
sleep(self.info_refresh_rate)
logger.info("refresh_vehicle_info")
logger.debug("refresh_vehicle_info")
for car in self.vehicles_list:
self.get_vehicle_info(car.vin)
for callback in self.info_callback:
Expand Down Expand Up @@ -276,7 +274,7 @@ def refresh_remote_token(self, force=False):
last_update: datetime = self.remote_token_last_update
if (datetime.now() - last_update).total_seconds() < MQTT_TOKEN_TTL:
return None
self.manager._refresh_token()
self.refresh_token()
if self.remote_refresh_token is None:
logger.error("remote_refresh_token isn't defined")
self.load_otp(force_new=True)
Expand All @@ -295,6 +293,7 @@ def refresh_remote_token(self, force=False):
otp_code = self.get_otp_code()
res = self.get_remote_access_token(otp_code)
self.mqtt_client.username_pw_set("IMA_OAUTH_ACCESS_TOKEN", self.remote_access_token)
self.save_config()
return res

def on_mqtt_connect(self, client, userdata, rc, a):
Expand Down Expand Up @@ -471,7 +470,9 @@ def preconditioning(self, vin, activate: bool):
self.mqtt_client.publish(MQTT_REQ_TOPIC + self.customer_id + "/ThermalPrecond", msg)
return True

def save_config(self, name="config.json", force=False):
def save_config(self, name=None, force=False):
if name is None:
name = self.config_file
config_str = json.dumps(self, cls=MyPeugeotEncoder, sort_keys=True, indent=4).encode("utf8")
new_hash = md5(config_str).hexdigest()
if force or self._config_hash != new_hash:
Expand All @@ -487,37 +488,35 @@ def load_config(name="config.json"):
config = dict(**json.loads(config_str))
if "country_code" not in config:
config["country_code"] = input("What is your country code ? (ex: FR, GB, DE, ES...)\n")
return MyPSACC(**config)
if "abrp" not in config:
config["abrp"] = None
psacc = MyPSACC(**config)
psacc.config_file = name
return psacc

def set_record(self, value: bool):
self._record_enabled = value

def record_info(self, vin, status: psac.models.status.Status):
mileage = status.timed_odometer.mileage
level = status.get_energy('Electric').level
level_fuel = status.get_energy('Fuel').level
charge_date = status.get_energy('Electric').updated_at
try:
moving = status.kinetic.moving
logger.debug("")
except AttributeError:
logger.error("kinetic not available from api")
moving = None
try:
longitude = status.last_position.geometry.coordinates[0]
latitude = status.last_position.geometry.coordinates[1]
date = status.last_position.properties.updated_at
except AttributeError:
logger.error("last_position not available from api")
longitude = latitude = None
def record_info(self, car: Car):
mileage = car.status.timed_odometer.mileage
level = car.status.get_energy('Electric').level
level_fuel = car.status.get_energy('Fuel').level
charge_date = car.status.get_energy('Electric').updated_at
moving = car.status.kinetic.moving

longitude = car.status.last_position.geometry.coordinates[0]
latitude = car.status.last_position.geometry.coordinates[1]
date = car.status.last_position.properties.updated_at
if date is None:
date = charge_date
logger.debug("vin:%s longitude:%s latitude:%s date:%s mileage:%s level:%s charge_date:%s level_fuel:"
"%s moving:%s", vin, longitude, latitude, date, mileage, level, charge_date, level_fuel,
"%s moving:%s", car.vin, longitude, latitude, date, mileage, level, charge_date, level_fuel,
moving)
self.record_position(vin, mileage, latitude, longitude, date, level, level_fuel, moving)
self.record_position(car.vin, mileage, latitude, longitude, date, level, level_fuel, moving)
self.abrp.call(car, get_last_temp(car.vin))
try:
charging_status = status.get_energy('Electric').charging.status
self.record_charging(vin, charging_status, charge_date, level, latitude, longitude)
charging_status = car.status.get_energy('Electric').charging.status
self.record_charging(car.vin, charging_status, charge_date, level, latitude, longitude)
logger.debug("charging_status:%s ", charging_status)
except AttributeError:
logger.error("charging status not available from api")
Expand Down Expand Up @@ -547,8 +546,9 @@ def record_position(self, vin, mileage, latitude, longitude, date, level, level_
conn.commit()
logger.info("new position recorded for %s", vin)
clean_position(conn)
else:
logger.debug("position already saved")
return True
logger.debug("position already saved")
return False

def record_charging(self, vin, charging_status, charge_date, level, latitude, longitude):
conn = get_db()
Expand Down Expand Up @@ -609,12 +609,15 @@ def get_chargings(mini=None, maxi=None) -> Tuple[dict]:
res = conn.execute("select * from battery").fetchall()
return tuple(map(dict, res))

def __iter__(self):
for key, value in self.__dict__.items():
yield key, value

class MyPeugeotEncoder(JSONEncoder):
def default(self, mp: MyPSACC):
data = copy(mp.__dict__)
data = dict(mp)
mpd = {"proxies": data["_proxies"], "refresh_token": mp.manager.refresh_token,
"client_secret": mp.service_information.client_secret}
"client_secret": mp.service_information.client_secret, "abrp":dict(mp.abrp)}
for el in ["client_id", "realm", "remote_refresh_token", "customer_id", "weather_api", "country_code"]:
mpd[el] = data[el]
return mpd
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ We will retrieve these informations:

## FAQ
If you have a problem, or a question please check if the answer isn't in the [FAQ](FAQ.md).

## Connect to A better Route Planner
You can connect the app to ABRP, see [this page](docs/abrp.md)
## API documentation
The api documentation is described here : [api_spec.md](api_spec.md).
You can use all functions from the doc, for example :
Expand Down
Loading

0 comments on commit 25b86c0

Please sign in to comment.