Skip to content

Commit

Permalink
icm install: refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Obijuan committed Jun 17, 2024
1 parent e10b044 commit 7b6d03a
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 73 deletions.
4 changes: 2 additions & 2 deletions icm-run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
try:
icm(None)

#-- Apio commands finish with this excepcion
#-- icm commands finish with this excepcion
except SystemExit:
print("")
print("icm command executed!")
print("icm-run done")

#-- Exit!
sys.exit()
190 changes: 119 additions & 71 deletions icm/commands/cmd_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

import zipfile
import os
from typing import NamedTuple
import sys
from pathlib import Path

import requests
import click
from tqdm import tqdm
from icm.commons import commons


class Collection(NamedTuple):
class Collection:
"""Manage collections"""

# Oficial icestudio Collection are zip files downloaded
Expand All @@ -29,6 +30,9 @@ class Collection(NamedTuple):
GITHUB_TYPE_DEV = "heads/"
GITHUB_TYPE_VER = "tags/"

def __init__(self, folders: commons.Folders) -> None:
self.folders = folders

def filename(self, version="") -> str:
"""Return the coleccion filename, according to its version
No Path. No url. Just the .zip filename
Expand All @@ -48,11 +52,11 @@ def filename(self, version="") -> str:
# -- Return the name according to the version type
return name_ver if version else name_dev

def abs_filename(self, folders: commons.Folders, version="") -> Path:
def abs_filename(self, version="") -> Path:
"""Return the absolute path of the collection target filename
ex: /home/obijuan/.icestudio/collections/v0.1.4.zip
"""
abs_file = folders.collections / self.filename(version)
abs_file = self.folders.collections / self.filename(version)
return abs_file

def url(self, name: str, version="") -> str:
Expand All @@ -78,90 +82,134 @@ def url(self, name: str, version="") -> str:
# -- Return the url
return url

def nametag(self, name: str, version=""):
"""Return the collection name+tag:
* version Given:
name+tag = 'name-version' (Ex. iceK-0.1.4)
* version NOT Given:
name+tag = 'name-main' (Ex. iceK-main)
"""
return f"{name}-{version}" if version else f"{name}-main"

# -- Information about collections
icek = {"name": "iceK", "version": "0.1.4"}

# -- Collection filefo
COLLECTION_FILE = "v"


def main():
"""ENTRY POINT: Install collections"""

# -- Get context information
folders = commons.Folders()
colection = Collection()

print()
print("Install!!!\n")

abs_filename = colection.abs_filename(folders, icek["version"])
print(f"* File: {abs_filename}")

url = colection.url(icek["name"], icek["version"])
print(f"* Url: {url}")
def download(self, url: str, destfile: Path):
"""Download the collection given by its url.
Store it in the given file
A download bar is shown
"""

# -- Download the collection
# Realizar la solicitud HTTP para obtener el contenido del archivo
response = requests.get(url, stream=True, timeout=10)
# -- Generate an http request
response = requests.get(url, stream=True, timeout=10)

# Verificar que la solicitud se completó correctamente
if response.status_code == 200:
# -- Check the status. If not ok, exit!
if response.status_code != 200:
click.secho(
f"ERROR when downloading. Code: {response.status_code}",
fg="red",
)
sys.exit(1)

# Obtener el tamaño total del archivo desde los headers
# Get the file size (from the headers)
total_size = int(response.headers.get("content-length", 0))

# Abrir un archivo local con el nombre especificado en
# modo escritura binaria
with open(abs_filename, "wb") as file:
# -- Open the destination file
with open(destfile, "wb") as file:

# Crear una barra de progreso con tqdm
# Use a progress bar...
with tqdm(
total=total_size,
unit="B",
unit_scale=True,
desc=" Downloading",
desc="• Download",
) as pbar:
# Iterar sobre el contenido en bloques

# Iterate over the blocks
for chunk in response.iter_content(chunk_size=1024):
# Filtrar bloques vacíos

# -- Make sure the block is not empty
if chunk:
# Escribir el contenido del bloque en el archivo local

# Write data!
file.write(chunk)
# Actualizar la barra de progreso

# Update pogress bar
pbar.update(len(chunk))

# shutil.copyfileobj(response.raw, file)
else:
print(
f"Error al descargar el archivo. "
f"Código de estado: {response.status_code}"
)
# -- Uncompress the collection zip file
def uncompress(self, zip_file: Path):
"""Uncompress the given zip file. The destination folder is
the icestudio collection folder
"""

# -- Open the zip file and extract it with progress bar
with zipfile.ZipFile(zip_file, "r") as zip_ref:

# Get the file list in the zip file
file_list = zip_ref.namelist()

# Use a progress bar
with tqdm(
total=len(file_list), desc="• Unzip", unit="file"
) as pbar:

# -- Iterate over each file in the zip
for file in file_list:

# Extract the file!
zip_ref.extract(file, self.folders.collections)

# -- Update progress bar!
pbar.update(1)


def main():
"""ENTRY POINT: Install collections"""

# -- Get context information
folders = commons.Folders()
collection = Collection(folders)

# -- Install the collection iceK-0.1.4
install(collection, "iceK", "0.1.4")
install(collection, "iceK")
install(collection, "iceK", "0.1.3")
install(collection, "iceWires")
install(collection, "iceIO")
install(collection, "iceGates", "0.3.1")
install(collection, "iceMux")
install(collection, "iceCoders")
install(collection, "iceFF")
install(collection, "iceRegs")
install(collection, "iceSRegs")


def install(collection: Collection, name: str, version="") -> None:
"""Install the given collection by name + version
* collection: Collection class (Context)
* Name: Collection name (ex. 'iceK')
* Version: (Optional) (ex. '0.1.4')
The collection is downloaded and unziped in the icestudio
collection folder
"""

# -- Get the name+tag
nametag = collection.nametag(name, version)
print()
click.secho(f"Installing collection {nametag}", fg="yellow")

abs_filename = collection.abs_filename(version)
url = collection.url(name, version)

# -- DEBUG!
# print(f"* File: {abs_filename}")
# print(f"* Url: {url}")

# -- Download the collection
collection.download(url, abs_filename)

# -- Uncompress the collection
collection.uncompress(abs_filename)

# -- Remove the .zip file
os.remove(abs_filename)

# Nombre del archivo ZIP
zip_filename = abs_filename

# Directorio de destino para descomprimir los archivos
extract_to = folders.collections

# Abrir el archivo ZIP y extraer su contenido con barra de progreso
with zipfile.ZipFile(zip_filename, "r") as zip_ref:
# Obtener la lista de archivos en el archivo ZIP
file_list = zip_ref.namelist()

# Crear una barra de progreso
with tqdm(
total=len(file_list), desc=" Uncompressing", unit="file"
) as pbar:
# Iterar sobre cada archivo en el archivo ZIP
for file in file_list:
# Extraer cada archivo
zip_ref.extract(file, extract_to)
# Actualizar la barra de progreso
pbar.update(1)

# -- Borrar el archivo zip
os.remove(zip_filename)
click.secho("Done!", fg="green")

0 comments on commit 7b6d03a

Please sign in to comment.