Skip to content
Snippets Groups Projects
batch_runner.py 4.8 KiB
Newer Older
from pathlib import Path
import sys

from cadetrdm import clone, Options, ProjectRepo


class Study(ProjectRepo):
    def __init__(self, path, url, branch="main", *args, **kwargs):
        self.name = Path(path).parts[-1]
        self.url = url

        try:
            if not path.exists():
                clone(self.url, path)
                if branch != "main":
                    ProjectRepo(path).checkout(branch)
        except Exception as e:
            print(f"Error processing study {self.name}: {e}")
            return

        super().__init__(path, *args, **kwargs)


class Case:
    def __init__(self, study: Study, options: Options, name: str = None):
        if name is None:
            name = options.get_hash()
        self.name = name
        self.study = study
        self.options = options
r.jaepel's avatar
r.jaepel committed
        self._results_branch = None

    @property
    def status_file(self):
        return Path(self.study.path).parent / (Path(self.study.path).name + ".status")

    @property
    def status(self):
        status, _ = self._read_status()
        return status

    @status.setter
    def status(self, status):
        """Update the status file with the current execution status."""

        with open(self.status_file, "w") as f:
            f.write(f"{status}@{self.study.current_commit_hash}")

    @property
    def status_hash(self):
        _, status_hash = self._read_status()
        return status_hash

    def _read_status(self):
        """Check the status of the study and decide whether to proceed.

        Args:
            repo_path (Path): The path to the repository containing the status file.

        Returns:
            tuple: A tuple containing the status string and the current hash,
            or None, None if the status cannot be determined.
        """

        if not self.status_file.exists():
            return None, None

        with open(self.status_file) as f:
            status = f.read().strip()
            try:
                status, current_hash = status.split("@")
            except ValueError as e:
                if status == '':
                    return None, None
                else:
                    raise e

            return status, current_hash

    @property
    def is_running(self, ):
        if self.status == 'running':
            return True

        return False

    @property
    def has_results_for_this_run(self):
        if self.results_branch is None:
r.jaepel's avatar
r.jaepel committed
            return False
        else:
            return True

    @property
    def results_branch(self):
        # if self._results_branch is None:
        #     self._results_branch = self._get_results_branch()
        return self._get_results_branch()
r.jaepel's avatar
r.jaepel committed

    def _get_results_branch(self):
        output_log = self.study.output_log
        for log_entry in output_log:
            if (self.study.current_commit_hash == log_entry.project_repo_commit_hash
                    and self.options.get_hash() == log_entry.options_hash):
r.jaepel's avatar
r.jaepel committed
                return log_entry.output_repo_branch
        return None

    def run_study(self, force=False):
        """Run specified study commands in the given repository."""
        if self.is_running and not force:
            print(f"{self.study.name} is currently running. Skipping...")
            return

        print(f"Running {self.name} in {self.study.path} with: {self.options}")
        if not self.options.debug:
            self.study.update()
        else:
            print("WARNING: Not updating the repositories while in debug mode.")

        if self.has_results_for_this_run and not force:
            print(f"{self.study.path} has already been computed with these options. Skipping...")
            return

        try:
            cur_dir = os.getcwd()

            os.chdir(self.study.path)
            sys.path.append(str(self.study.path))
            module = importlib.import_module(self.study.name)

            self.status = 'running'

            module.main(self.options, str(self.study.path))

            print("Command execution successful.")
            self.status = 'finished'

            sys.path.remove(str(self.study.path))
            os.chdir(cur_dir)
        except (KeyboardInterrupt, Exception) as e:
            print(f"Command execution failed: {e}")
            self.status = 'failed'
            return
r.jaepel's avatar
r.jaepel committed

    @property
    def _results_path(self):
        return self.study.path / (self.study._output_folder + "_cached") / self.results_branch

    def load(self, ):
        if self.results_branch is None:
            print(f"No results available for Case({self.study.path, self.options.get_hash()[:7]})")
r.jaepel's avatar
r.jaepel committed
            return None

        if self._results_path.exists():
            return

        self.study.copy_data_to_cache(self.results_branch)

    @property
    def results_path(self):
        self.load()

        return self._results_path