Source code for jade.RAbD_BM.tools_ab_db

import sqlite3
import os
import sys
import pandas
from collections import defaultdict
from jade.antibody import outliers
#A Collection of functions to query the AbDb




def get_cdr_data_table_df(db_path):
    """
    Get a dataframe with typical info from the cdr_data table in the PyIgClassify db.

    Rows whose datatag is 'loopKeyNotInPaper' are excluded. The connection
    opened here is always closed before returning, even if the query fails.

    :param db_path: str, path to the sqlite3 database file
    :rtype: pandas.DataFrame
    """
    # The datatag filter has to be qualified with cdr_data: it is the only
    # table in the FROM clause, so any other table qualifier makes sqlite
    # reject the statement with "no such column".
    query = "SELECT " \
            "cdr_data.PDB," \
            "cdr_data.gene," \
            "cdr_data.CDR," \
            "cdr_data.fullcluster," \
            "cdr_data.length," \
            "cdr_data.DistDegree," \
            "cdr_data.seq " \
            "FROM " \
            "cdr_data " \
            "WHERE " \
            "cdr_data.datatag != 'loopKeyNotInPaper'"

    con = sqlite3.connect(db_path)
    try:
        return pandas.read_sql_query(query, con)
    finally:
        con.close()
def get_total_entries(df, gene, cdr):
    """
    Get the total number of entries matching the gene and the cdr.
    Used for recovery.

    :param df: pandas.DataFrame with a 'gene' column and a 'CDR' column
    :param gene: value compared against the 'gene' column
    :param cdr: value compared against the 'CDR' column
    :rtype: int
    """
    # The other helpers in this module address the column as 'CDR'.
    # The lowercase spelling is still accepted as a fallback so frames
    # built with a 'cdr' column keep working.
    cdr_column = 'CDR' if 'CDR' in df.columns else 'cdr'
    return len(df[(df['gene'] == gene) & (df[cdr_column] == cdr)])


############### Matching ###############
def get_length_matches(df, gene, cdr, length):
    """
    Select the ("Recovered") rows whose gene, CDR and length all match.

    :param df: pandas.DataFrame with 'gene', 'CDR' and 'length' columns
    :param length: int
    :rtype: pandas.DataFrame
    """
    same_gene = df['gene'] == gene
    same_cdr = df['CDR'] == cdr
    same_length = df['length'] == length
    return df[same_gene & same_cdr & same_length]
def get_cluster_matches(df, gene, cdr, cluster):
    """
    Select the ("Recovered") rows whose gene, CDR and fullcluster all match.

    :param df: pandas.DataFrame with 'gene', 'CDR' and 'fullcluster' columns
    :rtype: pandas.DataFrame
    """
    same_gene = df['gene'] == gene
    same_cdr = df['CDR'] == cdr
    same_cluster = df['fullcluster'] == cluster
    return df[same_gene & same_cdr & same_cluster]


############### Recovery ###############
def get_length_enrichment(df, gene, cdr, length):
    """
    Count the rows of df that match the given gene, cdr and length.

    :param df: pandas.DataFrame
    :param length: int
    :rtype: int
    """
    recovered = get_length_matches(df, gene, cdr, length)
    return len(recovered['length'])
def get_cluster_enrichment(df, gene, cdr, cluster):
    """
    Count the rows of df that match the given gene, cdr and cluster.

    :param df: pandas.DataFrame
    :rtype: int
    """
    recovered = get_cluster_matches(df, gene, cdr, cluster)
    return len(recovered['fullcluster'])


###########################
def get_pdb_chain_subset(db, gene):
    """
    Return a list of [pdb, chain] pairs (each pair is a two-item list)
    for the distinct PDB / original_chain combinations of the given gene.

    :param db: sqlite3 connection to the database holding cdr_data
    :param gene: value compared against the gene column
    :rtype: list of [pdb, original_chain] lists; empty if nothing matches
    """
    c = db.cursor()
    # Enable name-based row access on this cursor only. Setting
    # row_factory on the connection would silently change the row type
    # seen by every later query the caller runs on the same connection.
    c.row_factory = sqlite3.Row
    try:
        c.execute("SELECT DISTINCT PDB, original_chain FROM cdr_data WHERE gene =?", (gene,))
        return [[row['PDB'], row['original_chain']] for row in c.fetchall()]
    finally:
        c.close()
def get_all_lengths(db, cdr, limit_to_known = True, res_cutoff = 2.8, rfac_cutoff = .3):
    """
    Get all unique lengths for a CDR.

    Only rows with resolution <= res_cutoff and rfactor <= rfac_cutoff are
    considered. When limit_to_known is true, rows tagged
    'loopKeyNotInPaper' are skipped as well.

    :param db: sqlite3 connection
    :rtype: list of the distinct length values
    """
    if limit_to_known:
        statement = "SELECT DISTINCT length from cdr_data WHERE datatag!=? and CDR=? and resolution<=? and rfactor<=?"
        params = ['loopKeyNotInPaper', cdr, res_cutoff, float(rfac_cutoff)]
    else:
        statement = "SELECT DISTINCT length from cdr_data WHERE CDR=? and resolution<=? and rfactor<=?"
        params = [cdr, res_cutoff, rfac_cutoff]

    cursor = db.cursor()
    found = [record[0] for record in cursor.execute(statement, params)]
    cursor.close()
    return found
def get_all_clusters_for_length(db, cdr, length, limit_to_known = True, res_cutoff = 2.8, rfac_cutoff = .3):
    """
    Get all unique clusters for a length and a cdr.

    Only rows with resolution <= res_cutoff and rfactor <= rfac_cutoff are
    considered. When limit_to_known is true, rows tagged
    'loopKeyNotInPaper' are skipped as well.

    :param db: sqlite3 connection
    :rtype: list of the distinct fullcluster values
    """
    if limit_to_known:
        query = "SELECT DISTINCT fullcluster from cdr_data WHERE datatag!=? and CDR=? and length=? and resolution <= ? and rfactor <= ?"
        data = ['loopKeyNotInPaper', cdr, length, res_cutoff, rfac_cutoff]
    else:
        # The cutoff is compared against the rfactor column; rfac_cutoff
        # is only the name of the Python argument and is not a column,
        # so using it in the SQL makes the statement fail.
        query = "SELECT DISTINCT fullcluster from cdr_data WHERE CDR=? and length=? and resolution<=? and rfactor<=?"
        data = [cdr, length, res_cutoff, rfac_cutoff]

    c = db.cursor()
    try:
        return [row[0] for row in c.execute(query, data)]
    finally:
        c.close()
def get_center_for_cluster_and_length(db, cdr, length, cluster, data_names_array):
    """
    Fetch the requested columns for the center (center=1) row of a cluster.

    Rows tagged 'loopKeyNotInPaper' are ignored. data_names_array is joined
    with ", " and placed directly into the SELECT list.

    :param db: sqlite3 connection
    :param data_names_array: list of column names to select
    :return: the first matching row, or an empty list when nothing matches
    """
    columns = ", ".join(data_names_array)
    statement = "SELECT "+columns+" FROM cdr_data WHERE center=1 and datatag!=? and CDR=? and length=? and fullcluster=?"
    params = ['loopKeyNotInPaper', cdr, length, cluster]

    cursor = db.cursor()
    matches = list(cursor.execute(statement, params))
    if matches:
        return matches[0]
    return matches
def get_center_dih_degrees_for_cluster_and_length(db, cdr, length, cluster):
    """
    Returns a dictionary of center dihedral angles in positional order,
    or False if no center row is found.

    result["phis"] = [phis as floats]
    result["psis"] = [psis as floats]
    result["omegas"] = [omegas as floats]

    The stored dihedrals string is split on ":" and read as consecutive
    (phi, psi, omega) triples, one triple per position up to length.
    """
    center = get_center_for_cluster_and_length(db, cdr, length, cluster, ["dihedrals"])
    if len(center) == 0:
        return False

    angles = defaultdict()
    angles["phis"] = []
    angles["psis"] = []
    angles["omegas"] = []

    fields = center[0].split(":")
    for position in range(0, length):
        start = 3 * position
        # Look up all three fields before converting, one triple per position.
        phi = fields[start]
        psi = fields[start + 1]
        omega = fields[start + 2]
        angles["phis"].append(float(phi))
        angles["psis"].append(float(psi))
        angles["omegas"].append(float(omega))

    assert(len(angles["phis"]) == len(angles["psis"]) == len(angles["omegas"]))
    return angles
def get_dihedral_string_for_centers(db, limit_to_known = True):
    """
    Map each center (center=1) fullcluster to [length_type, ss, dihedrals].

    :param db: sqlite3 connection
    :param limit_to_known: accepted but not used by the query
    :rtype: defaultdict keyed by fullcluster
    """
    statement = "SELECT fullcluster, length_type, ss, dihedrals from cdr_data where center=1"
    centers = defaultdict()
    cursor = db.cursor()
    for cluster, length_type, ss, dihedrals in cursor.execute(statement):
        centers[cluster] = [length_type, ss, dihedrals]
    return centers
def get_data_for_cluster_and_length(db, cdr, length, cluster, data_names_array, limit_to_known = True, res_cutoff = 2.8, rfac_cutoff = .3):
    """
    Get a set of data of a particular length, cdr, and cluster.

    data_names_array is a list of the types of data; its items are joined
    with ", " into the SELECT list, so it can include the DISTINCT keyword.
    Only rows with resolution <= res_cutoff and rfactor <= rfac_cutoff are
    returned. When limit_to_known is true, rows tagged 'loopKeyNotInPaper'
    are skipped as well.

    Example:
        data_names_array = ["PDB", "original_chain", "new_chain", "sequence"]

    :rtype: list of rows
    """
    columns = ", ".join(data_names_array)
    if limit_to_known:
        statement = "SELECT "+columns+" FROM cdr_data WHERE datatag!=? and CDR=? and length=? and fullcluster=? and resolution<=? and rfactor <=?"
        params = ['loopKeyNotInPaper', cdr, length, cluster, res_cutoff, rfac_cutoff]
    else:
        statement = "SELECT "+columns+" FROM cdr_data WHERE CDR=? and length=? and fullcluster=? and resolution<=? and rfactor<=?"
        params = [cdr, length, cluster, res_cutoff, rfac_cutoff]

    cursor = db.cursor()
    records = list(cursor.execute(statement, params))
    cursor.close()
    return records
def get_unique_sequences_for_cluster(db, cluster, include_outliers, outlier_definition = "conservative"):
    """
    Get the distinct seq values recorded for a fullcluster.

    The text returned by outliers.get_outlier_string(include_outliers,
    outlier_definition) is appended to the WHERE clause of the query.

    :param db: sqlite3 connection
    :rtype: list of seq values
    """
    statement = "select DISTINCT seq FROM cdr_data WHERE fullcluster=? "+outliers.get_outlier_string(include_outliers, outlier_definition)
    cursor = db.cursor()
    found = [record[0] for record in cursor.execute(statement, [cluster])]
    cursor.close()
    return found
def get_cdr_rmsd_for_entry(db, pdb, original_chain, cdr, length, fullcluster):
    """
    Look up bb_rmsd_cdr_align for a single cdr_data entry.

    :param db: sqlite3 connection
    :return: the value from the first matching row, or -1 if no row matches
    """
    # Text arguments are passed through str() before binding (the original
    # note indicates this works around unicode-vs-str input); length is
    # bound as given.
    params = [str(pdb), str(original_chain), str(cdr), length, str(fullcluster)]
    statement = "select bb_rmsd_cdr_align FROM cdr_data WHERE PDB=? AND original_chain = ? AND CDR=? AND length=? and fullcluster=?"

    cursor = db.cursor()
    values = [record[0] for record in cursor.execute(statement, params)]
    cursor.close()

    if values:
        return values[0]
    return -1
def get_stem_rmsd_for_entry(db, pdb, original_chain, cdr, length, fullcluster):
    """
    Look up bb_rmsd_stem_align for a single cdr_data entry.

    :param db: sqlite3 connection
    :return: the value from the first matching row, or -1 if no row matches
    """
    # Text arguments are passed through str() before binding (the original
    # note indicates this works around unicode-vs-str input); length is
    # bound as given.
    params = [str(pdb), str(original_chain), str(cdr), length, str(fullcluster)]
    statement = "select bb_rmsd_stem_align FROM cdr_data WHERE PDB=? AND original_chain = ? AND CDR=? AND length=? and fullcluster=?"

    cursor = db.cursor()
    values = [record[0] for record in cursor.execute(statement, params)]
    cursor.close()

    if values:
        return values[0]
    return -1