#!/usr/bin/env python3
"""
Arxiv Paper Fetcher for LLM Bias Research
==========================================
This script fetches computer science papers from arxiv.org, filters them using
GPT-4o to identify papers related to LLM bias and fairness, and updates a
target GitHub repository's README with the results.
Features:
- Fetches papers from the last 24 hours (or specified days)
- Can also fetch historical papers from the past 2 years
- Uses GPT-4o for intelligent filtering
- Updates target repository via GitHub API
- Supports GitHub Actions automation
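Example invocation (a minimal sketch; the script filename and the values shown
are illustrative assumptions; only the environment variable names are the ones
actually read by this script):
    export OPENAI_API_KEY="sk-..."            # required
    export TARGET_REPO_TOKEN="ghp_..."        # required
    export TARGET_REPO_NAME="owner/repo"      # default: YurenHao0426/awesome-llm-bias-papers
    export FETCH_MODE="daily"                 # "daily" or "historical"
    export FETCH_DAYS="1"                     # look-back window for daily mode
    export USE_PARALLEL="true"                # parallel GPT-4o filtering on/off
    export MAX_CONCURRENT="16"                # max concurrent GPT-4o requests
    python arxiv_paper_fetcher.py             # hypothetical filename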
"""
import os
import sys
import json
import logging
import requests
import feedparser
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Optional, Tuple
from github import Github
from openai import OpenAI, AsyncOpenAI
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import time
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
]
)
logger = logging.getLogger(__name__)
# Configuration
ARXIV_BASE_URL = "http://export.arxiv.org/api/query"
MAX_RESULTS_PER_BATCH = 100
MAX_RETRIES = 3
# Computer Science categories related to AI/ML
CS_CATEGORIES = [
"cs.AI", # Artificial Intelligence
"cs.CL", # Computation and Language
"cs.CV", # Computer Vision and Pattern Recognition
"cs.LG", # Machine Learning
"cs.NE", # Neural and Evolutionary Computing
"cs.RO", # Robotics
"cs.IR", # Information Retrieval
"cs.HC", # Human-Computer Interaction
"stat.ML" # Machine Learning (Statistics)
]
GPT_SYSTEM_PROMPT = """You are an expert researcher in AI bias, fairness, and social good applications.
Your task is to analyze a paper's title and abstract to determine if it's relevant to bias and fairness research with clear social good implications.
A paper is RELEVANT if it discusses:
- Algorithmic fairness in real-world applications (healthcare, education, criminal justice, hiring, finance)
- Demographic bias affecting marginalized or underrepresented groups in society
- Social implications of AI bias (perpetuating inequality, discrimination, harm to vulnerable populations)
- Ethical AI deployment addressing social justice and human welfare
- Bias auditing/evaluation in systems that directly impact people's lives
- Data bias with clear social consequences and harm
- AI safety and alignment with human values in societal applications
- Representation and inclusion in AI systems used by the public
- Fair recommendation systems, search engines, or content moderation with social impact
A paper is NOT RELEVANT if it discusses:
- Purely technical computer vision bias without clear social applications
- Generic ML fairness metrics without real-world context
- Theoretical bias research without societal implications
- Technical optimization of models without addressing social harm
- Academic benchmarking without connection to social good
- Pure algorithmic improvements without considering human impact
FOCUS: The research must clearly address how AI bias affects society, vulnerable populations, or social justice. Reject purely technical advances without explicit social relevance.
Respond with exactly "1" if the paper is relevant, or "0" if it's not relevant.
Do not include any other text in your response."""
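# Note: this prompt is paired with temperature=0 and max_tokens=1 in the API calls
# below, so the model's whole reply is expected to be the single character "1" or "0";
# the relevance checks treat any other reply as "not relevant" (strict equality with "1").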
class ArxivPaperFetcher:
"""Main class for fetching and filtering arxiv papers."""
def __init__(self, openai_api_key: str):
"""Initialize the fetcher with OpenAI API key."""
self.openai_client = OpenAI(api_key=openai_api_key)
self.async_openai_client = AsyncOpenAI(api_key=openai_api_key)
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'PaperFetcher/1.0 (https://github.com/YurenHao0426/PaperFetcher)'
})
def fetch_papers_by_date_range(self, start_date: datetime, end_date: datetime,
max_papers: int = 1000) -> List[Dict]:
"""
Fetch papers from arxiv within a specific date range.
Args:
start_date: Start date for paper search
end_date: End date for paper search
max_papers: Maximum number of papers to fetch
Returns:
List of paper dictionaries
"""
logger.info(f"🔍 Starting arXiv paper fetch: {start_date.date()} to {end_date.date()}")
logger.info(f"📋 Target categories: {', '.join(CS_CATEGORIES)}")
logger.info(f"🔧 Strategy: Query each category separately to avoid OR query limitations")
        all_papers_dict = {}  # Deduplicate with a dict keyed by arxiv_id
total_categories_processed = 0
total_raw_papers = 0
        # Query each category separately
for category in CS_CATEGORIES:
total_categories_processed += 1
logger.info(f"📂 Processing category {total_categories_processed}/{len(CS_CATEGORIES)}: {category}")
category_papers = self._fetch_papers_for_category(
category, start_date, end_date, max_papers_per_category=500
)
# Merge to total results (deduplication)
new_papers_count = 0
for paper in category_papers:
arxiv_id = paper['arxiv_id']
if arxiv_id not in all_papers_dict:
all_papers_dict[arxiv_id] = paper
new_papers_count += 1
total_raw_papers += len(category_papers)
logger.info(f" ✅ {category}: Found {len(category_papers)} papers, {new_papers_count} new")
# Convert to list and sort by date
all_papers = list(all_papers_dict.values())
all_papers.sort(key=lambda x: x['updated'], reverse=True)
logger.info(f"📊 Fetch Summary:")
logger.info(f" - Processed {total_categories_processed} categories")
logger.info(f" - Retrieved {total_raw_papers} raw papers from arXiv")
logger.info(f" - After deduplication: {len(all_papers)} unique papers")
# Show category distribution
if all_papers:
from collections import Counter
# Date distribution
dates = []
for paper in all_papers:
paper_date = datetime.strptime(paper['updated'][:10], '%Y-%m-%d')
dates.append(paper_date.strftime('%Y-%m-%d'))
date_counts = Counter(dates)
logger.info(f"📅 Paper date distribution (top 5 days):")
for date, count in date_counts.most_common(5):
days_ago = (datetime.now(timezone.utc).date() - datetime.strptime(date, '%Y-%m-%d').date()).days
logger.info(f" - {date}: {count} papers ({days_ago} days ago)")
# Category distribution
category_counts = Counter()
for paper in all_papers:
for cat in paper['categories']:
if cat in CS_CATEGORIES:
category_counts[cat] += 1
logger.info(f"📊 Category distribution:")
for cat, count in category_counts.most_common():
logger.info(f" - {cat}: {count} papers")
return all_papers
def _fetch_papers_for_category(self, category: str, start_date: datetime,
end_date: datetime, max_papers_per_category: int = 500) -> List[Dict]:
"""
Fetch papers for a specific category.
Args:
category: arXiv category (e.g., 'cs.AI')
start_date: Start date for paper search
end_date: End date for paper search
max_papers_per_category: Maximum papers to fetch for this category
Returns:
List of paper dictionaries for this category
"""
papers = []
start_index = 0
batch_count = 0
while len(papers) < max_papers_per_category:
try:
batch_count += 1
params = {
"search_query": f"cat:{category}",
"sortBy": "submittedDate",
"sortOrder": "descending",
"start": start_index,
"max_results": min(MAX_RESULTS_PER_BATCH, max_papers_per_category - len(papers))
}
                logger.debug(f"  📦 {category} batch {batch_count}: starting from index {start_index}...")
response = self.session.get(ARXIV_BASE_URL, params=params, timeout=30)
response.raise_for_status()
feed = feedparser.parse(response.content)
entries = feed.entries
                logger.debug(f"  ✅ {category} batch {batch_count}: retrieved {len(entries)} papers")
if not entries:
                    logger.debug(f"  📭 {category}: no more papers")
break
# Filter papers by date
batch_papers = []
older_papers = 0
for entry in entries:
paper_date = datetime(*entry.updated_parsed[:6], tzinfo=timezone.utc)
if paper_date < start_date:
older_papers += 1
continue
if start_date <= paper_date <= end_date:
paper_data = self._parse_paper_entry(entry)
batch_papers.append(paper_data)
papers.extend(batch_papers)
                logger.debug(f"  📊 {category} batch {batch_count}: {len(batch_papers)} within date range, {older_papers} too old")
# If we found older papers, we can stop
if older_papers > 0:
                    logger.debug(f"  🔚 {category}: found papers older than the start date, stopping")
break
# If we got fewer papers than requested, we've reached the end
if len(entries) < MAX_RESULTS_PER_BATCH:
                    logger.debug(f"  🔚 {category}: reached the end of the data")
break
start_index += MAX_RESULTS_PER_BATCH
# Safety limit per category
if start_index >= 1000:
                    logger.debug(f"  ⚠️ {category}: reached the per-category safety limit")
break
except Exception as e:
                logger.error(f"  ❌ {category} fetch error: {e}")
break
return papers
def _parse_paper_entry(self, entry) -> Dict:
"""Parse a feedparser entry into a paper dictionary."""
return {
"title": entry.title.replace('\n', ' ').strip(),
"abstract": entry.summary.replace('\n', ' ').strip(),
"authors": [author.name for author in entry.authors] if hasattr(entry, 'authors') else [],
"published": entry.published,
"updated": entry.updated,
"link": entry.link,
"arxiv_id": entry.id.split('/')[-1],
"categories": [tag.term for tag in entry.tags] if hasattr(entry, 'tags') else []
}
def filter_papers_with_gpt(self, papers: List[Dict], use_parallel: bool = True,
max_concurrent: int = 16) -> List[Dict]:
"""
Filter papers using GPT-4o to identify bias-related research.
Args:
papers: List of paper dictionaries
use_parallel: Whether to use parallel processing (default: True)
max_concurrent: Maximum concurrent requests (default: 16)
Returns:
List of relevant papers
"""
if not papers:
logger.warning("⚠️ No papers to filter!")
return []
if use_parallel and len(papers) > 5:
logger.info(f"🚀 Using parallel mode for {len(papers)} papers (max concurrent: {max_concurrent})")
return self._filter_papers_parallel(papers, max_concurrent)
else:
logger.info(f"🔄 Using serial mode for {len(papers)} papers")
return self._filter_papers_sequential(papers)
def _filter_papers_sequential(self, papers: List[Dict]) -> List[Dict]:
"""Serial processing of papers (original method)."""
logger.info(f"🤖 Starting GPT-4o paper filtering...")
logger.info(f"📝 Papers to process: {len(papers)}")
relevant_papers = []
processed_count = 0
for i, paper in enumerate(papers, 1):
try:
logger.info(f"🔍 Processing paper {i}/{len(papers)}: {paper['title'][:60]}...")
is_relevant = self._check_paper_relevance(paper)
processed_count += 1
if is_relevant:
relevant_papers.append(paper)
logger.info(f"✅ Paper {i} [RELEVANT]: {paper['title'][:80]}...")
else:
logger.info(f"❌ Paper {i} [NOT RELEVANT]: {paper['title'][:80]}...")
# Show progress every 10 papers
if i % 10 == 0:
logger.info(f"📊 Progress update: Processed {i}/{len(papers)} papers, found {len(relevant_papers)} relevant")
except Exception as e:
logger.error(f"❌ Error processing paper {i}: {e}")
continue
logger.info(f"🎯 GPT-4o filtering completed!")
logger.info(f" - Total processed: {processed_count} papers")
logger.info(f" - Found relevant: {len(relevant_papers)} papers")
logger.info(f" - Relevance ratio: {len(relevant_papers)/processed_count*100:.1f}%" if processed_count > 0 else " - Relevance ratio: 0%")
return relevant_papers
def _filter_papers_parallel(self, papers: List[Dict], max_concurrent: int = 16) -> List[Dict]:
"""Parallel processing of papers using asyncio."""
try:
            # Check whether there is already a running event loop
loop = asyncio.get_event_loop()
if loop.is_running():
                # Run inside the existing event loop
import nest_asyncio
nest_asyncio.apply()
return loop.run_until_complete(self._async_filter_papers(papers, max_concurrent))
else:
                # Create a new event loop
return asyncio.run(self._async_filter_papers(papers, max_concurrent))
except Exception as e:
            logger.error(f"❌ Parallel processing failed: {e}")
            logger.info("🔄 Falling back to serial processing mode...")
return self._filter_papers_sequential(papers)
async def _async_filter_papers(self, papers: List[Dict], max_concurrent: int) -> List[Dict]:
"""Async implementation of paper filtering."""
        logger.info(f"🤖 Starting async GPT-4o filtering...")
        logger.info(f"📝 Papers to process: {len(papers)}")
        # Semaphore to bound the number of concurrent requests
        semaphore = asyncio.Semaphore(max_concurrent)
        # Create all tasks
        tasks = []
        for i, paper in enumerate(papers):
            task = self._check_paper_relevance_async(paper, semaphore, i + 1, len(papers))
            tasks.append(task)
        # Run all tasks concurrently
        start_time = time.time()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        total_time = time.time() - start_time
        # Process the results
        relevant_papers = []
        successful_count = 0
        error_count = 0
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"❌ Error processing paper {i+1}: {result}")
                error_count += 1
            elif isinstance(result, tuple):
                is_relevant, paper = result
                successful_count += 1
                if is_relevant:
                    relevant_papers.append(paper)
                    logger.debug(f"✅ Paper {i+1} [RELEVANT]: {paper['title'][:60]}...")
                else:
                    logger.debug(f"❌ Paper {i+1} [NOT RELEVANT]: {paper['title'][:60]}...")
        # Show final statistics
        logger.info(f"🎯 Parallel GPT-4o filtering completed!")
        logger.info(f"   - Total processing time: {total_time:.1f} s")
        logger.info(f"   - Average per paper: {total_time/len(papers):.2f} s")
        logger.info(f"   - Successfully processed: {successful_count} papers")
        logger.info(f"   - Processing errors: {error_count} papers")
        logger.info(f"   - Found relevant: {len(relevant_papers)} papers")
        if successful_count > 0:
            logger.info(f"   - Relevance ratio: {len(relevant_papers)/successful_count*100:.1f}%")
        # Estimate the speedup over serial processing
        estimated_serial_time = len(papers) * 2.0  # assume ~2 seconds per paper when processed serially
        speedup = estimated_serial_time / total_time if total_time > 0 else 1
        logger.info(f"   - Estimated speedup: {speedup:.1f}x")
return relevant_papers
async def _check_paper_relevance_async(self, paper: Dict, semaphore: asyncio.Semaphore,
index: int, total: int) -> tuple:
"""Async version of paper relevance checking."""
async with semaphore:
try:
                # Show progress (once every 10 papers)
                if index % 10 == 0:
                    logger.info(f"📊 Parallel progress: {index}/{total} papers being processed...")
prompt = f"Title: {paper['title']}\n\nAbstract: {paper['abstract']}"
response = await self.async_openai_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": GPT_SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
temperature=0,
max_tokens=1
)
result = response.choices[0].message.content.strip()
is_relevant = result == "1"
                result = response.choices[0].message.content.strip() if False else result  # no-op placeholder removed
return (is_relevant, paper)
except Exception as e:
                logger.error(f"❌ Async error while processing paper {index}: {e}")
                # Re-raise the exception so the caller can handle it
raise e
def _check_paper_relevance(self, paper: Dict) -> bool:
"""Check if a paper is relevant using GPT-4o."""
prompt = f"Title: {paper['title']}\n\nAbstract: {paper['abstract']}"
try:
response = self.openai_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": GPT_SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
temperature=0,
max_tokens=1
)
result = response.choices[0].message.content.strip()
is_relevant = result == "1"
            logger.debug(f"GPT-4o response: '{result}' -> {'relevant' if is_relevant else 'not relevant'}")
return is_relevant
except Exception as e:
            logger.error(f"Error calling the GPT-4o API: {e}")
return False
def fetch_recent_papers(self, days: int = 1) -> List[Dict]:
"""Fetch papers from the last N days."""
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=days)
        logger.info(f"📅 Daily mode: fetching papers from the last {days} day(s)")
        logger.info(f"🕐 Time range: {start_date.strftime('%Y-%m-%d %H:%M')} UTC ~ {end_date.strftime('%Y-%m-%d %H:%M')} UTC")
papers = self.fetch_papers_by_date_range(start_date, end_date)
if papers:
            logger.info(f"📋 Starting GPT-4o filtering stage...")
            # Read parallelism settings from environment variables
use_parallel = os.getenv("USE_PARALLEL", "true").lower() == "true"
max_concurrent = int(os.getenv("MAX_CONCURRENT", "16"))
return self.filter_papers_with_gpt(papers, use_parallel=use_parallel,
max_concurrent=max_concurrent)
else:
            logger.warning("⚠️ No papers retrieved, skipping the GPT filtering step")
return []
def fetch_historical_papers(self, years: int = 2) -> List[Dict]:
"""Fetch papers from the past N years."""
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=years * 365)
        # Read limit configuration from environment variables
        max_papers = int(os.getenv("MAX_HISTORICAL_PAPERS", "50000"))  # default: 50,000 papers
        max_per_category = int(os.getenv("MAX_PAPERS_PER_CATEGORY", "10000"))  # default: 10,000 per category
        logger.info(f"📚 Historical mode: fetching papers from the past {years} year(s)")
        logger.info(f"🕐 Time range: {start_date.strftime('%Y-%m-%d')} ~ {end_date.strftime('%Y-%m-%d')}")
        logger.info(f"📊 Configured limits:")
        logger.info(f"   - Max papers: {max_papers:,}")
        logger.info(f"   - Per-category limit: {max_per_category:,}")
        if max_papers >= 20000:
            logger.info(f"⚠️ Large-scale historical mode: this may take a long time and many API calls")
            logger.info(f"💡 Tip: adjust the limits via environment variables")
            logger.info(f"   - MAX_HISTORICAL_PAPERS={max_papers}")
            logger.info(f"   - MAX_PAPERS_PER_CATEGORY={max_per_category}")
papers = self.fetch_papers_by_date_range_unlimited(
start_date, end_date, max_papers=max_papers, max_per_category=max_per_category
)
if papers:
            logger.info(f"📋 Starting GPT-4o filtering stage...")
            # Historical mode defaults to higher concurrency (many more papers to process)
            use_parallel = os.getenv("USE_PARALLEL", "true").lower() == "true"
            max_concurrent = int(os.getenv("MAX_CONCURRENT", "50"))  # higher default concurrency for historical mode
return self.filter_papers_with_gpt(papers, use_parallel=use_parallel,
max_concurrent=max_concurrent)
else:
            logger.warning("⚠️ No papers retrieved, skipping the GPT filtering step")
return []
def fetch_papers_by_date_range_unlimited(self, start_date: datetime, end_date: datetime,
max_papers: int = 50000, max_per_category: int = 10000) -> List[Dict]:
"""
Fetch papers by date range with higher limits for historical mode.
Args:
start_date: Start date for paper search
end_date: End date for paper search
max_papers: Maximum total papers to fetch
max_per_category: Maximum papers per category
Returns:
List of paper dictionaries
"""
        logger.info(f"🔍 Starting paper fetch - extended-limit mode")
        logger.info(f"🕐 Time range: {start_date.strftime('%Y-%m-%d %H:%M')} UTC ~ {end_date.strftime('%Y-%m-%d %H:%M')} UTC")
        logger.info(f"📊 Search configuration:")
        logger.info(f"   - Max papers: {max_papers:,}")
        logger.info(f"   - Per-category limit: {max_per_category:,}")
        logger.info(f"   - Categories searched: {len(CS_CATEGORIES)}")
        all_papers_dict = {}  # Deduplicate with a dict keyed by arxiv_id
        total_raw_papers = 0
        total_categories_processed = 0
        # Query each category separately
        for category in CS_CATEGORIES:
            total_categories_processed += 1
            logger.info(f"📂 Processing category {total_categories_processed}/{len(CS_CATEGORIES)}: {category}")
category_papers = self._fetch_papers_for_category_unlimited(
category, start_date, end_date, max_papers_per_category=max_per_category
)
            # Merge into the combined results (deduplicated)
new_papers_count = 0
for paper in category_papers:
arxiv_id = paper['arxiv_id']
if arxiv_id not in all_papers_dict:
all_papers_dict[arxiv_id] = paper
new_papers_count += 1
                    # Check whether the overall paper limit has been reached
                    if len(all_papers_dict) >= max_papers:
                        logger.info(f"⚠️ Reached the maximum of {max_papers:,} papers, stopping fetch")
                        break
            total_raw_papers += len(category_papers)
            logger.info(f"  ✅ {category}: retrieved {len(category_papers):,} papers, {new_papers_count:,} new")
            # Stop if the overall limit has been reached
            if len(all_papers_dict) >= max_papers:
                break
        # Convert to a list and sort by date
all_papers = list(all_papers_dict.values())
all_papers.sort(key=lambda x: x['updated'], reverse=True)
        logger.info(f"📊 Fetch summary:")
        logger.info(f"   - Processed {total_categories_processed} categories")
        logger.info(f"   - Retrieved {total_raw_papers:,} raw papers from arXiv")
        logger.info(f"   - {len(all_papers):,} unique papers after deduplication")
        # Show category distribution
if all_papers:
from collections import Counter
            # Date distribution
dates = []
for paper in all_papers:
paper_date = datetime.strptime(paper['updated'][:10], '%Y-%m-%d')
dates.append(paper_date.strftime('%Y-%m-%d'))
date_counts = Counter(dates)
            logger.info(f"📅 Paper date distribution (top 10 days):")
for date, count in date_counts.most_common(10):
days_ago = (datetime.now(timezone.utc).date() - datetime.strptime(date, '%Y-%m-%d').date()).days
                logger.info(f"   - {date}: {count:,} papers ({days_ago} days ago)")
            # Category distribution
category_counts = Counter()
for paper in all_papers:
for cat in paper['categories']:
if cat in CS_CATEGORIES:
category_counts[cat] += 1
            logger.info(f"📊 Category distribution:")
for cat, count in category_counts.most_common():
                logger.info(f"   - {cat}: {count:,} papers")
return all_papers
def _fetch_papers_for_category_unlimited(self, category: str, start_date: datetime,
end_date: datetime, max_papers_per_category: int = 10000) -> List[Dict]:
"""
Fetch papers for a specific category with higher limits.
Args:
category: arXiv category (e.g., 'cs.AI')
start_date: Start date for paper search
end_date: End date for paper search
max_papers_per_category: Maximum papers to fetch for this category
Returns:
List of paper dictionaries for this category
"""
papers = []
start_index = 0
batch_count = 0
api_calls = 0
        max_api_calls = max_papers_per_category // MAX_RESULTS_PER_BATCH + 100  # dynamically derived API-call limit
        logger.info(f"  🎯 {category}: starting fetch, targeting at most {max_papers_per_category:,} papers")
while len(papers) < max_papers_per_category and api_calls < max_api_calls:
try:
batch_count += 1
api_calls += 1
params = {
"search_query": f"cat:{category}",
"sortBy": "submittedDate",
"sortOrder": "descending",
"start": start_index,
"max_results": min(MAX_RESULTS_PER_BATCH, max_papers_per_category - len(papers))
}
                if batch_count % 10 == 0:  # Show detailed progress every 10 batches
                    logger.info(f"  📦 {category} batch {batch_count}: starting from index {start_index}, {len(papers):,} papers fetched so far...")
response = self.session.get(ARXIV_BASE_URL, params=params, timeout=30)
response.raise_for_status()
feed = feedparser.parse(response.content)
entries = feed.entries
                logger.debug(f"  ✅ {category} batch {batch_count}: retrieved {len(entries)} papers")
if not entries:
                    logger.debug(f"  📭 {category}: no more papers")
break
# Filter papers by date
batch_papers = []
older_papers = 0
for entry in entries:
paper_date = datetime(*entry.updated_parsed[:6], tzinfo=timezone.utc)
if paper_date < start_date:
older_papers += 1
continue
if start_date <= paper_date <= end_date:
paper_data = self._parse_paper_entry(entry)
batch_papers.append(paper_data)
papers.extend(batch_papers)
                logger.debug(f"  📊 {category} batch {batch_count}: {len(batch_papers)} within date range, {older_papers} too old")
# If we found older papers, we can stop
if older_papers > 0:
                    logger.debug(f"  🔚 {category}: found papers older than the start date, stopping")
break
# If we got fewer papers than requested, we've reached the end
if len(entries) < MAX_RESULTS_PER_BATCH:
                    logger.debug(f"  🔚 {category}: reached the end of the data")
break
start_index += MAX_RESULTS_PER_BATCH
except Exception as e:
                logger.error(f"  ❌ {category} fetch error: {e}")
break
        logger.info(f"  ✅ {category}: done, fetched {len(papers):,} papers ({api_calls} API calls)")
return papers
class GitHubUpdater:
"""Handle GitHub repository updates."""
def __init__(self, token: str, repo_name: str):
"""Initialize GitHub updater."""
self.github = Github(token)
self.repo_name = repo_name
self.repo = self.github.get_repo(repo_name)
def update_readme_with_papers(self, papers: List[Dict], section_title: str = None):
"""Update README with new papers in reverse chronological order (newest first)."""
if not papers:
logger.info("No papers to add to README")
return
if section_title is None:
section_title = f"Papers Updated on {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}"
try:
# Get current README
readme_file = self.repo.get_contents("README.md", ref="main")
current_content = readme_file.decoded_content.decode("utf-8")
# Create new section
new_section = f"\n\n## {section_title}\n\n"
for paper in papers:
# Format paper entry
authors_str = ", ".join(paper['authors'][:3]) # First 3 authors
if len(paper['authors']) > 3:
authors_str += " et al."
categories_str = ", ".join(paper['categories'])
new_section += f"### {paper['title']}\n\n"
new_section += f"**Authors:** {authors_str}\n\n"
new_section += f"**Categories:** {categories_str}\n\n"
new_section += f"**Published:** {paper['published']}\n\n"
new_section += f"**Abstract:** {paper['abstract']}\n\n"
new_section += f"**Link:** [arXiv:{paper['arxiv_id']}]({paper['link']})\n\n"
new_section += "---\n\n"
# Insert new papers at the beginning to maintain reverse chronological order
# Find the end of the main documentation (after the project description and setup)
insert_position = self._find_papers_insert_position(current_content)
if insert_position > 0:
# Insert new section after the main documentation but before existing papers
updated_content = (current_content[:insert_position] +
new_section +
current_content[insert_position:])
                logger.info(f"📝 Inserted the new paper section at the top of the README to keep reverse chronological order")
else:
# Fallback: append to end if can't find proper insertion point
updated_content = current_content + new_section
                logger.info(f"📝 Appended the new paper section to the end of the README (no suitable insertion point found)")
commit_message = f"Auto-update: Added {len(papers)} new papers on {datetime.now(timezone.utc).strftime('%Y-%m-%d')}"
self.repo.update_file(
path="README.md",
message=commit_message,
content=updated_content,
sha=readme_file.sha,
branch="main"
)
            logger.info(f"✅ Successfully updated the README with {len(papers)} papers (reverse chronological order)")
except Exception as e:
logger.error(f"Error updating README: {e}")
raise
def _find_papers_insert_position(self, content: str) -> int:
"""Find the best position to insert new papers (after main doc, before existing papers)."""
lines = content.split('\n')
# Look for patterns that indicate the end of documentation and start of papers
# Search in order of priority
insert_patterns = [
"**Note**: This tool is designed for academic research purposes", # End of README
"## Papers Updated on", # Existing paper sections
"## Historical", # Historical paper sections
"### ", # Any section that might be a paper title
"---", # Common separator before papers
]
for pattern in insert_patterns:
for i, line in enumerate(lines):
if pattern in line:
# Found a good insertion point - insert before this line
# Convert line index to character position
char_position = sum(len(lines[j]) + 1 for j in range(i)) # +1 for newline
return char_position
# If no patterns found, try to find end of main documentation
# Look for the end of the last documentation section
last_doc_section = -1
for i, line in enumerate(lines):
if line.startswith('## ') and not line.startswith('## Papers') and not line.startswith('## Historical'):
last_doc_section = i
if last_doc_section >= 0:
# Find the end of this documentation section
section_end = len(lines)
for i in range(last_doc_section + 1, len(lines)):
if lines[i].startswith('## '):
section_end = i
break
# Insert after this section
char_position = sum(len(lines[j]) + 1 for j in range(section_end))
return char_position
# Final fallback: return 0 to trigger append behavior
return 0
def main():
"""Main function to run the paper fetcher."""
import time
start_time = time.time()
    logger.info("🚀 Starting the arXiv paper fetching task")
logger.info("=" * 60)
# Get environment variables
openai_api_key = os.getenv("OPENAI_API_KEY")
github_token = os.getenv("TARGET_REPO_TOKEN")
target_repo = os.getenv("TARGET_REPO_NAME", "YurenHao0426/awesome-llm-bias-papers")
    logger.info("🔧 Configuration:")
    logger.info(f"   - OpenAI API Key: {'set' if openai_api_key else 'not set'}")
    logger.info(f"   - GitHub Token: {'set' if github_token else 'not set'}")
    logger.info(f"   - Target repository: {target_repo}")
# Check for required environment variables
if not openai_api_key:
        logger.error("❌ OPENAI_API_KEY environment variable is not set")
sys.exit(1)
if not github_token:
        logger.error("❌ TARGET_REPO_TOKEN environment variable is not set")
sys.exit(1)
# Get command line arguments
mode = os.getenv("FETCH_MODE", "daily") # daily or historical
days = int(os.getenv("FETCH_DAYS", "1"))
    logger.info(f"📋 Execution mode: {mode}")
if mode == "daily":
        logger.info(f"📅 Days to fetch: {days}")
try:
step_start = time.time()
# Initialize fetcher
        logger.info("🔄 Initializing the paper fetcher...")
fetcher = ArxivPaperFetcher(openai_api_key)
        logger.info(f"✅ Initialization complete ({time.time() - step_start:.1f}s)")
# Fetch papers
step_start = time.time()
if mode == "historical":
            logger.info("📚 Running historical mode - fetching papers from the past 2 years")
papers = fetcher.fetch_historical_papers(years=2)
section_title = "Historical LLM Bias Papers (Past 2 Years)"
else:
            logger.info(f"📰 Running daily mode - fetching papers from the past {days} day(s)")
papers = fetcher.fetch_recent_papers(days=days)
section_title = None # Use default timestamp
fetch_time = time.time() - step_start
        logger.info(f"⏱️ Paper fetching and filtering complete ({fetch_time:.1f}s)")
# Update GitHub repository
if papers:
step_start = time.time()
            logger.info(f"📤 Updating the GitHub repository...")
updater = GitHubUpdater(github_token, target_repo)
updater.update_readme_with_papers(papers, section_title)
update_time = time.time() - step_start
            logger.info(f"✅ GitHub repository update complete ({update_time:.1f}s)")
            logger.info("🎉 Task finished!")
            logger.info(f"   - Relevant papers found: {len(papers)}")
            logger.info(f"   - Total execution time: {time.time() - start_time:.1f} s")
else:
            logger.warning("⚠️ No relevant papers found")
            logger.info("Possible reasons:")
            logger.info("  - No new LLM-bias-related papers in the specified date range")
            logger.info("  - arXiv API connection issues")
            logger.info("  - GPT-4o filtering criteria are too strict")
            logger.info(f"   - Total execution time: {time.time() - start_time:.1f} s")
except Exception as e:
        logger.error(f"❌ An error occurred during execution: {e}")
import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
sys.exit(1)
if __name__ == "__main__":
main()