Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
CADET-RDM
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
IBG-1
ModSim
CADET
CADET-RDM
Commits
b4a31ae7
Commit
b4a31ae7
authored
1 year ago
by
Ronald Jäpel
Browse files
Options
Downloads
Patches
Plain Diff
Add docstrings
parent
4a689a49
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
cadetrdm/utils.py
+205
-93
205 additions, 93 deletions
cadetrdm/utils.py
with
205 additions
and
93 deletions
cadetrdm/utils.py
+
205
−
93
View file @
b4a31ae7
import
os
import
json
from
datetime
import
datetime
import
random
import
shutil
import
contextlib
...
...
@@ -15,14 +14,20 @@ except ImportError:
class
BaseRepo
:
def
__init__
(
self
,
repository_path
=
None
,
search_parent_directories
=
False
,
*
args
,
**
kwargs
):
"""
from git.Repo:
Base class handling most git workflows.
:param repository_path:
Path to the root directory of the repository.
:param search_parent_directories:
if True, all parent directories will be searched for a valid repo as well.
Please note that this was the default behaviour in older versions of GitPython,
which is considered a bug though.
:param args:
Args handed to git.Repo()
:param kwargs:
Kwargs handed to git.Repo()
"""
if
repository_path
is
None
or
repository_path
==
"
.
"
:
repository_path
=
os
.
getcwd
()
self
.
git_repo
=
git
.
Repo
(
repository_path
,
search_parent_directories
=
search_parent_directories
,
*
args
,
**
kwargs
)
...
...
@@ -58,13 +63,24 @@ class BaseRepo:
self
.
_earliest_commit
=
earliest_commit
return
self
.
_earliest_commit
def
delete_active_branch
(
self
):
def delete_active_branch_if_branch_is_empty(self):
    """
    Delete the currently active branch if it is still empty.

    A branch counts as empty when its HEAD commit is the earliest commit of
    the repository (self.earliest_commit). In that case the master branch is
    checked out and the previously active branch is deleted with
    "git branch -d". Branches that already carry further commits are kept.

    Nothing is done when master itself is the active branch, because git
    refuses to delete the branch that is currently checked out.
    """
    previous_branch = self.active_branch.name
    if previous_branch == "master":
        # "git branch -d master" cannot succeed while master is checked out.
        return

    if str(self.head.commit) != self.earliest_commit:
        # The branch holds commits beyond the earliest one, so it is not empty.
        return

    self.git.checkout("master")
    self.git.branch("-d", previous_branch)
def
add_all_files
(
self
,
automatically_add_new_files
=
True
):
"""
Stage all changes to git. This includes new, untracked files as well as modified files.
:param automatically_add_new_files:
If this is set to false a user input will be prompted if untracked files are about to be added.
:return:
List of all staged changes.
"""
untracked_files
=
""
if
len
(
self
.
untracked_files
)
>
0
:
untracked_files
=
"
\n
"
.
join
([
"
-
"
+
file
for
file
in
self
.
untracked_files
])
...
...
@@ -88,6 +104,18 @@ class BaseRepo:
self
.
git
.
add
(
f
)
return
self
.
untracked_files
+
changed_files
def
reset_hard_to_head
(
self
):
"""
Discard all uncommitted changes in the repository after asking the user for confirmation.
The prompt lists self.untracked_files + self.changed_files and is shown via input().
If the answer is accepted, tracked files are reset with "git reset -q --hard HEAD" and
untracked files and directories are removed with "git clean -q -f -d".
:raises KeyboardInterrupt:
If the answer is neither empty nor "y" (compared case-insensitively).
"""
proceed
=
input
(
f
'
The output directory contains the following uncommitted changes:
\n
'
f
'
{
self
.
untracked_files
+
self
.
changed_files
}
\n
'
f
'
These will be lost if you continue
\n
'
f
'
Proceed? Y/n
\n
'
)
if
not
(
proceed
.
lower
()
==
"
y
"
or
proceed
==
""
):
raise
KeyboardInterrupt
# reset all tracked files to previous commit, -q silences output
self
.
git
.
reset
(
"
-q
"
,
"
--hard
"
,
"
HEAD
"
)
# remove all untracked files and directories, -q silences output
self
.
git
.
clean
(
"
-q
"
,
"
-f
"
,
"
-d
"
)
@property
def
changed_files
(
self
):
changed_files
=
self
.
git
.
diff
(
None
,
name_only
=
True
).
split
(
'
\n
'
)
...
...
@@ -100,6 +128,9 @@ class BaseRepo:
return
len
(
self
.
untracked_files
)
>
0
or
len
(
self
.
changed_files
)
>
0
def
update_package_list
(
self
):
"""
Use
"
conda env export
"
and
"
pip freeze
"
to create environment.yml and pip_requirements.txt files.
"""
repo_path
=
self
.
working_dir
print
(
"
Dumping conda environment.yml, this might take a moment.
"
)
os
.
system
(
f
"
conda env export >
{
repo_path
}
/conda_environment.yml
"
)
...
...
@@ -111,6 +142,16 @@ class BaseRepo:
os
.
system
(
f
"
pip list --not-required --format freeze >
{
repo_path
}
/pip_independent_requirements.txt
"
)
def
commit
(
self
,
message
:
str
,
add_all
=
True
,
update_packages
=
True
):
"""
Commit current state of the repository.
:param message:
Commit message
:param add_all:
Option to add all changed and new files to git automatically.
:param update_packages:
Option to automatically dump the python environment information into environment.yml files.
"""
if
not
self
.
exist_unstaged_changes
:
print
(
f
"
No changes to commit in repo
{
self
.
working_dir
}
"
)
return
...
...
@@ -124,25 +165,54 @@ class BaseRepo:
print
(
"
\n
"
+
commit_return
+
"
\n
"
)
def git_ammend(self, ):
    """
    Amend the most recent commit while keeping its message,
    i.e. run git commit with the options --amend --no-edit.
    """
    amend_options = ("--amend", "--no-edit")
    self.git.commit(*amend_options)
def print_status(self):
    """
    Print the output of git status for this repository.
    """
    status_report = self.git.status()
    print(status_report)
def print_log(self):
    """
    Print the output of git log for this repository.
    """
    log_report = self.git.log()
    print(log_report)
def stash_all_changes(self):
    """
    Stage everything in the repository (git add .) and stash the result.

    :raises RuntimeError:
        If self.exist_unstaged_changes reports that there is nothing to stash.
    """
    if self.exist_unstaged_changes:
        git_cmd = self.git
        git_cmd.add(".")
        git_cmd.stash()
        return
    raise RuntimeError("No changes in repo to stash.")
def prepare_new_branch(self, branch_name):
    """
    Prepare a new branch to receive data. This includes:
     - creating the new branch and checking it out, and
     - resetting the HEAD of the branch to the earliest commit of the repository
       (self.earliest_commit).
    This is meant to produce a clean, empty directory for data while keeping
    the .gitignore and .gitattributes of that first commit.

    # ToDO: Add to feature requests: If .gitignore is changed later, each reset like this will lose those changes

    :param branch_name:
        Name of the new branch.
    """
    git_cmd = self.git
    # equivalent to $ git checkout -b %branch_name
    git_cmd.checkout('-b', branch_name)
    # equivalent to $ git reset --hard %commit_hash
    reset_target = self.earliest_commit
    git_cmd.reset('--hard', reset_target)
def
apply_stashed_changes
(
self
):
"""
Apply the last stashed changes.
If a
"
CONFLICT (modify/delete)
"
error is encountered, this is ignored.
All other errors are raised.
"""
try
:
self
.
git
.
stash
(
'
pop
'
)
# equivalent to $ git stash pop
except
git
.
exc
.
GitCommandError
as
e
:
...
...
@@ -153,23 +223,30 @@ class BaseRepo:
raise
e
def
test_for_uncommitted_changes
(
self
):
"""
Raise a RuntimeError if uncommitted changes are in the repository.
The check reads self.exist_unstaged_changes; the error message names self.working_dir.
:raises RuntimeError:
If self.exist_unstaged_changes is truthy.
"""
if
self
.
exist_unstaged_changes
:
raise
RuntimeError
(
f
"
Found uncommitted changes in the repository
{
self
.
working_dir
}
.
"
)
class
ProjectRepo
(
BaseRepo
):
def
__init__
(
self
,
repository_path
=
None
,
output_folder
=
None
,
*
args
,
**
kwargs
):
"""
:param search_parent_directories:
if True, all parent directories will be searched for a valid repo as well.
Please note that this was the default behaviour in older versions of GitPython,
which is considered a bug though.
Class for Project-Repositories. Handles interaction between the project repo and
the output (i.e. results) repo.
:param repository_path:
Path to the root of the git repository.
:param output_folder:
Path to the root of the output repository.
:param args:
Additional args to be handed to BaseRepo.
:param kwargs:
Additional kwargs to be handed to BaseRepo.
"""
if
repository_path
is
None
or
repository_path
==
"
.
"
:
repository_path
=
os
.
getcwd
()
super
().
__init__
(
repository_path
,
*
args
,
**
kwargs
)
if
output_folder
is
not
None
:
...
...
@@ -185,41 +262,35 @@ class ProjectRepo(BaseRepo):
raise
ValueError
(
"
The output repo has not been set yet.
"
)
return
self
.
_output_repo
def
set_output_repo
(
self
,
output_repo_folder_name
):
self
.
_output_repo
=
ProjectRepo
(
os
.
path
.
join
(
self
.
working_dir
,
output_repo_folder_name
),
output_folder
=
False
)
self
.
_output_folder
=
output_repo_folder_name
def get_new_output_branch_name(self):
    """
    Construct a name for the new branch in the output repository.

    The name joins, with underscores: the active branch of this repository,
    the first seven characters of its HEAD commit hash, the output folder
    name and the current timestamp (last four microsecond digits cut off).

    :return: the new branch name
    """
    short_hash = str(self.head.commit)[:7]
    full_timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
    name_parts = [
        str(self.active_branch),
        short_hash,
        self._output_folder,
        full_timestamp[:-4],
    ]
    return "_".join(name_parts)
def
commit_results
(
self
,
message
):
self
.
test_for_uncommitted_changes
()
self
.
output_repo
.
stash_all_changes
()
new_branch_name
=
self
.
get_new_output_branch_name
()
self
.
output_repo
.
prepare_new_branch
(
new_branch_name
)
self
.
output_repo
.
apply_stashed_changes
()
"""
Actual Git commit
"""
self
.
_output_repo
.
git
.
add
(
"
.
"
)
self
.
_output_repo
.
git
.
commit
(
"
-m
"
,
message
)
self
.
update_output_master_logs
()
def check_results_master(self):
    """
    Checkout the master branch of the output repository, which contains all the log files.
    The name of the branch that was active before is stored in self._most_recent_branch.
    """
    results_repo = self._output_repo
    self._most_recent_branch = results_repo.active_branch.name
    results_repo.git.checkout("master")
def reload_recent_results(self):
    """
    Checkout the branch of the output repository whose name is stored in
    self._most_recent_branch.
    """
    checkout = self._output_repo.git.checkout
    checkout(self._most_recent_branch)
def
update_output_master_logs
(
self
):
"""
Dumps all the metadata information about the project repositories state and
the commit hash and branch name of the output repository into the master branch of
the output repository.
"""
output_branch_name
=
str
(
self
.
_output_repo
.
active_branch
)
output_repo_hash
=
str
(
self
.
_output_repo
.
head
.
commit
)
...
...
@@ -263,6 +334,15 @@ class ProjectRepo(BaseRepo):
self
.
_most_recent_branch
=
output_branch_name
def
cache_previous_results
(
self
,
branch_name
,
file_path
):
"""
Load previously generated results to iterate upon.
:param branch_name:
Name of the branch of the output repository in which the results are stored
:param file_path:
Relative path within the output repository to the file you wish to load.
:return:
Absolute path to the newly copied file.
"""
if
self
.
output_repo
.
exist_unstaged_changes
:
self
.
output_repo
.
stash_all_changes
()
has_stashed_changes
=
True
...
...
@@ -290,6 +370,19 @@ class ProjectRepo(BaseRepo):
@contextlib.contextmanager
def
load_previous_result_file
(
self
,
branch_name
,
file_path
,
*
args
,
**
kwargs
):
"""
Context manager around load_previous_result_file that directly opens a handle to the loaded file.
:param branch_name:
Name of the branch of the output repository in which the results are stored
:param file_path:
Relative path within the output repository to the file you wish to load.
:param args:
Args to be handed to the open() function
:param kwargs:
kwargs to be handed to the open() function
:return:
Handle to the copied file.
"""
cached_filepath
=
self
.
load_previous_result_file
(
branch_name
,
file_path
)
file_handle
=
open
(
cached_filepath
,
*
args
,
**
kwargs
)
try
:
...
...
@@ -298,32 +391,46 @@ class ProjectRepo(BaseRepo):
file_handle
.
close
()
def remove_cached_files(self):
    """
    Delete the directory holding previously cached results, if it exists.
    The directory path is the output folder name with "_cached" appended.
    """
    if not os.path.exists(self._output_folder + "_cached"):
        return
    cache_directory = self._output_folder + "_cached"
    shutil.rmtree(cache_directory)
def
enter_context
(
self
,
):
"""
Enter the tracking context. This includes:
- Ensure no uncommitted changes in the project repository
- Remove all uncommitted changes in the output repository
- Clean up empty branches in the output repository
- Create a new empty output branch in the output repository
:return:
The name of the newly created output branch.
"""
self
.
test_for_uncommitted_changes
()
output_repo
=
self
.
output_repo
if
output_repo
.
exist_unstaged_changes
:
proceed
=
input
(
f
'
The output directory contains the following uncommitted changes:
\n
'
f
'
{
output_repo
.
untracked_files
+
output_repo
.
changed_files
}
\n
'
f
'
These will be lost if you continue
\n
'
f
'
Proceed? Y/n
\n
'
)
if
not
(
proceed
.
lower
()
==
"
y
"
or
proceed
==
""
):
raise
KeyboardInterrupt
# reset all tracked files to previous commit, -q silences output
output_repo
.
git
.
reset
(
"
-q
"
,
"
--hard
"
,
"
HEAD
"
)
# remove all untracked files and directories, -q silences output
output_repo
.
git
.
clean
(
"
-q
"
,
"
-f
"
,
"
-d
"
)
output_repo
.
reset_hard_to_head
()
output_repo
.
delete_active_branch
()
# rename to make more transparent why
output_repo
.
delete_active_branch
_if_branch_is_empty
()
new_branch_name
=
self
.
get_new_output_branch_name
()
output_repo
.
prepare_new_branch
(
new_branch_name
)
return
new_branch_name
def
exit_context
(
self
,
message
):
"""
After running all project code, this prepares the commit of the results to the output repository. This includes
- Ensure no uncommitted changes in the project repository
- ToDO: ensure commit hash of project repository is unchanged since enter_context()?
- Stage all changes in the output repository
- Commit all changes in the output repository with the given commit message.
- Update the log files in the master branch of the output repository.
:param message:
Commit message for the output repository commit.
"""
self
.
test_for_uncommitted_changes
()
print
(
"
Completed computations, commiting results
"
)
...
...
@@ -337,6 +444,12 @@ class ProjectRepo(BaseRepo):
@contextlib.contextmanager
def
track_results
(
self
,
results_commit_message
:
str
):
"""
Context manager to be used when running project code that produces output that should
be tracked in the output repository.
:param results_commit_message:
Commit message for the commit of the output repository.
"""
new_branch_name
=
self
.
enter_context
()
try
:
yield
new_branch_name
...
...
@@ -350,63 +463,44 @@ class ResultsRepo(BaseRepo):
pass
class
TrackResults
:
def
__init__
(
self
,
results_commit_message
:
str
,
repo_path
:
str
=
None
):
if
repo_path
is
None
:
print
(
"
DataContext started without explicit repo_path. Trying current working directory
"
)
self
.
repo
=
ProjectRepo
(
"
.
"
)
else
:
self
.
repo
=
ProjectRepo
(
repo_path
)
self
.
message
=
results_commit_message
def
__enter__
(
self
):
self
.
repo
.
test_for_uncommitted_changes
()
output_repo
=
self
.
repo
.
output_repo
if
output_repo
.
exist_unstaged_changes
:
proceed
=
input
(
f
'
The output directory contains the following uncommitted changes:
\n
'
f
'
{
output_repo
.
untracked_files
+
output_repo
.
changed_files
}
\n
'
f
'
These will be lost if you continue
\n
'
f
'
Proceed? Y/n
\n
'
)
if
not
(
proceed
.
lower
()
==
"
y
"
or
proceed
==
""
):
raise
KeyboardInterrupt
# reset all tracked files to previous commit, -q silences output
output_repo
.
git
.
reset
(
"
-q
"
,
"
--hard
"
,
"
HEAD
"
)
# remove all untracked files and directories, -q silences output
output_repo
.
git
.
clean
(
"
-q
"
,
"
-f
"
,
"
-d
"
)
output_repo
.
delete_active_branch
()
new_branch_name
=
self
.
repo
.
get_new_output_branch_name
()
output_repo
.
prepare_new_branch
(
new_branch_name
)
def
__exit__
(
self
,
exc_type
,
exc_value
,
exc_tb
):
self
.
repo
.
test_for_uncommitted_changes
()
if
exc_type
is
None
:
print
(
"
Completed computations, commiting results
"
)
self
.
repo
.
output_repo
.
git
.
add
(
"
.
"
)
commit_return
=
self
.
repo
.
output_repo
.
git
.
commit
(
"
-m
"
,
self
.
message
)
print
(
"
\n
"
+
commit_return
+
"
\n
"
)
self
.
repo
.
update_output_master_logs
()
self
.
repo
.
remove_cached_files
()
def add_linebreaks(input_list):
    """
    Append a newline character to every entry of input_list.

    :param input_list:
        Iterable of strings.
    :return:
        New list in which each entry ends with a newline character.
    """
    terminated_lines = []
    for entry in input_list:
        terminated_lines.append(entry + "\n")
    return terminated_lines
def init_lfs(lfs_filetypes: list, path: str = None):
    """
    Initialize lfs in the git repository at the path.
    If path is None, the current working directory is used.

    The git commands are started with argument lists instead of a shell
    string, so patterns such as "*.jpg" reach git-lfs literally and are not
    expanded by a shell against files that already exist in the directory.
    The working directory of the calling process is left untouched; the
    commands are run with cwd=path instead.

    :param lfs_filetypes:
        List of file types to be handled by lfs.
        Format should be e.g. ["*.jpg", "*.png"] for jpg and png files.
    :param path:
        Path to the git repository in which lfs is initialized.
    :raises FileNotFoundError:
        If the git executable cannot be found.
    """
    # Local import: only this function needs subprocess.
    import subprocess

    # Exit codes are not checked, matching the previous os.system calls.
    subprocess.run(["git", "lfs", "install"], cwd=path)
    subprocess.run(["git", "lfs", "track", *lfs_filetypes], cwd=path)
def
write_lines_to_file
(
path
,
lines
,
open_type
=
"
a
"
):
"""
Convenience function. Write lines to a file at path with added newlines between each line.
:param path:
Path to file.
:param lines:
List of lines to be written to file.
:param open_type:
The way the file should be opened. I.e.
"
a
"
for append and
"
w
"
for fresh write.
"""
with
open
(
path
,
open_type
)
as
f
:
f
.
writelines
(
add_linebreaks
(
lines
))
...
...
@@ -420,6 +514,24 @@ def is_tool(name):
def
initialize_git_repo
(
path_to_repo
:
str
,
output_repo_name
:
(
str
|
bool
)
=
"
output
"
,
gitignore
:
list
=
None
,
gitattributes
:
list
=
None
,
lfs_filetypes
:
list
=
None
,
output_repo_kwargs
:
dict
=
None
):
"""
Initialize a git repository at the given path with an optional included output results repository.
:param path_to_repo:
Path to main repository.
:param output_repo_name:
Name for the output repository.
:param gitignore:
List of files to be added to the gitignore file.
:param gitattributes:
List of lines to be added to the .gitattributes file
:param lfs_filetypes:
List of filetypes to be handled by git lfs.
:param output_repo_kwargs:
kwargs to be given to the creation of the output repo initialization function.
Include gitignore, gitattributes, and lfs_filetypes kwargs.
:return:
"""
if
not
is_tool
(
"
git-lfs
"
):
raise
RuntimeError
(
"
Git LFS is not installed. Please install it via e.g. apt-get install git-lfs or the
"
"
instructions found below
\n
"
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment