"""Walk the ROOT directory and export an inventory with media attributes (duration / resolution) to a CSV file (UTF-8)."""

import os, sys, csv, json, subprocess, shutil, time
from pathlib import Path
from datetime import datetime

# ======= Configuration =======
ROOT = r"D:\test"
OUT_CSV = r"D:\list.csv"
INCLUDE_DIRS = False  # When True, directories are written to the CSV too (without media attributes)
# =============================

# --- Automatic dependency installation ---
def ensure(pkg, import_name=None):
    """Make sure a package is importable, installing it with pip when it is not.

    Args:
        pkg: Distribution name handed to ``pip install -U``.
        import_name: Module name to try importing when it differs from the
            distribution name; falls back to ``pkg``.

    Raises:
        subprocess.CalledProcessError: If the pip invocation exits non-zero.
    """
    import importlib

    name = import_name or pkg
    try:
        importlib.import_module(name)
    except ImportError:
        print(f"[+] 正在安装 {pkg} ...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-U", pkg])
        # The failed import above may have cached directory listings; drop them
        # so a later import in this same process can see the fresh install.
        importlib.invalidate_caches()

# Install (if missing) the third-party packages that are imported just below.
ensure("Pillow", "PIL")
ensure("mutagen")
ensure("opencv-python", "cv2")

from PIL import Image
from mutagen import File as MutagenFile
import cv2

# --- Media kinds and the (lower-case, dot-prefixed) extensions that map to them ---
IMAGE_EXT = {
    ".jpg", ".jpeg", ".png", ".webp",
    ".bmp", ".tif", ".tiff", ".gif",
}
AUDIO_EXT = {
    ".mp3", ".m4a", ".aac", ".flac", ".wav",
    ".ogg", ".opus", ".wma", ".aiff", ".ape",
}
VIDEO_EXT = {
    ".mp4", ".mkv", ".mov", ".avi", ".wmv", ".flv",
    ".webm", ".m4v", ".ts", ".mts", ".m2ts", ".3gp",
}

# --- Helper: is ffprobe usable? ---
def has_ffprobe():
    """Return True when an ``ffprobe`` found on PATH runs ``-version`` successfully."""
    located = shutil.which("ffprobe")
    if located:
        try:
            subprocess.run(
                [located, "-version"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=True,
            )
        except Exception:
            # Found on PATH but could not be executed cleanly.
            return False
        else:
            return True
    return False

# Evaluated once at import time so per-file probing does not repeat the check.
FFPROBE_OK = has_ffprobe()

def probe_video_ffprobe(path: str):
    """Read a video's resolution and duration (seconds) with one ffprobe call.

    The first video stream's width/height/duration and the container-level
    duration are requested together. The stream duration is preferred; the
    container duration is used when the stream one is absent or not numeric.

    Args:
        path: Path of the media file to inspect.

    Returns:
        ``(width, height, duration_sec)``. The result is ``(None, None, None)``
        when ffprobe cannot be run, exits with an error, produces unparseable
        output, or reports no video stream. Individual elements are ``None``
        when ffprobe does not report them.
    """
    def _to_float(value):
        # ffprobe reports numbers as strings and may emit placeholders such as "N/A".
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        # Sections are separated by ':' so a single process yields both durations.
        "-show_entries", "stream=width,height,duration:format=duration",
        "-of", "json",
        path,
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(proc.stdout or "{}")
        streams = data.get("streams") or []
        if not streams:
            return (None, None, None)
        stream = streams[0]
        width = stream.get("width")
        height = stream.get("height")
        dur = _to_float(stream.get("duration"))
        if dur is None:
            # Some containers only carry the duration at the format level.
            dur = _to_float((data.get("format") or {}).get("duration"))
        return (int(width) if width else None, int(height) if height else None, dur)
    except Exception:
        # Best-effort probe: any failure means "no metadata", never a crash.
        return (None, None, None)

def probe_video_opencv(path: str):
    """Read resolution and an estimated duration through OpenCV.

    The duration is derived as frame count divided by frames per second, and
    is ``None`` unless both values are positive.

    Returns:
        ``(width, height, duration_sec)``, or ``(None, None, None)`` when the
        file cannot be opened. A zero width/height is reported as ``None``.
    """
    capture = cv2.VideoCapture(path)
    if capture.isOpened():
        try:
            width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)) or None
            height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) or None
            fps = capture.get(cv2.CAP_PROP_FPS)
            frame_count = capture.get(cv2.CAP_PROP_FRAME_COUNT)
            usable = fps and fps > 0 and frame_count and frame_count > 0
            duration = float(frame_count) / float(fps) if usable else None
            return (width, height, duration)
        finally:
            capture.release()
    return (None, None, None)

def get_video_meta(path: str):
    """Return ``(width, height, duration_sec)`` for a video file.

    ffprobe is used when it was detected at import time and its answer is
    kept if it yields a full resolution or any duration; otherwise the
    OpenCV-based probe is used.
    """
    if not FFPROBE_OK:
        return probe_video_opencv(path)

    width, height, duration = probe_video_ffprobe(path)
    has_resolution = width and height
    if has_resolution or duration is not None:
        return width, height, duration

    # ffprobe produced nothing usable: fall back to OpenCV.
    return probe_video_opencv(path)

def get_audio_duration(path: str):
    """Return the audio duration in seconds as reported by mutagen, or None.

    Args:
        path: Path of the audio file to inspect.

    Returns:
        The length as a float, or ``None`` when the file cannot be read, its
        format is not recognised, no stream info is available, or the reported
        length is zero/missing.
    """
    try:
        audio = MutagenFile(path)
        # Compare against None explicitly: the object mutagen returns is
        # dict-like over its tags, so an untagged file is falsy even though
        # its stream info (and therefore its length) is available.
        if audio is not None:
            info = getattr(audio, "info", None)
            if info is not None:
                length = getattr(info, "length", None)
                return float(length) if length else None
    except Exception:
        # Best-effort: unreadable or corrupt files simply have no duration.
        pass
    return None

def get_image_size(path: str):
    """Return ``(width, height)`` of an image in pixels, or ``(None, None)`` if it cannot be opened."""
    try:
        with Image.open(path) as picture:
            width, height = picture.size
    except Exception:
        return None, None
    return width, height

def fmt_iso(ts: float):
    """Format a POSIX timestamp as local time ``YYYY-MM-DD HH:MM:SS``.

    Returns an empty string when the timestamp cannot be converted.
    """
    try:
        moment = datetime.fromtimestamp(ts)
        return format(moment, "%Y-%m-%d %H:%M:%S")
    except Exception:
        return ""

def classify(ext: str):
    """Map a file extension (case-insensitive, with leading dot) to a media kind.

    Returns ``"image"``, ``"audio"``, ``"video"``, or ``"other"`` when the
    extension is in none of the known sets.
    """
    suffix = ext.lower()
    lookup = (
        ("image", IMAGE_EXT),
        ("audio", AUDIO_EXT),
        ("video", VIDEO_EXT),
    )
    for label, known in lookup:
        if suffix in known:
            return label
    return "other"

def walk_files(root: str):
    """Yield the path of every file under ``root``, recursively.

    For each directory visited, its files are yielded first; when
    ``INCLUDE_DIRS`` is true its immediate sub-directories are yielded too.
    """
    for folder, subfolders, files in os.walk(root):
        yield from (os.path.join(folder, name) for name in files)
        if not INCLUDE_DIRS:
            continue
        yield from (os.path.join(folder, name) for name in subfolders)

def _build_row(path: str):
    """Assemble the CSV row for one filesystem entry.

    Directories get a ``"dir"`` row without media attributes. Files are
    classified by extension and probed for width/height (images, videos)
    and duration (audio, videos). Errors from ``stat`` propagate to the caller.
    """
    entry = Path(path)
    st = entry.stat()
    mtime = fmt_iso(st.st_mtime)
    if entry.is_dir():
        return [path, "dir", "", "", "", "", st.st_size, mtime]

    kind = classify(entry.suffix)
    width = height = duration = ""
    if kind == "image":
        img_w, img_h = get_image_size(path)
        if img_w and img_h:
            width, height = str(img_w), str(img_h)
    elif kind == "audio":
        seconds = get_audio_duration(path)
        if seconds is not None:
            duration = f"{seconds:.3f}"
    elif kind == "video":
        vid_w, vid_h, seconds = get_video_meta(path)
        if vid_w:
            width = str(vid_w)
        if vid_h:
            height = str(vid_h)
        if seconds is not None:
            duration = f"{seconds:.3f}"

    # The combined column is only filled when both dimensions are known.
    resolution = f"{width}x{height}" if width and height else ""
    return [path, kind, width, height, resolution, duration, st.st_size, mtime]


def main():
    """Walk ``ROOT`` and write one CSV row per entry to ``OUT_CSV`` (UTF-8).

    The output's parent directory is created if needed. An entry that cannot
    be processed still gets a row, with type ``"error"`` and the error text in
    the last column. Progress is printed every 500 rows written, counting
    directory and error rows as well.
    """
    root = str(Path(ROOT))
    out = str(Path(OUT_CSV))
    Path(out).parent.mkdir(parents=True, exist_ok=True)

    cols = ["path", "type", "width", "height", "resolution", "duration_sec", "size_bytes", "mtime"]
    count = 0
    with open(out, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(cols)

        for p in walk_files(root):
            try:
                row = _build_row(p)
            except Exception as e:
                # Still emit a row on failure so the problem entry can be traced.
                row = [p, "error", "", "", "", "", "", f"ERROR: {e}"]
            writer.writerow(row)
            count += 1

            if count % 500 == 0:
                print(f"[+] 已写入 {count} 条...")

    print(f"[OK] 导出完成 -> {out}")


if __name__ == "__main__":
    main()
# ❤️ Please credit the source when reposting this article. Thank you! ❤️