|
| 1 | +# Copyright (c) nexB Inc. and others. All rights reserved. |
| 2 | +# VulnerableCode is a trademark of nexB Inc. |
| 3 | +# SPDX-License-Identifier: Apache-2.0 |
| 4 | +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. |
| 5 | +# See https://github.com/aboutcode-org/vulnerablecode for support or download. |
| 6 | +# See https://aboutcode.org for more information about nexB OSS projects. |
| 7 | +# |
| 8 | + |
| 9 | +import time |
| 10 | + |
| 11 | +import requests |
| 12 | + |
| 13 | +from vulnerabilities.models import AdvisoryReference |
| 14 | +from vulnerabilities.pipelines import VulnerableCodePipeline |
| 15 | + |
| 16 | + |
class ArchiveImproverPipeline(VulnerableCodePipeline):
    """
    Look up a Wayback Machine archive URL for AdvisoryReferences and store it.

    References for which no archive URL could be obtained are flagged by
    storing the ``NO_ARCHIVE`` sentinel in their ``archive_url`` field.
    """

    pipeline_id = "archive_improver_pipeline"

    # Seconds to wait on the Wayback Machine before giving up on one lookup.
    # Without a timeout a single stalled connection would block the whole run.
    wayback_timeout = 60

    # Seconds to pause after every lookup attempt to throttle the request rate.
    wayback_delay = 30

    @classmethod
    def steps(cls):
        return (cls.archive_urls,)

    def archive_urls(self):
        """
        Fetch and store an archive URL for each AdvisoryReference without one.

        Only references whose ``archive_url`` is NULL are processed. References
        whose ``url`` is empty or does not start with "http" are skipped and left
        untouched. When the lookup yields nothing, the reference is flagged with
        ``NO_ARCHIVE``.
        """
        advisory_refs = (
            AdvisoryReference.objects.filter(archive_url__isnull=True)
            .exclude(archive_url="NO_ARCHIVE")
            .only("id", "url")
        )

        for advisory_ref in advisory_refs:
            url = advisory_ref.url
            if not url or not url.startswith("http"):
                continue

            archive_url = self.get_archival(url)
            if not archive_url:
                # NOTE(review): a failed request is flagged the same way as a
                # missing archive, and flagged rows are no longer NULL, so this
                # step will not select them again.
                AdvisoryReference.objects.filter(id=advisory_ref.id).update(
                    archive_url="NO_ARCHIVE"
                )
                self.log(f"URL unreachable or returned no archive url: {url}")
                continue

            self.log(f"Found Archived Reference URL: {archive_url}")
            AdvisoryReference.objects.filter(id=advisory_ref.id).update(archive_url=archive_url)

    def get_archival(self, url):
        """
        Return the URL the Wayback Machine resolves ``url`` to, or None.

        The final URL of the response (after redirects) is returned when the
        status code is 200. None is returned for any other status code and when
        the request raises ``requests.RequestException``, including a timeout;
        the error is logged in that case.
        """
        self.log(f"Searching for archive URL for this Reference URL: {url}")
        try:
            archive_response = requests.get(
                url=f"https://web.archive.org/web/{url}",
                allow_redirects=True,
                timeout=self.wayback_timeout,
            )
            if archive_response.status_code == 200:
                return archive_response.url
            if archive_response.status_code == 403:
                self.log(f"Wayback Machine permission denied for '{url}'.")
        except requests.RequestException as e:
            self.log(f"Error checking existing archival: {e}")
        finally:
            # Throttle after every attempt, including failed ones, so that
            # errors do not lead to rapid back-to-back requests.
            time.sleep(self.wayback_delay)
        return None
0 commit comments