# LangChain Document Loaders: A Complete Hands-On Guide

All of the code for this course has been uploaded to GitHub: run `git clone https://github.com/kevin801221/AgenticU-The-Modular-Teaching-Hub-for-Modern-LLM-Agent-Frameworks.git` to download the examples. (A star on the repo is appreciated!)

## 📚 References and Further Reading

### Official Documentation

- LangChain Document Loaders official documentation
- LangChain Community Document Loaders
- Document schema and metadata handling

### Third-Party Services and Tools

- Unstructured.io - advanced document parsing service
- LlamaParse documentation - LlamaIndex document parsing tool
- Upstage Document AI - enterprise-grade document processing
- PyMuPDF documentation - high-performance PDF library
- Beautiful Soup documentation - HTML/XML parsing

### Academic Resources

- arXiv.org - academic preprint repository
- Layout Parser GitHub - document layout analysis toolkit
- Document AI research papers

### Tools and Libraries

- PDFPlumber GitHub - PDF table extraction
- python-docx documentation - Word document processing
- OpenPyXL documentation - Excel file processing
- Pandas documentation - structured data processing

## Table of Contents

1. Document loader fundamentals
2. PDF processing in depth
3. Web content loading and crawling
4. Structured data processing
5. Office document processing
6. Academic and research resource loading
7. Advanced document parsing techniques
8. Custom loader development
9. Performance optimization and batch processing
10. Real-world use cases and best practices

## Document Loader Fundamentals

### The Document Object

In LangChain, every document loader converts raw data into the standard `Document` object. Understanding this structure is essential for using the various loaders effectively.

```python
from langchain.schema import Document
from typing import Dict, List, Any
import json


class DocumentAnalyzer:
    """Document 物件分析器"""

    def __init__(self):
        self.document_stats = {}

    def analyze_document(self, doc: Document, doc_id: str = None):
        """詳細分析 Document 物件"""
        doc_id = doc_id or f"doc_{len(self.document_stats)}"

        print(f"\n{'='*60}")
        print(f"Document 分析: {doc_id}")
        print(f"{'='*60}")

        # 分析 page_content
        content = doc.page_content
        content_stats = {
            "length": len(content),
            "lines": len(content.split('\n')),
            "words": len(content.split()),
            "characters": len(content),
            "has_chinese": any('\u4e00' <= char <= '\u9fff' for char in content),
            "has_english": any(char.isalpha() for char in content),
            "has_numbers": any(char.isdigit() for char in content),
        }

        print(f"📄 內容分析:")
        print(f" 文字長度: {content_stats['length']:,} 字符")
        print(f" 行數: {content_stats['lines']:,}")
        print(f" 詞數: {content_stats['words']:,}")
        print(f" 包含中文: {'✅' if content_stats['has_chinese'] else '❌'}")
        print(f" 包含英文: {'✅' if content_stats['has_english'] else '❌'}")
        print(f" 包含數字: {'✅' if content_stats['has_numbers'] else '❌'}")

        # 分析 metadata
        metadata = doc.metadata
        print(f"\n🏷️ Metadata 分析:")
        print(f" 欄位數量: {len(metadata)}")
        for key, value in metadata.items():
            value_type = type(value).__name__
            value_preview = str(value)[:50] + "..." if len(str(value)) > 50 else str(value)
            print(f" {key} ({value_type}): {value_preview}")

        # 儲存分析結果
        self.document_stats[doc_id] = {
            "content_stats": content_stats,
            "metadata_fields": list(metadata.keys()),
            "metadata_count": len(metadata)
        }

        return content_stats, metadata

    def compare_documents(self, docs: List[Document], source_name: str = "Unknown"):
        """比較多個文件的特徵"""
        print(f"\n{'='*70}")
        print(f"文件集合分析: {source_name}")
        print(f"{'='*70}")

        if not docs:
            print("❌ 沒有文件可分析")
            return

        total_length = sum(len(doc.page_content) for doc in docs)
        total_words = sum(len(doc.page_content.split()) for doc in docs)

        # 統計 metadata 欄位
        all_metadata_fields = set()
        for doc in docs:
            all_metadata_fields.update(doc.metadata.keys())

        print(f"📊 集合統計:")
        print(f" 文件數量: {len(docs)}")
        print(f" 總字符數: {total_length:,}")
        print(f" 總詞數: {total_words:,}")
        print(f" 平均文件長度: {total_length/len(docs):,.0f} 字符")
        print(f" Metadata 欄位: {len(all_metadata_fields)} 種")

        print(f"\n📋 Metadata 欄位詳情:")
        for field in sorted(all_metadata_fields):
            field_count = sum(1 for doc in docs if field in doc.metadata)
            coverage = field_count / len(docs) * 100
            print(f" {field}: {field_count}/{len(docs)} 文件 ({coverage:.1f}%)")

        # 文件長度分佈
        lengths = [len(doc.page_content) for doc in docs]
        lengths.sort()

        print(f"\n📈 長度分佈:")
        print(f" 最短: {min(lengths):,} 字符")
        print(f" 最長: {max(lengths):,} 字符")
        print(f" 中位數: {lengths[len(lengths)//2]:,} 字符")

        return {
            "document_count": len(docs),
            "total_length": total_length,
            "average_length": total_length / len(docs),
            "metadata_fields": list(all_metadata_fields)
        }


# 使用範例
analyzer = DocumentAnalyzer()

# 創建範例文件
sample_docs = [
    Document(
        page_content="這是一個範例文件,包含中文和English混合內容。",
        metadata={"source": "sample.txt", "page": 1, "author": "Test Author"}
    ),
    Document(
        page_content="This is an English document with some numbers: 123, 456.",
        metadata={"source": "sample.txt", "page": 2, "category": "example"}
    )
]

# 分析單個文件
analyzer.analyze_document(sample_docs[0], "sample_doc_1")

# 比較文件集合
analyzer.compare_documents(sample_docs, "Sample Documents")
```
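Before moving on to format-specific loaders, it is worth seeing the interface they all share. The sketch below is a minimal illustration (the `./data/sample.txt` path is a placeholder): every loader exposes `load()`, which returns a list of `Document` objects, and `lazy_load()`, which yields them one at a time so large sources do not have to fit in memory.

```python
from langchain_community.document_loaders import TextLoader

# Placeholder path; any small text file works for this sketch.
loader = TextLoader("./data/sample.txt", encoding="utf-8")

# load() materializes every Document at once.
docs = loader.load()
print(len(docs), docs[0].metadata)

# lazy_load() yields Documents one by one, keeping memory flat for big sources.
for doc in loader.lazy_load():
    print(doc.page_content[:80])
```

The analyzer class above can be pointed at the output of any loader in this guide, since they all return the same `Document` shape.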
## PDF Processing in Depth

### Comparing the PDF Loaders

PDF is one of the most common and also most complex document formats. Each PDF loader has its own strengths and preferred use cases:

```python
from langchain_community.document_loaders import (
    PyPDFLoader,
    PyMuPDFLoader,
    UnstructuredPDFLoader,
    PDFMinerLoader,
    PDFPlumberLoader
)
import time
from typing import List, Dict
import matplotlib.pyplot as plt
import seaborn as sns


class PDFLoaderComparator:
    """PDF 載入器效能與功能比較器"""

    def __init__(self, pdf_path: str):
        self.pdf_path = pdf_path
        self.loaders = {
            "PyPDF": PyPDFLoader,
            "PyMuPDF": PyMuPDFLoader,
            "Unstructured": UnstructuredPDFLoader,
            "PDFMiner": PDFMinerLoader,
            "PDFPlumber": PDFPlumberLoader
        }
        self.results = {}

    def compare_all_loaders(self, detailed_analysis=True):
        """比較所有 PDF 載入器"""
        print(f"🔍 PDF 載入器綜合比較")
        print(f"檔案: {self.pdf_path}")
        print(f"{'='*80}")

        for loader_name, loader_class in self.loaders.items():
            print(f"\n📖 測試 {loader_name}...")

            try:
                # 測量載入時間
                start_time = time.time()

                if loader_name == "Unstructured":
                    # Unstructured 支援多種模式
                    loader = loader_class(self.pdf_path, mode="single")
                else:
                    loader = loader_class(self.pdf_path)

                docs = loader.load()

                end_time = time.time()
                load_time = end_time - start_time

                # 分析載入結果
                result = self._analyze_loader_result(
                    loader_name, docs, load_time, detailed_analysis
                )
                self.results[loader_name] = result

            except Exception as e:
                print(f"❌ {loader_name} 載入失敗: {str(e)}")
                self.results[loader_name] = {"error": str(e)}

        # 生成比較報告
        self._generate_comparison_report()

    def _analyze_loader_result(self, loader_name: str, docs: List,
                               load_time: float, detailed: bool) -> Dict:
        """分析載入器結果"""
        if not docs:
            return {"error": "沒有載入到任何文件"}

        total_content_length = sum(len(doc.page_content) for doc in docs)
        total_words = sum(len(doc.page_content.split()) for doc in docs)

        # 收集所有 metadata 欄位
        all_metadata_fields = set()
        for doc in docs:
            all_metadata_fields.update(doc.metadata.keys())

        result = {
            "load_time": load_time,
            "document_count": len(docs),
            "total_content_length": total_content_length,
            "total_words": total_words,
            "avg_doc_length": total_content_length / len(docs),
            "metadata_fields": list(all_metadata_fields),
            "metadata_field_count": len(all_metadata_fields)
        }

        print(f" ⏱️ 載入時間: {load_time:.2f} 秒")
        print(f" 📄 文件數量: {len(docs)}")
        print(f" 📝 總內容長度: {total_content_length:,} 字符")
        print(f" 🔤 總詞數: {total_words:,}")
        print(f" 🏷️ Metadata 欄位: {len(all_metadata_fields)} 種")

        if detailed:
            print(f" 📋 Metadata 欄位詳情: {', '.join(all_metadata_fields)}")

            # 顯示第一個文件的內容預覽
            if docs:
                preview = docs[0].page_content[:200].replace('\n', ' ')
                print(f" 👀 內容預覽: {preview}...")

        return result

    def _generate_comparison_report(self):
        """生成載入器比較報告"""
        print(f"\n{'='*80}")
        print("📊 PDF 載入器綜合比較報告")
        print(f"{'='*80}")

        # 過濾出成功的結果
        successful_results = {name: result for name, result in self.results.items()
                              if "error" not in result}

        if not successful_results:
            print("❌ 所有載入器都失敗了")
            return

        # 速度比較
        print(f"\n⚡ 載入速度排名:")
        speed_ranking = sorted(successful_results.items(),
                               key=lambda x: x[1]["load_time"])
        for i, (name, result) in enumerate(speed_ranking, 1):
            print(f" {i}. {name}: {result['load_time']:.2f} 秒")

        # 內容提取量比較
        print(f"\n📝 內容提取量排名:")
        content_ranking = sorted(successful_results.items(),
                                 key=lambda x: x[1]["total_content_length"],
                                 reverse=True)
        for i, (name, result) in enumerate(content_ranking, 1):
            print(f" {i}. {name}: {result['total_content_length']:,} 字符")

        # Metadata 豐富度比較
        print(f"\n🏷️ Metadata 豐富度排名:")
        metadata_ranking = sorted(successful_results.items(),
                                  key=lambda x: x[1]["metadata_field_count"],
                                  reverse=True)
        for i, (name, result) in enumerate(metadata_ranking, 1):
            fields = result["metadata_fields"]
            print(f" {i}. {name}: {result['metadata_field_count']} 欄位 ({', '.join(fields[:3])}...)")
        # 推薦使用場景
        print(f"\n🎯 使用場景推薦:")

        fastest = speed_ranking[0][0]
        most_content = content_ranking[0][0]
        most_metadata = metadata_ranking[0][0]

        print(f" 🚀 追求速度: {fastest}")
        print(f" 📖 內容完整性: {most_content}")
        print(f" 🏷️ 豐富 Metadata: {most_metadata}")

        return {
            "fastest": fastest,
            "most_content": most_content,
            "most_metadata": most_metadata,
            "successful_loaders": list(successful_results.keys())
        }

    def test_special_pdf_features(self):
        """測試特殊 PDF 功能"""
        print(f"\n{'='*80}")
        print("🔬 特殊 PDF 功能測試")
        print(f"{'='*80}")

        # 測試 OCR 功能(PyPDF)
        print(f"\n📷 OCR 功能測試 (PyPDF):")
        try:
            loader = PyPDFLoader(self.pdf_path, extract_images=True)
            docs_with_ocr = loader.load()
            print(f" ✅ OCR 啟用成功,載入 {len(docs_with_ocr)} 個文件")
        except Exception as e:
            print(f" ❌ OCR 功能失敗: {e}")

        # 測試結構化解析(Unstructured)
        print(f"\n🏗️ 結構化解析測試 (Unstructured):")
        try:
            loader = UnstructuredPDFLoader(self.pdf_path, mode="elements")
            structured_docs = loader.load()

            # 分析元素類型
            element_types = {}
            for doc in structured_docs:
                element_type = doc.metadata.get("category", "unknown")
                element_types[element_type] = element_types.get(element_type, 0) + 1

            print(f" ✅ 結構化解析成功,識別出 {len(structured_docs)} 個元素")
            print(f" 📊 元素類型分佈:")
            for element_type, count in element_types.items():
                print(f"   {element_type}: {count}")
        except Exception as e:
            print(f" ❌ 結構化解析失敗: {e}")

        # 測試表格提取(PDFPlumber)
        print(f"\n📊 表格提取測試 (PDFPlumber):")
        try:
            loader = PDFPlumberLoader(self.pdf_path)
            docs = loader.load()

            # 檢查是否包含表格相關內容
            table_indicators = ["table", "row", "column", "cell", "|", "---"]
            has_table_content = any(
                any(indicator in doc.page_content.lower() for indicator in table_indicators)
                for doc in docs
            )

            print(f" {'✅' if has_table_content else '❓'} 表格內容檢測: {'發現' if has_table_content else '未發現'}可能的表格結構")
        except Exception as e:
            print(f" ❌ 表格提取測試失敗: {e}")


# 使用範例 - 需要實際的 PDF 檔案
def demonstrate_pdf_comparison():
    """示範 PDF 載入器比較"""
    # 注意:需要提供實際的 PDF 檔案路徑
    pdf_path = "./data/sample.pdf"  # 請替換為實際檔案路徑

    try:
        comparator = PDFLoaderComparator(pdf_path)
        comparator.compare_all_loaders(detailed_analysis=True)
        comparator.test_special_pdf_features()
    except FileNotFoundError:
        print("❌ PDF 檔案不存在,請提供有效的 PDF 檔案路徑")

        # 創建模擬比較結果
        print("\n📋 PDF 載入器特性比較表:")

        comparison_table = {
            "載入器": ["PyPDF", "PyMuPDF", "Unstructured", "PDFMiner", "PDFPlumber"],
            "速度": ["中等", "快速", "較慢", "中等", "較慢"],
            "文字提取": ["基礎", "優秀", "優秀", "優秀", "良好"],
            "表格支援": ["無", "有限", "良好", "無", "優秀"],
            "OCR支援": ["是", "否", "是", "否", "否"],
            "結構化解析": ["無", "有限", "優秀", "良好", "有限"],
            "Metadata": ["基礎", "豐富", "豐富", "中等", "豐富"]
        }

        import pandas as pd
        df = pd.DataFrame(comparison_table)
        print(df.to_string(index=False))


# 執行示範
demonstrate_pdf_comparison()
```
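For very large PDFs, materializing every page with `load()` can be unnecessary. The following is a minimal, illustrative sketch (the file path is a placeholder) of streaming pages with `lazy_load()` instead of loading them all at once.

```python
from langchain_community.document_loaders import PyPDFLoader

# Placeholder path; point this at a real, large PDF.
loader = PyPDFLoader("./data/large_report.pdf")

# lazy_load() yields one Document per page, so only the current page sits in memory.
for i, page in enumerate(loader.lazy_load()):
    if i >= 3:  # stop early once we have what we need
        break
    print(page.metadata.get("page"), len(page.page_content))
```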
### Advanced PDF Processing Techniques

```python
class AdvancedPDFProcessor:
    """進階 PDF 處理器"""

    def __init__(self):
        self.processed_pdfs = {}

    def intelligent_pdf_loading(self, pdf_path: str,
                                content_type: str = "auto") -> List[Document]:
        """智能 PDF 載入 - 根據內容類型選擇最佳載入器"""
        print(f"🧠 智能 PDF 載入: {pdf_path}")
        print(f"內容類型: {content_type}")

        # 根據內容類型選擇載入器
        if content_type == "auto":
            content_type = self._detect_pdf_content_type(pdf_path)

        loader_map = {
            "text_heavy": PyPDFLoader,
            "structured": UnstructuredPDFLoader,
            "table_heavy": PDFPlumberLoader,
            "academic": PyMuPDFLoader,
            "scanned": lambda path: PyPDFLoader(path, extract_images=True)
        }

        loader_class = loader_map.get(content_type, PyPDFLoader)
        print(f"選擇載入器: {loader_class.__name__}")

        if content_type == "structured":
            loader = loader_class(pdf_path, mode="elements")
        else:
            loader = loader_class(pdf_path)

        docs = loader.load()

        # 後處理
        processed_docs = self._post_process_pdf_docs(docs, content_type)

        print(f"✅ 成功載入 {len(processed_docs)} 個文件")
        return processed_docs

    def _detect_pdf_content_type(self, pdf_path: str) -> str:
        """檢測 PDF 內容類型"""
        print("🔍 檢測 PDF 內容類型...")

        # 使用快速載入器進行內容分析
        try:
            loader = PyPDFLoader(pdf_path)
            sample_docs = loader.load()

            if not sample_docs:
                return "text_heavy"

            # 合併前幾頁內容進行分析
            sample_content = " ".join(
                doc.page_content for doc in sample_docs[:3]
            ).lower()

            # 檢測模式
            table_indicators = ["table", "row", "column", "|", "---", "cell"]
            structure_indicators = ["title", "heading", "section", "chapter"]
            academic_indicators = ["abstract", "introduction", "methodology", "reference", "citation"]

            table_score = sum(sample_content.count(indicator) for indicator in table_indicators)
            structure_score = sum(sample_content.count(indicator) for indicator in structure_indicators)
            academic_score = sum(sample_content.count(indicator) for indicator in academic_indicators)

            # 檢測是否為掃描文件(文字很少)
            if len(sample_content.strip()) < 100:
                detected_type = "scanned"
            elif table_score > 5:
                detected_type = "table_heavy"
            elif academic_score > 3:
                detected_type = "academic"
            elif structure_score > 5:
                detected_type = "structured"
            else:
                detected_type = "text_heavy"

            print(f"檢測結果: {detected_type}")
            print(f" 表格指標: {table_score}")
            print(f" 結構指標: {structure_score}")
            print(f" 學術指標: {academic_score}")

            return detected_type

        except Exception as e:
            print(f"⚠️ 檢測失敗,使用預設類型: {e}")
            return "text_heavy"

    def _post_process_pdf_docs(self, docs: List[Document],
                               content_type: str) -> List[Document]:
        """PDF 文件後處理"""
        processed_docs = []

        for doc in docs:
            # 清理內容
            cleaned_content = self._clean_pdf_content(doc.page_content)

            # 增強 metadata
            enhanced_metadata = doc.metadata.copy()
            enhanced_metadata.update({
                "content_type": content_type,
                "word_count": len(cleaned_content.split()),
                "char_count": len(cleaned_content),
                "processed_at": time.time()
            })

            # 根據內容類型添加特殊處理
            if content_type == "structured":
                enhanced_metadata["element_type"] = doc.metadata.get("category", "unknown")

            processed_doc = Document(
                page_content=cleaned_content,
                metadata=enhanced_metadata
            )
            processed_docs.append(processed_doc)

        return processed_docs

    def _clean_pdf_content(self, content: str) -> str:
        """清理 PDF 內容"""
        import re

        # 移除多餘的空白和換行
        content = re.sub(r'\n\s*\n', '\n\n', content)
        content = re.sub(r' +', ' ', content)

        # 移除頁眉頁腳(簡單的規則)
        lines = content.split('\n')
        cleaned_lines = []

        for line in lines:
            line = line.strip()
            # 跳過可能是頁碼的行
            if re.match(r'^\d+$', line):
                continue
            # 跳過太短的行(可能是頁眉頁腳)
            if len(line) < 3:
                continue
            cleaned_lines.append(line)

        return '\n'.join(cleaned_lines)

    def batch_process_pdfs(self, pdf_paths: List[str],
                           max_workers: int = 4) -> Dict[str, List[Document]]:
        """批次處理多個 PDF 檔案"""
        import concurrent.futures
        from pathlib import Path

        print(f"📚 批次處理 {len(pdf_paths)} 個 PDF 檔案")
        print(f"使用 {max_workers} 個工作執行緒")

        results = {}

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # 提交所有任務
            future_to_path = {
                executor.submit(self.intelligent_pdf_loading, pdf_path): pdf_path
                for pdf_path in pdf_paths
            }

            # 收集結果
            for future in concurrent.futures.as_completed(future_to_path):
                pdf_path = future_to_path[future]
                try:
                    docs = future.result()
                    results[pdf_path] = docs
                    print(f"✅ {Path(pdf_path).name}: {len(docs)} 個文件")
                except Exception as e:
                    print(f"❌ {Path(pdf_path).name}: {e}")
                    results[pdf_path] = []

        print(f"\n📊 批次處理完成:")
        total_docs = sum(len(docs) for docs in results.values())
        successful_files = len([path for path, docs in results.items() if docs])
        print(f" 成功處理: {successful_files}/{len(pdf_paths)} 檔案")
        print(f" 總文件數: {total_docs}")
        return results


# 使用範例
advanced_processor = AdvancedPDFProcessor()

# 智能載入單個 PDF
# docs = advanced_processor.intelligent_pdf_loading("./data/sample.pdf")

# 批次處理多個 PDF
# pdf_files = ["./data/pdf1.pdf", "./data/pdf2.pdf", "./data/pdf3.pdf"]
# batch_results = advanced_processor.batch_process_pdfs(pdf_files)
```
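When the PDFs to process all sit in one folder, the built-in `DirectoryLoader` covers simple batch loading without a custom thread pool; the class above is mainly useful when you need per-file loader selection and post-processing. A minimal sketch, assuming the files live under a `./data` directory:

```python
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader

# Placeholder directory; loads every PDF under ./data with PyPDFLoader.
loader = DirectoryLoader(
    "./data",
    glob="**/*.pdf",
    loader_cls=PyPDFLoader,
    show_progress=True,  # requires tqdm to be installed
)
docs = loader.load()
print(f"Loaded {len(docs)} page-level documents")
```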
## Web Content Loading and Crawling

### Advanced Web Loading Techniques

```python
import asyncio
import aiohttp
import bs4
from langchain_community.document_loaders import WebBaseLoader
from langchain.schema import Document
from typing import List, Dict, Optional, Callable
import time
import requests
from urllib.parse import urljoin, urlparse
from urllib import robotparser as robots  # stdlib robots.txt parser


class AdvancedWebLoader:
    """進階網頁載入器"""

    def __init__(self, respect_robots=True, delay_between_requests=1.0):
        self.respect_robots = respect_robots
        self.delay_between_requests = delay_between_requests
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; LangChain WebLoader/1.0)'
        })

    def load_with_custom_parser(self, url: str, parser_config: Dict) -> List[Document]:
        """使用自定義解析器載入網頁"""
        print(f"🌐 載入網頁: {url}")
        print(f"解析配置: {parser_config}")

        # 檢查 robots.txt
        if self.respect_robots and not self._check_robots_txt(url):
            raise ValueError(f"robots.txt 禁止訪問: {url}")

        # 構建 BeautifulSoup 解析器參數
        bs_kwargs = self._build_bs_kwargs(parser_config)

        # 使用 WebBaseLoader
        loader = WebBaseLoader(
            web_paths=[url],
            bs_kwargs=bs_kwargs,
            header_template=parser_config.get("header_template"),
            requests_kwargs={
                'timeout': parser_config.get('timeout', 30),
                'headers': parser_config.get('headers', {})
            }
        )

        docs = loader.load()

        # 後處理
        processed_docs = self._post_process_web_docs(docs, parser_config)

        print(f"✅ 成功載入 {len(processed_docs)} 個文件")
        return processed_docs

    def _check_robots_txt(self, url: str) -> bool:
        """檢查 robots.txt 是否允許爬取"""
        try:
            parsed_url = urlparse(url)
            robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"

            response = self.session.get(robots_url, timeout=10)
            if response.status_code == 200:
                rp = robots.RobotFileParser()
                rp.set_url(robots_url)
                rp.read()
                return rp.can_fetch('*', url)

            return True  # 如果沒有 robots.txt,假設允許

        except Exception as e:
            print(f"⚠️ 無法檢查 robots.txt: {e}")
            return True

    def _build_bs_kwargs(self, parser_config: Dict) -> Dict:
        """構建 BeautifulSoup 解析器參數"""
        bs_kwargs = {}

        # 解析範圍限制
        if 'parse_only' in parser_config:
            parse_only_config = parser_config['parse_only']

            if isinstance(parse_only_config, dict):
                tag = parse_only_config.get('tag', 'div')
                attrs = parse_only_config.get('attrs', {})
                bs_kwargs['parse_only'] = bs4.SoupStrainer(tag, attrs=attrs)
            elif isinstance(parse_only_config, str):
                # CSS 選擇器
                bs_kwargs['parse_only'] = bs4.SoupStrainer(parse_only_config)

        # 解析器類型
        parser = parser_config.get('parser', 'html.parser')
        bs_kwargs['features'] = parser

        return bs_kwargs

    def _post_process_web_docs(self, docs: List[Document], config: Dict) -> List[Document]:
        """網頁文件後處理"""
        processed_docs = []

        for doc in docs:
            content = doc.page_content

            # 清理 HTML 內容
            if config.get('clean_html', True):
                content = self._clean_html_content(content)

            # 提取特定資訊
            if config.get('extract_links', False):
                links = self._extract_links(doc.page_content, doc.metadata.get('source', ''))
                doc.metadata['extracted_links'] = links

            # 內容過濾
            min_length = config.get('min_content_length', 0)
            if len(content.strip()) < min_length:
                continue

            # 增強 metadata
            enhanced_metadata = doc.metadata.copy()
            enhanced_metadata.update({
                'content_length': len(content),
                'word_count': len(content.split()),
                'processed_at': time.time(),
                'loader_config': config.get('name', 'custom')
            })

            processed_doc = Document(
                page_content=content,
                metadata=enhanced_metadata
            )
            processed_docs.append(processed_doc)

        return processed_docs

    def _clean_html_content(self, content: str) -> str:
        """清理 HTML 內容"""
        import re

        # 移除多餘的空白
        content = re.sub(r'\s+', ' ', content)
        content = re.sub(r'\n\s*\n', '\n\n', content)

        # 移除常見的網頁雜訊
        noise_patterns = [
            r'Cookie',
            r'Privacy Policy',
            r'Terms of Service',
            r'Subscribe to newsletter',
            r'Follow us on',
            r'Share this article'
        ]

        for pattern in noise_patterns:
            content = re.sub(pattern, '', content, flags=re.IGNORECASE)

        return content.strip()

    def _extract_links(self, html_content: str, base_url: str) -> List[Dict]:
        """從 HTML 內容中提取連結"""
        soup = bs4.BeautifulSoup(html_content, 'html.parser')
        links = []

        for link in soup.find_all('a', href=True):
            href = link['href']
            text = link.get_text(strip=True)

            # 轉換為絕對 URL
            absolute_url = urljoin(base_url, href)

            links.append({
                'url': absolute_url,
                'text': text,
                'title': link.get('title', '')
            })

        return links

    async def batch_load_async(self, urls: List[str], parser_configs: List[Dict],
                               max_concurrent: int = 5) -> Dict[str, List[Document]]:
        """異步批次載入多個網頁"""
        print(f"🚀 異步載入 {len(urls)} 個網頁")
        print(f"最大並發數: {max_concurrent}")

        semaphore = asyncio.Semaphore(max_concurrent)
        results = {}

        async def load_single_url(url: str, config: Dict):
            async with semaphore:
                try:
                    # 在這裡我們使用同步方法,實際應用中可能需要真正的異步 HTTP 客戶端
                    docs = await asyncio.to_thread(
                        self.load_with_custom_parser, url, config
                    )
                    results[url] = docs
                    print(f"✅ {url}: {len(docs)} 個文件")
                except Exception as e:
                    print(f"❌ {url}: {e}")
                    results[url] = []

                # 請求間延遲
                await asyncio.sleep(self.delay_between_requests)

        # 創建任務
        tasks = []
        for i, url in enumerate(urls):
            config = parser_configs[i] if i < len(parser_configs) else parser_configs[0]
            tasks.append(load_single_url(url, config))

        # 執行所有任務
        await asyncio.gather(*tasks)

        print(f"\n📊 異步載入完成:")
        total_docs = sum(len(docs) for docs in results.values())
        successful_urls = len([url for url, docs in results.items() if docs])
        print(f" 成功載入: {successful_urls}/{len(urls)} 網頁")
        print(f" 總文件數: {total_docs}")

        return results

    def crawl_sitemap(self, sitemap_url: str, max_pages: int = 100) -> List[str]:
        """從 sitemap 提取 URL 列表"""
        print(f"🗺️ 解析 Sitemap: {sitemap_url}")

        try:
            loader = WebBaseLoader(
                web_paths=[sitemap_url],
                bs_kwargs={"features": "xml"}
            )
            docs = loader.load()

            if not docs:
                raise ValueError("無法載入 sitemap")

            # 解析 XML 內容
            soup = bs4.BeautifulSoup(docs[0].page_content, 'xml')

            # 提取 URL
            urls = []
            for loc in soup.find_all('loc'):
                url = loc.get_text(strip=True)
                if url:
                    urls.append(url)
                    if len(urls) >= max_pages:
                        break

            print(f"✅ 從 sitemap 提取 {len(urls)} 個 URL")
            return urls

        except Exception as e:
            print(f"❌ Sitemap 解析失敗: {e}")
            return []


# 預定義的解析器配置
PARSER_CONFIGS = {
    "blog_post": {
        "name": "blog_post",
        "parse_only": {
            "tag": "article",
            "attrs": {}
        },
        "clean_html": True,
        "extract_links": True,
        "min_content_length": 100
    },
    "news_article": {
        "name": "news_article",
        "parse_only": {
            "tag": "div",
            "attrs": {"class": ["article-content", "post-content", "entry-content"]}
        },
        "clean_html": True,
        "extract_links": False,
        "min_content_length": 200
    },
    "product_page": {
        "name": "product_page",
        "parse_only": {
            "tag": "div",
            "attrs": {"class": ["product-description", "product-details"]}
        },
        "clean_html": True,
        "extract_links": True,
        "min_content_length": 50
    },
    "documentation": {
        "name": "documentation",
        "parse_only": {
            "tag": "main",
            "attrs": {}
        },
        "clean_html": True,
        "extract_links": True,
        "min_content_length": 100,
        "headers": {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
        }
    }
}
# 使用範例
def demonstrate_advanced_web_loading():
    """示範進階網頁載入功能"""
    web_loader = AdvancedWebLoader(
        respect_robots=True,
        delay_between_requests=1.0
    )

    # 測試 URL 列表
    test_urls = [
        "https://lilianweng.github.io/posts/2023-06-23-agent/",
        "https://python.langchain.com/docs/modules/data_connection/document_loaders/",
        "https://docs.python.org/3/tutorial/"
    ]

    print("🎯 進階網頁載入示範")
    print("=" * 60)

    # 單個網頁載入示範
    print("\n📄 單個網頁載入:")
    try:
        docs = web_loader.load_with_custom_parser(
            test_urls[0],
            PARSER_CONFIGS["blog_post"]
        )

        if docs:
            print(f"載入成功: {len(docs)} 個文件")
            print(f"內容預覽: {docs[0].page_content[:200]}...")
            print(f"Metadata: {docs[0].metadata}")

    except Exception as e:
        print(f"載入失敗: {e}")

    # 異步批次載入示範
    print("\n🚀 異步批次載入示範:")

    async def async_demo():
        try:
            configs = [
                PARSER_CONFIGS["blog_post"],
                PARSER_CONFIGS["documentation"],
                PARSER_CONFIGS["documentation"]
            ]

            results = await web_loader.batch_load_async(
                test_urls, configs, max_concurrent=2
            )

            for url, docs in results.items():
                print(f"{url}: {len(docs)} 文件")

        except Exception as e:
            print(f"異步載入失敗: {e}")

    # 運行異步示範
    try:
        asyncio.run(async_demo())
    except Exception as e:
        print(f"異步示範失敗: {e}")


# 執行示範
demonstrate_advanced_web_loading()
```
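For straightforward pages, the stock `WebBaseLoader` is often enough without the custom wrapper above. The minimal sketch below mirrors the `blog_post` configuration by restricting parsing to the `<article>` element; whether that tag exists depends on the target site, so adjust the `SoupStrainer` accordingly.

```python
import bs4
from langchain_community.document_loaders import WebBaseLoader

# Same test URL as the demo above; the <article> filter is an assumption about its markup.
loader = WebBaseLoader(
    web_paths=["https://lilianweng.github.io/posts/2023-06-23-agent/"],
    bs_kwargs={"parse_only": bs4.SoupStrainer("article")},
)
docs = loader.load()
print(docs[0].metadata["source"], len(docs[0].page_content))
```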
## Structured Data Processing

### CSV and Database Loaders in Detail

```python
import pandas as pd
import sqlite3
import json
from langchain_community.document_loaders.csv_loader import CSVLoader
from langchain_community.document_loaders import (
    UnstructuredExcelLoader,
    DataFrameLoader
)
from langchain.schema import Document
from typing import List, Dict, Any, Optional, Callable
import numpy as np


class StructuredDataProcessor:
    """結構化資料處理器"""

    def __init__(self):
        # Only the handlers implemented in this section are registered here;
        # json/sql/dataframe handlers can be added following the same pattern.
        self.processors = {
            'csv': self._process_csv,
            'excel': self._process_excel
        }

    def process_structured_data(self, data_source: str, data_type: str,
                                processing_config: Dict = None) -> List[Document]:
        """處理結構化資料"""
        print(f"📊 處理結構化資料")
        print(f"資料來源: {data_source}")
        print(f"資料類型: {data_type}")

        config = processing_config or {}

        if data_type not in self.processors:
            raise ValueError(f"不支援的資料類型: {data_type}")

        processor = self.processors[data_type]
        docs = processor(data_source, config)

        # 統計分析
        self._analyze_structured_data(docs, data_type)

        return docs

    def _process_csv(self, csv_path: str, config: Dict) -> List[Document]:
        """處理 CSV 檔案"""
        print(f"📄 處理 CSV: {csv_path}")

        # 基本載入
        encoding = config.get('encoding', 'utf-8')
        source_column = config.get('source_column')

        loader = CSVLoader(
            file_path=csv_path,
            encoding=encoding,
            source_column=source_column
        )

        basic_docs = loader.load()

        # 進階處理選項
        if config.get('use_pandas', False):
            return self._process_csv_with_pandas(csv_path, config)

        # 增強 metadata
        enhanced_docs = []
        for i, doc in enumerate(basic_docs):
            # 解析 CSV 行資料
            row_data = self._parse_csv_row(doc.page_content)

            enhanced_metadata = doc.metadata.copy()
            enhanced_metadata.update({
                'row_number': i + 1,
                'column_count': len(row_data),
                'data_type': 'csv_row',
                'has_missing_values': any(not value or value.strip() == ''
                                          for value in row_data.values())
            })

            enhanced_doc = Document(
                page_content=doc.page_content,
                metadata=enhanced_metadata
            )
            enhanced_docs.append(enhanced_doc)

        return enhanced_docs

    def _process_csv_with_pandas(self, csv_path: str, config: Dict) -> List[Document]:
        """使用 Pandas 處理 CSV"""
        print("🐼 使用 Pandas 處理 CSV")

        # 讀取 CSV
        pd_kwargs = config.get('pandas_kwargs', {})
        df = pd.read_csv(csv_path, **pd_kwargs)

        print(f"載入 DataFrame: {df.shape[0]} 行 x {df.shape[1]} 列")

        # 資料清理
        if config.get('clean_data', True):
            df = self._clean_dataframe(df)

        # 根據處理模式生成文件
        mode = config.get('mode', 'row_by_row')

        if mode == 'row_by_row':
            return self._dataframe_to_docs_by_row(df, config)
        elif mode == 'column_analysis':
            return self._dataframe_to_docs_by_column(df, config)
        elif mode == 'summary':
            return self._dataframe_to_summary_docs(df, config)
        else:
            raise ValueError(f"未知的處理模式: {mode}")

    def _clean_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """清理 DataFrame"""
        print("🧹 清理 DataFrame")

        original_shape = df.shape

        # 移除完全空白的行
        df = df.dropna(how='all')

        # 處理缺失值
        for column in df.columns:
            if df[column].dtype == 'object':
                df[column] = df[column].fillna('')
            else:
                df[column] = df[column].fillna(0)

        # 移除重複行
        df = df.drop_duplicates()

        print(f"清理結果: {original_shape} -> {df.shape}")
        return df

    def _dataframe_to_docs_by_row(self, df: pd.DataFrame, config: Dict) -> List[Document]:
        """將 DataFrame 按行轉換為文件"""
        content_columns = config.get('content_columns', list(df.columns))
        page_content_template = config.get('page_content_template')

        docs = []
        for index, row in df.iterrows():
            # 生成頁面內容
            if page_content_template:
                page_content = page_content_template.format(**row.to_dict())
            else:
                # 預設格式
                content_parts = []
                for col in content_columns:
                    if col in row and pd.notna(row[col]) and str(row[col]).strip():
                        content_parts.append(f"{col}: {row[col]}")
                page_content = "\n".join(content_parts)

            # 生成 metadata
            metadata = {
                'source': config.get('source', 'dataframe'),
                'row_index': index,
                'data_type': 'dataframe_row'
            }

            # 添加數值型資料統計
            numeric_columns = df.select_dtypes(include=[np.number]).columns
            if len(numeric_columns) > 0:
                numeric_data = row[numeric_columns].to_dict()
                metadata['numeric_summary'] = {
                    col: float(val) for col, val in numeric_data.items()
                    if pd.notna(val)
                }

            # 添加分類型資料
            categorical_columns = df.select_dtypes(include=['object']).columns
            if len(categorical_columns) > 0:
                categorical_data = row[categorical_columns].to_dict()
                metadata['categorical_data'] = {
                    col: str(val) for col, val in categorical_data.items()
                    if pd.notna(val) and str(val).strip()
                }

            doc = Document(page_content=page_content, metadata=metadata)
            docs.append(doc)

        return docs

    def _dataframe_to_docs_by_column(self, df: pd.DataFrame, config: Dict) -> List[Document]:
        """將 DataFrame 按列分析轉換為文件"""
        docs = []

        for column in df.columns:
            series = df[column]

            # 生成列分析內容
            analysis = self._analyze_series(series)

            page_content = f"Column Analysis: {column}\n"
            page_content += f"Data Type: {analysis['dtype']}\n"
            page_content += f"Non-null Count: {analysis['non_null_count']}\n"
            page_content += f"Unique Values: {analysis['unique_count']}\n"

            if analysis['dtype'] in ['int64', 'float64']:
                page_content += f"Mean: {analysis.get('mean', 'N/A')}\n"
                page_content += f"Std: {analysis.get('std', 'N/A')}\n"
                page_content += f"Min: {analysis.get('min', 'N/A')}\n"
                page_content += f"Max: {analysis.get('max', 'N/A')}\n"
            else:
                page_content += f"Most Common: {analysis.get('most_common', 'N/A')}\n"

            metadata = {
                'source': config.get('source', 'dataframe'),
                'column_name': column,
                'data_type': 'column_analysis',
                'analysis': analysis
            }

            doc = Document(page_content=page_content, metadata=metadata)
            docs.append(doc)

        return docs

    def _analyze_series(self, series: pd.Series) -> Dict:
        """分析 Pandas Series"""
        analysis = {
            'dtype': str(series.dtype),
            'non_null_count': series.count(),
            'null_count': series.isnull().sum(),
            'unique_count': series.nunique()
        }

        if series.dtype in ['int64', 'float64']:
            analysis.update({
                'mean': series.mean(),
                'std': series.std(),
                'min': series.min(),
                'max': series.max(),
                'median': series.median()
            })
        else:
            if analysis['unique_count'] > 0:
                most_common = series.value_counts().head(3)
                analysis['most_common'] = most_common.to_dict()

        return analysis

    def _process_excel(self, excel_path: str, config: Dict) -> List[Document]:
        """處理 Excel 檔案"""
        print(f"📊 處理 Excel: {excel_path}")

        # 使用 UnstructuredExcelLoader
        mode = config.get('mode', 'single')
        loader = UnstructuredExcelLoader(excel_path, mode=mode)
        docs = loader.load()

        # 如果需要更詳細的處理,使用 pandas
        if config.get('detailed_processing', False):
            return self._process_excel_with_pandas(excel_path, config)

        return docs

    def _process_excel_with_pandas(self, excel_path: str, config: Dict) -> List[Document]:
        """使用 Pandas 詳細處理 Excel"""
        print("🐼 使用 Pandas 處理 Excel")

        # 讀取所有工作表
        excel_file = pd.ExcelFile(excel_path)
        all_docs = []

        sheet_names = config.get('sheet_names', excel_file.sheet_names)

        for sheet_name in sheet_names:
            print(f"處理工作表: {sheet_name}")

            df = pd.read_excel(excel_path, sheet_name=sheet_name)

            # 為這個工作表創建配置
            sheet_config = config.copy()
            sheet_config['source'] = f"{excel_path}#{sheet_name}"

            # 處理工作表
            if config.get('clean_data', True):
                df = self._clean_dataframe(df)

            sheet_docs = self._dataframe_to_docs_by_row(df, sheet_config)

            # 為每個文件添加工作表資訊
            for doc in sheet_docs:
                doc.metadata['sheet_name'] = sheet_name
                doc.metadata['excel_file'] = excel_path

            all_docs.extend(sheet_docs)

        return all_docs

    def _parse_csv_row(self, content: str) -> Dict[str, str]:
        """解析 CSV 行內容"""
        import csv
        import io

        # 簡單的 CSV 行解析
        reader = csv.DictReader(io.StringIO(content))
        try:
            return next(reader)
        except:
            # 如果解析失敗,返回原始內容
            return {'content': content}

    def _analyze_structured_data(self, docs: List[Document], data_type: str):
        """分析結構化資料載入結果"""
        print(f"\n📈 結構化資料分析結果:")
        print(f"資料類型: {data_type}")
        print(f"文件數量: {len(docs)}")

        if not docs:
            print("❌ 沒有載入任何文件")
            return

        # 內容長度統計
        content_lengths = [len(doc.page_content) for doc in docs]
        print(f"內容長度統計:")
        print(f" 平均: {np.mean(content_lengths):.0f} 字符")
        print(f" 最短: {min(content_lengths)} 字符")
        print(f" 最長: {max(content_lengths)} 字符")

        # Metadata 分析
        all_metadata_keys = set()
        for doc in docs:
            all_metadata_keys.update(doc.metadata.keys())

        print(f"Metadata 欄位: {len(all_metadata_keys)} 種")
        for key in sorted(all_metadata_keys):
            count = sum(1 for doc in docs if key in doc.metadata)
            print(f" {key}: {count}/{len(docs)} 文件")


# 使用範例
def demonstrate_structured_data_processing():
    """示範結構化資料處理"""
    processor = StructuredDataProcessor()

    print("🎯 結構化資料處理示範")
    print("=" * 60)

    # 創建範例 CSV 資料
    sample_data = {
        'name': ['Alice', 'Bob', 'Charlie', 'Diana'],
        'age': [25, 30, 35, 28],
        'city': ['New York', 'London', 'Tokyo', 'Paris'],
        'occupation': ['Engineer', 'Designer', 'Teacher', 'Doctor']
    }

    df = pd.DataFrame(sample_data)
    csv_path = './sample_data.csv'
    df.to_csv(csv_path, index=False)

    print(f"✅ 創建範例 CSV: {csv_path}")

    # 基本 CSV 處理
    print("\n📄 基本 CSV 處理:")
    basic_config = {
        'encoding': 'utf-8',
        'source_column': 'name'
    }

    try:
        basic_docs = processor.process_structured_data(
            csv_path, 'csv', basic_config
        )

        if basic_docs:
            print(f"第一個文件內容: {basic_docs[0].page_content}")
            print(f"第一個文件 Metadata: {basic_docs[0].metadata}")

    except Exception as e:
        print(f"基本處理失敗: {e}")

    # 進階 Pandas 處理
    print("\n🐼 進階 Pandas 處理:")
    pandas_config = {
        'use_pandas': True,
        'clean_data': True,
        'mode': 'row_by_row',
        'content_columns': ['name', 'age', 'city', 'occupation'],
        'page_content_template': "Person: {name}, Age: {age}, City: {city}, Job: {occupation}"
    }

    try:
        pandas_docs = processor.process_structured_data(
            csv_path, 'csv', pandas_config
        )

        if pandas_docs:
            print(f"Pandas 處理結果: {len(pandas_docs)} 文件")
            print(f"第一個文件內容: {pandas_docs[0].page_content}")

    except Exception as e:
        print(f"Pandas 處理失敗: {e}")

    # 列分析模式
    print("\n📊 列分析模式:")
    column_config = {
        'use_pandas': True,
        'mode': 'column_analysis'
    }

    try:
        column_docs = processor.process_structured_data(
            csv_path, 'csv', column_config
        )

        if column_docs:
            print(f"列分析結果: {len(column_docs)} 文件")
            for doc in column_docs[:2]:  # 顯示前兩個
                print(f"列分析: {doc.page_content[:100]}...")

    except Exception as e:
        print(f"列分析失敗: {e}")

    # 清理範例檔案
    import os
    try:
        os.remove(csv_path)
        print(f"\n🧹 已清理範例檔案: {csv_path}")
    except:
        pass


# 執行示範
demonstrate_structured_data_processing()
```
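When the data is already in a pandas DataFrame, the `DataFrameLoader` imported above is the shortest path to `Document` objects; the custom row-by-row conversion is only needed when you want the richer metadata shown earlier. A minimal sketch using the same column names as the sample data:

```python
import pandas as pd
from langchain_community.document_loaders import DataFrameLoader

df = pd.DataFrame({
    "name": ["Alice", "Bob"],
    "age": [25, 30],
    "city": ["New York", "London"],
})

# One Document per row: the chosen column becomes page_content,
# and every other column is copied into metadata automatically.
loader = DataFrameLoader(df, page_content_column="name")
docs = loader.load()
print(docs[0].page_content, docs[0].metadata)
```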
## Office Document Processing

### Advanced Word and PowerPoint Processing

```python
from langchain_community.document_loaders import (
    Docx2txtLoader,
    UnstructuredWordDocumentLoader,
    UnstructuredPowerPointLoader
)
from langchain.schema import Document
import zipfile
import xml.etree.ElementTree as ET
from typing import List, Dict, Optional
import re


class OfficeDocumentProcessor:
    """Office