Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import sys
import time
import os
from pathlib import Path
from ipylab import JupyterFrontEnd
import junix
import nbformat as nbf
from nbconvert.preprocessors import ExecutePreprocessor
from nbconvert.nbconvertapp import NbConvertApp
class Notebook:
def __init__(self, notebook_path):
self.notebook_path = Path(notebook_path)
def check_execution_order(self,
check_all_executed=False,
check_top_to_bottom=False,
check_in_order=True,
exclude_last_cell=False):
notebook = nbf.read(self.notebook_path, nbf.NO_CONVERT)
# extract all code cells (disregard markdown, raw and others), then extract the execution order
output_cells = [cell for cell in notebook.cells if cell["cell_type"] == "code"]
# remove empty cells
non_empty_cells = [cell for cell in output_cells if cell["source"] != ""]
execution_counts = [cell["execution_count"] for cell in non_empty_cells]
def _all_none(item_list):
return all([i is None for i in item_list])
# return early if no cells were executed
if _all_none(execution_counts):
return True
pass_check = [True]
def _check_all_executed(execution_counts: list) -> bool:
"""Check all cells were executed.
Parameters
----------
execution_counts : list
execution_counts
Returns
-------
bool
"""
return not None in execution_counts
def _check_in_order(execution_counts: list) -> bool:
"""Check that execution counts that aren't None go from 1 to N.
Parameters
----------
execution_counts : list
execution counts
Returns
-------
bool
"""
execution_counts = [x for x in execution_counts if x is not None]
count_range = len(execution_counts) - 1
if exclude_last_cell:
count_range = count_range - 1
print(execution_counts)
is_in_order = all([execution_counts[i] < execution_counts[i + 1] for i in range(count_range)])
return is_in_order
if check_in_order:
pass_check.append(_check_in_order(execution_counts))
if check_all_executed:
pass_check.append(_check_all_executed(execution_counts))
if check_top_to_bottom:
pass_check.append(
_check_all_executed(execution_counts) and _check_in_order(execution_counts)
)
return all(pass_check)
def save_ipynb(self):
app = JupyterFrontEnd()
print("Saving", end="")
# note: docmanager:save doesn't lock the python thread until saving is completed.
# Sometimes, new changes aren't completely saved before checks are performed.
# Waiting for 0.1 seconds seems to prevent that.
app.commands.execute('docmanager:save')
time.sleep(0.1)
print("")
def reload_notebook(self):
app = JupyterFrontEnd()
app.commands.execute('docmanager:reload')
def wait_for_user(self, message):
proceed = input(message + " Y/n")
if proceed.lower() == "y" or proceed == "":
return True
else:
return False
def clear_and_rerun_notebook(self, force_rerun=False, timeout=600):
if "nbconvert_call" in sys.argv:
return
self.save_ipynb()
time.sleep(1)
is_in_order = self.check_execution_order(self.notebook_filename, exclude_last_cell=False)
if is_in_order and not force_rerun:
print("Notebook was already executed in order.")
return
else:
rerun_confirmed_bool = self.wait_for_user("Notebook was not in order, rerun notebook now?")
if not rerun_confirmed_bool and not force_rerun:
print("Aborting.")
return
print("Rerunning.")
with open(self.notebook_filename) as f:
nb = nbf.read(f, as_version=4)
ep = ExecutePreprocessor(timeout=timeout, kernel_name='python3', extra_arguments=["nbconvert_call"])
ep.preprocess(nb, )
with open(self.notebook_filename, 'w', encoding='utf-8') as f:
nbf.write(nb, f)
self.reload_notebook()
def convert_ipynb(self, formats: list = None):
if formats is None:
formats = ["pdf", "python"]
app = NbConvertApp()
app.initialize()
output_root_directory = os.path.join(r"C:\Users\ronal\PycharmProjects\git_lfs_test_2", "results",
self.notebook_path.name.replace('.', '_'))
for export_format in formats:
app.export_format = export_format
app.notebooks = [self.notebook_path]
app.output_base = os.path.join(output_root_directory,
self.notebook_path.name.replace('.ipynb', ''))
if not os.path.exists(output_root_directory):
os.makedirs(output_root_directory)
app.start()
def export_all_figures(self, output_dir):
file_without_extension = self.notebook_path.stem
images = junix.export_images(filepath=self.notebook_path,
output_dir=os.path.join(output_dir,
str(self.notebook_path.name).replace(".", "_")),
prefix=file_without_extension)