33 changes: 27 additions & 6 deletions src/datajoint/adapters/postgres.py
@@ -990,26 +990,47 @@ def json_path_expr(self, column: str, path: str, return_type: str | None = None)
         path : str
             JSON path (e.g., 'field' or 'nested.field').
         return_type : str, optional
-            Return type specification (not used in PostgreSQL jsonb_extract_path_text).
+            Return type specification for casting (e.g., 'float', 'decimal(10,2)').

         Returns
         -------
         str
-            PostgreSQL jsonb_extract_path_text() expression.
+            PostgreSQL jsonb_extract_path_text() expression, with optional cast.

         Examples
         --------
         >>> adapter.json_path_expr('data', 'field')
         'jsonb_extract_path_text("data", \\'field\\')'
         >>> adapter.json_path_expr('data', 'nested.field')
         'jsonb_extract_path_text("data", \\'nested\\', \\'field\\')'
+        >>> adapter.json_path_expr('data', 'value', 'float')
+        'jsonb_extract_path_text("data", \\'value\\')::float'
         """
         quoted_col = self.quote_identifier(column)
-        # Split path by '.' for nested access
-        path_parts = path.split(".")
+        # Split path by '.' for nested access, handling array notation
+        path_parts = []
+        for part in path.split("."):
+            # Handle array access like field[0]
+            if "[" in part:
+                base, rest = part.split("[", 1)
+                path_parts.append(base)
+                # Extract array indices
+                indices = rest.rstrip("]").split("][")
+                path_parts.extend(indices)
+            else:
+                path_parts.append(part)
         path_args = ", ".join(f"'{part}'" for part in path_parts)
-        # Note: PostgreSQL jsonb_extract_path_text doesn't use return type parameter
-        return f"jsonb_extract_path_text({quoted_col}, {path_args})"
+        expr = f"jsonb_extract_path_text({quoted_col}, {path_args})"
+        # Add cast if return type specified
+        if return_type:
+            # Map DataJoint types to PostgreSQL types
+            pg_type = return_type.lower()
+            if pg_type in ("unsigned", "signed"):
+                pg_type = "integer"
+            elif pg_type == "double":
+                pg_type = "double precision"
+            expr = f"({expr})::{pg_type}"
+        return expr

     def translate_expression(self, expr: str) -> str:
         """
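For a quick sense of what the cast-aware translation emits, here is a minimal standalone sketch mirroring the adapter logic above; the mocked quote_identifier and the example path are illustrative, not part of the diff:

# Minimal standalone sketch of the json_path_expr behavior above.
# quote_identifier is mocked here; the real one lives on the adapter.
def quote_identifier(name: str) -> str:
    return f'"{name}"'

def json_path_expr(column: str, path: str, return_type: str | None = None) -> str:
    path_parts: list[str] = []
    for part in path.split("."):
        if "[" in part:  # array access such as scores[0]
            base, rest = part.split("[", 1)
            path_parts.append(base)
            path_parts.extend(rest.rstrip("]").split("]["))
        else:
            path_parts.append(part)
    path_args = ", ".join(f"'{p}'" for p in path_parts)
    expr = f"jsonb_extract_path_text({quote_identifier(column)}, {path_args})"
    if return_type:
        pg_type = {"unsigned": "integer", "signed": "integer", "double": "double precision"}.get(
            return_type.lower(), return_type.lower()
        )
        expr = f"({expr})::{pg_type}"
    return expr

print(json_path_expr("data", "scores[0]", "float"))
# (jsonb_extract_path_text("data", 'scores', '0'))::float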
26 changes: 18 additions & 8 deletions src/datajoint/condition.py
@@ -31,14 +31,17 @@
 JSON_PATTERN = re.compile(r"^(?P<attr>\w+)(\.(?P<path>[\w.*\[\]]+))?(:(?P<type>[\w(,\s)]+))?$")


-def translate_attribute(key: str) -> tuple[dict | None, str]:
+def translate_attribute(key: str, adapter=None) -> tuple[dict | None, str]:
     """
     Translate an attribute key, handling JSON path notation.

     Parameters
     ----------
     key : str
         Attribute name, optionally with JSON path (e.g., ``"attr.path.field"``).
+    adapter : DatabaseAdapter, optional
+        Database adapter for backend-specific SQL generation.
+        If not provided, uses MySQL syntax for backward compatibility.

     Returns
     -------
@@ -53,9 +56,14 @@ def translate_attribute(key: str) -> tuple[dict | None, str]:
     if match["path"] is None:
         return match, match["attr"]
     else:
-        return match, "json_value(`{}`, _utf8mb4'$.{}'{})".format(
-            *[((f" returning {v}" if k == "type" else v) if v else "") for k, v in match.items()]
-        )
+        # Use adapter's json_path_expr if available, otherwise fall back to MySQL syntax
+        if adapter is not None:
+            return match, adapter.json_path_expr(match["attr"], match["path"], match["type"])
+        else:
+            # Legacy MySQL syntax for backward compatibility
+            return match, "json_value(`{}`, _utf8mb4'$.{}'{})".format(
+                *[((f" returning {v}" if k == "type" else v) if v else "") for k, v in match.items()]
+            )


 class PromiscuousOperand:
@@ -306,14 +314,16 @@ def make_condition(

     def prep_value(k, v):
         """prepare SQL condition"""
-        key_match, k = translate_attribute(k)
-        if key_match["path"] is None:
+        key_match, k = translate_attribute(k, adapter)
+        is_json_path = key_match is not None and key_match.get("path") is not None
+
+        if not is_json_path:
             k = adapter.quote_identifier(k)
-        if query_expression.heading[key_match["attr"]].json and key_match["path"] is not None and isinstance(v, dict):
+        if is_json_path and isinstance(v, dict):
             return f"{k}='{json.dumps(v)}'"
         if v is None:
             return f"{k} IS NULL"
-        if query_expression.heading[key_match["attr"]].uuid:
+        if key_match is not None and query_expression.heading[key_match["attr"]].uuid:
             if not isinstance(v, uuid.UUID):
                 try:
                     v = uuid.UUID(v)
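The attr.path:type grammar that feeds these branches is the JSON_PATTERN at the top of the hunk. A small runnable illustration of how a restriction key decomposes; the key string is made up for the example:

import re

# Pattern copied from condition.py above.
JSON_PATTERN = re.compile(r"^(?P<attr>\w+)(\.(?P<path>[\w.*\[\]]+))?(:(?P<type>[\w(,\s)]+))?$")

m = JSON_PATTERN.match("payload.metrics.score:float")  # hypothetical key
print(m["attr"], m["path"], m["type"])  # payload metrics.score float

With an adapter, translate_attribute hands those three pieces to adapter.json_path_expr; with adapter=None it falls back to the MySQL json_value(...) string.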
16 changes: 14 additions & 2 deletions src/datajoint/declare.py
@@ -473,7 +473,19 @@ def declare(
     attribute_sql.extend(job_metadata_sql)

     if not primary_key:
-        raise DataJointError("Table must have a primary key")
+        # Singleton table: add hidden sentinel attribute
+        primary_key = ["_singleton"]
+        singleton_comment = ":bool:singleton primary key"
+        sql_type = adapter.core_type_to_sql("bool")
+        singleton_sql = adapter.format_column_definition(
+            name="_singleton",
+            sql_type=sql_type,
+            nullable=False,
+            default="NOT NULL DEFAULT TRUE",
+            comment=singleton_comment,
+        )
+        attribute_sql.insert(0, singleton_sql)
+        column_comments["_singleton"] = singleton_comment

     pre_ddl = []  # DDL to run BEFORE CREATE TABLE (e.g., CREATE TYPE for enums)
     post_ddl = []  # DDL to run AFTER CREATE TABLE (e.g., COMMENT ON)
@@ -742,7 +754,7 @@ def compile_index(line: str, index_sql: list[str], adapter) -> None:
     """

     def format_attribute(attr):
-        match, attr = translate_attribute(attr)
+        match, attr = translate_attribute(attr, adapter)
         if match is None:
             return attr
         if match["path"] is None:
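In practice the change means a definition with an empty primary-key section now declares instead of raising. A minimal usage sketch, based on the tests later in this PR; the schema name and attribute are placeholders:

import datajoint as dj

schema = dj.Schema("my_pipeline")  # hypothetical schema name

@schema
class Config(dj.Lookup):
    definition = """
    # Global configuration (no primary-key attributes: singleton)
    ---
    setting : varchar(100)
    """

Config.insert1({"setting": "on"})  # no _singleton value needed
# A second insert1 raises dj.errors.DuplicateError: the hidden
# _singleton column allows at most one row.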
3 changes: 2 additions & 1 deletion src/datajoint/expression.py
@@ -457,7 +457,8 @@ def proj(self, *attributes, **named_attributes):
         from other attributes available before the projection.
         Each attribute name can only be used once.
         """
-        named_attributes = {k: translate_attribute(v)[1] for k, v in named_attributes.items()}
+        adapter = self.connection.adapter if hasattr(self, "connection") and self.connection else None
+        named_attributes = {k: translate_attribute(v, adapter)[1] for k, v in named_attributes.items()}
         # new attributes in parentheses are included again with the new name without removing original
         duplication_pattern = re.compile(rf"^\s*\(\s*(?!{'|'.join(CONSTANT_LITERALS)})(?P<name>[a-zA-Z_]\w*)\s*\)\s*$")
         # attributes without parentheses renamed
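One place this surfaces for users is renamed projections of JSON paths: the value passed to proj can carry a path and return type, and the adapter now decides the SQL. A hedged sketch; the table and attribute names are hypothetical:

# On PostgreSQL this renders via jsonb_extract_path_text(...)::float;
# on MySQL (or with no adapter) via json_value(..., returning float).
q = MyTable.proj(score="payload.metrics.score:float")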
2 changes: 1 addition & 1 deletion src/datajoint/version.py
@@ -1,4 +1,4 @@
 # version bump auto managed by Github Actions:
 # label_prs.yaml(prep), release.yaml(bump), post_release.yaml(edit)
 # manually set this version will be eventually overwritten by the above actions
-__version__ = "2.1.0a2"
+__version__ = "2.1.0a5"
93 changes: 93 additions & 0 deletions tests/integration/test_declare.py
@@ -368,3 +368,96 @@ class Table_With_Underscores(dj.Manual):
     schema_any(TableNoUnderscores)
     with pytest.raises(dj.DataJointError, match="must be alphanumeric in CamelCase"):
         schema_any(Table_With_Underscores)
+
+
+class TestSingletonTables:
+    """Tests for singleton tables (empty primary keys)."""
+
+    def test_singleton_declaration(self, schema_any):
+        """Singleton table creates correctly with hidden _singleton attribute."""
+
+        @schema_any
+        class Config(dj.Lookup):
+            definition = """
+            # Global configuration
+            ---
+            setting : varchar(100)
+            """
+
+        # Access attributes first to trigger lazy loading from database
+        visible_attrs = Config.heading.attributes
+        all_attrs = Config.heading._attributes
+
+        # Table should exist and have _singleton as hidden PK
+        assert "_singleton" in all_attrs
+        assert "_singleton" not in visible_attrs
+        assert Config.heading.primary_key == []  # Visible PK is empty for singleton
+
+    def test_singleton_insert_and_fetch(self, schema_any):
+        """Insert and fetch work without specifying _singleton."""
+
+        @schema_any
+        class Settings(dj.Lookup):
+            definition = """
+            ---
+            value : int32
+            """
+
+        # Insert without specifying _singleton
+        Settings.insert1({"value": 42})
+
+        # Fetch should work
+        result = Settings.fetch1()
+        assert result["value"] == 42
+        assert "_singleton" not in result  # Hidden attribute excluded
+
+    def test_singleton_uniqueness(self, schema_any):
+        """Second insert raises DuplicateError."""
+
+        @schema_any
+        class SingleValue(dj.Lookup):
+            definition = """
+            ---
+            data : varchar(50)
+            """
+
+        SingleValue.insert1({"data": "first"})
+
+        # Second insert should fail
+        with pytest.raises(dj.errors.DuplicateError):
+            SingleValue.insert1({"data": "second"})
+
+    def test_singleton_with_multiple_attributes(self, schema_any):
+        """Singleton table with multiple secondary attributes."""
+
+        @schema_any
+        class PipelineConfig(dj.Lookup):
+            definition = """
+            # Pipeline configuration singleton
+            ---
+            version : varchar(20)
+            max_workers : int32
+            debug_mode : bool
+            """
+
+        PipelineConfig.insert1({"version": "1.0.0", "max_workers": 4, "debug_mode": False})
+
+        result = PipelineConfig.fetch1()
+        assert result["version"] == "1.0.0"
+        assert result["max_workers"] == 4
+        assert result["debug_mode"] == 0  # bool stored as tinyint
+
+    def test_singleton_describe(self, schema_any):
+        """Describe should show the singleton nature."""
+
+        @schema_any
+        class Metadata(dj.Lookup):
+            definition = """
+            ---
+            info : varchar(255)
+            """
+
+        description = Metadata.describe()
+        # Description should show just the secondary attribute
+        assert "info" in description
+        # _singleton is hidden, implementation detail