logo
Loading...

RAG - Loaders 組件統整 - AI Agent 開發特訓營:短期實現智能自動化 - Cupoy

LangChain 文件載入器實戰指南 課程相關code我都上完到github空間中:請 git clone https://github.com/kevin801221/AgenticU-The-M...

LangChain 文件載入器實戰指南 課程相關code我都上完到github空間中:請 git clone https://github.com/kevin801221/AgenticU-The-Modular-Teaching-Hub-for-Modern-LLM-Agent-Frameworks.git 去下載相關程式碼。(可以幫忙點個星星唷!) 📚 參考資源與延伸閱讀 官方文檔 LangChain Document Loaders 官方文檔 LangChain Community Document Loaders 第三方服務與工具 Unstructured.io 官網 - 進階文件解析服務 LlamaParse 文檔 - LlamaIndex 文件解析工具 Upstage Document AI - 企業級文件處理解決方案 Beautiful Soup 文檔 - HTML/XML 解析工具 學術資源 arXiv.org - 學術預印本論文庫 Layout Parser GitHub - 文件版面分析工具 目錄 Document 物件基礎 PDF 文件處理 網頁內容載入 結構化資料處理 Office 文檔處理 學術資源載入 進階解析技術 效能最佳化 實際應用案例 Document 物件基礎 在 LangChain 中,所有文件載入器都會將原始資料轉換為標準的 Document 物件格式: from langchain.schema import Document # Document 物件包含兩個核心部分 doc = Document( page_content="文件的實際內容", # 主要文字內容 metadata={ # 元數據 "source": "file.pdf", "page": 1, "author": "作者名稱" } ) print(f"內容: {doc.page_content}") print(f"元數據: {doc.metadata}") 文件分析工具 def analyze_documents(docs: List[Document]): """分析文件集合的基本統計""" total_length = sum(len(doc.page_content) for doc in docs) print(f"📊 文件分析:") print(f" 文件數量: {len(docs)}") print(f" 總字符數: {total_length:,}") print(f" 平均長度: {total_length/len(docs):,.0f} 字符") # 分析 metadata 欄位 all_fields = set() for doc in docs: all_fields.update(doc.metadata.keys()) print(f" Metadata 欄位: {len(all_fields)} 種") for field in sorted(all_fields): count = sum(1 for doc in docs if field in doc.metadata) print(f" {field}: {count}/{len(docs)} 文件") PDF 文件處理 PDF 是最常見的文檔格式,LangChain 提供多種載入器選擇: 基本載入器比較 from langchain_community.document_loaders import ( PyPDFLoader, PyMuPDFLoader, UnstructuredPDFLoader, PDFPlumberLoader ) # 1. PyPDFLoader - 最基礎,速度快 loader = PyPDFLoader("document.pdf") docs = loader.load() # 2. PyMuPDFLoader - 速度最快,metadata 豐富 loader = PyMuPDFLoader("document.pdf") docs = loader.load() # 3. UnstructuredPDFLoader - 功能最強,可識別結構 loader = UnstructuredPDFLoader("document.pdf", mode="elements") docs = loader.load() # 4. PDFPlumberLoader - 表格提取最佳 loader = PDFPlumberLoader("document.pdf") docs = loader.load() 選擇建議 載入器 速度 功能 最適用場景 PyPDFLoader 快 基礎 一般文字提取 PyMuPDFLoader 最快 中等 大量 PDF 處理 UnstructuredPDFLoader 慢 最強 結構化內容分析 PDFPlumberLoader 中等 表格 包含表格的文件 進階 PDF 處理 def smart_pdf_loading(pdf_path: str, content_type: str = "auto"): """根據內容類型智能選擇 PDF 載入器""" # 自動檢測內容類型 if content_type == "auto": with open(pdf_path, 'rb') as f: sample = f.read(1024).decode('utf-8', errors='ignore') if 'table' in sample.lower() or '|' in sample: content_type = "table_heavy" elif len(sample.strip()) < 50: content_type = "scanned" else: content_type = "text" # 選擇載入器 if content_type == "table_heavy": loader = PDFPlumberLoader(pdf_path) elif content_type == "scanned": loader = PyPDFLoader(pdf_path, extract_images=True) elif content_type == "structured": loader = UnstructuredPDFLoader(pdf_path, mode="elements") else: loader = PyMuPDFLoader(pdf_path) # 預設選擇 return loader.load() 網頁內容載入 基本網頁載入 import bs4 from langchain_community.document_loaders import WebBaseLoader # 基本載入 loader = WebBaseLoader("https://example.com/article") docs = loader.load() # 指定解析範圍(只抓取文章內容) bs_kwargs = dict( parse_only=bs4.SoupStrainer("div", attrs={"class": ["post-content", "article-body"]}) ) loader = WebBaseLoader( web_paths=["https://example.com/article"], bs_kwargs=bs_kwargs ) docs = loader.load() 批次載入多個網頁 def batch_load_websites(urls: List[str], max_concurrent: int = 5): """批次載入多個網頁""" # 配置解析器 bs_kwargs = dict( parse_only=bs4.SoupStrainer(["article", "main", "div"], attrs={"class": ["content", "post", "article"]}) ) # 批次載入 loader = WebBaseLoader( web_paths=urls, bs_kwargs=bs_kwargs, requests_per_second=2 # 控制請求頻率 ) docs = loader.load() # 清理內容 cleaned_docs = [] for doc in docs: # 移除多餘空白 content = ' '.join(doc.page_content.split()) # 過濾太短的內容 if len(content) > 100: doc.page_content = content cleaned_docs.append(doc) return cleaned_docs 結構化資料處理 CSV 檔案處理 from langchain_community.document_loaders import CSVLoader import pandas as pd # 基本 CSV 載入 loader = CSVLoader( file_path="data.csv", source_column="title" # 指定來源欄位 ) docs = loader.load() # 使用 Pandas 進行進階處理 def process_csv_with_pandas(csv_path: str, content_template: str = None): """使用 Pandas 進行進階 CSV 處理""" df = pd.read_csv(csv_path) # 數據清理 df = df.dropna().drop_duplicates() docs = [] for _, row in df.iterrows(): # 自定義內容格式 if content_template: content = content_template.format(**row.to_dict()) else: content = "\n".join([f"{col}: {row[col]}" for col in df.columns]) metadata = { "source": csv_path, "row_index": row.name, **{f"field_{col}": row[col] for col in df.columns[:5]} # 前5個欄位作為 metadata } docs.append(Document(page_content=content, metadata=metadata)) return docs # 使用範例 docs = process_csv_with_pandas( "customers.csv", content_template="客戶: {name}, 年齡: {age}, 城市: {city}, 職業: {occupation}" ) Excel 檔案處理 from langchain_community.document_loaders import UnstructuredExcelLoader # 基本 Excel 載入 loader = UnstructuredExcelLoader("data.xlsx", mode="elements") docs = loader.load() # 處理多個工作表 def process_excel_workbook(excel_path: str): """處理包含多個工作表的 Excel 檔案""" import pandas as pd # 讀取所有工作表 excel_file = pd.ExcelFile(excel_path) all_docs = [] for sheet_name in excel_file.sheet_names: df = pd.read_excel(excel_path, sheet_name=sheet_name) # 將工作表轉換為文檔 content = f"工作表: {sheet_name}\n\n" content += df.to_string(index=False) metadata = { "source": excel_path, "sheet_name": sheet_name, "rows": len(df), "columns": len(df.columns) } all_docs.append(Document(page_content=content, metadata=metadata)) return all_docs Office 文檔處理 Word 文檔處理 from langchain_community.document_loaders import ( Docx2txtLoader, UnstructuredWordDocumentLoader ) # 簡單文字提取 loader = Docx2txtLoader("document.docx") docs = loader.load() # 結構化解析 loader = UnstructuredWordDocumentLoader("document.docx", mode="elements") docs = loader.load() # 分析文檔結構 def analyze_word_structure(docs: List[Document]): """分析 Word 文檔結構""" elements = {} for doc in docs: element_type = doc.metadata.get("category", "unknown") elements[element_type] = elements.get(element_type, 0) + 1 print("📄 文檔結構分析:") for element_type, count in elements.items(): print(f" {element_type}: {count}") PowerPoint 處理 from langchain_community.document_loaders import UnstructuredPowerPointLoader # 按投影片載入 loader = UnstructuredPowerPointLoader("presentation.pptx", mode="elements") docs = loader.load() # 組織投影片內容 def organize_powerpoint_content(docs: List[Document]): """組織 PowerPoint 內容""" slides = {} for doc in docs: # 假設 metadata 中有投影片編號 slide_num = doc.metadata.get("page_number", 1) if slide_num not in slides: slides[slide_num] = [] slides[slide_num].append(doc.page_content) # 重新組織為完整投影片 organized_docs = [] for slide_num in sorted(slides.keys()): content = "\n\n".join(slides[slide_num]) metadata = { "source": "presentation.pptx", "slide_number": slide_num, "element_count": len(slides[slide_num]) } organized_docs.append(Document(page_content=content, metadata=metadata)) return organized_docs 學術資源載入 arXiv 論文搜尋 from langchain_community.document_loaders import ArxivLoader # 基本搜尋 loader = ArxivLoader( query="large language models", load_max_docs=5 ) papers = loader.load() # 進階搜尋與分析 def search_and_analyze_papers(query: str, max_papers: int = 10): """搜尋並分析 arXiv 論文""" loader = ArxivLoader( query=query, load_max_docs=max_papers, load_all_available_meta=True ) papers = loader.load() # 分析論文 analysis = { "total_papers": len(papers), "avg_length": sum(len(p.page_content) for p in papers) // len(papers), "years": {}, "categories": {} } for paper in papers: # 分析發表年份 published = paper.metadata.get("Published", "") if published: year = published[:4] analysis["years"][year] = analysis["years"].get(year, 0) + 1 # 分析分類 categories = paper.metadata.get("categories", "").split() for cat in categories: analysis["categories"][cat] = analysis["categories"].get(cat, 0) + 1 return papers, analysis # 使用範例 papers, analysis = search_and_analyze_papers("transformer attention mechanism") print(f"搜尋結果: {analysis}") 文獻回顧生成 def create_literature_review(search_queries: List[str]): """創建文獻回顧""" all_papers = [] # 收集所有論文 for query in search_queries: loader = ArxivLoader(query=query, load_max_docs=5) papers = loader.load() all_papers.extend(papers) # 去重(基於標題) unique_papers = {} for paper in all_papers: title = paper.metadata.get("Title", "") if title not in unique_papers: unique_papers[title] = paper papers = list(unique_papers.values()) # 生成回顧內容 review_content = f"# 文獻回顧\n\n" review_content += f"**搜尋查詢**: {', '.join(search_queries)}\n" review_content += f"**論文數量**: {len(papers)}\n\n" # 按年份分組 papers_by_year = {} for paper in papers: published = paper.metadata.get("Published", "Unknown") year = published[:4] if published != "Unknown" else "Unknown" if year not in papers_by_year: papers_by_year[year] = [] papers_by_year[year].append(paper) # 生成年份摘要 for year in sorted(papers_by_year.keys(), reverse=True): review_content += f"## {year} 年論文 ({len(papers_by_year[year])} 篇)\n\n" for paper in papers_by_year[year][:3]: # 只顯示前3篇 title = paper.metadata.get("Title", "無標題") authors = paper.metadata.get("Authors", "無作者") summary = paper.metadata.get("Summary", "") review_content += f"### {title}\n" review_content += f"**作者**: {authors}\n" if summary: short_summary = summary[:200] + "..." if len(summary) > 200 else summary review_content += f"**摘要**: {short_summary}\n" review_content += "\n" metadata = { "document_type": "literature_review", "paper_count": len(papers), "search_queries": search_queries } return Document(page_content=review_content, metadata=metadata) 進階解析技術 第三方 AI 服務整合 # 使用 Upstage Document AI from langchain_community.document_loaders import UpstageDocumentParseLoader def parse_with_upstage(file_path: str, api_key: str): """使用 Upstage 解析文檔""" loader = UpstageDocumentParseLoader( file_path=file_path, api_key=api_key, output_format="text", # 或 "markdown" coordinates=True # 提取座標資訊 ) docs = loader.load() # 增強 metadata for doc in docs: doc.metadata.update({ "parsing_service": "upstage", "enhanced_extraction": True, "coordinate_info": doc.metadata.get("coordinates") is not None }) return docs # 使用 LlamaParse def parse_with_llamaparse(file_path: str, api_key: str): """使用 LlamaParse 解析文檔""" try: from llama_parse import LlamaParse parser = LlamaParse( api_key=api_key, result_type="markdown", verbose=False ) documents = parser.load_data(file_path) # 轉換為 LangChain Document docs = [] for i, doc in enumerate(documents): langchain_doc = Document( page_content=doc.text, metadata={ "source": file_path, "page": i + 1, "parsing_service": "llamaparse", "output_format": "markdown" } ) docs.append(langchain_doc) return docs except ImportError: raise ValueError("需要安裝 llama-parse: pip install llama-parse") 智能載入器選擇 def auto_select_loader(file_path: str, requirements: dict = None): """根據需求自動選擇最佳載入器""" import os file_ext = os.path.splitext(file_path)[1].lower() file_size = os.path.getsize(file_path) requirements = requirements or {} if file_ext == '.pdf': if requirements.get('extract_tables', False): return PDFPlumberLoader(file_path) elif requirements.get('preserve_structure', False): return UnstructuredPDFLoader(file_path, mode="elements") elif file_size > 50 * 1024 * 1024: # > 50MB return PyMuPDFLoader(file_path) # 最快 else: return PyPDFLoader(file_path) # 平衡 elif file_ext == '.docx': if requirements.get('preserve_structure', False): return UnstructuredWordDocumentLoader(file_path, mode="elements") else: return Docx2txtLoader(file_path) elif file_ext == '.csv': source_col = requirements.get('source_column') return CSVLoader(file_path, source_column=source_col) elif file_ext == '.txt': encoding = requirements.get('encoding', 'utf-8') return TextLoader(file_path, encoding=encoding) else: raise ValueError(f"不支援的檔案類型: {file_ext}") 效能最佳化 批次處理與快取 import hashlib import pickle import time from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed class DocumentProcessor: """文檔處理器(含快取功能)""" def __init__(self, cache_dir: str = "./cache"): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(exist_ok=True) def process_documents(self, file_paths: List[str], strategy: str = "parallel", max_workers: int = 4): """批次處理文檔""" if strategy == "parallel": return self._parallel_process(file_paths, max_workers) else: return self._sequential_process(file_paths) def _parallel_process(self, file_paths: List[str], max_workers: int): """並行處理""" results = {} with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交任務 future_to_file = { executor.submit(self._process_single_file, fp): fp for fp in file_paths } # 收集結果 for future in as_completed(future_to_file): file_path = future_to_file[future] try: docs = future.result() results[file_path] = docs print(f"✅ {Path(file_path).name}: {len(docs)} 文檔") except Exception as e: print(f"❌ {Path(file_path).name}: {e}") results[file_path] = [] return results def _process_single_file(self, file_path: str): """處理單個文件(含快取)""" # 檢查快取 cached = self._load_from_cache(file_path) if cached is not None: return cached # 實際處理 loader = auto_select_loader(file_path) docs = loader.load() # 保存快取 self._save_to_cache(file_path, docs) return docs def _cache_key(self, file_path: str) -> str: """生成快取鍵""" stat = Path(file_path).stat() key_data = f"{file_path}_{stat.st_size}_{stat.st_mtime}" return hashlib.md5(key_data.encode()).hexdigest() def _load_from_cache(self, file_path: str): """從快取載入""" cache_file = self.cache_dir / f"{self._cache_key(file_path)}.pickle" if cache_file.exists(): try: with open(cache_file, 'rb') as f: return pickle.load(f) except: cache_file.unlink() # 刪除損壞的快取 return None def _save_to_cache(self, file_path: str, docs: List[Document]): """保存到快取""" cache_file = self.cache_dir / f"{self._cache_key(file_path)}.pickle" try: with open(cache_file, 'wb') as f: pickle.dump(docs, f) except Exception as e: print(f"快取保存失敗: {e}") 記憶體監控 def monitor_memory_usage(func): """記憶體使用監控裝飾器""" def wrapper(*args, **kwargs): try: import psutil import os process = psutil.Process(os.getpid()) # 記錄開始狀態 start_memory = process.memory_info().rss / 1024 / 1024 # MB start_time = time.time() # 執行函數 result = func(*args, **kwargs) # 記錄結束狀態 end_memory = process.memory_info().rss / 1024 / 1024 # MB end_time = time.time() print(f"🔍 記憶體監控:") print(f" 開始: {start_memory:.1f} MB") print(f" 結束: {end_memory:.1f} MB") print(f" 增加: {end_memory - start_memory:.1f} MB") print(f" 時間: {end_time - start_time:.2f} 秒") return result except ImportError: print("⚠️ psutil 未安裝,無法監控記憶體") return func(*args, **kwargs) return wrapper # 使用範例 @monitor_memory_usage def process_large_documents(file_paths: List[str]): processor = DocumentProcessor() return processor.process_documents(file_paths, strategy="parallel") 實際應用案例 企業文檔管理系統 def enterprise_document_system(): """企業文檔管理系統範例""" # 文檔分類處理 document_configs = { "合約文件": { "file_types": [".pdf", ".docx"], "requirements": {"preserve_structure": True, "extract_tables": True}, "priority": "high" }, "技術文檔": { "file_types": [".pdf", ".md", ".txt"], "requirements": {"preserve_structure": True}, "priority": "medium" }, "財務報告": { "file_types": [".xlsx", ".pdf"], "requirements": {"extract_tables": True}, "priority": "high" } } processor = DocumentProcessor() # 按類別處理文檔 results = {} for doc_type, config in document_configs.items(): print(f"處理 {doc_type}...") # 這裡會是實際的文件路徑 file_paths = [] # 從文件系統或資料庫獲取 docs = processor.process_documents( file_paths, strategy="parallel" if config["priority"] == "high" else "sequential" ) results[doc_type] = docs return results 學術研究平台 def research_platform_pipeline(): """學術研究平台處理流程""" # 多源資料收集 research_sources = { "arxiv_papers": { "queries": ["machine learning", "natural language processing"], "max_papers": 20 }, "uploaded_papers": { "file_paths": [], # 用戶上傳的論文 "formats": [".pdf"] } } all_documents = [] # 處理 arXiv 論文 for query in research_sources["arxiv_papers"]["queries"]: loader = ArxivLoader( query=query, load_max_docs=research_sources["arxiv_papers"]["max_papers"] ) papers = loader.load() all_documents.extend(papers) # 處理上傳論文 processor = DocumentProcessor() uploaded_docs = processor.process_documents( research_sources["uploaded_papers"]["file_paths"] ) for docs in uploaded_docs.values(): all_documents.extend(docs) # 生成研究摘要 summary = create_literature_review([ doc.metadata.get("Title", "未知標題") for doc in all_documents[:5] ]) return { "total_papers": len(all_documents), "summary": summary, "documents": all_documents } 最佳實踐總結 載入器選擇指南 場景 推薦載入器 原因 大量 PDF 處理 PyMuPDFLoader 速度最快 表格重要的文檔 PDFPlumberLoader 表格提取最佳 需要保留結構 UnstructuredPDFLoader 結構識別能力強 網頁批次爬取 WebBaseLoader 內建並發支援 學術論文研究 ArxivLoader 專門的學術資源 結構化資料 CSVLoader + pandas 靈活的資料處理 效能最佳化策略 檔案預處理 檢查檔案完整性 統一檔案格式 過濾不必要的檔案 載入策略 小文件(<10個):順序處理 中等文件(10-100個):並行處理 大量文件(>100個):分批流式處理 記憶體管理 啟用快取避免重複處理 監控記憶體使用情況 大文件分塊處理 錯誤處理 實作重試機制 隔離問題文件 提供回退方案 常見問題解決 Q: PDF 文字提取不完整怎麼辦? def robust_pdf_extraction(pdf_path: str): """穩健的 PDF 文字提取""" loaders = [ PyMuPDFLoader(pdf_path), PyPDFLoader(pdf_path), UnstructuredPDFLoader(pdf_path, mode="single") ] for loader in loaders: try: docs = loader.load() if docs and len(docs[0].page_content.strip()) > 100: return docs except Exception as e: print(f"載入器 {loader.__class__.__name__} 失敗: {e}") continue raise ValueError("所有 PDF 載入器都失敗") Q: 如何處理大量小文件? def process_many_small_files(file_paths: List[str]): """處理大量小文件的最佳化方法""" # 分批處理避免記憶體問題 batch_size = 50 all_results = {} for i in range(0, len(file_paths), batch_size): batch = file_paths[i:i + batch_size] processor = DocumentProcessor() batch_results = processor.process_documents( batch, strategy="parallel", max_workers=8 ) all_results.update(batch_results) # 清理記憶體 del processor print(f"已處理 {min(i + batch_size, len(file_paths))}/{len(file_paths)} 文件") return all_results Q: 網頁載入被封鎖怎麼辦? def respectful_web_scraping(urls: List[str]): """遵守網站規則的網頁抓取""" import time import random headers = { 'User-Agent': 'Mozilla/5.0 (compatible; DocumentLoader/1.0)' } successful_docs = [] for url in urls: try: # 檢查 robots.txt(簡化版) if "/admin" in url or "/private" in url: print(f"跳過受保護的 URL: {url}") continue loader = WebBaseLoader( web_paths=[url], requests_kwargs={ 'headers': headers, 'timeout': 30 } ) docs = loader.load() successful_docs.extend(docs) print(f"✅ 成功載入: {url}") # 請求間隔 time.sleep(random.uniform(1, 3)) except Exception as e: print(f"❌ 載入失敗 {url}: {e}") continue return successful_docs 生產環境部署建議 小型系統(< 1000 文件) # 簡單配置 def small_scale_setup(): return { "loader_strategy": "基本載入器", "processing": "同步處理", "cache": "本地文件快取", "deployment": "單機部署" } 中型系統(1K-10K 文件) # 進階配置 def medium_scale_setup(): return { "loader_strategy": "智能載入器選擇", "processing": "並行 + 異步處理", "cache": "Redis 分佈式快取", "deployment": "容器化部署", "monitoring": "記憶體 + 效能監控" } 大型系統(> 10K 文件) # 企業級配置 def large_scale_setup(): return { "loader_strategy": "雲端 AI 服務 + 自定義載入器", "processing": "分佈式處理", "cache": "多層快取架構", "deployment": "微服務架構 + K8s", "monitoring": "全方位監控 + 告警", "scaling": "自動擴縮容" } 總結 🎯 關鍵要點 正確選擇載入器 根據文件格式和需求選擇 考慮速度、功能、準確性的平衡 實作載入器回退機制 效能最佳化 使用快取避免重複處理 根據文件量選擇處理策略 監控資源使用情況 錯誤處理 實作穩健的錯誤處理機制 提供多種載入器選擇 記錄詳細的處理日誌 🚀 進階應用 多模態處理: 結合文字、圖片、表格的統一處理 智能分類: 自動識別文件類型和最佳處理方法 實時處理: 支援文件上傳後的即時處理 API 整合: 與第三方 AI 服務的深度整合 📚 學習建議 從簡單開始: 先掌握基本載入器的使用 實際練習: 用自己的文件測試不同載入器 關注效能: 在實際專案中測試處理效能 保持更新: 關注新的載入器和最佳實踐 掌握文件載入器是構建高效 RAG 系統的基礎,選擇合適的工具和策略比使用最新技術更重要。記住,最佳的載入器是最適合你需求的載入器! 本指南基於 LangChain 官方文檔和實際專案經驗編寫,持續更新以反映最新的技術發展。