mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 16:27:06 +00:00 
			
		
		
		
	 9573d14215
			
		
	
	
		9573d14215
		
			
		
	
	
	
	
		
			
			Co-authored-by: Łukasz Langa <lukasz@langa.pl> Co-authored-by: Pieter Eendebak <pieter.eendebak@gmail.com> Co-authored-by: Dennis Sweeney <36520290+sweeneyde@users.noreply.github.com>
		
			
				
	
	
		
			533 lines
		
	
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			533 lines
		
	
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Original Algorithm:
 | |
| # By Steve Hanov, 2011. Released to the public domain.
 | |
| # Please see http://stevehanov.ca/blog/index.php?id=115 for the accompanying article.
 | |
| #
 | |
| # Adapted for PyPy/CPython by Carl Friedrich Bolz-Tereick
 | |
| #
 | |
| # Based on Daciuk, Jan, et al. "Incremental construction of minimal acyclic finite-state automata."
 | |
| # Computational linguistics 26.1 (2000): 3-16.
 | |
| #
 | |
| # Updated 2014 to use DAWG as a mapping; see
 | |
| # Kowaltowski, T.; CL. Lucchesi (1993), "Applications of finite automata representing large vocabularies",
 | |
| # Software-Practice and Experience 1993
 | |
| 
 | |
| from collections import defaultdict
 | |
| from functools import cached_property
 | |
| 
 | |
| 
 | |
| # This class represents a node in the directed acyclic word graph (DAWG). It
 | |
| # has a list of edges to other nodes. It has functions for testing whether it
 | |
| # is equivalent to another node. Nodes are equivalent if they have identical
 | |
| # edges, and each identical edge leads to identical states. The __hash__ and
 | |
| # __eq__ functions allow it to be used as a key in a python dictionary.
 | |
| 
 | |
| 
 | |
| class DawgNode:
 | |
| 
 | |
|     def __init__(self, dawg):
 | |
|         self.id = dawg.next_id
 | |
|         dawg.next_id += 1
 | |
|         self.final = False
 | |
|         self.edges = {}
 | |
| 
 | |
|         self.linear_edges = None # later: list of (string, next_state)
 | |
| 
 | |
|     def __str__(self):
 | |
|         if self.final:
 | |
|             arr = ["1"]
 | |
|         else:
 | |
|             arr = ["0"]
 | |
| 
 | |
|         for (label, node) in sorted(self.edges.items()):
 | |
|             arr.append(label)
 | |
|             arr.append(str(node.id))
 | |
| 
 | |
|         return "_".join(arr)
 | |
|     __repr__ = __str__
 | |
| 
 | |
|     def _as_tuple(self):
 | |
|         edges = sorted(self.edges.items())
 | |
|         edge_tuple = tuple((label, node.id) for label, node in edges)
 | |
|         return (self.final, edge_tuple)
 | |
| 
 | |
|     def __hash__(self):
 | |
|         return hash(self._as_tuple())
 | |
| 
 | |
|     def __eq__(self, other):
 | |
|         return self._as_tuple() == other._as_tuple()
 | |
| 
 | |
|     @cached_property
 | |
|     def num_reachable_linear(self):
 | |
|         # returns the number of different paths to final nodes reachable from
 | |
|         # this one
 | |
| 
 | |
|         count = 0
 | |
|         # staying at self counts as a path if self is final
 | |
|         if self.final:
 | |
|             count += 1
 | |
|         for label, node in self.linear_edges:
 | |
|             count += node.num_reachable_linear
 | |
| 
 | |
|         return count
 | |
| 
 | |
| 
 | |
| class Dawg:
 | |
|     def __init__(self):
 | |
|         self.previous_word = ""
 | |
|         self.next_id = 0
 | |
|         self.root = DawgNode(self)
 | |
| 
 | |
|         # Here is a list of nodes that have not been checked for duplication.
 | |
|         self.unchecked_nodes = []
 | |
| 
 | |
|         # To deduplicate, maintain a dictionary with
 | |
|         # minimized_nodes[canonical_node] is canonical_node.
 | |
|         # Based on __hash__ and __eq__, minimized_nodes[n] is the
 | |
|         # canonical node equal to n.
 | |
|         # In other words, self.minimized_nodes[x] == x for all nodes found in
 | |
|         # the dict.
 | |
|         self.minimized_nodes = {}
 | |
| 
 | |
|         # word: value mapping
 | |
|         self.data = {}
 | |
|         # value: word mapping
 | |
|         self.inverse = {}
 | |
| 
 | |
|     def insert(self, word, value):
 | |
|         if not all(0 <= ord(c) < 128 for c in word):
 | |
|             raise ValueError("Use 7-bit ASCII characters only")
 | |
|         if word <= self.previous_word:
 | |
|             raise ValueError("Error: Words must be inserted in alphabetical order.")
 | |
|         if value in self.inverse:
 | |
|             raise ValueError(f"value {value} is duplicate, got it for word {self.inverse[value]} and now {word}")
 | |
| 
 | |
|         # find common prefix between word and previous word
 | |
|         common_prefix = 0
 | |
|         for i in range(min(len(word), len(self.previous_word))):
 | |
|             if word[i] != self.previous_word[i]:
 | |
|                 break
 | |
|             common_prefix += 1
 | |
| 
 | |
|         # Check the unchecked_nodes for redundant nodes, proceeding from last
 | |
|         # one down to the common prefix size. Then truncate the list at that
 | |
|         # point.
 | |
|         self._minimize(common_prefix)
 | |
| 
 | |
|         self.data[word] = value
 | |
|         self.inverse[value] = word
 | |
| 
 | |
|         # add the suffix, starting from the correct node mid-way through the
 | |
|         # graph
 | |
|         if len(self.unchecked_nodes) == 0:
 | |
|             node = self.root
 | |
|         else:
 | |
|             node = self.unchecked_nodes[-1][2]
 | |
| 
 | |
|         for letter in word[common_prefix:]:
 | |
|             next_node = DawgNode(self)
 | |
|             node.edges[letter] = next_node
 | |
|             self.unchecked_nodes.append((node, letter, next_node))
 | |
|             node = next_node
 | |
| 
 | |
|         node.final = True
 | |
|         self.previous_word = word
 | |
| 
 | |
|     def finish(self):
 | |
|         if not self.data:
 | |
|             raise ValueError("need at least one word in the dawg")
 | |
|         # minimize all unchecked_nodes
 | |
|         self._minimize(0)
 | |
| 
 | |
|         self._linearize_edges()
 | |
| 
 | |
|         topoorder, linear_data, inverse = self._topological_order()
 | |
|         return self.compute_packed(topoorder), linear_data, inverse
 | |
| 
 | |
|     def _minimize(self, down_to):
 | |
|         # proceed from the leaf up to a certain point
 | |
|         for i in range(len(self.unchecked_nodes) - 1, down_to - 1, -1):
 | |
|             (parent, letter, child) = self.unchecked_nodes[i]
 | |
|             if child in self.minimized_nodes:
 | |
|                 # replace the child with the previously encountered one
 | |
|                 parent.edges[letter] = self.minimized_nodes[child]
 | |
|             else:
 | |
|                 # add the state to the minimized nodes.
 | |
|                 self.minimized_nodes[child] = child
 | |
|             self.unchecked_nodes.pop()
 | |
| 
 | |
|     def _lookup(self, word):
 | |
|         """ Return an integer 0 <= k < number of strings in dawg
 | |
|         where word is the kth successful traversal of the dawg. """
 | |
|         node = self.root
 | |
|         skipped = 0  # keep track of number of final nodes that we skipped
 | |
|         index = 0
 | |
|         while index < len(word):
 | |
|             for label, child in node.linear_edges:
 | |
|                 if word[index] == label[0]:
 | |
|                     if word[index:index + len(label)] == label:
 | |
|                         if node.final:
 | |
|                             skipped += 1
 | |
|                         index += len(label)
 | |
|                         node = child
 | |
|                         break
 | |
|                     else:
 | |
|                         return None
 | |
|                 skipped += child.num_reachable_linear
 | |
|             else:
 | |
|                 return None
 | |
|         return skipped
 | |
| 
 | |
|     def enum_all_nodes(self):
 | |
|         stack = [self.root]
 | |
|         done = set()
 | |
|         while stack:
 | |
|             node = stack.pop()
 | |
|             if node.id in done:
 | |
|                 continue
 | |
|             yield node
 | |
|             done.add(node.id)
 | |
|             for label, child in sorted(node.edges.items()):
 | |
|                 stack.append(child)
 | |
| 
 | |
|     def prettyprint(self):
 | |
|         for node in sorted(self.enum_all_nodes(), key=lambda e: e.id):
 | |
|             s_final = " final" if node.final else ""
 | |
|             print(f"{node.id}: ({node}) {s_final}")
 | |
|             for label, child in sorted(node.edges.items()):
 | |
|                 print(f"    {label} goto {child.id}")
 | |
| 
 | |
|     def _inverse_lookup(self, number):
 | |
|         assert 0, "not working in the current form, but keep it as the pure python version of compact lookup"
 | |
|         result = []
 | |
|         node = self.root
 | |
|         while 1:
 | |
|             if node.final:
 | |
|                 if pos == 0:
 | |
|                     return "".join(result)
 | |
|                 pos -= 1
 | |
|             for label, child in sorted(node.edges.items()):
 | |
|                 nextpos = pos - child.num_reachable_linear
 | |
|                 if nextpos < 0:
 | |
|                     result.append(label)
 | |
|                     node = child
 | |
|                     break
 | |
|                 else:
 | |
|                     pos = nextpos
 | |
|             else:
 | |
|                 assert 0
 | |
| 
 | |
|     def _linearize_edges(self):
 | |
|         # compute "linear" edges. the idea is that long chains of edges without
 | |
|         # any of the intermediate states being final or any extra incoming or
 | |
|         # outgoing edges can be represented by having removing them, and
 | |
|         # instead using longer strings as edge labels (instead of single
 | |
|         # characters)
 | |
|         incoming = defaultdict(list)
 | |
|         nodes = sorted(self.enum_all_nodes(), key=lambda e: e.id)
 | |
|         for node in nodes:
 | |
|             for label, child in sorted(node.edges.items()):
 | |
|                 incoming[child].append(node)
 | |
|         for node in nodes:
 | |
|             node.linear_edges = []
 | |
|             for label, child in sorted(node.edges.items()):
 | |
|                 s = [label]
 | |
|                 while len(child.edges) == 1 and len(incoming[child]) == 1 and not child.final:
 | |
|                     (c, child), = child.edges.items()
 | |
|                     s.append(c)
 | |
|                 node.linear_edges.append((''.join(s), child))
 | |
| 
 | |
|     def _topological_order(self):
 | |
|         # compute reachable linear nodes, and the set of incoming edges for each node
 | |
|         order = []
 | |
|         stack = [self.root]
 | |
|         seen = set()
 | |
|         while stack:
 | |
|             # depth first traversal
 | |
|             node = stack.pop()
 | |
|             if node.id in seen:
 | |
|                 continue
 | |
|             seen.add(node.id)
 | |
|             order.append(node)
 | |
|             for label, child in node.linear_edges:
 | |
|                 stack.append(child)
 | |
| 
 | |
|         # do a (slightly bad) topological sort
 | |
|         incoming = defaultdict(set)
 | |
|         for node in order:
 | |
|             for label, child in node.linear_edges:
 | |
|                 incoming[child].add((label, node))
 | |
|         no_incoming = [order[0]]
 | |
|         topoorder = []
 | |
|         positions = {}
 | |
|         while no_incoming:
 | |
|             node = no_incoming.pop()
 | |
|             topoorder.append(node)
 | |
|             positions[node] = len(topoorder)
 | |
|             # use "reversed" to make sure that the linear_edges get reorderd
 | |
|             # from their alphabetical order as little as necessary (no_incoming
 | |
|             # is LIFO)
 | |
|             for label, child in reversed(node.linear_edges):
 | |
|                 incoming[child].discard((label, node))
 | |
|                 if not incoming[child]:
 | |
|                     no_incoming.append(child)
 | |
|                     del incoming[child]
 | |
|         # check result
 | |
|         assert set(topoorder) == set(order)
 | |
|         assert len(set(topoorder)) == len(topoorder)
 | |
| 
 | |
|         for node in order:
 | |
|             node.linear_edges.sort(key=lambda element: positions[element[1]])
 | |
| 
 | |
|         for node in order:
 | |
|             for label, child in node.linear_edges:
 | |
|                 assert positions[child] > positions[node]
 | |
|         # number the nodes. afterwards every input string in the set has a
 | |
|         # unique number in the 0 <= number < len(data). We then put the data in
 | |
|         # self.data into a linear list using these numbers as indexes.
 | |
|         topoorder[0].num_reachable_linear
 | |
|         linear_data = [None] * len(self.data)
 | |
|         inverse = {} # maps value back to index
 | |
|         for word, value in self.data.items():
 | |
|             index = self._lookup(word)
 | |
|             linear_data[index] = value
 | |
|             inverse[value] = index
 | |
| 
 | |
|         return topoorder, linear_data, inverse
 | |
| 
 | |
|     def compute_packed(self, order):
 | |
|         def compute_chunk(node, offsets):
 | |
|             """ compute the packed node/edge data for a node. result is a
 | |
|             list of bytes as long as order. the jump distance calculations use
 | |
|             the offsets dictionary to know where in the final big output
 | |
|             bytestring the individual nodes will end up. """
 | |
|             result = bytearray()
 | |
|             offset = offsets[node]
 | |
|             encode_varint_unsigned(number_add_bits(node.num_reachable_linear, node.final), result)
 | |
|             if len(node.linear_edges) == 0:
 | |
|                 assert node.final
 | |
|                 encode_varint_unsigned(0, result) # add a 0 saying "done"
 | |
|             prev_child_offset = offset + len(result)
 | |
|             for edgeindex, (label, targetnode) in enumerate(node.linear_edges):
 | |
|                 label = label.encode('ascii')
 | |
|                 child_offset = offsets[targetnode]
 | |
|                 child_offset_difference = child_offset - prev_child_offset
 | |
| 
 | |
|                 info = number_add_bits(child_offset_difference, len(label) == 1, edgeindex == len(node.linear_edges) - 1)
 | |
|                 if edgeindex == 0:
 | |
|                     assert info != 0
 | |
|                 encode_varint_unsigned(info, result)
 | |
|                 prev_child_offset = child_offset
 | |
|                 if len(label) > 1:
 | |
|                     encode_varint_unsigned(len(label), result)
 | |
|                 result.extend(label)
 | |
|             return result
 | |
| 
 | |
|         def compute_new_offsets(chunks, offsets):
 | |
|             """ Given a list of chunks, compute the new offsets (by adding the
 | |
|             chunk lengths together). Also check if we cannot shrink the output
 | |
|             further because none of the node offsets are smaller now. if that's
 | |
|             the case return None. """
 | |
|             new_offsets = {}
 | |
|             curr_offset = 0
 | |
|             should_continue = False
 | |
|             for node, result in zip(order, chunks):
 | |
|                 if curr_offset < offsets[node]:
 | |
|                     # the new offset is below the current assumption, this
 | |
|                     # means we can shrink the output more
 | |
|                     should_continue = True
 | |
|                 new_offsets[node] = curr_offset
 | |
|                 curr_offset += len(result)
 | |
|             if not should_continue:
 | |
|                 return None
 | |
|             return new_offsets
 | |
| 
 | |
|         # assign initial offsets to every node
 | |
|         offsets = {}
 | |
|         for i, node in enumerate(order):
 | |
|             # we don't know position of the edge yet, just use something big as
 | |
|             # the starting position. we'll have to do further iterations anyway,
 | |
|             # but the size is at least a lower limit then
 | |
|             offsets[node] = i * 2 ** 30
 | |
| 
 | |
| 
 | |
|         # due to the variable integer width encoding of edge targets we need to
 | |
|         # run this to fixpoint. in the process we shrink the output more and
 | |
|         # more until we can't any more. at any point we can stop and use the
 | |
|         # output, but we might need padding zero bytes when joining the chunks
 | |
|         # to have the correct jump distances
 | |
|         last_offsets = None
 | |
|         while 1:
 | |
|             chunks = [compute_chunk(node, offsets) for node in order]
 | |
|             last_offsets = offsets
 | |
|             offsets = compute_new_offsets(chunks, offsets)
 | |
|             if offsets is None: # couldn't shrink
 | |
|                 break
 | |
| 
 | |
|         # build the final packed string
 | |
|         total_result = bytearray()
 | |
|         for node, result in zip(order, chunks):
 | |
|             node_offset = last_offsets[node]
 | |
|             if node_offset > len(total_result):
 | |
|                 # need to pad to get the offsets correct
 | |
|                 padding = b"\x00" * (node_offset - len(total_result))
 | |
|                 total_result.extend(padding)
 | |
|             assert node_offset == len(total_result)
 | |
|             total_result.extend(result)
 | |
|         return bytes(total_result)
 | |
| 
 | |
| 
 | |
| # ______________________________________________________________________
 | |
| # the following functions operate on the packed representation
 | |
| 
 | |
| def number_add_bits(x, *bits):
 | |
|     for bit in bits:
 | |
|         assert bit == 0 or bit == 1
 | |
|         x = (x << 1) | bit
 | |
|     return x
 | |
| 
 | |
| def encode_varint_unsigned(i, res):
 | |
|     # https://en.wikipedia.org/wiki/LEB128 unsigned variant
 | |
|     more = True
 | |
|     startlen = len(res)
 | |
|     if i < 0:
 | |
|         raise ValueError("only positive numbers supported", i)
 | |
|     while more:
 | |
|         lowest7bits = i & 0b1111111
 | |
|         i >>= 7
 | |
|         if i == 0:
 | |
|             more = False
 | |
|         else:
 | |
|             lowest7bits |= 0b10000000
 | |
|         res.append(lowest7bits)
 | |
|     return len(res) - startlen
 | |
| 
 | |
| def number_split_bits(x, n, acc=()):
 | |
|     if n == 1:
 | |
|         return x >> 1, x & 1
 | |
|     if n == 2:
 | |
|         return x >> 2, (x >> 1) & 1, x & 1
 | |
|     assert 0, "implement me!"
 | |
| 
 | |
| def decode_varint_unsigned(b, index=0):
 | |
|     res = 0
 | |
|     shift = 0
 | |
|     while True:
 | |
|         byte = b[index]
 | |
|         res = res | ((byte & 0b1111111) << shift)
 | |
|         index += 1
 | |
|         shift += 7
 | |
|         if not (byte & 0b10000000):
 | |
|             return res, index
 | |
| 
 | |
| def decode_node(packed, node):
 | |
|     x, node = decode_varint_unsigned(packed, node)
 | |
|     node_count, final = number_split_bits(x, 1)
 | |
|     return node_count, final, node
 | |
| 
 | |
| def decode_edge(packed, edgeindex, prev_child_offset, offset):
 | |
|     x, offset = decode_varint_unsigned(packed, offset)
 | |
|     if x == 0 and edgeindex == 0:
 | |
|         raise KeyError # trying to decode past a final node
 | |
|     child_offset_difference, len1, last_edge = number_split_bits(x, 2)
 | |
|     child_offset = prev_child_offset + child_offset_difference
 | |
|     if len1:
 | |
|         size = 1
 | |
|     else:
 | |
|         size, offset = decode_varint_unsigned(packed, offset)
 | |
|     return child_offset, last_edge, size, offset
 | |
| 
 | |
| def _match_edge(packed, s, size, node_offset, stringpos):
 | |
|     if size > 1 and stringpos + size > len(s):
 | |
|         # past the end of the string, can't match
 | |
|         return False
 | |
|     for i in range(size):
 | |
|         if packed[node_offset + i] != s[stringpos + i]:
 | |
|             # if a subsequent char of an edge doesn't match, the word isn't in
 | |
|             # the dawg
 | |
|             if i > 0:
 | |
|                 raise KeyError
 | |
|             return False
 | |
|     return True
 | |
| 
 | |
| def lookup(packed, data, s):
 | |
|     return data[_lookup(packed, s)]
 | |
| 
 | |
| def _lookup(packed, s):
 | |
|     stringpos = 0
 | |
|     node_offset = 0
 | |
|     skipped = 0  # keep track of number of final nodes that we skipped
 | |
|     false = False
 | |
|     while stringpos < len(s):
 | |
|         #print(f"{node_offset=} {stringpos=}")
 | |
|         _, final, edge_offset = decode_node(packed, node_offset)
 | |
|         prev_child_offset = edge_offset
 | |
|         edgeindex = 0
 | |
|         while 1:
 | |
|             child_offset, last_edge, size, edgelabel_chars_offset = decode_edge(packed, edgeindex, prev_child_offset, edge_offset)
 | |
|             #print(f"    {edge_offset=} {child_offset=} {last_edge=} {size=} {edgelabel_chars_offset=}")
 | |
|             edgeindex += 1
 | |
|             prev_child_offset = child_offset
 | |
|             if _match_edge(packed, s, size, edgelabel_chars_offset, stringpos):
 | |
|                 # match
 | |
|                 if final:
 | |
|                     skipped += 1
 | |
|                 stringpos += size
 | |
|                 node_offset = child_offset
 | |
|                 break
 | |
|             if last_edge:
 | |
|                 raise KeyError
 | |
|             descendant_count, _, _ = decode_node(packed, child_offset)
 | |
|             skipped += descendant_count
 | |
|             edge_offset = edgelabel_chars_offset + size
 | |
|     _, final, _ = decode_node(packed, node_offset)
 | |
|     if final:
 | |
|         return skipped
 | |
|     raise KeyError
 | |
| 
 | |
| def inverse_lookup(packed, inverse, x):
 | |
|     pos = inverse[x]
 | |
|     return _inverse_lookup(packed, pos)
 | |
| 
 | |
| def _inverse_lookup(packed, pos):
 | |
|     result = bytearray()
 | |
|     node_offset = 0
 | |
|     while 1:
 | |
|         node_count, final, edge_offset = decode_node(packed, node_offset)
 | |
|         if final:
 | |
|             if pos == 0:
 | |
|                 return bytes(result)
 | |
|             pos -= 1
 | |
|         prev_child_offset = edge_offset
 | |
|         edgeindex = 0
 | |
|         while 1:
 | |
|             child_offset, last_edge, size, edgelabel_chars_offset = decode_edge(packed, edgeindex, prev_child_offset, edge_offset)
 | |
|             edgeindex += 1
 | |
|             prev_child_offset = child_offset
 | |
|             descendant_count, _, _ = decode_node(packed, child_offset)
 | |
|             nextpos = pos - descendant_count
 | |
|             if nextpos < 0:
 | |
|                 assert edgelabel_chars_offset >= 0
 | |
|                 result.extend(packed[edgelabel_chars_offset: edgelabel_chars_offset + size])
 | |
|                 node_offset = child_offset
 | |
|                 break
 | |
|             elif not last_edge:
 | |
|                 pos = nextpos
 | |
|                 edge_offset = edgelabel_chars_offset + size
 | |
|             else:
 | |
|                 raise KeyError
 | |
|         else:
 | |
|             raise KeyError
 | |
| 
 | |
| 
 | |
| def build_compression_dawg(ucdata):
 | |
|     d = Dawg()
 | |
|     ucdata.sort()
 | |
|     for name, value in ucdata:
 | |
|         d.insert(name, value)
 | |
|     packed, pos_to_code, reversedict = d.finish()
 | |
|     print("size of dawg [KiB]", round(len(packed) / 1024, 2))
 | |
|     # check that lookup and inverse_lookup work correctly on the input data
 | |
|     for name, value in ucdata:
 | |
|         assert lookup(packed, pos_to_code, name.encode('ascii')) == value
 | |
|         assert inverse_lookup(packed, reversedict, value) == name.encode('ascii')
 | |
|     return packed, pos_to_code
 |