From c44c7b59766da0aa0983ae0f00915d65d97dd568 Mon Sep 17 00:00:00 2001 From: Luciano Amodio Date: Fri, 6 Dec 2024 22:43:34 +0100 Subject: [PATCH] feat(cli): --filename-template and --max-length Introduces two new CLI arguments to allow fine-grained control over how output file paths are generated: --filename-template: Specify a template string using variables like {domain}, {hash}, {ext} to define a custom directory structure and file naming scheme --max-length: Set a maximum character limit for generated file paths, intelligently truncating if needed while preserving essential components Includes documentation updates covering the new options, examples, and troubleshooting. Closes #754 --- README.md | 6 +- docs/quickstart.rst | 6 + docs/settings.rst | 11 ++ docs/troubleshooting.rst | 10 + docs/usage-cli.rst | 38 +++- tests/cli_tests.py | 64 ++++++ tests/deduplication_tests.py | 2 +- tests/filename_tests.py | 372 +++++++++++++++++++++++++++++++++++ trafilatura/cli.py | 7 + trafilatura/cli_utils.py | 65 +++--- trafilatura/filename.py | 358 +++++++++++++++++++++++++++++++++ 11 files changed, 911 insertions(+), 28 deletions(-) create mode 100644 tests/filename_tests.py create mode 100644 trafilatura/filename.py diff --git a/README.md b/README.md index f17387a6..463bf917 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,11 @@ the University of Munich. - JSON - HTML, XML and [XML-TEI](https://tei-c.org/) +- Flexible output file naming: + - Template-based filename generation with variables like {domain}, {path}, {hash} + - Path length control and automatic truncation + - Safe character handling and URL component parsing + - Optional add-ons: - Language detection on extracted content - Speed optimizations @@ -74,7 +79,6 @@ the University of Munich. - Regular updates, feature additions, and optimizations - Comprehensive documentation - ### Evaluation and alternatives Trafilatura consistently outperforms other open-source libraries in text diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 6241f7ad..6a00fb46 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -119,7 +119,13 @@ Extraction options are also available on the command-line and they can be combin $ < myfile.html trafilatura --json --no-tables +Use ``--filename-template`` to control how output filenames are generated based on the URL and content. +.. code-block:: bash + + $ trafilatura -u "https://example.com/path/dirs" --filename-template "{domain}/{path_dirs}/{hash}.{ext}" --markdown -o output/ + +this will produce a file named ``example.com/path/dirs/uOHdo6wKo4IK0pkL.md`` in the ``output`` directory. Further steps ------------- diff --git a/docs/settings.rst b/docs/settings.rst index c5d007aa..ca63b04c 100644 --- a/docs/settings.rst +++ b/docs/settings.rst @@ -52,6 +52,17 @@ Using a custom file on the command-line With the ``--config-file`` option, followed by the file name or path. All the required variables have to be present in the custom file. +Filename Generation +^^^^^^^^^^^^^^^^^^^^^ +Two new options allow customizing how output filenames are generated: + +--filename-template: Specify a template string for generating filenames, using variables like {domain}, {path}, {hash}, {ext}, etc. Example: --filename-template "{domain}/{hash}.{ext}" +--max-length: Set the maximum total path length, including directory components. The default is 250 characters. Example: --max-length 200 + +The filename template can include directory separators to preserve parts of the original URL's path structure. 
Unsafe characters are sanitized automatically. If the total path would exceed max-length, it is intelligently truncated while preserving key components. +Invalid variables or unsafe characters will raise an error. + + Adapting settings in Python ^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/troubleshooting.rst b/docs/troubleshooting.rst index 5c377dfc..bc131d91 100644 --- a/docs/troubleshooting.rst +++ b/docs/troubleshooting.rst @@ -101,3 +101,13 @@ Download first and extract later Since the they have distinct characteristics it can be useful to separate the infrastructure needed for download from the extraction. Using a custom IP or network infrastructure can also prevent your usual IP from getting banned. For an approach using files from the Common Crawl and Trafilatura, see the external tool `datatrove/process_common_crawl_dump.py `_. + + +Invalid template variables and filenames +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If you see an error about invalid template variables, check that your ``--filename-template`` string only uses supported values like ``{domain}``, ``{hash}``, etc. +Refer to the filename.py source for a complete list. + +An error about unsafe characters in the filename template means that characters like ``<>``, ``:``, ``"`` were used outside of ``{variable}`` sections. +Make sure to only use alphanumeric characters, underscores, dashes and forward slashes in static parts of the template. \ No newline at end of file diff --git a/docs/usage-cli.rst b/docs/usage-cli.rst index e5f75304..463df558 100644 --- a/docs/usage-cli.rst +++ b/docs/usage-cli.rst @@ -92,6 +92,33 @@ Output as TXT without metadata is the default, another format can be selected in *HTML output is available from version 1.11, Markdown from version 1.9 onwards.* +Filename Customization +~~~~~~~~~~~~~~~~~~~~~~ + +Use ``--filename-template`` to control how output filenames are generated based on the URL and content. Supported variables: + +- {domain}: Website domain +- {path}: URL path segments, joined by underscores +- {path_dirs}: URL path segments, joined by directory separators +- {params}: URL query parameters +- {hash}: Hash of extracted content +- {ext}: File extension +- {lang}: Identified language + +Example: ``--filename-template "{domain}/{hash}.{ext}"`` + +Use ``--max-length`` to set the maximum total path length, including any directories. It defaults to 250 characters. + +If the generated path would exceed this limit, it is intelligently truncated: +1. Individual directory and file components are preserved as long as possible. +2. The file component is reduced to a minimum of {hash}.{ext}. +3. The --output-dir is omitted from length calculations. + +Example: ``--max-length 200`` + +Invalid template variables or unsafe path characters will raise an error. + + Optimizing for precision and recall ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -166,7 +193,7 @@ Two major command line arguments are necessary here: .. 
hint:: Backup of HTML sources can be useful for archival and further processing: - + ``$ trafilatura --input-file links.txt --output-dir converted/ --backup-dir html-sources/ --xml`` @@ -288,6 +315,7 @@ For all usage instructions see ``trafilatura -h``: trafilatura [-h] [-i INPUTFILE | --input-dir INPUTDIR | -u URL] [--parallel PARALLEL] [-b BLACKLIST] [--list] [-o OUTPUTDIR] [--backup-dir BACKUP_DIR] [--keep-dirs] + [--filename-template FILENAME_TEMPLATE] [--max-length MAX_LENGTH] [--feed [FEED] | --sitemap [SITEMAP] | --crawl [CRAWL] | --explore [EXPLORE] | --probe [PROBE]] [--archived] [--url-filter URL_FILTER [URL_FILTER ...]] [-f] @@ -295,7 +323,7 @@ For all usage instructions see ``trafilatura -h``: [--no-tables] [--only-with-metadata] [--with-metadata] [--target-language TARGET_LANGUAGE] [--deduplicate] [--config-file CONFIG_FILE] [--precision] [--recall] - [--output-format {csv,json,html,markdown,txt,xml,xmltei} | + [--output-format {csv,json,html,markdown,txt,xml,xmltei} | --csv | --html | --json | --markdown | --xml | --xmltei] [--validate-tei] [-v] [--version] @@ -331,6 +359,11 @@ Output: preserve a copy of downloaded files in a backup directory --keep-dirs keep input directory structure and file names + --filename-template FILENAME_TEMPLATE + template for generating filenames (e.g. + {domain}/{path}-{hash}.{ext}) + --max-length MAX_LENGTH + maximum length for generated file paths Navigation: Link discovery and web crawling @@ -381,4 +414,3 @@ Format: --xml shorthand for XML output --xmltei shorthand for XML TEI output --validate-tei validate XML TEI output - diff --git a/tests/cli_tests.py b/tests/cli_tests.py index 8d094445..53048805 100644 --- a/tests/cli_tests.py +++ b/tests/cli_tests.py @@ -21,6 +21,7 @@ from trafilatura import cli, cli_utils, spider, settings from trafilatura.downloads import add_to_compressed_dict, fetch_url +from trafilatura.filename import generate_hash_filename from trafilatura.utils import LANGID_FLAG logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) @@ -586,6 +587,67 @@ def test_probing(): else: assert f.getvalue().strip() == url +def test_filename_template_cli_integration(): + """Test CLI integration with FilenameTemplate.""" + # Test hierarchical structure with no extension + testargs = ["", "--filename-template", "{domain}/{path_dirs}", "--output-dir", "/tmp/test", "-u", "https://example.com/blog/post1"] + with patch.object(sys, "argv", testargs): + args = cli.parse_args(testargs) + + output_path, destination_dir = cli_utils.determine_output_path(args=args, orig_filename="", content="Test content 1") + assert destination_dir == "/tmp/test/example.com/blog" + assert output_path == "/tmp/test/example.com/blog/post1" + + # Test with markdown extension + testargs = ["", "--filename-template", "{domain}/{path_dirs}.{ext}", "--output-dir", "/tmp/test", "--markdown", "-u", "https://example.com/blog/post1"] + with patch.object(sys, "argv", testargs): + args = cli.parse_args(testargs) + + output_path2, destination_dir2 = cli_utils.determine_output_path(args=args, orig_filename="", content="Test content 1") + assert destination_dir2 == "/tmp/test/example.com/blog" + assert output_path2 == "/tmp/test/example.com/blog/post1.md" + + # Test flattened structure + testargs = ["", "--filename-template", "{domain}/{path}", "--output-dir", "/tmp/test", "-u", "https://example.com/articles/tech/news"] + with patch.object(sys, "argv", testargs): + args = cli.parse_args(testargs) + + output_path3, destination_dir3 = cli_utils.determine_output_path(args=args, 
orig_filename="", content="Test content 2") + assert destination_dir3 == "/tmp/test/example.com" + assert output_path3 == "/tmp/test/example.com/articles_tech_news" + + # Test with parameters + testargs = ["", "--filename-template", "{domain}/{path_dirs}/{hash}-{params}", "--output-dir", "/tmp/test", "-u", "https://example.com/articles/tech?id=123&cat=news"] + with patch.object(sys, "argv", testargs): + args = cli.parse_args(testargs) + + output_path4, destination_dir4 = cli_utils.determine_output_path(args=args, orig_filename="", content="Test content 3") + assert destination_dir4 == "/tmp/test/example.com/articles/tech" + assert output_path4 == f"/tmp/test/example.com/articles/tech/{generate_hash_filename('Test content 3')}-cat-news_id-123" + +@pytest.mark.usefixtures("caplog") +def test_filename_template_cli_errors(caplog): + """Test error handling in CLI filename template integration.""" + # Test URL too long + testargs = ["", "--filename-template", "{domain}/{path_dirs}", "--output-dir", "/tmp/test", "-u", "https://example.com/" + "a" * 100, "--max-length", "100"] + with patch.object(sys, "argv", testargs): + args = cli.parse_args(testargs) + + output_path, destination_dir = cli_utils.determine_output_path(args=args, orig_filename="", content="test content") + assert "_ttt_" in output_path + assert destination_dir == "/tmp/test/example.com" + assert generate_hash_filename("test content") in output_path + + # Test no URL + testargs = ["", "--filename-template", "{domain}/{path}", "--output-dir", "/tmp/test"] + with patch.object(sys, "argv", testargs): + args = cli.parse_args(testargs) + + caplog.set_level(logging.WARNING) + output_path2, destination_dir2 = cli_utils.determine_output_path(args=args, orig_filename="", content="test content") + assert "Template generation failed: URL is required for template variables" in caplog.text + assert output_path2 == "/tmp/test" + assert generate_hash_filename("test content") in destination_dir2 if __name__ == "__main__": test_parser() @@ -599,3 +661,5 @@ def test_probing(): test_crawling() test_download() test_probing() + test_filename_template_cli_integration() + test_filename_template_cli_errors() diff --git a/tests/deduplication_tests.py b/tests/deduplication_tests.py index 4b8a00af..9d904dbb 100644 --- a/tests/deduplication_tests.py +++ b/tests/deduplication_tests.py @@ -8,10 +8,10 @@ import trafilatura.deduplication from trafilatura import extract -from trafilatura.cli_utils import generate_hash_filename from trafilatura.core import Extractor from trafilatura.deduplication import (LRUCache, Simhash, content_fingerprint, duplicate_test) +from trafilatura.filename import generate_hash_filename DEFAULT_OPTIONS = Extractor() diff --git a/tests/filename_tests.py b/tests/filename_tests.py new file mode 100644 index 00000000..ab40f783 --- /dev/null +++ b/tests/filename_tests.py @@ -0,0 +1,372 @@ +import os +import unittest + +from trafilatura.filename import FilenameTemplate, generate_hash_filename + + +class TestFilenameTemplate(unittest.TestCase): + """Test suite for FilenameTemplate class.""" + + def test_template_validation_valid_templates(self): + """Test that valid templates are accepted.""" + valid_templates = [ + "{hash}.{ext}", + "{domain}/{path}.{ext}", + "{domain}/{path_dirs}/{hash}.{ext}", + "{domain}/{path_dirs}/{params}-{date}.{ext}", + "{domain}/{path_dirs}/{lang}/{hash}.{ext}", + "{file_basepath}/{file_basename}.{ext}", + "{url}/{filename}.{ext}", + ] + for template in valid_templates: + try: + FilenameTemplate(template) + except 
ValueError as e: + self.fail(f"Valid template {template} raised ValueError: {str(e)}") + + def test_template_validation_invalid_templates(self): + """Test that invalid templates raise appropriate errors.""" + invalid_templates = [ + "{unknown}.{ext}", # Unknown variable + "{domain\\path}.{ext}", # Invalid character + "test<>.{ext}", # Invalid characters + "{domain}/*.{ext}", # Invalid character + "{path}|{ext}", # Invalid character + ] + for template in invalid_templates: + with self.assertRaises(ValueError): + FilenameTemplate(template) + + def test_directory_structure_preserved(self): + """Test that directory structure is preserved using path_dirs variable.""" + template = FilenameTemplate("{domain}/{path_dirs}.{ext}") + test_cases = [ + ("https://example.com/path/to/file", "example.com/path/to/file.txt"), + ("https://example.com/dir/subdir/page", "example.com/dir/subdir/page.txt"), + ("https://example.com/a/b/c/d", "example.com/a/b/c/d.txt"), + ] + for url, expected in test_cases: + output_dir, _ = template.generate("content", url=url) + self.assertEqual( + output_dir, expected, f"Failed directory structure for URL: {url}" + ) + + def test_directory_structure_with_url(self): + """Test directory structure generation with URL components using path_dirs.""" + content = "test content" + template = FilenameTemplate("{domain}/{path_dirs}.{ext}") + url = "https://example.com/path/to/page" + output_dir, destination_dir = template.generate(content, url=url) + + expected_output_dir = os.path.join("example.com", "path/to") + expected_path = os.path.join(expected_output_dir, "page.txt") + + self.assertEqual(destination_dir, expected_output_dir) + self.assertEqual(output_dir, expected_path) + + def test_flattened_structure_with_url(self): + """Test flattened directory structure with URL components using path variable.""" + content = "test content" + template = FilenameTemplate("{domain}/{path}.{ext}") + url = "https://example.com/path/to/page" + output_dir, destination_dir = template.generate(content, url=url) + + expected_path = "example.com" + expected_dir = os.path.join(expected_path, "path_to_page.txt") + + self.assertEqual(output_dir, expected_dir) + self.assertEqual(destination_dir, expected_path) + + def test_basic_hash_template(self): + """Test basic hash-based template.""" + content = "test content" + template = FilenameTemplate("{hash}.{ext}") + output_dir, destination_dir = template.generate(content) + + self.assertEqual(output_dir, "") + self.assertTrue(destination_dir.endswith(".txt")) + self.assertIn(generate_hash_filename(content), destination_dir) + + def test_custom_output_directory(self): + """Test with custom output directory.""" + content = "test content" + template = FilenameTemplate("{hash}.{ext}", output_dir="/custom/output") + output_path, destination_dir = template.generate(content) + + self.assertEqual(destination_dir, "/custom/output") + self.assertTrue(output_path.startswith("/custom/output/")) + self.assertTrue(output_path.endswith(".txt")) + + def test_url_parameters(self): + """Test handling of URL parameters.""" + content = "test content" + template = FilenameTemplate("{domain}/{path}/{params}.{ext}") + url = "https://example.com/page?param1=value1¶m2=value2" + output_dir, destination_dir = template.generate(content, url=url) + + expected_dir = os.path.join("example.com", "page") + self.assertEqual(destination_dir, expected_dir) + self.assertTrue("param1-value1_param2-value2" in output_dir) + + def test_sanitization(self): + """Test path sanitization.""" + content = 
"test content" + template = FilenameTemplate("{domain}/{path}.{ext}") + url = "https://example.com/path/with spaces/and<>special:chars" + output_dir, _ = template.generate(content, url=url) + + self.assertNotIn(" ", output_dir) + self.assertNotIn("<", output_dir) + self.assertNotIn(">", output_dir) + self.assertNotIn(":", output_dir) + self.assertTrue(output_dir.endswith(".txt")) + + def test_dot_segments(self): + """Test handling of dot and dot-dot segments.""" + content = "test content" + template = FilenameTemplate("{domain}/{path_dirs}.{ext}") + url = "https://example.com/./path/../to/./page" + output_dir, _ = template.generate(content, url=url) + + self.assertIn("_d_", output_dir) # . becomes _d_ + self.assertIn("_dd_", output_dir) # .. becomes _dd_ + + def test_missing_url(self): + """Test behavior when URL is missing but required.""" + content = "test content" + template = FilenameTemplate("{domain}/{path}.{ext}") + + with self.assertRaises(ValueError): + template.generate(content) + + def test_empty_content_handling(self): + """Test empty content handling across different template scenarios.""" + test_cases = [ + # Basic template + { + "template": "{hash}.{ext}", + "url": None, + "filename": None, + "output_dir": "", + "destination_dir": "uOHdo6wKo4IK0pkL.txt", + "content": "", + }, + # Template with URL components + { + "template": "{domain}/{path}/{hash}.{ext}", + "url": "https://example.com/test", + "filename": None, + "output_dir": "example.com/test/uOHdo6wKo4IK0pkL.txt", + "destination_dir": "example.com/test", + "content": "", + }, + # Template with filename components + { + "template": "{file_basepath}/{file_basename}_{hash}.{ext}", + "url": None, + "filename": "dir/test.txt", + "output_dir": "dir/test_uOHdo6wKo4IK0pkL.txt", + "destination_dir": "dir", + "content": "", + }, + # Complex template with all components + { + "template": "{domain}/{path_dirs}/{file_basename}_{hash}.{ext}", + "url": "https://example.com/path/to/file", + "filename": "local/doc.txt", + "output_dir": "example.com/path/to/file/doc_uOHdo6wKo4IK0pkL.txt", + "destination_dir": "example.com/path/to/file", + "content": "", + }, + ] + + for case in test_cases: + template = FilenameTemplate(case["template"]) + output_dir, destination_dir = template.generate( + case["content"], url=case["url"], filename=case["filename"] + ) + + # Validate output dir + self.assertEqual(output_dir, case["output_dir"]) + + # Validate full path + self.assertEqual(destination_dir, case["destination_dir"]) + + # Validate path is valid and normalized + self.assertTrue(os.path.normpath(destination_dir)) + + # Check for no double separators + self.assertNotIn("//", destination_dir) + self.assertNotIn("\\\\", destination_dir) + + def test_custom_extension(self): + """Test custom file extension.""" + content = "test content" + template = FilenameTemplate("{hash}.{ext}", ext="json") + _, destination_dir = template.generate(content) + + self.assertTrue(destination_dir.endswith(".json")) + + def test_path_length_limits(self): + """Test path length limiting.""" + content = "test content" + long_path = "a" * 300 + url = f"https://example.com/{long_path}" + + template = FilenameTemplate("{domain}/{path}.{ext}", max_length=50) + output_dir, _ = template.generate(content, url=url) + + self.assertLessEqual( + len(output_dir), + 50, + f"Generated path length {len(output_dir)} exceeds limit of 50: {output_dir}", + ) + self.assertTrue(output_dir.endswith(".txt")) + + def test_minimal_truncation(self): + """Test truncation with minimal possible 
length.""" + url = "https://example.com/" + content = "test content" + template = FilenameTemplate("{domain}/{hash}.{ext}", max_length=21) + output_dir, _ = template.generate(content, url=url) + + # Hash length (16) + ".txt" (4) + truncation flag "_ttt_" (5) + self.assertEqual( + output_dir, + "_ttt_eA2ZYxECccrTXcoP.txt", + f"Path does not match: {output_dir}", + ) + self.assertLessEqual( + len(output_dir), 25, f"Length not match for path: {output_dir}" + ) + self.assertIn( + generate_hash_filename(content), f"Hash not found in path: {output_dir}" + ) + + def test_truncation_with_preserved_dirs(self): + """Test path truncation while preserving directory structure.""" + content = "test content" + template = FilenameTemplate("{domain}/{path_dirs}.{ext}", max_length=50) + + long_segments = "/".join(["segment" + str(i) for i in range(10)]) + url = f"https://example.com/{long_segments}" + output_dir, _ = template.generate(content, url=url) + + self.assertLessEqual( + len(output_dir), + 50, + f"Generated path length {len(output_dir)} exceeds limit of 50: {output_dir}", + ) + self.assertTrue( + output_dir.startswith("example.com"), + f"Path does not start with domain: {output_dir}", + ) + self.assertIn( + "_ttt_", output_dir, f"Truncation indicator not found in path: {output_dir}" + ) + self.assertTrue(output_dir.endswith(".txt")) + self.assertIn(generate_hash_filename(content), output_dir) + + def test_truncation_without_dirs(self): + """Test path truncation with flattened directory structure.""" + content = "test content" + template = FilenameTemplate("{domain}/{path}.{ext}", max_length=40) + + url = "https://example.com/" + "a" * 100 + output_dir, _ = template.generate(content, url=url) + + self.assertLessEqual( + len(output_dir), + 40, + f"Generated path length {len(output_dir)} exceeds limit of 40: {output_dir}", + ) + self.assertTrue( + output_dir.startswith("example.com"), + f"Path does not start with domain: {output_dir}", + ) + self.assertIn( + "_ttt_", output_dir, f"Truncation indicator not found in path: {output_dir}" + ) + self.assertTrue(output_dir.endswith(".txt")) + self.assertIn(generate_hash_filename(content), output_dir) + + def test_truncation_preserves_important_parts(self): + """Test that truncation preserves essential path components while meeting length limits.""" + content = "test content" + template = FilenameTemplate("{domain}/{path_dirs}/{hash}.{ext}", max_length=50) + + # Test URL with important segments + url = "https://example.com/category/important-section/article" + output_dir, _ = template.generate(content, url=url) + + # Basic assertions + self.assertTrue( + output_dir.startswith("example.com"), f"Domain not preserved: {output_dir}" + ) + self.assertIn( + "category", output_dir, f"Important path segment missing: {output_dir}" + ) + self.assertIn( + "_ttt_", output_dir, f"Truncation indicator missing: {output_dir}" + ) + + # Verify hash is preserved + content_hash = generate_hash_filename(content) + self.assertIn(content_hash, output_dir, f"Content hash missing: {output_dir}") + + # Length constraint + self.assertLessEqual( + len(output_dir), 50, f"Path exceeds length limit: {output_dir}" + ) + + # Directory structure assertions + dir_parts = output_dir.split(os.sep) + self.assertEqual( + dir_parts[0], "example.com", "Domain not preserved in directory structure" + ) + self.assertEqual( + dir_parts[1], "category", "Category not preserved in directory structure" + ) + + # Test with very long path components + long_url = "https://example.com/" + "/".join( + 
["segment" + str(i) for i in range(10)] + ) + output_dir2, _ = template.generate(content, url=long_url) + + # Verify long path handling + self.assertLessEqual(len(output_dir2), 50, "Long path not properly truncated") + self.assertTrue( + output_dir2.startswith("example.com"), "Domain lost in long path truncation" + ) + self.assertIn("_ttt_", output_dir2, "Truncation indicator missing in long path") + self.assertIn(content_hash, output_dir2, "Hash missing in long path") + self.assertTrue(output_dir2.endswith(".txt"), "Extension missing in long path") + + def test_truncation_with_custom_output_dir(self): + """Test truncation behavior with custom output directory.""" + content = "test content" + template = FilenameTemplate( + "{domain}/{path_dirs}.{ext}", max_length=60, output_dir="/custom/output" + ) + + url = "https://example.com/very/long/path/that/needs/truncation" + output_dir, _ = template.generate(content, url=url) + + self.assertTrue( + output_dir.startswith("/custom/output"), + f"Output directory missing from path: {output_dir}", + ) + self.assertLessEqual( + len(output_dir), + 60, + f"Generated path length {len(output_dir)} exceeds limit of 60: {output_dir}", + ) + self.assertIn( + "_ttt_", output_dir, f"Truncation indicator not found in path: {output_dir}" + ) + self.assertTrue(output_dir.endswith(".txt")) + + +if __name__ == "__main__": + unittest.main() diff --git a/trafilatura/cli.py b/trafilatura/cli.py index e69a6d5b..9c780532 100644 --- a/trafilatura/cli.py +++ b/trafilatura/cli.py @@ -65,6 +65,13 @@ def add_args(parser: Any) -> Any: group2.add_argument('--keep-dirs', help="keep input directory structure and file names", action="store_true") + group2.add_argument("--filename-template", + help="template for generating filenames (e.g. {domain}/{path}-{hash}.{ext})", + type=str) + group2.add_argument("--max-length", + help="maximum length for generated file paths", + type=int, + default=250) group3_ex.add_argument("--feed", help="look for feeds and/or pass a feed URL as input", diff --git a/trafilatura/cli_utils.py b/trafilatura/cli_utils.py index 7fe46f4d..76e3d701 100644 --- a/trafilatura/cli_utils.py +++ b/trafilatura/cli_utils.py @@ -15,21 +15,20 @@ import sys import traceback -from base64 import urlsafe_b64encode from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed from datetime import datetime from functools import partial from os import makedirs, path, stat, walk from threading import RLock -from typing import Any, Generator, Optional, List, Set, Tuple +from typing import Any, Generator, List, Optional, Set, Tuple from courlan import UrlStore, extract_domain, get_base_url # validate_url from trafilatura import spider +from trafilatura.filename import FilenameTemplate, generate_hash_filename from .baseline import html2txt from .core import extract -from .deduplication import generate_bow_hash from .downloads import ( Response, add_to_compressed_dict, @@ -63,13 +62,12 @@ STRIP_DIR = re.compile(r"[^/]+$") STRIP_EXTENSION = re.compile(r"\.[a-z]{2,5}$") -CLEAN_XML = re.compile(r"<[^<]+?>") - INPUT_URLS_ARGS = ["URL", "crawl", "explore", "probe", "feed", "sitemap"] EXTENSION_MAPPING = { "csv": ".csv", "json": ".json", + "markdown": ".md", "xml": ".xml", "xmltei": ".xml", } @@ -154,12 +152,6 @@ def get_writable_path(destdir: str, extension: str) -> Tuple[str, str]: return output_path, filename -def generate_hash_filename(content: str) -> str: - """Create a filename-safe string by hashing the given content - after deleting potential XML tags.""" - 
return urlsafe_b64encode(generate_bow_hash(CLEAN_XML.sub("", content), 12)).decode() - - def determine_output_path( args: Any, orig_filename: str, @@ -171,19 +163,46 @@ def determine_output_path( # determine extension, TXT by default extension = EXTENSION_MAPPING.get(args.output_format, ".txt") - if args.keep_dirs: - # strip directory - original_dir = STRIP_DIR.sub("", orig_filename) - destination_dir = path.join(args.output_dir, original_dir) - # strip extension - filename = STRIP_EXTENSION.sub("", orig_filename) - else: - destination_dir = determine_counter_dir(args.output_dir, counter) - # use cryptographic hash on file contents to define name - filename = new_filename or generate_hash_filename(content) + if hasattr(args, "filename_template") and args.filename_template: + # Initialize template with configuration + template = FilenameTemplate( + template=args.filename_template, + max_length=args.max_length, + output_dir=args.output_dir, + lang=args.target_language, + ext=extension, + ) - output_path = path.join(destination_dir, filename + extension) - return output_path, destination_dir + try: + output_path, destination_dir = template.generate( + content=content, + url=args.URL, + filename=new_filename or orig_filename + ) + return output_path, destination_dir + + except ValueError as e: + # Log the error and fall back to hash-based filename + LOGGER.warning("Template generation failed: %s. Falling back to hash-based name.", str(e)) + filename = generate_hash_filename(content) + full_path = path.join(args.output_dir, filename + extension) + return args.output_dir, full_path + + else: + # Original behavior for backward compatibility + if args.keep_dirs: + # strip directory + original_dir = STRIP_DIR.sub("", orig_filename) + destination_dir = path.join(args.output_dir, original_dir) + # strip extension + filename = STRIP_EXTENSION.sub("", orig_filename) + else: + destination_dir = determine_counter_dir(args.output_dir, counter) + # use cryptographic hash on file contents to define name + filename = new_filename or generate_hash_filename(content) + + output_path = path.join(destination_dir, filename + extension) + return output_path, destination_dir def archive_html(htmlstring: str, args: Any, counter: int = -1) -> str: diff --git a/trafilatura/filename.py b/trafilatura/filename.py new file mode 100644 index 00000000..b70cec2a --- /dev/null +++ b/trafilatura/filename.py @@ -0,0 +1,358 @@ +from datetime import datetime +import re +import os +from base64 import urlsafe_b64encode +from string import Formatter +from typing import Dict, Optional, Tuple +from urllib.parse import parse_qs, urlparse, unquote + +from .deduplication import generate_bow_hash + +CLEAN_XML = re.compile(r"<[^<]+?>") + +# Characters that are unsafe everywhere - templates and filenames +ALWAYS_UNSAFE_CHARS = r'[<>:"|?*\\]' +# Additional characters unsafe in filenames (`/` is allowed in templates) +FILENAME_UNSAFE_CHARS = r'[<>:"|?*\\\s]' + +# Maximum total path length +DEFAULT_PATH_LENGTH = 250 + +NO_PARAMS_KEY = "__no_params" +TRUNCATE_KEY = "_ttt_" +DOT_KEY_UNIT = "d" + + +def generate_hash_filename(content: str) -> str: + """Create a filename-safe string by hashing the given content + after deleting potential XML tags.""" + return urlsafe_b64encode(generate_bow_hash(CLEAN_XML.sub("", content), 12)).decode() + + +class FilenameTemplate: + """Handle template-based filename generation with variables.""" + + def __init__( + self, + template: str = "{hash}.{ext}", + ext: str = "txt", + lang: Optional[str] = None, + 
max_length: Optional[int] = None, + output_dir: Optional[str] = None, + date: Optional[str] = None, + ): + self.template = template + self.ext = ext + self.lang = lang + self.max_length = max_length or DEFAULT_PATH_LENGTH + self.output_dir = output_dir or None + self.content = None + self.date = date or datetime.now().strftime("%Y-%m-%d") + self._validate_template(template) + + def _validate_template(self, template: str) -> None: + """Ensure template only uses allowed variables and is filesystem safe. + + Forward slashes are allowed in templates as directory separators, but + other unsafe characters are still forbidden. + """ + allowed_vars = { + "domain", + "path", + "path_dirs", + "params", + "hash", + "ext", + "lang", + "date", + "filename", + "url", + "file_basepath", + "file_basename", + "file_ext", + } + used_vars = {v[1] for v in Formatter().parse(template) if v[1] is not None} + + invalid_vars = used_vars - allowed_vars + if invalid_vars: + raise ValueError(f"Invalid template variables: {invalid_vars}") + + # Check for always-unsafe characters, allowing forward slashes + if re.search(ALWAYS_UNSAFE_CHARS, template): + raise ValueError("Template contains unsafe characters") + + def generate( + self, + content: str, + url: Optional[str] = None, + filename: Optional[str] = None, + ) -> Tuple[str, str]: + """Generate filename from template and return both the directory path and full path. + + Args: + content: The content to generate a hash from + url: Optional URL to extract components from + filename: Optional filename to use (assumed valid) + + Returns: + Tuple[str, str]: (output_path, destination_dir) + - output_path: Path where the file will be written + - destination_dir: Complete path including filename and eventual extension + + Raises: + ValueError: If template requirements aren't met or paths exceed length limits + """ + # Validate we have required data based on template variables + self._validate_requirements(url, filename) + + # Get template variables + variables = self._get_variables(content, url, filename) + + # Generate the path from template + path = self.template.format(**variables) + + # Calculate available space for path components + available_length = self._get_available_length() + + # Split into directory and filename components + dirname, basename = os.path.split(path) + + # If no directory specified in template and no output_dir, return empty string + if not dirname and not self.output_dir: + return ("", f"{variables['hash']}.{variables['ext']}") + + # Handle output directory if specified + if self.output_dir: + dirname = ( + os.path.join(self.output_dir, dirname) if dirname else self.output_dir + ) + + # Truncate path if needed while preserving structure + if available_length and len(os.path.join(dirname, basename)) > available_length: + dirname, basename = self._truncate_path( + dirname, basename, available_length, variables["hash"], variables["ext"] + ) + + # Normalize paths + output_path = os.path.join(dirname, basename) if dirname else basename + destination_dir = os.path.normpath(dirname) if dirname else "" + + return (output_path, destination_dir) + + def _validate_requirements( + self, url: Optional[str], filename: Optional[str] + ) -> None: + """Validate that we have the data required by the template variables.""" + required_vars = {v[1] for v in Formatter().parse(self.template) if v[1]} + + # Check URL-dependent variables + url_vars = {"domain", "path", "path_dirs", "params", "url"} + if url_vars & required_vars and not url: + missing = url_vars & 
required_vars + raise ValueError(f"URL is required for template variables: {missing}") + + # Check filename-dependent variables + filename_vars = {"filename", "file_basepath", "file_basename", "file_ext"} + if filename_vars & required_vars and not filename: + missing = filename_vars & required_vars + raise ValueError(f"Filename is required for template variables: {missing}") + + def _get_variables( + self, content: str, url: Optional[str], filename: Optional[str] + ) -> Dict[str, str]: + """Get all variables that can be used in the template.""" + variables = { + "hash": generate_hash_filename(content or ""), + "ext": self.ext.lstrip("."), + "lang": self.lang or "", + "date": self.date or "", + "filename": filename or "", + "url": url or "", + } + + # Handle filename components if provided + if filename: + # Split into directory and name parts + dirname, basename = os.path.split(filename) + # Split basename into name and extension + name, ext = os.path.splitext(basename) + + variables.update( + { + "file_basepath": dirname, + "file_basename": name, + "file_ext": ext.lstrip("."), # Remove leading dot for consistency + } + ) + + # Add URL components if URL provided + if url: + url_vars = self._get_url_parts(url) + variables.update(url_vars) + else: + variables.update({"domain": "", "path": "", "path_dirs": "", "params": ""}) + + return variables + + def _get_url_parts(self, url: str) -> dict: + if not url: + return {"domain": "", "path": "", "path_dirs": "", "params": ""} + + parsed = urlparse(url) + + # Handle domain and port + domain = parsed.netloc.split(":")[0] + domain = self._sanitize_component(domain) + + # Get both flat and structured paths + path_segments = self._sanitize_path(parsed.path) + flat_path = "_".join(path_segments) + structured_path = "/".join(path_segments) + + # Handle query parameters + params = self._get_params(parsed.query) + + return { + "domain": domain, + "path": flat_path, + "path_dirs": structured_path, + "params": params, + } + + def _get_params(self, query: str) -> str: + params = "" + if query: + try: + param_dict = parse_qs(query) + # Sort for consistency and take first value of each parameter + param_pairs = sorted((k, v[0]) for k, v in param_dict.items() if v) + params = "_".join(f"{k}-{v}" for k, v in param_pairs) + params = self._sanitize_component(params) + except (IndexError, KeyError): + params = NO_PARAMS_KEY + else: + params = NO_PARAMS_KEY + + return params + + def _sanitize_component(self, part: str) -> str: + """Create safe filename component. + + All unsafe characters, including forward slashes, are replaced with + underscores in actual filenames. + """ + if not part: + return "" + safe = re.sub(FILENAME_UNSAFE_CHARS, "_", part) + safe = re.sub(r"_+", "_", safe) + return safe.strip("_") + + def _sanitize_path(self, path: str) -> list[str]: + """Sanitize path into list of clean segments.""" + # First replace %2F with _ + raw_path = path.replace("%2F", "_") + # Then decode other URL-encoded characters + path = unquote(raw_path) + # Remove common endings + path = re.sub(r"/(index|default)\.(html?|php)$", "", path) + + segments = [] + for segment in path.split("/"): + if not segment: + continue + sanitized = self._sanitize_path_segment(segment) + if sanitized: + segments.append(sanitized) + + return segments + + def _sanitize_path_segment(self, segment: str) -> str: + """Sanitize individual path segments, with special handling only for '.' and '..'.""" + if not segment or segment.isspace(): + return "" + if segment and all(c == "." 
for c in segment): + return "_" + DOT_KEY_UNIT * len(segment) + "_" + return self._sanitize_component(segment) + + def _get_available_length(self) -> Optional[int]: + """Calculate available length for path components.""" + if not self.max_length: + return None + + # If output_dir specified, subtract its length + if self.output_dir: + output_dir_len = len(self.output_dir) + 1 # +1 for separator + if output_dir_len >= self.max_length: + raise ValueError( + f"Output directory length ({output_dir_len}) exceeds " + f"maximum path length ({self.max_length})" + f"for '{self.output_dir}'" + ) + return self.max_length - output_dir_len + + return self.max_length + + def _truncate_path( + self, + dirname: str, + basename: str, + available_length: int, + content_hash: str, + extension: str, + ) -> Tuple[str, str]: + """Truncate path components while preserving structure and essential information. + + Args: + dirname: Directory path components + basename: Original filename + available_length: Maximum allowed length + content_hash: Content hash for uniqueness + extension: File extension without dot + + Returns: + Tuple of (directory_path, filename) + """ + # Check if extension is required in template + ext_in_template = "{ext}" in self.template + extension_part = f".{extension}" if ext_in_template else "" + + # Minimum filename configuration + min_filename = f"{TRUNCATE_KEY}{content_hash}{extension_part}" + + # If path fits as-is, return unmodified + full_path = os.path.join(dirname, basename) if dirname else basename + if len(full_path) <= available_length: + return dirname, basename + + # Remove output_dir if present from dirname + if self.output_dir and dirname.startswith(self.output_dir): + dirname = dirname[len(self.output_dir) :].lstrip(os.sep) + + # Split path into segments + segments = dirname.split(os.sep) if dirname else [] + + # Try to preserve as many path segments as possible + preserved_segments: list[str] = [] + remaining_length = available_length - len(min_filename) + + # Add segments while they fit + for segment in segments: + # Account for path separator + segment_len = len(segment) + (1 if preserved_segments else 0) + if remaining_length - segment_len > 0: + preserved_segments.append(segment) + remaining_length -= segment_len + else: + break + + # Build final paths + final_dirname = ( + os.path.join(self.output_dir, *preserved_segments) + if self.output_dir + else os.path.join(*preserved_segments) + if preserved_segments + else "" + ) + final_basename = min_filename + + return final_dirname, final_basename
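For reference, a minimal sketch (not part of the patch) of how the `FilenameTemplate` class added in `trafilatura/filename.py` could be driven directly from Python, mirroring the CLI example in docs/quickstart.rst; the content string and the paths shown in the comments are illustrative, and the exact hash segment depends on the content passed in:

```python
# Illustrative sketch only: equivalent to
#   trafilatura -u "https://example.com/blog/post1" \
#       --filename-template "{domain}/{path_dirs}/{hash}.{ext}" --markdown -o output/
from trafilatura.filename import FilenameTemplate, generate_hash_filename

template = FilenameTemplate(
    template="{domain}/{path_dirs}/{hash}.{ext}",
    ext="md",             # the CLI derives this from --output-format
    max_length=200,       # same role as --max-length (default 250)
    output_dir="output",  # same role as --output-dir
)

content = "extracted text"
output_path, destination_dir = template.generate(
    content, url="https://example.com/blog/post1"
)
# destination_dir -> "output/example.com/blog/post1"
# output_path     -> "output/example.com/blog/post1/" + generate_hash_filename(content) + ".md"
```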