gh-96414: Inline code examples in sqlite3 docs (GH-96442)

(cherry picked from commit f7e7bf161a)

Co-authored-by: Erlend E. Aasland <erlend.aasland@protonmail.com>
This commit is contained in:
Miss Islington (bot) 2022-08-30 23:03:33 -07:00 committed by GitHub
parent 895c7a4401
commit d4d5e605cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 380 additions and 349 deletions

View file

@ -1,18 +0,0 @@
import sqlite3
class Point:
def __init__(self, x, y):
self.x, self.y = x, y
def __conform__(self, protocol):
if protocol is sqlite3.PrepareProtocol:
return "%f;%f" % (self.x, self.y)
con = sqlite3.connect(":memory:")
cur = con.cursor()
p = Point(4.0, -3.2)
cur.execute("select ?", (p,))
print(cur.fetchone()[0])
con.close()

View file

@ -1,19 +0,0 @@
import sqlite3
class Point:
def __init__(self, x, y):
self.x, self.y = x, y
def adapt_point(point):
return "%f;%f" % (point.x, point.y)
sqlite3.register_adapter(Point, adapt_point)
con = sqlite3.connect(":memory:")
cur = con.cursor()
p = Point(4.0, -3.2)
cur.execute("select ?", (p,))
print(cur.fetchone()[0])
con.close()

View file

@ -1,19 +0,0 @@
import sqlite3
con = sqlite3.connect(":memory:")
con.execute("create table test(blob_col blob)")
con.execute("insert into test(blob_col) values (zeroblob(13))")
# Write to our blob, using two write operations:
with con.blobopen("test", "blob_col", 1) as blob:
blob.write(b"hello, ")
blob.write(b"world.")
# Modify the first and last bytes of our blob
blob[0] = ord("H")
blob[-1] = ord("!")
# Read the contents of our blob
with con.blobopen("test", "blob_col", 1) as blob:
greeting = blob.read()
print(greeting) # outputs "b'Hello, world!'"

View file

@ -1,20 +0,0 @@
import sqlite3
def collate_reverse(string1, string2):
if string1 == string2:
return 0
elif string1 < string2:
return 1
else:
return -1
con = sqlite3.connect(":memory:")
con.create_collation("reverse", collate_reverse)
cur = con.cursor()
cur.execute("create table test(x)")
cur.executemany("insert into test(x) values (?)", [("a",), ("b",)])
cur.execute("select x from test order by x collate reverse")
for row in cur:
print(row)
con.close()

View file

@ -1,40 +0,0 @@
import sqlite3
class Point:
def __init__(self, x, y):
self.x, self.y = x, y
def __repr__(self):
return f"Point({self.x}, {self.y})"
def adapt_point(point):
return f"{point.x};{point.y}".encode("utf-8")
def convert_point(s):
x, y = list(map(float, s.split(b";")))
return Point(x, y)
# Register the adapter and converter
sqlite3.register_adapter(Point, adapt_point)
sqlite3.register_converter("point", convert_point)
# 1) Parse using declared types
p = Point(4.0, -3.2)
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
cur = con.execute("create table test(p point)")
cur.execute("insert into test(p) values (?)", (p,))
cur.execute("select p from test")
print("with declared types:", cur.fetchone()[0])
cur.close()
con.close()
# 2) Parse using column names
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES)
cur = con.execute("create table test(p)")
cur.execute("insert into test(p) values (?)", (p,))
cur.execute('select p as "p [point]" from test')
print("with column names:", cur.fetchone()[0])
cur.close()
con.close()

View file

@ -1,20 +0,0 @@
import sqlite3
con = sqlite3.connect(":memory:")
con.execute("create table lang (id integer primary key, name varchar unique)")
# Successful, con.commit() is called automatically afterwards
with con:
con.execute("insert into lang(name) values (?)", ("Python",))
# con.rollback() is called after the with block finishes with an exception, the
# exception is still raised and must be caught
try:
with con:
con.execute("insert into lang(name) values (?)", ("Python",))
except sqlite3.IntegrityError:
print("couldn't add Python twice")
# Connection object used as context manager only commits or rollbacks transactions,
# so the connection object should be closed manually
con.close()

View file

@ -1,22 +0,0 @@
import sqlite3
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("create table lang (name, first_appeared)")
# This is the qmark style:
cur.execute("insert into lang values (?, ?)", ("C", 1972))
# The qmark style used with executemany():
lang_list = [
("Fortran", 1957),
("Python", 1991),
("Go", 2009),
]
cur.executemany("insert into lang values (?, ?)", lang_list)
# And this is the named style:
cur.execute("select * from lang where first_appeared=:year", {"year": 1972})
print(cur.fetchall())
con.close()

View file

@ -1,28 +0,0 @@
import sqlite3
con = sqlite3.connect(":memory:")
# enable extension loading
con.enable_load_extension(True)
# Load the fulltext search extension
con.execute("select load_extension('./fts3.so')")
# alternatively you can load the extension using an API call:
# con.load_extension("./fts3.so")
# disable extension loading again
con.enable_load_extension(False)
# example from SQLite wiki
con.execute("create virtual table recipe using fts3(name, ingredients)")
con.executescript("""
insert into recipe (name, ingredients) values ('broccoli stew', 'broccoli peppers cheese tomatoes');
insert into recipe (name, ingredients) values ('pumpkin stew', 'pumpkin onions garlic celery');
insert into recipe (name, ingredients) values ('broccoli pie', 'broccoli cheese onions flour');
insert into recipe (name, ingredients) values ('pumpkin pie', 'pumpkin sugar flour butter');
""")
for row in con.execute("select rowid, name, ingredients from recipe where name match 'pie'"):
print(row)
con.close()

View file

@ -1,13 +0,0 @@
import sqlite3
import hashlib
def md5sum(t):
return hashlib.md5(t).hexdigest()
con = sqlite3.connect(":memory:")
con.create_function("md5", 1, md5sum)
cur = con.cursor()
cur.execute("select md5(?)", (b"foo",))
print(cur.fetchone()[0])
con.close()

View file

@ -1,22 +0,0 @@
import sqlite3
class MySum:
def __init__(self):
self.count = 0
def step(self, value):
self.count += value
def finalize(self):
return self.count
con = sqlite3.connect(":memory:")
con.create_aggregate("mysum", 1, MySum)
cur = con.cursor()
cur.execute("create table test(i)")
cur.execute("insert into test(i) values (1)")
cur.execute("insert into test(i) values (2)")
cur.execute("select mysum(i) from test")
print(cur.fetchone()[0])
con.close()

View file

@ -1,15 +0,0 @@
import sqlite3
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
con = sqlite3.connect(":memory:")
con.row_factory = dict_factory
cur = con.cursor()
cur.execute("select 1 as a")
print(cur.fetchone()["a"])
con.close()

View file

@ -1,24 +0,0 @@
import sqlite3
langs = [
("C++", 1985),
("Objective-C", 1984),
]
con = sqlite3.connect(":memory:")
# Create the table
con.execute("create table lang(name, first_appeared)")
# Fill the table
con.executemany("insert into lang(name, first_appeared) values (?, ?)", langs)
# Print the table contents
for row in con.execute("select name, first_appeared from lang"):
print(row)
print("I just deleted", con.execute("delete from lang").rowcount, "rows")
# close is not a shortcut method and it's not called automatically,
# so the connection object should be closed manually
con.close()

View file

@ -1,46 +0,0 @@
# Example taken from https://www.sqlite.org/windowfunctions.html#udfwinfunc
import sqlite3
class WindowSumInt:
def __init__(self):
self.count = 0
def step(self, value):
"""Adds a row to the current window."""
self.count += value
def value(self):
"""Returns the current value of the aggregate."""
return self.count
def inverse(self, value):
"""Removes a row from the current window."""
self.count -= value
def finalize(self):
"""Returns the final value of the aggregate.
Any clean-up actions should be placed here.
"""
return self.count
con = sqlite3.connect(":memory:")
cur = con.execute("create table test(x, y)")
values = [
("a", 4),
("b", 5),
("c", 3),
("d", 8),
("e", 1),
]
cur.executemany("insert into test values(?, ?)", values)
con.create_window_function("sumint", 1, WindowSumInt)
cur.execute("""
select x, sumint(y) over (
order by x rows between 1 preceding and 1 following
) as sum_y
from test order by x
""")
print(cur.fetchall())

View file

@ -1,29 +0,0 @@
import sqlite3
con = sqlite3.connect(":memory:")
cur = con.cursor()
AUSTRIA = "Österreich"
# by default, rows are returned as str
cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone()
assert row[0] == AUSTRIA
# but we can make sqlite3 always return bytestrings ...
con.text_factory = bytes
cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone()
assert type(row[0]) is bytes
# the bytestrings will be encoded in UTF-8, unless you stored garbage in the
# database ...
assert row[0] == AUSTRIA.encode("utf-8")
# we can also implement a custom text_factory ...
# here we implement one that appends "foo" to all strings
con.text_factory = lambda x: x.decode("utf-8") + "foo"
cur.execute("select ?", ("bar",))
row = cur.fetchone()
assert row[0] == "barfoo"
con.close()