Source code for carto_mesh

"""
This file contains the class CartoMesh with all its reconstruction-specific methods.
This class contains methods for initialising from a .mesh file, plotting, reconstructing and applying conduction velocities.
This file imports :mod:`mesh_tools` for all general mesh functions that are not specific to carto meshes.
:mod:`mesh_tools` also concerns itself with all python module imports.
"""
from mesh_tools import *


[docs]class CartoMesh: """ A class containing functions for initialising from file, plotting, reconstructing and applying conduction velocities. """ def __init__(self, name: str = ""): self.name = "" self.dir = "./" self.points = np.array([]) self.n_points = 0 self.point_info = pd.DataFrame() self.triangles = np.array([], dtype=int) self.n_triangles = 0 self.triangle_info = pd.DataFrame() self.edges = np.array([]) self.cells = [] self.n_cells = 0 self.layers = None self.mesh = None self.myo = self.non_myo = None self.thickness = 0 self.ratio = None self.settings = {} self.__initialiseFromFile(name) self.__readSettings() def __initialiseFromFile(self, name: str = "") -> None: """ Initialise the class from either a .mesh file or a .vtk file. Args: name: The filename (.mesh or .vtk), or the directory containing the .mesh file. Returns: None: Nothing """ def parseName(fn) -> Tuple[str, str]: """ Given a filename to initialise, this method parses the filename and extracts the filetype and parent directory. Args: fn: Filename, containing extension or not, or name of the parent directory Returns: Tuple: tuple containing the name of the file and the name of its parent directory """ if '*' in fn: # starred expression: file name not given fn_ = glob.glob(fn)[0] # search for the file else: fn_ = fn comp = fn_.split(os.sep) if fn_ else ['.'] if '.mesh' not in comp[-1] and '.vtk' not in comp[-1]: # only directory given: look for file fns = glob.glob(os.path.join(*comp, '*.mesh')) # take first .mesh file if len(fns): comp = fns[0].split(os.sep) # overwrite filename and dir components else: raise FileNotFoundError("No .mesh or .vtk file found in directory \'{}\'" "".format(os.path.join(*comp))) d = os.path.join(*comp[:-1]) + os.sep n = '.'.join(comp[-1].split('.')[:-1]) # drop file extension return n, d def initialiseFromMeshFile(self_, name_: str = "") -> None: """ Initialize a carto mesh from .mesh file. Reads in a .mesh file and writes out a .csv file per .mesh header. Reads in the VerticesSection and TrianglesSection and uses these to construct a mesh. Args: self_: self, the :class:`CartoMesh` object, passed through from the overarching function. name_: name of .mesh file to be initialised, including '.mesh' extension Returns: None: Nothing. Updates mesh. """ self_.name, self_.dir = parseName(name_) self_.__cartoToCsv(verbose=False) vert, tri = self_.__readVertTri() self_.point_info = vert self_.triangle_info = tri points = np.array([[1000. * e for e in (x, y, z)] for x, y, z in self_.point_info[['X', 'Y', 'Z']].values]) triangles = self_.triangle_info[['Vertex0', 'Vertex1', 'Vertex2']].values self_.layers = 1 self_.__update(pv.PolyData(points, pmToPvFaces(triangles))) color = [e for e in self_.triangle_info['GroupID'].to_numpy()] self_.mesh['color'] = color self_.mesh = self_.mesh.ctp() # triangle data to point data self_.myo, self_.non_myo = self_.extractMyoAndNonMyo() def initialiseFromVtkFile(self_, name_: str = "") -> None: """Initialize a mesh from .vtk file. Args: self_: self_: self, the :class:`CartoMesh` object, passed through from the overarching function. name_: name of .mesh file to be initialised, including '.vtk' extension Returns: None: Nothing. Updates mesh. """ self_.name, self_.dir = parseName(name_) self_.layers = 1 self_.__update(pv.read(self_.dir + self_.name + '.vtk')) if '.vtk' in name: initialiseFromVtkFile(self, name) else: initialiseFromMeshFile(self, name) def __cartoToCsv(self, verbose: bool = True) -> None: """ Reads in a carto .mesh file and generates .csv files per header in real-time in the same directory Args: verbose: Use verbose output. Default=True Returns: None: Nothing. Writes out .csv files. """ with io.open(os.path.join(self.dir, self.name) + '.mesh', 'r', encoding='utf-8') as f: # variables used throughout loop: section = "Header" # name of current section current_line = 0 # index of current line being read section_line = 0 # index of current section name section_ind = 0 # Section 0 = Header, file format info # Section 1 = General Attributes, info about section data # Section 2 = VerticesSection, x y z coordinates plus data (normals and GroupID) # Section 3 = TrianglesSection, # section 4 = VerticesColorSection # Section 5 = VerticesAttributesSection for line in f: current_line += 1 # update line index if line[0] == "[": # new section encountered if verbose: print("\t" + str(section_ind) + '. ' + section) section = line[1:-2] # section name section_line = current_line section_ind += 1 # Opening or closing csv files if section_ind > 2: of.close() # past VerticesSection, an outfile has already been created (infra) if section_ind > 1: # skip GeneralAttributes of = open(os.path.join(self.dir, section + ".csv"), "w+", encoding='utf-8') elif section_ind > 1: # useful data if section_ind == 4: header_line = section_line + 2 # VerticesColorSection has extra line else: header_line = section_line + 1 if current_line == header_line: # column names column_names = line.split()[1:] # first element is useless ";" of.write("Index") for name in column_names: of.write("," + name) of.write("\n") elif len(line) > 1 and line[0] != ";": # actual data, not empty line or column names ind, data = line.split("=") # line has shape "ind = x y z nx ny nz groupID of.write(str(ind)) for el in data.split(): # ignore ind of.write("," + str(el)) of.write("\n") of.close() def __readVertTri(self) -> None: """ Reads VerticesSection.csv and TrianglesSection.csv files and initializes class Returns: None: Nothing. Updates mesh. """ assert os.path.exists("{}/VerticesSection.csv".format(self.dir)) \ and os.path.exists('{}/TrianglesSection.csv'.format(self.dir)), \ "VerticesSection.csv or TrianglesSection.csv not found! Run cartoToCsv() first." vert = pd.read_csv("{}/VerticesSection.csv".format(self.dir), sep=',') tri = pd.read_csv('{}/TrianglesSection.csv'.format(self.dir), sep=',') return vert, tri def __getNeighboringFaces(self, index: int) -> pd.DataFrame: """ Finds all mesh facets containing some point. Args: index: the index referring to the mesh point. Returns: DataFrame: Pandas DataFrame containing the triangles that have the point at index. The DataFrame contains the following columns: 'Vertex0', 'Vertex1', 'Vertex2', 'NormalX', 'NormalY', 'NormalZ', 'GroupID' """ facets_ = self.triangle_info.loc[ (self.triangle_info[['Vertex0']].values == index) | (self.triangle_info[['Vertex1']].values == index) | (self.triangle_info[['Vertex2']].values == index)] return facets_ def __update(self, mesh_: Union[pv.PolyData, pv.UnstructuredGrid]) -> None: """ Updates the CartoMesh to match the given mesh. Args: mesh_: A PyVista mesh of the type PolyData or UnstructuredGrid . Returns: None: Nothing. Updates the mesh. """ self.mesh = mesh_ self.points = mesh_.points self.n_points = len(self.points) self.edges = mesh_.extract_all_edges() if type(mesh_) == pv.PolyData: self.triangles = pvToPmCells(mesh_.faces) self.n_triangles = len(self.triangles) self.cells = None self.n_cells = 0 elif type(mesh_) == pv.UnstructuredGrid: self.cells = pvToPmCells(mesh_.cells) self.n_cells = len(self.cells) self.triangles = None self.n_triangles = 0 def __readSettings(self) -> None: """ Reads the settings.ini file and updates the CartoMesh accordingly. Returns: None: Nothing. Updates the mesh settings attribute. """ assert os.path.exists("settings.ini"), "No settings.ini file found" config = configparser.ConfigParser(inline_comment_prefixes='#') config.read('settings.ini') for section in config.sections(): self.settings[section.lower()] = {key: eval(val) for key, val in config[section].items()}
[docs] def writeEndoEpi(self, thickness: float, ratio: float) -> None: """ Reads in .csv files from carto2csv.py, calculates normals of mesh facets and adds two layers of points along these normals: the epi- and endocardium. Writes out these two layers as .txt files. Args: thickness: the distance between the inner (endo) and outer (epi) layer ratio: the ratio of distances from both layers to the original middle layer. A ratio of .8 will add an outer layer .8*thickness from the middle and an inner layer .2*thickness to the inside. Ratios > .5 are recommended to avoid self intersections. Returns: Nothing. Writes out two files: endo.txt and epi.txt """ self.thickness = thickness self.ratio = ratio outside = ratio * thickness inside = (1 - ratio) * thickness endo = open(self.dir + 'endo.txt', 'w+') epi = open(self.dir + 'epi.txt', 'w+') for f in (endo, epi): f.write(self.dir + "Index,X,Y,Z\n") tqbar = tqdm(range(self.points.shape[0]), desc=' Adding points') for index, point in self.point_info.iterrows(): tqbar.update(1) co = point[['X', 'Y', 'Z']] facets_ = self.__getNeighboringFaces(index) av_normal = calcAvNormal(facets_) newpoint = [e + outside * n for e, n in zip(co, av_normal)] newpoint2 = [e - inside * n for e, n in zip(co, av_normal)] for f, p in zip((endo, epi), (newpoint2, newpoint)): # loop over files and corresponding points f.write(str(index)) for comp in p: # components of point f.write(',' + str(comp * 1000.)) # Convert from mm to µm and write out f.write('\n') endo.close() epi.close() tqbar.close()
[docs] def splitLayer(self, thickness: float = .5, ratio: float = .8) -> None: """ Calls upon :func:`writeEndoEpi` to write 'endo.txt' and 'epi.txt': two files containing the point coordinates of the bounding mesh layers. These are read in again and used to construct two surface meshes. These two surface meshes are added together as one mesh. The class properties are updated to these points and faces. Args: thickness: the distance between the inner (endo) and outer (epi) layer ratio: the ratio of distances from both layers to the original middle layer. A ratio of .8 will add an outer layer .8*thickness from the middle and an inner layer .2*thickness to the inside. Ratios > .5 are recommended to avoid self intersections. Returns: Nothing """ if not os.path.exists(self.dir + 'endo.txt') or not os.path.exists(self.dir + 'epi.txt'): self.writeEndoEpi(thickness, ratio) assert self.layers < 2, "Amount of layers is already >= 2. Splitting aborted." endo = pd.read_csv(self.dir + 'endo.txt', sep=',', index_col=0) epi = pd.read_csv(self.dir + 'epi.txt', sep=',', index_col=0) f = pmToPvFaces(self.triangles) endo, epi = pv.PolyData(endo.values, f), pv.PolyData(epi.values, f) endo['color'] = epi['color'] = self.triangle_info['GroupID'] fullmesh = endo + epi fullmesh = fullmesh.ctp() self.points = fullmesh.points self.triangles = np.concatenate((self.triangles, np.array([e + len(self.points) for e in self.triangles]))) self.layers = 2 self.mesh = fullmesh
[docs] def cleanMesh(self, tol: float, max_iter: int = 10, print_si: bool = True) -> pv.PolyData: """ Calls upon PyVista's built-in clean method to clean the mesh. Afterwards, calls upon PyMesh's built-in methods remove_degenerated_triangles() and detect_self_intersection() to remove duplicated faces and triangles, and resolve self-intersections. Args: tol: tolerance passed to PyVista's built-in clean() method (absolute tolerance) max_iter: max amount of iterations PyMesh is allowed to try and fix self-intersections, duplicate faces and duplicate vertices. print_si: print the progress of fixing self-intersections Returns: PyVista PolyData: The cleaned mesh """ mesh_ = self.mesh.clean(lines_to_points=True, polys_to_lines=True, tolerance=tol, absolute=True) mesh_ = makePyMesh(mesh_) mesh_, info = pm.remove_degenerated_triangles(mesh_) si = len(pm.detect_self_intersection(mesh_)) if print_si: print('\t' + str(si) + ' self-intersections detected') i = 0 while i < max_iter and si != 0: # there are still self-intersections, max 10 iterations mesh_, info = pm.remove_duplicated_faces(mesh_) mesh_, info = pm.remove_duplicated_vertices(mesh_, tol=tol) mesh_ = pm.resolve_self_intersection(mesh_) si = len(pm.detect_self_intersection(mesh_)) if print_si: sys.stdout.write("\r" + '\tIteration {}: '.format(i + 1) + str(si) + ' self-intersection(s) left ') sys.stdout.flush() i += 1 print("") mesh_, info = pm.remove_duplicated_vertices(mesh_, tol=tol) mesh_, info = pm.remove_duplicated_faces(mesh_) mesh_ = makePyVista(mesh_).clean(lines_to_points=True, polys_to_lines=True, tolerance=tol, absolute=True) self.__update(mesh_) self.point_info = pd.DataFrame() # point info does no longer match self.triangle_info = pd.DataFrame() return mesh_
[docs] def getEdgeLengths(self, mesh: Union[pm.Mesh, bool] = None) -> np.ndarray: """ Gets all edge lengths. If a mesh is given, the mesh must be a PyMesh Mesh object. If no mesh is given (default), the CartoMesh's mesh attribute is used (PyVista PolyData or UnstructuredGrid) Args: mesh: A PyMesh Mesh object. Default: None Returns: np.ndarray: An array containing the edge lengths of te mesh. """ edges = mesh.extract_all_edges() if mesh else self.edges pmedges = pvToPmCells(edges.extract_cells(range(edges.n_cells)).cells) # extract edge ind as cells distances = [] for pair in pmedges: co1, co2 = edges.points[pair] distances.append(dist(co1, co2)) return np.array(distances)
[docs] def homogenizeMesh(self, nsteps=10, boxplot=False, return_dist=False, edge_range=(600., 1000.)) -> pv.PolyData: """ Iteratively splits long edges and collapses short edges until they all have a length between min_edge and max_edge Args: nsteps: Amount of steps to take to iteratively adapt the mesh edge lengths boxplot: Make a boxplot of the mesh edge lengths for each iteration step after the refinement procedure. return_dist: Return a 2xnsteps array of the mesh edge lengths edge_range: Minimum and maximum allowed edge lengths Returns: Union(PolyData, Tuple[PolyData, List]): Either the refined surface mesh in PyVista's PolyData format, or both the refined mesh and a list containing all the edge lengths for each iteration step. """ def calcAlpha(n_: int, nsteps_: int, max_edge_: float, longest_edge_: float) -> float: """ Calculate the longest allowed edge length (alpha) for a given iteration step. Alpha drops exponentially as the iterations progress. At step 0, alpha equals the longest edge of the input mesh. At the final step, alpha equals the maximum desired edge length. Args: n_: Current iteration step nsteps_: Total amount of iteration steps max_edge_: Maximum allowed edge length at final iteration step longest_edge_: The longest edge length of the input mesh before any refinement Returns: float: The longest allowed edge length for the current iteration step """ return (1 - n_ / nsteps_) * longest_edge_ * np.exp(-3. * n_ / nsteps_) + max_edge_ def calcTol(n_: int, nsteps_: int, min_edge_: float) -> float: """ Calculate the shortest allowed edge for a given iteration step. The shortest allowed edge augments linearly as the iteration steps progress. Args: n_: Current iteration step nsteps_: Total amount of iteration steps min_edge_: Minimum allowed edge length at the final iteration step. Returns: float: The minimum allowed edge length for the current iteration step. """ return 500. * (1 - n_ / nsteps_) + min_edge_ # min edge length drops linearly def plotEdgelengths(dist_range: np.ndarray, show: bool = True, save: bool = True) -> None: """ Plots a boxplot of the edge lengths at each iteration step dist_range should be a 2D array, each array entry containing all the edge lengths at an iteration step Args: dist_range: An NxM array containing the mesh edge lengths for each iteration step, where M is the amount of mesh edges at iteration step N. show: Show the plot. Default = True save: Save the plot. Default = True Returns: None: Nothing """ plt.style.use('fivethirtyeight') medians = [np.median(d) for d in dist_range] means = [np.mean(d) for d in dist_range] sigmas = [np.sqrt(v) for v in [np.var(d) for d in dist_range]] fig, ax = plt.subplots(figsize=(8, 6)) ax.set_ylabel("Edge length (µm)", size=22) ax.set_xlabel("Iteration step", size=22) plt.tight_layout(pad=1.8, h_pad=1.8) ax.fill((.5, nsteps + 1.5, nsteps + 1.5, .5), (max_edge, max_edge, min_edge, min_edge), color=colors[0], alpha=.3, label='Desired edgelength\n({} - {} µm)'.format(min_edge, max_edge)) plt.axhline(max_edge, color='black', lw=2.5) plt.axhline(min_edge, color='black', lw=2.5) polygonx = [.88] + list(range(2, nsteps + 2)) + list(reversed(range(2, nsteps + 2))) + [.88] polygony = [m + s for m, s in zip(means, sigmas)] + \ list(reversed([m - s for m, s in zip(means, sigmas)])) # ax.fill(polygonx, polygony, color=colors[0], alpha=.2, label="mean $\pm$ $\sigma$") ax.boxplot(dist_range, labels=range(len(dist_range)), whis=[0, 100], boxprops=dict(color="gray", linewidth=2.5), medianprops=dict(color="gray", linewidth=2.5), whiskerprops=dict(color="gray", linewidth=2.5), capprops=dict(color="gray", linewidth=2.5), zorder=1) ax.plot(range(1, nsteps + 2), [calcAlpha(step) for step in range(nsteps + 1)], color=colors[1], ) # xaxis needs to be shifted by one for matplotlib bullshit reasons, hence range(1, nsteps+2) ax.plot(range(1, nsteps + 2), [calcTol(step) for step in range(nsteps + 1)], color=colors[1], label=r'$\alpha$ and tolerance') ax.set_title("Mesh edge lengths during refinement", size=28) ax.legend(loc="upper right", prop={'size': 20}) ax.tick_params(axis='x', labelsize=18) ax.tick_params(axis='y', labelsize=18) if show: plt.show() if save: fig.savefig("../Plots/EdgeLengthRefinement_{}-{}.png".format(min_edge, max_edge), dpi=300) min_edge, max_edge = edge_range edge_lengths = self.getEdgeLengths() longest_edge = np.max(edge_lengths) mesh_ = makePyMesh(self.mesh) if boxplot: dist_range = [edge_lengths] # initialize list of distances during refinement procedure for n in tqdm(range(1, nsteps + 1), desc=' Resizing edges'): alpha = calcAlpha(n, nsteps, max_edge, longest_edge) tol = calcTol(n, nsteps, min_edge) splitmesh_, info = pm.split_long_edges(mesh_, max_edge_length=alpha) colmesh_, info = pm.collapse_short_edges(splitmesh_, tol, preserve_feature=True) colmesh_.remove_attribute('face_sources') # pymesh adds this attribute. Makes conversion difficult. mesh_ = colmesh_ if boxplot: dist_range.append(self.getEdgeLengths(mesh_)) if boxplot: plotEdgelengths(dist_range) print("\tCleaning mesh...") mesh_ = cleanMesh(makePyVista(mesh_), tol=min_edge / 3., iterations=6) self.__update(mesh_) self.name += '_{}-{}µm'.format(int(min_edge), int(max_edge)) # update name if return_dist: return mesh_, getEdgeLengths(makePyMesh(mesh_)) return mesh_ # return final version of mesh
[docs] def tetrahedralise(self, switches, n_col_retry=3) -> None: """ Runs TetGen as a bash command. Args: switches: Switches to use with the TetGen command. See https://www.wias-berlin.de/software/tetgen/switches.html n_col_retry: Amount of times to attempt to fix collinearities during the TetGen process. Returns: None: Nothing. Writes out a .vtk file. """ def runTetGen(name: str, switches_: str) -> [[int, int]]: """ Runs TetGen in terminal and captures output. Args: name: name of the base file switches_: Switches to be used together with TetGen bash command. Returns: [[int, int]]: A list containing pairs of indices. These pairs define a mesh segment that's colinear. """ cwd = os.getcwd() os.chdir(self.dir) tetcommand = "tetgen {} {}.smesh".format(switches_, name) command = ['stdbuf', '-o0', *tetcommand.split(" ")] print('\tNow running bash command {}\n'.format(" ".join(command))) print("\t-------------------- TetGen output --------------------") # adding stdbuf -o0 makes TetGen output real-time p = Popen(command, stdout=PIPE, stderr=PIPE, encoding='utf-8') colin_pair = [] for line in p.stdout: l_ = str(line.rstrip()) print("\t" + l_) # The following checks assume that, if an error is found, it's colinearity # This still needs to be expanded for overlapping lines and segments intersecting triangles if '1st' in l_ and 'facet' not in l_ and 'seg' not in l_: # line is of form "1st: [24578,24581]." # split at ":", drop first two (whitespace and "[") and last two ("]" and ".") chars # split at comma to make list again edge1 = [int(e) for e in l_.split(':')[1][2:-2].split(',')] if '2nd' in l_ and 'facet' not in l_ and 'seg' not in l_: # line is of form "2nd: [24578,24581]." edge2 = [int(e) for e in l_.split(':')[1][2:-2].split(',')] colin_pair.append([edge1, edge2]) p.stdout.flush() print("\t------------------ End TetGen output ------------------") os.chdir(cwd) return colin_pair colinear_pairs = runTetGen(self.name, switches) i = 0 mesh = self.mesh while i < n_col_retry and len(colinear_pairs) > 0: # check for collinear pairs print("\n\tCollinearity found! Adapting points...") mesh = makeNonCollinear(mesh, colinear_pairs) writeToSmesh(mesh, self.dir + self.name) colinear_pairs = runTetGen(self.name, switches) i += 1
[docs] def extractMyoAndNonMyo(self) -> Tuple[pv.PolyData, pv.PolyData]: """ Checks scalar data of the mesh in its current state (clinical tags) and extracts the part of the mesh with tags that correspond to myocardium (tag == 0). Returns: Tuple[pv.PolyData, pv.PolyData]: A tuple containing two PyVista PolyData meshes: a pointcloud of points corresponding to the mesh myocardium and a pointcloud of the rest of the mesh. """ myo = pv.PolyData() noncond = pv.PolyData() for tag in np.unique(self.mesh['color']): tagged_pointcloud = self.mesh.points[[self.mesh['color'] == tag]] if tag == 0: myo += pv.PolyData(tagged_pointcloud) else: noncond += pv.PolyData(tagged_pointcloud) return myo, noncond
[docs] def writeTags(self) -> None: """ Writes out all points in a mesh with a certain tag to a .csv file Returns: None: Nothing. Writes out .csv files. """ for tag in self.mesh['color'].unique(): tagged_pointcloud = self.mesh.points[[self.mesh['color'] == tag]] with open(self.dir + "Tag_{}.csv".format(tag), 'w+') as of: csvWriter = csv.writer(of, delimiter=',') csvWriter.writerows(tagged_pointcloud)
[docs] def writeAdjustNa2(self, tol: float = 1e-5, g_Na: float = 7.8, write_dat: bool = True) -> None: """ Writes adjustment file to close off Na2+ channels in cells where CV ~ 0 Args: tol: tolerance for when a point is considered to have a conduction velocity of 0. Points whose conduction velocity < tol are considered to be zero. g_Na: value for (maximum) Sodium channel conductance. Open Na-channels are set to this conductance. Default: 7.8 pS write_dat: write a .dat file for visual inspection in e.g. meshalyzer. Returns: None: Nothing. Writes out a .txt file and, if wanted, a .dat file """ def writeDat(d: str, mesh: pv.PolyData, name: str = "gNA2") -> None: """ Write a .dat file. This file is a column of 0s and 1s. The index column corresponds to the mesh point index. If the value equals to one on row i, then mesh.points[i] is a point whose Na2+ channel has been closed off. Args: d: Directory to write .dat file to mesh: A Pyvista PolyData mesh name: Name of the .dat file, without file extension. Default = 'gNA2' Returns: None: Nothing. Writes out .dat file <name>.dat """ datfile = open(d + name + ".dat", "w+") dat = np.zeros(mesh.n_points) dat[ptn == 0.] = 1 for e in dat: datfile.write(str(e) + '\n') datfile.close() assert self.mesh.n_cells or self.mesh.n_triangles, "Mesh does not contain any cells or triangles." cells = pvToPmCells(self.mesh.cells) if self.cells else pvToPmCells(self.mesh.triangles) speed = self.mesh["speed"] ptn = np.ones(len(self.mesh.points)) * g_Na for i in range(len(speed)): if speed[i] < tol: # if CV is ~ 0 -> close off Na channel vertices = cells[i] ptn[vertices] = 0. stimfile = open(self.dir + "gNA2.adj", "w+") stimfile.write(str(len(ptn)) + "\n" + "extra\n") for i in range(len(ptn)): stimfile.write(str(i) + " " + str(ptn[i]) + "\n") stimfile.close() if write_dat: writeDat(self.dir, self.mesh, "gNA2")
[docs] def setNonCondByIndexFiles(self, region_dir='Regions', write_dat=False, index_col="meshID") -> None: """ Opens directory region_dir and reads in .csv files there. These .csv files should contain the indices of points whose conduction velocity should be set to zero. Writes out a .dat file for each .csv file if wanted Args: region_dir: Name of directory containing the regions to be set to a conduction velocity of 0. These regions should be .csv files containing the indices of points to be set to CV=0 index_col: Name of the column in the .csv files that contain the point indices. Default=\"meshID\" for easy workflow in correspondence with :func:`mesh_tools.ptsToParaview` Returns: None: Nothing """ def writeDat(data: pd.DataFrame, n_points, name): """Writes a .dat file for a given pd.DataFrame""" # write scar .dat file datfile = open(name + ".dat", "w+") dat = np.zeros(n_points) dat[data["meshID"]] = 1 for e in dat: datfile.write(str(e) + '\n') datfile.close() def getNonCondRegions(meshdir, mesh, region_dir_=region_dir, write_dat_=write_dat): """""" non_cond_region_indices = [] if os.path.isdir(meshdir + region_dir_): # apply scars if directory exists for csvfile in glob.glob(meshdir + region_dir_ + "/*.csv"): scar = pd.read_csv(csvfile) if write_dat_: writeDat(scar, csvfile.split('.')[0], len(mesh.points)) for index in scar[index_col].values: non_cond_region_indices.append(index) return np.array(non_cond_region_indices) # Set manually selected scars to 0 velocity scars = getNonCondRegions(self.dir, self.mesh, 'Regions') self.mesh["speed"] = [0. if p in scars else self.mesh["speed"][p] for p in range(self.n_points)]
[docs] def applyCV(self, speed_file='speed.csv', write_csv=False, write_VTK_file=False, write_txt=True, write_dat=False, write_xyz=False, outdir='scale_factors/', region_dir='', ncv=None, speed_col="speed"): """ Applies conduction velocities on a reconstructed tetrahedralised carto mesh. Args: speed_file: Name of the file containing the coordinates and conduction velocity values to interpolate. write_csv: Write out the coordinates and speeds to a .csv file write_VTK_file: Write out the resulting mesh to .vtk write_txt: Write out the conduction velocities as scale factors (readable by OpenCARP) to <outdir> write_dat: Write out the regions of the mesh with CV=0 to .dat file, to visualize in Meshalyzer write_xyz: Write point cloud of the conduction velocites, as interpolated on the mesh outdir: Name of the directory to contain the scale factors aka conduction velocities region_dir: Name of the directory containing .csv files with indices of mesh points whose conduction velocity will be set to 0. Returns: None: Nothing. Writes out one or more of the following files in <outdir>: .csv, .vtk, .txt, .dat or a pointcloud .csv file if <write_xyz> == True. """ def applySpeedLimit(data_: pd.DataFrame, limit: tuple, speed_col: str = 'speed') -> pd.DataFrame: """ Given a DataFrame, cuts off all conduction velocities whose conduction velocity in the column <speed_col> have a value outside the range of <limit> Args: data_: The DataFrame containing the conduction velocities limit: The minimum and maximum allowed conduction velocities as a tuple with length 2 speed_col: Name of the DataFrame column containing the conduction velocities. Default = 'speed' Returns: DataFrame: DataFrame whose conduction velocities in the column <speed_col> have been cut off to match <speed_col> """ speeds = data_[speed_col] for i in range(len(speeds)): s = speeds[i] if s < limit[0]: speeds[i] = limit[0] if s > limit[1]: speeds[i] = limit[1] data_[speed_col] = speeds return data_ def randomizeCV(data_: pd.DataFrame, n_neighbors: int, speed_col: str = "speed") -> pd.DataFrame: """Given a mesh with conduction velocities, makes a variation on this conduction velocity distribution. A normal distribution is fitted to each point and their neighbors. From this distribution, a new conduction velocity is randomly sampled. Args: data_: the mesh in pd.DataFrame format. Must contain columns 'x', 'y' and 'z' with the coordinates in µm. n_neighbors: amount of neighbors to use for each normal distribution fitting. speed_col: name of the column containing the conduction velocity values for each mesh point. Returns: DataFrame: DataFrame containing the original point coordinates and updated conduction velocities""" print("\n\t#### Variation ", n) # tweak conduction velocities of input CV file points = data_[["x", "y", "z"]].values tree = nb.KDTree(points) speeds = np.zeros(len(data_)) for i in tqdm(range(len(points)), desc=' Calculating new velocities'): p = points[i] dist, neighbors = tree.query([p], k=n_neighbors) neighborCVs = input_data.loc[[int(e) for e in neighbors[0]]][speed_col] mean, sigma = np.mean(neighborCVs), np.std(neighborCVs) new_cv = np.random.normal(mean, sigma, 1) speeds[i] = np.abs(new_cv) data_[speed_col] = speeds return data_ if not ncv: n_cv = self.settings['reconstruction_parameters']['n_cv'] else: n_cv = ncv # read in data input_data = pd.read_csv(self.dir + speed_file, usecols=["speed", "x", "y", "z"]) input_data = applySpeedLimit(input_data, self.settings['cv_interpolation_parameters']['speed_limit']) med_speed = np.mean(input_data["speed"]) # create new dataframe for mutable purposes calculated_data = input_data for n in range(n_cv): if n != 0: # variations of conduction velocities calculated_data = randomizeCV(calculated_data, n_neighbors=self.settings['cv_interpolation_parameters']['n_neighbors']) # Create PolyData to use in interpolation data = applySpeedLimit(calculated_data, self.settings['cv_interpolation_parameters']['speed_limit']) pvdata = pv.PolyData(np.array([data["x"], data["y"], data["z"]]).T) pvdata["speed"] = data["speed"] # Interpolate on mesh print("\tInterpolating on mesh") mesh = self.mesh.interpolate(pvdata, radius=self.settings['cv_interpolation_parameters']['radius'], sharpness=self.settings['cv_interpolation_parameters']['sharpness'], strategy="null_value", null_value=med_speed, pass_cell_arrays=False) self.__update(mesh) # set mesh if region_dir: print("\tSetting regions to 0 conduction velocity") self.setNonCondByIndexFiles(region_dir, write_dat) pointdata = mesh["speed"] mesh = mesh.ptc() # point data to cell data cell_data = mesh["speed"] sq_point_data = pd.DataFrame([e ** 2 for e in pointdata], columns=["squared speed"]) sq_cell_data = pd.DataFrame([e ** 2 for e in cell_data], columns=["squared speed"]) self.__update(mesh) if not os.path.exists(self.dir + outdir): os.mkdir(self.dir + outdir) # write to csv file if write_csv: print("\tWriting squared speed to csv") sq_point_data.to_csv(self.dir + outdir + "sq_CV_point_{}.csv".format(n), index_label="PointID") sq_cell_data.to_csv(self.dir + outdir + "sq_CV_cell_{}.csv".format(n), index_label="CellID") if write_xyz: print("\tWriting point cloud") of = open(self.dir + outdir + "input_points_{}.txt".format(n), "w+") of.write("X,Y,Z,speed\n") for i in tqdm(range(len(pvdata.points))): p = pvdata.points[i] d = pvdata["speed"][i] for c in p: of.write(str(c) + ",") of.write(str(d) + '\n') of.close() # write text file to read in during simulation if write_txt: print("\tWriting txt file: {}scale_factor_{}.txt".format(outdir, n)) of = open(self.dir + outdir + "scale_factor_{}.txt".format(n), "w+") for e in sq_cell_data["squared speed"]: of.write(str(e) + '\n') of.close() # .dat file for visualisation purposes if write_dat: print("\tWriting dat file: {}scale_factor_{}.dat".format(outdir, n)) of = open(self.dir + outdir + "scale_factor_{}.dat".format(n), "w+") for e in sq_point_data["squared speed"]: of.write(str(e) + '\n') of.close() # write to vtk for inspection in paraview if write_VTK_file: print("\tWriting mesh to {}{}_CV{}.vtk".format(self.dir, self.name, n)) pv.save_meshio(self.dir + "{}_CV{}.vtk".format(self.name, n), self.mesh)
[docs] def reconstruct(self) -> None: """ Reads in .mesh file and writes out a refined tetrahedron mesh in .vtk format and carp format. If 'speed.csv' exists in the cwd, also interpolates these speeds on the mesh. speed.csv should be a csv file with columns 'x', 'y', 'z' and 'speed', where the xyz coordinates refer to point coordinates. Can be calculated with DGM. Args: Returns: Nothing. Writes out a .vtk file of the tetrahedron mesh. Writes out the same mesh in Carp text format. Updates the mesh to the tetrahedron mesh. """ def getPipeline(self_, boxplot, switches, refine_steps, min_edge, max_edge, n_cv, n_col_retry, keep_intmed): """Given a set of parameters, generate the functions with corresponding parameters to reconstruct a carto mesh from zero to simulation-ready. Args: self_: self boxplot: bool: Whether or not to make a boxplot of the edgelengths during refinement in :func:`homogenizeMesh` switches: str: The switches used in the tetgen command refine_steps: int: amount of refinement steps during :func:`homogenizeMesh` min_edge: float: Minimum allowed edge length max_edge: float: Maximum allowed edge length n_cv: int: Amount of conduction velocity distribution files to make. n_col_retry: int: Amount of times you want to try to fix colinearities during TetGen command. keep_intmed: bool: Unused. For completeness.""" pipeline_ = OrderedDict([ ( "Adding endo and epi point layers", {'function': [self_.splitLayer], 'args': [{}]} ), ( "Refining surface mesh", {'function': [self_.homogenizeMesh], 'args': [{'nsteps': refine_steps, 'boxplot': boxplot, 'edge_range': (min_edge, max_edge)}]} ), ( "Writing to .smesh", {'function': [writeToSmesh], 'args': [{'mesh': self_.mesh, 'name': self_.dir + self_.name}]} ), ( "Tetrahedralising with TetGen", {'function': [self_.tetrahedralise], 'args': [{'switches': switches, 'n_col_retry': n_col_retry}]} ), ( "Converting to carp and paraview-friendly format", {'function': [convertMesh_Meshtool, ptsToParaview], 'args': [{'meshname': self_.dir + self_.name}, {'filename': self_.dir + self_.name + '.pts', 'column_name': 'meshID'}]} ), ( "Cleaning with meshtool", {'function': [cleanMesh_Meshtool, convertMesh_Meshtool], 'args': [{'meshname': self_.dir + self_.name, 'threshold': .2}, {'meshname': self_.dir + self_.name, 'ifmt': 'carp_txt', 'ofmt': 'vtk'}]} ) ]) if n_cv: pipeline_.update({"Applying conduction velocities": {'function': [self.applyCV], 'args': [{'write_VTK_file': True}] } }) return pipeline_ print("\n####### Creating 3D tetrahedralized {}\n".format(self.name)) start = time.time() step = 1 pipeline = getPipeline(self, **self.settings['reconstruction_parameters']) for action_name in pipeline: print("\n---- {}. {}\n".format(step, action_name)) action = pipeline[action_name] for func, args in zip(action['function'], action['args']): func(**args) # Reinitialise pipeline dict to update function arguments pipeline = getPipeline(self, **self.settings['reconstruction_parameters']) step += 1 if not self.settings['reconstruction_parameters']['keep_intmed']: # delete intermediate files print("\n----- {}. Deleting intermediate files\n".format(step)) # remove intermediate files that weren't present before generating and aren't the .vtk file to_delete = ["mid.txt", "endo.txt", "epi.txt", "TrianglesSection.csv", "VerticesSection.csv", 'VerticesAttributesSection.csv', 'VerticesColorsSection.csv', self.name + ".smesh", self.name + '.1.mtr', self.name + '.1.mtr', self.name + '.1.p2t', self.name + '.1.node', self.name + '.1.edge', self.name + '.1.face', self.name + '.1.ele', 'myo.csv', 'noncond.csv'] for trash in to_delete: if os.path.exists(self.dir + trash): os.remove(self.dir + trash) print("\tDeleted ", trash) step += 1 duration = time.time() - start print("\n\tMesh reconstructed in {0}m {1:.2f}s".format(int(duration // 60), duration % 60)) self.__update(pv.read(self.dir + self.name + '.1.vtk'))
[docs] def plot(self) -> None: """ Plots the mesh in its current state using PyVista's Plotter() method. Returns: None: Nothing. Opens a VtkRenderer window to show the plot. """ p = pv.Plotter() p.add_mesh(self.mesh) p.show()