Module ft_2_quantum_sat.fault_tree
Definition of FaultTree class to hold all fault tree functionality.
Expand source code
"""
Definition of FaultTree class to hold all fault tree functionality.
"""
import xml.etree.ElementTree as ElementTree
import matplotlib.pyplot as plt
import networkx as nx
from networkx.drawing.nx_pydot import graphviz_layout
from ft_2_quantum_sat.cnf import CNF
class FaultTree:
"""
Note that a Fault Tree is not really a tree, it is a DAG. This class is
mostly just a wrapper around NetworkX's DiGraph keeping track of the
probabilities of basic events and the types of the gate nodes.
"""
def __init__(self):
self.graph = nx.DiGraph()
self.top_event = None # should be one of the gate names
self.basic_events = set()
self.probs = {} # event name -> prob
self.node_types = {} # gate name -> {input, and, or, ...}
self._suported_gates = {'and', 'or'}
def set_top_event(self, name):
"""
Sets the top event to the node with the given name.
"""
self.top_event = name
def add_basic_event(self, name, prob):
"""
Adds a basic event with the given name and probability.
"""
self.graph.add_node(name)
self.basic_events.add(name)
self.node_types[name] = 'input'
self.probs[name] = prob
def add_gate(self, name, gate_type, inputs):
"""
Adds a gate node to the fault tree, with type in {'and', 'or', ...},
and given inputs
"""
self.graph.add_node(name)
self.node_types[name] = gate_type
for _input in inputs:
self.graph.add_edge(name, _input)
def get_gate_inputs(self, gate_name):
"""
Get the inputs of the given gate.
"""
return self.graph.successors(gate_name)
def number_of_nodes(self):
"""
Returns the number of nodes in the fault tree.
"""
return self.graph.number_of_nodes()
def to_cnf(self):
"""
Converts the FT to a CNF expression.
"""
f = CNF()
# 1. assign var numbers to all the events (and gates)
input_vars = {} # map : var_name -> var_number
internal_vars = {} # map : var_name -> var_number
for node in self.graph.nodes:
v = f.get_new_var(name=node)
if self.node_types[node] == 'input':
input_vars[node] = v
else:
internal_vars[node] = v
all_vars = input_vars.copy()
all_vars.update(internal_vars)
# 2. for every non-input node, add tseitin constraints to f
for gate_name, output_var in internal_vars.items():
input_names = self.get_gate_inputs(gate_name)
gate_input_vars = []
for input_name in input_names:
gate_input_vars.append(all_vars[input_name])
gate_type = self.node_types[gate_name]
f.add_tseitin_multi(gate_type, gate_input_vars, output_var)
# 3. add clause containing only the top event as (positive) literal
f.add_clause([all_vars[self.top_event]])
return f, all_vars, input_vars.values()
def compute_min_cutsets(self, m, method, formula=None):
"""
Computes the `m` smallest cut sets of this fault tree.
Args:
m: The number of cutsets to compute.
method: String in ['grover', 'classical', 'min-sat']
formula: (Optional) If set, computes minimal cutsets for the given
CNF formula, instead of for self (mostly for debugging purposes).
Returns:
The cut set as a list of sets of basic event names.
"""
if formula is None:
f, _, input_vars = self.to_cnf()
else:
f = formula.copy()
input_vars = formula.get_vars()
cutsets = []
for k in range(1, len(input_vars) + 1):
f_k = f.copy()
# we don't need a cardinality constraint if we solve with min-sat
if method != 'min-sat':
f_k.add_cardinality_constraint(at_most=k, variables=input_vars)
sat = True
while len(cutsets) < m:
sat, model = f_k.solve(method=method, minimize_vars=input_vars)
if not sat:
break
cutset = model[:len(input_vars)]
# Block this cutset from current f_k and future f_k.
# Only block the positive literals (i.e. the actual cutset),
# this makes sure that only *minimal* cut sets are computed.
f_k.block_positive_only(cutset)
f.block_positive_only(cutset)
# add cutset and return if enough
cutsets.append(cutset)
if len(cutsets) == m:
return f.assignments_to_sets(cutsets)
return f.assignments_to_sets(cutsets)
@classmethod
def load_from_xml(cls, filepath):
"""
Loads an FT from a given XML file in the Open-PSA Model Exchange Format.
"""
# 1. create new FaultTree
ft = FaultTree()
# 2. load xml
xml = ElementTree.parse(filepath)
# 3. get all basic events
basic_events = xml.iter('define-basic-event')
for e in basic_events:
ft._parse_basic_event_xml(e)
# 4. get all gates
gates = xml.iter('define-gate')
for g in gates:
ft._parse_gate_xml(g)
# 5. get top event
ft._parse_top_event_xml(xml)
return ft
def _parse_basic_event_xml(self, xml_element):
"""
Gets the relevant info from a <define-basic-event> XML element.
Currently only gets the event name, not the probability.
"""
self.add_basic_event(xml_element.attrib['name'], prob=0)
def _parse_gate_xml(self, xml_element):
"""
Gets the relevant info from <define-gate> xml element.
"""
# gate name
name = xml_element.attrib['name']
# gate type (or / and)
children = list(xml_element)
gate = None
gate_type = ''
for child in children:
if child.tag == 'label': # there might be a <label> element
continue # just skip these
else:
gate = child
gate_type = gate.tag
if gate_type not in self._suported_gates:
raise ValueError(f"Gate type '{gate_type}' currently not supported")
# gate inputs
inputs = []
for i in gate: # (the xml element is enumerable)
inputs.append(i.attrib['name'])
self.add_gate(name, gate_type, inputs)
def _parse_top_event_xml(self, xml_element):
"""
Sets the top event of the fault tree, assuming the first gate under
the <define-fault-tree> element is the top event.
"""
ft_defs = list(xml_element.iter('define-fault-tree'))
assert len(ft_defs) == 1
event_name = list(ft_defs[0])[0].attrib['name']
self.set_top_event(event_name)
def save_as_image(self, output_file):
"""
Saves the fault tree as image to the the given output file, using
graphviz and pydot.
"""
# split the nodes into and-gates, or-gates, and basic events
and_nodes = []
or_nodes = []
input_nodes = []
for node in self.graph:
if self.node_types[node] == 'input':
input_nodes.append(node)
if self.node_types[node] == 'and':
and_nodes.append(node)
elif self.node_types[node] == 'or':
or_nodes.append(node)
# draw the graph
pos = graphviz_layout(self.graph, prog='dot')
nx.draw_networkx_nodes(self.graph, pos, nodelist=and_nodes, node_shape='^')
nx.draw_networkx_nodes(self.graph, pos, nodelist=or_nodes, node_shape='v')
nx.draw_networkx_nodes(self.graph, pos, nodelist=input_nodes, node_shape='s')
nx.draw_networkx_edges(self.graph, pos)
nx.draw_networkx_labels(self.graph, pos, font_size=6)
plt.tight_layout()
plt.savefig(output_file, dpi=300)
plt.clf()
Classes
class FaultTree
-
Note that a Fault Tree is not really a tree, it is a DAG. This class is mostly just a wrapper around NetworkX's DiGraph keeping track of the probabilities of basic events and the types of the gate nodes.
Expand source code
class FaultTree: """ Note that a Fault Tree is not really a tree, it is a DAG. This class is mostly just a wrapper around NetworkX's DiGraph keeping track of the probabilities of basic events and the types of the gate nodes. """ def __init__(self): self.graph = nx.DiGraph() self.top_event = None # should be one of the gate names self.basic_events = set() self.probs = {} # event name -> prob self.node_types = {} # gate name -> {input, and, or, ...} self._suported_gates = {'and', 'or'} def set_top_event(self, name): """ Sets the top event to the node with the given name. """ self.top_event = name def add_basic_event(self, name, prob): """ Adds a basic event with the given name and probability. """ self.graph.add_node(name) self.basic_events.add(name) self.node_types[name] = 'input' self.probs[name] = prob def add_gate(self, name, gate_type, inputs): """ Adds a gate node to the fault tree, with type in {'and', 'or', ...}, and given inputs """ self.graph.add_node(name) self.node_types[name] = gate_type for _input in inputs: self.graph.add_edge(name, _input) def get_gate_inputs(self, gate_name): """ Get the inputs of the given gate. """ return self.graph.successors(gate_name) def number_of_nodes(self): """ Returns the number of nodes in the fault tree. """ return self.graph.number_of_nodes() def to_cnf(self): """ Converts the FT to a CNF expression. """ f = CNF() # 1. assign var numbers to all the events (and gates) input_vars = {} # map : var_name -> var_number internal_vars = {} # map : var_name -> var_number for node in self.graph.nodes: v = f.get_new_var(name=node) if self.node_types[node] == 'input': input_vars[node] = v else: internal_vars[node] = v all_vars = input_vars.copy() all_vars.update(internal_vars) # 2. for every non-input node, add tseitin constraints to f for gate_name, output_var in internal_vars.items(): input_names = self.get_gate_inputs(gate_name) gate_input_vars = [] for input_name in input_names: gate_input_vars.append(all_vars[input_name]) gate_type = self.node_types[gate_name] f.add_tseitin_multi(gate_type, gate_input_vars, output_var) # 3. add clause containing only the top event as (positive) literal f.add_clause([all_vars[self.top_event]]) return f, all_vars, input_vars.values() def compute_min_cutsets(self, m, method, formula=None): """ Computes the `m` smallest cut sets of this fault tree. Args: m: The number of cutsets to compute. method: String in ['grover', 'classical', 'min-sat'] formula: (Optional) If set, computes minimal cutsets for the given CNF formula, instead of for self (mostly for debugging purposes). Returns: The cut set as a list of sets of basic event names. """ if formula is None: f, _, input_vars = self.to_cnf() else: f = formula.copy() input_vars = formula.get_vars() cutsets = [] for k in range(1, len(input_vars) + 1): f_k = f.copy() # we don't need a cardinality constraint if we solve with min-sat if method != 'min-sat': f_k.add_cardinality_constraint(at_most=k, variables=input_vars) sat = True while len(cutsets) < m: sat, model = f_k.solve(method=method, minimize_vars=input_vars) if not sat: break cutset = model[:len(input_vars)] # Block this cutset from current f_k and future f_k. # Only block the positive literals (i.e. the actual cutset), # this makes sure that only *minimal* cut sets are computed. f_k.block_positive_only(cutset) f.block_positive_only(cutset) # add cutset and return if enough cutsets.append(cutset) if len(cutsets) == m: return f.assignments_to_sets(cutsets) return f.assignments_to_sets(cutsets) @classmethod def load_from_xml(cls, filepath): """ Loads an FT from a given XML file in the Open-PSA Model Exchange Format. """ # 1. create new FaultTree ft = FaultTree() # 2. load xml xml = ElementTree.parse(filepath) # 3. get all basic events basic_events = xml.iter('define-basic-event') for e in basic_events: ft._parse_basic_event_xml(e) # 4. get all gates gates = xml.iter('define-gate') for g in gates: ft._parse_gate_xml(g) # 5. get top event ft._parse_top_event_xml(xml) return ft def _parse_basic_event_xml(self, xml_element): """ Gets the relevant info from a <define-basic-event> XML element. Currently only gets the event name, not the probability. """ self.add_basic_event(xml_element.attrib['name'], prob=0) def _parse_gate_xml(self, xml_element): """ Gets the relevant info from <define-gate> xml element. """ # gate name name = xml_element.attrib['name'] # gate type (or / and) children = list(xml_element) gate = None gate_type = '' for child in children: if child.tag == 'label': # there might be a <label> element continue # just skip these else: gate = child gate_type = gate.tag if gate_type not in self._suported_gates: raise ValueError(f"Gate type '{gate_type}' currently not supported") # gate inputs inputs = [] for i in gate: # (the xml element is enumerable) inputs.append(i.attrib['name']) self.add_gate(name, gate_type, inputs) def _parse_top_event_xml(self, xml_element): """ Sets the top event of the fault tree, assuming the first gate under the <define-fault-tree> element is the top event. """ ft_defs = list(xml_element.iter('define-fault-tree')) assert len(ft_defs) == 1 event_name = list(ft_defs[0])[0].attrib['name'] self.set_top_event(event_name) def save_as_image(self, output_file): """ Saves the fault tree as image to the the given output file, using graphviz and pydot. """ # split the nodes into and-gates, or-gates, and basic events and_nodes = [] or_nodes = [] input_nodes = [] for node in self.graph: if self.node_types[node] == 'input': input_nodes.append(node) if self.node_types[node] == 'and': and_nodes.append(node) elif self.node_types[node] == 'or': or_nodes.append(node) # draw the graph pos = graphviz_layout(self.graph, prog='dot') nx.draw_networkx_nodes(self.graph, pos, nodelist=and_nodes, node_shape='^') nx.draw_networkx_nodes(self.graph, pos, nodelist=or_nodes, node_shape='v') nx.draw_networkx_nodes(self.graph, pos, nodelist=input_nodes, node_shape='s') nx.draw_networkx_edges(self.graph, pos) nx.draw_networkx_labels(self.graph, pos, font_size=6) plt.tight_layout() plt.savefig(output_file, dpi=300) plt.clf()
Static methods
def load_from_xml(filepath)
-
Loads an FT from a given XML file in the Open-PSA Model Exchange Format.
Expand source code
@classmethod def load_from_xml(cls, filepath): """ Loads an FT from a given XML file in the Open-PSA Model Exchange Format. """ # 1. create new FaultTree ft = FaultTree() # 2. load xml xml = ElementTree.parse(filepath) # 3. get all basic events basic_events = xml.iter('define-basic-event') for e in basic_events: ft._parse_basic_event_xml(e) # 4. get all gates gates = xml.iter('define-gate') for g in gates: ft._parse_gate_xml(g) # 5. get top event ft._parse_top_event_xml(xml) return ft
Methods
def add_basic_event(self, name, prob)
-
Adds a basic event with the given name and probability.
Expand source code
def add_basic_event(self, name, prob): """ Adds a basic event with the given name and probability. """ self.graph.add_node(name) self.basic_events.add(name) self.node_types[name] = 'input' self.probs[name] = prob
def add_gate(self, name, gate_type, inputs)
-
Adds a gate node to the fault tree, with type in {'and', 'or', …}, and given inputs
Expand source code
def add_gate(self, name, gate_type, inputs): """ Adds a gate node to the fault tree, with type in {'and', 'or', ...}, and given inputs """ self.graph.add_node(name) self.node_types[name] = gate_type for _input in inputs: self.graph.add_edge(name, _input)
def compute_min_cutsets(self, m, method, formula=None)
-
Computes the
m
smallest cut sets of this fault tree.Args
m
- The number of cutsets to compute.
method
- String in ['grover', 'classical', 'min-sat']
formula
- (Optional) If set, computes minimal cutsets for the given CNF formula, instead of for self (mostly for debugging purposes).
Returns
The cut set as a list of sets of basic event names.
Expand source code
def compute_min_cutsets(self, m, method, formula=None): """ Computes the `m` smallest cut sets of this fault tree. Args: m: The number of cutsets to compute. method: String in ['grover', 'classical', 'min-sat'] formula: (Optional) If set, computes minimal cutsets for the given CNF formula, instead of for self (mostly for debugging purposes). Returns: The cut set as a list of sets of basic event names. """ if formula is None: f, _, input_vars = self.to_cnf() else: f = formula.copy() input_vars = formula.get_vars() cutsets = [] for k in range(1, len(input_vars) + 1): f_k = f.copy() # we don't need a cardinality constraint if we solve with min-sat if method != 'min-sat': f_k.add_cardinality_constraint(at_most=k, variables=input_vars) sat = True while len(cutsets) < m: sat, model = f_k.solve(method=method, minimize_vars=input_vars) if not sat: break cutset = model[:len(input_vars)] # Block this cutset from current f_k and future f_k. # Only block the positive literals (i.e. the actual cutset), # this makes sure that only *minimal* cut sets are computed. f_k.block_positive_only(cutset) f.block_positive_only(cutset) # add cutset and return if enough cutsets.append(cutset) if len(cutsets) == m: return f.assignments_to_sets(cutsets) return f.assignments_to_sets(cutsets)
def get_gate_inputs(self, gate_name)
-
Get the inputs of the given gate.
Expand source code
def get_gate_inputs(self, gate_name): """ Get the inputs of the given gate. """ return self.graph.successors(gate_name)
def number_of_nodes(self)
-
Returns the number of nodes in the fault tree.
Expand source code
def number_of_nodes(self): """ Returns the number of nodes in the fault tree. """ return self.graph.number_of_nodes()
def save_as_image(self, output_file)
-
Saves the fault tree as image to the the given output file, using graphviz and pydot.
Expand source code
def save_as_image(self, output_file): """ Saves the fault tree as image to the the given output file, using graphviz and pydot. """ # split the nodes into and-gates, or-gates, and basic events and_nodes = [] or_nodes = [] input_nodes = [] for node in self.graph: if self.node_types[node] == 'input': input_nodes.append(node) if self.node_types[node] == 'and': and_nodes.append(node) elif self.node_types[node] == 'or': or_nodes.append(node) # draw the graph pos = graphviz_layout(self.graph, prog='dot') nx.draw_networkx_nodes(self.graph, pos, nodelist=and_nodes, node_shape='^') nx.draw_networkx_nodes(self.graph, pos, nodelist=or_nodes, node_shape='v') nx.draw_networkx_nodes(self.graph, pos, nodelist=input_nodes, node_shape='s') nx.draw_networkx_edges(self.graph, pos) nx.draw_networkx_labels(self.graph, pos, font_size=6) plt.tight_layout() plt.savefig(output_file, dpi=300) plt.clf()
def set_top_event(self, name)
-
Sets the top event to the node with the given name.
Expand source code
def set_top_event(self, name): """ Sets the top event to the node with the given name. """ self.top_event = name
def to_cnf(self)
-
Converts the FT to a CNF expression.
Expand source code
def to_cnf(self): """ Converts the FT to a CNF expression. """ f = CNF() # 1. assign var numbers to all the events (and gates) input_vars = {} # map : var_name -> var_number internal_vars = {} # map : var_name -> var_number for node in self.graph.nodes: v = f.get_new_var(name=node) if self.node_types[node] == 'input': input_vars[node] = v else: internal_vars[node] = v all_vars = input_vars.copy() all_vars.update(internal_vars) # 2. for every non-input node, add tseitin constraints to f for gate_name, output_var in internal_vars.items(): input_names = self.get_gate_inputs(gate_name) gate_input_vars = [] for input_name in input_names: gate_input_vars.append(all_vars[input_name]) gate_type = self.node_types[gate_name] f.add_tseitin_multi(gate_type, gate_input_vars, output_var) # 3. add clause containing only the top event as (positive) literal f.add_clause([all_vars[self.top_event]]) return f, all_vars, input_vars.values()