diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..aba3ae7 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,28 @@ +name: CI + +on: + push: + branches: ['*'] + pull_request: + branches: ['*'] + +jobs: + smoke-test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ['3.11', '3.12', '3.13'] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Verify tool launches + run: python hypoxia.py --help + + - name: Verify package entry point + run: python -m hypoxia --help diff --git a/README.md b/README.md index abbd736..2995606 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@
- Hypoxia logo + Hypoxia - open-source CLI forensic file extraction tool for Linux, macOS, and Windows

HYPOXIA

@@ -11,6 +11,7 @@ · Command-Line Options

+ CI GitHub Downloads (all assets, all releases) GitHub contributors GitHub Release @@ -20,7 +21,7 @@

About

-Terminal +Hypoxia CLI terminal output showing forensic file collection with SHA-256 hashing, directory exclusion, and checkpoint resume **Hypoxia** is a lightweight, dependency-free, cross-platform command-line tool designed for targeted file extraction and backup. Written entirely in standard Python, it recursively searches directories and collects files based on a granular set of criteria - including extensions, modification dates, and file sizes. @@ -38,14 +39,20 @@ Built for efficiency and portability, Hypoxia is the perfect utility for digital - **Size Boundaries:** e.g., files strictly between `10mb` and `2gb`. - **Disk Space Awareness:** Monitors free space on the destination drive in real time, issuing warnings and safely halting execution before the disk fills up completely. - **Metadata Control:** Choose to preserve original file metadata (timestamps, permissions) or discard it to maximize copy speed. -- **Secure & Robust:** Relies exclusively on Python's standard library (`argparse`, `pathlib`, `datetime`, `shutil`), ensuring maximum compatibility and minimizing security risks. +- **Forensic Manifest:** Automatically generates a JSON manifest for every collection task - SHA-256 hash, original path, destination path, file size, timestamps, and an overall manifest checksum for integrity verification. +- **Chain of Custody Log:** Append-only forensic log with timestamped entries for every action (file copied, skipped, errors), establishing a verifiable chain of custody. +- **Checkpoint/Resume:** If a collection is interrupted (crash, power loss, dying media), feed the forensic log back with `--resume` - Hypoxia continues from exactly where it stopped, verified by path and hash. No wasted time, no duplicates. +- **Archive Output:** Compress the entire collection into a `.zip` archive with a single flag. +- **Directory Exclusion:** Skip unwanted directories by name (e.g., system folders, `.git`). 
+- **Secure & Robust:** Relies exclusively on Python's standard library (`argparse`, `pathlib`, `datetime`, `shutil`, `hashlib`, `json`, `zipfile`), ensuring maximum compatibility and minimizing security risks.

Use Cases

-- **Digital Forensics:** Rapid evidence gathering and metadata extraction. -- **Data Backup:** Targeted backups of specific file types or recent documents. +- **Digital Forensics:** Rapid evidence gathering with SHA-256 hashing, forensic manifest, and chain-of-custody logging. +- **Incident Response:** Collect files from a compromised machine with a single command. Resume interrupted collections from dying media without re-copying. +- **Data Backup:** Targeted backups of specific file types or recent documents, with integrity verification built in. - **Disaster Recovery:** Extracting files from corrupted or unbootable operating systems.
@@ -106,6 +113,10 @@ This command preserves metadata by default and outputs detailed logs to the term | `--date-to` | Filter for files modified on or before this date (`YYYY-MM-DD`). | No | - | | `--size-min` | Minimum file size (e.g., `100mb`). Supported units: `b`, `kb`, `mb`, `gb`. | No | - | | `--size-max` | Maximum file size (e.g., `2gb`). Supported units: `b`, `kb`, `mb`, `gb`. | No | - | +| `--exclude` | Comma-separated list of directory names to exclude from scan (e.g., `windows,program files,.git`). | No | - | +| `--zip` | Compress the output folder into a `.zip` archive after collection. | No | `false` | +| `--hash` | Hash algorithm for forensic manifest (`sha256`, `none`). | No | `sha256` | +| `--resume` | Path to a forensic log from a previous interrupted run. Resumes from where it stopped. | No | - |

Legal Disclaimer

diff --git a/assets/terminal.png b/assets/terminal.png index ee0c644..221a4e4 100644 Binary files a/assets/terminal.png and b/assets/terminal.png differ diff --git a/hypoxia.py b/hypoxia.py index ace07a3..23017f9 100755 --- a/hypoxia.py +++ b/hypoxia.py @@ -1,122 +1,6 @@ #!/usr/bin/env python3 - -import argparse -from argparse import RawTextHelpFormatter -import sys -import uuid -from pathlib import Path -from utils import * -from colors import info, error - - -__version__ = '1.2.2' - - -def dir_path(path_string): - path_obj = Path(path_string) - if path_obj.is_dir(): - return path_obj - else: - raise argparse.ArgumentTypeError(f'Directory not found or access denied: "{path_string}"') - - -def main(): - task_id = str(uuid.uuid4()) - result = False - - parser = argparse.ArgumentParser( - description='Hypoxia: Targeted file extraction and backup utility.', - epilog=''' -Options Summary: - Logging level: -v, --verbosity - Target location: -s, --search-path - Target files: -e, --extensions - Copy behavior: -m, --keep-metadata - Timeframe filters: --date-from, --date-to - Size limits: --size-min, --size-max -''', - formatter_class=RawTextHelpFormatter - ) - - parser.add_argument( - '--version', - action='version', - version=f'%(prog)s {__version__}' - ) - parser.add_argument( - '-v', '--verbosity', - choices=['silent', 'info'], - required=True, - help='Set logging level. "silent" suppresses output, "info" logs all actions.' - ) - parser.add_argument( - '-s', '--search-path', - type=dir_path, - required=True, - help='Absolute or relative path to the target directory.' - ) - parser.add_argument( - '-e', '--extensions', - type=str, - required=True, - help='Comma-separated list of target file extensions (e.g., pdf,docx,txt).' - ) - parser.add_argument( - '-m', '--keep-metadata', - choices=['yes', 'no'], - default='yes', - help='Preserve original file metadata (timestamps, permissions). "no" speeds up copying.' 
- ) - parser.add_argument( - '--date-from', - type=str, - required=False, - help='Filter for files modified on or after this date (YYYY-MM-DD).' - ) - parser.add_argument( - '--date-to', - type=str, - required=False, - help='Filter for files modified on or before this date (YYYY-MM-DD).' - ) - parser.add_argument( - '--size-min', - type=str, - required=False, - help='Minimum file size boundary (e.g., 10kb, 100mb, 2gb).' - ) - parser.add_argument( - '--size-max', - type=str, - required=False, - help='Maximum file size boundary (e.g., 10kb, 100mb, 2gb).' - ) - - args = parser.parse_args() - - verbosity = (args.verbosity == 'info') - keep_metadata = (args.keep_metadata == 'yes') - - try: - target_extensions = args.extensions.split(',') - except Exception as e: - error('Invalid --extensions format. Expected a comma-separated list.') - sys.exit(1) - - if verbosity: - info('Initializing Hypoxia...') - info(f'Task ID: {task_id}') - - preparation_result = prepare_workspace(task_id, target_extensions, verbosity) - if preparation_result: - result = collect_files( - task_id, target_extensions, verbosity, keep_metadata, args.search_path, args.date_from, args.date_to, args.size_min, args.size_max - ) - - if result: - if verbosity: - info('Extraction task completed successfully.') +from hypoxia.cli import main if __name__ == '__main__': diff --git a/hypoxia/__init__.py b/hypoxia/__init__.py new file mode 100644 index 0000000..19b4f1d --- /dev/null +++ b/hypoxia/__init__.py @@ -0,0 +1 @@ +__version__ = '1.3.0' diff --git a/hypoxia/__main__.py b/hypoxia/__main__.py new file mode 100644 index 0000000..2c7e940 --- /dev/null +++ b/hypoxia/__main__.py @@ -0,0 +1,5 @@ +from hypoxia.cli import main + + +if __name__ == '__main__': + main() diff --git a/hypoxia/cli.py b/hypoxia/cli.py new file mode 100755 index 0000000..95ce287 --- /dev/null +++ b/hypoxia/cli.py @@ -0,0 +1,156 @@ +import argparse +from argparse import RawTextHelpFormatter +import sys +import uuid +from pathlib import Path 
+from hypoxia import __version__ +from hypoxia.utils import prepare_workspace, collect_files, archive_output +from hypoxia.colors import info, error +from hypoxia.forensic import parse_resume_log + + +def dir_path(path_string): + path_obj = Path(path_string) + if path_obj.is_dir(): + return path_obj + else: + raise argparse.ArgumentTypeError(f'Directory not found or access denied: "{path_string}"') + + +def main(): + task_id = str(uuid.uuid4()) + result = False + + parser = argparse.ArgumentParser( + description='Hypoxia: Targeted file extraction and backup utility.', + epilog=''' +Options Summary: + Logging level: -v, --verbosity + Target location: -s, --search-path + Target files: -e, --extensions + Copy behavior: -m, --keep-metadata + Timeframe filters: --date-from, --date-to + Size limits: --size-min, --size-max + Directory exclusion: --exclude + Archive output: --zip + Hashing: --hash + Resume: --resume +''', + formatter_class=RawTextHelpFormatter + ) + + parser.add_argument( + '--version', + action='version', + version=f'%(prog)s {__version__}' + ) + parser.add_argument( + '-v', '--verbosity', + choices=['silent', 'info'], + required=True, + help='Set logging level. "silent" suppresses output, "info" logs all actions.' + ) + parser.add_argument( + '-s', '--search-path', + type=dir_path, + required=True, + help='Absolute or relative path to the target directory.' + ) + parser.add_argument( + '-e', '--extensions', + type=str, + required=True, + help='Comma-separated list of target file extensions (e.g., pdf,docx,txt).' + ) + parser.add_argument( + '-m', '--keep-metadata', + choices=['yes', 'no'], + default='yes', + help='Preserve original file metadata (timestamps, permissions). "no" speeds up copying.' + ) + parser.add_argument( + '--date-from', + type=str, + required=False, + help='Filter for files modified on or after this date (YYYY-MM-DD).' 
+ ) + parser.add_argument( + '--date-to', + type=str, + required=False, + help='Filter for files modified on or before this date (YYYY-MM-DD).' + ) + parser.add_argument( + '--size-min', + type=str, + required=False, + help='Minimum file size boundary (e.g., 10kb, 100mb, 2gb).' + ) + parser.add_argument( + '--size-max', + type=str, + required=False, + help='Maximum file size boundary (e.g., 10kb, 100mb, 2gb).' + ) + parser.add_argument( + '--exclude', + type=str, + required=False, + help='Comma-separated list of directory names to exclude from scan (e.g., "windows,program files,.git").' + ) + parser.add_argument( + '--zip', + action='store_true', + default=False, + help='Compress the output folder into a .zip archive after collection is complete.' + ) + parser.add_argument( + '--hash', + type=str, + choices=['sha256', 'none'], + default='sha256', + help='Hash algorithm for forensic manifest (default: sha256). Use "none" to disable hashing.' + ) + parser.add_argument( + '--resume', + type=str, + required=False, + help='Path to a forensic log file from a previous interrupted run. Resumes collection from where it stopped.' 
+ ) + + args = parser.parse_args() + + verbosity = (args.verbosity == 'info') + keep_metadata = (args.keep_metadata == 'yes') + + target_extensions = [ext.strip() for ext in args.extensions.split(',')] + + exclude_dirs = [d.strip().lower() for d in args.exclude.split(',')] if args.exclude else [] + + resumed_files = {} + if args.resume: + resume_path = Path(args.resume) + if not resume_path.exists(): + error(f'Resume log not found: "{args.resume}"') + sys.exit(1) + if verbosity: + info(f'Resuming from: {args.resume}') + resumed_files = parse_resume_log(resume_path) + if verbosity: + info(f'Previously completed files: {len(resumed_files)}') + + if verbosity: + info('Initializing Hypoxia...') + info(f'Task ID: {task_id}') + + preparation_result = prepare_workspace(task_id, target_extensions, verbosity) + if preparation_result: + result = collect_files( + task_id, target_extensions, verbosity, keep_metadata, args.search_path, args.date_from, args.date_to, args.size_min, args.size_max, exclude_dirs, args.hash, resumed_files + ) + + if result: + if args.zip: + archive_path = archive_output(task_id, verbosity) + if verbosity: + info('Extraction task completed successfully.') diff --git a/colors.py b/hypoxia/colors.py similarity index 100% rename from colors.py rename to hypoxia/colors.py diff --git a/hypoxia/forensic.py b/hypoxia/forensic.py new file mode 100644 index 0000000..53728d1 --- /dev/null +++ b/hypoxia/forensic.py @@ -0,0 +1,113 @@ +import hashlib +import json +import datetime +from pathlib import Path + + +def compute_hash(filepath, algorithm='sha256'): + h = hashlib.new(algorithm) + with open(filepath, 'rb') as f: + while chunk := f.read(8192): + h.update(chunk) + return h.hexdigest() + + +def create_manifest(manifest_entries, task_id, manifest_path, algorithm): + manifest = { + 'task_id': task_id, + 'created_at': datetime.datetime.now().isoformat(), + 'hash_algorithm': algorithm, + 'total_files': len(manifest_entries), + 'files': manifest_entries + } + + 
manifest_json = json.dumps(manifest, indent=2) + + manifest_checksum = hashlib.sha256(manifest_json.encode()).hexdigest() + manifest['manifest_checksum'] = manifest_checksum + + with open(manifest_path, 'w') as f: + json.dump(manifest, f, indent=2) + + return manifest_path, manifest_checksum + + +def parse_resume_log(log_path, verify_hashes=True): + completed_files = {} + + with open(log_path, 'r') as f: + for line in f: + parts = line.strip().split('\t', 2) + if len(parts) < 3: + continue + event_type = parts[1] + message = parts[2] + + if event_type == 'FILE_COPIED': + hash_value = None + if ' [' in message and message.endswith(']'): + hash_start = message.rfind(' [') + hash_value = message[hash_start + 2:-1] + message = message[:hash_start] + + arrow_pos = message.find(' -> ') + if arrow_pos == -1: + continue + + source = message[:arrow_pos] + destination = message[arrow_pos + 4:] + + resolved_source = str(Path(source).resolve()) + + if verify_hashes and hash_value: + dest_path = Path(destination) + if dest_path.exists(): + actual_hash = compute_hash(dest_path, 'sha256') + if actual_hash == hash_value: + completed_files[resolved_source] = hash_value + else: + completed_files[resolved_source] = hash_value + + return completed_files + + +class ForensicLog: + def __init__(self, log_path): + self.log_path = log_path + self.f = open(log_path, 'a') + self._write('SESSION_START', f'Forensic log initialized: {log_path.name}') + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if not self.f.closed: + if exc_type is not None: + self._write('SESSION_ABORT', f'{exc_type.__name__}: {exc_val}') + self.f.close() + return False + + def _write(self, event_type, message): + timestamp = datetime.datetime.now().isoformat() + self.f.write(f'{timestamp}\t{event_type}\t{message}\n') + self.f.flush() + + def file_copied(self, source, destination, file_hash=None): + msg = f'{source} -> {destination}' + if file_hash: + msg += f' [{file_hash}]' + 
self._write('FILE_COPIED', msg) + + def file_skipped(self, source, reason): + self._write('FILE_SKIPPED', f'{source} ({reason})') + + def file_error(self, source, error_msg): + self._write('FILE_ERROR', f'{source}: {error_msg}') + + def warning(self, message): + self._write('WARNING', message) + + def complete(self, files_copied, files_skipped, total_bytes): + self._write('SUMMARY', f'copied={files_copied} skipped={files_skipped} bytes={total_bytes}') + self._write('SESSION_END', 'Collection complete') + self.f.close() diff --git a/hypoxia/utils.py b/hypoxia/utils.py new file mode 100644 index 0000000..1ad9c1f --- /dev/null +++ b/hypoxia/utils.py @@ -0,0 +1,230 @@ +import sys +import shutil +import zipfile +import datetime +from pathlib import Path +from hypoxia.colors import info, success, warning, error +from hypoxia.forensic import compute_hash, create_manifest, ForensicLog + + +WORKSPACE = Path.cwd() +WORKDIR = 'data' + +WARNING_FREE_SPACE = 500 * 1024 * 1024 +CRITICAL_FREE_SPACE = 50 * 1024 * 1024 + + +def prepare_workspace(task_id, file_extensions, verbosity): + if verbosity: + info('Initializing workspace...') + (WORKSPACE / WORKDIR).mkdir(exist_ok=True) + (WORKSPACE / WORKDIR / task_id).mkdir(exist_ok=True) + + for file_extension in file_extensions: + (WORKSPACE / WORKDIR / task_id / file_extension).mkdir(exist_ok=True) + if verbosity: + success('Workspace initialized.') + return True + + +def parse_size(size_str=None): + if not size_str: + return None + units = {'gb': 1024 ** 3, 'mb': 1024 ** 2, 'kb': 1024, 'b': 1} + normalized = size_str.strip().lower() + for unit, multiplier in units.items(): + if normalized.endswith(unit): + try: + return int(float(normalized[:-len(unit)]) * multiplier) + except ValueError: + error(f'Invalid size value: "{size_str}".') + sys.exit(1) + try: + return int(normalized) + except ValueError: + error(f'Invalid size format: "{size_str}". 
Supported formats: 500b, 10kb, 100mb, 2gb.') + sys.exit(1) + + +def parse_date(date_str=None, label='date'): + if not date_str: + return None + try: + return datetime.datetime.strptime(date_str, '%Y-%m-%d').date() + except ValueError: + error(f'Invalid {label} format: "{date_str}". Expected format: YYYY-MM-DD.') + sys.exit(1) + + +def collect_files(task_id, file_extensions, verbosity, keep_metadata, search_path, date_from_str, date_to_str, size_min_str, size_max_str, exclude_dirs=None, hash_algorithm='sha256', resumed_files=None): + if verbosity: + info(f'Scanning directory: {search_path}') + + copy_function = shutil.copy2 if keep_metadata else shutil.copy + + search_path_obj = Path(search_path) + + start_date = parse_date(date_from_str, label='--date-from') + end_date = parse_date(date_to_str, label='--date-to') + size_min = parse_size(size_min_str) + size_max = parse_size(size_max_str) + + if exclude_dirs is None: + exclude_dirs = [] + + if resumed_files is None: + resumed_files = {} + + use_hashing = hash_algorithm and hash_algorithm != 'none' + manifest_entries = [] + + log_path = WORKSPACE / WORKDIR / task_id / 'forensic.log' + + with ForensicLog(log_path) as forensic_log: + low_space_warned = False + + files_copied = 0 + files_skipped = 0 + total_bytes = 0 + + for file_extension in file_extensions: + files_to_copy = search_path_obj.rglob(f'*.{file_extension}') + + for source_file in files_to_copy: + free_space = shutil.disk_usage(WORKSPACE).free + + if free_space < CRITICAL_FREE_SPACE: + error(f'CRITICAL: Insufficient disk space ({free_space // (1024 * 1024)}MB remaining). 
Halting execution.') + return False + + if free_space < WARNING_FREE_SPACE and not low_space_warned: + warning(f'WARNING: Low disk space ({free_space // (1024 * 1024)}MB remaining).') + low_space_warned = True + + if exclude_dirs: + file_parts = [p.lower() for p in source_file.parts] + if any(excluded in file_parts for excluded in exclude_dirs): + files_skipped += 1 + forensic_log.file_skipped(source_file, 'excluded directory') + continue + + if str(source_file.resolve()) in resumed_files: + files_skipped += 1 + forensic_log.file_skipped(source_file, 'already completed (resume)') + continue + + try: + file_stat = source_file.stat() + file_mtime = datetime.datetime.fromtimestamp(file_stat.st_mtime).date() + file_size = file_stat.st_size + except OSError as e: + files_skipped += 1 + forensic_log.file_error(source_file, f'failed to read file attributes: {e}') + continue + + if start_date and file_mtime < start_date: + files_skipped += 1 + forensic_log.file_skipped(source_file, 'before date range') + continue + if end_date and file_mtime > end_date: + files_skipped += 1 + forensic_log.file_skipped(source_file, 'after date range') + continue + if size_min and file_size < size_min: + files_skipped += 1 + forensic_log.file_skipped(source_file, 'below size minimum') + continue + if size_max and file_size > size_max: + files_skipped += 1 + forensic_log.file_skipped(source_file, 'above size maximum') + continue + + try: + if verbosity: + info(f'Copying: {source_file}') + + destination_file = WORKSPACE / WORKDIR / task_id / file_extension / source_file.name + + if destination_file.exists(): + stem = source_file.stem + suffix = source_file.suffix + counter = 1 + while destination_file.exists(): + destination_file = WORKSPACE / WORKDIR / task_id / file_extension / f'{stem}_{counter}{suffix}' + counter += 1 + if verbosity: + warning(f'Name collision resolved: {source_file.name} -> {destination_file.name}') + + copy_function(source_file, destination_file) + + file_hash = None + 
if use_hashing: + file_hash = compute_hash(destination_file, hash_algorithm) + + manifest_entries.append({ + 'original_path': str(source_file), + 'destination_path': str(destination_file), + 'file_size': file_size, + 'modified_at': file_mtime.isoformat(), + 'copied_at': datetime.datetime.now().isoformat(), + 'hash': file_hash + }) + + forensic_log.file_copied(source_file, destination_file, file_hash) + + files_copied += 1 + total_bytes += file_size + + except (IOError, OSError) as e: + if verbosity: + error(f'Failed to copy {source_file}: {e}') + forensic_log.file_error(source_file, str(e)) + + if verbosity: + success('File collection complete.') + info(f'Files copied: {files_copied}') + info(f'Files skipped: {files_skipped}') + if total_bytes < 1024 * 1024: + info(f'Total size: {total_bytes / 1024:.1f} KB') + elif total_bytes < 1024 * 1024 * 1024: + info(f'Total size: {total_bytes / (1024 * 1024):.1f} MB') + else: + info(f'Total size: {total_bytes / (1024 * 1024 * 1024):.2f} GB') + + forensic_log.complete(files_copied, files_skipped, total_bytes) + + if verbosity: + success(f'Forensic log saved: {log_path.name}') + + manifest_path = WORKSPACE / WORKDIR / task_id / 'manifest.json' + manifest_file, manifest_checksum = create_manifest(manifest_entries, task_id, manifest_path, hash_algorithm if use_hashing else 'none') + if verbosity: + success(f'Forensic manifest saved: {manifest_path.name}') + info(f'Manifest checksum (SHA-256): {manifest_checksum}') + + return True + + +def archive_output(task_id, verbosity): + task_dir = WORKSPACE / WORKDIR / task_id + archive_path = WORKSPACE / WORKDIR / f'{task_id}.zip' + + if verbosity: + info(f'Creating archive: {archive_path.name}') + + with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zf: + for file in task_dir.rglob('*'): + if file.is_file(): + arcname = file.relative_to(task_dir) + zf.write(file, arcname) + + if verbosity: + archive_size = archive_path.stat().st_size + if archive_size < 1024 * 1024: + 
success(f'Archive created: {archive_path.name} ({archive_size / 1024:.1f} KB)') + elif archive_size < 1024 * 1024 * 1024: + success(f'Archive created: {archive_path.name} ({archive_size / (1024 * 1024):.1f} MB)') + else: + success(f'Archive created: {archive_path.name} ({archive_size / (1024 * 1024 * 1024):.2f} GB)') + + return archive_path diff --git a/utils.py b/utils.py deleted file mode 100644 index 52cbab3..0000000 --- a/utils.py +++ /dev/null @@ -1,143 +0,0 @@ -import sys -import shutil -import datetime -from pathlib import Path -from colors import info, success, warning, error - - -WORKSPACE = Path.cwd() -WORKDIR = 'data' - -WARNING_FREE_SPACE = 500 * 1024 * 1024 -CRITICAL_FREE_SPACE = 50 * 1024 * 1024 - - -def prepare_workspace(task_id, file_extensions, verbosity): - if verbosity: - info('Initializing workspace...') - (WORKSPACE / WORKDIR).mkdir(exist_ok=True) - (WORKSPACE / WORKDIR / task_id).mkdir(exist_ok=True) - - for file_extension in file_extensions: - (WORKSPACE / WORKDIR / task_id / file_extension).mkdir(exist_ok=True) - if verbosity: - success('Workspace initialized.') - return True - - -def parse_size(size_str=None): - if not size_str: - return None - units = {'gb': 1024 ** 3, 'mb': 1024 ** 2, 'kb': 1024, 'b': 1} - normalized = size_str.strip().lower() - for unit, multiplier in units.items(): - if normalized.endswith(unit): - try: - return int(float(normalized[:-len(unit)]) * multiplier) - except ValueError: - error(f'Invalid size value: "{size_str}".') - sys.exit(1) - try: - return int(normalized) - except ValueError: - error(f'Invalid size format: "{size_str}". Supported formats: 500b, 10kb, 100mb, 2gb.') - sys.exit(1) - - -def parse_date(date_str=None, label='date'): - if not date_str: - return None - try: - return datetime.datetime.strptime(date_str, '%Y-%m-%d').date() - except ValueError: - error(f'Invalid {label} format: "{date_str}". 
Expected format: YYYY-MM-DD.') - sys.exit(1) - - -def collect_files(task_id, file_extensions, verbosity, keep_metadata, search_path, date_from_str, date_to_str, size_min_str, size_max_str): - if verbosity: - info(f'Scanning directory: {search_path}') - - copy_function = shutil.copy2 if keep_metadata else shutil.copy - - search_path_obj = Path(search_path) - - start_date = parse_date(date_from_str, label='--date-from') - end_date = parse_date(date_to_str, label='--date-to') - size_min = parse_size(size_min_str) - size_max = parse_size(size_max_str) - - low_space_warned = False - - files_copied = 0 - files_skipped = 0 - total_bytes = 0 - - for file_extension in file_extensions: - files_to_copy = search_path_obj.rglob(f'*.{file_extension}') - - for source_file in files_to_copy: - free_space = shutil.disk_usage(WORKSPACE).free - - if free_space < CRITICAL_FREE_SPACE: - error(f'CRITICAL: Insufficient disk space ({free_space // (1024 * 1024)}MB remaining). Halting execution.') - return False - - if free_space < WARNING_FREE_SPACE and not low_space_warned: - warning(f'WARNING: Low disk space ({free_space // (1024 * 1024)}MB remaining).') - low_space_warned = True - - file_stat = source_file.stat() - file_mtime = datetime.datetime.fromtimestamp(file_stat.st_mtime).date() - file_size = file_stat.st_size - - if start_date and file_mtime < start_date: - files_skipped += 1 - continue - if end_date and file_mtime > end_date: - files_skipped += 1 - continue - if size_min and file_size < size_min: - files_skipped += 1 - continue - if size_max and file_size > size_max: - files_skipped += 1 - continue - - try: - if verbosity: - info(f'Copying: {source_file}') - - destination_file = WORKSPACE / WORKDIR / task_id / file_extension / source_file.name - - if destination_file.exists(): - stem = source_file.stem - suffix = source_file.suffix - counter = 1 - while destination_file.exists(): - destination_file = WORKSPACE / WORKDIR / task_id / file_extension / f'{stem}_{counter}{suffix}' - 
counter += 1 - if verbosity: - warning(f'Name collision resolved: {source_file.name} -> {destination_file.name}') - - copy_function(source_file, destination_file) - - files_copied += 1 - total_bytes += file_size - - except (IOError, OSError) as e: - if verbosity: - error(f'Failed to copy {source_file}: {e}') - - if verbosity: - success('File collection complete.') - info(f'Files copied: {files_copied}') - info(f'Files skipped: {files_skipped}') - if total_bytes < 1024 * 1024: - info(f'Total size: {total_bytes / 1024:.1f} KB') - elif total_bytes < 1024 * 1024 * 1024: - info(f'Total size: {total_bytes / (1024 * 1024):.1f} MB') - else: - info(f'Total size: {total_bytes / (1024 * 1024 * 1024):.2f} GB') - - return True