mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	Co-authored-by: Raymond Hettinger <rhettinger@users.noreply.github.com> Co-authored-by: Serhiy Storchaka <storchaka@gmail.com> Co-authored-by: Mariusz Felisiak <felisiak.mariusz@gmail.com>
		
			
				
	
	
		
			141 lines
		
	
	
	
		
			4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			141 lines
		
	
	
	
		
			4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
import sqlite3
 | 
						|
import sys
 | 
						|
from pathlib import Path
 | 
						|
from contextlib import suppress, closing
 | 
						|
from collections.abc import MutableMapping
 | 
						|
 | 
						|
BUILD_TABLE = """
 | 
						|
  CREATE TABLE IF NOT EXISTS Dict (
 | 
						|
    key BLOB UNIQUE NOT NULL,
 | 
						|
    value BLOB NOT NULL
 | 
						|
  )
 | 
						|
"""
 | 
						|
GET_SIZE = "SELECT COUNT (key) FROM Dict"
 | 
						|
LOOKUP_KEY = "SELECT value FROM Dict WHERE key = CAST(? AS BLOB)"
 | 
						|
STORE_KV = "REPLACE INTO Dict (key, value) VALUES (CAST(? AS BLOB), CAST(? AS BLOB))"
 | 
						|
DELETE_KEY = "DELETE FROM Dict WHERE key = CAST(? AS BLOB)"
 | 
						|
ITER_KEYS = "SELECT key FROM Dict"
 | 
						|
 | 
						|
 | 
						|
class error(OSError):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
_ERR_CLOSED = "DBM object has already been closed"
 | 
						|
_ERR_REINIT = "DBM object does not support reinitialization"
 | 
						|
 | 
						|
 | 
						|
def _normalize_uri(path):
 | 
						|
    path = Path(path)
 | 
						|
    uri = path.absolute().as_uri()
 | 
						|
    while "//" in uri:
 | 
						|
        uri = uri.replace("//", "/")
 | 
						|
    return uri
 | 
						|
 | 
						|
 | 
						|
class _Database(MutableMapping):
 | 
						|
 | 
						|
    def __init__(self, path, /, *, flag, mode):
 | 
						|
        if hasattr(self, "_cx"):
 | 
						|
            raise error(_ERR_REINIT)
 | 
						|
 | 
						|
        path = os.fsdecode(path)
 | 
						|
        match flag:
 | 
						|
            case "r":
 | 
						|
                flag = "ro"
 | 
						|
            case "w":
 | 
						|
                flag = "rw"
 | 
						|
            case "c":
 | 
						|
                flag = "rwc"
 | 
						|
                Path(path).touch(mode=mode, exist_ok=True)
 | 
						|
            case "n":
 | 
						|
                flag = "rwc"
 | 
						|
                Path(path).unlink(missing_ok=True)
 | 
						|
                Path(path).touch(mode=mode)
 | 
						|
            case _:
 | 
						|
                raise ValueError("Flag must be one of 'r', 'w', 'c', or 'n', "
 | 
						|
                                 f"not {flag!r}")
 | 
						|
 | 
						|
        # We use the URI format when opening the database.
 | 
						|
        uri = _normalize_uri(path)
 | 
						|
        uri = f"{uri}?mode={flag}"
 | 
						|
 | 
						|
        try:
 | 
						|
            self._cx = sqlite3.connect(uri, autocommit=True, uri=True)
 | 
						|
        except sqlite3.Error as exc:
 | 
						|
            raise error(str(exc))
 | 
						|
 | 
						|
        # This is an optimization only; it's ok if it fails.
 | 
						|
        with suppress(sqlite3.OperationalError):
 | 
						|
            self._cx.execute("PRAGMA journal_mode = wal")
 | 
						|
 | 
						|
        if flag == "rwc":
 | 
						|
            self._execute(BUILD_TABLE)
 | 
						|
 | 
						|
    def _execute(self, *args, **kwargs):
 | 
						|
        if not self._cx:
 | 
						|
            raise error(_ERR_CLOSED)
 | 
						|
        try:
 | 
						|
            return closing(self._cx.execute(*args, **kwargs))
 | 
						|
        except sqlite3.Error as exc:
 | 
						|
            raise error(str(exc))
 | 
						|
 | 
						|
    def __len__(self):
 | 
						|
        with self._execute(GET_SIZE) as cu:
 | 
						|
            row = cu.fetchone()
 | 
						|
        return row[0]
 | 
						|
 | 
						|
    def __getitem__(self, key):
 | 
						|
        with self._execute(LOOKUP_KEY, (key,)) as cu:
 | 
						|
            row = cu.fetchone()
 | 
						|
        if not row:
 | 
						|
            raise KeyError(key)
 | 
						|
        return row[0]
 | 
						|
 | 
						|
    def __setitem__(self, key, value):
 | 
						|
        self._execute(STORE_KV, (key, value))
 | 
						|
 | 
						|
    def __delitem__(self, key):
 | 
						|
        with self._execute(DELETE_KEY, (key,)) as cu:
 | 
						|
            if not cu.rowcount:
 | 
						|
                raise KeyError(key)
 | 
						|
 | 
						|
    def __iter__(self):
 | 
						|
        try:
 | 
						|
            with self._execute(ITER_KEYS) as cu:
 | 
						|
                for row in cu:
 | 
						|
                    yield row[0]
 | 
						|
        except sqlite3.Error as exc:
 | 
						|
            raise error(str(exc))
 | 
						|
 | 
						|
    def close(self):
 | 
						|
        if self._cx:
 | 
						|
            self._cx.close()
 | 
						|
            self._cx = None
 | 
						|
 | 
						|
    def keys(self):
 | 
						|
        return list(super().keys())
 | 
						|
 | 
						|
    def __enter__(self):
 | 
						|
        return self
 | 
						|
 | 
						|
    def __exit__(self, *args):
 | 
						|
        self.close()
 | 
						|
 | 
						|
 | 
						|
def open(filename, /, flag="r", mode=0o666):
 | 
						|
    """Open a dbm.sqlite3 database and return the dbm object.
 | 
						|
 | 
						|
    The 'filename' parameter is the name of the database file.
 | 
						|
 | 
						|
    The optional 'flag' parameter can be one of ...:
 | 
						|
        'r' (default): open an existing database for read only access
 | 
						|
        'w': open an existing database for read/write access
 | 
						|
        'c': create a database if it does not exist; open for read/write access
 | 
						|
        'n': always create a new, empty database; open for read/write access
 | 
						|
 | 
						|
    The optional 'mode' parameter is the Unix file access mode of the database;
 | 
						|
    only used when creating a new database. Default: 0o666.
 | 
						|
    """
 | 
						|
    return _Database(filename, flag=flag, mode=mode)
 |