"""
Sync products (SPU + SKU) from Cin7 to yudao

SPU <-> SKU relationship in yudao:
  - product_spu.id = Cin7 product.id (NOT AUTO_INCREMENT)
  - product_sku.id = Cin7 productOptions[].id (NOT AUTO_INCREMENT)
  - product_sku.spu_id = Cin7 product.id (direct, no foreign key needed)
  - product_spu.price = min(product_sku.price)
  - product_spu.stock = sum(product_sku.stock)

Cin7 product fields: id, name, description, brand, category, images, ...
Cin7 productOptions fields: id, productId, code, productOptionBarcode, retailPrice,
  wholesalePrice, stockOnHand, option1/2/3, optionWeight, image, priceColumns{costAUD}, ...
"""
import json
import logging
import os
import threading
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

from db import query_one, query_all, execute_one, insert_and_get_id
from sync_brand import BRAND_NAME_TO_ID_MAP
from sync_category import CATEGORY_NAME_TO_ID_MAP
from config import TENANT_ID


# ── Logging configuration ─────────────────────────────────────────────────────
_LOG_FILE  = os.path.join(os.path.dirname(__file__), "sync_spu.log")
_ERROR_LOG = os.path.join(os.path.dirname(__file__), "sync_errors.jsonl")


def _setup_logger():
    """Return the ``sync_spu`` logger, attaching its handlers on first use.

    The logger itself is set to DEBUG. Two handlers sharing one formatter are
    attached: a UTF-8 file handler on ``_LOG_FILE`` (DEBUG and above) and a
    console stream handler (INFO and above). If the logger already has
    handlers it is returned untouched, so repeated calls do not duplicate
    output.
    """
    sync_logger = logging.getLogger("sync_spu")
    if not sync_logger.handlers:
        sync_logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            "%(asctime)s [%(levelname)s] [T%(thread)d] %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        file_handler = logging.FileHandler(_LOG_FILE, encoding="utf-8")
        console_handler = logging.StreamHandler()
        # File keeps everything from DEBUG up; console only shows INFO and above.
        for handler, level in (
            (file_handler, logging.DEBUG),
            (console_handler, logging.INFO),
        ):
            handler.setLevel(level)
            handler.setFormatter(formatter)
            sync_logger.addHandler(handler)
    return sync_logger


log = _setup_logger()


# ── Global caches ─────────────────────────────────────────────────────────────
# _PROP_LOCK guards both dictionaries below, which are shared by the sync workers.
_PROP_LOCK = threading.Lock()
# property name -> product_property.id
_PROP_NAME_TO_ID   = {}
# (property id, value name) -> product_property_value.id
_PROPVAL_NAME_TO_ID = {}

# Fixed level-id mapping (ids are constants, reverse-engineered from the data
# actually stored in product_sku.levels).
# Does not depend on querying the product_member_level table.
# Each tuple is (level id, name written into the levels JSON, Cin7 source key).
_FIXED_LEVELS = [
    (1,  "retailAUD",      "retailAUD"),
    (2,  "vipaud",         "vipPrice"),       # read from opt/sp.vipPrice, not from priceColumns
    (3,  "wholesale65AUD", "wholesale65AUD"),
    (4,  "wholesale60AUD", "wholesale60AUD"),
    (5,  "partnerAUD",     "partnerAUD"),
    (6,  "costAUD",        "costAUD"),
    (7,  "specialPrice",   "specialPrice"),
    (8,  "tier1AUD",       "tier1AUD"),
    (9,  "tier2AUD",       "tier2AUD"),
    (10, "tier3AUD",       "tier3AUD"),
]


# ── Helper functions ──────────────────────────────────────────────────────────
def _float(val, default=0.0):
    """Convert *val* to a finite float, returning *default* when that fails.

    *default* is returned when *val* cannot be parsed (``None``, non-numeric
    text), when the conversion overflows (e.g. a huge integer), or when the
    result is NaN or infinite. Callers multiply the result by 100 and convert
    it to an integer, which would raise on NaN or infinity.
    """
    from math import isfinite

    try:
        number = float(val)
    except (ValueError, TypeError, OverflowError):
        return default
    return number if isfinite(number) else default

def _int(val, default=0):
    """Convert *val* to an int, returning *default* when that fails.

    A direct ``int(val)`` is tried first. If it fails, the value is parsed as
    a float and truncated, so decimal strings such as ``"12.0"`` yield ``12``
    instead of silently becoming *default*. Unparsable, NaN and infinite
    inputs return *default*.
    """
    try:
        return int(val)
    except (ValueError, TypeError, OverflowError):
        pass
    try:
        # Second chance for numeric text carrying a fractional part or exponent.
        return int(float(val))
    except (ValueError, TypeError, OverflowError):
        return default

def _str(val):
    """Return *val* as a whitespace-stripped string; ``None`` becomes ``""``."""
    return "" if val is None else str(val).strip()


def _log_error(cin7_id, product_name, exc, sp=None):
    """Append one failure record to ``sync_errors.jsonl`` and log it as an error.

    The record is a single JSON line holding the current timestamp, the Cin7
    id, the product name and ``str(exc)``. When *sp* is truthy it is stored
    under ``product_data`` as well.
    """
    entry = dict(
        ts=datetime.now().isoformat(),
        cin7_id=cin7_id,
        name=product_name,
        error=str(exc),
    )
    if sp:
        entry["product_data"] = sp
    with open(_ERROR_LOG, "a", encoding="utf-8") as error_file:
        serialized = json.dumps(entry, ensure_ascii=False)
        error_file.write(serialized + "\n")
    log.error("product %s '%s' failed: %s", cin7_id, product_name, exc)


# ── Level computation (fixed ids, no table lookup) ────────────────────────────
def _ensure_levels_loaded():
    """Compatibility placeholder: the level mapping is fixed, so nothing is loaded."""
    return None


def _compute_levels_json(opt, sp):
    """Build the ``levels`` JSON for one SKU from Cin7 prices (no DB access).

    Level ids are the fixed constants listed in ``_FIXED_LEVELS``:
      1=retailAUD, 2=vipaud, 3=wholesale65AUD, 4=wholesale60AUD,
      5=partnerAUD, 6=costAUD, 7=specialPrice, 8=tier1AUD, 9=tier2AUD, 10=tier3AUD

    Args:
        opt: The Cin7 productOption dict, or ``None`` for a product without options.
        sp: The Cin7 product dict, used as fallback for ``priceColumns`` / ``vipPrice``.

    Returns:
        A JSON string that always contains one entry per fixed level; an entry
        whose price is missing or unparsable has ``value`` 0. Values are in
        cents (dollar amount x 100).
    """
    option = opt or {}
    price_cols = option.get("priceColumns") or sp.get("priceColumns") or {}

    def to_fen(dollar_val):
        if dollar_val is None or dollar_val == "":
            return 0
        try:
            # round(), not int(): 19.99 * 100 is 1998.9999999999998 in binary
            # floating point, and truncation would store 1998 instead of 1999.
            return round(float(dollar_val) * 100)
        except (ValueError, TypeError, OverflowError):
            # Unparsable, NaN or infinite price -> treat as "no price".
            return 0

    levels = []
    for lid, name_db, cin7_key in _FIXED_LEVELS:
        if cin7_key == "vipPrice":
            # vipPrice lives on the option/product root, not inside priceColumns.
            val = option.get("vipPrice") or sp.get("vipPrice")
        else:
            val = price_cols.get(cin7_key)
        levels.append({"id": lid, "name": name_db, "icon": "", "value": to_fen(val)})

    result = json.dumps(levels)
    log.debug(
        "[levels] opt_id=%s price_cols_keys=%s → %s",
        option.get("id"), list(price_cols.keys()), result[:80],
    )
    return result


def _update_sku_levels(sku_id, sp):
    """Recompute and store ``levels`` for a single SKU row.

    The Cin7 option whose ``id`` equals *sku_id* is used when present;
    otherwise the levels are computed from the product alone.
    """
    matching_opt = None
    for candidate in sp.get("productOptions") or []:
        if candidate.get("id") == sku_id:
            matching_opt = candidate
            break

    payload = _compute_levels_json(matching_opt, sp)
    execute_one(
        "UPDATE product_sku SET levels=%s, update_time=NOW() WHERE id=%s",
        (payload, sku_id),
    )
    log.debug("[sku] levels updated sku_id=%s", sku_id)


# ── SKU property helpers ──────────────────────────────────────────────────────
def _update_sku_props(sku_id, properties):
    """Update only the ``properties`` column of a SKU (other columns such as pic_url are untouched).

    Does nothing when *properties* is empty or ``None``.
    """
    if not properties:
        return
    execute_one(
        "UPDATE product_sku SET properties=%s WHERE id=%s",
        (properties, sku_id),
    )

def _update_sku_props_fast(sku_id, properties):
    """Write *properties* to the SKU's ``properties`` column; no-op when it is empty."""
    if not properties:
        return
    execute_one(
        "UPDATE product_sku SET properties=%s WHERE id=%s",
        (properties, sku_id),
    )


# ── Core sync: SPU ────────────────────────────────────────────────────────────
def _resolve_spu_categories(sp):
    """Return ``(category_id, category_ids_csv)`` for a Cin7 product.

    Works on an int-normalised copy of ``categoryIdArray`` so the caller's
    product dict is never mutated and the duplicate checks compare like with
    like (ids arriving as strings no longer produce repeated entries).
    """
    cat_ids = [int(x) for x in (sp.get("categoryIdArray") or [])]

    # Primary category: sub-category name lookup, else first Cin7 category id.
    sub_cat_name = _str(sp.get("subCategory"))
    category_id = 0
    if sub_cat_name:
        category_id = CATEGORY_NAME_TO_ID_MAP.get(sub_cat_name, 0)
    if category_id == 0 and cat_ids:
        category_id = cat_ids[0]

    if category_id and category_id not in cat_ids:
        cat_ids.append(category_id)

    # Also include the parent category when its name is known.
    cat_name = _str(sp.get("category"))
    parent_id = CATEGORY_NAME_TO_ID_MAP.get(cat_name, 0) if cat_name else 0
    if parent_id and parent_id not in cat_ids:
        cat_ids.append(parent_id)

    return category_id, ",".join(str(int(x)) for x in cat_ids)


def _sync_spu(sp, tenant_id=TENANT_ID):
    """Sync one Cin7 product into product_spu and its SKUs into product_sku.

    The SPU row uses the Cin7 product id as primary key. A missing row is
    inserted with placeholder price/stock values; an existing row has its
    descriptive columns updated. SKUs are then synced and the SPU price/stock
    summary is recomputed from them.

    Args:
        sp: Cin7 product dict; ``sp["id"]`` is required.
        tenant_id: Tenant id written to newly inserted rows.

    Returns:
        ``(cin7_id, created)`` where *created* is True when the SPU was inserted.

    Raises:
        KeyError: if *sp* has no ``id``.
        ValueError/TypeError: if a ``categoryIdArray`` entry is not an integer.
    """
    cin7_id = sp["id"]
    name    = _str(sp.get("name"))
    options = sp.get("productOptions") or []

    brand_name = _str(sp.get("brand"))
    brand_id   = BRAND_NAME_TO_ID_MAP.get(brand_name, 0)

    category_id, cat_ids_str = _resolve_spu_categories(sp)

    channels     = _str(sp.get("channels") or "")
    status       = 0 if channels == "Magento 2" else 1
    description  = _str(sp.get("description") or "")
    introduction = description

    images      = sp.get("images") or []
    pic_url     = images[0].get("link", "") if images else ""
    slider_pics = json.dumps([img.get("link", "") for img in images[:15] if img.get("link")])

    spec_type            = 1 if options else 0
    unit                 = 1
    sort                 = 0
    delivery_template_id = 1
    browse_count         = 0
    keyword              = _str(sp.get("styleCode"))[:256]

    recommendation = dict(
        recommend_hot=False, recommend_benefit=False, recommend_best=False,
        recommend_new=False, recommend_good=False,
    )
    give_integral       = 0
    virtual_sales_count = 1
    activity_orders     = "[0]"
    sub_commission_type = False

    # NOTE(review): soft-deleted rows are ignored here (deleted=0), so an id
    # that exists only as a deleted row takes the INSERT branch — confirm this
    # cannot collide on the primary key.
    existing_row = query_one(
        "SELECT id FROM product_spu WHERE id=%s AND deleted=0 LIMIT 1", (cin7_id,)
    )

    if not existing_row:
        # price / market_price / cost_price start at -1 and stock at 0;
        # _update_spu_stats() below fills in the summary from the SKUs.
        execute_one(
            """
            INSERT INTO product_spu
                (id, name, category_id, category_ids, brand_id, pic_url, slider_pic_urls,
                 slider_bottom_pic_urls, status, description, unit, sort, spec_type,
                 price, market_price, cost_price, stock,
                 delivery_template_id, keyword, introduction,
                 recommend_hot, recommend_benefit, recommend_best, recommend_new,
                 recommend_good, give_integral, virtual_sales_count, browse_count,
                 activity_orders, sub_commission_type,
                 creator, create_time, updater, tenant_id)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s,
                 NULL, %s, %s, %s, %s, %s,
                 -1, -1, -1, 0,
                 %s, %s, %s,
                 %s, %s, %s, %s, %s, %s, %s, %s,
                 %s, %s,
                 'cin7_sync', NOW(), 'cin7_sync', %s)
            """,
            (
                cin7_id, name, category_id, cat_ids_str, brand_id or 0,
                pic_url, slider_pics,
                status, description, unit, sort, spec_type,
                delivery_template_id, keyword, introduction,
                recommendation["recommend_hot"], recommendation["recommend_benefit"],
                recommendation["recommend_best"], recommendation["recommend_new"],
                recommendation["recommend_good"],
                give_integral, virtual_sales_count, browse_count,
                activity_orders, sub_commission_type,
                tenant_id,
            ),
        )
        log.info("[spu] INSERT id=%s name='%s'", cin7_id, name)
        created = True
    else:
        # Images, unit, sort and similar columns are not part of the UPDATE,
        # so values already in the row are kept.
        execute_one(
            """
            UPDATE product_spu SET
                name=%s, category_id=%s, category_ids=%s, brand_id=%s,
                status=%s, description=%s, spec_type=%s,
                keyword=%s, introduction=%s,
                recommend_hot=%s, recommend_benefit=%s, recommend_best=%s,
                recommend_new=%s, recommend_good=%s,
                update_time=NOW(), updater='cin7_sync'
            WHERE id=%s
            """,
            (
                name, category_id, cat_ids_str, brand_id or 0,
                status, description, spec_type, keyword, introduction,
                recommendation["recommend_hot"], recommendation["recommend_benefit"],
                recommendation["recommend_best"], recommendation["recommend_new"],
                recommendation["recommend_good"],
                cin7_id,
            ),
        )
        log.info("[spu] UPDATE id=%s name='%s'", cin7_id, name)
        created = False

    _sync_skus(cin7_id, sp, tenant_id)
    _update_spu_stats(cin7_id)

    return cin7_id, created


# ── Core sync: SKU ────────────────────────────────────────────────────────────
def _sync_skus(spu_id, sp, tenant_id):
    """Bulk-sync every SKU belonging to one SPU.

    Matching rule: ``product_sku.id == Cin7 productOption.id``.
    - Row exists  -> UPDATE (COALESCE protects bar_code / levels so an empty
      incoming value never overwrites stored data).
    - Row missing -> INSERT using the Cin7 option id as primary key, so the
      next sync matches it.
    - SKUs present in the DB but absent from Cin7 are kept, not deleted
      (they may be manually added specs).

    A product without ``productOptions`` is handed to ``_upsert_single_sku``.
    """
    from db import insert_many, update_many

    cin7_options = sp.get("productOptions") or []
    if not cin7_options:
        lone_sku = query_one(
            "SELECT id FROM product_sku WHERE spu_id=%s AND deleted=0 LIMIT 1",
            (spu_id,),
        )
        _upsert_single_sku(spu_id, sp, lone_sku, tenant_id)
        return

    # ON DUPLICATE KEY UPDATE guards against a Cin7 option id colliding with an
    # existing sku id (single-spec products use the product id as sku id, and a
    # multi-spec option id may equal it).
    insert_sql = (
        "INSERT INTO product_sku "
        "(id, spu_id, bar_code, properties, pic_url, levels, "
        "price, market_price, cost_price, stock, weight, volume, "
        "creator, create_time, updater, tenant_id) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, "
        "'cin7_sync', NOW(), NULL, %s) "
        "ON DUPLICATE KEY UPDATE "
        "spu_id=VALUES(spu_id), "
        "bar_code=COALESCE(NULLIF(VALUES(bar_code),''), bar_code), "
        "properties=VALUES(properties), "
        "pic_url=VALUES(pic_url), "
        "levels=COALESCE(VALUES(levels), levels), "
        "price=VALUES(price), market_price=VALUES(market_price), "
        "cost_price=VALUES(cost_price), stock=VALUES(stock), "
        "weight=VALUES(weight), update_time=NOW()"
    )
    # COALESCE rules:
    #   bar_code: an empty string from Cin7 (NULLIF -> NULL) keeps the DB value
    #   levels  : a NULL computed value keeps the DB value
    update_sql = (
        "UPDATE product_sku "
        "SET bar_code=COALESCE(NULLIF(%s,''), bar_code), "
        "properties=%s, "
        "levels=COALESCE(%s, levels), "
        "price=%s, market_price=%s, cost_price=%s, stock=%s, "
        "update_time=NOW() WHERE id=%s"
    )

    db_skus = query_all(
        "SELECT id, bar_code, levels FROM product_sku WHERE spu_id=%s AND deleted=0",
        (spu_id,),
    )
    db_sku_by_id = {}
    for db_sku in db_skus:
        db_sku_by_id[db_sku["id"]] = db_sku

    _ensure_levels_loaded()

    to_insert, to_update = [], []
    for opt in cin7_options:
        opt_id      = opt["id"]
        db_row      = db_sku_by_id.get(opt_id)
        fields      = _build_sku_row(spu_id, opt, sp, tenant_id)
        levels_json = _compute_levels_json(opt, sp)
        new_barcode = fields["bar_code"]

        # Diagnostic log: key values of each SKU before it is written.
        log.debug(
            "[sku] spu=%s opt=%s action=%s barcode='%s' levels=%s",
            spu_id, opt_id,
            "UPDATE" if db_row else "INSERT",
            new_barcode or "(empty)",
            "yes" if levels_json else "None",
        )

        if not db_row:
            to_insert.append((
                opt_id,                 # primary key = Cin7 option id
                spu_id,
                fields["bar_code"],
                fields["properties"],
                fields["pic_url"],
                levels_json,
                fields["price_fen"],
                fields["market_fen"],
                fields["cost_fen"],
                fields["stock"],
                fields["weight"],
                fields["volume"],
                tenant_id,
            ))
            continue

        # Existing row: warn when the incoming data would blank stored values.
        prev_barcode = db_row.get("bar_code") or ""
        prev_levels  = db_row.get("levels")
        if prev_barcode and not new_barcode:
            log.warning(
                "[sku] spu=%s opt=%s — barcode would be CLEARED "
                "(DB='%s', Cin7='') → COALESCE will keep DB value",
                spu_id, opt_id, prev_barcode,
            )
        if prev_levels and not levels_json:
            log.warning(
                "[sku] spu=%s opt=%s — levels would be CLEARED "
                "(DB has data, Cin7 computed None) → COALESCE will keep DB value",
                spu_id, opt_id,
            )
        to_update.append((
            fields["bar_code"],         # COALESCE(NULLIF(%s,''), bar_code)
            fields["properties"],
            levels_json,                # COALESCE(%s, levels)
            fields["price_fen"],
            fields["market_fen"],
            fields["cost_fen"],
            fields["stock"],
            opt_id,
        ))

    if to_insert:
        inserted = insert_many(insert_sql, to_insert)
        log.info("[sku] spu=%s — bulk INSERT/UPSERT %d SKUs", spu_id, inserted)

        # Report rows inserted with an empty barcode / NULL levels to ease debugging.
        empty_bc = []
        none_lvl = []
        for new_row in to_insert:
            if not new_row[2]:          # index 2 = bar_code
                empty_bc.append(new_row[0])
            if new_row[5] is None:      # index 5 = levels_json
                none_lvl.append(new_row[0])
        if empty_bc:
            log.warning("[sku] spu=%s — INSERT with empty barcode: opt_ids=%s", spu_id, empty_bc)
        if none_lvl:
            log.warning("[sku] spu=%s — INSERT with NULL levels: opt_ids=%s", spu_id, none_lvl)

    if to_update:
        updated = update_many(update_sql, to_update)
        log.info("[sku] spu=%s — bulk UPDATE %d SKUs", spu_id, updated)


def _upsert_single_sku(spu_id, sp, existing_sku, tenant_id):
    """Sync the single SKU of a product that has no productOptions.

    Args:
        spu_id: Id of the owning SPU.
        sp: Cin7 product dict; prices, stock, weight and images are read from it.
        existing_sku: Row (with ``id``) of the SKU already stored for this SPU,
            or a falsy value when none exists.
        tenant_id: Tenant id written to a newly inserted row.

    An existing SKU is updated in place. Otherwise a row is inserted using the
    Cin7 product id as SKU id. In both cases ``levels`` is written afterwards,
    so a freshly inserted SKU does not stay without level prices until the
    next sync.
    """
    code    = _str(sp.get("styleCode"))
    barcode = _str(sp.get("barCode") or sp.get("barcode"))
    key     = barcode if barcode else (code if code else "")

    price_cols = sp.get("priceColumns") or {}
    retail_a   = _float(price_cols.get("retailAUD"))
    tier1      = _float(price_cols.get("tier1AUD"))
    cost_a     = _float(price_cols.get("costAUD"))
    retail     = _float(sp.get("retailPrice") or sp.get("sellingPrice"))
    wholesale  = _float(sp.get("wholesalePrice"))
    vip        = _float(sp.get("vipPrice"))
    # round(), not int(): 19.99 * 100 evaluates to 1998.9999999999998 and
    # truncation would store the price one cent too low.
    price_fen  = round((retail_a if retail_a > 0 else retail)    * 100)
    market_fen = round((tier1    if tier1    > 0 else wholesale)  * 100)
    cost_fen   = round((cost_a   if cost_a   > 0 else vip)        * 100)
    stock      = _int(sp.get("stockOnHand"))
    weight     = _float(sp.get("weight"))

    images  = sp.get("images") or []
    pic_url = ""
    if images:
        # An image entry may be a dict with a "link" key or a plain value.
        pic_url = images[0].get("link", "") if isinstance(images[0], dict) else images[0]
    properties = json.dumps(
        [{"propertyId": 0, "propertyName": "默认", "valueId": 0, "valueName": "默认"}]
    )

    if existing_sku:
        _update_sku_props(existing_sku["id"], properties)
        execute_one(
            "UPDATE product_sku "
            "SET bar_code=COALESCE(NULLIF(%s,''), bar_code), "
            "price=%s, market_price=%s, cost_price=%s, stock=%s, "
            "update_time=NOW() WHERE id=%s",
            (key, price_fen, market_fen, cost_fen, stock, existing_sku["id"]),
        )
        _ensure_levels_loaded()
        _update_sku_levels(existing_sku["id"], sp)
        log.info("[sku] single UPDATE spu=%s sku=%s barcode='%s'",
                 spu_id, existing_sku["id"], key or "(empty)")
    else:
        sku_id = _int(sp.get("id"))
        execute_one(
            "INSERT INTO product_sku "
            "(id, spu_id, bar_code, properties, pic_url, "
            "price, market_price, cost_price, stock, weight, volume, "
            "creator, create_time, updater, tenant_id) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, "
            "'cin7_sync', NOW(), NULL, %s) "
            "ON DUPLICATE KEY UPDATE "
            "spu_id=VALUES(spu_id), "
            "bar_code=COALESCE(NULLIF(VALUES(bar_code),''), bar_code), "
            "properties=VALUES(properties), "
            "price=VALUES(price), market_price=VALUES(market_price), "
            "cost_price=VALUES(cost_price), stock=VALUES(stock), "
            "update_time=NOW()",
            (sku_id, spu_id, key, properties, pic_url,
             price_fen, market_fen, cost_fen, stock, weight, 0.0, tenant_id),
        )
        # The INSERT has no levels column; write levels now that the row
        # exists, mirroring the UPDATE branch.
        _update_sku_levels(sku_id, sp)
        log.info("[sku] single INSERT spu=%s sku=%s barcode='%s'",
                 spu_id, sku_id, key or "(empty)")


def _build_sku_row(spu_id, opt, sp, tenant_id):
    """Compute every SKU column value for one Cin7 productOption.

    No product_sku row is written here, but a missing product_property_value
    row is inserted when an option value is seen for the first time.

    Args:
        spu_id: Id of the owning SPU (not used by the computation itself).
        opt: Cin7 productOption dict.
        sp: Cin7 product dict, used as fallback for prices and image.
        tenant_id: Tenant id used for property lookups and inserts.

    Returns:
        Dict with keys ``bar_code``, ``price_fen``, ``market_fen``,
        ``cost_fen``, ``stock``, ``weight``, ``volume``, ``properties``
        (JSON string) and ``pic_url``. Prices are in cents.
    """
    price_cols = opt.get("priceColumns") or {}
    cost_a     = _float(price_cols.get("costAUD"))
    retail     = _float(opt.get("retailPrice")   or sp.get("retailPrice")   or sp.get("sellingPrice"))
    wholesale  = _float(opt.get("wholesalePrice") or sp.get("wholesalePrice"))
    vip        = _float(opt.get("vipPrice")       or sp.get("vipPrice"))

    # round(), not int(): 19.99 * 100 evaluates to 1998.9999999999998 and
    # truncation would store the price one cent too low.
    price_fen  = round(retail * 100)
    market_fen = round(wholesale * 100)
    cost_fen   = round((cost_a if cost_a > 0 else vip) * 100)

    stock   = _int(opt.get("stockOnHand"))
    weight  = _float(opt.get("optionWeight"))
    # First non-empty candidate wins, in this order.
    barcode = _str(
        opt.get("productOptionBarcode") or opt.get("barCode") or opt.get("barcode")
        or sp.get("styleCode")
        or opt.get("code") or opt.get("productOptionCode")
    )

    # Properties: built from option1/2/3.
    props = []
    for opt_n, opt_key in enumerate(["option1", "option2", "option3"], start=1):
        val = _str(opt.get(opt_key))
        if not val:
            continue
        prop_name = _str(opt.get(f"optionLabel{opt_n}")) or f"属性{opt_n}"
        with _PROP_LOCK:
            prop_id = _PROP_NAME_TO_ID.get(prop_name)
            if prop_id is None:
                row = query_one(
                    "SELECT id FROM product_property WHERE name=%s AND tenant_id=%s",
                    (prop_name, tenant_id),
                )
                prop_id = row["id"] if row else None
                if prop_id:
                    _PROP_NAME_TO_ID[prop_name] = prop_id
        if not prop_id:
            # Unknown property name: properties are never created here.
            continue

        cache_key = (prop_id, val)
        # Lookup and insert happen under one lock acquisition so two workers
        # seeing the same new value cannot both insert it.
        with _PROP_LOCK:
            val_id = _PROPVAL_NAME_TO_ID.get(cache_key)
            if val_id is None:
                row = query_one(
                    "SELECT id FROM product_property_value WHERE property_id=%s AND name=%s",
                    (prop_id, val),
                )
                val_id = row["id"] if row else None
                if val_id is None:
                    val_id = insert_and_get_id(
                        "INSERT INTO product_property_value "
                        "(property_id, name, tenant_id, creator, create_time, updater) "
                        "VALUES (%s, %s, %s, 'cin7_sync', NOW(), 'cin7_sync')",
                        (prop_id, val, tenant_id),
                    )
                    _PROPVAL_NAME_TO_ID[cache_key] = val_id
                elif val_id:
                    _PROPVAL_NAME_TO_ID[cache_key] = val_id
        if val_id:
            props.append({
                "propertyId": prop_id, "propertyName": prop_name,
                "valueId": val_id, "valueName": val,
            })

    if not props:
        # No usable option values: fall back to the single default property.
        props = [{"propertyId": 0, "propertyName": "默认", "valueId": 0, "valueName": "默认"}]

    # Image: the option's own image first, then the product's first image.
    pic_url  = ""
    img_link = opt.get("image")
    if img_link:
        pic_url = img_link.get("link") if isinstance(img_link, dict) else img_link
    if not pic_url and sp.get("images"):
        pic_url = sp["images"][0].get("link", "")

    return {
        "bar_code":   barcode,
        "price_fen":  price_fen,
        "market_fen": market_fen,
        "cost_fen":   cost_fen,
        "stock":      stock,
        "weight":     weight,
        "volume":     0.0,
        "properties": json.dumps(props, ensure_ascii=False),
        "pic_url":    pic_url,
    }


def _upsert_sku(spu_id, opt, sp, existing_sku, tenant_id):
    """Upsert a single SKU (kept as a fallback; the main path uses bulk operations).

    All column values come from ``_build_sku_row`` so this path cannot drift
    from the bulk path.

    Args:
        spu_id: Id of the owning SPU.
        opt: Cin7 productOption dict; ``opt["id"]`` is the SKU id.
        sp: Cin7 product dict.
        existing_sku: Stored row (with ``id``) for this SKU, or a falsy value.
        tenant_id: Tenant id written to a newly inserted row.

    Writing ``levels`` is best-effort: a failure is logged and does not abort
    the upsert.
    """
    opt_id   = opt["id"]
    row_data = _build_sku_row(spu_id, opt, sp, tenant_id)

    if existing_sku:
        _update_sku_props(existing_sku["id"], row_data["properties"])
        try:
            _update_sku_levels(opt_id, sp)
        except Exception as e:
            log.warning("[sku] levels skip opt=%s: %s", opt_id, e)
        execute_one(
            "UPDATE product_sku "
            "SET bar_code=COALESCE(NULLIF(%s,''), bar_code), "
            "price=%s, market_price=%s, cost_price=%s, stock=%s, "
            "update_time=NOW() WHERE id=%s",
            (row_data["bar_code"], row_data["price_fen"], row_data["market_fen"],
             row_data["cost_fen"], row_data["stock"], opt_id),
        )
        log.debug("[sku] UPDATE opt=%s price=%s stock=%s",
                  opt_id, row_data["price_fen"], row_data["stock"])
    else:
        _ensure_levels_loaded()
        execute_one(
            "INSERT INTO product_sku "
            "(id, spu_id, bar_code, properties, pic_url, "
            "price, market_price, cost_price, stock, weight, volume, "
            "creator, create_time, updater, tenant_id) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, "
            "'cin7_sync', NOW(), NULL, %s)",
            (opt_id, spu_id, row_data["bar_code"], row_data["properties"],
             row_data["pic_url"], row_data["price_fen"], row_data["market_fen"],
             row_data["cost_fen"], row_data["stock"], row_data["weight"],
             row_data["volume"], tenant_id),
        )
        # Levels are written only after the row exists; an UPDATE issued
        # before the INSERT matches no row and the levels would be lost.
        try:
            _update_sku_levels(opt_id, sp)
        except Exception as e:
            log.warning("[sku] levels skip opt=%s: %s", opt_id, e)
        log.debug("[sku] INSERT opt=%s price=%s stock=%s",
                  opt_id, row_data["price_fen"], row_data["stock"])


# ── SPU summary update ────────────────────────────────────────────────────────
def _update_spu_stats(spu_id):
    """Refresh the SPU summary columns from its non-deleted SKUs.

    Writes MIN(price), MIN(market_price) and SUM(stock) to product_spu; a NULL
    aggregate (no SKUs) is stored as 0.
    """
    # NOTE(review): cost_price is not refreshed here, so it keeps whatever the
    # SPU row already holds (-1 after a fresh INSERT) — confirm this is intended.
    stats = query_one(
        "SELECT MIN(price) as min_price, MIN(market_price) as min_market, "
        "SUM(stock) as total_stock "
        "FROM product_sku WHERE spu_id=%s AND deleted=0",
        (spu_id,),
    )
    if not stats:
        return

    min_price   = stats["min_price"] or 0
    min_market  = stats["min_market"] or 0
    total_stock = stats["total_stock"] or 0
    execute_one(
        "UPDATE product_spu SET price=%s, market_price=%s, stock=%s, "
        "update_time=NOW() WHERE id=%s",
        (min_price, min_market, total_stock, spu_id),
    )


# ── Entry points ──────────────────────────────────────────────────────────────
def sync_spus(cin7_api, tenant_id=TENANT_ID):
    """Alias for ``sync_all_products``; returns its ``(success, fail)`` result."""
    result = sync_all_products(cin7_api, tenant_id)
    return result


def sync_all_products(cin7_api, tenant_id=TENANT_ID):
    """Fetch all Cin7 products and sync them to SPU + SKU rows concurrently.

    Args:
        cin7_api: Object exposing ``get_products()``, which returns either the
            product list or a dict carrying the list under ``"d"``.
        tenant_id: Tenant id passed to every per-product sync.

    Returns:
        ``(success, fail)`` counts. A product that raises is recorded through
        ``_log_error`` and does not stop the others.
    """
    log.info("=== Sync All Products ===")
    raw = cin7_api.get_products()
    # An empty response (None, or a dict whose "d" is missing/None) counts as
    # "no products" instead of crashing on len().
    products = (raw.get("d") if isinstance(raw, dict) else raw) or []
    log.info("[products] fetched %d products from Cin7", len(products))

    success = fail = 0
    with ThreadPoolExecutor(max_workers=16) as executor:
        futures = {executor.submit(_sync_spu, sp, tenant_id): sp for sp in products}
        for future in as_completed(futures):
            sp = futures[future]
            try:
                future.result()
            except Exception as e:
                fail += 1
                # A malformed (non-dict) entry must not break error reporting.
                is_mapping = isinstance(sp, dict)
                _log_error(
                    sp.get("id") if is_mapping else None,
                    sp.get("name") if is_mapping else None,
                    e,
                    sp,
                )
            else:
                success += 1

    log.info("[products] done: %d synced, %d failed", success, fail)
    log.info("Log file: %s", _LOG_FILE)
    return success, fail
