Skip to content
Snippets Groups Projects
test_jupyter.ipynb 9.8 KiB
Newer Older
Ronald Jäpel's avatar
Ronald Jäpel committed
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "a5c6949c-da6a-40ab-aec7-de4534582ab2",
   "metadata": {},
   "outputs": [],
   "source": [
    "import nbformat as nbf\n",
    "import sys\n",
    "import os\n",
    "from pathlib import Path\n",
    "\n",
    "import nbformat\n",
    "import time\n",
    "from nbconvert.preprocessors import ExecutePreprocessor\n",
    "\n",
    "from nbconvert.nbconvertapp import NbConvertApp\n",
    "from ipylab import JupyterFrontEnd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "e4dd4791-5e5b-4fa7-89f1-8a9e574c300e",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1\n"
     ]
    }
   ],
   "source": [
    "print(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "e983e571-58f5-4310-8584-064fe4ac3570",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2\n"
     ]
    }
   ],
   "source": [
    "print(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "b28fb6d3-faa9-4788-86a1-4ec0d1026aa8",
   "metadata": {},
   "outputs": [],
   "source": [
    "def check_execution_order(notebook_path,\n",
    "                          check_all_executed=False,\n",
    "                          check_all_except_last_executed=True,\n",
    "                          check_top_to_bottom=False,\n",
    "                          check_in_order=True,\n",
    "                          exclude_last_cell=False):\n",
    "    ntbk = nbf.read(notebook_path, nbf.NO_CONVERT)\n",
    "\n",
    "    # extract all code cells (disregard markdown, raw and others), then extract the execution order\n",
    "    output_cells = [cell for cell in ntbk.cells if cell[\"cell_type\"] == \"code\"]\n",
    "    # remove empty cells\n",
    "    non_empty_cells = [cell for cell in output_cells if cell[\"source\"] != \"\"]\n",
    "    execution_counts = [cell[\"execution_count\"] for cell in non_empty_cells]\n",
    "\n",
    "    def _all_none(item_list):\n",
    "        return all([i is None for i in item_list])\n",
    "\n",
    "    # return early if no cells were executed\n",
    "    if _all_none(execution_counts):\n",
    "        return True\n",
    "\n",
    "    pass_check = [True]\n",
    "\n",
    "    def _check_all_executed(execution_counts: list) -> bool:\n",
    "        \"\"\"Check all cells were executed.\n",
    "\n",
    "        Parameters\n",
    "        ----------\n",
    "        execution_counts : list\n",
    "            execution_counts\n",
    "\n",
    "        Returns\n",
    "        -------\n",
    "        bool\n",
    "        \"\"\"\n",
    "        return not None in execution_counts\n",
    "\n",
    "    def _check_in_order(execution_counts: list) -> bool:\n",
    "        \"\"\"Check that execution counts that aren't None go from 1 to N.\n",
    "\n",
    "        Parameters\n",
    "        ----------\n",
    "        execution_counts : list\n",
    "            execution counts\n",
    "\n",
    "        Returns\n",
    "        -------\n",
    "        bool\n",
    "        \"\"\"\n",
    "        print(execution_counts)\n",
    "        execution_counts = [x for x in execution_counts if x is not None]\n",
    "        count_range = len(execution_counts)\n",
    "        if exclude_last_cell:\n",
    "            count_range = count_range - 1\n",
    "        is_in_order = all([execution_counts[i] < execution_counts[i + 1] for i in range(count_range - 1)])\n",
    "        return is_in_order\n",
    "\n",
    "    if check_in_order:\n",
    "        pass_check.append(_check_in_order(execution_counts))\n",
    "\n",
    "    if check_all_executed:\n",
    "        pass_check.append(_check_all_executed(execution_counts))\n",
    "    \n",
    "    if check_all_except_last_executed:\n",
    "        pass_check.append(_check_all_executed(execution_counts[:-1]))\n",
    "\n",
    "    if check_top_to_bottom:\n",
    "        pass_check.append(\n",
    "            _check_all_executed(execution_counts) and _check_in_order(execution_counts)\n",
    "        )\n",
    "    return all(pass_check)\n",
    "\n",
    "\n",
    "def save_ipynb():\n",
    "    app = JupyterFrontEnd()\n",
    "    print(\"Saving\", end=\"\")\n",
    "    # note: docmanager:save doesn't lock the python thread until saving is completed.\n",
    "    # Sometimes, new changes aren't completely saved before checks are performed.\n",
    "    # Waiting for 0.1 seconds seems to prevent that.\n",
    "    app.commands.execute('docmanager:save')\n",
    "    time.sleep(0.1)\n",
    "    print(\"\")\n",
    "\n",
    "\n",
    "def reload_notebook():\n",
    "    app = JupyterFrontEnd()\n",
    "    app.commands.execute('docmanager:reload')\n",
    "\n",
    "\n",
    "def wait_for_user(message):\n",
    "    proceed = input(message + \" Y/n\")\n",
    "    if proceed.lower() == \"y\" or proceed == \"\":\n",
    "        return True\n",
    "    else:\n",
    "        return False\n",
    "\n",
    "\n",
    "def clear_and_rerun_notebook(notebook_filename, force_rerun=False, timeout=600):\n",
    "    if \"nbconvert_call\" in sys.argv:\n",
    "        print(\"Finished rerun\")\n",
    "        return\n",
    "\n",
    "    save_ipynb()\n",
    "    time.sleep(1)\n",
    "\n",
    "    is_in_order = check_execution_order(notebook_filename)\n",
    "\n",
    "    if is_in_order and not force_rerun:\n",
    "        print(\"Notebook was already executed in order.\")\n",
    "        return\n",
    "    else:\n",
    "        rerun_confirmed_bool = wait_for_user(\"Notebook was not in order, rerun notebook now?\")\n",
    "        if not rerun_confirmed_bool and not force_rerun:\n",
    "            print(\"Aborting.\")\n",
    "            return\n",
    "\n",
    "    print(\"Rerunning.\")\n",
    "    with open(notebook_filename) as f:\n",
    "        nb = nbformat.read(f, as_version=4)\n",
    "\n",
    "    ep = ExecutePreprocessor(timeout=timeout, kernel_name='python3', extra_arguments=[\"nbconvert_call\"])\n",
    "    ep.preprocess(nb, )\n",
    "\n",
    "    with open(notebook_filename, 'w', encoding='utf-8') as f:\n",
    "        nbformat.write(nb, f)\n",
    "\n",
    "    reload_notebook()\n",
    "\n",
    "\n",
    "def convert_ipynb(filepath, formats=None):\n",
    "    if formats is None:\n",
    "        formats = [\"html\", \"python\"]\n",
    "    app = NbConvertApp()\n",
    "    app.initialize()\n",
    "    output_root_directory = os.path.join(r\"C:\\Users\\ronal\\PycharmProjects\\git_lfs_test_2\", \"results\",\n",
    "                                         os.path.basename(filepath.replace('.', '_')))\n",
    "    for format in formats:\n",
    "        app.export_format = format\n",
    "        app.notebooks = [filepath]\n",
    "        app.output_base = os.path.join(output_root_directory, os.path.basename(filepath.replace('.ipynb', '')))\n",
    "        if not os.path.exists(output_root_directory):\n",
    "            os.makedirs(output_root_directory)\n",
    "        app.start()\n",
    "\n",
    "\n",
    "\n",
    "def export_all_figures():\n",
    "    import junix\n",
    "\n",
    "    for file in os.listdir(git_repo.working_dir):\n",
    "        if file.endswith(\"ipynb\"):\n",
    "            file_without_extension = Path(file).stem\n",
    "            images = junix.export_images(filepath=os.path.join(git_repo.working_dir, file),\n",
    "                                         output_dir=os.path.join(git_repo.working_dir, \"results\",\n",
    "                                                                 file.replace(\".\", \"_\")),\n",
    "                                         prefix=file_without_extension)\n",
    "            convert_ipynb(filepath=os.path.join(git_repo.working_dir, file), formats=[\"html\"])\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "99404052-11f1-499b-a23f-4a90ed917300",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Saving\n",
      "[13, 14, 15, 24, None, 25, 26]\n"
     ]
    },
    {
     "name": "stdin",
     "output_type": "stream",
     "text": [
      "Notebook was not in order, rerun notebook now? Y/n n\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Aborting.\n"
     ]
    }
   ],
   "source": [
    "clear_and_rerun_notebook(\"test_jupyter.ipynb\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "id": "fec9df11-5622-49d8-8e08-c6e8bbe73dfd",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Saving\n",
      "[13, 14, 15, 24, 27, None, 26]\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "False"
      ]
     },
     "execution_count": 28,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "save_ipynb()\n",
    "time.sleep(1)\n",
    "check_execution_order(\"test_jupyter.ipynb\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "id": "81841f17-c8d7-4a1e-b13d-d3f5a19adfd4",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Saving\n",
      "[13, 14, 15, 24, 27, 28, None]\n",
      "Notebook was already executed in order.\n"
     ]
    }
   ],
   "source": [
    "clear_and_rerun_notebook(\"test_jupyter.ipynb\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}