scldm.datamodule.DataModule#

class scldm.datamodule.DataModule(train_adata_path, test_adata_path, adata_attr, adata_key, vocabulary_encoder, val_as_test=True, data_path=None, batch_size=256, test_batch_size=256, num_workers=4, seed=42, prefetch_factor=4, persistent_workers=True, drop_last_indices=False, drop_incomplete_batch=True, sample_genes='none', genes_seq_len=100, **kwargs)#

Attributes table#

CHECKPOINT_HYPER_PARAMS_KEY

CHECKPOINT_HYPER_PARAMS_NAME

CHECKPOINT_HYPER_PARAMS_TYPE

adata_inference

hparams

The collection of hyperparameters saved with save_hyperparameters().

hparams_initial

The collection of hyperparameters saved with save_hyperparameters().

name

Methods table#

collate_fn_annloader(batch, sample_genes, ...)

from_datasets([train_dataset, val_dataset, ...])

Create an instance from torch.utils.data.Dataset.

get_library_size(x)

Compute library size (total counts per cell).

load_from_checkpoint(checkpoint_path[, ...])

Primary way of loading a datamodule from a checkpoint.

load_state_dict(state_dict)

Called when loading a checkpoint, implement to reload datamodule state given datamodule state_dict.

on_after_batch_transfer(batch, dataloader_idx)

Override to alter or apply batch augmentations to your batch after it is transferred to the device.

on_before_batch_transfer(batch, dataloader_idx)

Override to alter or apply batch augmentations to your batch before it is transferred to the device.

on_exception(exception)

Called when the trainer execution is interrupted by an exception.

predict_dataloader()

An iterable or collection of iterables specifying prediction samples.

prepare_data()

Use this to download and prepare data.

save_hyperparameters(*args[, ignore, frame, ...])

Save arguments to hparams attribute.

setup([stage])

Called at the beginning of fit (train + validate), validate, test, or predict.

state_dict()

Called when saving a checkpoint, implement to generate and save datamodule state.

teardown(stage)

Called at the end of fit (train + validate), validate, test, or predict.

test_dataloader()

An iterable or collection of iterables specifying test samples.

train_dataloader()

An iterable or collection of iterables specifying training samples.

transfer_batch_to_device(batch, device, ...)

Override this hook if your DataLoader returns tensors wrapped in a custom data structure.

val_dataloader()

An iterable or collection of iterables specifying validation samples.

Attributes#

DataModule.CHECKPOINT_HYPER_PARAMS_KEY = 'datamodule_hyper_parameters'#
DataModule.CHECKPOINT_HYPER_PARAMS_NAME = 'datamodule_hparams_name'#
DataModule.CHECKPOINT_HYPER_PARAMS_TYPE = 'datamodule_hparams_type'#
DataModule.adata_inference#
DataModule.hparams#

The collection of hyperparameters saved with save_hyperparameters(). It is mutable by the user. For the frozen set of initial hyperparameters, use hparams_initial.

Returns:

Mutable hyperparameters dictionary

DataModule.hparams_initial#

The collection of hyperparameters saved with save_hyperparameters(). These contents are read-only. Manual updates to the saved hyperparameters can instead be performed through hparams.

Returns:

AttributeDict: immutable initial hyperparameters
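The difference between the two attributes can be pictured with a plain-Python stand-in. The sketch below is not Lightning's implementation: `HParamsHolder` is a hypothetical class that keeps one mutable dict and hands out copies of a snapshot taken at construction time.

```python
import copy


class HParamsHolder:
    """Hypothetical stand-in; not part of Lightning or scldm."""

    def __init__(self, **hparams):
        self._hparams = dict(hparams)
        # Snapshot taken once, so later edits to hparams do not leak into it.
        self._hparams_initial = copy.deepcopy(self._hparams)

    @property
    def hparams(self):
        # Mutable: edits persist on the instance.
        return self._hparams

    @property
    def hparams_initial(self):
        # A fresh copy on every access: edits to it are discarded.
        return copy.deepcopy(self._hparams_initial)


holder = HParamsHolder(batch_size=256, seed=42)
holder.hparams["batch_size"] = 64   # persists
holder.hparams_initial["seed"] = 0  # changes only a throwaway copy
```

After these two assignments, `holder.hparams` reflects the new batch size while `holder.hparams_initial` still holds the values passed at construction.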

DataModule.name: Optional[str] = None#

Methods#

DataModule.collate_fn_annloader(batch, sample_genes, genes_seq_len)#
classmethod DataModule.from_datasets(train_dataset=None, val_dataset=None, test_dataset=None, predict_dataset=None, batch_size=1, num_workers=0, **datamodule_kwargs)#

Create an instance from torch.utils.data.Dataset.

Return type:

LightningDataModule

Args:

train_dataset: Optional dataset or iterable of datasets to be used for train_dataloader().

val_dataset: Optional dataset or iterable of datasets to be used for val_dataloader().

test_dataset: Optional dataset or iterable of datasets to be used for test_dataloader().

predict_dataset: Optional dataset or iterable of datasets to be used for predict_dataloader().

batch_size: Batch size to use for each dataloader. Default is 1. This parameter gets forwarded to the __init__ if the datamodule has such a name defined in its signature.

num_workers: Number of subprocesses to use for data loading. 0 means that the data will be loaded in the main process. This parameter gets forwarded to the __init__ if the datamodule has such a name defined in its signature.

**datamodule_kwargs: Additional parameters that get passed down to the datamodule’s __init__.

DataModule.get_library_size(x)#

Compute library size (total counts per cell).

Return type:

ndarray
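As a rough illustration of what "total counts per cell" means, the following NumPy sketch sums a count matrix over its gene axis. It is not the library's implementation: the function name `library_size_sketch` is hypothetical, and the cells-by-genes dense layout is an assumption.

```python
import numpy as np


def library_size_sketch(counts: np.ndarray) -> np.ndarray:
    """Sum counts over genes (columns) to get one total per cell (row)."""
    return np.asarray(counts).sum(axis=1)


# Assumed layout: 2 cells (rows) x 3 genes (columns).
counts = np.array([[1, 0, 3], [0, 2, 5]])
sizes = library_size_sketch(counts)
```

Here `sizes` holds one entry per cell.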

DataModule.load_from_checkpoint(checkpoint_path, map_location=None, hparams_file=None, **kwargs)#

Primary way of loading a datamodule from a checkpoint. When Lightning saves a checkpoint it stores the arguments passed to __init__ in the checkpoint under "datamodule_hyper_parameters".

Any arguments specified through **kwargs will override args stored in "datamodule_hyper_parameters".

Return type:

Self

Args:

checkpoint_path: Path to checkpoint. This can also be a URL, or file-like object.

map_location: If your checkpoint saved a GPU model and you now load on CPUs or a different number of GPUs, use this to map to the new setup. The behaviour is the same as in torch.load().

hparams_file: Optional path to a .yaml or .csv file with hierarchical structure as in this example:

dataloader:
    batch_size: 32

You most likely won’t need this since Lightning will always save the hyperparameters to the checkpoint. However, if your checkpoint doesn’t have the hyperparameters saved, use this argument to pass in a .yaml file with the hparams you’d like to use. These will be converted into a dict and passed into your LightningDataModule for use.

If your datamodule’s hparams argument is Namespace and the .yaml file has hierarchical structure, you need to refactor your datamodule to treat hparams as dict.

**kwargs: Any extra keyword args needed to init the datamodule. Can also be used to override saved hyperparameter values.

Return:

LightningDataModule instance with loaded weights and hyperparameters (if available).

Note:

load_from_checkpoint is a class method. You must use your LightningDataModule class to call it instead of the LightningDataModule instance, or a TypeError will be raised.

Example:

# load weights without mapping ...
datamodule = MyLightningDataModule.load_from_checkpoint('path/to/checkpoint.ckpt')

# or load weights and hyperparameters from separate files.
datamodule = MyLightningDataModule.load_from_checkpoint(
    'path/to/checkpoint.ckpt',
    hparams_file='/path/to/hparams_file.yaml'
)

# override some of the params with new values
datamodule = MyLightningDataModule.load_from_checkpoint(
    PATH,
    batch_size=32,
    num_workers=10,
)
DataModule.load_state_dict(state_dict)#

Called when loading a checkpoint, implement to reload datamodule state given datamodule state_dict.

Return type:

None

Args:

state_dict: the datamodule state returned by state_dict.

DataModule.on_after_batch_transfer(batch, dataloader_idx)#

Override to alter or apply batch augmentations to your batch after it is transferred to the device.

Return type:

Any

Note:

To check the current state of execution of this hook you can use self.trainer.training/testing/validating/predicting so that you can add different logic as per your requirement.

Args:

batch: A batch of data that needs to be altered or augmented.

dataloader_idx: The index of the dataloader to which the batch belongs.

Returns:

A batch of data

Example:

def on_after_batch_transfer(self, batch, dataloader_idx):
    batch['x'] = gpu_transforms(batch['x'])
    return batch
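
The note about checking `self.trainer.training` and its siblings can be sketched without Lightning installed. In the snippet below, `StageAwareHooks` is a hypothetical stand-in for a datamodule, the trainer is faked with a `SimpleNamespace`, and the doubling "augmentation" is made up for illustration.

```python
from types import SimpleNamespace


class StageAwareHooks:
    """Hypothetical stand-in for a datamodule; the trainer is attached by hand."""

    def __init__(self, trainer):
        self.trainer = trainer

    def on_after_batch_transfer(self, batch, dataloader_idx):
        # Apply the (made-up) augmentation only while training.
        if self.trainer.training:
            batch = {**batch, "x": [v * 2 for v in batch["x"]]}
        return batch


train_hooks = StageAwareHooks(SimpleNamespace(training=True))
eval_hooks = StageAwareHooks(SimpleNamespace(training=False))
batch = {"x": [1, 2, 3]}
train_out = train_hooks.on_after_batch_transfer(batch, 0)
eval_out = eval_hooks.on_after_batch_transfer(batch, 0)
```

The same batch is altered under the training flag and passed through untouched otherwise.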
DataModule.on_before_batch_transfer(batch, dataloader_idx)#

Override to alter or apply batch augmentations to your batch before it is transferred to the device.

Return type:

Any

Note:

To check the current state of execution of this hook you can use self.trainer.training/testing/validating/predicting so that you can add different logic as per your requirement.

Args:

batch: A batch of data that needs to be altered or augmented.

dataloader_idx: The index of the dataloader to which the batch belongs.

Returns:

A batch of data

Example:

def on_before_batch_transfer(self, batch, dataloader_idx):
    batch['x'] = transforms(batch['x'])
    return batch
DataModule.on_exception(exception)#

Called when the trainer execution is interrupted by an exception.

Return type:

None

DataModule.predict_dataloader()#

An iterable or collection of iterables specifying prediction samples.

For more information about multiple dataloaders, see this section.

It’s recommended that all data downloads and preparation happen in prepare_data().

Note:

Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Return:

A torch.utils.data.DataLoader or a sequence of them specifying prediction samples.

DataModule.prepare_data()#

Use this to download and prepare data. Downloading and saving data with multiple processes (distributed settings) will result in corrupted data. Lightning ensures this method is called only within a single process, so you can safely add your downloading logic within.

Warning

DO NOT set state to the model (use setup instead), since this is NOT called on every device.

Example:

def prepare_data(self):
    # good
    download_data()
    tokenize()
    etc()

    # bad
    self.split = data_split
    self.some_state = some_other_state()

In a distributed environment, prepare_data can be called in two ways (controlled by prepare_data_per_node):

  1. Once per node. This is the default and is only called on LOCAL_RANK=0.

  2. Once in total. Only called on GLOBAL_RANK=0.

Example:

# DEFAULT
# called once per node on LOCAL_RANK=0 of that node
class LitDataModule(LightningDataModule):
    def __init__(self):
        super().__init__()
        self.prepare_data_per_node = True


# call on GLOBAL_RANK=0 (great for shared file systems)
class LitDataModule(LightningDataModule):
    def __init__(self):
        super().__init__()
        self.prepare_data_per_node = False
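
The two modes can be summarised as a small predicate over process ranks. This is a sketch of the rule as stated above, not Lightning's internal code; the helper name `runs_prepare_data` and the two-node layout are hypothetical.

```python
def runs_prepare_data(local_rank: int, global_rank: int, prepare_data_per_node: bool) -> bool:
    """Return True if a process with these ranks would call prepare_data."""
    if prepare_data_per_node:
        # Once per node: the first process on each node.
        return local_rank == 0
    # Once in total: the first process overall.
    return global_rank == 0


# Assumed layout: 2 nodes x 2 processes, as (local_rank, global_rank) pairs.
ranks = [(0, 0), (1, 1), (0, 2), (1, 3)]
per_node = [g for l, g in ranks if runs_prepare_data(l, g, True)]
once_in_total = [g for l, g in ranks if runs_prepare_data(l, g, False)]
```

With per-node preparation, one process on each node does the work; with the flag disabled, only a single process does.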

This is called before requesting the dataloaders:

model.prepare_data()
initialize_distributed()
model.setup(stage)
model.train_dataloader()
model.val_dataloader()
model.test_dataloader()
model.predict_dataloader()

Return type:

None

DataModule.save_hyperparameters(*args, ignore=None, frame=None, logger=True)#

Save arguments to hparams attribute.

Return type:

None

Args:

args: single object of dict, NameSpace or OmegaConf, or string names or arguments from class __init__.

ignore: an argument name or a list of argument names from class __init__ to be ignored.

frame: a frame object. Default is None.

logger: Whether to send the hyperparameters to the logger. Default: True.

Example:
>>> from pytorch_lightning.core.mixins import HyperparametersMixin
>>> class ManuallyArgsModel(HyperparametersMixin):
...     def __init__(self, arg1, arg2, arg3):
...         super().__init__()
...         # manually assign arguments
...         self.save_hyperparameters('arg1', 'arg3')
...     def forward(self, *args, **kwargs):
...         ...
>>> model = ManuallyArgsModel(1, 'abc', 3.14)
>>> model.hparams
"arg1": 1
"arg3": 3.14
>>> from pytorch_lightning.core.mixins import HyperparametersMixin
>>> class AutomaticArgsModel(HyperparametersMixin):
...     def __init__(self, arg1, arg2, arg3):
...         super().__init__()
...         # equivalent automatic
...         self.save_hyperparameters()
...     def forward(self, *args, **kwargs):
...         ...
>>> model = AutomaticArgsModel(1, 'abc', 3.14)
>>> model.hparams
"arg1": 1
"arg2": abc
"arg3": 3.14
>>> from pytorch_lightning.core.mixins import HyperparametersMixin
>>> class SingleArgModel(HyperparametersMixin):
...     def __init__(self, params):
...         super().__init__()
...         # manually assign single argument
...         self.save_hyperparameters(params)
...     def forward(self, *args, **kwargs):
...         ...
>>> from argparse import Namespace
>>> model = SingleArgModel(Namespace(p1=1, p2='abc', p3=3.14))
>>> model.hparams
"p1": 1
"p2": abc
"p3": 3.14
>>> from pytorch_lightning.core.mixins import HyperparametersMixin
>>> class ManuallyArgsModel(HyperparametersMixin):
...     def __init__(self, arg1, arg2, arg3):
...         super().__init__()
...         # pass argument(s) to ignore as a string or in a list
...         self.save_hyperparameters(ignore='arg2')
...     def forward(self, *args, **kwargs):
...         ...
>>> model = ManuallyArgsModel(1, 'abc', 3.14)
>>> model.hparams
"arg1": 1
"arg3": 3.14
DataModule.setup(stage=None)#

Called at the beginning of fit (train + validate), validate, test, or predict. This is a good hook when you need to build models dynamically or adjust something about them. This hook is called on every process when using DDP.

Args:

stage: either 'fit', 'validate', 'test', or 'predict'

Example:

class LitModel(...):
    def __init__(self):
        self.l1 = None

    def prepare_data(self):
        download_data()
        tokenize()

        # don't do this
        self.something = something_else

    def setup(self, stage):
        data = load_data(...)
        self.l1 = nn.Linear(28, data.num_classes)
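
For a datamodule, a common use of the stage argument is to build only the splits that the current stage needs. The sketch below illustrates that idea with plain lists. `SplitHolder`, the 80/20 cut, and the mapping of stages to splits are all assumptions made for illustration, not scldm's behaviour.

```python
class SplitHolder:
    """Hypothetical stand-in for a datamodule; the data are plain lists."""

    def __init__(self, records):
        self.records = records
        self.train = self.val = self.test = None

    def setup(self, stage=None):
        # Build only what the requested stage needs; None builds everything.
        if stage in (None, "fit", "validate"):
            cut = int(0.8 * len(self.records))
            self.train, self.val = self.records[:cut], self.records[cut:]
        if stage in (None, "test", "predict"):
            self.test = list(self.records)


holder = SplitHolder(list(range(10)))
holder.setup("fit")
```

After `setup("fit")` the training and validation splits exist, while the test split is still unset.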
DataModule.state_dict()#

Called when saving a checkpoint, implement to generate and save datamodule state.

Return type:

dict[str, Any]

Returns:

A dictionary containing datamodule state.
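The state_dict and load_state_dict hooks are meant to be used as a pair: whatever one returns, the other must be able to consume. A minimal sketch of that round trip follows. `ResumableState` and its `batches_seen` counter are hypothetical; a real datamodule would define both methods on its LightningDataModule subclass.

```python
class ResumableState:
    """Hypothetical stand-in showing the save/restore contract."""

    def __init__(self):
        self.batches_seen = 0

    def state_dict(self):
        # Return only plain values that can be stored in a checkpoint.
        return {"batches_seen": self.batches_seen}

    def load_state_dict(self, state_dict):
        # Restore exactly what state_dict() produced.
        self.batches_seen = state_dict["batches_seen"]


source = ResumableState()
source.batches_seen = 120
restored = ResumableState()
restored.load_state_dict(source.state_dict())
```

The restored object ends up with the same counter value as the one that was saved.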

DataModule.teardown(stage)#

Called at the end of fit (train + validate), validate, test, or predict.

Return type:

None

Args:

stage: either 'fit', 'validate', 'test', or 'predict'

DataModule.test_dataloader()#

An iterable or collection of iterables specifying test samples.

For more information about multiple dataloaders, see this section.

For data processing use the following pattern:

  • download in prepare_data()

  • process and split in setup()

However, the above are only necessary for distributed processing.

Warning

do not assign state in prepare_data

Note:

Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Note:

If you don’t need a test dataset and a test_step(), you don’t need to implement this method.

DataModule.train_dataloader()#

An iterable or collection of iterables specifying training samples.

For more information about multiple dataloaders, see this section.

The dataloader you return will not be reloaded unless you set Trainer.reload_dataloaders_every_n_epochs to a positive integer.

For data processing use the following pattern:

  • download in prepare_data()

  • process and split in setup()

However, the above are only necessary for distributed processing.

Warning

do not assign state in prepare_data

Note:

Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

DataModule.transfer_batch_to_device(batch, device, dataloader_idx)#

Override this hook if your DataLoader returns tensors wrapped in a custom data structure.

The data types listed below (and any arbitrary nesting of them) are supported out of the box:

  • torch.Tensor or anything that implements .to(...)

  • list

  • dict

  • tuple

For anything else, you need to define how the data is moved to the target device (CPU, GPU, TPU, …).

Note:

This hook should only transfer the data and not modify it, nor should it move the data to any other device than the one passed in as argument (unless you know what you are doing). To check the current state of execution of this hook you can use self.trainer.training/testing/validating/predicting so that you can add different logic as per your requirement.

Args:

batch: A batch of data that needs to be transferred to a new device.

device: The target device as defined in PyTorch.

dataloader_idx: The index of the dataloader to which the batch belongs.

Returns:

A reference to the data on the new device.

Example:

def transfer_batch_to_device(self, batch, device, dataloader_idx):
    if isinstance(batch, CustomBatch):
        # move all tensors in your custom data structure to the device
        batch.samples = batch.samples.to(device)
        batch.targets = batch.targets.to(device)
    elif dataloader_idx == 0:
        # skip device transfer for the first dataloader or anything you wish
        pass
    else:
        batch = super().transfer_batch_to_device(batch, device, dataloader_idx)
    return batch

See Also:

  • move_data_to_device()

  • apply_to_collection()

Return type:

Any
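The idea of moving arbitrarily nested containers can be sketched as a simplified recursive walk over dicts, lists and tuples. This is not Lightning's move_data_to_device(): `FakeTensor` and `move_nested` are hypothetical names, and the fake tensor only records a device string so the snippet runs without PyTorch.

```python
class FakeTensor:
    """Hypothetical stand-in for any object with a .to(device) method."""

    def __init__(self, device="cpu"):
        self.device = device

    def to(self, device):
        return FakeTensor(device)


def move_nested(data, device):
    # Anything exposing .to is moved directly.
    if hasattr(data, "to"):
        return data.to(device)
    if isinstance(data, dict):
        return {k: move_nested(v, device) for k, v in data.items()}
    if isinstance(data, (list, tuple)):
        # Rebuild the same container type with moved elements.
        return type(data)(move_nested(v, device) for v in data)
    # Other values (ints, strings, ...) pass through unchanged.
    return data


batch = {"x": FakeTensor(), "meta": [FakeTensor(), "label"], "pair": (FakeTensor(), 3)}
moved = move_nested(batch, "cuda:0")
```

Every tensor-like leaf ends up on the target device, the container shapes are preserved, and the original batch is left as it was.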

DataModule.val_dataloader()#

An iterable or collection of iterables specifying validation samples.

For more information about multiple dataloaders, see this section.

The dataloader you return will not be reloaded unless you set :paramref:`~pytorch_lightning.trainer.trainer.Trainer.reload_dataloaders_every_n_epochs` to a positive integer.

It’s recommended that all data downloads and preparation happen in prepare_data().

Note:

Lightning tries to add the correct sampler for distributed and arbitrary hardware. There is no need to set it yourself.

Note:

If you don’t need a validation dataset and a validation_step(), you don’t need to implement this method.