Newer
Older
from urllib.request import urlretrieve
from cadetrdm.io_utils import recursive_chmod, write_lines_to_file, wait_for_user, init_lfs
from cadetrdm.remote_integration import GitHubRemote, GitLabRemote
try:
import git
except ImportError:
# Adding this hint to save users the confusion of trying $pip install git
raise ImportError("No module named git, please install the gitpython package")
from cadetrdm.web_utils import ssh_url_to_http_url
from cadetrdm.io_utils import delete_path
def validate_is_output_repo(path_to_repo):
    """Raise ValueError if the repo at *path_to_repo* is a project repo instead of an output repo."""
    rdm_data_file = os.path.join(path_to_repo, ".cadet-rdm-data.json")
    with open(rdm_data_file, "r") as file_handle:
        rdm_data = json.load(file_handle)
    if rdm_data["is_project_repo"]:
        raise ValueError("Please use the URL to the output repository.")
def __init__(self, repository_path=None, search_parent_directories=True, *args, **kwargs):
    """
    Base class handling most git workflows.

    :param repository_path:
        Path to the root directory of the repository.
    :param search_parent_directories:
        if True, all parent directories will be searched for a valid repo as well.
        Please note that this was the default behaviour in older versions of GitPython,
        which is considered a bug though.
    :param args:
        Args handed to git.Repo()
    :param kwargs:
        Kwargs handed to git.Repo()
    """
    if repository_path is None or repository_path == ".":
        # NOTE(review): body of this branch was lost in transit; falling back
        # to the current working directory — confirm against upstream.
        repository_path = os.getcwd()
    if type(repository_path) is str:
        repository_path = Path(repository_path)
    self._git_repo = git.Repo(repository_path, search_parent_directories=search_parent_directories, *args, **kwargs)
    self._git = self._git_repo.git
    self._most_recent_branch = self.active_branch.name
    self._earliest_commit = None
@property
def active_branch(self):
@property
def untracked_files(self):
@property
def current_commit_hash(self):
    """Hash of the commit HEAD currently points to, as a string."""
    return str(self.head.commit)
@property
def path(self):
    """Root directory of the repository as a pathlib.Path."""
    return Path(self._git_repo.working_dir)
@property
def bare(self):
    """True if the underlying git repository is a bare repo (no working tree)."""
    return self._git_repo.bare
@property
def working_dir(self):
    """Deprecated alias for :attr:`path`; prints a deprecation notice and returns the repo root."""
    print("Deprecation Warning. .working_dir is getting replaced with .path")
    # Bug fix: the original fell through and returned None; callers expect the path.
    return self.path
@property
def head(self):
@property
def remotes(self):
@property
def remote_urls(self):
    """List of URLs of all configured remotes; prints a warning when none are configured."""
    if len(self.remotes) == 0:
        print(RuntimeWarning(f"No remote for repo at {self.path} set yet. Please add remote ASAP."))
    return [str(remote.url) for remote in self.remotes]
def earliest_commit(self):
    """Return the first (oldest) commit of the repository, caching the result on first access."""
    if self._earliest_commit is None:
        # iter_commits() yields newest-first; star-unpacking keeps the last element.
        *_, earliest_commit = self._git_repo.iter_commits()
        self._earliest_commit = earliest_commit
    # NOTE(review): an unreachable `return list()` that followed this return
    # (scrape artifact, likely from another method) was removed.
    return self._earliest_commit
@property
def data_json_path(self):
@property
def cache_json_path(self):
    """Path to the repository's .cadet-rdm-cache.json file.

    Bug fix: the property body was interleaved with scrape line-number junk;
    reconstructed from the surviving return statement.
    """
    return self.path / ".cadet-rdm-cache.json"
@property
def has_changes_upstream(self):
    """True if the first remote's fetched HEAD differs from the local HEAD; None on git errors."""
    try:
        upstream_commit = str(self.remotes[0].fetch()[0].commit)
        return self.current_commit_hash != upstream_commit
    except git.GitCommandError as e:
        traceback.print_exc()
        print(f"Git command error in {self.path}: {e}")
def fetch(self):
    """Fetch the latest refs from the default remote (plain ``git fetch``)."""
    self._git.fetch()
def update(self):
    """Fetch from the first remote and pull when the upstream branch has new commits."""
    try:
        self.fetch()
        if self.has_changes_upstream:
            # Bug fix: `self.remotes[0]` is already the Remote object; the
            # original accessed a non-existent `.origin` attribute on it.
            print(f"New changes detected in {self.remotes[0]}, pulling updates...")
            self.remotes[0].pull()
    except git.GitCommandError as e:
        traceback.print_exc()
        print(f"Git command error in {self.path}: {e}")
:param remote_url:
:param remote_name:
:return:
"""
self._git_repo.create_remote(remote_name, url=remote_url)
with open(self.data_json_path, "r") as handle:
rdm_data = json.load(handle)
if rdm_data["is_project_repo"]:
# This folder is a project repo. Use a project repo class to easily access the output repo.
output_repo = ProjectRepo(self.path).output_repo
if output_repo.active_branch != "main":
if output_repo.exist_uncomitted_changes:
output_repo.stash_all_changes()
output_repo.checkout("main")
output_repo.add_list_of_remotes_in_readme_file("project_repo", self.remote_urls)
output_repo.commit("Add remote for project repo", verbosity=0)
if rdm_data["is_output_repo"]:
project_repo.update_output_remotes_json()
project_repo.add_list_of_remotes_in_readme_file("output_repo", self.remote_urls)
project_repo.commit("Add remote for output repo", verbosity=0)
def add_filetype_to_lfs(self, file_type):
    """
    Add the filetype given in file_type to the GIT-LFS tracking

    :param file_type:
        Wildcard formatted string. Examples: "*.png" or "*.xlsx"
    :return:
    """
    # init_lfs rewrites the LFS tracking config; the change is then staged and committed.
    init_lfs(lfs_filetypes=[file_type], path=self.path)
    self.add_all_files()
    self.commit(f"Add {file_type} to lfs")
def import_remote_repo(self, source_repo_location, source_repo_branch, target_repo_location=None):
    """
    Import a remote repo and update the cadet-rdm-cache

    :param source_repo_location:
        Path or URL to the source repo.
        Example https://jugit.fz-juelich.de/IBG-1/ModSim/cadet/agile_cadet_rdm_presentation_output.git
        or git@jugit.fz-juelich.de:IBG-1/ModSim/cadet/agile_cadet_rdm_presentation_output.git
    :param source_repo_branch:
        Branch of the source repo to check out.
    :param target_repo_location:
        Place to store the repo. If None, the external_cache folder is used.
    :return:
        Path to the cloned repository
    """
    if "://" in str(source_repo_location):
        source_repo_name = source_repo_location.split("/")[-1]
    else:
        source_repo_name = Path(source_repo_location).name

    # Bug fix: honor an explicitly given target location instead of always
    # overwriting it with the external_cache default.
    if target_repo_location is None:
        target_repo_location = self.path / "external_cache" / source_repo_name
    else:
        target_repo_location = self.path / target_repo_location

    self.add_path_to_gitignore(target_repo_location)

    print(f"Cloning from {source_repo_location} into {target_repo_location}")
    # blob:none + single-branch keeps the clone as small as possible.
    multi_options = ["--filter=blob:none", "--branch", source_repo_branch, "--single-branch"]
    repo = git.Repo.clone_from(source_repo_location, target_repo_location, multi_options=multi_options)
    repo.git.clear_cache()

    self.update_cadet_rdm_cache_json(source_repo_branch=source_repo_branch,
                                     target_repo_location=target_repo_location,
                                     source_repo_location=source_repo_location)
    return target_repo_location
def add_path_to_gitignore(self, path_to_be_ignored):
    """
    Add the path to the .gitignore file

    :param path_to_be_ignored:
        Path (absolute or relative) to add; stored relative to the repo root.
    :return:
    """
    path_to_be_ignored = self.ensure_relative_path(path_to_be_ignored)
    with open(self.path / ".gitignore", "r") as file_handle:
        gitignore = file_handle.readlines()
    # Bug fix: guard against an empty .gitignore (the original indexed [-1]
    # unconditionally) and only add a newline when one is actually missing.
    if gitignore and not gitignore[-1].endswith("\n"):
        gitignore[-1] += "\n"
    entry = str(path_to_be_ignored) + "\n"
    if entry not in gitignore:
        gitignore.append(entry)
    with open(self.path / ".gitignore", "w") as file_handle:
        file_handle.writelines(gitignore)
def update_cadet_rdm_cache_json(self, source_repo_location, source_repo_branch, target_repo_location):
    """
    Update the information in the .cadet_rdm_cache.json file

    :param source_repo_location:
        Path or URL to the source repo.
    :param source_repo_branch:
        Name of the branch to check out.
    :param target_repo_location:
        Path where to put the repo or data
    """
    # Bug fix: only seed an empty JSON file when none exists yet; the original
    # unconditionally overwrote the cache with "{}" before reading it back,
    # discarding all previously recorded entries.
    if not self.cache_json_path.exists():
        with open(self.cache_json_path, "w") as file_handle:
            file_handle.writelines("{}")

    with open(self.cache_json_path, "r") as file_handle:
        rdm_cache = json.load(file_handle)

    repo = BaseRepo(target_repo_location)
    commit_hash = repo.current_commit_hash

    # Drop the placeholder entry shipped with freshly initialized repos.
    if "__example/path/to/repo__" in rdm_cache.keys():
        rdm_cache.pop("__example/path/to/repo__")

    target_repo_location = str(self.ensure_relative_path(target_repo_location))

    if isinstance(source_repo_location, Path):
        source_repo_location = source_repo_location.as_posix()

    rdm_cache[target_repo_location] = {
        "source_repo_location": source_repo_location,
        "branch_name": source_repo_branch,
        "commit_hash": commit_hash,
    }
    with open(self.cache_json_path, "w") as file_handle:
        json.dump(rdm_cache, file_handle, indent=2)
def ensure_relative_path(self, input_path):
    """
    Turn the input path into a relative path, relative to the repo working directory.

    :param input_path:
        str or Path; absolute paths are re-expressed relative to the repo root.
    :return:
        A relative pathlib.Path.
    """
    if type(input_path) is str:
        input_path = Path(input_path)
    # Bug fix: is_absolute is a method; referencing it without calling is always
    # truthy, so every input (even relative ones) hit the relative_to() branch.
    if input_path.is_absolute():
        relative_path = input_path.relative_to(self.path)
    else:
        relative_path = input_path
    return relative_path
def verify_unchanged_cache(self):
    """
    Verify that all repos referenced in .cadet-rdm-data.json are
    in an unmodified state. Raises a RuntimeError if the commit hash has changed or if
    uncommited changes are found.

    :return:
    """
    with open(self.cache_json_path, "r") as file_handle:
        rdm_cache = json.load(file_handle)

    # Skip the placeholder entry shipped with freshly initialized repos.
    if "__example/path/to/repo__" in rdm_cache.keys():
        rdm_cache.pop("__example/path/to/repo__")

    for repo_location, repo_info in rdm_cache.items():
        try:
            repo = BaseRepo(repo_location)
            # clear_cache releases git cmd handles (avoids locked files, e.g. on Windows).
            repo._git.clear_cache()
        except git.exc.NoSuchPathError:
            raise git.exc.NoSuchPathError(f"The imported repository at {repo_location} was not found.")

        self.verify_cache_folder_is_unchanged(repo_location, repo_info["commit_hash"])
def verify_cache_folder_is_unchanged(self, repo_location, commit_hash):
    """
    Verify that the repo located at repo_location has no uncommited changes and that the current commit_hash
    is equal to the given commit_hash

    :param repo_location:
    :param commit_hash:
    :return:
    """
    repo = BaseRepo(repo_location)
    commit_changed = repo.current_commit_hash != commit_hash
    uncommited_changes = repo.exist_uncomitted_changes
    if commit_changed or uncommited_changes:
        raise RuntimeError(f"The contents of {repo_location} have been modified. Don't do that.")
    # Release git cmd handles once verification is done.
    repo._git.clear_cache()
def checkout(self, *args, **kwargs):
    """Check out a branch/commit, remembering the previously active branch in _most_recent_branch."""
    self._most_recent_branch = self.active_branch
    self._git.checkout(*args, **kwargs)
def push(self, remote=None, local_branch=None, remote_branch=None, push_all=True):
    """
    Push local branch to remote.

    :param remote:
        Name of the remote to push to. Defaults to the first configured remote.
    :param local_branch:
        Name of the local branch to push. Defaults to the active branch.
    :param remote_branch:
        Name of the remote branch to push to. Defaults to the local branch name.
    :param push_all:
        If True, push all branches (and the linked output repo, if present).
    """
    # Note: docstring delimiters were lost in transit (a bare fragment of the
    # original docstring remained as invalid syntax); restored here.
    if local_branch is None:
        local_branch = self.active_branch
    if remote_branch is None:
        remote_branch = local_branch
    if remote is None:
        if len(self._git_repo.remotes) == 0:
            raise RuntimeError("No remote has been set for this repository yet.")
        remote = [str(remote.name) for remote in self._git_repo.remotes][0]

    remote_interface = self._git_repo.remotes[remote]

    if push_all:
        push_results = remote_interface.push(all=True)
    else:
        push_results = remote_interface.push(refspec=f'{local_branch}:{remote_branch}')

    for push_res in push_results:
        print(push_res.summary)

    # ProjectRepo instances carry an output_repo; push it along with the project.
    if hasattr(self, "output_repo") and push_all:
        self.output_repo.push()
def delete_active_branch_if_branch_is_empty(self):
    """
    Delete the currently active branch and checkout the main branch
    if the active branch holds no commits beyond main.
    """
    previous_branch = self.active_branch.name
    if previous_branch == "main":
        return

    commit_of_current_main = str(self._git.rev_parse("main"))
    commit_of_current_branch = str(self.head.commit)
    if commit_of_current_branch == commit_of_current_main:
        # NOTE(review): main must be checked out before the active branch can
        # be deleted; this line was reconstructed (the scrape replaced it with
        # commit-metadata noise) — confirm against upstream.
        self.checkout("main")
        print("Removing empty branch", previous_branch)
        self._git.branch("-d", previous_branch)
def add_all_files(self, automatically_add_new_files=True):
"""
Stage all changes to git. This includes new, untracked files as well as modified files.
:param automatically_add_new_files:
If this is set to false a user input will be prompted if untracked files are about to be added.
:return:
List of all staged changes.
"""
def reset_hard_to_head(self, force_entry=False):
    """
    Hard-reset the repository to HEAD and remove all untracked files and directories.

    :param force_entry:
        If False, ask the user for confirmation before discarding changes.
    """
    if not force_entry:
        proceed = wait_for_user(f'The output directory contains the following uncommitted changes:\n'
                                f'{self.untracked_files + self.changed_files}\n'
                                f' These will be lost if you continue\n'
                                f'Proceed?')
        # Bug fix: only abort when the user declines; the original raised
        # KeyboardInterrupt unconditionally after the prompt.
        if not proceed:
            raise KeyboardInterrupt
    # reset all tracked files to previous commit, -q silences output
    self._git.reset("-q", "--hard", "HEAD")
    # remove all untracked files and directories, -q silences output
    try:
        self._git.clean("-q", "-f", "-d")
    except git.exc.GitCommandError:
        # NOTE(review): upstream presumably fixes file permissions (e.g. via
        # recursive_chmod on Windows read-only files) before this retry —
        # confirm; a single retry is kept as shown.
        self._git.clean("-q", "-f", "-d")
@property
def changed_files(self):
    """List of tracked files that differ from HEAD (names only, empty entries removed).

    NOTE(review): the decorator and def line were lost in the scrape; this
    property was reconstructed from the ``self.changed_files`` usage in
    reset_hard_to_head — confirm against upstream.
    """
    changed_files = self._git.diff(None, name_only=True).split('\n')
    if "" in changed_files:
        changed_files.remove("")
    return changed_files
@property
def exist_uncomitted_changes(self):
    """True when ``git status --porcelain`` reports anything (staged, modified, or untracked)."""
    porcelain_status = self._git.status("--porcelain")
    return len(porcelain_status) > 0
def dump_package_list(self, target_folder):
    """
    Use "conda env export" and "pip freeze" to create environment.yml and pip_requirements.txt files.

    :param target_folder:
        Folder into which the environment files are written; falls back to the repo root if None.
    """
    if target_folder is not None:
        dump_path = target_folder
    else:
        # NOTE(review): the else-body was lost in transit; falling back to the
        # repo root — confirm against upstream.
        dump_path = self.path

    print("Dumping conda environment.yml, this might take a moment.")
    try:
        os.system(f"conda env export > {dump_path}/conda_environment.yml")
        print("Dumping conda independent environment.yml, this might take a moment.")
        os.system(f"conda env export --from-history > {dump_path}/conda_independent_environment.yml")
    except Exception as e:
        # Best effort: conda may simply not be installed.
        print("Could not dump conda environment due to the following error:")
        print(e)

    os.system(f"pip freeze > {dump_path}/pip_requirements.txt")
    print("Dumping pip independent requirements.txt.")
    os.system(f"pip list --not-required --format freeze > {dump_path}/pip_independent_requirements.txt")
def commit(self, message: str, add_all=True, verbosity=1):
"""
Commit current state of the repository.
:param message:
Commit message
:param add_all:
Option to add all changed and new files to git automatically.
:param verbosity:
Option to choose degree of printed feedback.
if verbosity >= 1:
print(f"No changes to commit in repo {self.path}")
return
print(f"Commiting changes to repo {self.path}")
try:
commit_return = self._git.commit("-m", message)
print("\n" + commit_return + "\n")
except:
pass
"""
Call git commit with options --amend --no-edit
"""
self._git.commit("--amend", "--no-edit")
def stash_all_changes(self):
    """
    Adds all untracked files to git and then stashes all changes.
    Will raise a RuntimeError if no changes are found.
    """
    # NOTE(review): body appears truncated — as written this raises
    # unconditionally; the add/stash calls described in the docstring and the
    # no-changes guard are missing. Confirm against upstream before use.
    raise RuntimeError("No changes in repo to stash.")
def apply_stashed_changes(self):
    """
    Apply the last stashed changes.
    If a "CONFLICT (modify/delete)" error is encountered, this is ignored.
    All other errors are raised.
    """
    # Bug fix: the try/except wrapper was lost in transit; `e` was referenced
    # without ever being bound.
    try:
        self._git.stash('pop')  # equivalent to $ git stash pop
    except git.exc.GitCommandError as e:
        # Will raise error because the stash cannot be applied without conflicts. This is expected
        if 'CONFLICT (modify/delete)' in e.stdout:
            pass
        else:
            raise e
def test_for_uncommitted_changes(self):
    """
    Raise a RuntimeError if uncommitted changes are in the repository.

    :return:
    """
    # Bug fix: the guard was lost in transit; raise only when changes exist.
    if self.exist_uncomitted_changes:
        raise RuntimeError(f"Found uncommitted changes in the repository {self.path}.")
def add_list_of_remotes_in_readme_file(self, repo_identifier: str, remotes_url_list: list):
    """
    Write (or update) a line in README.md linking to the given remotes.

    :param repo_identifier:
        Identifier used as the markdown link text, e.g. "project_repo" or "output_repo".
    :param remotes_url_list:
        List of remote URLs (ssh or http); converted to http for the links.
    """
    if len(remotes_url_list) > 0:
        # NOTE(review): the readme_filepath definition was lost in transit;
        # reconstructed as the repo-root README.md — confirm against upstream.
        readme_filepath = self.path / "README.md"

        remotes_url_list_http = [ssh_url_to_http_url(remote)
                                 for remote in remotes_url_list]
        output_link_line = " and ".join(f"[{repo_identifier}]({output_repo_remote})"
                                        for output_repo_remote in remotes_url_list_http) + "\n"

        with open(readme_filepath, "r") as file_handle:
            filelines = file_handle.readlines()
            filelines_giving_output_repo = [i for i in range(len(filelines))
                                            if filelines[i].startswith(f"[{repo_identifier}](")]

        if len(filelines_giving_output_repo) == 1:
            line_to_be_modified = filelines_giving_output_repo[0]
            filelines[line_to_be_modified] = output_link_line
        elif len(filelines_giving_output_repo) == 0:
            filelines.append("The output repo can be found at:\n")
            filelines.append(output_link_line)
        else:
            raise RuntimeError(f"Multiple lines in the README.md at {readme_filepath}"
                               f" link to the {repo_identifier}. "
                               "Can't automatically update the link.")

        with open(readme_filepath, "w") as file_handle:
            file_handle.writelines(filelines)
class ProjectRepo(BaseRepo):
search_parent_directories=True, *args, **kwargs):
"""
Class for Project-Repositories. Handles interaction between the project repo and
the output (i.e. results) repo.
:param repository_path:
Path to the root of the git repository.
:param output_folder:
Deprecated: Path to the root of the output repository.
:param search_parent_directories:
if True, all parent directories will be searched for a valid repo as well.
Please note that this was the default behaviour in older versions of GitPython,
which is considered a bug though.
:param args:
Additional args to be handed to BaseRepo.
:param kwargs:
Additional kwargs to be handed to BaseRepo.
"""
super().__init__(repository_path, search_parent_directories=search_parent_directories, *args, **kwargs)
if output_folder is not None:
print("Deprecation Warning. Setting the outputfolder manually during repo instantiation is deprecated"
" and will be removed in a future update.")
if not self.data_json_path.exists():
raise RuntimeError(f"Folder {self.path} does not appear to be a CADET-RDM repository.")
if "output_remotes" not in metadata:
# this enables upgrades from v0.0.23 to v0.0.24.
output_remotes_path = self.path / "output_remotes.json"
with open(output_remotes_path, "r") as handle:
output_remotes = json.load(handle)
metadata["output_remotes"] = output_remotes
self._output_folder = metadata["output_remotes"]["output_folder_name"]
self._output_repo = OutputRepo(self.path / self._output_folder)
repo_version = metadata["cadet_rdm_version"]
cadetrdm_version = cadetrdm.__version__
if cadetrdm_version != repo_version:
print(f"Repo version {repo_version} is outdated. Current CADET-RDM version is {cadetrdm_version}\n"
"Updating the repository now.")
self._update_version(repo_version)
metadata["cadet_rdm_version"] = cadetrdm_version
with open(self.data_json_path, "w") as f:
json.dump(metadata, f, indent=2)
self.add(self.data_json_path)
self.commit("update cadetrdm version", add_all=False)
self._on_context_enter_commit_hash = None
@property
def output_repo(self):
    """The output repository linked to this project repo; raises ValueError if unset."""
    if self._output_repo is None:
        raise ValueError("The output repo has not been set yet.")
    return self._output_repo
major, minor, patch = [int(x) for x in current_version.split(".")]
version_sum = major * 1000 * 1000 + minor * 1000 + patch
if version_sum < 9:
self._convert_csv_to_tsv_if_necessary()
self._add_jupytext_file(self.path)
self._expand_tsv_header()
output_remotes_path = self.path / "output_remotes.json"
delete_path(output_remotes_path)
self.add(output_remotes_path)
@staticmethod
def _add_jupytext_file(path_root: str | Path = "."):
    """Write a jupytext.yml pairing ipynb notebooks with py:percent text files at path_root.

    Bug fix: without @staticmethod, instance calls would pass self as path_root
    (the signature has no self parameter).
    """
    jupytext_lines = ['# Pair ipynb notebooks to py:percent text notebooks', 'formats: "ipynb,py:percent"']
    write_lines_to_file(Path(path_root) / "jupytext.yml", lines=jupytext_lines, open_type="w")
def create_remotes(self, name, namespace, url=None, username=None):
    """
    Create project in gitlab/github and add the projects as remotes to the project
    and output repositories

    :param name: Name of the repository to create.
    :param namespace: Group / user namespace on the remote host.
    :param url: URL of the remote host; GitHub is assumed when it contains "github".
    :param username: Username for authentication, if required.
    :return:
    """
    # Bug fix: guard against the default url=None before substring-testing it;
    # None falls through to GitLab (the original else branch) — confirm intent.
    if url is not None and "github" in url:
        remote = GitHubRemote()
    else:
        remote = GitLabRemote()
    response_project = remote.create_remote(url=url, namespace=namespace, name=name, username=username)
    response_output = remote.create_remote(url=url, namespace=namespace, name=name + "_output", username=username)
    self.add_remote(response_project.ssh_url_to_repo)
    self.output_repo.add_remote(response_output.ssh_url_to_repo)
    self.push(push_all=True)
def get_new_output_branch_name(self):
    """
    Construct a name for the new branch in the output repository.

    :return: the new branch name
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    project_hash_prefix = str(self.head.commit)[:7]
    name_parts = [timestamp, self._output_folder, "from", str(self.active_branch), project_hash_prefix]
    return "_".join(name_parts)
def check_results_main(self):
    """
    Checkout the main branch, which contains all the log files.

    Note: docstring delimiters were lost in transit (the summary line remained
    as bare, invalid text); restored here.
    """
    # Remember the branch to return to via reload_recent_results().
    self._most_recent_branch = self._output_repo.active_branch.name
    self._output_repo._git.checkout("main")
def reload_recent_results(self):
    """Check the output repository back out to the branch recorded most recently."""
    self._output_repo._git.checkout(self._most_recent_branch)
def print_output_log(self):
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
def fill_data_from_cadet_rdm_json(self, re_load=False):
    """
    Iterate through all references within the .cadet-rdm-data.json and
    load or re-load the data.

    :param re_load:
        If true: delete and re-load all data. If false, existing data will be left as-is.
    :return:
    """
    with open(self.cache_json_path, "r") as file_handle:
        rdm_cache = json.load(file_handle)

    # Skip the placeholder entry shipped with freshly initialized repos.
    if "__example/path/to/repo__" in rdm_cache.keys():
        rdm_cache.pop("__example/path/to/repo__")

    for repo_location, repo_info in rdm_cache.items():
        if os.path.exists(repo_location) and re_load is False:
            continue
        elif os.path.exists(repo_location) and re_load is True:
            delete_path(repo_location)

        # A "." source means the data lives in this repo's own output branches.
        if repo_info["source_repo_location"] == ".":
            self.copy_data_to_cache(branch_name=repo_info["branch_name"])
        else:
            self.import_remote_repo(
                target_repo_location=repo_location,
                source_repo_location=repo_info["source_repo_location"],
                source_repo_branch=repo_info["branch_name"])
@property
def results_log_file(self):
    """Path to the output repository's results log file (log.tsv)."""
    # note: if filename of "log.tsv" is changed,
    # this also has to be changed in the gitattributes of the init repo func
    return self.output_repo.output_log_file_path
@property
def results_log(self):
def print_results_log(self):
    """Delegate to the output repository's print_output_log()."""
    self.output_repo.print_output_log()
if not self.results_log_file.exists():
return
with open(self.results_log_file, "r") as f:
lines = f.readlines()
new_header = [
"Output repo commit message",
"Output repo branch",
"Output repo commit hash",
"Project repo commit hash",
"Project repo folder name",
"Project repo remotes",
"Python sys args",
"Tags",
"Options hash", ]
with open(self.results_log_file, "w") as f:
f.writelines(["\t".join(new_header) + "\n"])
f.writelines(lines[1:])
self.output_repo.add(self.results_log_file)
self.output_repo.commit("Update tsv header", add_all=False)
"""
If not tsv log is found AND a csv log is found, convert the csv to tsv.
:return:
"""
if self.results_log_file.exists():
return
csv_filepath = self.path / self._output_folder / "log.csv"
# We have just initialized the repo and neither tsv nor csv exist.
return
with open(csv_filepath) as csv_handle:
csv_lines = csv_handle.readlines()
tsv_lines = [line.replace(",", "\t") for line in csv_lines]
with open(self.results_log_file, "w") as f:
f.writelines(tsv_lines)
write_lines_to_file(path=self.path / ".gitattributes",
open_type="a")
def update_output_main_logs(self, ):
"""
Dumps all the metadata information about the project repositories state and
the commit hash and branch name of the ouput repository into the main branch of
output_branch_name = str(self._output_repo.active_branch)
output_repo_hash = str(self._output_repo.head.commit)
output_commit_message = self._output_repo.active_branch.commit.message
output_commit_message = output_commit_message.replace("\n", "; ")
self._output_repo._git.checkout("main")
logs_folderpath = self.path / self._output_folder / "run_history" / output_branch_name
os.makedirs(logs_folderpath)
json_filepath = logs_folderpath / "metadata.json"
meta_info_dict = {
"Output repo commit message": output_commit_message,
"Output repo branch": output_branch_name,
"Output repo commit hash": output_repo_hash,
"Project repo commit hash": str(self.head.commit),
"Project repo remotes": self.remote_urls,
"Tags": ", ".join(self.tags),
tsv_header = "\t".join(meta_info_dict.keys())
tsv_data = "\t".join([str(x) for x in meta_info_dict.values()])
with open(json_filepath, "w") as f:
json.dump(meta_info_dict, f, indent=2)
if not self.results_log_file.exists():
with open(self.results_log_file, "w") as f:
with open(self.results_log_file, "r") as f:
existing_header = f.readline().replace("\n", "")
raise ValueError("The used structure of the meta_dict doesn't match the header found in log.tsv")
with open(self.results_log_file, "a") as f:
self.dump_package_list(logs_folderpath)
self.copy_code(logs_folderpath)
self._output_repo._git.commit("-m", f"log for '{output_commit_message}' \n"
f"of branch '{output_branch_name}'")
self._output_repo._git.checkout(output_branch_name)
self._most_recent_branch = output_branch_name
def copy_code(self, target_path):
    """
    Clone only the current branch of the project repo to the target_path
    and then compress it into a zip file.

    :param target_path:
    :return:
    """
    if type(target_path) is str:
        target_path = Path(target_path)

    code_tmp_folder = target_path / "git_repo"

    # blob:none + single-branch keeps the temporary clone minimal.
    multi_options = ["--filter=blob:none", "--single-branch"]
    git.Repo.clone_from(self.path, code_tmp_folder, multi_options=multi_options)

    shutil.make_archive(target_path / "code", "zip", code_tmp_folder)

    # The temporary clone is removed; only code.zip remains.
    delete_path(code_tmp_folder)
def commit(self, message: str, add_all=True, verbosity=1):
    """
    Commit current state of the repository.

    :param message:
        Commit message
    :param add_all:
        Option to add all changed and new files to git automatically.
    :param verbosity:
        Option to choose degree of printed feedback.
    """
    # Note: the closing docstring delimiter was lost in transit; restored here.
    # Keep the output-remotes metadata in sync before every project commit.
    self.update_output_remotes_json()
    super().commit(message=message, add_all=add_all, verbosity=verbosity)
def update_output_remotes_json(self):
    """Sync the output repo's remote URLs into README.md and .cadet-rdm-data.json, then stage the json."""
    output_repo_remotes = self.output_repo.remote_urls
    self.add_list_of_remotes_in_readme_file("output_repo", output_repo_remotes)
    with open(self.data_json_path, "r") as file_handle:
        metadata = json.load(file_handle)
    remotes_dict = {remote.name: str(remote.url) for remote in self.output_repo.remotes}
    metadata["output_remotes"] = {"output_folder_name": self._output_folder, "output_remotes": remotes_dict}
    with open(self.data_json_path, "w") as file_handle:
        json.dump(metadata, file_handle, indent=2)
    self.add(self.data_json_path)
def download_file(self, url, file_path):
    """
    Download the file from the url and put it in the output+file_path location.

    :param file_path:
    :param url:
    :return:
        Returns a tuple containing the path to the newly created
        data file as well as the resulting HTTPMessage object.
    """
    # Note: the opening docstring delimiter was lost in transit; restored here.
    absolute_file_path = self.output_data(file_path)
    return urlretrieve(url, absolute_file_path)
def input_data(self, file_path, branch_name=None):
Load previously generated results to iterate upon.
:param file_path:
Can be relative path within the cached output repository to the file you wish to load.
OR relative path within the actual output repository, if branch_name is given.
:param branch_name:
Name of the branch of the output repository in which the results are stored. If none,
the cached_output is used.
:return:
Absolute path to the newly copied file.
"""
if branch_name is None and os.path.exists(file_path):
return file_path
if branch_name is None and not os.path.exists(file_path):
if os.sep not in branch_name_and_path:
sep = "/"
else:
sep = os.sep
branch_name, file_path = branch_name_and_path.split(sep, maxsplit=1)
if self.output_repo.exist_uncomitted_changes:
self.output_repo.stash_all_changes()
has_stashed_changes = True
else:
has_stashed_changes = False
previous_branch = self.output_repo.active_branch.name
self.output_repo._git.checkout(branch_name)
source_filepath = self.output_repo.path / file_path
target_folder = self.path / (self._output_folder + "_cached") / branch_name
os.makedirs(target_folder, exist_ok=True)
target_filepath = target_folder / file_path
if target_filepath.exists():
os.chmod(target_filepath, S_IWRITE)
shutil.copyfile(source_filepath, target_filepath)
os.chmod(target_filepath, S_IREAD)
self.output_repo._git.checkout(previous_branch)
if has_stashed_changes:
self.output_repo.apply_stashed_changes()
return target_filepath
@property
def output_path(self):
    """Absolute path to the output repository root (output_data with no sub_path)."""
    return self.output_data()
def output_data(self, sub_path=None):
    """
    Return an absolute path with the repo_dir/output_dir/sub_path

    :param sub_path:
        Path relative to the output repository root; if None, the output root itself.
    :return:
    """
    # Bug fix: joining with sub_path=None raised a TypeError, which broke the
    # output_path property that calls output_data() without arguments.
    if sub_path is None:
        return self.path / self.output_repo.path
    return self.path / self.output_repo.path / sub_path
def remove_cached_files(self):
    """Delete the "<output_folder>_cached" directory if it exists."""
    if (self.path / (self._output_folder + "_cached")).exists():
        delete_path(self.path / (self._output_folder + "_cached"))
def test_for_correct_repo_setup(self):
"""