diff --git a/run_sphrex_eazypy_on_ray.py b/run_sphrex_eazypy_on_ray.py index e5ab610120cd73517b797432b0bd2568f027ce0d..bd22f8cc0bd8afcfa87875ae8c5d575a422c9c51 100644 --- a/run_sphrex_eazypy_on_ray.py +++ b/run_sphrex_eazypy_on_ray.py @@ -217,6 +217,11 @@ outdir = Path("./outdir") catfiles = list(sorted(catdir.glob("Gaia*.parquet"))) catfiles_to_run = catfiles[:10] # we will only run the first 10 files for a test. +# - + +# # PARAMETERS for RAY TASKS +# +# In the cell below, you need to adjust the parameters so that they are suitable for your environment. `nproc` is a number of cores that will be used to process a single catalog, and `MAX_NUM_PENDING_CATALOGS` is a number of catalogs to process simultaneously. `nproc` x `MAX_NUM_PENDING_CATALOGS` should be similar to number of total cores in your ray cluster. A large catalog will be split into multiple smaller ones. `maxrow` sets the maximum number of rows for the split. maxrow = 400_000 # 400k rows take about 3 minutes on 32 cores nproc = 16 @@ -226,18 +231,14 @@ print("# of cores in the cluster:", cpus) print("NPROC:", nproc) print("MAX_NUM_PENDING_CATALOGS:", MAX_NUM_PENDING_CATALOGS) print("N_catalogs:", len(catfiles_to_run)) - + # + # This is the actual function that will be run on the ray cluster. Note that we override the value of npc. @ray.remote def do_single_catalog(catfile, cattab, tempfilt_data, nproc=nproc): # to override nproc - # FIXME FIXME FIXME FIXME - return do_single_catalog_simple(catfile, cattab[:10000], tempfilt_data, nproc=nproc) - -#from functools import partial -#do_single_catalog = ray.remote(partial(do_single_catalog_simple, nproc=nproc)) # somehow partial does not seem to work + return do_single_catalog_simple(catfile, cattab, tempfilt_data, nproc=nproc) @@ -280,8 +281,11 @@ def get_batch_args(catfiles, maxrow): # generator fro function arguments yield (I, catfile, i), cattab, tempfilt_data_r - # + +# The generator below and the commented out contents in the next cell is to run the eazypy in a blocking way +# (the cell will run until all the processing is done). On the othe hand, you may prefer to us asyncio version below. +# Your jupyter session will be still interactive while eazypy is being run in the background. + # A generator to push a catalog to the ray cluster with a maximum number pending catalogs. def run_with_ntask(ray_func, batches, ntask): @@ -316,7 +320,7 @@ def run_with_ntask(ray_func, batches, ntask): # + -# async io version +# asyncio version async def async_run_with_ntask(ray_func, batches, ntask): """run the task with arguments list of batches, with maximum simultaneously running task smaller than ntask""" @@ -338,9 +342,14 @@ async def async_run_with_ntask(ray_func, batches, ntask): for _ in ready_refs: yield await _ - +# - + + + # + +# easypy will be running as an asyncio background task. + import asyncio taskmap = [False] * len(catfiles_to_run) async def run_tasks(): @@ -351,13 +360,31 @@ async def run_tasks(): taskmap[I] = True #task = await run_tasks() +import time +ctime = time.time() task = asyncio.create_task(run_tasks()) + +# + +# Run this cell to check the status +# No that the catalog is marked to be processed if its first batch is done. + +n_catalog = len(catfiles_to_run) +n_finished = np.sum(taskmap) +elapsed_hour = (time.time() - ctime) / 3600. +remaining_hour = elapsed_hour / n_finished * (n_catalog - n_finished) + +print(f"{n_finished} out of {n_catalog} catalogs are processed for {elapsed_hour:.2f}h.") +print(f"{remaining_hour:.2f}h to process the rest.") + # - import matplotlib.pyplot as plt plt.plot(taskmap) #print(taskmap) + +# Note that this will block jupyter notebook until the processing is done. This is meand to do the garbage collecting +# after the processing is done. await task ray.shutdown() diff --git a/run_sphrex_eazypy_on_ray_for_vcs.ipynb b/run_sphrex_eazypy_on_ray_for_vcs.ipynb new file mode 100644 index 0000000000000000000000000000000000000000..0f1b0cc30cbda5b1e4b8fce1467067b2c32ab996 --- /dev/null +++ b/run_sphrex_eazypy_on_ray_for_vcs.ipynb @@ -0,0 +1,585 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "3ab403b0", + "metadata": { + "jp-MarkdownHeadingCollapsed": true + }, + "source": [ + "# Running eazypy on ray\n", + "\n", + "We will run `eazypy` with spherex referece catalog on the ray cluster (https://www.ray.io/).\n", + "The original `eazypy` (https://github.com/gbrammer/eazy-py) supports multi-core execution uwing python's `multiprocessing` module (https://docs.python.org/3/library/multiprocessing.html). This makes tricky to run eazypy on the multi-node cluster.\n", + "To make things simple, I have modified the eazypy (https://github.com/leejjoon/eazy-py) and replaced the use of multiprocessing with `joblib` module (https://joblib.readthedocs.io/en/stable/). `joblib` supports multiple backends including multiprocessing itself.\n", + "Furthermore, `ray` does provide its own implementation of joblib backend. On the other hand, using joblib with ray backend likely here may not be efficitn enough as this does not fully utilize ray's shared memory feautures.\n" + ] + }, + { + "cell_type": "markdown", + "id": "7f34926e", + "metadata": {}, + "source": [ + "# Setting up ray cluster\n", + "\n", + "All the nodes should have ray installed. We will simply use python 3.10 but the version should not matter. We will create a conda environment for ray together with two additional packages that are required.\n", + "\n", + "```sh\n", + "conda create -n ray_node_py310 python=3.10 smart_open virtualenv\n", + "conda activate ray_node_py310\n", + "pip install ray[default]\n", + "```\n", + "\n", + "On the head node, we start the master ray instance by\n", + "\n", + "```sh\n", + "ray start --head --port 6379\n", + "```\n", + "\n", + "For the rest of the node,\n", + "\n", + "```sh\n", + "ray start --address='IP_ADDRESS_OF_THE_HEAD_NODE:6379'\n", + "```\n", + "\n", + "The head and sibling nodes need to communicate. And several ports need to be open. The list of ports that need to be open is a bit ambiguous for now. And to make things easy, we simply open all the ports between the nodes. " + ] + }, + { + "cell_type": "markdown", + "id": "970ac81e", + "metadata": {}, + "source": [ + "# Runtime evironment\n", + "\n", + "`ray` allows you to set up your own runtimeenvironment to run your tasks. Your runtime environment will include python virtual environet with list of packages to be installd and the contents of the working directory. To run our eazypy taks, we will set up a virtual enviroment with `ray`, `joblib`, `jax` and `jaxopt` and the customized `eazy` package and the `dust_attenumation` package from their github repo (both are wheel-packaged and uloaded to KASI's minio storage which is publicly available. The workdir content can be set up from a zip file, and the zip file is also uploaded to the minio server, which contains all the necessary files to run eazypy. \n", + "\n", + "Note that the runtime environment won't include reference catalogs. Reference catalogs will be accessed by the node this notebook is running (see below) and will be transfered to the ray nodes by ray's own shared memory behind the scene." + ] + }, + { + "cell_type": "markdown", + "id": "55433d22", + "metadata": {}, + "source": [ + "# Running this notebook\n", + "\n", + "To run this notebook, your environment require these packages. `ray`, `pyarrow`, `pandas` `joblib`, `astropy` and `eazypy` (the original eazypy should suffice).\n", + "\n", + "This notebook should be run on the machine which has an access to the reference catalog and all the output hdf files will be saved in the same node. The reference catalogs are under `catdir` and the output hdf files will be saved under `outdir`.\n", + "\n", + "The template will also be read from the notebook. For now, we will use a pre-build template for the band set of reference catalog.\n", + "\n", + "Note that, for now, the node you are running this notebook should be a member of ray cluster. If you want to run this notebook from a node that is not a member of the ray cluster, you need to install `ray[client]` package and `ray.init` with a pointer to the head node.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f4d95f08", + "metadata": { + "lines_to_next_cell": 2 + }, + "outputs": [], + "source": [ + "import ray" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f5194e38", + "metadata": {}, + "outputs": [], + "source": [ + "# We will join the ray cluster with the runtime environment definition for eazypy.\n", + "\n", + "uri_root = \"https://archive.kasi.re.kr/kgmt/public/spherex_eazy/\"\n", + "\n", + "ray_env_requirements = [\"ray\", \"joblib\", \"jax\", \"jaxopt\"]\n", + "\n", + "ray_env_requirements.extend([\n", + " uri_root + \"dust_attenuation-0.5.dev12+gdf42887-py3-none-any.whl\",\n", + " uri_root + \"eazy-0.6.9.dev14+gd5c524c-py3-none-any.whl\"\n", + "])\n", + "\n", + "runtime_env = {\"pip\": ray_env_requirements,\n", + " \"working_dir\": uri_root + \"spherex_eazypy_data.zip\"}\n", + "\n", + "ray.init(runtime_env=runtime_env)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5c739653", + "metadata": {}, + "outputs": [], + "source": [ + "from pprint import pprint\n", + "resources = ray.cluster_resources()\n", + "pprint(resources)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c2386165", + "metadata": {}, + "outputs": [], + "source": [ + "# we will first run a simple test task so that runtime environement are set up in each node. Note that\n", + "# this is not necessary. We are just doing this to make sure that the runtime env is correctly set up.\n", + "\n", + "@ray.remote\n", + "def check_env(nodename):\n", + " import eazy\n", + " from pathlib import Path\n", + " files = Path(\".\").glob(\"*\")\n", + " return nodename, list(files)\n", + "\n", + "refs = []\n", + "for node in ray.nodes():\n", + " nodename = node[\"NodeName\"]\n", + " resources = {f'node:{nodename}': 0.01}\n", + " ref = check_env.options(resources=resources).remote(nodename)\n", + " refs.append(ref)\n", + " \n", + "for nodename, files in ray.get(refs):\n", + " print(nodename, \", \".join(sorted(p.name for p in files)))\n", + "\n", + "# This may take a while since the packages need to be installed and workind dir contents need to be downloaded.\n", + "# This will print out contents of the working directory set up in each node." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8b64f275", + "metadata": { + "lines_to_next_cell": 2 + }, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "import numpy as np\n", + "from astropy.table import Table" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "39ac21b1", + "metadata": {}, + "outputs": [], + "source": [ + "# We will register ray backend for the joblib. Somehow the defulat RayBackend from ray does not support generator.\n", + "# We simply overwrite the class attribute so that generator can be supported.\n", + "\n", + "import ray\n", + "from joblib import parallel_backend\n", + "from joblib.parallel import register_parallel_backend\n", + "from ray.util.joblib.ray_backend import RayBackend\n", + "\n", + "class RayBackend2(RayBackend):\n", + " supports_return_generator = True\n", + "\n", + "# The registration itself should be done at each node and will be executed inside the ray task." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7567b984", + "metadata": {}, + "outputs": [], + "source": [ + "# This is is the main task that will be run on ray. We will overwrite the nproc parameter later. Note\n", + "# that the function need to be wraped by `ray.remote` and this will be done later.\n", + "\n", + "def do_single_catalog_simple(catfile, cattab, tempfilt_data, nproc=-1):\n", + "\n", + " import eazy\n", + "\n", + " register_parallel_backend(\"ray\", RayBackend2)\n", + "\n", + " # %%\n", + "\n", + " # ## 2. Parameters for eazy\n", + " # outputfile = 'SPHERExRefCat_eazypy_specz_photoz'\n", + " params = {}\n", + " params['CATALOG_FILE'] = cattab\n", + " params['MAIN_OUTPUT_FILE'] = \"\" # outputfile\n", + " params['FIX_ZSPEC'] = 'n'\n", + " #params['TEMPLATES_FILE'] = './templates.spherex/brown_cosmos_ebv.param'\n", + " params['TEMPLATES_FILE'] = './templates/fsps_full/tweak_fsps_QSF_12_v3.param'\n", + " params['TEMPLATE_COMBOS'] = 1 # one template at a time [NOT WORKING], default = 99 or 'a'\n", + " params['N_MIN_COLORS'] = 3 # EAZY default value : 5\n", + " params['APPLY_PRIOR'] = 'y'\n", + " params['PRIOR_FILE'] = 'templates/prior_K_TAO.dat'\n", + " params['PRIOR_FILTER'] = 163 # 2MASS Ks band\n", + "\n", + " params['Z_MAX'] = 6.0\n", + " params['Z_STEP'] = 0.01\n", + "\n", + " params['VERBOSITY'] = 0\n", + "\n", + " # %%\n", + "\n", + " ez = eazy.photoz.PhotoZ(\n", + " param_file='zphot.param.default.RefCat.eazypy',\n", + " translate_file='zphot.RefCat.translate',\n", + " zeropoint_file=None,\n", + " params=params,\n", + " load_prior=True,\n", + " load_products=False,\n", + " tempfilt_data=tempfilt_data\n", + " )\n", + "\n", + " with parallel_backend('ray'):\n", + " ez.fit_catalog(n_proc=nproc, fitter=\"nnls_jax\", do_tqdm=False)\n", + "\n", + " # FIXME: replace the CATALOG_FILE to a string\n", + "\n", + " return catfile, ez\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "46ed3e93", + "metadata": {}, + "outputs": [], + "source": [ + "# This is to save the output as a hdf. This will be run locally.\n", + "\n", + "import eazy.hdf5\n", + "\n", + "def hdf_save(catfile, i, ez):\n", + "\n", + " print(\"saving\", catfile, i)\n", + " ez.param.params[\"CATALOG_FILE\"] = catfile.name\n", + " ext = f\"_{i:03d}.hdf\"\n", + " outname = outdir / (catfile.with_suffix(\"\").name + ext) \n", + " eazy.hdf5.write_hdf5(ez, h5file=outname)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "469204dc", + "metadata": {}, + "outputs": [], + "source": [ + "# We will setup the list of reference catalog that will be processed. Some reference catalog is too big and\n", + "# better to be split to reduce the memory pressure. If any of the file has number of rows larger than `maxrow`, we will split the catalog.\n", + "# We also set number of nproc here.\n", + "\n", + "# The number of avaialbe cores in the cluster divided by `nproc` gives you a number of catalog that needs to be simultanesouly processed,\n", + "# which is `MAX_NUM_PENDING_CATALOGS`. `MAX_NUM_PENDING_CATALOGS` can be larger than the value defined here, but too large number will \n", + "# increase the memory of the queue.\n", + "\n", + "catdir = Path(\"./catdir\")\n", + "outdir = Path(\"./outdir\")\n", + "\n", + "catfiles = list(sorted(catdir.glob(\"Gaia*.parquet\")))\n", + "\n", + "catfiles_to_run = catfiles[:10] # we will only run the first 10 files for a test." + ] + }, + { + "cell_type": "markdown", + "id": "2ccb1095", + "metadata": {}, + "source": [ + "# PARAMETERS for RAY TASKS\n", + "\n", + "In the cell below, you need to adjust the parameters so that they are suitable for your environment. `nproc` is a number of cores that will be used to process a single catalog, and `MAX_NUM_PENDING_CATALOGS` is a number of catalogs to process simultaneously. `nproc` x `MAX_NUM_PENDING_CATALOGS` should be similar to number of total cores in your ray cluster. A large catalog will be split into multiple smaller ones. `maxrow` sets the maximum number of rows for the split." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "91c59ada", + "metadata": {}, + "outputs": [], + "source": [ + "maxrow = 400_000 # 400k rows take about 3 minutes on 32 cores\n", + "nproc = 16\n", + "cpus = ray.cluster_resources()[\"CPU\"]\n", + "MAX_NUM_PENDING_CATALOGS = int(cpus // nproc)\n", + "print(\"# of cores in the cluster:\", cpus)\n", + "print(\"NPROC:\", nproc)\n", + "print(\"MAX_NUM_PENDING_CATALOGS:\", MAX_NUM_PENDING_CATALOGS)\n", + "print(\"N_catalogs:\", len(catfiles_to_run))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1d4493a4", + "metadata": {}, + "outputs": [], + "source": [ + "# This is the actual function that will be run on the ray cluster. Note that we override the value of npc.\n", + "\n", + "@ray.remote\n", + "def do_single_catalog(catfile, cattab, tempfilt_data, nproc=nproc): # to override nproc\n", + " return do_single_catalog_simple(catfile, cattab, tempfilt_data, nproc=nproc)\n", + "\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "61bd9247", + "metadata": { + "lines_to_next_cell": 2 + }, + "outputs": [], + "source": [ + "# We load the pre-build template for the spherex reference catalog.\n", + "tempfilt_data = np.load(\"tempfilt.npy\")\n", + "tempfilt_data_r = ray.put(tempfilt_data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bec4e973", + "metadata": {}, + "outputs": [], + "source": [ + "# A generator to open the reference catalog on the flux and convert it to the catalog for eazypy.\n", + "# This will also split the large catalog into smaller ones.\n", + "\n", + "from spherex_refcat_to_eazy import SPHERExRef2Eazy\n", + "\n", + "def get_cattab(catfiles, maxrow=None):\n", + " spherex2eazy = SPHERExRef2Eazy().get_eazy_table\n", + "\n", + " for I, catfile in enumerate(catfiles):\n", + " phot = Table.read(catfile)\n", + "\n", + " if (maxrow is None) or len(phot) < maxrow:\n", + " cattab = spherex2eazy(phot)\n", + " yield I, catfile, 0, cattab\n", + " else:\n", + " count = 0\n", + " for i, istart in enumerate(range(0, len(phot), maxrow)):\n", + " cattab = spherex2eazy(phot[istart:istart+maxrow])\n", + " yield I, catfile, i, cattab\n", + " count += 1\n", + " # if TEST_N is not None and count > TEST_N: return\n", + "\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2a7722c1", + "metadata": {}, + "outputs": [], + "source": [ + "def get_batch_args(catfiles, maxrow): # generator fro function arguments\n", + " for I, catfile, i, cattab in get_cattab(catfiles, maxrow=maxrow):\n", + " yield (I, catfile, i), cattab, tempfilt_data_r" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f599b93e", + "metadata": { + "lines_to_next_cell": 2 + }, + "outputs": [], + "source": [ + "# The generator below and the commented out contents in the next cell is to run the eazypy in a blocking way\n", + "# (the cell will run until all the processing is done). On the othe hand, you may prefer to us asyncio version below.\n", + "# Your jupyter session will be still interactive while eazypy is being run in the background. \n", + "\n", + "# A generator to push a catalog to the ray cluster with a maximum number pending catalogs.\n", + "\n", + "def run_with_ntask(ray_func, batches, ntask):\n", + " \"\"\"run the task with arguments list of batches, with maximum simultaneously running task smaller than ntask\"\"\"\n", + " \n", + " result_refs = []\n", + "\n", + " for args in batches:\n", + " # catfile, cattab = ray.get(ref)\n", + " if len(result_refs) >= ntask:\n", + " ready_refs, result_refs = ray.wait(result_refs, num_returns=1)\n", + " for _ in ray.get(ready_refs):\n", + " yield _\n", + "\n", + " task = ray_func.remote(*args)\n", + " result_refs.append(task)\n", + "\n", + " while result_refs:\n", + " ready_refs, result_refs = ray.wait(result_refs, num_returns=1)\n", + "\n", + " for _ in ray.get(ready_refs):\n", + " yield _\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "04e18328", + "metadata": { + "lines_to_next_cell": 2 + }, + "outputs": [], + "source": [ + "# This will run eazypy on the ray cluster, and it will take long.\n", + "\n", + "# batche_args = get_batch_args(catfiles_to_run, maxrow=maxrow)\n", + "# for (I, catfile1, i1), ez in run_with_ntask(do_single_catalog, batche_args, MAX_NUM_PENDING_CATALOGS):\n", + "# hdf_save(catfile1, i1, ez)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4a8f5d2", + "metadata": { + "lines_to_next_cell": 0 + }, + "outputs": [], + "source": [ + "# asyncio version\n", + "\n", + "async def async_run_with_ntask(ray_func, batches, ntask):\n", + " \"\"\"run the task with arguments list of batches, with maximum simultaneously running task smaller than ntask\"\"\"\n", + " \n", + " result_refs = set()\n", + "\n", + " for args in batches:\n", + " # catfile, cattab = ray.get(ref)\n", + " if len(result_refs) >= ntask:\n", + " ready_refs, result_refs = await asyncio.wait(result_refs)\n", + " for _ in ready_refs:\n", + " yield await _\n", + "\n", + " task = ray_func.remote(*args)\n", + " result_refs.add(task)\n", + "\n", + " while result_refs:\n", + " ready_refs, result_refs = await asyncio.wait(result_refs)\n", + "\n", + " for _ in ready_refs:\n", + " yield await _" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ce259f2f", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "15598369", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2041ba31", + "metadata": {}, + "outputs": [], + "source": [ + "# easypy will be running as an asyncio background task.\n", + "\n", + "import asyncio\n", + "taskmap = [False] * len(catfiles_to_run)\n", + "async def run_tasks():\n", + " batch_args = get_batch_args(catfiles_to_run, maxrow=maxrow)\n", + " global taskmap\n", + " async for (I, catfile1, i1), ez in async_run_with_ntask(do_single_catalog, batch_args, MAX_NUM_PENDING_CATALOGS):\n", + " hdf_save(catfile1, i1, ez)\n", + " taskmap[I] = True\n", + "\n", + "#task = await run_tasks()\n", + "import time\n", + "ctime = time.time()\n", + "task = asyncio.create_task(run_tasks())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "78923c64", + "metadata": {}, + "outputs": [], + "source": [ + "# Run this cell to check the status\n", + "# No that the catalog is marked to be processed if its first batch is done.\n", + "\n", + "n_catalog = len(catfiles_to_run)\n", + "n_finished = np.sum(taskmap)\n", + "elapsed_hour = (time.time() - ctime) / 3600.\n", + "remaining_hour = elapsed_hour / n_finished * (n_catalog - n_finished)\n", + "\n", + "print(f\"{n_finished} out of {n_catalog} catalogs are processed for {elapsed_hour:.2f}h.\")\n", + "print(f\"{remaining_hour:.2f}h to process the rest.\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ef89e138", + "metadata": { + "lines_to_next_cell": 2 + }, + "outputs": [], + "source": [ + "import matplotlib.pyplot as plt\n", + "plt.plot(taskmap)\n", + "#print(taskmap)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "081a1856", + "metadata": {}, + "outputs": [], + "source": [ + "# Note that this will block jupyter notebook until the processing is done. This is meand to do the garbage collecting \n", + "# after the processing is done.\n", + "await task" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "009ba940", + "metadata": { + "lines_to_next_cell": 2 + }, + "outputs": [], + "source": [ + "ray.shutdown()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}