Changes from all commits
.gitignore (1 addition, 0 deletions)
@@ -0,0 +1 @@
+__pycache__/
scripts/generate_leaderboard.py (136 additions, 74 deletions)
@@ -28,14 +28,14 @@ def format_query_date(value: datetime) -> str:


 def build_search_query(
-    org: str,
+    repo: str,
     state: str,
     start_date: datetime,
     end_date: Optional[datetime],
     date_field: str,
 ) -> str:
     terms = [
-        f"org:{org}",
+        f"repo:{repo}",
         "is:pr",
         f"state:{state}",
         f"{date_field}:>={format_query_date(start_date)}",
@@ -91,6 +91,47 @@ def fetch_search_pulls(query_text: str, page: int):
 DELAY_SECONDS = 0.35


+def fetch_org_repos(org: str) -> list:
+    """Return a list of full repository names (owner/repo) for the given org."""
+    repos = []
+    page = 1
+    while True:
+        query = urllib.parse.urlencode(
+            {"per_page": PER_PAGE, "page": page, "type": "public"}
+        )
+        url = f"https://api.github.com/orgs/{org}/repos?{query}"
+        headers = {
+            "Accept": "application/vnd.github.v3+json",
+            "User-Agent": "alphaonelabs-leaderboard-generator",
+        }
+        if GITHUB_TOKEN:
+            headers["Authorization"] = f"Bearer {GITHUB_TOKEN}"
+        request = urllib.request.Request(url, headers=headers)
+        try:
+            with urllib.request.urlopen(request, timeout=30) as response:
+                data = json.loads(response.read().decode("utf-8"))
+        except urllib.error.HTTPError as error:
+            if error.code == 403:
+                raise RuntimeError(
+                    "GitHub API access is temporarily restricted (403)."
+                ) from error
+            raise RuntimeError(f"GitHub API error: {error.code}") from error
+        except urllib.error.URLError as error:
+            raise RuntimeError("Unable to fetch repository list.") from error
+
+        if not isinstance(data, list) or len(data) == 0:
+            break
+        for repo in data:
+            full_name = repo.get("full_name")
+            if full_name:
+                repos.append(full_name)
+        if len(data) < PER_PAGE:
+            break
+        page += 1
+        time.sleep(DELAY_SECONDS)
+    return repos
+
+
 def parse_cli_date(value: str):
     try:
         return datetime.strptime(value, "%Y-%m-%d").replace(
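The new `fetch_org_repos` pages through `GET /orgs/{org}/repos` until it sees an empty page, and treats a short page (fewer than `PER_PAGE` rows) as the last one. A minimal sketch of that stop-condition logic against a stubbed fetch (the page size and repository names are invented):

```python
PER_PAGE = 3  # invented page size for the sketch; the script defines its own

def fake_fetch(page: int) -> list:
    # Stand-in for the paginated API call: two pages, the second one short.
    pages = {
        1: [{"full_name": "org/a"}, {"full_name": "org/b"}, {"full_name": "org/c"}],
        2: [{"full_name": "org/d"}],
    }
    return pages.get(page, [])

repos = []
page = 1
while True:
    data = fake_fetch(page)
    if not data:  # empty page: nothing left to read
        break
    repos.extend(r["full_name"] for r in data if r.get("full_name"))
    if len(data) < PER_PAGE:  # short page: this was the last one
        break
    page += 1

print(repos)  # ['org/a', 'org/b', 'org/c', 'org/d']
```

The same short-page check appears in the per-repo search loops inside `build_leaderboard` below.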
@@ -192,85 +233,106 @@ def is_within_date_range(
     return True


+def get_pr_id(pr: dict):
+    """Return a stable unique identifier for a PR search result, or None."""
+    return pr.get("node_id") or pr.get("id") or None
+
+
 def build_leaderboard(start_date: datetime, end_date: Optional[datetime]):
     contributor_stats = {}
+    seen_pr_ids: set[str] = set()

-    closed_query = build_search_query(
-        GITHUB_ORG,
-        "closed",
-        start_date,
-        end_date,
-        "closed",
-    )
+    repos = fetch_org_repos(GITHUB_ORG)

-    closed_prs = []
-    page = 1
-    while page <= MAX_CLOSED_PAGES:
-        rows = fetch_search_pulls(closed_query, page)
-        if not isinstance(rows, list) or len(rows) == 0:
-            break
-        closed_prs.extend(rows)
-        if len(rows) < PER_PAGE:
-            break
-        page += 1
-        time.sleep(DELAY_SECONDS)
+    for repo_full_name in repos:
+        closed_query = build_search_query(
+            repo_full_name,
+            "closed",
+            start_date,
+            end_date,
+            "closed",
+        )

-    for pr in closed_prs:
-        user = pr.get("user")
-        if (
-            not user
-            or not user.get("login")
-            or should_exclude(user.get("login"))
-        ):
-            continue
-
-        merged_at = parse_github_date(pr.get("pull_request", {}).get("merged_at"))
-        closed_at = parse_github_date(pr.get("closed_at"))
-        relevant_date = merged_at or closed_at
-        if not is_within_date_range(relevant_date, start_date, end_date):
-            continue
-
-        ensure_contributor(contributor_stats, user)
-        if merged_at:
-            contributor_stats[user["login"]]["merged_pr_count"] += 1
-        else:
-            contributor_stats[user["login"]]["closed_pr_count"] += 1
-
-    open_query = build_search_query(
-        GITHUB_ORG,
-        "open",
-        start_date,
-        end_date,
-        "created",
-    )
+        closed_prs = []
+        page = 1
+        while page <= MAX_CLOSED_PAGES:
+            rows = fetch_search_pulls(closed_query, page)
+            if not isinstance(rows, list) or len(rows) == 0:
+                break
+            closed_prs.extend(rows)
+            if len(rows) < PER_PAGE:
+                break
+            page += 1
+            time.sleep(DELAY_SECONDS)
+
+        for pr in closed_prs:
+            pr_id = get_pr_id(pr)
+            if pr_id is None or pr_id in seen_pr_ids:
+                continue
+            seen_pr_ids.add(pr_id)
+
+            user = pr.get("user")
+            if (
+                not user
+                or not user.get("login")
+                or should_exclude(user.get("login"))
+            ):
+                continue
+
+            merged_at = parse_github_date(pr.get("pull_request", {}).get("merged_at"))
+            closed_at = parse_github_date(pr.get("closed_at"))
+            relevant_date = merged_at or closed_at
+            if not is_within_date_range(relevant_date, start_date, end_date):
+                continue
+
+            ensure_contributor(contributor_stats, user)
+            if merged_at:
+                contributor_stats[user["login"]]["merged_pr_count"] += 1
+            else:
+                contributor_stats[user["login"]]["closed_pr_count"] += 1
+
+        open_query = build_search_query(
+            repo_full_name,
+            "open",
+            start_date,
+            end_date,
+            "created",
+        )

-    open_prs = []
-    page = 1
-    while page <= MAX_OPEN_PAGES:
-        rows = fetch_search_pulls(open_query, page)
-        if not isinstance(rows, list) or len(rows) == 0:
-            break
-        open_prs.extend(rows)
-        if len(rows) < PER_PAGE:
-            break
-        page += 1
-        time.sleep(DELAY_SECONDS)
+        open_prs = []
+        page = 1
+        while page <= MAX_OPEN_PAGES:
+            rows = fetch_search_pulls(open_query, page)
+            if not isinstance(rows, list) or len(rows) == 0:
+                break
+            open_prs.extend(rows)
+            if len(rows) < PER_PAGE:
+                break
+            page += 1
+            time.sleep(DELAY_SECONDS)
+
+        for pr in open_prs:
+            pr_id = get_pr_id(pr)
+            if pr_id is None or pr_id in seen_pr_ids:
+                continue
+            seen_pr_ids.add(pr_id)
+
+            user = pr.get("user")
+            if (
+                not user
+                or not user.get("login")
+                or should_exclude(user.get("login"))
+            ):
+                continue
+
+            created_at = parse_github_date(pr.get("created_at"))
+            if not is_within_date_range(created_at, start_date, end_date):
+                continue
+
+            ensure_contributor(contributor_stats, user)
+            contributor_stats[user["login"]]["open_pr_count"] += 1

-    for pr in open_prs:
-        user = pr.get("user")
-        if (
-            not user
-            or not user.get("login")
-            or should_exclude(user.get("login"))
-        ):
-            continue
-
-        created_at = parse_github_date(pr.get("created_at"))
-        if not is_within_date_range(created_at, start_date, end_date):
-            continue
-
-        ensure_contributor(contributor_stats, user)
-        contributor_stats[user["login"]]["open_pr_count"] += 1
+        time.sleep(DELAY_SECONDS)

     contributors = []
     for item in contributor_stats.values():
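Since every repository is now queried separately and search results can shift between pages while the script paginates, the rewrite deduplicates PRs through `seen_pr_ids`, keyed by `get_pr_id`, which prefers the stable `node_id` and falls back to the numeric `id`. A minimal sketch of that dedup pattern with invented search rows:

```python
def get_pr_id(pr: dict):
    # Mirrors the helper added in this PR: prefer the stable node_id.
    return pr.get("node_id") or pr.get("id") or None

# Invented rows: the second duplicates the first, the third carries no id at all.
rows = [
    {"node_id": "PR_abc", "title": "Fix pagination"},
    {"node_id": "PR_abc", "title": "Fix pagination"},
    {"title": "orphan row"},
]

seen_pr_ids = set()
kept = []
for pr in rows:
    pr_id = get_pr_id(pr)
    if pr_id is None or pr_id in seen_pr_ids:
        continue  # skip unidentifiable rows and anything already counted
    seen_pr_ids.add(pr_id)
    kept.append(pr)

print(len(kept))  # 1: the duplicate and the id-less row are both skipped
```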