Skip to content

Commit

Permalink
Use tempfile312 and temp_to_disk in extfs components
Browse files Browse the repository at this point in the history
  • Loading branch information
alchzh committed Jan 3, 2025
1 parent 0e99d4c commit 8fbe003
Showing 1 changed file with 37 additions and 46 deletions.
83 changes: 37 additions & 46 deletions ofrak_core/ofrak/core/extfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import math
import os
import posixpath
import tempfile
import tempfile312 as tempfile
from dataclasses import dataclass
from subprocess import CalledProcessError
from typing import (
Expand All @@ -21,7 +21,7 @@
from uuid import UUID
from collections import deque

from ofrak import Resource, ResourceAttributes
from ofrak import Resource, ResourceAttributes, Unpacker
from ofrak.component.analyzer import Analyzer
from ofrak.component.packer import Packer
from ofrak.core.binary import GenericBinary
Expand Down Expand Up @@ -153,50 +153,43 @@ class ExtAnalyzer(Analyzer[None, ExtFilesystemAttributes]):
outputs = (ExtFilesystemAttributes,)
external_dependencies = (_TUNE2FS,)

@staticmethod
async def _analyze_path(path: str) -> ExtFilesystemAttributes:
command = ["tune2fs", "-l", path]

proc = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode:
raise CalledProcessError(returncode=proc.returncode, cmd=command, stderr=stderr)

fields: Dict[str, str] = {}
for line in stdout.splitlines():
field, _, value = line.decode().partition(":")
if not value:
continue
fields[field] = value.strip()

label: Optional[str] = fields["Filesystem volume name"]
if label == "<none>":
label = None

return ExtFilesystemAttributes(
label=label,
uuid=UUID(fields["Filesystem UUID"]),
features=fields["Filesystem features"].split(),
inode_count=int(fields["Inode count"]),
inode_size=int(fields["Inode size"]),
block_count=int(fields["Block count"]),
block_size=int(fields["Block size"]),
fragment_size=int(fields["Fragment size"]),
reserved_block_count=int(fields["Reserved block count"]),
)

async def analyze(
self, resource: Resource, config: ComponentConfig = None
) -> ExtFilesystemAttributes:
with tempfile.NamedTemporaryFile(suffix=".extfs") as temp_fs_file:
temp_fs_file.write(await resource.get_data())
temp_fs_file.flush()
async with resource.temp_to_disk(suffix=".extfs") as temp_fs_file:
command = ["tune2fs", "-l", temp_fs_file]

return await self._analyze_path(temp_fs_file.name)
proc = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode:
raise CalledProcessError(returncode=proc.returncode, cmd=command, stderr=stderr)

fields: Dict[str, str] = {}
for line in stdout.splitlines():
field, _, value = line.decode().partition(":")
if not value:
continue
fields[field] = value.strip()

label: Optional[str] = fields["Filesystem volume name"]
if label == "<none>":
label = None

return ExtFilesystemAttributes(
label=label,
uuid=UUID(fields["Filesystem UUID"]),
features=fields["Filesystem features"].split(),
inode_count=int(fields["Inode count"]),
inode_size=int(fields["Inode size"]),
block_count=int(fields["Block count"]),
block_size=int(fields["Block size"]),
fragment_size=int(fields["Fragment size"]),
reserved_block_count=int(fields["Reserved block count"]),
)


@dataclass
Expand All @@ -207,10 +200,8 @@ def writecmd(self, data: str):
print(data, file=self.buffer)

async def write_file(self, path: str, entry: FilesystemEntry, data_dir: str):
with tempfile.NamedTemporaryFile("wb", dir=data_dir, delete=False) as f:
f.write(await entry.resource.get_data())

self.writecmd(f'write "{f.name}" "{path}"')
async with entry.resource.temp_to_disk(dir=data_dir, delete=False) as temp_path:
self.writecmd(f'write "{temp_path}" "{path}"')

def mkdir(self, path: str):
self.writecmd(f'mkdir "{path}"')
Expand Down

0 comments on commit 8fbe003

Please sign in to comment.