Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion openedx_authz/api/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,12 @@ def get_role_assignments(
field_index, field_values = _get_field_index_and_values(subject, role, scope)
policies = enforcer.get_filtered_grouping_policy(field_index, *field_values)

_perm_cache: dict[str, list[PermissionData]] = {}
for policy in policies:
role = RoleData(namespaced_key=policy[GroupingPolicyIndex.ROLE.value])
role.permissions = get_permissions_for_single_role(role)
if role.namespaced_key not in _perm_cache:
_perm_cache[role.namespaced_key] = get_permissions_for_single_role(role)
role.permissions = _perm_cache[role.namespaced_key]
role_assignments.append(
RoleAssignmentData(
subject=SubjectData(namespaced_key=policy[GroupingPolicyIndex.SUBJECT.value]),
Expand Down
92 changes: 56 additions & 36 deletions openedx_authz/api/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
ScopeData,
SuperAdminAssignmentData,
UserAssignments,
UserAssignmentsFilter,
UserData,
)
from casbin.util import key_match_func

from openedx_authz.api.permissions import is_subject_allowed
from openedx_authz.api.roles import (
assign_role_to_subject_in_scope,
Expand All @@ -38,7 +39,7 @@
unassign_role_from_subject_in_scope,
unassign_subject_from_all_roles,
)
from openedx_authz.api.utils import filter_user_assignments, get_user_assignment_map
from openedx_authz.api.utils import get_user_assignment_map
from openedx_authz.utils import get_user_by_username_or_email

User = get_user_model()
Expand Down Expand Up @@ -267,30 +268,63 @@ def get_all_user_role_assignments_in_scope(
return get_all_subject_role_assignments_in_scope(ScopeData(external_key=scope_external_key))


def _prefilter_assignments(
assignments: list[RoleAssignmentData],
orgs: list[str] | None,
scopes: list[str] | None,
roles: list[str] | None,
) -> list[RoleAssignmentData]:
"""Filter a flat assignment list by org/scope/role before the expensive authorization pass."""
if scopes:
assignments = [a for a in assignments if a.scope.external_key in scopes]
if orgs:
assignments = [a for a in assignments if getattr(a.scope, "org", None) in orgs]
if roles:
assignments = [a for a in assignments if a.roles and a.roles[0].external_key in roles]
return assignments


def _filter_allowed_assignments(
assignments: list[RoleAssignmentData], user_external_key: str = None
) -> list[RoleAssignmentData]:
"""
Filter the given role assignments to only include those that the user has permission to view.
"""Filter assignments to only those the viewer can see.

Bypasses Casbin enforce() entirely: checks staff/superuser once, then uses
get_scopes_for_subject_and_permission for a DB-backed scope membership test
followed by Python key_match filtering.
"""
if not user_external_key:
# If no user is specified, return all assignments
return assignments
allowed_assignments: list[RoleAssignmentData] = []

try:
user = User.objects.get(username=user_external_key, is_active=True)
if user.is_staff or user.is_superuser:
return assignments
except User.DoesNotExist:
pass

viewer = UserData(external_key=user_external_key)

# Collect distinct permissions needed (typically 1-2 across all scope types).
permissions_by_id: dict[str, PermissionData] = {}
for assignment in assignments:
permission = None
perm = assignment.scope.get_admin_view_permission()
permissions_by_id[perm.identifier] = perm

# Get the permission needed to view the specific scope in the admin console
permission = assignment.scope.get_admin_view_permission().identifier
# One DB-backed lookup per distinct permission.
viewer_scopes_by_permission: dict[str, list[ScopeData]] = {
perm_id: get_scopes_for_subject_and_permission(viewer, perm)
for perm_id, perm in permissions_by_id.items()
}

if permission and is_user_allowed(
user_external_key=user_external_key,
action_external_key=permission,
scope_external_key=assignment.scope.external_key,
):
allowed_assignments.append(assignment)
result = []
for assignment in assignments:
perm_id = assignment.scope.get_admin_view_permission().identifier
viewer_scopes = viewer_scopes_by_permission.get(perm_id, [])
if any(key_match_func(assignment.scope.namespaced_key, vs.namespaced_key) for vs in viewer_scopes):
result.append(assignment)

return allowed_assignments
return result


def get_visible_role_assignments_for_user(
Expand All @@ -312,30 +346,16 @@ def get_visible_role_assignments_for_user(
list[UserAssignments]: A list of users with their role assignments, filtered by orgs/scopes and permissions.
"""
user_role_assignments = get_user_role_assignments_filtered()
# Filter assignments based on the user's permissions
# Reduce the candidate set with cheap filters before the expensive authorization pass.
user_role_assignments = _prefilter_assignments(
user_role_assignments, orgs=orgs, scopes=scopes, roles=roles
)
# Filter assignments based on the viewer's authorization.
user_role_assignments = _filter_allowed_assignments(
user_external_key=allowed_for_user_external_key,
assignments=user_role_assignments,
)
# Group assignments by user
users_with_assignments = get_user_assignment_map(user_role_assignments)

users_with_assignments = filter_user_assignments(
users_with_assignments=users_with_assignments,
by=UserAssignmentsFilter.SCOPES,
values=scopes,
)
users_with_assignments = filter_user_assignments(
users_with_assignments=users_with_assignments,
by=UserAssignmentsFilter.ORGS,
values=orgs,
)
users_with_assignments = filter_user_assignments(
users_with_assignments=users_with_assignments,
by=UserAssignmentsFilter.ROLES,
values=roles,
)
return users_with_assignments
return get_user_assignment_map(user_role_assignments)


def is_user_allowed(
Expand Down
90 changes: 90 additions & 0 deletions openedx_authz/tests/api/test_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@

from openedx_authz.api.data import ContentLibraryData, RoleAssignmentData, RoleData, UserData
from openedx_authz.api.users import (
_filter_allowed_assignments,
assign_role_to_user_in_scope,
batch_assign_role_to_users_in_scope,
batch_unassign_role_from_users,
get_all_user_role_assignments_in_scope,
get_user_role_assignments,
get_user_role_assignments_filtered,
get_user_role_assignments_for_role_in_scope,
get_user_role_assignments_in_scope,
get_visible_role_assignments_for_user,
get_visible_user_role_assignments_filtered_by_current_user,
is_user_allowed,
unassign_all_roles_from_user,
Expand Down Expand Up @@ -616,3 +619,90 @@ def test_mixed_active_inactive_subjects_in_assignments(self):

self.assertGreater(len(eve_assignments), 0)
self.assertEqual(grace_assignments, [])


class TestGetVisibleRoleAssignmentsForUser(UserAssignmentsSetupMixin):
"""Tests for get_visible_role_assignments_for_user: pre-filter and batch authorization."""

def _all_scopes(self, user_assignments_list):
return {a.scope.external_key for ua in user_assignments_list for a in ua.assignments}

def _all_roles(self, user_assignments_list):
return {r.external_key for ua in user_assignments_list for a in ua.assignments for r in a.roles}

def _all_orgs(self, user_assignments_list):
return {getattr(a.scope, "org", None) for ua in user_assignments_list for a in ua.assignments}

def test_no_viewer_filter_allowed_returns_all_assignments(self):
"""_filter_allowed_assignments with no viewer returns the full input list unchanged."""
all_assignments = get_user_role_assignments_filtered()
result = _filter_allowed_assignments(all_assignments, user_external_key=None)

self.assertEqual(len(result), len(all_assignments))

def test_scope_prefilter_reduces_to_matching_scopes(self):
"""Assignments outside the requested scopes are excluded."""
target_scopes = ["lib:Org1:math_101", "lib:Org2:physics_401"]
result = get_visible_role_assignments_for_user(scopes=target_scopes)

returned_scopes = self._all_scopes(result)
self.assertTrue(returned_scopes.issubset(set(target_scopes)))

def test_org_prefilter_reduces_to_matching_orgs(self):
"""Assignments outside the requested org are excluded."""
result = get_visible_role_assignments_for_user(orgs=["Org2"])

returned_orgs = self._all_orgs(result)
self.assertTrue(returned_orgs.issubset({"Org2"}))

def test_role_prefilter_reduces_to_matching_roles(self):
"""Assignments with a different role are excluded."""
result = get_visible_role_assignments_for_user(roles=["library_admin"])

returned_roles = self._all_roles(result)
self.assertTrue(returned_roles.issubset({"library_admin"}))

def test_viewer_restricts_to_accessible_scopes(self):
"""Viewer with library_admin in one scope cannot see other scopes."""
# alice has library_admin in lib:Org1:math_101 only.
result = get_visible_role_assignments_for_user(allowed_for_user_external_key="alice")

returned_scopes = self._all_scopes(result)
for scope in returned_scopes:
self.assertEqual(scope, "lib:Org1:math_101")

def test_scope_auth_parity_with_per_assignment_is_user_allowed(self):
"""Scope-based filter and per-assignment is_user_allowed agree on every assignment."""
all_assignments = get_user_role_assignments_filtered()

# New scope-based path
batch_result = _filter_allowed_assignments(all_assignments, user_external_key="alice")

# Reference: old per-assignment path
per_assignment_result = [
a for a in all_assignments
if is_user_allowed(
user_external_key="alice",
action_external_key=a.scope.get_admin_view_permission().identifier,
scope_external_key=a.scope.external_key,
)
]

self.assertEqual(
{(a.subject.username, a.scope.external_key) for a in batch_result},
{(a.subject.username, a.scope.external_key) for a in per_assignment_result},
)

def test_combined_filters_reduce_set_before_auth(self):
"""org + role pre-filters leave only matching assignments for authorization."""
all_assignments = get_user_role_assignments_filtered()

# Pre-filter to Org1 + library_admin, then apply authorization as alice.
from openedx_authz.api.users import _prefilter_assignments
pre_filtered = _prefilter_assignments(all_assignments, orgs=["Org1"], scopes=None, roles=["library_admin"])
authorized = _filter_allowed_assignments(pre_filtered, user_external_key="alice")

# All authorized assignments must be in Org1 and have library_admin role.
for a in authorized:
self.assertEqual(getattr(a.scope, "org", None), "Org1")
self.assertTrue(any(r.external_key == "library_admin" for r in a.roles))
Loading