import hashlib, json, os, re, tempfile, threading, time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import requests
from requests.adapters import HTTPAdapter
from requests.exceptions import ConnectTimeout, ReadTimeout, ConnectionError

from ffxiv_aku import readJsonFile, writeJsonFile

# ── Config ────────────────────────────────────────────────────────────────────

# Asia-language endpoint (chs/ko/tc). Schema and version are pinned below so a
# silently-updated upstream cannot be merged by accident (see _needs_refetch).
ASIA_LANGS            = ["chs", "ko", "tc"]
ASIA_BASE             = "https://xivapi-v2.xivcdn.com/api/sheet"
ASIA_OUT_DIR          = "../xivapi_asia_data"
ASIA_EXPECTED_SCHEMA  = "exdschema@2:rev:1af4503931f870c0bfe133490538e1acde13ad5e"
ASIA_EXPECTED_VERSION = "2025122300000000"

# Global endpoint (de/en/fr/ja); no pin — refetch when the stamp changes.
GLOBAL_LANGS   = ["de", "en", "fr", "ja"]
GLOBAL_BASE    = "https://xivapi.akurosia.de/api/1/sheet"
GLOBAL_OUT_DIR = "../xivapi_data"

# Thread-pool sizing; PRIORITY_SHEETS run first in their own smaller pool.
WORKERS          = 24
WORKERS_ASIA     = int(os.environ.get("XIVAPI_ASIA_WORKERS", WORKERS))
PRIORITY_SHEETS  = {"Item", "Action", "Quest", "GatheringPoint", "Level", "ENpcBase"}
PRIORITY_WORKERS = 6

# Comma-separated sheet names (env) that keep their existing global file and
# only receive merged/patched data instead of a re-download.
PATCH_ONLY_SHEETS = {s.strip() for s in os.environ.get("XIVAPI_PATCH_ONLY", "").split(",") if s.strip()}

# ── Logging ───────────────────────────────────────────────────────────────────

_PRINT_LOCK = threading.Lock()  # serializes print() so worker threads don't interleave lines

def log(msg: str) -> None:
    """Thread-safe print: one whole line at a time, flushed immediately."""
    with _PRINT_LOCK:
        print(msg, flush=True)

# ── HTTP ──────────────────────────────────────────────────────────────────────

# Process-wide cap on concurrent HTTP requests (env-overridable, default 12).
_HTTP_SEM     = threading.BoundedSemaphore(int(os.environ.get("XIVAPI_HTTP_CONCURRENCY", "12")))
# Per-thread storage for one requests.Session each (populated by _session()).
_thread_local = threading.local()

def _session() -> requests.Session:
    """Return this thread's requests.Session, creating it on first use.

    Each session gets a large connection pool and no adapter-level retries
    (retrying is handled explicitly in _fetch).
    """
    session = getattr(_thread_local, "s", None)
    if session is None:
        session = requests.Session()
        adapter = HTTPAdapter(pool_connections=64, pool_maxsize=64, max_retries=0)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        _thread_local.s = session
    return session

def _fetch(url: str, retries: int = 10) -> dict:
    """GET *url* and return the parsed JSON body, with bounded retries.

    - 4xx responses are permanent client errors and raise immediately.
    - 5xx responses retry with capped exponential backoff (1..16s).
    - Timeouts/connection errors retry after a flat 1s sleep.
    Raises the last requests exception once *retries* attempts are exhausted,
    or ValueError when called with retries <= 0 (previously returned None).
    """
    for attempt in range(retries):
        try:
            with _HTTP_SEM:  # bound global HTTP concurrency across all threads
                r = _session().get(url, timeout=120)
            r.raise_for_status()
            return r.json()
        except requests.HTTPError as e:
            # Fix: e.response may be None (e.g. a manually raised HTTPError);
            # the old code dereferenced it in the log line and crashed.
            status = e.response.status_code if e.response is not None else None
            if status is not None and status < 500:
                raise  # 4xx: retrying cannot help
            if attempt + 1 >= retries:
                raise
            log(f"[HTTP] {status} server error, retry {attempt+1}/{retries}: {url}")
            time.sleep(2 ** min(attempt, 4))
        except (ConnectTimeout, ReadTimeout, ConnectionError):
            if attempt + 1 >= retries:
                raise
            log(f"[HTTP] timeout, retry {attempt+1}/{retries}: {url}")
            time.sleep(1)
    # Only reachable when retries <= 0; fail loudly instead of returning None.
    raise ValueError(f"_fetch needs retries >= 1 (got {retries}) for {url}")

# ── Disk cache (avoids re-fetching the same URL within a run) ─────────────────

class Cache:
    """Disk-backed JSON cache keyed by the SHA-256 of an arbitrary key string.

    Entries live under root/<hh>/<hash>.json (two-character fan-out keeps
    directories small) and expire *ttl* seconds after their mtime; ttl <= 0
    disables expiry. Values must be JSON-serializable.
    """

    def __init__(self, path: str, ttl: int = 600):
        self.root = Path(path)
        self.ttl  = ttl
        self.root.mkdir(parents=True, exist_ok=True)

    def _p(self, key: str) -> Path:
        """File path for *key* (content-addressed, safe for any key text)."""
        h = hashlib.sha256(key.encode()).hexdigest()
        return self.root / h[:2] / f"{h}.json"

    def get(self, key: str):
        """Return the cached value, or None when absent, expired, or unreadable."""
        p = self._p(key)
        try:
            if self.ttl > 0 and time.time() - p.stat().st_mtime > self.ttl:
                return None
            return json.loads(p.read_text(encoding="utf-8"))
        except Exception:
            return None  # missing file / corrupt JSON both count as a miss

    def set(self, key: str, value) -> None:
        """Atomically store *value*; if content is unchanged, just bump the mtime."""
        p = self._p(key)
        if p.exists():
            try:
                if json.loads(p.read_text(encoding="utf-8")) == value:
                    os.utime(p, None); return  # refresh TTL without rewriting
            except Exception:
                pass  # unreadable entry: overwrite below
        p.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp = tempfile.mkstemp(dir=str(p.parent), suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(value, f, ensure_ascii=False)
            os.replace(tmp, p)
        except BaseException:
            # Fix: don't leak the temp file when serialization/replace fails.
            try:
                os.unlink(tmp)
            except OSError:
                pass
            raise

    def get_or_set(self, key: str, fn):
        """Return the cached value for *key*, computing and storing fn() on a miss.

        NOTE: a stored value of None is indistinguishable from a miss, so a
        fn() returning None is recomputed on every call.
        """
        v = self.get(key)
        if v is None:
            v = fn(); self.set(key, v)
        return v

# Shared process-wide cache instance; location and TTL are env-overridable.
_CACHE = Cache(
    os.environ.get("XIVAPI_CACHE_DIR", "./.xivapi_http_cache"),
    int(os.environ.get("XIVAPI_CACHE_TTL", "600")),
)

# ── Atomic JSON write ─────────────────────────────────────────────────────────

def _write_json(path: str, obj) -> None:
    """Persist *obj* as JSON at *path* atomically, skipping no-op rewrites."""
    try:
        # Avoid touching mtime/content when nothing changed.
        if readJsonFile(path) == obj:
            return
    except Exception:
        pass  # missing or unreadable file: write a fresh copy below
    target_dir = os.path.dirname(path) or "."
    os.makedirs(target_dir, exist_ok=True)
    tmp = f"{path}.tmp"
    writeJsonFile(tmp, obj, keys_per_line=True)
    os.replace(tmp, path)

# ── Schema/version stamps ─────────────────────────────────────────────────────

def _stamp_path(out_dir: str, name: str) -> str:
    return os.path.join(out_dir, ".stamps", name)

def _load_stamp(out_dir: str, name: str) -> dict:
    """Read a previously saved stamp; {} when missing, unreadable, or not a dict."""
    try:
        data = readJsonFile(_stamp_path(out_dir, name))
    except Exception:
        return {}
    return data if isinstance(data, dict) else {}

def _save_stamp(out_dir: str, name: str, data: dict) -> None:
    """Atomically write stamp *data* as pretty-printed, key-sorted JSON."""
    path = _stamp_path(out_dir, name)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as fh:
        json.dump(data, fh, ensure_ascii=False, indent=2, sort_keys=True)
    os.replace(tmp, path)

def _needs_refetch(base: str, out_dir: str, stamp_name: str, sheet: str = "Action",
                   expected: dict | None = None) -> tuple[bool, dict]:
    """Fetch remote schema/version meta and decide if sheets need re-downloading.

    With *expected*: refetch when the remote meta differs from the pinned values.
    Without: refetch when a previous stamp exists and either field changed.
    Returns (refetch, current_meta).
    """
    url  = f"{base.rstrip('/')}/{sheet}?limit=1"
    meta = _CACHE.get_or_set(url, lambda: _fetch(url, retries=5)) or {}
    stamp = _load_stamp(out_dir, stamp_name)
    cur: dict = {}
    sav: dict = {}
    for field in ("schema", "version"):
        cur[field] = str(meta.get(field, ""))
        sav[field] = str(stamp.get(field, ""))

    if expected:
        refetch = cur["schema"] != expected["schema"] or cur["version"] != expected["version"]
    else:
        refetch = bool(sav["schema"] and (cur["schema"] != sav["schema"] or cur["version"] != sav["version"]))

    log(f"[META] {base.split('/')[2]} refetch={'yes' if refetch else 'no'} schema={cur['schema']} version={cur['version']}")
    return refetch, cur

# ── Field normalization ───────────────────────────────────────────────────────

# Matches "Field@lang(xx)" keys; _norm_key rewrites them to "Field_xx".
_LANG_RE    = re.compile(r"^(.*?)@lang\((.*?)\)$")
# U+00AD, stripped from keys and string values during normalization.
SOFT_HYPHEN = "\u00AD"

def _norm_key(k: str) -> str:
    """Normalize a field key: strip soft hyphens and "[p]", turn "@lang(x)"
    suffixes into "_x", and drop any "@as(html)" marker."""
    cleaned = k.replace(SOFT_HYPHEN, "").replace("[p]", "")
    m = _LANG_RE.match(cleaned)
    if m:
        cleaned = f"{m.group(1)}_{m.group(2)}"
    return cleaned.replace("@as(html)", "")

def _norm(d):
    """Recursively normalize an API value.

    str  -> soft hyphens and "[p]" markers stripped
    int  -> stringified (NOTE: bool is an int subclass, so True -> "True")
    list -> elements normalized individually
    dict -> "fields" and "transient" sub-dicts flattened into one dict
            (colliding transient keys get a "t" prefix), remaining
            non-envelope keys kept, reference objects re-ordered.
    Any other type (float, None, ...) passes through unchanged.
    """
    if isinstance(d, str):   return d.replace(SOFT_HYPHEN, "").replace("[p]", "")
    if isinstance(d, int):   return str(d)
    if isinstance(d, list):  return [_norm(x) for x in d]
    if not isinstance(d, dict): return d
    fields    = {_norm_key(k): _norm(v) for k, v in d.get("fields",   {}).items()}
    transient = {_norm_key(k): _norm(v) for k, v in d.get("transient", {}).items()}
    merged = dict(fields)
    for k, v in transient.items():
        # Prefix with "t" (repeatedly if necessary) until the key is collision-free.
        nk = k
        while nk in merged: nk = "t" + nk
        merged[nk] = v
    # Copy everything that is not part of the API envelope. row_id is kept here
    # because nested reference objects carry it as meaningful data; the
    # top-level row_id is (re)attached by the caller (_fetch_all_rows).
    _ENVELOPE = {"score", "fields", "transient"}
    for k, v in d.items():
        if k not in _ENVELOPE:
            merged[_norm_key(k)] = _norm(v)
        # For reference objects, enforce consistent key order: value, sheet, row_id first
    if "value" in merged and "sheet" in merged:
        priority = ["value", "sheet", "row_id"]
        merged = {k: merged[k] for k in priority if k in merged} | {k: v for k, v in merged.items() if k not in priority}
    return merged

# ── Field query building ──────────────────────────────────────────────────────

# Icon/ID fields: always fetched as plain values, no language expansion
# Field names that are always fetched as raw values (no @lang expansion),
# regardless of sheet (see _build_fields_param / _expected_keys).
_ICON_FIELDS = {
    "Id","Icon","Image","IconLarge","IconSmall","IconLarge2","IconSmall2","IconMain","IconOff",
    "IconMap","IconDutyFinder","IconReputation","UiIcon","MapIcon","PlaceNameIcon","PlaceNameRegionIcon",
    "Name.Icon","Icon1","Icon2","TopImage","BottomImage","StampIcon","QuestRedoUIWide","QuestRedoUILarge",
    "QuestRedoUISmall","ScreenImage.Image","IconObjective0","IconObjective1",
    "TerritoryType.Map.Id","Map.Id","Index","SortKey","Excellent","FailImage","Good","Great","Poor",
}

# Non-translatable fields per sheet
IGNORELIST = {
    "Achievement": ["AchievementCategory.Order","Icon","Points","Type"],
    "AchievementCategory": ["AchievementKind.Order"],
    "AchievementKind": ["Order"],
    "Action": ["AdditionalCooldownGroup","Cast100ms","ClassJobLevel","CooldownGroup","EffectRange","Icon","IsPlayerAction","IsPvP","IsRoleAction","PrimaryCostType","PrimaryCostValue","Range","Recast100ms","SecondaryCostType","StatusGainSelf.Icon"],
    "Adventure": ["Level.X","Level.Y","Level.Z","IconDiscovered.id","IconList.id","IconUndiscovered.id","Emote.Icon.id"],
    "AozActionTransient": ["Number"],
    "BNpcName": ["Pronoun","PossessivePronoun","Article","Adjective"],
    "ENpcResident": ["Pronoun","PossessivePronoun","Article","Adjective"],
    "ENpcBase": ["ENpcData[].Shop[].Item[].Item[].Icon.id"],
    "EObjName": ["Pronoun","PossessivePronoun","Article","Adjective"],
    "BuddySkill": ["BuddyLevel","IsActive"],
    "ClassJob": ["JobIndex","PartyBonus","Unknown11","JobType"],
    "ClassJobCategory": ["ACN","ADV","ALC","ARC","ARM","AST","BLM","BLU","BRD","BSM","BTN","CNJ","CRP","CUL","DNC","DRG","DRK","FSH","GLA","GNB","GSM","LNC","LTW","MCH","MIN","MNK","MRD","NIN","PCT","PGL","PLD","RDM","ROG","RPR","SAM","SCH","SGE","SMN","THM","Unknown0","Unknown1","Unknown2","VPR","WAR","WHM","WVR"],
    "ContentFinderCondition": ["AllianceRoulette","ClassJobLevelRequired","ClassJobLevelSync","Content.sheet","ContentMemberType.HealersPerParty","ContentMemberType.MeleesPerParty","ContentMemberType.RangedPerParty","ContentMemberType.TanksPerParty","ExpertRoulette","FeastTeamRoulette","GuildHestRoulette","HighLevelRoulette","ItemLevelRequired","ItemLevelSync","LevelCapRoulette","LevelingRoulette","MSQRoulette","MentorRoulette","TerritoryType.Map.MapMarkerRange","TerritoryType.Map.Id","TerritoryType.Map.SizeFactor","TerritoryType.Map.OffsetX","TerritoryType.Map.OffsetY","NormalRaidRoulette","TrialRoulette","SortKey","Icon","mage"],
    "ContentMemberType": ["HealersPerParty","MeleesPerParty","RangedPerParty","TanksPerParty"],
    "CraftAction": ["ClassJobLevel","Icon"],
    "CraftLeve": ["ItemCount[]","Repeats"],
    "DynamicEvent": ["EventType"],
    "GilShopItem": ["Item.value","Item.PriceMid"],
    "DeepDungeon": ["PomanderSlot[].Icon"],
    "DeepDungeonStatus": ["FloorEffectUI.Icon"],
    "DeepDungeon4GimmickEffect": ["StatusIcon"],
    "EurekaMagiaAction": ["Action.StatusGainSelf.Icon","MaxUses"],
    "Fate": ["ClassJobLevel","ClassJobLevelMax"],
    "FishingSpot": ["GatheringLevel","Radius","Item[].Icon","Item[].LevelItem.value","Rare"],
    "GatheringPoint": ["GatheringPointBase.GatheringType.IconMain","GatheringPointBase.GatheringType.IconOff","GatheringPointBase.GatheringLevel","GatheringPointBase.Item[].GatheringItemLevel","GatheringPointBase.Item[].Item.Icon","GatheringRarePopTimeTable.StartTime"],
    "HousingMapMarkerInfo": ["X","Y","Z","Map.MapMarkerRange","Map.Id"],
    "HousingLandSet": ["LandSet[].PlotSize","LandSet[].InitialPrice"],
    "InstanceContent": ["BGM.File"],
    "Item": ["BaseParamValueSpecial[]","BaseParamValue[]","Block","BlockRate","CanBeHq","DamageMag","DamagePhys","DefenseMag","DefensePhys","IsAdvancedMeldingPermitted","IsUnique","IsUntradable","ItemUICategory.Icon","LevelEquip","LevelItem","MateriaSlotCount","Rarity"],
    "Leve": ["AllowanceCost","ClassJobLevel","ExpReward","GilReward","IconCityState", "IconIssuer"],
    "Level": ["Map","Territory","X","Y","Z"],
    "Map": ["OffsetX","OffsetY","SizeFactor","Id","MapMarkerRange"],
    "MapMarker": ["Icon","X","Y","SubtextOrientation"],
    "MirageStoreSetItem": ["Body.Icon","Bracelets.Icon","Earrings.Icon","Feet.Icon","Hands.Icon","Head.Icon","Legs.Icon","MainHand.Icon","Necklace.Icon","OffHand.Icon","Ring.Icon"],
    "MirageStoreSetItemLookup": ["Item[].Icon"],
    "OrchestrionPath": ["File"],
    "Quest": ["ClassJobLevel","EventIconType","GilReward","IconSpecial","InstanceContentUnlock.row_id","IsRepeatable","IssuerLocation.Map.OffsetX","IssuerLocation.Map.OffsetY","IssuerLocation.Map.SizeFactor","IssuerLocation.Radius","IssuerLocation.X","IssuerLocation.Y","IssuerLocation.Z","IssuerStart.Pronoun","ItemCountReward[]","IssuerLocation.Map.Id","ItemRewardType","OptionalItemCountReward[]","OptionalItemIsHQReward[]","QuestParams"],
    "Status": ["Icon","StatusCategory","MaxStacks"],
    "SubmarineExploration": ["RankReq"],
    "SpecialShop": ["Item[].Item@as(raw)","Item[].CurrencyCost","Item[].ItemCost@as(raw)","UseCurrencyType"],
    "SpearfishingNotebook": ["X","Y","GatheringPointBase.GatheringLevel","Radius","GatheringPointBase.Item[].Item.Icon","GatheringPointBase.Item[].Item.LevelItem.value","IsShadowNode"],
    "TerritoryType": ["Bg","Name","Map.MapMarkerRange"],
    "Trait": ["Level"],
    "TreasureHuntRank": ["MaxPartySize","TreasureHuntTexture"],
    "TreasureHuntTexture": ["Unknown0"],
    "TreasureSpot": ["Location.Map.OffsetX","Location.Map.OffsetY","Location.Map.SizeFactor","Location.X","Location.Y","Location.Z"],
    "GCScripShopCategory": ["SubCategory","Tier"],
    "GCScripShopItem": ["CostGCSeals","RequiredGrandCompanyRank"],
    "FccShop": ["ItemData[].Cost","ItemData[].FCRankRequired@as(raw)"],
    "BuddyEquip": ["IconBody","IconHead","IconLegs"],
    "ContentsTutorial": ["Page.Image"],
    "DawnQuestMember": ["BigImageNew","BigImageOld"],
    "QuestRedoChapterUI": ["QuestRedoUILarge", "QuestRedoUISmall", "QuestRedoUIWide"],
}

def _field_name(spec) -> str | None:
    if isinstance(spec, dict): return next(iter(spec), None)
    if isinstance(spec, list): return spec[0] if spec else None
    return spec

def _expected_keys(fields: list, sheet: str, langs: list[str]) -> set[str]:
    """Return the set of top-level row keys to keep after normalization."""
    ignore = set(IGNORELIST.get(sheet, []))
    keys: set[str] = set()
    for spec in fields:
        is_transient = (
            isinstance(spec, list) and len(spec) >= 2 and spec[1] == "transient"
        )
        name = _field_name(spec[0] if is_transient else spec)
        if not name:
            continue
        # Nested / array fields collapse to their first path segment.
        bare = re.split(r"[\[.]", name)[0]
        if "[" in name or "." in name:
            keys.add(bare)
            continue
        if name in _ICON_FIELDS or name in ignore:
            # Raw (untranslated) field: keep as-is.
            keys.add(_norm_key(name).replace("[]", ""))
            continue
        # Translated field: one key per requested language.
        for lang in langs:
            keys.add(_norm_key(f"{name}@lang({lang})").replace("[]", ""))
    return keys

def _build_fields_param(fields: list, sheet: str, langs: list[str]) -> str:
    """Build the ?fields= query value (plus optional &transient=) for *sheet*."""
    ignore = set(IGNORELIST.get(sheet, []))
    plain: list[str] = []
    transient: list[str] = []
    for spec in fields:
        is_transient = isinstance(spec, list) and len(spec) >= 2 and spec[1] == "transient"
        name = _field_name(spec[0] if is_transient else spec)
        if not name:
            continue
        bucket = transient if is_transient else plain
        if name in _ICON_FIELDS or name in ignore:
            bucket.append(name)  # raw value: no per-language expansion
        else:
            for lang in langs:
                bucket.append(f"{name}@lang({lang})")
    query = ",".join(plain)
    if transient:
        query += "&transient=" + ",".join(transient)
    return query

def _build_rename_map(fields: list) -> tuple[dict, dict]:
    frename, trename = {}, {}
    for spec in fields:
        is_transient = isinstance(spec, list) and len(spec) >= 2 and spec[1] == "transient"
        raw = spec[0] if is_transient else spec
        if isinstance(raw, dict) and len(raw) >= 2:
            keys = list(raw)
            (trename if is_transient else frename)[keys[0]] = keys[1]
    return frename, trename

def _apply_renames(rows: dict, frename: dict, trename: dict) -> None:
    for row in rows.values():
        for old, new in frename.items():
            for key in list(row):
                if key == old or key.startswith(old + "_"):
                    row[new + key[len(old):]] = row.pop(key)
        for old, new in trename.items():
            t = "t" + old
            for key in list(row):
                if key == old or key.startswith(old + "_"):
                    row[new + key[len(old):]] = row.pop(key)
                elif key == t or key.startswith(t + "_"):
                    row[new + key[len(t):]] = row.pop(key)

# ── Paginated API fetch (adaptive page size) ──────────────────────────────────

# Descending ladder of page sizes tried when the API rejects a request.
_LIMITS     = [20000,15000,10000,5000,2500,1000,500,250,150,100,75,50,25,10,5,3,2,1]
# Persisted "last working page size" per hint key, guarded by _HINTS_LOCK.
_HINTS:dict = {}
_HINTS_DIRTY = False
_HINTS_LOCK  = threading.Lock()
_HINTS_PATH  = os.environ.get("XIVAPI_LIMIT_HINTS", "./.xivapi_limit_hints.json")

def _load_hints():
    """Populate _HINTS from disk; start empty when missing or unreadable."""
    global _HINTS
    try:
        _HINTS = json.loads(Path(_HINTS_PATH).read_text(encoding="utf-8"))
    except Exception:
        _HINTS = {}

def _flush_hints():
    """Atomically persist _HINTS to disk if anything changed since the last flush."""
    global _HINTS_DIRTY
    with _HINTS_LOCK:
        if not _HINTS_DIRTY:
            return
        tmp = _HINTS_PATH + ".tmp"
        with open(tmp, "w", encoding="utf-8") as fh:
            json.dump(_HINTS, fh, indent=2, sort_keys=True)
        os.replace(tmp, _HINTS_PATH)
        _HINTS_DIRTY = False

def _get_limit(key: str) -> int:
    """Last known-good page size for *key*; defaults to the largest limit."""
    try:
        hinted = int(_HINTS.get(key))
        if hinted in _LIMITS:
            return hinted
    except Exception:
        pass  # missing hint (int(None)) or non-numeric value
    return _LIMITS[0]

def _set_limit(key: str, limit: int):
    """Record *limit* as the working page size for *key* and mark hints dirty."""
    global _HINTS_DIRTY
    with _HINTS_LOCK:
        _HINTS[key] = limit
        _HINTS_DIRTY = True

def _with_limit(url: str, limit: int) -> str:
    url = re.sub(r"([?&])limit=\d+&?", r"\1", url).rstrip("&?")
    return url + ("&" if "?" in url else "?") + f"limit={limit}"

def _fetch_all_rows(base_url: str, hint_key: str) -> dict:
    """Paginate through all rows, auto-adjusting page size on API errors.

    Returns {"row_id": normalized_row} — or {"row_id.subrow_id": ...} when the
    page contains subrows. *hint_key* selects the persisted page-size hint.
    """
    all_rows: dict = {}
    after, prev_after = None, None
    limit = _get_limit(hint_key)
    # NOTE(review): `limit` is never updated when _fetch_page steps its local
    # `lim` down, so every later page re-discovers the working size within
    # _fetch_page (the persisted hint only helps the next run) — confirm intended.

    while True:
        page_url = base_url + (f"&after={after}" if after is not None else "")

        # Fetch one page, stepping down limit on API errors. Returns raw response dict.
        def _fetch_page(url, lim):
            while True:
                try:
                    return _fetch(_with_limit(url, lim))
                except requests.HTTPError as e:
                    r = getattr(e, "response", None)
                    if r is None: raise
                    if r.status_code == 400:
                        try:
                            # API refuses pages exceeding 20,000 rows: drop to
                            # the next smaller limit and remember it.
                            if "over 20,000 rows" in str(r.json().get("message", "")):
                                idx = _LIMITS.index(lim) if lim in _LIMITS else 0
                                if idx + 1 >= len(_LIMITS): raise
                                lim = _LIMITS[idx + 1]
                                _set_limit(hint_key, lim)
                                log(f"[PAGE] limit -> {lim} ({hint_key})")
                                continue
                        except (ValueError, AttributeError): pass
                    if r.status_code == 404:
                        try:
                            # "<row>:<sub> could not be found": shrink the page
                            # so it ends just before the missing row id.
                            m = re.search(r"/(\d+):(\d+)\s+could not be found", str(r.json().get("message", "")))
                            if m:
                                safe = int(m.group(1)) - (int(after) if after is not None else -1) - 1
                                if safe <= 0: return {"rows": []}
                                if safe < lim:
                                    lim = int(safe)
                                    log(f"[PAGE] missing row -> limit {lim} ({hint_key})")
                                    continue
                        except Exception: pass
                        # 404 we could not interpret = no more rows on this page
                        return {"rows": []}
                    raise

        # Fetch (bypassing cache here since limit may shift mid-fetch)
        data  = _fetch_page(page_url, limit)
        rows  = data.get("rows", [])
        if not rows: break

        # Cache the successful page result for reruns.
        # NOTE(review): keyed with the outer `limit`, which can differ from the
        # `lim` actually used inside _fetch_page — verify the key is right.
        _CACHE.set(_with_limit(page_url, limit), data)

        has_sub = any(r.get("subrow_id", 0) for r in rows)
        for row in rows:
            key = (f'{row["row_id"]}.{row["subrow_id"]}' if has_sub else str(row["row_id"]))
            normed = _norm(row)
            normed["row_id"] = str(row["row_id"])
            if has_sub:
                normed["subrow_id"] = str(row["subrow_id"])
            all_rows[key] = normed
        after = rows[-1]["row_id"]
        # Defensive stop: an unchanged cursor would otherwise loop forever.
        if after == prev_after: break
        prev_after = after

    return all_rows

def _get_sheet(base: str, sheet: str, fields: list, langs: list[str]) -> dict:
    """Download all rows of *sheet* and keep only the expected normalized keys."""
    url = f"{base.rstrip('/')}/{sheet}?fields={_build_fields_param(fields, sheet, langs)}"
    rows = _fetch_all_rows(url, f"sheet:{sheet}")
    keep = _expected_keys(fields, sheet, langs)
    trimmed: dict = {}
    for rid, row in rows.items():
        trimmed[rid] = {
            k: v for k, v in row.items() if k in keep or k in ("row_id", "subrow_id")
        }
    return trimmed

# ── Asia lang merging ─────────────────────────────────────────────────────────

# Key suffixes that identify Asia-language values.
_ASIA_SUFFIXES = ("_chs", "_ko", "_tc")
# Captures the base field name before any known language suffix.
_BASE_LANG_RE  = re.compile(r"^(.*)_(de|en|fr|ja|chs|ko|tc)$")
# Same, restricted to the Asia languages.
_ASIA_KEY_RE   = re.compile(r"^(.*)_(chs|ko|tc)$")

def _is_asia_key(k: str) -> bool:
    """True when *k* is a string key carrying an Asia language suffix."""
    if not isinstance(k, str):
        return False
    return k.endswith(_ASIA_SUFFIXES)

def _merge_asia(dst: dict, src: dict) -> dict:
    """Recursively copy Asia-language keys from *src* into *dst* (in place).

    Nested dicts are merged; afterwards all Asia keys in *dst* are moved to
    the end in fixed chs, ko, tc order.
    """
    for key, value in src.items():
        if isinstance(value, dict):
            dst[key] = _merge_asia(dst.get(key, {}), value)
        elif _is_asia_key(key):
            dst[key] = value
    # Re-insert Asia keys in fixed order: chs, ko, tc
    for suffix in ("_chs", "_ko", "_tc"):
        matching = [k for k in list(dst) if k.endswith(suffix)]
        for k in matching:
            dst[k] = dst.pop(k)
    return dst

def _collect_asia_keys(d, out: set):
    """Accumulate every Asia-suffixed key found anywhere inside *d* into *out*."""
    if not isinstance(d, dict):
        return
    for key, value in d.items():
        if _is_asia_key(key):
            out.add(key)
        _collect_asia_keys(value, out)

def _fill_missing_asia_keys(d: dict, keys: set):
    """Recursively add "" placeholders for Asia keys that *d* is missing.

    A key like "Name_chs" is only added when *d* already holds some
    "Name_<lang>" variant, so unrelated dicts are left untouched.
    """
    if not isinstance(d, dict):
        return
    bases = set()
    for existing in d:
        m = _BASE_LANG_RE.match(existing)
        if m:
            bases.add(m.group(1))
    for key in keys:
        if key in d:
            continue
        m = _ASIA_KEY_RE.match(key)
        if m and m.group(1) in bases:
            d[key] = ""
    for value in d.values():
        _fill_missing_asia_keys(value, keys)

# ── Per-sheet processing ──────────────────────────────────────────────────────

def _load_or_fetch(out_dir: str, sheet: str, base: str, fields: list,
                   langs: list[str], refetch: bool) -> dict:
    """Return sheet rows from the on-disk JSON when valid, otherwise download.

    Downloads are written back to disk. A 404 on any endpoint, or a 5xx on
    the Asia endpoint, is tolerated and yields {}; other HTTP errors raise.
    """
    path = os.path.join(out_dir, f"{sheet}.json")
    if not refetch and os.path.exists(path):
        try:
            cached = readJsonFile(path)
            if isinstance(cached, dict):
                return cached
        except Exception:
            pass  # unreadable file: fall through to a fresh download
    try:
        rows = _get_sheet(base, sheet, fields, langs)
    except requests.HTTPError as e:
        status = e.response.status_code if e.response is not None else 0
        # Asia endpoint is best-effort (missing sheets / broken endpoint OK);
        # global failures other than 404 must surface.
        if status == 404 or (status >= 500 and base == ASIA_BASE):
            log(f"[SKIP] {sheet} skipped on {base.split('/')[2]} (HTTP {status})")
            return {}
        raise
    _write_json(path, rows)
    return rows

def _process_sheet(sheet: str, fields: list, translations: dict,
                   asia_refetch: bool, global_refetch: bool) -> None:
    """Produce the merged output JSON for one sheet.

    Pipeline: load/fetch Asia rows -> load/fetch global rows -> merge Asia
    language keys in -> apply extra translation overrides -> apply field
    renames -> sort keys -> write GLOBAL_OUT_DIR/<sheet>.json.
    """
    t0 = time.time()

    # 1. Get Asia data (load cache or fetch if schema/version changed)
    asia_rows = _load_or_fetch(ASIA_OUT_DIR, sheet, ASIA_BASE, fields, ASIA_LANGS, asia_refetch)

    # 2. Get Global data — patch-only sheets skip re-fetch and keep the existing file
    out_path = os.path.join(GLOBAL_OUT_DIR, f"{sheet}.json")
    if sheet in PATCH_ONLY_SHEETS and os.path.exists(out_path):
        try:
            results = readJsonFile(out_path)
            log(f"[PATCH-ONLY] {sheet}")
        except Exception:
            results = _get_sheet(GLOBAL_BASE, sheet, fields, GLOBAL_LANGS)
    else:
        results = _load_or_fetch(GLOBAL_OUT_DIR, sheet, GLOBAL_BASE, fields, GLOBAL_LANGS, global_refetch)

    # 3. Merge Asia lang keys into global results
    asia_keys: set[str] = set()
    for rid, arow in asia_rows.items():
        results.setdefault(rid, {})
        results[rid] = _merge_asia(results[rid], arow)
        _collect_asia_keys(arow, asia_keys)
    if asia_keys:
        # Rows lacking an Asia translation get "" placeholders for consistency.
        for row in results.values():
            _fill_missing_asia_keys(row, asia_keys)

    # 4. Apply any additional translation data
    for key, extra in (translations.get(sheet) or {}).items():
        results.setdefault(key, {}).update(extra)

    # 5. Apply field renames and write
    frename, trename = _build_rename_map(fields)
    _apply_renames(results, frename, trename)
    # Recursively sort every row's keys: non-Asia keys alphabetically first,
    # then Asia-suffixed keys grouped chs, ko, tc.
    _ASIA_ORDER = {"chs": 0, "ko": 1, "tc": 2}
    def _sort_row(row):
        if not isinstance(row, dict):
            return row
        return {k: _sort_row(v) for k, v in sorted(row.items(), key=lambda x: (
            _ASIA_ORDER.get(x[0].rsplit("_", 1)[-1], -1) if "_" in x[0] and x[0].rsplit("_", 1)[-1] in _ASIA_ORDER else -2,
            x[0]
        ))}
    results = {rid: _sort_row(row) for rid, row in results.items()}
    _write_json(out_path, results)
    log(f"[DONE] {sheet} ({time.time() - t0:.1f}s)")

# ── Main ──────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    _load_hints()

    # translation_filenames.json: {group: {sheet: [field specs]}} (see loop below)
    # get_xivapi_data_additional.json: per-sheet row overrides applied in step 4
    _input       = readJsonFile("./translation_filenames.json")
    translations = readJsonFile("./get_xivapi_data_additional.json")

    # NOTE(review): the ONLY assignments below are debug leftovers — each
    # overwrites the previous one; only the final value ({} = process every
    # sheet) takes effect. Consider deleting the dead ones.
    ONLY = {"Item", "Action", "Quest", "GatheringPoint", "Level"}
    ONLY = {"ENpcResident", "EObjName"}

    ONLY = {"Action","Status","LogMessage","BNpcName","NpcYell","InstanceContentTextData"}
    #ONLY = {"Status","LogMessage","BNpcName","NpcYell","InstanceContentTextData"}
    #PATCH_ONLY_SHEETS = ONLY

    ONLY = {"ENpcBase"}
    ONLY = {}
    #ONLY = {"AozActionTransient","BuddyEquip","ContentRoulette","ContentsTutorial","DawnQuestMember","Leve","TreasureHuntRank","QuestRedoChapterUI"}
    # NOTE(review): forces a full re-download regardless of schema/version
    # stamps — confirm this should stay enabled for regular runs.
    FORCE_REFRESH = True

    # Deduplicate sheets across input groups, honoring the ONLY filter.
    seen, tasks = set(), []
    for _, data in _input.items():
        for sheet, fields in data.items():
            if sheet not in seen and (not ONLY or sheet in ONLY):
                seen.add(sheet); tasks.append((sheet, fields))

    for d in [GLOBAL_OUT_DIR, ASIA_OUT_DIR,
              os.path.join(GLOBAL_OUT_DIR, ".stamps"),
              os.path.join(ASIA_OUT_DIR,   ".stamps")]:
        os.makedirs(d, exist_ok=True)

    # Decide up-front whether either endpoint's data is stale.
    asia_refetch,   asia_meta   = _needs_refetch(ASIA_BASE,   ASIA_OUT_DIR,   "_run_asia.json",
                                                  expected={"schema": ASIA_EXPECTED_SCHEMA, "version": ASIA_EXPECTED_VERSION})
    global_refetch, global_meta = _needs_refetch(GLOBAL_BASE, GLOBAL_OUT_DIR, "_run_global.json")
    if FORCE_REFRESH:
        asia_refetch = True
        global_refetch = True

    prio = [(s, f) for s, f in tasks if s in PRIORITY_SHEETS]
    rest = [(s, f) for s, f in tasks if s not in PRIORITY_SHEETS]
    log(f"Processing {len(tasks)} sheets | priority: {[s for s,_ in prio]}")

    # Failures are collected from worker threads and reported at the end.
    failures = []
    def _run(sheet, fields):
        try:
            _process_sheet(sheet, fields, translations, asia_refetch, global_refetch)
        except Exception as e:
            failures.append(f"{sheet}: {e}")
            log(f"[FAIL] {sheet}: {e}")

    if prio:
        log(f"[PRIORITY] running {len(prio)} priority sheets first")
        with ThreadPoolExecutor(max_workers=min(PRIORITY_WORKERS, len(prio))) as px:
            for fut in as_completed([px.submit(_run, s, f) for s, f in prio]):
                fut.result()

    log(f"[REST] running {len(rest)} remaining sheets")
    with ThreadPoolExecutor(max_workers=WORKERS) as rx:
        for fut in as_completed([rx.submit(_run, s, f) for s, f in rest]):
            fut.result()

    # Stamps are saved only after the run finishes, so an aborted run refetches.
    _save_stamp(ASIA_OUT_DIR,   "_run_asia.json",   asia_meta)
    _save_stamp(GLOBAL_OUT_DIR, "_run_global.json", global_meta)
    _flush_hints()

    if failures:
        log("\nFailed sheets:")
        for f in failures: log(f"  - {f}")
