diff --git a/program/inspection.py b/program/inspection.py index 995b715..7c3e2e9 100755 --- a/program/inspection.py +++ b/program/inspection.py @@ -6,10 +6,10 @@ def plot_DE_umap_save(adata,reference): with open('./scTriangulate_inspection/log.txt','w') as f: for cluster in adata.obs[reference].astype('category').cat.categories: - adata_s = adata[adata.obs[reference]==cluster,:] + adata_s = adata[adata.obs[reference]==cluster,:].copy() # first, save adata_s.h5ad (cellxgene) - adata_s.raw.to_adata().write('./scTriangulate_inspection/to_cellxgene_{}.h5ad'.format(cluster)) + adata_s.write('./scTriangulate_inspection/to_cellxgene_{}.h5ad'.format(cluster)) # second, umap sc.pl.umap(adata_s,color=['reassign_prefix']) diff --git a/program/metrics.py b/program/metrics.py index 463052c..d43140a 100755 --- a/program/metrics.py +++ b/program/metrics.py @@ -86,8 +86,8 @@ def marker_gene(adata, key): if adata.uns.get('rank_genes_groups') != None: del adata.uns['rank_genes_groups'] # perform t-test - sc.tl.rank_genes_groups(adata, key, method='t-test',n_genes=adata.raw.shape[1]) - all_genes = adata.raw.var_names.values # ndarray, all the genes + sc.tl.rank_genes_groups(adata, key, method='t-test',n_genes=adata.shape[1]) + all_genes = adata.var_names.values # ndarray, all the genes all_clusters = adata.obs[key].cat.categories # pd.Index, all the clusters cluster2gene = dict() # {'cluster1':[gene1,gene2..]} rank_uns = adata.uns['rank_genes_groups'] @@ -111,11 +111,11 @@ def marker_gene(adata, key): assign = all_clusters[np.argmin(np.array(index_store))] # get argmin, take the corresponding cluster cluster2gene[assign].append((gene,np.min(index_store))) # sort the cluster2gene - for key,value in cluster2gene.items(): + for key_,value in cluster2gene.items(): gene = [item[0] for item in value] rank = [item[1] for item in value] temp = sorted(zip(gene,rank),key=lambda x:x[1]) - cluster2gene[key] = [item[0] for item in temp] + cluster2gene[key_] = [item[0] for item in temp] result = pd.Series(cluster2gene).to_frame() result.columns = ['whole_marker_genes'] @@ -137,7 +137,7 @@ def marker_gene(adata, key): result['enrichr'] = col_enrichr result['purify'] = col_purify - + result.to_csv('./scTriangulate_result/marker_{0}.txt'.format(key),sep='\t') return result @@ -150,13 +150,12 @@ def reassign_score(adata,key,marker): pick = marker_genes[:num] # if the list doesn't have more than 30 markers, it is oK, python will automatically choose all pool.extend(pick) pool = list(set(pool)) - adata_now = adata.raw.to_adata() - adata_now = adata_now[:,pool] + adata_now = adata[:,pool] # reducing dimension from sklearn.decomposition import PCA reducer = PCA(n_components=30) - scoring = reducer.fit_transform(X=adata_now.X.toarray()) #become dense matrix + scoring = reducer.fit_transform(X=adata_now.X) from sklearn.preprocessing import LabelEncoder le = LabelEncoder() @@ -211,13 +210,13 @@ def tf_idf_bare_compute(df,cluster): return tf_idf_ori def tf_idf_for_cluster(adata,key): - df = pd.DataFrame(data=adata.raw.X.toarray(), index=adata.obs_names, columns=adata.raw.var_names) #become dense matrix + df = pd.DataFrame(data=adata.X, index=adata.obs_names, columns=adata.var_names) df['cluster'] = adata.obs[key].astype('str').values cluster_to_tfidf = {} # store tfidf score cluster_to_exclusive = {} # store exclusivly expressed genes for item in adata.obs[key].cat.categories: a = tf_idf_bare_compute(df,item) - a_names = adata.raw.var_names + a_names = adata.var_names test = pd.Series(data=a, index=a_names) test.sort_values(ascending=False, inplace=True) # remove artifact genes @@ -240,7 +239,7 @@ def SCCAF_score(adata, key): from sklearn.linear_model import LogisticRegression from sklearn.metrics import confusion_matrix # define X and Y and exclude cells whose cluster only have 1 cell - X = adata.raw.X.toarray() + X = adata.X Y = adata.obs[key].values # label encoding Y to numerical values diff --git a/program/prune.py b/program/prune.py index 5733bc6..876b885 100755 --- a/program/prune.py +++ b/program/prune.py @@ -11,6 +11,7 @@ import matplotlib.pyplot as plt import seaborn as sns from scipy.stats import rankdata +import multiprocessing as mp import scanpy as sc import anndata as ad @@ -71,34 +72,35 @@ def inclusiveness(obs,r,c): return fraction_r,fraction_c,result,nearly - -def reference_pruning(adata,reference,size_dict): - obs = adata.obs - obs['ori'] = np.arange(obs.shape[0]) # keep original index order in one column - pruned_chunks = [] # store pruned chunk, one chunk menas one reference cluster - for chunk in obs.groupby(by=reference): +def run_reference_pruning(chunk,reference,size_dict,obs): + with open('./scTriangulate_present/log_prune_step_{}.txt'.format(chunk[0]),'a') as log: + log.write('reference cluster: {}\n'.format(chunk[0])) + log.flush() subset = chunk[1] vc = subset['engraft'].value_counts() overlap_clusters = vc.index mapping = {} for cluster in overlap_clusters: + print('--- query cluster: {}'.format(cluster),file=log,flush=True) r = {reference:chunk[0]} - c = {cluster.split('_')[0]:cluster.split('_')[1]} + c = {cluster.split('$')[0]:cluster.split('$')[1]} fraction_r,fraction_c,result,nearly = inclusiveness(obs,r,c) if not result: # no inclusive, go back to reference annotation - mapping[cluster] = reference + '_' + chunk[0] + mapping[cluster] = reference + '$' + chunk[0] else: proportion_to_ref = vc.loc[cluster] / vc.sum() - proportion_to_self = vc.loc[cluster] / size_dict[cluster.split('_')[0]][cluster.split('_')[1]] + proportion_to_self = vc.loc[cluster] / size_dict[cluster.split('$')[0]][cluster.split('$')[1]] if proportion_to_ref < 0.1 and not nearly: # only cover < 10% reference cluster and it is not nearly included - mapping[cluster] = reference + '_' + chunk[0] + mapping[cluster] = reference + '$' + chunk[0] elif nearly and proportion_to_ref < 0.1 and proportion_to_self < 0.1: # it is nearly included, so evade the first catcher, but to_self proportion is low - mapping[cluster] = reference + '_' + chunk[0] + mapping[cluster] = reference + '$' + chunk[0] else: mapping[cluster] = cluster subset['reassign'] = subset['engraft'].map(mapping).values + print('finished first part',file=log,flush=True) + # change to most abundant type if engraft only have 1 vc2 = subset['reassign'].value_counts() most_abundant_cluster = vc2.loc[vc2==vc2.max()].index[0] # if multiple, just pick the first one @@ -106,10 +108,25 @@ def reference_pruning(adata,reference,size_dict): for i in range(subset.shape[0]): if subset.iloc[i]['reassign'] in exclude_clusters: subset.loc[:,'reassign'].iloc[i] = most_abundant_cluster # caution that Settingwithcopy issue - pruned_chunks.append(subset) + + print('finished second part',file=log,flush=True) + print(subset.shape,file=log,flush=True) + return subset + + +def reference_pruning(obs,reference,size_dict): + obs['ori'] = np.arange(obs.shape[0]) # keep original index order in one column + pruned_chunks = [] # store pruned chunk, one chunk menas one reference cluster + chunk_list = list(obs.groupby(by=reference)) + cores = len(chunk_list) + pool = mp.Pool(processes=cores) + r = [pool.apply_async(run_reference_pruning,args=(chunk,reference,size_dict,obs)) for chunk in chunk_list] + pool.close() + pool.join() + pruned_chunks = [collect.get() for collect in r] modified_obs = pd.concat(pruned_chunks) modified_obs.sort_values(by='ori',inplace=True) - adata.obs = modified_obs + return modified_obs diff --git a/program/scTriangulate.py b/program/scTriangulate.py index 4b3b04f..359f4ca 100755 --- a/program/scTriangulate.py +++ b/program/scTriangulate.py @@ -11,6 +11,7 @@ import matplotlib.pyplot as plt import seaborn as sns from scipy.stats import rankdata +from scipy.sparse import issparse,csr_matrix import multiprocessing as mp import scanpy as sc @@ -35,14 +36,14 @@ def check_filter_single_cluster(adata,key): return adata_valid + # to parallelize, define singular function -def each_key_program(adata,key): +def each_key_program(key): print(key,os.getpid()) - adata_to_compute = check_filter_single_cluster(adata,key) # every adata will be a copy + adata_to_compute = check_filter_single_cluster(adata,key) # be a view print('finished filtering') result = marker_gene(adata_to_compute,key=key) print('finished marker gene computing') - result.to_csv('./scTriangulate_result/marker_{0}.txt'.format(key),sep='\t') cluster_to_accuracy = reassign_score(adata_to_compute,key,result) print('finished reassign score') cluster_to_tfidf = tf_idf_for_cluster(adata_to_compute,key) @@ -62,6 +63,7 @@ def each_key_program(adata,key): # some diagnose plot draw_enrich_plots(key) draw_umap(adata,key) + print('finished diagnostic') # all the intermediate results needed to be returned collect = {'key':key, @@ -87,19 +89,23 @@ def each_key_program(adata,key): -# give an adata, have raw attribute (only need raw attribute), several obs column corresponding to different sets of annotations, +# give an adata, have raw attribute (only need X attribute), several obs column corresponding to different sets of annotations, # users supplied umap if preferred -adata = sc.read('./triangulate_input.h5ad') -query = ['leiden1','leiden2','leiden3'] -reference = 'leiden1' +adata = sc.read('./input.h5ad') +query = ['ch_hsc','ch_baso','ch_ly6d','ch_prime','ch_thymo','un_cd127','un_hsc','un_kit','un_multilin','un_prime','un_thymo'] +reference = 'ch_prime' # precomputing size size_dict,size_list = get_size(adata.obs,query) c,s = size_sort(size_list) +if issparse(adata.X): + adata.X = adata.X.toarray() + + # add a doublet column -counts_matrix = adata.raw.X.copy() +counts_matrix = adata.X scrub = scr.Scrublet(counts_matrix) doublet_scores,predicted_doublets = scrub.scrub_doublets(min_counts=1,min_cells=1) adata.obs['doublet_scores'] = doublet_scores @@ -109,20 +115,25 @@ def each_key_program(adata,key): plt.close() print('finished doublet check') +del counts_matrix +del scrub + + # compute metrics and map to original adata data_to_json = {} data_to_viewer = {} - cores = len(query) # make sure to request same numeber of cores as the length of query list pool = mp.Pool(processes=cores) -results = [pool.apply_async(each_key_program,args=(adata,key)) for key in query] # [collect,collect,collect,collect], will be AppyResult object +map_result = pool.map_async(each_key_program,query) # a map result object, need to call get() method +pool.close() +pool.join() +results = map_result.get() # [dict,dict,dict,dict] for collect in results: - collect = collect.get() key = collect['key'] - adata.obs['reassign_{}'.format(key)] = collect['col_reassign'] - adata.obs['tfidf_{}'.format(key)] = collect['col_tfidf'] - adata.obs['SCCAF_{}'.format(key)] = collect['col_SCCAF'] + adata.obs['reassign${}'.format(key)] = collect['col_reassign'] + adata.obs['tfidf${}'.format(key)] = collect['col_tfidf'] + adata.obs['SCCAF${}'.format(key)] = collect['col_SCCAF'] data_to_json[key] = collect['to_json'] data_to_viewer[key] = collect['to_viewer'] @@ -131,12 +142,15 @@ def each_key_program(adata,key): with open('./scTriangulate_present/key_cluster.p','wb') as f: pickle.dump(data_to_viewer,f) +adata.X = csr_matrix(adata.X) adata.write('./scTriangulate_result/after_metrics_computing.h5ad') adata.obs.to_csv('./scTriangulate_result/check_metrics.txt',sep='\t') print('finished metrics computing and diagnose plot generaton') print('----------------------------') -#adata = sc.read('./scTriangulate_result/after_metrics_computing.h5ad') + +# adata = sc.read('./scTriangulate_result/after_metrics_computing.h5ad') + # compute shaley value score_colname = ['reassign','tfidf','SCCAF'] @@ -148,64 +162,111 @@ def each_key_program(adata,key): width is how many score metrics ''' for i,key in enumerate(query): - practical_colname = [name + '_' + key for name in score_colname] + practical_colname = [name + '$' + key for name in score_colname] data[i,:,:] = adata.obs[practical_colname].values -with open('./scTriangulate_present/log.txt','w') as log: - final = [] - intermediate = [] - for i in range(data.shape[1]): - if i % 500 == 0 or i==data.shape[1]: - print('Cell{}'.format(i),file=log,flush=True) - layer = data[:,i,:] - result = [] - for j in range(layer.shape[0]): - result.append(shapley_value(j,layer)) - cluster_row = adata.obs.iloc[i].loc[query].values - to_take = which_to_take(adata,result,query,reference,cluster_row,size_dict) # which annotation this cell should adopt - final.append(to_take) - intermediate.append(result) - adata.obs['final_annotation'] = final - decisions = zip(*intermediate) - for i,d in enumerate(decisions): - adata.obs['{}_shapley'.format(query[i])] = d - +# parallelize +def run_shapley(data): + with open('./scTriangulate_present/log_shapley_step_{}.txt'.format(os.getpid()),'a') as log: + log.write('This core needs to process {} cells\n'.format(data.shape[1])) + log.flush() + final = [] + intermediate = [] + for i in range(data.shape[1]): + if i % 500 == 0 or i==data.shape[1]-1: + print('Cell{}'.format(i),file=log,flush=True) + layer = data[:,i,:] + result = [] + for j in range(layer.shape[0]): + result.append(shapley_value(j,layer)) + cluster_row = adata.obs.iloc[i].loc[query].values + to_take = which_to_take(result,query,reference,cluster_row,size_dict) # which annotation this cell should adopt + final.append(to_take) + intermediate.append(result) + return final,intermediate + +final = [] +intermediate = [] +cores = mp.cpu_count() +sub_datas = np.array_split(data,cores,axis=1) # [sub_data,sub_data,....] +pool = mp.Pool(processes=cores) +r = pool.map_async(run_shapley,sub_datas) +pool.close() +pool.join() +results = r.get() # [(final,intermediate), (), ()...] +for collect in results: + final.extend(collect[0]) + intermediate.extend(collect[1]) +adata.obs['final_annotation'] = final +decisions = list(zip(*intermediate)) +for i,d in enumerate(decisions): + adata.obs['{}_shapley'.format(query[i])] = d print('finished shapley computing') -# assign -assign = [] -for i in range(adata.obs.shape[0]): - name = adata.obs.iloc[i,:].loc['final_annotation'] - cluster = adata.obs.iloc[i,:].loc[name] - concat = name + '_' + cluster - assign.append(concat) -adata.obs['engraft'] = assign +adata.write('./scTriangulate_present/just_after_shapley.h5ad') + +# adata = sc.read('./just_after_shapley.h5ad') +# assign +# parallelize +def run_assign(obs): + with open('./scTriangulate_present/log_assign_step_{}.txt'.format(os.getpid()),'a') as log: + log.write('This core needs to process {} cells\n'.format(obs.shape[0])) + log.flush() + assign = [] + for i in range(obs.shape[0]): + if i % 500 == 0 or i==obs.shape[0]-1: + print('cell{}'.format(i),file=log,flush=True) + name = obs.iloc[i,:].loc['final_annotation'] + cluster = obs.iloc[i,:].loc[name] + concat = name + '$' + cluster + assign.append(concat) + obs['engraft'] = assign + return obs + +obs = adata.obs +obs_index = np.arange(obs.shape[0]) # [0,1,2,.....] +cores = mp.cpu_count() +sub_indices = np.array_split(obs_index,cores) # indices for each chunk [(0,1,2...),(56,57,58...),(),....] +sub_obs = [obs.iloc[sub_index,:] for sub_index in sub_indices] # [sub_df,sub_df,...] +pool = mp.Pool(processes=cores) +r = pool.map_async(run_assign,sub_obs) +pool.close() +pool.join() +results = r.get() # [sub_obs,sub_obs...] +obs = pd.concat(results) +adata.obs = obs print('finished engraft') +adata.write('./scTriangulate_present/just_after_engraft.h5ad') + + + # prune -reference_pruning(adata,reference,size_dict) +obs = reference_pruning(adata.obs,reference,size_dict) +adata.obs = obs print('finished pruning') +adata.write('./scTriangulate_present/just_after_prune.h5ad') + # prefix with reference cluster col1 = adata.obs['reassign'] col2 = adata.obs[reference] col = [] for i in range(len(col1)): - concat = reference + '_' + col2[i] + '|' + col1[i] + concat = reference + '$' + col2[i] + '|' + col1[i] col.append(concat) adata.obs['reassign_prefix'] = col print('finished prefix') # print out adata.obs.to_csv('./scTriangulate_present/shapley_annotation.txt',sep='\t') -adata.write('./scTriangulate_present/after_shapley.h5ad') -adata.raw.to_adata().write('./scTriangulate_present/after_shapley_to_cellxgene.h5ad') +adata.write('./scTriangulate_present/after_shapley_to_cellxgene.h5ad') + print('finished print out') # inspection (DE, small umap, seperate adata h5ad) -adata = sc.read('./scTriangulate_present/after_shapley.h5ad') plot_DE_umap_save(adata,reference) print('finished inspection') diff --git a/program/shapley.py b/program/shapley.py index 18aa59f..90b54f7 100755 --- a/program/shapley.py +++ b/program/shapley.py @@ -100,7 +100,7 @@ def shapley_value(index,data): -def which_to_take(adata,result,query,reference,cluster_row,size_dict): +def which_to_take(result,query,reference,cluster_row,size_dict): ''' query: [leiden0.5,leiden1,leiden2,gs] result: [0.3, 0.5, 0.4, 0.5]