Diffstat (limited to 'scripts')
-rw-r--r--  scripts/__pycache__/fetch_papers.cpython-312.pyc  bin 0 -> 26121 bytes
-rw-r--r--  scripts/debug_fetch.py                            200
-rw-r--r--  scripts/fetch_papers.py                           237
-rw-r--r--  scripts/test_arxiv_only.py                        150
-rw-r--r--  scripts/test_fetch_only.py                        169
-rw-r--r--  scripts/test_improved_fetch.py                    168
6 files changed, 888 insertions(+), 36 deletions(-)
diff --git a/scripts/__pycache__/fetch_papers.cpython-312.pyc b/scripts/__pycache__/fetch_papers.cpython-312.pyc
new file mode 100644
index 0000000..afe99e8
--- /dev/null
+++ b/scripts/__pycache__/fetch_papers.cpython-312.pyc
Binary files differ
diff --git a/scripts/debug_fetch.py b/scripts/debug_fetch.py
new file mode 100644
index 0000000..100fc94
--- /dev/null
+++ b/scripts/debug_fetch.py
@@ -0,0 +1,200 @@
+#!/usr/bin/env python3
+"""
+Debug script - shows each step of the paper fetching process in detail
+
+This script is for debugging and diagnosing the paper fetching system; it prints detailed
+information for every step so you can see whether the system works and where it might fail.
+"""
+
+import os
+import sys
+import logging
+from datetime import datetime, timezone, timedelta
+
+# Configure verbose debug logging
+logging.basicConfig(
+ level=logging.DEBUG,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ handlers=[
+ logging.StreamHandler(sys.stdout),
+ ]
+)
+
+# Add the parent directory to the path so we can import the main module
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from scripts.fetch_papers import ArxivPaperFetcher
+
+
+def debug_arxiv_connection():
+ """调试arXiv连接"""
+ print("🔍 测试arXiv API连接...")
+
+ import requests
+ import feedparser
+
+ try:
+ # Run the most basic arXiv query
+ url = "http://export.arxiv.org/api/query"
+ params = {
+ "search_query": "cat:cs.AI",
+ "sortBy": "submittedDate",
+ "sortOrder": "descending",
+ "max_results": 5
+ }
+
+ print(f"📡 发送请求到: {url}")
+ print(f"📋 查询参数: {params}")
+
+ response = requests.get(url, params=params, timeout=10)
+ print(f"✅ HTTP状态码: {response.status_code}")
+
+ if response.status_code == 200:
+ feed = feedparser.parse(response.content)
+ entries = feed.entries
+ print(f"📄 获取到 {len(entries)} 篇论文")
+
+ if entries:
+ print(f"📝 第一篇论文示例:")
+ entry = entries[0]
+ print(f" - 标题: {entry.title}")
+ print(f" - 发布时间: {entry.published}")
+ print(f" - 更新时间: {entry.updated}")
+ print(f" - 类别: {[tag.term for tag in entry.tags] if hasattr(entry, 'tags') else '无'}")
+ print(f" - 摘要长度: {len(entry.summary)} 字符")
+ return True
+ else:
+ print(f"❌ HTTP请求失败: {response.status_code}")
+ return False
+
+ except Exception as e:
+ print(f"❌ arXiv连接测试失败: {e}")
+ return False
+
+
+def debug_openai_connection(api_key):
+ """调试OpenAI连接"""
+ print("\n🤖 测试OpenAI API连接...")
+
+ try:
+ from openai import OpenAI
+ client = OpenAI(api_key=api_key)
+
+ # Send a minimal test request
+ response = client.chat.completions.create(
+ model="gpt-4o",
+ messages=[
+ {"role": "system", "content": "You are a helpful assistant. Respond with just the number 1."},
+ {"role": "user", "content": "Test"}
+ ],
+ temperature=0,
+ max_tokens=1
+ )
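+ # The request above uses temperature=0 and max_tokens=1, so the connectivity
+ # check stays cheap and the model can return at most a single token.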
+
+ result = response.choices[0].message.content.strip()
+ print(f"✅ OpenAI API连接成功")
+ print(f"📤 发送模型: gpt-4o")
+ print(f"📨 API响应: '{result}'")
+ return True
+
+ except Exception as e:
+ print(f"❌ OpenAI连接测试失败: {e}")
+ return False
+
+
+def debug_paper_fetch():
+ """调试论文抓取过程"""
+ print("\n" + "="*60)
+ print("🔍 ArXiv论文抓取系统调试")
+ print("="*60)
+
+ # 检查环境变量
+ openai_api_key = os.getenv("OPENAI_API_KEY")
+ print(f"🔑 OpenAI API Key: {'已设置' if openai_api_key else '❌ 未设置'}")
+
+ if not openai_api_key:
+ print("❌ 请设置OPENAI_API_KEY环境变量")
+ print(" export OPENAI_API_KEY='your-api-key-here'")
+ return False
+
+ # Test the API connections
+ if not debug_arxiv_connection():
+ return False
+
+ if not debug_openai_connection(openai_api_key):
+ return False
+
+ # Test the paper fetcher
+ print(f"\n📋 Starting paper fetcher test...")
+
+ try:
+ fetcher = ArxivPaperFetcher(openai_api_key)
+ print("✅ Paper fetcher initialized successfully")
+
+ # Fetch the last 3 days of papers (to make sure there are some results)
+ print(f"\n🕐 Fetching papers from the last 3 days...")
+ end_date = datetime.now(timezone.utc)
+ start_date = end_date - timedelta(days=3)
+
+ print(f"📅 时间范围: {start_date.date()} 到 {end_date.date()}")
+
+ # 限制到20篇论文进行测试
+ papers = fetcher.fetch_papers_by_date_range(start_date, end_date, max_papers=20)
+
+ print(f"\n📊 抓取结果分析:")
+ print(f" - 总共获取: {len(papers)} 篇论文")
+
+ if papers:
+ print(f"\n📄 论文样本 (前3篇):")
+ for i, paper in enumerate(papers[:3], 1):
+ print(f"\n {i}. {paper['title']}")
+ print(f" 发布时间: {paper['published']}")
+ print(f" 类别: {', '.join(paper['categories'])}")
+ print(f" 摘要长度: {len(paper['abstract'])} 字符")
+
+ # 测试GPT过滤(只测试前5篇)
+ print(f"\n🤖 测试GPT-4o过滤 (前5篇论文)...")
+ sample_papers = papers[:5]
+ filtered_papers = fetcher.filter_papers_with_gpt(sample_papers)
+
+ print(f"\n🎯 过滤结果:")
+ print(f" - 输入论文: {len(sample_papers)} 篇")
+ print(f" - 相关论文: {len(filtered_papers)} 篇")
+ print(f" - 相关比例: {len(filtered_papers)/len(sample_papers)*100:.1f}%")
+
+ if filtered_papers:
+ print(f"\n✅ 发现相关论文:")
+ for i, paper in enumerate(filtered_papers, 1):
+ print(f" {i}. {paper['title']}")
+
+ return True
+ else:
+ print("⚠️ 未获取到任何论文")
+ print("可能的原因:")
+ print(" - 最近3天内这些类别没有新论文")
+ print(" - arXiv API响应延迟")
+ print(" - 网络连接问题")
+ return False
+
+ except Exception as e:
+ print(f"❌ 论文抓取测试失败: {e}")
+ import traceback
+ print(f"详细错误信息: {traceback.format_exc()}")
+ return False
+
+
+if __name__ == "__main__":
+ print("🚀 开始ArXiv论文抓取系统调试...")
+
+ success = debug_paper_fetch()
+
+ print(f"\n" + "="*60)
+ if success:
+ print("✅ 调试完成!系统工作正常")
+ print("\n🎯 接下来可以:")
+ print(" - 运行 python scripts/fetch_papers.py 进行实际抓取")
+ print(" - 运行 python scripts/test_daily_fetch.py 进行完整测试")
+ else:
+ print("❌ 调试发现问题,请检查上述错误信息")
+
+ print("="*60) \ No newline at end of file
diff --git a/scripts/fetch_papers.py b/scripts/fetch_papers.py
index 4fdfc87..3db80c7 100644
--- a/scripts/fetch_papers.py
+++ b/scripts/fetch_papers.py
@@ -94,67 +94,155 @@ class ArxivPaperFetcher:
Returns:
List of paper dictionaries
"""
- logger.info(f"Fetching papers from {start_date.date()} to {end_date.date()}")
+ logger.info(f"🔍 开始从arXiv抓取论文: {start_date.date()} 到 {end_date.date()}")
+ logger.info(f"📋 目标类别: {', '.join(CS_CATEGORIES)}")
+ logger.info(f"🔧 改进策略: 分别查询每个类别以避免OR查询限制")
- # Build category query
- category_query = " OR ".join(f"cat:{cat}" for cat in CS_CATEGORIES)
+ all_papers_dict = {} # deduplicate via a dict keyed by arxiv_id
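+ # Cross-listed papers (e.g. a paper tagged both cs.AI and cs.LG) come back from
+ # several of the per-category queries, so results are merged by arxiv_id.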
+ total_categories_processed = 0
+ total_raw_papers = 0
- all_papers = []
+ # Query each category separately
+ for category in CS_CATEGORIES:
+ total_categories_processed += 1
+ logger.info(f"📂 Processing category {total_categories_processed}/{len(CS_CATEGORIES)}: {category}")
+
+ category_papers = self._fetch_papers_for_category(
+ category, start_date, end_date, max_papers_per_category=500
+ )
+
+ # Merge into the overall results (deduplicating)
+ new_papers_count = 0
+ for paper in category_papers:
+ arxiv_id = paper['arxiv_id']
+ if arxiv_id not in all_papers_dict:
+ all_papers_dict[arxiv_id] = paper
+ new_papers_count += 1
+
+ total_raw_papers += len(category_papers)
+ logger.info(f" ✅ {category}: 获得{len(category_papers)}篇, 新增{new_papers_count}篇")
+
+ # Convert to a list and sort by date
+ all_papers = list(all_papers_dict.values())
+ all_papers.sort(key=lambda x: x['updated'], reverse=True)
+
+ logger.info(f"📊 抓取总结:")
+ logger.info(f" - 处理了 {total_categories_processed} 个类别")
+ logger.info(f" - 从arXiv获取了 {total_raw_papers} 篇原始论文")
+ logger.info(f" - 去重后得到 {len(all_papers)} 篇唯一论文")
+
+ # 显示类别分布
+ if all_papers:
+ from collections import Counter
+
+ # Date distribution
+ dates = []
+ for paper in all_papers:
+ paper_date = datetime.strptime(paper['updated'][:10], '%Y-%m-%d')
+ dates.append(paper_date.strftime('%Y-%m-%d'))
+
+ date_counts = Counter(dates)
+ logger.info(f"📅 论文日期分布 (前5天):")
+ for date, count in date_counts.most_common(5):
+ days_ago = (datetime.now(timezone.utc).date() - datetime.strptime(date, '%Y-%m-%d').date()).days
+ logger.info(f" - {date}: {count}篇 ({days_ago}天前)")
+
+ # 类别分布
+ category_counts = Counter()
+ for paper in all_papers:
+ for cat in paper['categories']:
+ if cat in CS_CATEGORIES:
+ category_counts[cat] += 1
+
+ logger.info(f"📊 类别分布:")
+ for cat, count in category_counts.most_common():
+ logger.info(f" - {cat}: {count}篇")
+
+ return all_papers
+
+ def _fetch_papers_for_category(self, category: str, start_date: datetime,
+ end_date: datetime, max_papers_per_category: int = 500) -> List[Dict]:
+ """
+ Fetch papers for a specific category.
+
+ Args:
+ category: arXiv category (e.g., 'cs.AI')
+ start_date: Start date for paper search
+ end_date: End date for paper search
+ max_papers_per_category: Maximum papers to fetch for this category
+
+ Returns:
+ List of paper dictionaries for this category
+ """
+ papers = []
start_index = 0
+ batch_count = 0
- while len(all_papers) < max_papers:
+ while len(papers) < max_papers_per_category:
try:
- # Build search query
- search_query = f"({category_query})"
+ batch_count += 1
params = {
- "search_query": search_query,
+ "search_query": f"cat:{category}",
"sortBy": "submittedDate",
"sortOrder": "descending",
"start": start_index,
- "max_results": min(MAX_RESULTS_PER_BATCH, max_papers - len(all_papers))
+ "max_results": min(MAX_RESULTS_PER_BATCH, max_papers_per_category - len(papers))
}
- logger.debug(f"Fetching batch starting at index {start_index}")
+ logger.debug(f" 📦 {category}第{batch_count}批次: 从索引{start_index}开始...")
+
response = self.session.get(ARXIV_BASE_URL, params=params, timeout=30)
response.raise_for_status()
feed = feedparser.parse(response.content)
entries = feed.entries
+ logger.debug(f" ✅ {category}第{batch_count}批次获取了 {len(entries)} 篇论文")
+
if not entries:
- logger.info("No more papers available")
+ logger.debug(f" 📭 {category}: 没有更多论文")
break
# Filter papers by date
batch_papers = []
+ older_papers = 0
for entry in entries:
paper_date = datetime(*entry.updated_parsed[:6], tzinfo=timezone.utc)
if paper_date < start_date:
- # Papers are sorted by date, so we can stop here
- logger.info(f"Reached papers older than start date: {paper_date.date()}")
- return all_papers
+ older_papers += 1
+ continue
if start_date <= paper_date <= end_date:
paper_data = self._parse_paper_entry(entry)
batch_papers.append(paper_data)
- all_papers.extend(batch_papers)
- logger.info(f"Fetched {len(batch_papers)} papers in date range from this batch. Total: {len(all_papers)}")
+ papers.extend(batch_papers)
+ logger.debug(f" 📊 {category}第{batch_count}批次: {len(batch_papers)}篇符合日期, {older_papers}篇过旧")
+
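+ # Entries arrive sorted by submittedDate (newest first), so the first entry whose
+ # 'updated' date falls before start_date is treated as the end of the useful range.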
+ # If we found older papers, we can stop
+ if older_papers > 0:
+ logger.debug(f" 🔚 {category}: 发现过旧论文,停止")
+ break
# If we got fewer papers than requested, we've reached the end
if len(entries) < MAX_RESULTS_PER_BATCH:
+ logger.debug(f" 🔚 {category}: 到达数据末尾")
break
start_index += MAX_RESULTS_PER_BATCH
+ # Safety limit per category
+ if start_index >= 1000:
+ logger.debug(f" ⚠️ {category}: 达到单类别安全上限")
+ break
+
except Exception as e:
- logger.error(f"Error fetching papers: {e}")
+ logger.error(f" ❌ {category}抓取出错: {e}")
break
- logger.info(f"Total papers fetched: {len(all_papers)}")
- return all_papers
+ return papers
def _parse_paper_entry(self, entry) -> Dict:
"""Parse a feedparser entry into a paper dictionary."""
@@ -179,23 +267,41 @@ class ArxivPaperFetcher:
Returns:
List of relevant papers
"""
- logger.info(f"Filtering {len(papers)} papers using GPT-4o")
+ if not papers:
+ logger.warning("⚠️ 没有论文需要过滤!")
+ return []
+
+ logger.info(f"🤖 开始使用GPT-4o过滤论文...")
+ logger.info(f"📝 待处理论文数量: {len(papers)} 篇")
+
relevant_papers = []
+ processed_count = 0
for i, paper in enumerate(papers, 1):
try:
+ logger.info(f"🔍 处理第 {i}/{len(papers)} 篇论文: {paper['title'][:60]}...")
is_relevant = self._check_paper_relevance(paper)
+ processed_count += 1
+
if is_relevant:
relevant_papers.append(paper)
- logger.info(f"✓ Paper {i}/{len(papers)}: {paper['title'][:80]}...")
+ logger.info(f"✅ 第 {i} 篇论文 [相关]: {paper['title'][:80]}...")
else:
- logger.debug(f"✗ Paper {i}/{len(papers)}: {paper['title'][:80]}...")
+ logger.info(f"❌ 第 {i} 篇论文 [不相关]: {paper['title'][:80]}...")
+
+ # Report progress every 10 papers
+ if i % 10 == 0:
+ logger.info(f"📊 Progress: {i}/{len(papers)} papers processed, {len(relevant_papers)} relevant so far")
except Exception as e:
- logger.error(f"Error filtering paper {i}: {e}")
+ logger.error(f"❌ 处理第 {i} 篇论文时出错: {e}")
continue
- logger.info(f"Found {len(relevant_papers)} relevant papers out of {len(papers)}")
+ logger.info(f"🎯 GPT-4o过滤完成!")
+ logger.info(f" - 总共处理: {processed_count} 篇论文")
+ logger.info(f" - 发现相关: {len(relevant_papers)} 篇论文")
+ logger.info(f" - 相关比例: {len(relevant_papers)/processed_count*100:.1f}%" if processed_count > 0 else " - 相关比例: 0%")
+
return relevant_papers
def _check_paper_relevance(self, paper: Dict) -> bool:
@@ -214,10 +320,13 @@ class ArxivPaperFetcher:
)
result = response.choices[0].message.content.strip()
- return result == "1"
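+ # Only an exact "1" from the model counts as relevant; any other reply (or an
+ # API error below) leaves the paper filtered out.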
+ is_relevant = result == "1"
+
+ logger.debug(f"GPT-4o响应: '{result}' -> {'相关' if is_relevant else '不相关'}")
+ return is_relevant
except Exception as e:
- logger.error(f"Error calling GPT-4o: {e}")
+ logger.error(f"调用GPT-4o API时出错: {e}")
return False
def fetch_recent_papers(self, days: int = 1) -> List[Dict]:
@@ -225,17 +334,35 @@ class ArxivPaperFetcher:
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=days)
+ logger.info(f"📅 日常模式: 获取 {days} 天内的论文")
+ logger.info(f"🕐 时间范围: {start_date.strftime('%Y-%m-%d %H:%M')} UTC ~ {end_date.strftime('%Y-%m-%d %H:%M')} UTC")
+
papers = self.fetch_papers_by_date_range(start_date, end_date)
- return self.filter_papers_with_gpt(papers)
+
+ if papers:
+ logger.info(f"📋 开始GPT-4o智能过滤阶段...")
+ return self.filter_papers_with_gpt(papers)
+ else:
+ logger.warning("⚠️ 未获取到任何论文,跳过GPT过滤步骤")
+ return []
def fetch_historical_papers(self, years: int = 2) -> List[Dict]:
"""Fetch papers from the past N years."""
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=years * 365)
- logger.info(f"Fetching historical papers from the past {years} years")
+ logger.info(f"📚 历史模式: 获取过去 {years} 年的论文")
+ logger.info(f"🕐 时间范围: {start_date.strftime('%Y-%m-%d')} ~ {end_date.strftime('%Y-%m-%d')}")
+ logger.info(f"⚠️ 注意: 历史模式最多处理 5000 篇论文,可能需要较长时间")
+
papers = self.fetch_papers_by_date_range(start_date, end_date, max_papers=5000)
- return self.filter_papers_with_gpt(papers)
+
+ if papers:
+ logger.info(f"📋 开始GPT-4o智能过滤阶段...")
+ return self.filter_papers_with_gpt(papers)
+ else:
+ logger.warning("⚠️ 未获取到任何论文,跳过GPT过滤步骤")
+ return []
class GitHubUpdater:
@@ -301,47 +428,85 @@ class GitHubUpdater:
def main():
"""Main function to run the paper fetcher."""
+ import time
+
+ start_time = time.time()
+ logger.info("🚀 开始执行ArXiv论文抓取任务")
+ logger.info("=" * 60)
+
# Get environment variables
openai_api_key = os.getenv("OPENAI_API_KEY")
github_token = os.getenv("TARGET_REPO_TOKEN")
target_repo = os.getenv("TARGET_REPO_NAME", "YurenHao0426/awesome-llm-bias-papers")
+ logger.info("🔧 配置信息:")
+ logger.info(f" - OpenAI API Key: {'已设置' if openai_api_key else '未设置'}")
+ logger.info(f" - GitHub Token: {'已设置' if github_token else '未设置'}")
+ logger.info(f" - 目标仓库: {target_repo}")
+
# Check for required environment variables
if not openai_api_key:
- logger.error("OPENAI_API_KEY environment variable is required")
+ logger.error("❌ OPENAI_API_KEY 环境变量未设置")
sys.exit(1)
if not github_token:
- logger.error("TARGET_REPO_TOKEN environment variable is required")
+ logger.error("❌ TARGET_REPO_TOKEN 环境变量未设置")
sys.exit(1)
# Get command line arguments
mode = os.getenv("FETCH_MODE", "daily") # daily or historical
days = int(os.getenv("FETCH_DAYS", "1"))
+ logger.info(f"📋 执行模式: {mode}")
+ if mode == "daily":
+ logger.info(f"📅 抓取天数: {days} 天")
+
try:
+ step_start = time.time()
+
# Initialize fetcher
+ logger.info("🔄 初始化论文抓取器...")
fetcher = ArxivPaperFetcher(openai_api_key)
+ logger.info(f"✅ 初始化完成 ({time.time() - step_start:.1f}秒)")
+ # Fetch papers
+ step_start = time.time()
if mode == "historical":
- logger.info("Running in historical mode - fetching papers from past 2 years")
+ logger.info("📚 运行历史模式 - 抓取过去2年的论文")
papers = fetcher.fetch_historical_papers(years=2)
section_title = "Historical LLM Bias Papers (Past 2 Years)"
else:
- logger.info(f"Running in daily mode - fetching papers from last {days} day(s)")
+ logger.info(f"📰 运行日常模式 - 抓取过去{days}天的论文")
papers = fetcher.fetch_recent_papers(days=days)
section_title = None # Use default timestamp
+ fetch_time = time.time() - step_start
+ logger.info(f"⏱️ 论文抓取和过滤完成 ({fetch_time:.1f}秒)")
+
# Update GitHub repository
if papers:
+ step_start = time.time()
+ logger.info(f"📤 开始更新GitHub仓库...")
updater = GitHubUpdater(github_token, target_repo)
updater.update_readme_with_papers(papers, section_title)
- logger.info(f"Successfully processed {len(papers)} papers")
+ update_time = time.time() - step_start
+ logger.info(f"✅ GitHub仓库更新完成 ({update_time:.1f}秒)")
+
+ logger.info("🎉 任务完成!")
+ logger.info(f" - 找到相关论文: {len(papers)} 篇")
+ logger.info(f" - 总执行时间: {time.time() - start_time:.1f} 秒")
else:
- logger.info("No relevant papers found")
+ logger.warning("⚠️ 没有找到相关论文")
+ logger.info("可能的原因:")
+ logger.info(" - 指定日期范围内没有新的LLM偏见相关论文")
+ logger.info(" - arXiv API连接问题")
+ logger.info(" - GPT-4o过滤条件过于严格")
+ logger.info(f" - 总执行时间: {time.time() - start_time:.1f} 秒")
except Exception as e:
- logger.error(f"Error in main execution: {e}")
+ logger.error(f"❌ 执行过程中出现错误: {e}")
+ import traceback
+ logger.error(f"详细错误信息: {traceback.format_exc()}")
sys.exit(1)
diff --git a/scripts/test_arxiv_only.py b/scripts/test_arxiv_only.py
new file mode 100644
index 0000000..1c8f653
--- /dev/null
+++ b/scripts/test_arxiv_only.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python3
+"""
+Test the arXiv connection - no OpenAI API key required
+
+This script only tests the arXiv API connection and paper fetching; GPT filtering is not involved.
+"""
+
+import requests
+import feedparser
+from datetime import datetime, timezone, timedelta
+
+def test_arxiv_connection():
+ """测试arXiv API连接"""
+ print("🔍 测试arXiv API连接...")
+
+ try:
+ # Run the most basic arXiv query
+ url = "http://export.arxiv.org/api/query"
+ params = {
+ "search_query": "cat:cs.AI",
+ "sortBy": "submittedDate",
+ "sortOrder": "descending",
+ "max_results": 10
+ }
+
+ print(f"📡 发送请求到: {url}")
+ print(f"📋 查询参数: {params}")
+
+ response = requests.get(url, params=params, timeout=15)
+ print(f"✅ HTTP状态码: {response.status_code}")
+
+ if response.status_code == 200:
+ feed = feedparser.parse(response.content)
+ entries = feed.entries
+ print(f"📄 获取到 {len(entries)} 篇论文")
+
+ if entries:
+ print(f"\n📝 论文样本:")
+ for i, entry in enumerate(entries[:3], 1):
+ print(f"\n{i}. 标题: {entry.title}")
+ print(f" 发布时间: {entry.published}")
+ print(f" 更新时间: {entry.updated}")
+ print(f" 类别: {[tag.term for tag in entry.tags] if hasattr(entry, 'tags') else '无'}")
+ print(f" 摘要长度: {len(entry.summary)} 字符")
+ print(f" 摘要预览: {entry.summary[:150]}...")
+ return True
+ else:
+ print(f"❌ HTTP请求失败: {response.status_code}")
+ return False
+
+ except Exception as e:
+ print(f"❌ arXiv连接测试失败: {e}")
+ return False
+
+def test_date_filtering():
+ """测试日期过滤功能"""
+ print(f"\n🕐 测试日期过滤功能...")
+
+ try:
+ # 测试最近3天的论文
+ url = "http://export.arxiv.org/api/query"
+
+ # Build a query covering multiple CS categories
+ categories = ["cs.AI", "cs.CL", "cs.CV", "cs.LG", "cs.NE", "cs.RO", "cs.IR", "cs.HC", "stat.ML"]
+ category_query = " OR ".join(f"cat:{cat}" for cat in categories)
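+ # Note: this test uses a single OR query across all categories, i.e. the strategy
+ # the main fetcher used before it switched to per-category queries.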
+
+ params = {
+ "search_query": f"({category_query})",
+ "sortBy": "submittedDate",
+ "sortOrder": "descending",
+ "max_results": 100
+ }
+
+ print(f"📋 搜索类别: {', '.join(categories)}")
+ print(f"📦 请求最多100篇论文...")
+
+ response = requests.get(url, params=params, timeout=15)
+
+ if response.status_code == 200:
+ feed = feedparser.parse(response.content)
+ entries = feed.entries
+ print(f"📄 总共获取: {len(entries)} 篇论文")
+
+ # 分析日期分布
+ now = datetime.now(timezone.utc)
+ cutoff_1day = now - timedelta(days=1)
+ cutoff_3days = now - timedelta(days=3)
+ cutoff_7days = now - timedelta(days=7)
+
+ recent_1day = 0
+ recent_3days = 0
+ recent_7days = 0
+
+ for entry in entries:
+ paper_date = datetime(*entry.updated_parsed[:6], tzinfo=timezone.utc)
+
+ if paper_date >= cutoff_1day:
+ recent_1day += 1
+ if paper_date >= cutoff_3days:
+ recent_3days += 1
+ if paper_date >= cutoff_7days:
+ recent_7days += 1
+
+ print(f"\n📊 日期分布统计:")
+ print(f" - 最近1天: {recent_1day} 篇")
+ print(f" - 最近3天: {recent_3days} 篇")
+ print(f" - 最近7天: {recent_7days} 篇")
+
+ # Show the most recent few papers
+ if entries:
+ print(f"\n📝 Most recent papers:")
+ for i, entry in enumerate(entries[:5], 1):
+ paper_date = datetime(*entry.updated_parsed[:6], tzinfo=timezone.utc)
+ print(f"\n{i}. {entry.title[:80]}...")
+ print(f" Updated: {paper_date.strftime('%Y-%m-%d %H:%M')} UTC")
+ print(f" Categories: {', '.join([tag.term for tag in entry.tags][:3])}")
+
+ return True
+ else:
+ print(f"❌ 请求失败: {response.status_code}")
+ return False
+
+ except Exception as e:
+ print(f"❌ 日期过滤测试失败: {e}")
+ return False
+
+def main():
+ print("🚀 开始ArXiv连接测试...")
+ print("=" * 60)
+
+ success1 = test_arxiv_connection()
+ success2 = test_date_filtering()
+
+ print("\n" + "=" * 60)
+ if success1 and success2:
+ print("✅ arXiv连接测试通过!")
+ print("\n🎯 测试结果:")
+ print(" - arXiv API连接正常")
+ print(" - 论文抓取功能正常")
+ print(" - 日期过滤功能正常")
+ print("\n💡 接下来需要:")
+ print(" - 设置OPENAI_API_KEY环境变量")
+ print(" - 运行完整的调试脚本: python scripts/debug_fetch.py")
+ else:
+ print("❌ 测试发现问题,请检查网络连接")
+
+ print("=" * 60)
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/scripts/test_fetch_only.py b/scripts/test_fetch_only.py
new file mode 100644
index 0000000..db65231
--- /dev/null
+++ b/scripts/test_fetch_only.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env python3
+"""
+Test the paper fetching - shows the improved logging
+
+This script only exercises the paper fetching part, showing the pagination process and date distribution; it does not need the OpenAI API.
+"""
+
+import os
+import sys
+import logging
+from datetime import datetime, timezone, timedelta
+from collections import Counter
+
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ handlers=[logging.StreamHandler(sys.stdout)]
+)
+logger = logging.getLogger(__name__)
+
+# Add the parent directory to the path so we can import the main module
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from scripts.fetch_papers import ArxivPaperFetcher
+
+
+def test_paper_fetching_with_detailed_logs():
+ """测试论文抓取,显示详细的分页和日期信息"""
+
+ print("🚀 测试改进后的论文抓取日志显示")
+ print("=" * 60)
+
+ # 创建一个模拟的fetcher(不需要OpenAI API)
+ class MockArxivFetcher:
+ def __init__(self):
+ import requests
+ self.session = requests.Session()
+ self.session.headers.update({
+ 'User-Agent': 'PaperFetcher/1.0 (Test)'
+ })
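+ # This standalone mock only needs an HTTP session; it re-implements the fetch
+ # loop below so the pagination logging can be shown without an OpenAI API key.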
+
+ def fetch_papers_by_date_range(self, start_date, end_date, max_papers=300):
+ """模拟我们改进后的抓取函数"""
+ logger.info(f"🔍 开始从arXiv抓取论文: {start_date.date()} 到 {end_date.date()}")
+ logger.info(f"📋 目标类别: cs.AI, cs.CL, cs.CV, cs.LG, cs.NE, cs.RO, cs.IR, cs.HC, stat.ML")
+
+ from scripts.fetch_papers import ARXIV_BASE_URL, CS_CATEGORIES, MAX_RESULTS_PER_BATCH
+ import requests
+ import feedparser
+
+ # Build category query
+ category_query = " OR ".join(f"cat:{cat}" for cat in CS_CATEGORIES)
+
+ all_papers = []
+ start_index = 0
+ batch_count = 0
+ total_raw_papers = 0
+
+ while len(all_papers) < max_papers:
+ try:
+ batch_count += 1
+ search_query = f"({category_query})"
+
+ params = {
+ "search_query": search_query,
+ "sortBy": "submittedDate",
+ "sortOrder": "descending",
+ "start": start_index,
+ "max_results": min(MAX_RESULTS_PER_BATCH, max_papers - len(all_papers))
+ }
+
+ logger.info(f"📦 第{batch_count}批次: 从索引{start_index}开始抓取...")
+
+ response = self.session.get(ARXIV_BASE_URL, params=params, timeout=30)
+ response.raise_for_status()
+
+ feed = feedparser.parse(response.content)
+ entries = feed.entries
+ total_raw_papers += len(entries)
+
+ logger.info(f"✅ 第{batch_count}批次获取了 {len(entries)} 篇论文")
+
+ if not entries:
+ logger.info("📭 没有更多论文可用")
+ break
+
+ # Filter papers by date and parse them
+ batch_papers = []
+ older_papers = 0
+ for entry in entries:
+ paper_date = datetime(*entry.updated_parsed[:6], tzinfo=timezone.utc)
+
+ if paper_date < start_date:
+ older_papers += 1
+ continue
+
+ if start_date <= paper_date <= end_date:
+ paper_data = {
+ "title": entry.title.replace('\n', ' ').strip(),
+ "abstract": entry.summary.replace('\n', ' ').strip(),
+ "authors": [author.name for author in entry.authors] if hasattr(entry, 'authors') else [],
+ "published": entry.published,
+ "updated": entry.updated,
+ "link": entry.link,
+ "arxiv_id": entry.id.split('/')[-1],
+ "categories": [tag.term for tag in entry.tags] if hasattr(entry, 'tags') else []
+ }
+ batch_papers.append(paper_data)
+
+ all_papers.extend(batch_papers)
+ logger.info(f"📊 第{batch_count}批次筛选结果: {len(batch_papers)}篇在日期范围内, {older_papers}篇过旧")
+ logger.info(f"📈 累计获取论文: {len(all_papers)}篇")
+
+ if older_papers > 0:
+ logger.info(f"🔚 发现{older_papers}篇超出日期范围的论文,停止抓取")
+ break
+
+ if len(entries) < MAX_RESULTS_PER_BATCH:
+ logger.info("🔚 已达到arXiv数据末尾")
+ break
+
+ start_index += MAX_RESULTS_PER_BATCH
+
+ except Exception as e:
+ logger.error(f"❌ 抓取论文时出错: {e}")
+ break
+
+ # Print a summary
+ logger.info(f"📊 Fetch summary:")
+ logger.info(f" - Processed {batch_count} batches")
+ logger.info(f" - Fetched {total_raw_papers} raw papers from arXiv")
+ logger.info(f" - Kept {len(all_papers)} papers within the date range")
+
+ # Show the date distribution
+ if all_papers:
+ dates = []
+ for paper in all_papers:
+ paper_date = datetime.strptime(paper['updated'][:10], '%Y-%m-%d')
+ dates.append(paper_date.strftime('%Y-%m-%d'))
+
+ date_counts = Counter(dates)
+ logger.info(f"📅 论文日期分布 (前5天):")
+ for date, count in date_counts.most_common(5):
+ days_ago = (datetime.now(timezone.utc).date() - datetime.strptime(date, '%Y-%m-%d').date()).days
+ logger.info(f" - {date}: {count}篇 ({days_ago}天前)")
+
+ return all_papers
+
+ # Try different time ranges
+ fetcher = MockArxivFetcher()
+
+ print("\n🕐 Test 1: last 1 day")
+ end_date = datetime.now(timezone.utc)
+ start_date = end_date - timedelta(days=1)
+ papers_1day = fetcher.fetch_papers_by_date_range(start_date, end_date, max_papers=50)
+
+ print(f"\n🕐 测试2: 过去7天")
+ start_date = end_date - timedelta(days=7)
+ papers_7days = fetcher.fetch_papers_by_date_range(start_date, end_date, max_papers=200)
+
+ print(f"\n📊 对比结果:")
+ print(f" - 过去1天: {len(papers_1day)} 篇论文")
+ print(f" - 过去7天: {len(papers_7days)} 篇论文")
+ print(f" - 这解释了为什么日常模式很快完成!")
+
+
+if __name__ == "__main__":
+ test_paper_fetching_with_detailed_logs()
\ No newline at end of file
diff --git a/scripts/test_improved_fetch.py b/scripts/test_improved_fetch.py
new file mode 100644
index 0000000..14490f0
--- /dev/null
+++ b/scripts/test_improved_fetch.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python3
+"""
+Test the improved paper fetching
+
+Verify that per-category querying and the deduplication logic work as expected.
+"""
+
+import os
+import sys
+import logging
+from datetime import datetime, timezone, timedelta
+
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ handlers=[logging.StreamHandler(sys.stdout)]
+)
+logger = logging.getLogger(__name__)
+
+# Add the parent directory to the path so we can import the main module
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from scripts.fetch_papers import ArxivPaperFetcher
+
+
+def test_improved_fetching():
+ """测试改进后的抓取逻辑"""
+
+ print("🚀 测试改进后的论文抓取逻辑")
+ print("=" * 60)
+
+ # 创建一个模拟的fetcher(不需要OpenAI API)
+ class MockArxivFetcher(ArxivPaperFetcher):
+ def __init__(self):
+ import requests
+ self.session = requests.Session()
+ self.session.headers.update({
+ 'User-Agent': 'PaperFetcher/1.0 (Test)'
+ })
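+ # Overriding __init__ avoids needing an OpenAI API key; the inherited
+ # fetch_papers_by_date_range only needs the requests session set up above.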
+
+ # Try different time ranges
+ fetcher = MockArxivFetcher()
+
+ print("\n🕐 Test 1: last 1 day (expected to show 0 papers)")
+ end_date = datetime.now(timezone.utc)
+ start_date = end_date - timedelta(days=1)
+ papers_1day = fetcher.fetch_papers_by_date_range(start_date, end_date, max_papers=100)
+
+ print(f"\n🕐 测试2: 过去7天(应该显示更多论文和详细分布)")
+ start_date = end_date - timedelta(days=7)
+ papers_7days = fetcher.fetch_papers_by_date_range(start_date, end_date, max_papers=300)
+
+ print(f"\n📊 改进效果对比:")
+ print(f" - 过去1天: {len(papers_1day)} 篇论文")
+ print(f" - 过去7天: {len(papers_7days)} 篇论文")
+
+ if papers_7days:
+ print(f"\n📋 论文样本 (前3篇):")
+ for i, paper in enumerate(papers_7days[:3], 1):
+ print(f"\n{i}. {paper['title'][:80]}...")
+ print(f" arXiv ID: {paper['arxiv_id']}")
+ print(f" 更新时间: {paper['updated']}")
+ print(f" 类别: {', '.join(paper['categories'][:3])}")
+ print(f" 作者: {', '.join(paper['authors'][:2])}")
+ if len(paper['authors']) > 2:
+ print(f" et al.")
+
+ print(f"\n✅ 改进后的优势:")
+ print(f" - ✅ 分别查询每个类别,避免OR查询限制")
+ print(f" - ✅ 自动去重,避免重复论文")
+ print(f" - ✅ 详细的类别分布统计")
+ print(f" - ✅ 更准确的日期分布分析")
+ print(f" - ✅ 更透明的日志显示")
+
+def test_category_overlap():
+ """测试类别重叠和去重功能"""
+
+ print(f"\n" + "="*60)
+ print("🔍 测试类别重叠和去重功能")
+ print("="*60)
+
+ # Quick test: fetch a few categories by hand and look at the overlap
+ import requests
+ import feedparser
+ from collections import defaultdict
+
+ categories = ['cs.AI', 'cs.LG', 'cs.CL']
+ papers_by_category = {}
+ arxiv_ids_seen = set()
+ overlaps = defaultdict(list)
+
+ for cat in categories:
+ print(f"\n📂 获取 {cat} 类别的论文...")
+
+ params = {
+ 'search_query': f'cat:{cat}',
+ 'sortBy': 'submittedDate',
+ 'sortOrder': 'descending',
+ 'max_results': 50
+ }
+
+ try:
+ response = requests.get('http://export.arxiv.org/api/query', params=params, timeout=10)
+ feed = feedparser.parse(response.content)
+ entries = feed.entries
+
+ papers_by_category[cat] = []
+
+ for entry in entries:
+ arxiv_id = entry.id.split('/')[-1]
+ title = entry.title.replace('\n', ' ').strip()
+ categories_list = [tag.term for tag in entry.tags] if hasattr(entry, 'tags') else []
+
+ papers_by_category[cat].append({
+ 'arxiv_id': arxiv_id,
+ 'title': title,
+ 'categories': categories_list
+ })
+
+ # Check for overlap
+ if arxiv_id in arxiv_ids_seen:
+ overlaps[arxiv_id].append(cat)
+ else:
+ arxiv_ids_seen.add(arxiv_id)
+ overlaps[arxiv_id] = [cat]
+
+ print(f" 获得 {len(entries)} 篇论文")
+
+ except Exception as e:
+ print(f" 错误: {e}")
+
+ # Analyze the overlap
+ print(f"\n📊 Overlap analysis:")
+ total_papers = sum(len(papers) for papers in papers_by_category.values())
+ unique_papers = len(arxiv_ids_seen)
+ duplicate_papers = total_papers - unique_papers
+
+ print(f" - Total fetched: {total_papers} papers")
+ print(f" - Unique papers: {unique_papers}")
+ print(f" - Duplicate papers: {duplicate_papers}")
+ print(f" - Duplication rate: {duplicate_papers/total_papers*100:.1f}%")
+
+ # Show a few examples of overlapping papers
+ overlap_examples = [(arxiv_id, cats) for arxiv_id, cats in overlaps.items() if len(cats) > 1][:5]
+
+ if overlap_examples:
+ print(f"\n📋 重叠论文示例:")
+ for arxiv_id, cats in overlap_examples:
+ # 找到这篇论文的标题
+ title = "未找到标题"
+ for cat, papers in papers_by_category.items():
+ for paper in papers:
+ if paper['arxiv_id'] == arxiv_id:
+ title = paper['title'][:60] + "..." if len(paper['title']) > 60 else paper['title']
+ break
+ if title != "未找到标题":
+ break
+
+ print(f" - {arxiv_id}: {title}")
+ print(f" 类别: {', '.join(cats)}")
+
+ print(f"\n✅ 这证明了去重功能的重要性!")
+
+
+if __name__ == "__main__":
+ test_improved_fetching()
+ test_category_overlap()
\ No newline at end of file