import logging
import os
import warnings
import pandas as pd
import scipy.spatial
import scipy.sparse as sparse
import numpy as np
from joblib import Parallel, delayed
from typing import Literal
import random
from .console import COMPUTE_LOGGER, GENESET_LOGGER, INIT_LOGGER
from .interactive import GeneSetActivityScoresReport, PermutationTestReport
from .computation import (
bulk_standard_scale,
bulk_normalize,
gLPCA_sparse,
partition_kmeans_stratified,
partition_naive,
)
_init_logger = logging.getLogger(INIT_LOGGER)
_compute_logger = logging.getLogger(COMPUTE_LOGGER)
[docs]
class GESSO:
"""GESSO (Gene sEt activity Score analysis with Spatial lOcation)
is a model for spatially informed gene set expression analysis.
"""
[docs]
def __init__(
self,
expression_df: pd.DataFrame,
locations_df: pd.DataFrame,
genesets_df: pd.DataFrame | None = None,
k: int = 6,
normalize_counts_method: Literal[
"normalize", "normalize-log1p", "none"
] = "none",
verbose: bool = True,
):
"""Constructs a GESSO (Gene sEt activity Score analysis with Spatial lOcation)
model for spatially informed gene set expression analysis. Given spatial
transcriptomics data and a gene set or pathway, GESSO will return
a gene set activity score (GAS) for each spatial location (spot).
Parameters
----------
expression_df : pd.DataFrame ~ (n_spots, n_genes)
A DataFrame containing n_spots rows and n_genes columns.
The index will be interpreted as the spot ID.
The columns will be interpreted as gene names.
locations_df : pd.DataFrame ~ (n_spots, 2)
A DataFrame containing n_spots rows and 2 columns.
The index will be interpreted as the spot ID.
The index of `locations_df` must match the index of the `expression_df`.
The columns must be named 'x' and 'y'.
Each row represents the location (xy coordinates) of that spot.
genesets_df : pd.DataFrame ~ (n_genes, n_genesets) | None
Default: None.
A DataFrame containing n_genes rows and n_genesets columns.
The index will be interpreted as gene names.
The columns will be interpreted as geneset names.
The values must be binary (0 or 1). Entry (i, j) is 1 if gene i is
in geneset j, and 0 otherwise.
If None, gene sets can be provided later during GAS computation.
k : int
Default: 6. For k-nearest neighbors construction of the
location graph Laplacian.
normalize_counts_method : Literal["normalize", "normalize-log1p", "none"]
Default: "none". How to normalize the counts for each
spot. If "normalize", first scales the total counts for each
spot vector (row) to 1, then multiplies each spot vector (row)
by the median of the total counts for all spot vectors.
If "normalize-log1p", follows steps for "normalize" but also includes a
log1p transformation.
verbose : bool
Default: True. Per-instance override for emitting log messages from
this model. When True (default), messages flow through the standard
``gesso.*`` loggers; configure output via ``gesso.logging``. When
False, all messages from this model are suppressed regardless of
logger configuration.
"""
# preprocess input data
self._expression_df = expression_df.T.copy()
self._locations_df = locations_df.copy()
if genesets_df is not None:
self._genesets_df = genesets_df.copy()
else:
self._genesets_df = None
self._verbose = verbose
self._original_geneset_sizes: dict[str, int] = {}
self._force_common_genes()
self._force_common_cellid()
self._verify_examples_match()
self._verify_locations_df()
self._verify_gene_match()
self._laplacian = self._compute_laplacian_knn(k=k)
self._k = k
if normalize_counts_method == "normalize":
self._expression_df = pd.DataFrame(
bulk_normalize(self._expression_df.to_numpy(), log1p=False),
index=self._expression_df.index,
columns=self._expression_df.columns,
)
self._log_init("Normalized expression data with strategy 'normalize'.")
elif normalize_counts_method == "normalize-log1p":
self._expression_df = pd.DataFrame(
bulk_normalize(self._expression_df.to_numpy(), log1p=True),
index=self._expression_df.index,
columns=self._expression_df.columns,
)
self._log_init(
"Normalized expression data with strategy 'normalize-log1p'."
)
elif normalize_counts_method != "none":
raise ValueError("Invalid input for parameter 'normalize_counts'.")
self._q_cache = None
self._log_init("Model initialization complete.")
def _log_init(self, msg: str, level: int = logging.INFO) -> None:
if self._verbose:
_init_logger.log(level, msg)
def _log_compute(self, msg: str, level: int = logging.INFO) -> None:
if self._verbose:
_compute_logger.log(level, msg)
[docs]
def compute_gas(
self,
genesets: list[str] | None = None,
genesets_dict: dict[str, list[str]] | None = None,
beta: float = 0.33,
compute_method: Literal["cpu", "lowres"] = "cpu",
n_jobs: int = -1,
n_partitions: int | None = None,
partition_method: Literal["random", "stratified_kmeans"] = "stratified_kmeans",
partition_seed: int = 42,
store_gene_contributions: bool = True,
verbose: bool | None = None,
) -> GeneSetActivityScoresReport:
"""
Parameters
----------
genesets : list[str]
Default: None.
A list of gene set names for which the gene set activity scores (GASs)
should be computed. If None (and genesets_dict is None),
computes gene set activity scores for all genesets
provided in the provided genesets DataFrame.
genesets_dict : dict[str, list[str]] | None
Default: None.
A dictionary where the keys are geneset names and the values are lists
of genes in the geneset. Overrides the genesets parameter.
beta : float
Default: 0.33. Must be in the interval [0, 1]. Suggested beta < 0.5.
compute_method : Literal["cpu-sparse", "cpu", "lowres-sparse", "lowres"]
The method to use for computation.
n_jobs : int
Default: 1. Number of parallel jobs to run. If -1, uses half of
all available CPUs.
n_partitions : int | None
Default: None. Number of low resolution subsets to use for the lowres
method. Must be an integer if compute_method is "lowres-sparse" or
"lowres". Ignored if compute_method is "cpu-sparse" or "cpu".
If not specified, uses `n_partitions = int(n_spots / 5000)`.
If `n_partitions < 2`, uses `n_partitions = 2`.
partition_method : Literal["random", "stratified_kmeans"]
Default: "stratified_kmeans". Method to use for partitioning the
spots into subsets for the low resolution method. Ignored if
compute_method is "cpu-sparse" or "cpu".
partition_seed : int
Default: 42. Random seed for reproducibility.
store_gene_contributions : bool
Default: True. If True, stores gene contribution values.
Set to False for memory-intensive tasks that do not require gene contribution values.
verbose : bool | None
Default: None. Per-call override. If None, inherits from the model's
``verbose`` setting. If False, suppresses all messages (including
per-geneset worker output) for this call regardless of logger
configuration. If True, emits messages subject to the configuration
in ``gesso.logging``.
Returns
-------
GeneSetActivityScoresReport
A report containing the gene set activity scores DataFrame and
gene contribution DataFrames (if store_gene_contributions is True).
"""
if beta < 0 or beta > 1:
raise ValueError('Parameter "beta" must be in interval [0, 1].')
if genesets is not None and self._genesets_df is None:
raise ValueError(
"Gene sets DataFrame not provided. Cannot compute activity scores."
)
if genesets is None and genesets_dict is None:
if self._genesets_df is None:
raise ValueError(
"Gene sets DataFrame not provided. Cannot compute activity scores."
)
genesets = self._genesets_df.columns.to_list()
if not isinstance(genesets, list):
raise ValueError('Parameter "genesets" must be a list.')
elif genesets is None:
genesets = list(genesets_dict.keys())
if n_jobs == -1:
n_jobs = os.cpu_count()
if n_jobs < 1:
n_jobs = 1
n_jobs = min(len(genesets), n_jobs)
expression_gene_set = set(self._expression_df.index)
for geneset_name in genesets:
if genesets_dict is None:
n_filtered = int((self._genesets_df[geneset_name] == 1).sum())
n_original = self._original_geneset_sizes.get(
geneset_name, n_filtered
)
else:
genes = genesets_dict[geneset_name]
n_original = len(genes)
n_filtered = sum(1 for g in genes if g in expression_gene_set)
if n_original > 0 and n_filtered / n_original < 0.05:
warnings.warn(
f"Gene set '{geneset_name}': only {n_filtered}/{n_original} "
f"({100 * n_filtered / n_original:.2f}%) genes remain after "
f"filtering to genes in the dataset (< 5%). Activity scores "
f"for this gene set may be unreliable.",
stacklevel=2,
)
call_verbose = self._verbose if verbose is None else verbose
worker_log_level = logging.getLogger(GENESET_LOGGER).getEffectiveLevel()
# begin computation
if compute_method == "cpu":
if call_verbose:
_compute_logger.info(
f"Beginning activity score computation for {len(genesets)} "
f"gene sets with {n_jobs} jobs. Method used: {compute_method}."
)
gas_df = pd.DataFrame(columns=self._expression_df.columns)
geneset_to_gene_contributions_df_dict = dict()
L = self._laplacian
method_f = gLPCA_sparse
def process_geneset(
geneset: str, genes_in_geneset, job_num: int
) -> tuple[str, np.ndarray, np.ndarray, pd.Index]:
# keep only genes present in the expression matrix (preserves order)
genes_in_geneset = [
g for g in genes_in_geneset if g in expression_gene_set
]
X: np.ndarray = self._expression_df.loc[genes_in_geneset].to_numpy()
X = bulk_standard_scale(X, axis=1)
u, v, _, _ = method_f(
X=X,
L=L,
beta=beta,
geneset_name=geneset,
genes_in_geneset=genes_in_geneset,
job_num=job_num,
gene_contribution_sign_assignment_method="sign_overall_expression_proxy",
verbose=call_verbose,
worker_log_level=worker_log_level,
)
return geneset, v, u, genes_in_geneset
if genesets_dict is None:
results = Parallel(n_jobs=n_jobs)(
delayed(process_geneset)(
geneset,
self._genesets_df[self._genesets_df[geneset] == 1].index,
i + 1,
)
for i, geneset in enumerate(genesets)
)
else:
results = Parallel(n_jobs=n_jobs)(
delayed(process_geneset)(geneset, genes_in_geneset, i + 1)
for i, (geneset, genes_in_geneset) in enumerate(
genesets_dict.items()
)
)
for geneset, v, u, genes_in_geneset in results:
gas_df.loc[geneset] = v
if store_gene_contributions:
geneset_to_gene_contributions_df_dict[geneset] = pd.DataFrame(
u, index=genes_in_geneset, columns=[geneset]
)
return GeneSetActivityScoresReport(
gas_df=gas_df.transpose(),
locations_df=self._locations_df,
geneset_to_gene_contributions_df_dict=geneset_to_gene_contributions_df_dict,
)
elif compute_method == "lowres":
if call_verbose:
_compute_logger.info(
f"Beginning low resolution activity score computation for "
f"{len(genesets)} gene sets with {n_jobs} jobs. Method used: "
f"{compute_method}."
)
if n_partitions is None:
n_partitions = max(int(len(self._locations_df) / 5000), 2)
if partition_method == "random":
partitioned_indices = partition_naive(
df=self._locations_df, k=n_partitions, seed=partition_seed
)
elif partition_method == "stratified_kmeans":
partitioned_indices = partition_kmeans_stratified(
df=self._locations_df, k=n_partitions, seed=partition_seed
)
else:
raise ValueError(
f"Invalid input for parameter 'partition_method': "
f"{partition_method}."
)
method_f = gLPCA_sparse
def process_geneset(
geneset: str,
genes_in_geneset: pd.Index,
subset_index: pd.Index,
geneset_num: int,
subset_num: int,
job_num: int,
) -> tuple[str, np.ndarray, np.ndarray, pd.Index, int, int]:
X: np.ndarray = self._expression_df.loc[
genes_in_geneset, subset_index
].to_numpy()
local_laplacian = self._compute_laplacian_knn(
k=self._k, locations_df=self._locations_df.loc[subset_index]
)
X = bulk_standard_scale(X, axis=1)
u, v, _, _ = method_f(
X=X,
L=local_laplacian,
beta=beta,
geneset_name=geneset,
genes_in_geneset=genes_in_geneset,
job_num=job_num,
gene_contribution_sign_assignment_method="sign_overall_expression_proxy",
verbose=call_verbose,
worker_log_level=worker_log_level,
)
return (
geneset,
v,
u,
genes_in_geneset,
subset_index,
geneset_num,
subset_num,
)
parallel_input_list = []
job_num = 1
for geneset_num, geneset in enumerate(genesets):
if genesets_dict is None:
genes_in_geneset = self._genesets_df[
self._genesets_df[geneset] == 1
].index
else:
genes_in_geneset = genesets_dict[geneset]
# keep only genes present in the expression matrix (preserves order)
genes_in_geneset = [
g for g in genes_in_geneset if g in expression_gene_set
]
for subset_num, subset_index in enumerate(partitioned_indices):
parallel_input_list.append(
(
geneset,
genes_in_geneset,
subset_index,
geneset_num,
subset_num,
job_num,
)
)
job_num += 1
results = Parallel(n_jobs=n_jobs)(
delayed(process_geneset)(arg0, arg1, arg2, arg3, arg4, arg5)
for arg0, arg1, arg2, arg3, arg4, arg5 in parallel_input_list
)
geneset_to_reference_gene_idx = {}
geneset_to_flip_flags = {}
geneset_to_flip_count = {}
for result_idx, (geneset, _, u, _, _, _, _) in enumerate(results):
if geneset not in geneset_to_reference_gene_idx:
# first instance of low-res image for geneset
geneset_to_reference_gene_idx[geneset] = int(np.argmax(u))
geneset_to_flip_flags[geneset] = {result_idx: False}
geneset_to_flip_count[geneset] = 0
else:
median_weight = np.median(u)
needs_flip = (
u[geneset_to_reference_gene_idx[geneset]] < median_weight
)
geneset_to_flip_flags[geneset][result_idx] = needs_flip
geneset_to_flip_count[geneset] += int(needs_flip)
geneset_to_flip_majority = {}
for geneset in geneset_to_flip_flags.keys():
geneset_to_flip_majority[geneset] = (
geneset_to_flip_count[geneset]
> len(geneset_to_flip_flags[geneset]) / 2
)
gas_updates = []
if store_gene_contributions:
geneset_to_gene_contributions_list_dict = {g: [] for g in genesets}
for result_idx, (geneset, v, u, _, subset_index, _, _) in enumerate(
results
):
flip = geneset_to_flip_flags[geneset][result_idx]
do_flip = geneset_to_flip_majority[geneset] ^ flip # flip if needed
v_final = -v if do_flip else v
u_final = -u if do_flip else u
gas_updates.append((geneset, subset_index, v_final))
if store_gene_contributions:
geneset_to_gene_contributions_list_dict[geneset].append(u_final)
gas_df = pd.DataFrame(
np.nan, index=genesets, columns=self._expression_df.columns
)
# update GAS DataFrame
for geneset, subset_index, v in gas_updates:
gas_df.loc[geneset, subset_index] = v
# average gene contribution values across subsets
geneset_to_gene_contributions_df_dict = {}
if store_gene_contributions:
for (
geneset,
gene_contributions,
) in geneset_to_gene_contributions_list_dict.items():
genes_in_geneset = (
genesets_dict[geneset]
if genesets_dict is not None
else self._genesets_df[self._genesets_df[geneset] == 1].index
)
# match the filtering applied during computation above
genes_in_geneset = [
g for g in genes_in_geneset if g in expression_gene_set
]
gene_contributions_average = np.mean(gene_contributions, axis=0)
geneset_to_gene_contributions_df_dict[geneset] = pd.DataFrame(
gene_contributions_average,
index=genes_in_geneset,
columns=[geneset],
)
return GeneSetActivityScoresReport(
gas_df=gas_df.transpose(),
locations_df=self._locations_df,
geneset_to_gene_contributions_df_dict=geneset_to_gene_contributions_df_dict,
)
else:
raise ValueError("Invalid input for parameter 'compute_method'.")
[docs]
def htest_elevated_gas(
self,
geneset: str | None = None,
genes_in_geneset: list[str] | None = None,
beta: float = 0.33,
n_permutations: int = 500,
seed: int = 42,
n_jobs: int = -1,
) -> PermutationTestReport:
"""Conducts a permutation test at each spot to systematically identify
spots with significantly elevated gene set activity.
The null hypothesis is that the gene set activity score
at each spot is not significantly different from the
activity score of a randomly sampled set of genes
of the same size as the geneset.
Parameters
----------
geneset : str | None
Default: None. Name of the gene set to test. If None, genes_in_geneset must
be provided.
genes_in_geneset : list[str] | None
Default: None. List of genes in the gene set to test. If None, geneset must
be provided. Overrides geneset if not None.
beta : float
Default: 0.33. Must be in the interval [0, 1]. Suggested beta < 0.5.
n_permutations : int
Default: 500. Number of random gene sets to sample for the test.
seed : int
Default: 42. Random seed for reproducibility.
n_jobs : int
Default: -1. Number of parallel jobs to run. If -1, uses all available CPUs.
Returns
-------
PermutationTestReport
A report containing the gene set activity scores and p-values for each spot.
"""
if geneset is None and genes_in_geneset is None:
raise ValueError("Both 'geneset' and 'genes_in_geneset' cannot be None.")
all_genes = sorted(self._expression_df.index.to_list())
if geneset is not None:
if genes_in_geneset is None:
genes_in_geneset = self._genesets_df[
self._genesets_df[geneset] == 1
].index.to_list()
geneset_name = geneset
# if both geneset and genes_in_geneset are provided,
# we use genes_in_geneset, but keep the geneset as geneset name.
geneset_name = geneset
else:
if genes_in_geneset is None:
raise ValueError(
"If 'geneset' is None, 'genes_in_geneset' must be provided."
)
geneset_name = "USER_DEFINED"
genesets_dict = {geneset_name: genes_in_geneset}
# initialize an rng
random.seed(seed)
null_geneset_names = []
for i in range(n_permutations):
null_genes = random.sample(all_genes, len(genes_in_geneset))
random_geneset_name = f"random_geneset_{i+1}"
genesets_dict[random_geneset_name] = null_genes
null_geneset_names.append(random_geneset_name)
activity_scores_df = self.compute_gas(
genesets_dict=genesets_dict, beta=beta, n_jobs=n_jobs
).gas_df()
location_index = self._locations_df.index
# reindex by location index to ensure alignment
activity_scores_df = activity_scores_df.loc[location_index]
p_cap = activity_scores_df[geneset_name].to_numpy()
p_matrix = activity_scores_df[null_geneset_names].to_numpy().T
prob_greater = np.sum(p_matrix > p_cap, axis=0) / len(p_matrix)
p_vals = prob_greater
permutation_test_df = self._locations_df[["x", "y"]].join(
pd.DataFrame({"p": p_vals}, index=self._locations_df.index)
)
# since we already reindexed activity_scores_df by location_index,
# we can safely assign the geneset activity scores directly
permutation_test_df["gas"] = activity_scores_df[geneset_name].to_numpy()
# reorder columns to match expected output
permutation_test_df = permutation_test_df[["x", "y", "gas", "p"]]
return PermutationTestReport(
geneset=geneset_name,
permutation_test_df=permutation_test_df,
)
def _compute_laplacian_knn(
self, k: int = 20, locations_df: pd.DataFrame | None = None
) -> sparse.csr_matrix:
"""
Computes the graph laplacian describing topology of
locations based on k-nearest neighbors.
Parameters
----------
k : int
Default: 20. Number of nearest neighbors to connect for each location.
locations_df : pd.DataFrame | None
Default: None. DataFrame containing spatial coordinates of spots.
If None, uses the spatial coordinates provided during initialization.
Returns
-------
sparse.csr_matrix
"""
if locations_df is not None:
locations = locations_df[["x", "y"]].values
else:
locations = self._locations_df[["x", "y"]].values
N = locations.shape[0]
# Use cKDTree for efficient nearest neighbor search
tree = scipy.spatial.cKDTree(locations)
_, indices = tree.query(locations, k=k + 1, workers=-1) # +1 to exclude self
# Create sparse adjacency matrix
rows = np.repeat(np.arange(N), k)
cols = indices[:, 1:].ravel() # Exclude first column (self)
data = np.ones(N * k)
adjacency_matrix = sparse.csr_matrix((data, (rows, cols)), shape=(N, N))
# Compute Laplacian
degrees = adjacency_matrix.sum(axis=1).A1
laplacian = sparse.diags(degrees) - adjacency_matrix
self._log_init(
f"Constructed Laplacian matrix from location data with {k} nearest neighbors.",
level=logging.DEBUG,
)
return laplacian
def _verify_gene_match(self):
"""Checks that all genes match (i.e., indices of
self._gene_expression_df and self._genesets_df are equivalent).
Should be called after preprocessing.
"""
if self._genesets_df is None:
return
if len(self._expression_df) == 0:
raise ValueError(
"No genes remain after preprocessing. "
"Please ensure gene IDs match in gene_expression_df "
"and genesets_df."
)
expression_indices = self._expression_df.index
geneset_indices = self._genesets_df.index
if len(expression_indices) != len(geneset_indices):
raise ValueError(
"Number of genes in expression_df doesn't match "
"number of genes in genesets_df"
)
if np.array_equal(expression_indices.values, geneset_indices.values):
return
def check_match(idx_1, idx_2):
if idx_1 != idx_2:
return f"{idx_1} != {idx_2}"
return None
results = Parallel(n_jobs=-1)(
delayed(check_match)(idx1, idx2)
for idx1, idx2 in zip(expression_indices, geneset_indices)
)
mismatches = [result for result in results if result is not None]
if mismatches:
raise ValueError(
"Gene index mismatch following preprocessing: " + ", ".join(mismatches)
)
def _verify_examples_match(self):
"""Checks that all examples match (i.e., columns of
self._gene_expression_df and index of self._locations_df are equivalent).
Should be called prior to preprocessing.
"""
def check_match(col_1, idx_2):
if col_1 != idx_2:
return f"{col_1} != {idx_2}"
return None
columns = self._expression_df.columns
indices = self._locations_df.index
if len(columns) != len(indices):
raise ValueError(
"Number of columns in expression_df doesn't match number of "
"indices in locations_df"
)
if np.array_equal(columns.values, indices.values):
return
results = Parallel(n_jobs=-1)(
delayed(check_match)(col, idx) for col, idx in zip(columns, indices)
)
mismatches = [result for result in results if result is not None]
if mismatches:
raise ValueError(
"Examples column-index mismatch following preprocessing: "
+ ", ".join(mismatches)
)
def _verify_genesets(self, genesets: list[str]):
"""Checks that all genesets of interest actually exist in
self._genesets_df.
Parameters
----------
genesets : list[str]
"""
geneset_set = set(self._genesets_df.index)
# Use numpy for a quick check
if np.all(np.isin(genesets, list(geneset_set))):
return
def check_geneset(geneset):
if geneset not in geneset_set:
return geneset
return None
results = Parallel(n_jobs=-1)(
delayed(check_geneset)(geneset) for geneset in genesets
)
missing_genesets = [result for result in results if result is not None]
if missing_genesets:
raise ValueError(
"Query gene set(s) not in input geneset df: "
f"{', '.join(missing_genesets)}"
)
def _verify_locations_df(self):
"""
Checks that the format of locations_df is reasonable.
Verifies the presence of 'x' and 'y' columns and ensures
they contain numeric data.
"""
required_columns = {"x", "y"}
columns = set(self._locations_df.columns)
missing_columns = required_columns - columns
if missing_columns:
raise ValueError(
"Missing required columns in locations df: "
f"{', '.join(missing_columns)}"
)
for col in required_columns:
if not np.issubdtype(self._locations_df[col].dtype, np.number):
raise ValueError(
f"Column '{col}' in locations df must contain numeric data"
)
if self._locations_df[list(required_columns)].isnull().any().any():
raise ValueError("locations df contains NaN values in 'x' or 'y' columns")
if np.isinf(self._locations_df[list(required_columns)]).any().any():
raise ValueError(
"locations df contains infinite values in 'x' or 'y' columns"
)
def _force_common_genes(self):
"""
Finds the common subset of genes. Then, indexes the gene set and
expression dataframes to only include the common genes.
"""
if self._genesets_df is None:
return
def process_dataframe(df: pd.DataFrame) -> pd.DataFrame:
df = df[~df.index.duplicated(keep="first")]
df.index.name = None
return df
self._expression_df = process_dataframe(self._expression_df)
self._genesets_df = process_dataframe(self._genesets_df)
# record original geneset sizes prior to common-gene filtering
self._original_geneset_sizes = (
(self._genesets_df == 1).sum(axis=0).astype(int).to_dict()
)
genes_geneset_set = set(self._genesets_df.index)
genes_expression_set = set(self._expression_df.index)
common_genes_set = genes_geneset_set.intersection(genes_expression_set)
n_genes_removed_geneset = len(genes_geneset_set - common_genes_set)
n_genes_removed_expression = len(genes_expression_set - common_genes_set)
if n_genes_removed_geneset > 0:
self._log_init(
f"Removed {n_genes_removed_geneset} genes not found in geneset data. "
f"{len(common_genes_set)} genes remain."
)
if n_genes_removed_expression > 0:
self._log_init(
f"Removed {n_genes_removed_expression} genes not found in expression "
f"data. {len(common_genes_set)} genes remain."
)
self._log_init(
f"Identified {len(common_genes_set)} common genes in the gene set "
"and expression data."
)
original_expression_order = self._expression_df.index
common_genes_list = [
gene_id
for gene_id in original_expression_order
if gene_id in common_genes_set
]
self._genesets_df = self._genesets_df.loc[common_genes_list]
self._expression_df = self._expression_df.loc[common_genes_list]
def _force_common_cellid(self):
"""
Finds the common subset of spot/cell id index between the location and
expression dataframes. Then, indexes the location and
expression dataframes to only include the intersection of their indices.
"""
obs_locations_set = set(self._locations_df.index)
obs_expression_set = set(self._expression_df.columns)
common_spots_set = obs_locations_set.intersection(obs_expression_set)
n_spots_removed_location = len(obs_locations_set - common_spots_set)
n_spots_removed_expression = len(obs_expression_set - common_spots_set)
if n_spots_removed_location > 0:
self._log_init(
f"Removed {n_spots_removed_location} spots not found in expression "
f"data. {len(common_spots_set)} spots remain."
)
if n_spots_removed_expression > 0:
self._log_init(
f"Removed {n_spots_removed_expression} spots not found in location "
f"data. {len(common_spots_set)} spots remain."
)
self._log_init(
f"Identified {len(common_spots_set)} common spots in the location "
"and expression data."
)
original_loc_order = self._locations_df.index
common_spots_list = [
spot_id for spot_id in original_loc_order if spot_id in common_spots_set
]
self._locations_df = self._locations_df.loc[common_spots_list]
self._expression_df = self._expression_df[common_spots_list]