Source code for chc.util.IndexedTable

# ------------------------------------------------------------------------------
# CodeHawk C Analyzer
# Author: Henny Sipma
# ------------------------------------------------------------------------------
# The MIT License (MIT)
#
# Copyright (c) 2017-2020 Kestrel Technology LLC
# Copyright (c) 2020-2022 Henny B. Sipma
# Copyright (c) 2023-2024 Aarno Labs LLC
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ------------------------------------------------------------------------------

import xml.etree.ElementTree as ET

import chc.util.fileutil as UF

from typing import Callable, Dict, List, Generic, Optional, Tuple, TypeVar


class IndexedTableError(UF.CHCError):
    """Error raised for invalid operations on an indexed table.

    Besides the message, the error can carry a list of (index, value) pairs.
    When that list holds between 1 and 19 entries it is rendered, framed by
    "-" lines, in front of the message by __str__.

    Args:
        msg: description of the problem.
        itemlist: optional (index, value) pairs to show with the message;
            defaults to an empty list.
    """

    def __init__(
            self,
            msg: str,
            itemlist: Optional[List[Tuple[int, "IndexedTableValue"]]] = None
    ) -> None:
        UF.CHCError.__init__(self, msg)
        # Create a fresh list per instance: a literal [] default would be a
        # single object shared by every error built without an itemlist.
        self._itemlist: List[Tuple[int, "IndexedTableValue"]] = (
            [] if itemlist is None else itemlist)

    @property
    def itemlist(self) -> List[Tuple[int, "IndexedTableValue"]]:
        """Return the (index, value) pairs attached to this error."""
        return self._itemlist

    def __str__(self) -> str:
        lines: List[str] = []
        # Only short, non-empty lists are printed to keep the report readable.
        if 0 < len(self.itemlist) < 20:
            lines.append("-")
            for (index, i) in self.itemlist:
                lines.append(str(index).rjust(3) + ": " + str(i))
            lines.append("-")
        # NOTE(review): self.msg is expected to be set by UF.CHCError, which
        # is not visible in this file -- confirm.
        lines.append(self.msg)
        return "\n".join(lines)
class IndexedTableValueMismatchError(UF.CHCError):
    """Error raised when a record has an unexpected number of tags or args.

    Args:
        tag: tag identifying the kind of record that was checked.
        reqtagcount: number of tags required.
        reqargcount: number of args required.
        acttagcount: number of tags actually present.
        actargcount: number of args actually present.
        name: name reported as the location of the record.
    """

    def __init__(
            self,
            tag: str,
            reqtagcount: int,
            reqargcount: int,
            acttagcount: int,
            actargcount: int,
            name: str) -> None:
        # The "found" half now ends in " args", mirroring the "Expected"
        # half; previously the message stopped right after the arg count.
        UF.CHCError.__init__(
            self,
            f"Dictionary record mismatch for {tag} in {name}: "
            f"Expected {reqtagcount} tags and {reqargcount} args, "
            f"but found {acttagcount} tags and {actargcount} args")
def get_attribute_int_list(node: ET.Element, attr: str) -> List[int]:
    """Return the comma-separated integers stored in attribute attr of node.

    An absent or empty attribute yields an empty list. A non-integer item
    makes int() raise ValueError.
    """
    raw = node.get(attr)
    if not raw:
        # Covers both a missing attribute (None) and an empty string.
        return []
    return [int(item) for item in raw.split(",")]
def get_rep(node: ET.Element) -> Tuple[int, List[str], List[int]]:
    """Decode an xml node into its (index, tags, args) representation.

    The tags are read from attribute "t" (comma-separated strings), the args
    from attribute "a" (comma-separated integers) and the index from
    attribute "ix". A missing "t" gives no tags; a missing or empty "a"
    gives no args.

    Any exception raised while decoding (including the one for a missing
    "ix" attribute) is re-raised after the raw tags, the raw args and the
    exception itself have been printed.
    """
    tags = node.get("t")
    args = node.get("a")
    try:
        taglist = tags.split(",") if tags is not None else []
        arglist = [int(a) for a in args.split(",")] if args else []
        ix_attr = node.get("ix")
        if ix_attr is None:
            raise Exception("node " + str(node) + " did not have an ix element")
        return (int(ix_attr), taglist, arglist)
    except Exception as e:
        print("tags: " + str(tags))
        print("args: " + str(args))
        print(e)
        raise
def get_key(tags: List[str], args: List[int]) -> Tuple[str, str]:
    """Return the table key for tags and args.

    The key is a pair of strings: the tags joined by commas and the decimal
    renderings of the args joined by commas.
    """
    tagstr = ",".join(tags)
    argstr = ",".join(map(str, args))
    return (tagstr, argstr)
class IndexedTableValue:
    """Record stored in an indexed table: an index, string tags, integer args.

    Args:
        index: index of the record.
        tags: list of string tags.
        args: list of integer arguments.
    """

    def __init__(
            self,
            index: int,
            tags: List[str],
            args: List[int]) -> None:
        self._index = index
        self._tags = tags
        self._args = args

    @property
    def index(self) -> int:
        """Return the index of this record."""
        return self._index

    @property
    def tags(self) -> List[str]:
        """Return the list of tags of this record."""
        return self._tags

    @property
    def args(self) -> List[int]:
        """Return the list of integer arguments of this record."""
        return self._args

    @property
    def key(self) -> Tuple[str, str]:
        """Return (comma-joined tags, comma-joined args) for this record."""
        return (",".join(self.tags), ",".join(str(x) for x in self.args))

    def check_key(self, reqtagcount: int, reqargcount: int, name: str) -> None:
        """Check if the constructed value has the expected tags and args.

        Args:
            reqtagcount: required number of tags.
            reqargcount: required number of args.
            name: name included in the error message.

        Raises:
            IndexedTableValueMismatchError: if either count differs.
        """
        acttagcount = len(self.tags)
        actargcount = len(self.args)
        if acttagcount == reqtagcount and actargcount == reqargcount:
            return
        # A record without tags is itself a possible mismatch; indexing
        # tags[0] there would raise IndexError and hide the real problem.
        tag = self.tags[0] if self.tags else "<no tag>"
        raise IndexedTableValueMismatchError(
            tag, reqtagcount, reqargcount, acttagcount, actargcount, name)

    def write_xml(self, node: ET.Element) -> None:
        """Store this record in the attributes of node.

        Attribute "t" (tags) and "a" (args) are only set when their string
        form is non-empty; attribute "ix" (index) is always set.
        """
        (tagstr, argstr) = self.key
        if len(tagstr) > 0:
            node.set("t", tagstr)
        if len(argstr) > 0:
            node.set("a", argstr)
        node.set("ix", str(self.index))

    def __str__(self) -> str:
        lines: List[str] = []
        lines.append("\nIndex table value\n--------------------------")
        lines.append("index: " + str(self.index))
        lines.append("tags : [" + ", ".join(self.tags) + "]")
        lines.append("args : [" + ", ".join(str(x) for x in self.args) + "]")
        lines.append("")
        return "\n".join(lines)
def get_value(node: ET.Element) -> IndexedTableValue:
    """Build an IndexedTableValue from the representation stored in node.

    The (index, tags, args) triple is obtained from get_rep, so any
    exception raised there propagates to the caller.
    """
    index, tags, args = get_rep(node)
    return IndexedTableValue(index, tags, args)
class IndexedTableSuperclass:
    """Base class for named tables.

    It only records the table name; size and reset raise
    NotImplementedError until a subclass overrides them.
    """

    def __init__(self, name: str) -> None:
        self.name = name

    def size(self) -> int:
        """Return the size of the table (to be overridden)."""
        raise NotImplementedError(
            "size not overridden in IndexedTableSuperclass")

    def reset(self) -> None:
        """Reset the table (to be overridden)."""
        raise NotImplementedError(
            "reset not overridden in IndexedTableSuperclass")
# Type variable ranging over IndexedTableValue and its subclasses. It is not
# referenced by any definition visible in this part of the file.
V = TypeVar("V", bound=IndexedTableValue)
class IndexedTable(IndexedTableSuperclass):
    """Table to provide unique indices to objects represented by a key string.

    The table can be checkpointed and reset to that checkpoint with

    - set_checkpoint
    - reset_to_checkpoint

    Note: the string encodings use the comma as a concatenation character,
    hence the comma character cannot be used in any string representation.

    Attributes are plain instance fields:
    keytable maps a key to its index, indextable maps an index to its
    object, next is the next index to hand out (indices start at 1),
    reserved lists indices handed out by reserve() but not yet committed,
    and checkpoint is the value of next when the checkpoint was set (or
    None when no checkpoint is active).
    """

    def __init__(self, name: str) -> None:
        IndexedTableSuperclass.__init__(self, name)
        self.keytable: Dict[Tuple[str, str], int] = {}  # key -> index
        self.indextable: Dict[int, IndexedTableValue] = {}  # index -> object
        self.next = 1
        self.reserved: List[int] = []
        self.checkpoint: Optional[int] = None

    def reset(self) -> None:
        """Remove all entries, reservations and any checkpoint."""
        self.keytable = {}
        self.indextable = {}
        self.next = 1
        self.reserved = []
        self.checkpoint = None

    def set_checkpoint(self) -> int:
        """Set a checkpoint at the next index to be issued and return it.

        Raises:
            IndexedTableError: if a checkpoint is already active.
        """
        if self.checkpoint is None:
            self.checkpoint = self.next
            return self.next
        raise IndexedTableError(
            "Checkpoint has already been set at " + str(self.checkpoint)
        )

    def iter(self, f: Callable[[int, IndexedTableValue], None]) -> None:
        """Apply f to every (index, object) pair in increasing index order."""
        for (i, v) in self.items():
            f(i, v)

    def reset_to_checkpoint(self) -> int:
        """Remove all entries added since the checkpoint was set.

        The checkpoint and all reservations are cleared and the next index
        to be issued becomes the checkpoint value, which is returned.

        Raises:
            ValueError: if no checkpoint is active.
        """
        cp = self.checkpoint
        if cp is None:
            raise ValueError("Cannot reset non-existent checkpoint")
        for i in range(cp, self.next):
            if i in self.reserved:
                continue
            # Tolerate gaps: read_xml may have loaded non-contiguous indices.
            self.indextable.pop(i, None)
        # Collect the stale keys first; deleting from a dict while iterating
        # over it raises RuntimeError.
        stale = [k for (k, ix) in self.keytable.items() if ix >= cp]
        for k in stale:
            del self.keytable[k]
        self.checkpoint = None
        self.reserved = []
        self.next = cp
        return cp

    def remove_checkpoint(self) -> None:
        """Discard the checkpoint; all current entries are kept."""
        self.checkpoint = None

    def add(
            self,
            key: Tuple[str, str],
            f: Callable[[int, Tuple[str, str]], IndexedTableValue]) -> int:
        """Return the index for key, creating the entry with f if needed.

        For a new key, f is called with the new index and the key and its
        result is stored as the object for that index.
        """
        if key in self.keytable:
            return self.keytable[key]
        index = self.next
        obj = f(index, key)
        self.keytable[key] = index
        self.indextable[index] = obj
        self.next += 1
        return index

    def add_tags_args(
            self,
            tags: List[str],
            args: List[int],
            f: Callable[[int, List[str], List[int]], IndexedTableValue]
    ) -> int:
        """Return the index for (tags, args), creating the entry if needed.

        The key is computed with get_key. For a new key, f is called with
        the new index, the tags and the args, and its result is stored.
        """
        key = get_key(tags, args)
        if key in self.keytable:
            return self.keytable[key]
        index = self.next
        obj = f(index, tags, args)
        self.keytable[key] = index
        self.indextable[index] = obj
        self.next += 1
        return index

    def reserve(self) -> int:
        """Hand out the next index without storing an object for it yet."""
        index = self.next
        self.reserved.append(index)
        self.next += 1
        return index

    def values(self) -> List[IndexedTableValue]:
        """Return all stored objects in increasing index order."""
        return [self.indextable[i] for i in sorted(self.indextable)]

    def items(self) -> List[Tuple[int, IndexedTableValue]]:
        """Return all (index, object) pairs in increasing index order."""
        return [(i, self.indextable[i]) for i in sorted(self.indextable)]

    def commit_reserved(
            self,
            index: int,
            key: Tuple[str, str],
            obj: IndexedTableValue) -> None:
        """Store obj under a previously reserved index and release it.

        Raises:
            IndexedTableError: if index is not currently reserved.
        """
        if index in self.reserved:
            self.keytable[key] = index
            self.indextable[index] = obj
            self.reserved.remove(index)
        else:
            raise IndexedTableError(
                "Trying to commit nonexisting index: " + str(index))

    def size(self) -> int:
        """Return the number of indices issued so far (next - 1).

        Reserved but uncommitted indices are included in this count.
        """
        return self.next - 1

    def retrieve(self, index: int) -> IndexedTableValue:
        """Return the object stored at index.

        Raises:
            IndexedTableError: if no object is stored at index.
        """
        if index in self.indextable:
            return self.indextable[index]
        msg = (
            "Unable to retrieve item "
            + str(index)
            + " from table "
            + self.name
            + " (size: "
            + str(self.size())
            + ")"
        )
        raise IndexedTableError(
            msg + "\n" + self.name + ", size: " + str(self.size())
        )

    def retrieve_by_key(
            self, f: Callable[[Tuple[str, str]], bool]
    ) -> List[Tuple[Tuple[str, str], IndexedTableValue]]:
        """Return the (key, object) pairs whose key satisfies predicate f."""
        return [
            (key, self.indextable[index])
            for (key, index) in self.keytable.items()
            if f(key)
        ]

    def write_xml(
            self,
            node: ET.Element,
            f: Callable[[ET.Element, IndexedTableValue], None],
            tag: str = "n") -> None:
        """Append one child element per object to node, in index order.

        Each child is created with the given tag and filled in by calling
        f(child, object) before it is appended.
        """
        for key in sorted(self.indextable):
            snode = ET.Element(tag)
            f(snode, self.indextable[key])
            node.append(snode)

    def read_xml(
            self,
            node: Optional[ET.Element],
            tag: str,
            # The name get_value inside this lambda is looked up when the
            # lambda runs, in module scope, so it refers to the module-level
            # get_value function and not to this parameter.
            get_value: Callable[
                [ET.Element], IndexedTableValue] = lambda x: get_value(x),
            get_key: Callable[
                [IndexedTableValue], Tuple[str, str]] = lambda x: x.key,
            get_index: Callable[
                [IndexedTableValue], int] = lambda x: x.index,
    ) -> None:
        """Load entries from the children of node that have the given tag.

        Each child is turned into an object with get_value; its key and
        index are obtained with get_key and get_index. Existing entries with
        the same key or index are overwritten, and next is advanced past the
        largest index seen.

        Raises:
            IndexedTableError: if node is None.
        """
        if node is None:
            print("Xml node not present in " + self.name)
            raise IndexedTableError(self.name)
        for snode in node.findall(tag):
            obj = get_value(snode)
            key = get_key(obj)
            index = get_index(obj)
            self.keytable[key] = index
            self.indextable[index] = obj
            if index >= self.next:
                self.next = index + 1

    def objectmap(
            self,
            p: Callable[[int], IndexedTableValue]
    ) -> Dict[int, IndexedTableValue]:
        """Return a dict mapping every stored index ix to p(ix)."""
        return {ix: p(ix) for (ix, _) in self.items()}

    def __str__(self) -> str:
        lines: List[str] = []
        lines.append("\n" + self.name)
        for ix in sorted(self.indextable):
            lines.append(str(ix).rjust(4) + "  " + str(self.indextable[ix]))
        if len(self.reserved) > 0:
            lines.append("Reserved: " + str(self.reserved))
        if self.checkpoint is not None:
            lines.append("Checkpoint: " + str(self.checkpoint))
        return "\n".join(lines)