-
Notifications
You must be signed in to change notification settings - Fork 2
Centralize materialized view registry and refactor refresh task #292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f747493
5664759
233d0da
4590054
dc4fa34
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This file is from #288 and isn't needed here. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| """fix alias email constraints | ||
|
|
||
| Revision ID: 41 | ||
| Revises: 40 | ||
| Create Date: 2026-05-04 14:23:57.315794 | ||
|
|
||
| """ | ||
| from alembic import op | ||
| import sqlalchemy as sa | ||
|
|
||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision = '41' | ||
| down_revision = '40' | ||
| branch_labels = None | ||
| depends_on = None | ||
|
|
||
|
|
||
| def upgrade(): | ||
| # ### commands auto generated by Alembic - please adjust! ### | ||
| op.drop_constraint(op.f('contributor-alias-unique'), 'contributors_aliases', schema='augur_data', type_='unique') | ||
| op.create_unique_constraint('cntrb-email-insert-unique', 'contributors_aliases', ['cntrb_id', 'alias_email'], schema='augur_data') | ||
| # ### end Alembic commands ### | ||
|
|
||
|
|
||
| def downgrade(): | ||
| # ### commands auto generated by Alembic - please adjust! ### | ||
| op.drop_constraint('cntrb-email-insert-unique', 'contributors_aliases', schema='augur_data', type_='unique') | ||
| op.create_unique_constraint(op.f('contributor-alias-unique'), 'contributors_aliases', ['alias_email'], schema='augur_data') | ||
| # ### end Alembic commands ### |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,197 +3,53 @@ | |
| import sqlalchemy as s | ||
|
|
||
| from collectoss.tasks.init.celery_app import celery_app as celery | ||
| from collectoss.application.db.lib import execute_sql | ||
| from collectoss.application.db.materialized_views import MATERIALIZED_VIEWS | ||
| from collectoss.tasks.git.util.facade_worker.facade_worker.config import FacadeHelper | ||
| from collectoss.tasks.git.util.facade_worker.facade_worker.rebuildcache import invalidate_caches, rebuild_unknown_affiliation_and_web_caches | ||
|
|
||
|
|
||
| @celery.task(bind=True) | ||
| def refresh_materialized_views(self): | ||
|
|
||
| #self.logger = SystemLogger("data_collection_jobs").get_logger() | ||
|
|
||
| engine = self.app.engine | ||
|
|
||
| logger = logging.getLogger(refresh_materialized_views.__name__) | ||
| #self.logger = logging.getLogger(refresh_materialized_views.__name__) | ||
|
|
||
| mv1_refresh = s.sql.text(""" | ||
| REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repo_prs with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv2_refresh = s.sql.text(""" | ||
| REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repos_commits with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv3_refresh = s.sql.text(""" | ||
| REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repos_issues with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv4_refresh = s.sql.text(""" | ||
| REFRESH MATERIALIZED VIEW concurrently augur_data.augur_new_contributors with data; | ||
| COMMIT; | ||
| """) | ||
| mv5_refresh = s.sql.text(""" | ||
| REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_commits_and_committers_daily_count with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv6_refresh = s.sql.text(""" | ||
| REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_new_contributors with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv7_refresh = s.sql.text(""" | ||
| REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_entry_list with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv8_refresh = s.sql.text(""" | ||
|
|
||
| REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_contributor_actions with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv9_refresh = s.sql.text(""" | ||
|
|
||
| REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_user_repos with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv10_refresh = s.sql.text(""" | ||
|
|
||
| REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_pr_response_times with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv11_refresh = s.sql.text(""" | ||
|
|
||
| REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_pr_assignments with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv12_refresh = s.sql.text(""" | ||
|
|
||
| REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_issue_assignments with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv13_refresh = s.sql.text(""" | ||
|
|
||
| REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_pr_response with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| mv14_refresh = s.sql.text(""" | ||
|
|
||
| REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_repo_languages with data; | ||
| COMMIT; | ||
| """) | ||
|
|
||
| try: | ||
| execute_sql(mv1_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv2_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv3_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv4_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv5_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv6_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv7_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv8_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv9_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv10_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv11_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv12_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv13_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
|
|
||
| try: | ||
| execute_sql(mv14_refresh) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
| pass | ||
| # REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction | ||
| # block, so we use an autocommit connection rather than execute_sql(). | ||
| failed_views = [] | ||
| with self.app.engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: | ||
| for view in MATERIALIZED_VIEWS: | ||
| logger.info(f"Refreshing materialized view: {view.fqn}") | ||
|
Comment on lines
+19
to
+21
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Other than this feeling a little DIY, I'd maybe want to strategize more on how we are refreshing these views. CONCURRENTLY can't be used unless the view has a unique index (effectively its primary key). I'd also want to make sure that we aren't refreshing views at inopportune times relative to collection, but that's all maybe worth a follow-up PR. |
||
| try: | ||
| conn.execute(s.sql.text(view.refresh_sql(concurrently=True))) | ||
| except Exception as e: | ||
| logger.warning(f"Concurrent refresh failed for {view.fqn}, trying non-concurrent: {e}") | ||
| try: | ||
| conn.execute(s.sql.text(view.refresh_sql(concurrently=False))) | ||
| except Exception as e2: | ||
| logger.error(f"Non-concurrent refresh also failed for {view.fqn}: {e2}") | ||
| failed_views.append(view.fqn) | ||
|
|
||
| if failed_views: | ||
| raise RuntimeError( | ||
| f"{len(failed_views)} materialized view(s) failed to refresh: {failed_views}" | ||
| ) | ||
|
|
||
| #Now refresh facade tables | ||
| #Use this class to get all the settings and | ||
| #Use this class to get all the settings and | ||
| #utility functions for facade | ||
| facade_helper = FacadeHelper(logger) | ||
|
|
||
| if facade_helper.nuke_stored_affiliations: | ||
| logger.error("Nuke stored affiliations is deprecated!") | ||
| # deprecated because the UI component of facade where affiliations would be | ||
| # nuked upon change no longer exists, and this information can easily be derived | ||
| # deprecated because the UI component of facade where affiliations would be | ||
| # nuked upon change no longer exists, and this information can easily be derived | ||
| # from queries and materialized views in the current version of CollectOSS. | ||
| # This method is also a major performance bottleneck with little value. | ||
|
|
||
| if not facade_helper.limited_run or (facade_helper.limited_run and facade_helper.fix_affiliations): | ||
| logger.error("Fill empty affiliations is deprecated!") | ||
| # deprecated because the UI component of facade where affiliations would need | ||
| # to be fixed upon change no longer exists, and this information can easily be derived | ||
| # deprecated because the UI component of facade where affiliations would need | ||
| # to be fixed upon change no longer exists, and this information can easily be derived | ||
| # from queries and materialized views in the current version of CollectOSS. | ||
| # This method is also a major performance bottleneck with little value. | ||
|
|
||
|
|
@@ -202,13 +58,9 @@ def refresh_materialized_views(self): | |
| invalidate_caches(facade_helper) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
|
|
||
| if not facade_helper.limited_run or (facade_helper.limited_run and facade_helper.rebuild_caches): | ||
| try: | ||
| rebuild_unknown_affiliation_and_web_caches(facade_helper) | ||
| except Exception as e: | ||
| logger.info(f"error is {e}") | ||
|
|
||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -384,7 +384,7 @@ def clone_repos(): | |
| setattr(repoStatus,"facade_status", CollectionState.ERROR.value) | ||
| session.commit() | ||
|
|
||
| clone_repos.si().apply_async(countdown=60*5) | ||
| clone_repos.si().apply_async(countdown=60*5) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was fixed in another PR and shouldn't be included here. |
||
|
|
||
|
|
||
| #@celery.task(bind=True) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change was made in #288