Skip to content
191 changes: 81 additions & 110 deletions src/handlers/bugs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, Dict
from utils import error_response, parse_pagination_params, parse_json_body, convert_d1_results
from libs.db import get_db_safe
from models import Bug
from models import Bug, BugScreenshot
from workers import Response
import logging

Expand Down Expand Up @@ -59,38 +59,32 @@ async def handle_bugs(
except ValueError:
limit_int = 10

search_result = await db.prepare('''
SELECT
b.id,
b.url,
b.description,
b.status,
b.verified,
b.score,
b.views,
b.created,
b.modified,
b.is_hidden,
b.rewarded,
b.cve_id,
b.cve_score,
b.domain,
d.name as domain_name,
d.url as domain_url
FROM bugs b
LEFT JOIN domains d ON b.domain = d.id
WHERE b.url LIKE ? OR b.description LIKE ?
ORDER BY b.created DESC
LIMIT ? OFFSET 0
''').bind(f"%{query}%", f"%{query}%", limit_int).all()

response_data = convert_d1_results(search_result.results if hasattr(search_result, 'results') else [])
return Response.json({
"success": True,
"query": query,
"data": response_data
})

try:
# Search requires OR condition (url LIKE ? OR description LIKE ?)
# which the ORM does not support. Raw SQL is kept intentionally.
search_result = await db.prepare('''
SELECT
b.id, b.url, b.description, b.status, b.verified,
b.score, b.views, b.created, b.modified, b.is_hidden,
b.rewarded, b.cve_id, b.cve_score, b.domain,
d.name as domain_name, d.url as domain_url
FROM bugs b
LEFT JOIN domains d ON b.domain = d.id
WHERE b.url LIKE ? OR b.description LIKE ?
ORDER BY b.created DESC
LIMIT ? OFFSET 0
''').bind(f"%{query}%", f"%{query}%", limit_int).all()
response_data = convert_d1_results(
search_result.results if hasattr(search_result, 'results') else []
)
return Response.json({
"success": True,
"query": query,
"data": response_data
})
except Exception as e:
logger.error(f"Error searching bugs: {str(e)}")
return error_response("Failed to search bugs. Please try again later.", status=500)
# Get specific bug
if "id" in path_params:
try:
Expand All @@ -99,83 +93,60 @@ async def handle_bugs(
logger.warning(f"Invalid bug id format: {path_params['id']}")
return error_response("Invalid bug id format", status=400)

result = await db.prepare('''
SELECT
b.id,
b.url,
b.description,
b.markdown_description,
b.label,
b.views,
b.verified,
b.score,
b.status,
b.user_agent,
b.ocr,
b.screenshot,
b.closed_date,
b.github_url,
b.created,
b.modified,
b.is_hidden,
b.rewarded,
b.reporter_ip_address,
b.cve_id,
b.cve_score,
b.hunt,
b.domain,
b.user,
b.closed_by,
d.id as domain_id,
d.name as domain_name,
d.url as domain_url,
d.logo as domain_logo
FROM bugs b
LEFT JOIN domains d ON b.domain = d.id
WHERE b.id = ?
''').bind(bug_id).first()

# Convert JsProxy result directly to Python dict
if result and hasattr(result, 'to_py'):
bug_data = result.to_py()
elif result and isinstance(result, dict):
bug_data = dict(result)
else:
bug_data = None

if not bug_data:
return error_response("Bug not found", status=404)

# Get screenshots for this bug
screenshots_result = await db.prepare('''
SELECT id, image, created
FROM bug_screenshots
WHERE bug = ?
ORDER BY created DESC
''').bind(bug_id).all()

# Get tags for this bug
tags_result = await db.prepare('''
SELECT t.id, t.name
FROM bug_tags bt
JOIN tags t ON bt.tag_id = t.id
WHERE bt.bug_id = ?
ORDER BY t.name
''').bind(bug_id).all()

# Convert results
screenshots_data = convert_d1_results(screenshots_result.results if hasattr(screenshots_result, 'results') else [])
tags_data = convert_d1_results(tags_result.results if hasattr(tags_result, 'results') else [])

# Add screenshots and tags to bug data
bug_data['screenshots'] = screenshots_data
bug_data['tags'] = tags_data

return Response.json({
"success": True,
"data": bug_data
})

try:
# Use ORM with JOIN for main bug fetch
bug_data = await (
Bug.objects(db)
.join("domains", on="bugs.domain=domains.id", join_type="LEFT")
.filter(**{"bugs.id": bug_id})
.values(
"bugs.id", "bugs.url", "bugs.description",
"bugs.markdown_description", "bugs.label", "bugs.views",
"bugs.verified", "bugs.score", "bugs.status",
"bugs.user_agent", "bugs.ocr", "bugs.screenshot",
"bugs.closed_date", "bugs.github_url", "bugs.created",
"bugs.modified", "bugs.is_hidden", "bugs.rewarded",
"bugs.cve_id", "bugs.cve_score",
"bugs.hunt", "bugs.domain", "bugs.user", "bugs.closed_by",
"domains.id as domain_id", "domains.name as domain_name",
"domains.url as domain_url", "domains.logo as domain_logo"
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
.first()
)

if not bug_data:
return error_response("Bug not found", status=404)

# Screenshots via ORM (no JOIN needed)
screenshots_data = (
await BugScreenshot.objects(db)
.filter(bug=bug_id)
.values("id", "image", "created")
.order_by("-created")
.all()
)

# Tags require a JOIN across bug_tags and tags tables.
# Kept as raw SQL since the ORM only supports single equality ON clauses.
tags_result = await db.prepare('''
SELECT t.id, t.name
FROM bug_tags bt
JOIN tags t ON bt.tag_id = t.id
WHERE bt.bug_id = ?
ORDER BY t.name
''').bind(bug_id).all()
tags_data = convert_d1_results(tags_result.results if hasattr(tags_result, 'results') else [])

bug_data['screenshots'] = screenshots_data
bug_data['tags'] = tags_data

return Response.json({
"success": True,
"data": bug_data
})
except Exception as e:
logger.error(f"Error fetching bug {bug_id}: {str(e)}")
return error_response("Failed to fetch bug. Please try again later.", status=500)
Comment thread
SatishKumar620 marked this conversation as resolved.
# Create bug
if method == "POST":
body = await parse_json_body(request)
Expand Down
15 changes: 14 additions & 1 deletion src/libs/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,23 @@
def _validate_identifier(name: str) -> str:
"""Validate that *name* is a safe SQL identifier.

Supports simple names (``id``) and table-qualified names (``b.id``).
Supports simple names (``id``), table-qualified names (``b.id``),
and aliased expressions (``table.col AS alias`` or ``table.col as alias``).
Raises ``ValueError`` if *name* contains characters outside the safe set
or is empty.
"""
# Support "col AS alias" or "table.col AS alias" syntax
parts = name.split()
if len(parts) == 3 and parts[1].lower() == "as":
for ident in (parts[0], parts[2]):
for part in ident.split("."):
if not part or not all(c in _SAFE_IDENT_CHARS for c in part):
raise ValueError(
f"Unsafe identifier {name!r}: only letters, digits and "
"underscores are allowed."
)
return name
# Original single-identifier path
for part in name.split("."):
if not part or not all(c in _SAFE_IDENT_CHARS for c in part):
raise ValueError(
Expand Down
Loading