"""
Absfuyu: Path
-------------
Path related
Version: 5.1.0
Date updated: 10/03/2025 (dd/mm/yyyy)
Feature:
--------
- Directory
- SaveFileAs
"""
# Module level
# ---------------------------------------------------------------------------
__all__ = [
# Main
"DirectoryBase",
"Directory",
"SaveFileAs",
# Mixin
"DirectoryInfoMixin",
"DirectoryBasicOperationMixin",
"DirectoryArchiverMixin",
"DirectoryOrganizerMixin",
"DirectoryTreeMixin",
# Support
"FileOrFolderWithModificationTime",
"DirectoryInfo",
]
# Library
# ---------------------------------------------------------------------------
import os
import re
import shutil
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Any, ClassVar, Literal, NamedTuple
from absfuyu.core.baseclass import BaseClass
from absfuyu.core.decorator import add_subclass_methods_decorator
from absfuyu.core.docstring import deprecated, versionadded, versionchanged
from absfuyu.logger import logger
# Support Class
# ---------------------------------------------------------------------------
[docs]
@versionadded("3.3.0")
class FileOrFolderWithModificationTime(NamedTuple):
"""
File or Folder with modification time
:param path: Original path
:param modification_time: Modification time
"""
path: Path
modification_time: datetime
[docs]
@deprecated(
"5.1.0", reason="Support for ``DirectoryInfoMixin`` which is also deprecated"
)
@versionadded("3.3.0")
class DirectoryInfo(NamedTuple):
"""
Information of a directory
"""
creation_time: datetime
modification_time: datetime
# Class - Directory
# ---------------------------------------------------------------------------
@add_subclass_methods_decorator
class DirectoryBase(BaseClass):
"""
Directory - Base
Parameters
----------
source_path : str | Path
Source folder
create_if_not_exist : bool
Create directory when not exist,
by default ``False``
"""
# Custom attribute
_METHOD_INCLUDE: ClassVar[bool] = True # Include in DIR_METHODS
SUBCLASS_METHODS: ClassVar[dict[str, list[str]]] = {}
def __init__(
self,
source_path: str | Path,
create_if_not_exist: bool = False,
) -> None:
"""
Parameters
----------
source_path : str | Path
Source folder
create_if_not_exist : bool
Create directory when not exist,
by default ``False``
"""
self.source_path = Path(source_path)
if not self.source_path.exists():
if create_if_not_exist:
self.source_path.mkdir(exist_ok=True, parents=True)
else:
raise FileNotFoundError("Directory not existed")
[docs]
class DirectoryInfoMixin(DirectoryBase):
"""
Directory - Info
- Quick info
"""
[docs]
@deprecated("5.1.0", reason="Not efficient")
@versionadded("3.3.0")
def quick_info(self) -> DirectoryInfo:
"""
Quick information about this Directory
Returns
-------
DirectoryInfo
DirectoryInfo
"""
source_stat: os.stat_result = self.source_path.stat()
out = DirectoryInfo(
creation_time=datetime.fromtimestamp(source_stat.st_ctime),
modification_time=datetime.fromtimestamp(source_stat.st_mtime),
)
return out
[docs]
class DirectoryBasicOperationMixin(DirectoryBase):
"""
Directory - Basic operation
- Rename
- Copy
- Move
- Delete
"""
# Rename
[docs]
def rename(self, new_name: str) -> None:
"""
Rename directory
Parameters
----------
new_name : str
Name only (not the entire path)
"""
try:
logger.debug(f"Renaming to {new_name}...")
self.source_path.rename(self.source_path.with_name(new_name))
logger.debug(f"Renaming to {new_name}...DONE")
except Exception as e:
logger.error(e)
# return self.source_path
# Copy
[docs]
def copy(self, dst: Path) -> None:
"""
Copy entire directory
Parameters
----------
dst : Path
Destination
"""
logger.debug(f"Copying to {dst}...")
try:
try:
shutil.copytree(self.source_path, Path(dst), dirs_exist_ok=True)
except Exception:
shutil.copytree(self.source_path, Path(dst))
logger.debug(f"Copying to {dst}...DONE")
except Exception as e:
logger.error(e)
# Move
[docs]
def move(self, dst: Path, content_only: bool = False) -> None:
"""
Move entire directory
Parameters
----------
dst : Path
Destination
content_only : bool
Only move content inside the folder (Default: ``False``; Move entire folder)
"""
try:
logger.debug(f"Moving to {dst}...")
if content_only:
for x in self.source_path.iterdir():
shutil.move(x, Path(dst))
else:
shutil.move(self.source_path, Path(dst))
logger.debug(f"Moving to {dst}...DONE")
except OSError as e: # File already exists
logger.error(e)
logger.debug("Overwriting file...")
if content_only:
for x in self.source_path.iterdir():
shutil.move(x, Path(dst).joinpath(x.name))
else:
shutil.move(self.source_path, Path(dst))
logger.debug("Overwriting file...DONE")
# Delete folder
def _mtime_folder(self) -> list[FileOrFolderWithModificationTime]:
"""
Get modification time of file/folder (first level only)
"""
return [
FileOrFolderWithModificationTime(
path, datetime.fromtimestamp(path.stat().st_mtime)
)
for path in self.source_path.glob("*")
]
@staticmethod
def _delete_files(list_of_files: list[Path]) -> None:
"""
Delete files/folders
"""
for x in list_of_files:
x = Path(x).absolute()
logger.debug(f"Removing {x}...")
try:
if x.is_dir():
shutil.rmtree(x)
else:
x.unlink()
logger.debug(f"Removing {x}...SUCCEED")
except Exception:
logger.error(f"Removing {x}...FAILED")
@staticmethod
def _date_filter(
value: FileOrFolderWithModificationTime,
period: Literal["Y", "M", "D"] = "Y",
) -> bool:
"""
Filter out file with current Year|Month|Day
"""
data = {
"Y": value.modification_time.year,
"M": value.modification_time.month,
"D": value.modification_time.day,
}
now = datetime.now()
ntime = {"Y": now.year, "M": now.month, "D": now.day}
return data[period] != ntime[period]
[docs]
def delete(
self,
entire: bool = False,
*,
based_on_time: bool = False,
keep: Literal["Y", "M", "D"] = "Y",
) -> None:
"""
Deletes everything
Parameters
----------
entire : bool
| ``True``: Deletes the folder itself
| ``False``: Deletes content inside only
| (Default: ``False``)
based_on_time : bool
| ``True``: Deletes everything except ``keep`` period
| ``False``: Works normal
| (Default: ``False``)
keep : Literal["Y", "M", "D"]
Delete all file except current ``Year`` | ``Month`` | ``Day``
"""
try:
logger.info(f"Removing {self.source_path}...")
if entire:
shutil.rmtree(self.source_path)
else:
if based_on_time:
filter_func = partial(self._date_filter, period=keep)
# self._delete_files([x[0] for x in filter(filter_func, self._mtime_folder())])
self._delete_files(
[x.path for x in filter(filter_func, self._mtime_folder())]
)
else:
self._delete_files(
map(lambda x: x.path, self._mtime_folder()) # type: ignore
)
logger.info(f"Removing {self.source_path}...SUCCEED")
except Exception as e:
logger.error(f"Removing {self.source_path}...FAILED\n{e}")
[docs]
class DirectoryArchiverMixin(DirectoryBase):
"""
Directory - Archiver/Compress
- Compress
- Decompress
- Register extra zip format <staticmethod>
"""
[docs]
@versionchanged("5.1.0", reason="Update funcionality (new parameter)")
def compress(
self,
format: Literal["zip", "tar", "gztar", "bztar", "xztar"] = "zip",
delete_after_compress: bool = False,
move_inside: bool = True,
) -> Path | None:
"""
Compress the directory (Default: Create ``.zip`` file)
Parameters
----------
format : Literal["zip", "tar", "gztar", "bztar", "xztar"], optional
By default ``"zip"``
- ``zip``: ZIP file (if the ``zlib`` module is available).
- ``tar``: Uncompressed tar file. Uses POSIX.1-2001 pax format for new archives.
- ``gztar``: gzip'ed tar-file (if the ``zlib`` module is available).
- ``bztar``: bzip2'ed tar-file (if the ``bz2`` module is available).
- ``xztar``: xz'ed tar-file (if the ``lzma`` module is available).
delete_after_compress : bool, optional
Delete directory after compress, by default ``False``
move_inside : bool, optional
Move the commpressed file inside the directory,
by default ``True``
Returns
-------
Path
Compressed path
None
When fail to compress
"""
logger.debug(f"Zipping {self.source_path}...")
try:
# Zip
# zip_name = self.source_path.parent.joinpath(self.source_path.name).__str__()
# shutil.make_archive(zip_name, format=format, root_dir=self.source_path)
zip_path = shutil.make_archive(
self.source_path.__str__(), format=format, root_dir=self.source_path
)
logger.debug(f"Zipping {self.source_path}...DONE")
logger.debug(f"Path: {zip_path}")
# Del
if delete_after_compress:
move_inside = False
shutil.rmtree(self.source_path)
# Move
if move_inside:
zf = Path(zip_path)
_move_path = self.source_path.joinpath(zf.name)
if _move_path.exists():
_move_path.unlink(missing_ok=True)
_move = zf.rename(_move_path)
return _move
return Path(zip_path)
except (FileExistsError, OSError) as e:
logger.error(f"Zipping {self.source_path}...FAILED\n{e}")
return None
[docs]
@versionadded("5.1.0")
def decompress(
self,
format: Literal["zip", "tar", "gztar", "bztar", "xztar"] | None = None,
delete_after_done: bool = False,
) -> None:
"""
Decompress compressed file in directory (first level only)
Parameters
----------
format : Literal["zip", "tar", "gztar", "bztar", "xztar"] | None, optional
By default ``None``
- ``zip``: ZIP file (if the ``zlib`` module is available).
- ``tar``: Uncompressed tar file. Uses POSIX.1-2001 pax format for new archives.
- ``gztar``: gzip'ed tar-file (if the ``zlib`` module is available).
- ``bztar``: bzip2'ed tar-file (if the ``bz2`` module is available).
- ``xztar``: xz'ed tar-file (if the ``lzma`` module is available).
delete_after_done : bool, optional
Delete compressed file when extracted, by default ``False``
"""
# Register extra extension
self.register_extra_zip_format()
# Decompress first level only
for path in self.source_path.glob("*"):
try:
shutil.unpack_archive(
path, path.parent.joinpath(path.stem), format=format
)
if delete_after_done and path.is_file():
path.unlink(missing_ok=True)
except OSError:
continue
[docs]
class DirectoryOrganizerMixin(DirectoryBase):
"""
Directory - File organizer - SOON
"""
pass
[docs]
class DirectoryTreeMixin(DirectoryBase):
# Directory structure
def _list_dir(self, *ignore: str) -> list[Path]:
"""
List all directories and files
Parameters
----------
ignore : str
List of pattern to ignore. Example: "__pycache__", ".pyc"
"""
logger.debug(f"Base folder: {self.source_path.name}")
list_of_path = self.source_path.glob("**/*")
# No ignore rules
if len(ignore) == 0: # No ignore pattern
return [path.relative_to(self.source_path) for path in list_of_path]
# With ignore rules
# ignore_pattern = "|".join(ignore)
ignore_pattern = re.compile("|".join(ignore))
logger.debug(f"Ignore pattern: {ignore_pattern}")
return [
path.relative_to(self.source_path)
for path in list_of_path
if re.search(ignore_pattern, path.name) is None
]
@staticmethod
@versionadded("3.3.0")
def _split_dir(list_of_path: list[Path]) -> list[list[str]]:
"""
Split pathname by ``os.sep``
Parameters
----------
list_of_path : list[Path]
List of Path
Returns
-------
list[list[str]]
List of splitted dir
Example:
--------
>>> test = [Path(test_root/test_not_root), ...]
>>> Directory._split_dir(test)
[[test_root, test_not_root], [...]...]
"""
return sorted([str(path).split(os.sep) for path in list_of_path])
def _separate_dir_and_files(
self,
list_of_path: list[Path],
*,
tab_symbol: str | None = None,
sub_dir_symbol: str | None = None,
) -> list[str]:
"""
Separate dir and file and transform into folder structure
Parameters
----------
list_of_path : list[Path]
List of paths
tab_symbol : str | None
Tab symbol
(Default: ``"\\t"``)
sub_dir_symbol : str | None
Sub-directory symbol
(Default: ``"|-- "``)
Returns
-------
list[str]
Folder structure ready to print
"""
# Check for tab and sub-dir symbol
if tab_symbol is None:
tab_symbol = "\t"
if sub_dir_symbol is None:
sub_dir_symbol = "|-- "
temp: list[list[str]] = self._split_dir(list_of_path)
return [ # Returns n-tab space with sub-dir-symbol for the last item in x
f"{tab_symbol * (len(x) - 1)}{sub_dir_symbol}{x[-1]}" for x in temp
]
[docs]
def list_structure(self, *ignore: str) -> str:
"""
List folder structure
Parameters
----------
ignore : str
Tuple contains patterns to ignore
Returns
-------
str
Directory structure
Example (For typical python library):
-------------------------------------
>>> test = Directory(<source path>)
>>> test.list_structure(
"__pycache__",
".pyc",
"__init__",
"__main__",
)
...
"""
temp: list[Path] = self._list_dir(*ignore)
out: list[str] = self._separate_dir_and_files(temp)
return "\n".join(out) # Join the list
[docs]
def list_structure_pkg(self) -> str:
"""
List folder structure of a typical python package
Returns
-------
str
Directory structure
"""
return self.list_structure("__pycache__", ".pyc")
[docs]
class Directory(
DirectoryTreeMixin,
DirectoryOrganizerMixin,
DirectoryArchiverMixin,
DirectoryBasicOperationMixin,
DirectoryInfoMixin,
):
"""
Some shortcuts for directory
Parameters
----------
source_path : str | Path
Source folder
create_if_not_exist : bool
Create directory when not exist,
by default ``False``
Example:
--------
>>> # For a list of method
>>> Directory.SUBCLASS_METHODS
"""
pass
# Class - SaveFileAs
# ---------------------------------------------------------------------------
[docs]
class SaveFileAs:
"""
File as multiple file type
"""
def __init__(self, data: Any, *, encoding: str | None = "utf-8") -> None:
"""
:param encoding: Default: utf-8
"""
self.data = data
self.encoding = encoding
def __str__(self) -> str:
return f"{self.__class__.__name__}()"
def __repr__(self) -> str:
return self.__str__()
[docs]
def to_txt(self, path: str | Path) -> None:
"""
Save as ``.txt`` file
Parameters
----------
path : Path
Save location
"""
with open(path, "w", encoding=self.encoding) as file:
file.writelines(self.data)
# def to_pickle(self, path: Union[str, Path]) -> None:
# """
# Save as .pickle file
# :param path: Save location
# """
# from absfuyu.util.pkl import Pickler
# Pickler.save(path, self.data)
# def to_json(self, path: Union[str, Path]) -> None:
# """
# Save as .json file
# :param path: Save location
# """
# from absfuyu.util.json_method import JsonFile
# temp = JsonFile(path, sort_keys=False)
# temp.save_json()