Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions src/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

from workers import Response

import js
from pyodide.ffi import to_js

def capture_exception(exc: Exception, req=None, _env=None, where: str = ""):
"""Best-effort exception logging with full traceback and request context."""
Expand Down Expand Up @@ -109,11 +111,9 @@ def _derive_aes_key_bytes(secret: str) -> bytes:

async def _import_aes_key(key_bytes: bytes) -> object:
"""Import raw bytes as a Web Crypto AES-GCM CryptoKey."""
import js
from pyodide.ffi import to_js
key_buf = to_js(key_bytes, create_pyproxies=False)
algo = to_js({"name": "AES-GCM"}, create_pyproxies=False)
usages = to_js(["encrypt", "decrypt"], create_pyproxies=False)
algo = to_js({"name": "AES-GCM"}, dict_converter=js.Object.fromEntries)
usages = to_js(["encrypt", "decrypt"])
return await js.crypto.subtle.importKey("raw", key_buf, algo, False, usages)


Expand All @@ -126,13 +126,17 @@ async def encrypt_aes(plaintext: str, secret: str) -> str:
if not plaintext:
return ""
try:
import js
from pyodide.ffi import to_js
key_bytes = _derive_aes_key_bytes(secret)
crypto_key = await _import_aes_key(key_bytes)
iv = bytes(js.crypto.getRandomValues(to_js(bytearray(12))))
algo = to_js({"name": "AES-GCM", "iv": to_js(iv)}, create_pyproxies=False)
data = to_js(plaintext.encode("utf-8"), create_pyproxies=False)

# Generate random IV directly into a JS Uint8Array for clean interop
iv_array = js.Uint8Array.new(12)
js.crypto.getRandomValues(iv_array)
iv = bytes(iv_array) # Extract back to python bytes for storage

# Using dict_converter ensures Web Crypto does not see "undefined" for the algo name
algo = to_js({"name": "AES-GCM", "iv": iv_array}, dict_converter=js.Object.fromEntries)
data = to_js(plaintext.encode("utf-8"))
ct_buf = await js.crypto.subtle.encrypt(algo, crypto_key, data)
ct = bytes(js.Uint8Array.new(ct_buf))
return "v1:" + base64.b64encode(iv + ct).decode("ascii")
Expand All @@ -149,20 +153,19 @@ async def decrypt_aes(ciphertext: str, secret: str) -> str:
return ""
if not ciphertext.startswith("v1:"):
return _decrypt_xor(ciphertext, secret)
import js
from pyodide.ffi import to_js
try:
raw = base64.b64decode(ciphertext[3:])
iv, ct = raw[:12], raw[12:]
except Exception as exc:
capture_exception(exc, where="decrypt_aes.decode")
return "[decryption error]"
key_bytes = _derive_aes_key_bytes(secret)
crypto_key = await _import_aes_key(key_bytes)
algo = to_js({"name": "AES-GCM", "iv": to_js(iv)}, create_pyproxies=False)
data = to_js(ct, create_pyproxies=False)
try:
pt_buf = await js.crypto.subtle.decrypt(algo, crypto_key, data)
key_bytes = _derive_aes_key_bytes(secret)
crypto_key = await _import_aes_key(key_bytes)
iv_array = to_js(iv)
algo = to_js({"name": "AES-GCM", "iv": iv_array}, dict_converter=js.Object.fromEntries)
data = to_js(ct)
pt_buf = await js.crypto.subtle.decrypt(algo, crypto_key, data)
return bytes(js.Uint8Array.new(pt_buf)).decode("utf-8")
except Exception as exc:
# Auth tag mismatch = tampered/corrupted ciphertext
Expand Down