"""Hierarchically cluster a pickled distance-correlation matrix.

Reads a square matrix from a pickled DataFrame, writes the flat cluster
assignments to another pickle, and can optionally render a dendrogram whose
branches are coloured by cluster.
"""
from pprint import pprint
from argparse import ArgumentParser
from util import getQuery, pickleQuery
import numpy as np
import pandas as p
import matplotlib
matplotlib.use('agg')  # backend is selected before pyplot is imported
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.spatial.distance import squareform
from scipy.cluster.hierarchy import dendrogram, linkage, cophenet, fcluster
from tqdm import tqdm
from itertools import combinations
from math import factorial as f


def cluster(dcmat, method, nclusters):
    """Cluster the provided correlation dataframe.

    Args:
        dcmat: Square DataFrame. ``squareform`` converts it to the condensed
            vector handed to ``linkage``; its index values become the
            ``icp_id`` column of the result.
        method: Linkage method name, passed through to ``linkage``.
        nclusters: Cluster count handed to ``fcluster`` with
            ``criterion='maxclust'``.

    Returns:
        A tuple ``(lobj, clustdf)``: the linkage matrix produced by
        ``linkage`` and a DataFrame with the columns ``icp_id`` and
        ``cluster`` (one row per row of ``dcmat``, in the same order).
    """
    condensed = squareform(dcmat)
    lobj = linkage(condensed, method=method)
    clusts = fcluster(lobj, nclusters, criterion='maxclust')
    clustdf = p.DataFrame({'icp_id': dcmat.index.values,
                           'cluster': clusts})
    return lobj, clustdf


def dendro(lobj, clustdf, numclusts, icps, fname):
    """Draw the dendrogram for a linkage, coloured by cluster, and save it.

    Args:
        lobj: Linkage matrix as returned by ``cluster``.
        clustdf: DataFrame with ``icp_id`` and ``cluster`` columns; row label
            ``x`` is looked up for leaf ``x`` of the linkage.
        numclusts: Number of palette entries; cluster labels are expected to
            lie in ``1..numclusts`` (any other label raises ``KeyError``).
        icps: Leaf labels handed to ``dendrogram``.
        fname: Path the figure is written to with ``plt.savefig``.
    """
    clabs = [x + 1 for x in range(numclusts)]
    cpal = dict(zip(clabs,
                    sns.color_palette("colorblind", numclusts).as_hex()))

    # Colour of every leaf, keyed by its ICP id.
    ldict = {icp_id: cpal[clust]
             for icp_id, clust in zip(clustdf.icp_id, clustdf.cluster)}

    # Propagate the leaf colours up the tree: a merge keeps its children's
    # colour when both children agree and is drawn black otherwise.
    # Node ids up to len(lobj) are leaves; merge number i gets the id
    # i + 1 + len(lobj).
    # (The reference the original comment pointed to has been lost.)
    n_merges = len(lobj)
    link_cols = {}
    for i, i12 in enumerate(lobj[:, :2].astype(int)):
        c1, c2 = (link_cols[x] if x > n_merges else ldict[clustdf.icp_id[x]]
                  for x in i12)
        link_cols[i + 1 + n_merges] = c1 if c1 == c2 else '#000000'

    fig = plt.figure(figsize=(25, 10))
    try:
        plt.title('ICP Clustering Dendrogram')
        plt.xlabel('ICP ID/(Number of ICPs)')
        plt.ylabel('distance')
        dendrogram(
            lobj,
            labels=icps,
            leaf_rotation=90,
            leaf_font_size=8,
            link_color_func=lambda x: link_cols[x],
            color_threshold=None,
        )
        plt.savefig(fname)
    finally:
        # pyplot keeps every figure it creates until it is closed explicitly;
        # close this one so repeated calls do not accumulate open figures.
        plt.close(fig)


def main():
    """Parse the command line, cluster the input matrix and save the result.

    The cluster assignments are pickled to ``--output``; with
    ``-d/--dendrogram`` the tree is also drawn to the ``-t/--tree`` path.
    """
    parser = ArgumentParser(
        description='Cluster from pre-existing distance correlation matrix '
                    'in pickled dataframe')
    parser.add_argument("-i", "--input", dest="input",
                        help="input pickle path",
                        metavar="PATH", required=True)
    parser.add_argument("-o", "--output", dest="output",
                        help="output pickle path",
                        metavar="PATH", required=True)
    parser.add_argument("--method", dest="method",
                        help="clustering method; default 'ward'",
                        metavar="METHOD", default="ward")
    parser.add_argument("--clusters", dest="numclusters",
                        help="number of clusters; default: 9",
                        metavar="NUM", default=9, type=int)
    parser.add_argument("-d", "--dendrogram", dest="incdendro",
                        help="draw dendrogram", action="store_true")
    parser.add_argument("-t", "--tree", dest="treepath",
                        help="Filename for dendrogram (if -d)",
                        metavar="PATH")
    parser.add_argument("-v", "--verbose", dest="verbose",
                        action="store_true")
    args = parser.parse_args()

    # A dendrogram cannot be drawn without somewhere to write it.
    if args.incdendro and args.treepath is None:
        parser.error("-d/--dendrogram requires -t/--tree PATH")

    if args.verbose:
        print("Clustering")

    # NOTE(security): unpickling can execute arbitrary code; only pass
    # --input files that come from a trusted source.
    sourcep = p.read_pickle(args.input)
    linkage_matrix, clustdf = cluster(sourcep, args.method, args.numclusters)
    clustdf.to_pickle(args.output)

    if args.incdendro:
        if args.verbose:
            print("Drawing dendrogram")
        icps = sourcep.index.values
        dendro(linkage_matrix, clustdf, args.numclusters, icps, args.treepath)


if __name__ == "__main__":
    main()