From 3d7cd99951c4229780b0d436e97844a4394d1dcd Mon Sep 17 00:00:00 2001
From: Abdelaziz Elrashed
Date: Fri, 19 Jul 2024 15:45:36 +0300
Subject: [PATCH] - Add functions that store a copy of the database file for
 every recall operation. - Fix a recall bug caused by conversion between the
 float and Decimal types.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pyproject.toml         |   2 +-
 setup.py               |   2 +-
 zakat/zakat_tracker.py | 164 ++++++++++++++++++++++++++++++++++++++---
 3 files changed, 156 insertions(+), 12 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 3f754e3..74d4a8b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "zakat"
-version = "0.2.77"
+version = "0.2.78"
 authors = [
   { name="Abdelaziz Elrashed Elshaikh Mohamed", email="aeemh.sdn@gmail.com" },
 ]
diff --git a/setup.py b/setup.py
index 257a2ff..5dced2c 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 setup(
     name='zakat',
     packages=find_packages(include=['zakat']),
-    version='0.2.77',
+    version='0.2.78',
     description='A Python Library for Islamic Financial Management.',
     author='Abdelaziz Elrashed Elshaikh Mohamed',
     install_requires=[],
diff --git a/zakat/zakat_tracker.py b/zakat/zakat_tracker.py
index 1d6ccab..7663242 100644
--- a/zakat/zakat_tracker.py
+++ b/zakat/zakat_tracker.py
@@ -60,6 +60,7 @@
 import pickle
 import random
 import datetime
+import hashlib
 from time import sleep
 from pprint import PrettyPrinter as pp
 from math import floor
@@ -67,6 +68,8 @@
 from sys import version_info
 from decimal import Decimal
 from typing import List, Dict, Any
+from pathlib import Path
+from time import time_ns
 
 
 class Action(Enum):
@@ -180,7 +183,7 @@ def Version():
         Returns:
         str: The current version of the software.
         """
-        return '0.2.77'
+        return '0.2.78'
 
     @staticmethod
     def ZakatCut(x: float) -> float:
@@ -255,18 +258,54 @@ def __init__(self, db_path: str = "zakat.pickle", history_mode: bool = True):
 
     def path(self, path: str = None) -> str:
         """
-        Set or get the database path.
+        Set or get the path to the database file.
+
+        If no path is provided, the current path is returned.
+        If a path is provided, it becomes the new path; its parent directory is
+        created if it does not already exist and is stored as the base path.
 
         Parameters:
-        path (str): The path to the database file. If not provided, it returns the current path.
+        path (str): The new path to the database file. If not provided, the current path is returned.
 
         Returns:
-        str: The current database path.
+        str: The current or new path to the database file.
""" - if path is not None: - self._vault_path = path + if path is None: + return self._vault_path + self._vault_path = Path(path).resolve() + base_path = Path(path).resolve() + if base_path.is_file() or base_path.suffix: + base_path = base_path.parent + base_path.mkdir(parents=True, exist_ok=True) + self._base_path = base_path return self._vault_path + def base_path(self, *args) -> str: + """ + Generate a base path by joining the provided arguments with the existing base path. + + Parameters: + *args (str): Variable length argument list of strings to be joined with the base path. + + Returns: + str: The generated base path. If no arguments are provided, the existing base path is returned. + """ + if not args: + return self._base_path + filtered_args = [] + ignored_filename = None + for arg in args: + if Path(arg).suffix: + ignored_filename = arg + else: + filtered_args.append(arg) + base_path = Path(self._base_path) + full_path = base_path.joinpath(*filtered_args) + full_path.mkdir(parents=True, exist_ok=True) + if ignored_filename is not None: + return full_path.resolve() / ignored_filename # Join with the ignored filename + return full_path.resolve() + @staticmethod def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: """ @@ -581,6 +620,102 @@ def log_size(self, account) -> int: return len(self._vault['account'][account]['log']) return -1 + @staticmethod + def file_hash(file_path: str, algorithm: str = "blake2b") -> str: + """ + Calculates the hash of a file using the specified algorithm. + + Parameters: + file_path (str): The path to the file. + algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b". + + Returns: + str: The hexadecimal representation of the file's hash. + """ + hash_obj = hashlib.new(algorithm) # Create the hash object + with open(file_path, "rb") as f: # Open file in binary mode for reading + for chunk in iter(lambda: f.read(4096), b""): # Read file in chunks + hash_obj.update(chunk) + return hash_obj.hexdigest() # Return the hash as a hexadecimal string + + def snapshot_cache_path(self): + """ + Generate the path for the cache file used to store snapshots. + + The cache file is a pickle file that stores the timestamps of the snapshots. + The file name is derived from the main database file name by replacing the ".pickle" extension with ".snapshots.pickle". + + Returns: + str: The path to the cache file. + """ + path = str(self.path()) + if path.endswith(".pickle"): + path = path[:-7] + return path + '.snapshots.pickle' + + def snapshot(self) -> bool: + """ + This function creates a snapshot of the current database state. + + The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. + If a snapshot with the same hash exists, the function returns True without creating a new snapshot. + If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state + in a new pickle file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp. + + Parameters: + None + + Returns: + bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails. 
+ """ + current_hash = self.file_hash(self.path()) + cache: dict[str, int] = {} # hash: time_ns + try: + with open(self.snapshot_cache_path(), "rb") as f: + cache = pickle.load(f) + except: + pass + if current_hash in cache: + return True + time = time_ns() + cache[current_hash] = time + if not self.save(self.base_path('snapshots', f'{time}.pickle')): + return False + with open(self.snapshot_cache_path(), "wb") as f: + pickle.dump(cache, f) + return True + + def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) -> dict[int, tuple[str, str, bool]]: + """ + Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status. + + Parameters: + - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True. + - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False. + + Returns: + - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, + and the values are tuples containing the snapshot's hash, path, and existence status. + """ + cache: dict[str, int] = {} # hash: time_ns + try: + with open(self.snapshot_cache_path(), "rb") as f: + cache = pickle.load(f) + except: + pass + if not cache: + return {} + result: dict[int, tuple[str, str, bool]] = {} # time_ns: (hash, path, exists) + for file_hash, ref in cache.items(): + path = self.base_path('snapshots', f'{ref}.pickle') + exists = os.path.exists(path) + valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True + if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists): + continue + if exists or not hide_missing: + result[ref] = (file_hash, path, exists) + return result + def recall(self, dry=True, debug=False) -> bool: """ Revert the last operation. @@ -602,7 +737,7 @@ def recall(self, dry=True, debug=False) -> bool: memory = self._vault['history'][ref] if debug: print(type(memory), 'memory', memory) - + self.snapshot() limit = len(memory) + 1 sub_positive_log_negative = 0 for i in range(-1, -limit, -1): @@ -645,8 +780,17 @@ def recall(self, dry=True, debug=False) -> bool: assert self.box_exists(x['account'], box_ref) box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] assert box_value < 0 - self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value - self._vault['account'][x['account']]['balance'] += -box_value + + try: + self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value + except TypeError: + self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal(-box_value) + + try: + self._vault['account'][x['account']]['balance'] += -box_value + except TypeError: + self._vault['account'][x['account']]['balance'] += Decimal(-box_value) + self._vault['account'][x['account']]['count'] -= 1 del self._vault['account'][x['account']]['log'][x['ref']] @@ -1612,7 +1756,7 @@ def import_csv_cache_path(self): >>> obj.import_csv_cache_path() '/data/reports.import_csv.pickle' """ - path = self.path() + path = str(self.path()) if path.endswith(".pickle"): path = path[:-7] return path + '.import_csv.pickle'