@@ -23,7 +23,8 @@ def opt_oidc(f):
2323 @click .option ("--username" , help = "OIDC username" )
2424 @click .option ("--password" , help = "OIDC password" )
2525 @click .option ("--connector-id" , "connector_id" , help = "OIDC token exchange connector id (Dex specific)" )
26- @click .option ("--callback-port" ,
26+ @click .option (
27+ "--callback-port" ,
2728 "callback_port" ,
2829 type = click .IntRange (0 , 65535 ),
2930 default = None ,
@@ -93,7 +94,7 @@ async def authorization_code_grant(self, callback_port: int | None = None):
9394 elif env_value .isdigit () and int (env_value ) <= 65535 :
9495 port = int (env_value )
9596 else :
96- raise click .ClickException (f" Invalid { JMP_OIDC_CALLBACK_PORT } \ "{ env_value } \ " : must be a valid port" )
97+ raise click .ClickException (f' Invalid { JMP_OIDC_CALLBACK_PORT } "{ env_value } ": must be a valid port' )
9798
9899 tx , rx = create_memory_object_stream ()
99100
@@ -133,8 +134,52 @@ async def callback(request):
133134
134135
135136def decode_jwt (token : str ):
136- return json .loads (extract_compact (token .encode ()).payload )
137+ try :
138+ return json .loads (extract_compact (token .encode ()).payload )
139+ except (ValueError , KeyError , TypeError ) as e :
140+ raise ValueError (f"Invalid JWT format: { e } " ) from e
137141
138142
139143def decode_jwt_issuer (token : str ):
140144 return decode_jwt (token ).get ("iss" )
145+
146+
147+ def get_token_expiry (token : str ) -> int | None :
148+ """Get token expiry timestamp (Unix epoch seconds) from JWT.
149+
150+ Returns None if token doesn't have an exp claim.
151+ """
152+ return decode_jwt (token ).get ("exp" )
153+
154+
155+ def get_token_remaining_seconds (token : str ) -> float | None :
156+ """Get seconds remaining until token expires.
157+
158+ Returns:
159+ Positive value if token is still valid
160+ Negative value if token is expired (magnitude = how long ago)
161+ None if token doesn't have an exp claim
162+ """
163+ import time
164+
165+ exp = get_token_expiry (token )
166+ if exp is None :
167+ return None
168+ return exp - time .time ()
169+
170+
171+ def is_token_expired (token : str , buffer_seconds : int = 0 ) -> bool :
172+ """Check if token is expired or will expire within buffer_seconds.
173+
174+ Args:
175+ token: JWT token string
176+ buffer_seconds: Consider expired if less than this many seconds remain
177+
178+ Returns:
179+ True if token is expired or will expire within buffer
180+ False if token is still valid (or has no exp claim)
181+ """
182+ remaining = get_token_remaining_seconds (token )
183+ if remaining is None :
184+ return False
185+ return remaining < buffer_seconds
0 commit comments