"""
Absfuyu: Content
----------------
Handle .txt file
Version: 5.1.0
Date updated: 10/03/2025 (dd/mm/yyyy)
Usage:
------
>>> from absfuyu.general.content import ContentLoader
"""
# TODO: rewrite this
# Module level
# ---------------------------------------------------------------------------
__all__ = ["ContentLoader", "Content", "LoadedContent"]
# Library
# ---------------------------------------------------------------------------
import json
import random
import re
from collections import Counter
from itertools import chain
from pathlib import Path
from typing import Self
from absfuyu.core import BaseClass, ShowAllMethodsMixin, unidecode
from absfuyu.logger import logger
# Class
# ---------------------------------------------------------------------------
[docs]
class Content(BaseClass):
"""
Contain data
Data format: list[str, list[str]]
where: ``str: data``; ``list[str]: data tags``
"""
def __init__(self, data: list) -> None:
self.data: str = str(data[0])
self.tag: list = data[1]
# logger.debug(self.__dict__)
def __str__(self) -> str:
# return f"{self.data} | {self.tag}"
return str(self.data)
[docs]
def unidecoded(self) -> Self:
"""
Convert data through ``unidecode`` package
Returns
-------
Content
``unidecoded`` Content
"""
return self.__class__([unidecode(self.data), list(map(unidecode, self.tag))])
[docs]
def to_text(self) -> str:
"""
Convert back into text
Returns
-------
str
Text
"""
# hard code
tags = ",".join(self.tag)
return f"{self.data}|{tags}"
[docs]
def short_form(self, separator: str = ",") -> str:
"""
Short form show only first item when separated by ``separator``
Parameters
----------
separator : str
Default: ``","``
Returns
-------
str
Short form
"""
if not separator.startswith(","):
logger.debug(f"Separated symbol: {separator}")
return self.data.split(separator)[0]
[docs]
def handle_address(
self, address_separator: str = ",", *, first_item_not_address: bool = True
) -> Self:
"""
Handle ``self.data`` as address and then update the address into ``self.tag``
Parameters
----------
address_separator : str
| Split the address by which character
| (Default: ``","``)
first_item_not_address : bool
Set to ``False`` when ``<splited data>[0]`` is not part of an address
Returns
-------
Content
Handled Content
Example:
--------
>>> test = "Shop A, 22 ABC Street, DEF District, GHI City"
>>> # After handle_address()
["Shop A", "22 ABC Street", "DEF District", "GHI City"]
>>> # After handle_address(first_item_not_address = False)
["22 ABC Street", "DEF District", "GHI City"]
"""
if first_item_not_address:
temp = self.data.split(address_separator)
else:
logger.debug(
f"First item ({self.data.split(address_separator)[0]}) is not part of an address"
)
temp = self.data.split(address_separator)[1:]
new_tag = [x.strip().lower() for x in temp]
logger.debug(f"Current tags: {self.tag}")
logger.debug(f"New tags: {new_tag}")
self.tag = list(set(self.tag + new_tag))
logger.debug(f"Final tags: {self.tag} Len: {len(self.tag)}")
return self.__class__([self.data, self.tag])
[docs]
class LoadedContent(list[Content], ShowAllMethodsMixin):
"""
Contain list of ``Content``
"""
def __str__(self) -> str:
# return f"{self.__class__.__name__} - Total: {len(self)}"
return f"{self.__class__.__name__}({[x.data for x in self]})"
def __repr__(self) -> str:
return self.__str__()
[docs]
def apply(self, func) -> Self:
"""
Apply function to each entry
:param func: Callable function
:type func: Callable
:rtype: LoadedContent
"""
return self.__class__(func(x.data) for x in self)
[docs]
@classmethod
def load_from_json(cls, file: Path) -> Self:
"""
Use this method to load data from ``.json`` file from ``to_json()`` method
Parameters
----------
file : Path
Path to ``.json`` file
Returns
-------
LoadedContent
Loaded content from ``.json`` file
"""
with open(file) as json_file:
parsed_json: dict = json.load(json_file)
out = [Content(list(x.values())) for x in parsed_json]
return cls(out)
@property
def tags(self) -> list:
"""A list contains all available tag"""
temp = chain.from_iterable([x.tag for x in self])
out = list(set(temp))
logger.debug(f"Found {len(out)} {'tags' if len(out) > 1 else 'tag'}")
return sorted(out)
[docs]
def tag_count(self) -> Counter:
"""
Count number of tags
:rtype: Counter
"""
temp = chain.from_iterable([x.tag for x in self])
logger.debug(temp)
return Counter(temp)
[docs]
def filter(self, tag: str) -> Self:
"""
Filter out entry with ``tag``
:param tag: Tag to filter
:type tag: str
:rtype: LoadedContent
"""
tag = tag.strip().lower()
logger.debug(f"Tag: {tag}")
if tag not in self.tags:
# tag = random.choice(self.tags)
# logger.debug(f"Tag not exist, changing to a random tag... {tag}")
logger.warning(f'"{tag}" tag does not exist')
_avail_tag = ", ".join(list(dict(self.tag_count().most_common(5)).keys()))
raise ValueError(
f"Available tags: {_avail_tag},..."
f"\nMore tag at: `{self.__class__.__name__}.tags`"
)
return self.__class__([x for x in self if tag in x.tag])
[docs]
def find(self, keyword: str) -> Self:
"""
Return all entries that include ``keyword``
:param keyword: Keyword to find
:type keyword: str
:rtype: LoadedContent
"""
temp = self.__class__(
[x for x in self if x.data.lower().find(keyword.lower()) >= 0]
)
if temp:
logger.debug(f"Found {len(temp)} {'entries' if len(temp) > 1 else 'entry'}")
else:
logger.debug("No result")
return temp
[docs]
def short_form(self, separator: str = ",") -> list[str]:
"""
Shows only first item when separated by ``separator`` of ``Content.data``
Parameters
----------
separator : str
Default: ``","``
Returns
-------
list[str]
Short form
"""
return [x.short_form(separator) for x in self]
[docs]
def pick_one(self, tag: str | None = None) -> Content:
"""
Pick a random entry
Parameters
----------
tag : str | None
Pick a random entry with ``tag``
(Default: None)
Returns
-------
Content
Random entry
"""
if tag:
temp = self.filter(tag)
logger.debug(f"Tag: {tag}")
return random.choice(temp)
return random.choice(self)
[docs]
def handle_address(
self, address_separator: str = ",", *, first_item_not_address: bool = True
) -> Self:
"""
Execute ``Content.handle_address()`` on every ``self.data`` of ``Content``
Parameters
----------
address_separator : str
| Split the address by which character
| (Default: ``","``)
first_item_not_address : bool
Set to ``False`` when ``<splited data>[0]`` is not part of an address
Returns
-------
LoadedContent
Handled Content
Example:
--------
>>> test = "Shop A, 22 ABC Street, DEF District, GHI City"
>>> # After handle_address()
["Shop A", "22 ABC Street", "DEF District", "GHI City"]
>>> # After handle_address(first_item_not_address = False)
["22 ABC Street", "DEF District", "GHI City"]
"""
return self.__class__(
[
x.handle_address(
address_separator=address_separator,
first_item_not_address=first_item_not_address,
)
for x in self
]
)
[docs]
def to_json(self, no_accent: bool = False) -> str:
"""
Convert data into ``.json`` file
Parameters
----------
no_accent : bool
| when ``True``: convert the data through ``unidecode`` package
| (Default: ``False``)
Returns
-------
str
Data
"""
if no_accent:
out = [x.unidecoded().__dict__ for x in self]
else:
out = [x.__dict__ for x in self]
logger.debug(out)
return json.dumps(out, indent=2)
[docs]
def to_text(self, no_accent: bool = False) -> str:
"""
Convert data into ``.txt`` file
Parameters
----------
no_accent : bool
| when ``True``: convert the data through ``unidecode`` package
| (Default: ``False``)
Returns
-------
str
Data
"""
if no_accent:
out = [x.unidecoded().to_text() for x in self]
else:
out = [x.to_text() for x in self]
logger.debug(out)
return "\n".join(out)
[docs]
class ContentLoader(BaseClass):
"""
This load data from ``.txt`` file
Content format:
``<content><split_symbol><tags separated by <tag_separate_symbol>>``
"""
def __init__(
self,
file_path: Path | str,
characteristic_detect: bool = True,
tag_dictionary: dict | None = None,
*,
comment_symbol: str = "#",
split_symbol: str = "|",
tag_separate_symbol: str = ",",
) -> None:
"""
Parameters
----------
file_path : str
file location/file to load
characteristic_detect : bool
detect whether the content is long, short, or a question
(default: ``True``)
tag_dict : dict
custom tag pattern
format: ``{"keyword": "tag",...}``
example: ``{"apple": "fruit", "orange": "fruit"}``
comment_symbol : str
symbol that `ContentLoader` will ignore
(default: ``"#"``)
split_symbol : str
symbol that `ContentLoader` will split content and tags
(default: ``"|"``)
tag_separate_symbol : str
symbol that `ContentLoader` will split between tags
(default: ``","``)
"""
# file
self.file = Path(file_path)
# characteristic detect
self.characteristic_detect: bool = characteristic_detect
# tag dictionary
if tag_dictionary is None:
# logger.debug("No tag patern available")
self.tag_dictionary: dict = dict()
else:
logger.debug(
f"Tag pattern available: "
f"{len(tag_dictionary)} {'entries' if len(tag_dictionary) > 1 else 'entry'} "
f"({len(set(tag_dictionary.values()))} unique "
f"{'tags' if len(set(tag_dictionary.values())) > 1 else 'tag'})"
)
self.tag_dictionary = tag_dictionary
# symbol stuff
assert comment_symbol != split_symbol, (
"comment_symbol and split_symbol should have different values"
)
assert tag_separate_symbol != split_symbol, (
"tag_separate_symbol and split_symbol should have different values"
)
self.comment_symbol: str = comment_symbol
self.split_symbol: str = split_symbol
self.tag_separate_symbol: str = tag_separate_symbol
[docs]
def content_format(self) -> str:
"""
Shows current content format
Returns
-------
str
Current content format
Example:
--------
>>> ContentLoader(".").content_format()
# This line will be ignored
<content> | <tag1>, <tag2>
"""
out = (
f"{self.comment_symbol} This line will be ignored\n"
f"<content> {self.split_symbol} "
f"<tag1>{self.tag_separate_symbol} <tag2>, ..., <tag n>"
)
return out
[docs]
def load(self) -> list:
"""
Load content from ``self.file``
Returns
-------
list
List of content
"""
with open(self.file, "r", encoding="utf-8") as data:
logger.debug("Loading data...")
dat = []
check = []
for i, x in enumerate(data.readlines()):
x = x.strip()
if x.startswith(self.comment_symbol) or len(x) == 0:
continue # skip comment and empty lines
logger.debug(
f"### Loop {i + 1} #####################################################################"
)
temp = x.split(self.split_symbol)
if len(temp) != 2:
logger.debug(f"Split len: {len(temp)}")
logger.warning(
f"The current entry is missing data or tag: {x[:20]}..."
)
temp[0] = temp[0].strip()
if temp[0].lower() not in check:
check.append(temp[0].lower()) # check for dupes
# tag
additional_tags = []
for k, v in self.tag_dictionary.items():
key = k.strip().lower()
val = temp[0].lower()
regex_pattern = f"[^a-zA-Z0-9]({key})[^a-zA-Z0-9]|^({key})[^a-zA-Z0-9]|[^a-zA-Z0-9]({key})$"
if re.search(regex_pattern, val) is not None or val.startswith(
key
):
# regex has a bug (or idk if it's a bug or not) that
# doesn't recognise when there is only one word in the sentence
# therefore use `val.startswith(key)` to fix
additional_tags.append(v.strip().lower())
if self.characteristic_detect:
long_short = (120, 20) # setting
if len(temp[0]) > long_short[0]:
additional_tags.append("long")
if len(temp[0]) < long_short[1]:
additional_tags.append("short")
if temp[0][-1].startswith("?"):
additional_tags.append("question")
if additional_tags:
logger.debug(f"Additional tags: {additional_tags}")
try:
tags = [
tag.strip() for tag in temp[1].strip().lower().split(",")
] # separate and strip tags
logger.debug(f"Tags: {tags}")
except Exception:
logger.warning("No tag found in the original string")
if additional_tags:
tags = additional_tags
else:
tags = ["unspecified"]
logger.debug('Assigned "unspecified" tag')
tags.extend(additional_tags)
final_tags = list(set(tags))
logger.debug(f"Final tags: {final_tags} Len: {len(final_tags)}")
dat.append([temp[0], final_tags]) # add everything
else:
logger.debug(f"Found duplicates, {x} removed")
return dat
[docs]
def load_content(self) -> LoadedContent:
"""
Load data into a list of ``Content``
Returns
-------
LoadedContent
Loaded content
"""
return LoadedContent(map(Content, self.load()))