format_checker.py

Code Hygiene Score: 86

Keine Issues gefunden.

Dependencies 5

Funktionen 8

Code

"""
Deterministic Format Checker - Code-based formatting validation.

Replaces/supplements LLM-based Formatierungsprüfer for reliable character-level checks.
LLMs hallucinate on character detection; this module provides deterministic results.
"""

import re
import sys
from typing import Any

sys.path.insert(0, "/var/www/scripts/pipeline")

# Common emoji ranges (simplified - covers most used emojis)
EMOJI_PATTERN = re.compile(
    "["
    "\U0001f300-\U0001f9ff"  # Miscellaneous Symbols and Pictographs, Emoticons, etc.
    "\U00002702-\U000027b0"  # Dingbats
    "\U0001f600-\U0001f64f"  # Emoticons
    "\U0001f680-\U0001f6ff"  # Transport and Map
    "\U00002600-\U000026ff"  # Misc symbols
    "\U00002b50-\U00002b55"  # Stars
    "\U000023e9-\U000023f3"  # Media symbols
    "\U0000200d"  # Zero width joiner
    "\U0000fe0f"  # Variation selector
    "]+",
    flags=re.UNICODE,
)

# Special bullets that count as emojis in plain text context
BULLET_EMOJIS = ["▪️", "▫️", "◾", "◽", "●", "○", "◆", "◇", "★", "☆", "✓", "✗", "✔", "✘", "→", "➡", "➔"]


def extract_rules(structure_config: dict | None, profile_config: dict | None) -> dict:
    """
    Extract formatting rules from structure and profile configs.

    Args:
        structure_config: The structure configuration dict (from content_config.content)
        profile_config: The author profile configuration dict

    Returns:
        Dict with boolean flags for each rule type
    """
    rules = {
        "emojis_verboten": False,
        "markdown_verboten": False,
        "fettschrift_verboten": False,
        "kursiv_verboten": False,
        "header_verboten": False,
        "gedankenstriche_verboten": False,
        "hashtags_verboten": False,
        "ausrufezeichen_sparsam": False,
        "output_format": "markdown",
    }

    # Extract from structure
    if structure_config:
        # Check ausgabe.format
        ausgabe = structure_config.get("ausgabe", {})
        rules["output_format"] = ausgabe.get("format", "markdown")

        # If plain text, most formatting is implicitly forbidden
        if rules["output_format"] == "reiner Text":
            rules["markdown_verboten"] = True
            rules["fettschrift_verboten"] = True
            rules["kursiv_verboten"] = True
            rules["header_verboten"] = True

        # Check formatierung section
        fmt = structure_config.get("formatierung", {})
        if isinstance(fmt, dict):
            if "verboten" in str(fmt.get("emojis", "")).lower():
                rules["emojis_verboten"] = True
            if "verboten" in str(fmt.get("fettschrift", "")).lower():
                rules["fettschrift_verboten"] = True
            if "verboten" in str(fmt.get("markdown", "")).lower():
                rules["markdown_verboten"] = True
                rules["fettschrift_verboten"] = True
                rules["kursiv_verboten"] = True
                rules["header_verboten"] = True
            if fmt.get("hashtags") == "keine":
                rules["hashtags_verboten"] = True

        # Check verboten array
        verboten = structure_config.get("verboten", [])
        for v in verboten:
            v_lower = v.lower()
            if "emoji" in v_lower:
                rules["emojis_verboten"] = True
            if "fettschrift" in v_lower or "markdown" in v_lower:
                rules["fettschrift_verboten"] = True
                rules["markdown_verboten"] = True
            if "hashtag" in v_lower:
                rules["hashtags_verboten"] = True

    # Extract from profile
    if profile_config:
        # Check grammatik_und_satzbau section
        grammatik = profile_config.get("grammatik_und_satzbau", {})
        if grammatik.get("gedankenstriche") == "verboten":
            rules["gedankenstriche_verboten"] = True

        # Check formatierung section in profile
        fmt = profile_config.get("formatierung", {})
        if isinstance(fmt, dict):
            if "verboten" in str(fmt.get("emojis", "")).lower():
                rules["emojis_verboten"] = True
            if "vermeiden" in str(fmt.get("ausrufezeichen", "")).lower():
                rules["ausrufezeichen_sparsam"] = True

    return rules


def check_emojis(text: str) -> list[dict]:
    """Check for emojis in text."""
    issues = []

    # Check regex pattern
    matches = EMOJI_PATTERN.findall(text)
    if matches:
        for match in matches[:5]:  # Limit to first 5
            pos = text.find(match)
            issues.append(
                {"type": "emoji", "char": match, "position": pos, "context": text[max(0, pos - 20) : pos + 20]}
            )

    # Check bullet emojis
    for bullet in BULLET_EMOJIS:
        if bullet in text:
            pos = text.find(bullet)
            issues.append(
                {
                    "type": "bullet_emoji",
                    "char": bullet,
                    "position": pos,
                    "context": text[max(0, pos - 20) : pos + 20],
                }
            )

    return issues


def check_markdown(
    text: str, check_bold: bool = True, check_italic: bool = True, check_headers: bool = True
) -> list[dict]:
    """Check for markdown formatting in text."""
    issues = []

    # Bold: **text** or __text__
    if check_bold:
        bold_matches = re.finditer(r"\*\*(.+?)\*\*", text)
        for m in bold_matches:
            issues.append(
                {
                    "type": "bold",
                    "match": m.group(0)[:30],
                    "position": m.start(),
                    "context": text[max(0, m.start() - 10) : m.end() + 10],
                }
            )

        bold_matches2 = re.finditer(r"__(.+?)__", text)
        for m in bold_matches2:
            issues.append({"type": "bold", "match": m.group(0)[:30], "position": m.start()})

    # Italic: *text* (but not **)
    if check_italic:
        italic_matches = re.finditer(r"(?<!\*)\*([^*\n]+?)\*(?!\*)", text)
        for m in italic_matches:
            # Skip if it's a list item marker
            if m.start() > 0 and text[m.start() - 1] == "\n":
                continue
            issues.append(
                {
                    "type": "italic",
                    "match": m.group(0)[:30],
                    "position": m.start(),
                    "context": text[max(0, m.start() - 10) : m.end() + 10],
                }
            )

    # Headers: # ## ### at line start
    if check_headers:
        header_matches = re.finditer(r"^#{1,6}\s+.+", text, re.MULTILINE)
        for m in header_matches:
            issues.append({"type": "header", "match": m.group(0)[:40], "position": m.start()})

    return issues


def check_gedankenstriche(text: str) -> list[dict]:
    """Check for en-dash and em-dash characters."""
    issues = []

    # En-dash: – (U+2013)
    for i, char in enumerate(text):
        if char == "–":
            context_start = max(0, i - 25)
            context_end = min(len(text), i + 25)
            issues.append(
                {
                    "type": "en_dash",
                    "char": "–",
                    "code": "U+2013",
                    "position": i,
                    "context": f"...{text[context_start:context_end]}...",
                }
            )

    # Em-dash: — (U+2014)
    for i, char in enumerate(text):
        if char == "—":
            context_start = max(0, i - 25)
            context_end = min(len(text), i + 25)
            issues.append(
                {
                    "type": "em_dash",
                    "char": "—",
                    "code": "U+2014",
                    "position": i,
                    "context": f"...{text[context_start:context_end]}...",
                }
            )

    return issues


def check_hashtags(text: str) -> list[dict]:
    """Check for hashtags in text."""
    issues = []

    hashtag_matches = re.finditer(r"#[A-Za-zÄÖÜäöüß]\w+", text)
    for m in hashtag_matches:
        issues.append({"type": "hashtag", "match": m.group(0), "position": m.start()})

    return issues


def check_ausrufezeichen(text: str, max_allowed: int = 2) -> list[dict]:
    """Check for excessive exclamation marks."""
    issues = []

    count = text.count("!")
    if count > max_allowed:
        # Find positions
        positions = [i for i, c in enumerate(text) if c == "!"]
        issues.append(
            {"type": "ausrufezeichen", "count": count, "max_allowed": max_allowed, "positions": positions[:5]}
        )

    return issues


def check_formatting(
    text: str,
    structure_config: dict | None = None,
    profile_config: dict | None = None,
    rules: dict | None = None,
) -> dict[str, Any]:
    """
    Run all formatting checks based on rules from structure and profile.

    Args:
        text: The content text to check
        structure_config: Structure configuration dict
        profile_config: Author profile configuration dict
        rules: Pre-extracted rules (optional, will be extracted if not provided)

    Returns:
        Dict with:
            - passed: bool
            - score: int (10 - number of issues, min 0)
            - issues: list of issue dicts
            - summary: str
    """
    # Extract rules if not provided
    if rules is None:
        rules = extract_rules(structure_config, profile_config)

    all_issues = []

    # Run checks based on rules
    if rules.get("emojis_verboten"):
        emoji_issues = check_emojis(text)
        all_issues.extend(emoji_issues)

    if rules.get("markdown_verboten") or rules.get("fettschrift_verboten"):
        md_issues = check_markdown(
            text,
            check_bold=rules.get("fettschrift_verboten", False) or rules.get("markdown_verboten", False),
            check_italic=rules.get("kursiv_verboten", False) or rules.get("markdown_verboten", False),
            check_headers=rules.get("header_verboten", False) or rules.get("markdown_verboten", False),
        )
        all_issues.extend(md_issues)

    if rules.get("gedankenstriche_verboten"):
        dash_issues = check_gedankenstriche(text)
        all_issues.extend(dash_issues)

    if rules.get("hashtags_verboten"):
        hashtag_issues = check_hashtags(text)
        all_issues.extend(hashtag_issues)

    if rules.get("ausrufezeichen_sparsam"):
        excl_issues = check_ausrufezeichen(text)
        all_issues.extend(excl_issues)

    # Calculate score
    issue_count = len(all_issues)
    score = max(0, 10 - issue_count)
    passed = issue_count == 0

    # Build summary
    if passed:
        summary = "Keine Formatierungsfehler gefunden."
    else:
        issue_types = list({i["type"] for i in all_issues})
        summary = f"{issue_count} Formatierungsfehler: {', '.join(issue_types)}"

    # Format issues for output
    formatted_issues = []
    for issue in all_issues:
        if issue["type"] in ["en_dash", "em_dash"]:
            formatted_issues.append(
                f"Gedankenstrich ({issue['code']}) an Position {issue['position']}: {issue.get('context', '')}"
            )
        elif issue["type"] in ["bold", "italic", "header"]:
            formatted_issues.append(f"Markdown ({issue['type']}): {issue.get('match', '')}")
        elif issue["type"] in ["emoji", "bullet_emoji"]:
            formatted_issues.append(f"Emoji gefunden: {issue['char']}")
        elif issue["type"] == "hashtag":
            formatted_issues.append(f"Hashtag gefunden: {issue['match']}")
        elif issue["type"] == "ausrufezeichen":
            formatted_issues.append(f"Zu viele Ausrufezeichen: {issue['count']} (max {issue['max_allowed']})")

    return {
        "passed": passed,
        "score": score,
        "issues": formatted_issues,
        "issues_detail": all_issues,
        "summary": summary,
        "rules_applied": rules,
    }


# Convenience function for direct testing
def check_order_formatting(order_id: int) -> dict[str, Any]:
    """
    Check formatting for a content order's latest version.

    Args:
        order_id: The content order ID

    Returns:
        Formatting check results
    """
    import json

    from db import db

    db.connect()

    try:
        # Get latest version content
        cursor = db.execute(
            """
            SELECT cv.content, co.structure_id, co.author_profile_id,
                   cs.content as structure_config, cp.content as profile_config
            FROM content_versions cv
            JOIN content_orders co ON cv.order_id = co.id
            LEFT JOIN content_config cs ON co.structure_id = cs.id
            LEFT JOIN content_config cp ON co.author_profile_id = cp.id
            WHERE cv.order_id = %s
            ORDER BY cv.version_number DESC LIMIT 1
            """,
            (order_id,),
        )
        row = cursor.fetchone()
        cursor.close()

        if not row:
            return {"error": f"Order {order_id} not found"}

        # Parse content
        content_data = json.loads(row["content"]) if isinstance(row["content"], str) else row["content"]
        text = content_data.get("text", "")

        # Parse configs
        structure_config = None
        if row.get("structure_config"):
            structure_config = (
                json.loads(row["structure_config"])
                if isinstance(row["structure_config"], str)
                else row["structure_config"]
            )

        profile_config = None
        if row.get("profile_config"):
            profile_config = (
                json.loads(row["profile_config"]) if isinstance(row["profile_config"], str) else row["profile_config"]
            )

        return check_formatting(text, structure_config, profile_config)

    finally:
        db.disconnect()
← Übersicht