From 5f9e75d58d3f2090f58132128f0b73152741d700 Mon Sep 17 00:00:00 2001 From: haoyuren <13851610112@163.com> Date: Sun, 29 Jun 2025 17:27:47 -0700 Subject: remove limit --- scripts/__pycache__/fetch_papers.cpython-312.pyc | Bin 36787 -> 45572 bytes scripts/fetch_papers.py | 198 +++++++++++++++++- scripts/test_unlimited_historical.py | 248 +++++++++++++++++++++++ 3 files changed, 443 insertions(+), 3 deletions(-) create mode 100644 scripts/test_unlimited_historical.py (limited to 'scripts') diff --git a/scripts/__pycache__/fetch_papers.cpython-312.pyc b/scripts/__pycache__/fetch_papers.cpython-312.pyc index 47d73c0..b5ff943 100644 Binary files a/scripts/__pycache__/fetch_papers.cpython-312.pyc and b/scripts/__pycache__/fetch_papers.cpython-312.pyc differ diff --git a/scripts/fetch_papers.py b/scripts/fetch_papers.py index fd3e628..7920a94 100644 --- a/scripts/fetch_papers.py +++ b/scripts/fetch_papers.py @@ -486,18 +486,32 @@ class ArxivPaperFetcher: end_date = datetime.now(timezone.utc) start_date = end_date - timedelta(days=years * 365) + # 从环境变量获取限制配置 + max_papers = int(os.getenv("MAX_HISTORICAL_PAPERS", "50000")) # 默认50000篇 + max_per_category = int(os.getenv("MAX_PAPERS_PER_CATEGORY", "10000")) # 每类别10000篇 + logger.info(f"📚 历史模式: 获取过去 {years} 年的论文") logger.info(f"🕐 时间范围: {start_date.strftime('%Y-%m-%d')} ~ {end_date.strftime('%Y-%m-%d')}") - logger.info(f"⚠️ 注意: 历史模式最多处理 5000 篇论文,可能需要较长时间") + logger.info(f"📊 配置限制:") + logger.info(f" - 最大论文数: {max_papers:,} 篇") + logger.info(f" - 每类别限制: {max_per_category:,} 篇") + + if max_papers >= 20000: + logger.info(f"⚠️ 大规模历史模式: 这可能需要很长时间和大量API调用") + logger.info(f"💡 建议: 可以通过环境变量调整限制") + logger.info(f" - MAX_HISTORICAL_PAPERS={max_papers}") + logger.info(f" - MAX_PAPERS_PER_CATEGORY={max_per_category}") - papers = self.fetch_papers_by_date_range(start_date, end_date, max_papers=5000) + papers = self.fetch_papers_by_date_range_unlimited( + start_date, end_date, max_papers=max_papers, max_per_category=max_per_category + ) if papers: logger.info(f"📋 开始GPT-4o智能过滤阶段...") # 历史模式默认使用更高的并发数(因为论文数量多) use_parallel = os.getenv("USE_PARALLEL", "true").lower() == "true" - max_concurrent = int(os.getenv("MAX_CONCURRENT", "25")) # 历史模式默认更高并发 + max_concurrent = int(os.getenv("MAX_CONCURRENT", "50")) # 历史模式默认更高并发 return self.filter_papers_with_gpt(papers, use_parallel=use_parallel, max_concurrent=max_concurrent) @@ -505,6 +519,184 @@ class ArxivPaperFetcher: logger.warning("⚠️ 未获取到任何论文,跳过GPT过滤步骤") return [] + def fetch_papers_by_date_range_unlimited(self, start_date: datetime, end_date: datetime, + max_papers: int = 50000, max_per_category: int = 10000) -> List[Dict]: + """ + Fetch papers by date range with higher limits for historical mode. 
+ + Args: + start_date: Start date for paper search + end_date: End date for paper search + max_papers: Maximum total papers to fetch + max_per_category: Maximum papers per category + + Returns: + List of paper dictionaries + """ + logger.info(f"🔍 开始获取论文 - 无限制模式") + logger.info(f"🕐 时间范围: {start_date.strftime('%Y-%m-%d %H:%M')} UTC ~ {end_date.strftime('%Y-%m-%d %H:%M')} UTC") + logger.info(f"📊 搜索配置:") + logger.info(f" - 最大论文数: {max_papers:,}") + logger.info(f" - 每类别限制: {max_per_category:,}") + logger.info(f" - 搜索类别: {len(CS_CATEGORIES)} 个") + + all_papers_dict = {} # 使用字典去重 + total_raw_papers = 0 + total_categories_processed = 0 + + # 分别查询每个类别 + for category in CS_CATEGORIES: + total_categories_processed += 1 + logger.info(f"📂 处理类别 {total_categories_processed}/{len(CS_CATEGORIES)}: {category}") + + category_papers = self._fetch_papers_for_category_unlimited( + category, start_date, end_date, max_papers_per_category=max_per_category + ) + + # 合并到总结果中(去重) + new_papers_count = 0 + for paper in category_papers: + arxiv_id = paper['arxiv_id'] + if arxiv_id not in all_papers_dict: + all_papers_dict[arxiv_id] = paper + new_papers_count += 1 + + # 检查是否达到总数限制 + if len(all_papers_dict) >= max_papers: + logger.info(f"⚠️ 达到最大论文数 {max_papers:,},停止获取") + break + + total_raw_papers += len(category_papers) + logger.info(f" ✅ {category}: 获得{len(category_papers):,}篇, 新增{new_papers_count:,}篇") + + # 如果达到总数限制,停止 + if len(all_papers_dict) >= max_papers: + break + + # 转换为列表并按日期排序 + all_papers = list(all_papers_dict.values()) + all_papers.sort(key=lambda x: x['updated'], reverse=True) + + logger.info(f"📊 抓取总结:") + logger.info(f" - 处理了 {total_categories_processed} 个类别") + logger.info(f" - 从arXiv获取了 {total_raw_papers:,} 篇原始论文") + logger.info(f" - 去重后得到 {len(all_papers):,} 篇唯一论文") + + # 显示类别分布 + if all_papers: + from collections import Counter + + # 日期分布 + dates = [] + for paper in all_papers: + paper_date = datetime.strptime(paper['updated'][:10], '%Y-%m-%d') + dates.append(paper_date.strftime('%Y-%m-%d')) + + date_counts = Counter(dates) + logger.info(f"📅 论文日期分布 (前10天):") + for date, count in date_counts.most_common(10): + days_ago = (datetime.now(timezone.utc).date() - datetime.strptime(date, '%Y-%m-%d').date()).days + logger.info(f" - {date}: {count:,}篇 ({days_ago}天前)") + + # 类别分布 + category_counts = Counter() + for paper in all_papers: + for cat in paper['categories']: + if cat in CS_CATEGORIES: + category_counts[cat] += 1 + + logger.info(f"📊 类别分布:") + for cat, count in category_counts.most_common(): + logger.info(f" - {cat}: {count:,}篇") + + return all_papers + + def _fetch_papers_for_category_unlimited(self, category: str, start_date: datetime, + end_date: datetime, max_papers_per_category: int = 10000) -> List[Dict]: + """ + Fetch papers for a specific category with higher limits. 
+ + Args: + category: arXiv category (e.g., 'cs.AI') + start_date: Start date for paper search + end_date: End date for paper search + max_papers_per_category: Maximum papers to fetch for this category + + Returns: + List of paper dictionaries for this category + """ + papers = [] + start_index = 0 + batch_count = 0 + api_calls = 0 + max_api_calls = max_papers_per_category // MAX_RESULTS_PER_BATCH + 100 # 动态计算API调用限制 + + logger.info(f" 🎯 {category}: 开始获取,目标最多{max_papers_per_category:,}篇论文") + + while len(papers) < max_papers_per_category and api_calls < max_api_calls: + try: + batch_count += 1 + api_calls += 1 + + params = { + "search_query": f"cat:{category}", + "sortBy": "submittedDate", + "sortOrder": "descending", + "start": start_index, + "max_results": min(MAX_RESULTS_PER_BATCH, max_papers_per_category - len(papers)) + } + + if batch_count % 10 == 0: # 每10批次显示一次详细进度 + logger.info(f" 📦 {category}第{batch_count}批次: 从索引{start_index}开始,已获取{len(papers):,}篇...") + + response = self.session.get(ARXIV_BASE_URL, params=params, timeout=30) + response.raise_for_status() + + feed = feedparser.parse(response.content) + entries = feed.entries + + logger.debug(f" ✅ {category}第{batch_count}批次获取了 {len(entries)} 篇论文") + + if not entries: + logger.debug(f" 📭 {category}: 没有更多论文") + break + + # Filter papers by date + batch_papers = [] + older_papers = 0 + for entry in entries: + paper_date = datetime(*entry.updated_parsed[:6], tzinfo=timezone.utc) + + if paper_date < start_date: + older_papers += 1 + continue + + if start_date <= paper_date <= end_date: + paper_data = self._parse_paper_entry(entry) + batch_papers.append(paper_data) + + papers.extend(batch_papers) + logger.debug(f" 📊 {category}第{batch_count}批次: {len(batch_papers)}篇符合日期, {older_papers}篇过旧") + + # If we found older papers, we can stop + if older_papers > 0: + logger.debug(f" 🔚 {category}: 发现过旧论文,停止") + break + + # If we got fewer papers than requested, we've reached the end + if len(entries) < MAX_RESULTS_PER_BATCH: + logger.debug(f" 🔚 {category}: 到达数据末尾") + break + + start_index += MAX_RESULTS_PER_BATCH + + except Exception as e: + logger.error(f" ❌ {category}抓取出错: {e}") + break + + logger.info(f" ✅ {category}: 完成,获取{len(papers):,}篇论文 (API调用{api_calls}次)") + return papers + class GitHubUpdater: """Handle GitHub repository updates.""" diff --git a/scripts/test_unlimited_historical.py b/scripts/test_unlimited_historical.py new file mode 100644 index 0000000..f17a964 --- /dev/null +++ b/scripts/test_unlimited_historical.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 +""" +测试无限制历史模式 + +验证系统是否能处理大规模历史数据获取, +测试不同的配置参数和性能表现。 +""" + +import os +import sys +import time +import logging + +# 设置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[logging.StreamHandler(sys.stdout)] +) +logger = logging.getLogger(__name__) + +# Add the parent directory to the path so we can import the main module +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from scripts.fetch_papers import ArxivPaperFetcher + + +def test_configuration_options(): + """测试不同的配置选项""" + + print("🔍 测试无限制历史模式配置") + print("=" * 60) + + # 测试不同的配置场景 + test_scenarios = [ + { + "name": "小规模测试", + "MAX_HISTORICAL_PAPERS": "1000", + "MAX_PAPERS_PER_CATEGORY": "200", + "MAX_CONCURRENT": "10", + "description": "适合快速测试和开发" + }, + { + "name": "中规模测试", + "MAX_HISTORICAL_PAPERS": "5000", + "MAX_PAPERS_PER_CATEGORY": "1000", + "MAX_CONCURRENT": "25", + "description": "适合日常使用" + }, + { + "name": "大规模测试", + 
"MAX_HISTORICAL_PAPERS": "50000", + "MAX_PAPERS_PER_CATEGORY": "10000", + "MAX_CONCURRENT": "50", + "description": "适合完整历史数据获取" + }, + { + "name": "超大规模测试", + "MAX_HISTORICAL_PAPERS": "100000", + "MAX_PAPERS_PER_CATEGORY": "20000", + "MAX_CONCURRENT": "100", + "description": "适合研究级别的数据挖掘" + } + ] + + print("📊 支持的配置场景:") + for i, scenario in enumerate(test_scenarios, 1): + print(f"\n{i}. {scenario['name']}:") + print(f" - 最大论文数: {int(scenario['MAX_HISTORICAL_PAPERS']):,}") + print(f" - 每类别限制: {int(scenario['MAX_PAPERS_PER_CATEGORY']):,}") + print(f" - 并发数: {scenario['MAX_CONCURRENT']}") + print(f" - 描述: {scenario['description']}") + + # 计算理论性能 + print(f"\n⚡ 理论性能估算:") + print(f" 基于以下假设:") + print(f" - 每篇论文GPT处理时间: 1-2秒") + print(f" - 并行处理加速比: 10-20x") + print(f" - 网络延迟和API限制: 考虑在内") + + for scenario in test_scenarios: + max_papers = int(scenario['MAX_HISTORICAL_PAPERS']) + concurrent = int(scenario['MAX_CONCURRENT']) + + # 串行时间估算 + serial_time = max_papers * 1.5 # 1.5秒每篇 + + # 并行时间估算 + parallel_time = max_papers / concurrent * 1.5 + 60 # 额外60秒开销 + + print(f"\n {scenario['name']}:") + print(f" - 串行处理时间: {serial_time/3600:.1f} 小时") + print(f" - 并行处理时间: {parallel_time/3600:.1f} 小时") + print(f" - 加速比: {serial_time/parallel_time:.1f}x") + + +def test_memory_requirements(): + """测试内存需求""" + + print(f"\n" + "="*60) + print("💾 内存需求分析") + print("="*60) + + # 估算每篇论文的内存占用 + avg_title_length = 100 # 平均标题长度 + avg_abstract_length = 1500 # 平均摘要长度 + avg_authors = 4 # 平均作者数 + avg_categories = 2 # 平均类别数 + + # 每篇论文大约的内存占用(字符数) + chars_per_paper = ( + avg_title_length + + avg_abstract_length + + avg_authors * 30 + # 每个作者约30字符 + avg_categories * 10 + # 每个类别约10字符 + 200 # 其他字段 + ) + + bytes_per_paper = chars_per_paper * 2 # 假设每字符2字节(UTF-8) + + print(f"📊 每篇论文内存占用估算:") + print(f" - 标题: ~{avg_title_length} 字符") + print(f" - 摘要: ~{avg_abstract_length} 字符") + print(f" - 作者: ~{avg_authors * 30} 字符") + print(f" - 类别: ~{avg_categories * 10} 字符") + print(f" - 其他: ~200 字符") + print(f" - 总计: ~{chars_per_paper} 字符 (~{bytes_per_paper/1024:.1f} KB)") + + # 不同规模的内存需求 + paper_counts = [1000, 5000, 20000, 50000, 100000] + + print(f"\n📈 不同规模的内存需求:") + for count in paper_counts: + total_mb = count * bytes_per_paper / 1024 / 1024 + print(f" - {count:,} 篇论文: ~{total_mb:.1f} MB") + + print(f"\n💡 建议:") + print(f" - 16GB内存: 支持最多 ~100,000 篇论文") + print(f" - 8GB内存: 支持最多 ~50,000 篇论文") + print(f" - 4GB内存: 支持最多 ~20,000 篇论文") + print(f" - 如果内存不足,可以降低MAX_HISTORICAL_PAPERS") + + +def test_api_cost_estimation(): + """测试API成本估算""" + + print(f"\n" + "="*60) + print("💰 API成本估算") + print("="*60) + + # OpenAI GPT-4o 价格 (2024年价格) + # Input: $2.50 per 1M tokens + # Output: $10.00 per 1M tokens + input_price_per_1m = 2.50 + output_price_per_1m = 10.00 + + # 估算每篇论文的token消耗 + avg_input_tokens = 400 # 标题+摘要+系统prompt + avg_output_tokens = 1 # 只返回"0"或"1" + + cost_per_paper = ( + (avg_input_tokens / 1000000) * input_price_per_1m + + (avg_output_tokens / 1000000) * output_price_per_1m + ) + + print(f"📊 每篇论文API成本估算:") + print(f" - 输入tokens: ~{avg_input_tokens}") + print(f" - 输出tokens: ~{avg_output_tokens}") + print(f" - 每篇成本: ~${cost_per_paper:.4f}") + + # 不同规模的成本 + paper_counts = [1000, 5000, 20000, 50000, 100000] + + print(f"\n💸 不同规模的API成本:") + for count in paper_counts: + total_cost = count * cost_per_paper + print(f" - {count:,} 篇论文: ~${total_cost:.2f}") + + print(f"\n🎯 成本优化建议:") + print(f" - 先用小规模测试验证效果") + print(f" - 使用MAX_HISTORICAL_PAPERS控制规模") + print(f" - 考虑分批处理大规模数据") + print(f" - 监控API使用量避免超支") + + +def demonstrate_configuration(): + """演示配置使用方法""" + 
+ print(f"\n" + "="*60) + print("🛠️ 配置使用方法") + print("="*60) + + print(f"🔧 环境变量配置:") + print(f""" +# 基础配置 (推荐用于测试) +export MAX_HISTORICAL_PAPERS=1000 +export MAX_PAPERS_PER_CATEGORY=200 +export MAX_CONCURRENT=10 + +# 中等规模配置 (推荐用于日常使用) +export MAX_HISTORICAL_PAPERS=5000 +export MAX_PAPERS_PER_CATEGORY=1000 +export MAX_CONCURRENT=25 + +# 大规模配置 (推荐用于研究) +export MAX_HISTORICAL_PAPERS=50000 +export MAX_PAPERS_PER_CATEGORY=10000 +export MAX_CONCURRENT=50 + +# 无限制配置 (谨慎使用) +export MAX_HISTORICAL_PAPERS=1000000 +export MAX_PAPERS_PER_CATEGORY=100000 +export MAX_CONCURRENT=100 +""") + + print(f"🚀 运行命令:") + print(f""" +# 使用默认配置运行历史模式 +FETCH_MODE=historical python scripts/fetch_papers.py + +# 使用自定义配置运行 +MAX_HISTORICAL_PAPERS=10000 \\ +MAX_PAPERS_PER_CATEGORY=2000 \\ +MAX_CONCURRENT=30 \\ +FETCH_MODE=historical \\ +python scripts/fetch_papers.py +""") + + print(f"⚠️ 注意事项:") + print(f" - 首次运行建议使用小规模配置") + print(f" - 监控内存使用情况") + print(f" - 注意API成本控制") + print(f" - 考虑网络稳定性") + print(f" - 大规模运行可能需要数小时") + + +if __name__ == "__main__": + print("🎯 ArXiv无限制历史模式测试") + print("=" * 60) + + test_configuration_options() + test_memory_requirements() + test_api_cost_estimation() + demonstrate_configuration() + + print(f"\n✅ 测试完成!") + print(f"💡 现在可以根据需求配置合适的参数来运行历史模式") + print(f"🚀 建议先从小规模开始测试,确保一切正常后再扩大规模") \ No newline at end of file -- cgit v1.2.3
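Beyond the patch itself, a small smoke test can help verify the new code path before launching a large-scale historical run. The sketch below (not part of the commit) calls the new `fetch_papers_by_date_range_unlimited` method over a one-day window with deliberately small limits, using the same `MAX_HISTORICAL_PAPERS` / `MAX_PAPERS_PER_CATEGORY` environment variables the patch introduces. It assumes `ArxivPaperFetcher` can be constructed without arguments and that the module lives at `scripts/fetch_papers.py`, as in the imports of `test_unlimited_historical.py`; adjust for your layout if those assumptions do not hold.

```python
#!/usr/bin/env python3
"""Minimal smoke test for the unlimited historical fetch (sketch; see assumptions above)."""

import os
import sys
from datetime import datetime, timedelta, timezone

# Keep the run cheap: small limits via the same environment variables the patch reads.
os.environ.setdefault("MAX_HISTORICAL_PAPERS", "200")
os.environ.setdefault("MAX_PAPERS_PER_CATEGORY", "50")

# Same path setup as test_unlimited_historical.py (assumes this file sits in scripts/).
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from scripts.fetch_papers import ArxivPaperFetcher  # assumes a no-argument constructor

if __name__ == "__main__":
    fetcher = ArxivPaperFetcher()
    end = datetime.now(timezone.utc)
    start = end - timedelta(days=1)  # one-day window keeps arXiv API calls low

    # Pass the limits explicitly, matching the method signature added in the patch.
    papers = fetcher.fetch_papers_by_date_range_unlimited(
        start,
        end,
        max_papers=int(os.environ["MAX_HISTORICAL_PAPERS"]),
        max_per_category=int(os.environ["MAX_PAPERS_PER_CATEGORY"]),
    )
    print(f"Fetched {len(papers)} unique papers from the last 24 hours")
```

If this completes and the per-category log lines look reasonable, the limits can then be raised toward the larger configurations shown in `demonstrate_configuration()` above.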