1+ import bz2
2+ import gzip
3+ import lzma
4+ import os
5+ import sys
16from io import BytesIO
27
38import pytest
49from anyio import EndOfStream , create_memory_object_stream
510from anyio .streams .stapled import StapledObjectStream
611
7- from .encoding import compress_stream
12+ if sys .version_info >= (3 , 14 ):
13+ from compression import zstd
14+ else :
15+ from backports import zstd
16+
17+ from .encoding import (
18+ COMPRESSION_SIGNATURES ,
19+ AutoDecompressIterator ,
20+ Compression ,
21+ compress_stream ,
22+ detect_compression_from_signature ,
23+ )
824
# Apply the anyio marker to every test in this module so that the
# ``async def`` tests below are collected and run by the anyio pytest plugin.
pytestmark = pytest.mark.anyio
1026
@@ -28,3 +44,133 @@ async def test_compress_stream(compression):
2844 except EndOfStream :
2945 break
3046 assert result .getvalue () == b"hello"
47+
48+
def _get_signature(compression: Compression) -> bytes:
    """Return the signature bytes registered for ``compression``.

    Scans ``COMPRESSION_SIGNATURES`` in order and returns the ``signature``
    attribute of the first entry whose ``compression`` attribute equals the
    requested value.

    Args:
        compression: The compression type to look up.

    Returns:
        The signature bytes of the first matching entry.

    Raises:
        ValueError: If no entry in ``COMPRESSION_SIGNATURES`` matches.
    """
    for sig in COMPRESSION_SIGNATURES:
        if sig.compression == compression:
            return sig.signature
    raise ValueError(f"No signature found for {compression}")
55+
56+
class TestDetectCompressionFromSignature:
    """Tests for file signature detection."""

    @pytest.mark.parametrize(
        "compression",
        [Compression.GZIP, Compression.XZ, Compression.BZ2, Compression.ZSTD],
    )
    def test_detect_from_signature(self, compression):
        """Each compression format should be detected from its signature."""
        signature = _get_signature(compression)
        # Pad with random bytes to simulate real file content
        data = signature + os.urandom(4)
        assert detect_compression_from_signature(data) == compression

    def test_detect_uncompressed(self):
        """Data matching no known signature should be reported as ``None``."""
        assert detect_compression_from_signature(b"hello world") is None

    def test_detect_empty(self):
        """Empty input should be reported as ``None``."""
        assert detect_compression_from_signature(b"") is None

    def test_detect_too_short(self):
        """Truncated signatures should not match."""
        # The literals must be exact prefixes of the real magic numbers;
        # any extra byte would make them "non-matching" for the wrong reason.
        assert detect_compression_from_signature(b"\x1f") is None  # gzip partial
        assert detect_compression_from_signature(b"\xfd\x37\x7a") is None  # xz partial

    def test_detect_from_real_gzip_data(self):
        """Output of ``gzip.compress`` should be detected as GZIP."""
        compressed = gzip.compress(b"test data")
        assert detect_compression_from_signature(compressed) == Compression.GZIP

    def test_detect_from_real_xz_data(self):
        """Output of ``lzma.compress`` in XZ format should be detected as XZ."""
        compressed = lzma.compress(b"test data", format=lzma.FORMAT_XZ)
        assert detect_compression_from_signature(compressed) == Compression.XZ

    def test_detect_from_real_bz2_data(self):
        """Output of ``bz2.compress`` should be detected as BZ2."""
        compressed = bz2.compress(b"test data")
        assert detect_compression_from_signature(compressed) == Compression.BZ2

    def test_detect_from_real_zstd_data(self):
        """Output of ``zstd.compress`` should be detected as ZSTD."""
        compressed = zstd.compress(b"test data")
        assert detect_compression_from_signature(compressed) == Compression.ZSTD
98+
99+
class TestAutoDecompressIterator:
    """Tests for auto-decompressing async iterator."""

    async def _async_iter_from_bytes(self, data: bytes, chunk_size: int):
        """Yield ``data`` as consecutive slices of at most ``chunk_size`` bytes."""
        for i in range(0, len(data), chunk_size):
            yield data[i : i + chunk_size]

    async def _decompress_and_check(self, compressed: bytes, expected: bytes, chunk_size: int = 16):
        """Feed ``compressed`` through the iterator and compare the joined output.

        Args:
            compressed: Bytes to stream into ``AutoDecompressIterator``.
            expected: Bytes the concatenated output chunks must equal.
            chunk_size: Size of each slice handed to the iterator.
        """
        source = self._async_iter_from_bytes(compressed, chunk_size)
        chunks = [chunk async for chunk in AutoDecompressIterator(source=source)]
        assert b"".join(chunks) == expected

    async def test_passthrough_uncompressed(self):
        """Uncompressed data should pass through unchanged."""
        original = b"hello world, this is uncompressed data"
        await self._decompress_and_check(original, original)

    async def test_decompress_gzip(self):
        """Gzip compressed data should be decompressed."""
        original = b"hello world, this is gzip compressed data"
        compressed = gzip.compress(original)
        await self._decompress_and_check(compressed, original)

    async def test_decompress_xz(self):
        """XZ compressed data should be decompressed."""
        original = b"hello world, this is xz compressed data"
        compressed = lzma.compress(original, format=lzma.FORMAT_XZ)
        await self._decompress_and_check(compressed, original)

    async def test_decompress_bz2(self):
        """BZ2 compressed data should be decompressed."""
        original = b"hello world, this is bz2 compressed data"
        compressed = bz2.compress(original)
        await self._decompress_and_check(compressed, original)

    async def test_decompress_zstd(self):
        """Zstd compressed data should be decompressed."""
        original = b"hello world, this is zstd compressed data"
        compressed = zstd.compress(original)
        await self._decompress_and_check(compressed, original)

    async def test_small_chunks(self):
        """Should work with very small chunks."""
        original = b"hello world"
        compressed = gzip.compress(original)
        # One byte per chunk: the signature itself arrives split across chunks.
        await self._decompress_and_check(compressed, original, chunk_size=1)

    async def test_empty_input(self):
        """Empty input should produce no output."""

        async def empty_iter():
            # The unreachable ``yield`` makes this an async generator that
            # finishes without producing any item.
            if False:
                yield

        chunks = []
        async for chunk in AutoDecompressIterator(source=empty_iter()):
            chunks.append(chunk)
        assert chunks == []

    async def test_large_data(self):
        """Should handle large data correctly."""
        original = b"x" * 1024 * 1024  # 1MB of data
        compressed = gzip.compress(original)
        await self._decompress_and_check(compressed, original, chunk_size=65536)

    async def test_corrupted_gzip(self):
        """Corrupted gzip data should raise RuntimeError with clear message."""
        # Fake gzip data: a real gzip header prefix (magic 1f 8b, method 08)
        # followed by a payload that is not a valid gzip stream.
        corrupted = b"\x1f\x8b\x08" + b"corrupted data here"

        with pytest.raises(RuntimeError, match=r"Failed to decompress gzip:.*"):
            # Only the raised error matters; the yielded chunks are discarded.
            async for _ in AutoDecompressIterator(source=self._async_iter_from_bytes(corrupted, 16)):
                pass
0 commit comments