From a063470680e98425a4dc5dd6ab5e3bdbfed7ef32 Mon Sep 17 00:00:00 2001 From: Jae-Joon Lee Date: Thu, 18 Jul 2024 00:41:34 +0900 Subject: [PATCH] initial commit --- README_spherex.md | 40 +++++ run_sphrex_eazypy_on_ray.py | 276 +++++++++++++++++++++++++++++++++++ test_eazypy_save_tempfilt.py | 36 +++++ 3 files changed, 352 insertions(+) create mode 100644 README_spherex.md create mode 100644 run_sphrex_eazypy_on_ray.py create mode 100644 test_eazypy_save_tempfilt.py diff --git a/README_spherex.md b/README_spherex.md new file mode 100644 index 0000000..ec6c908 --- /dev/null +++ b/README_spherex.md @@ -0,0 +1,40 @@ +Gaia_DR3.LS.PS1DR1.CatWISE.AllWISE.2MASS_NSIDE32_006741.parquet +60M (size), ~380k rows +3min in dufftown w/ 32cores, ~15G memory. + +total catalog : 877G +~750hour with 32 cores. + +# requirement + +dust_attenuation-0.5.dev12+gdf42887-py3-none-any.whl +eazy-0.6.9.dev13+g5356e74.d20240716-py3-none-any.whl + ray + pyarrow + pandas + joblib + jax + jaxopt + +# from the client +> pip install ray[client]" pyarrow pandas joblib + +> pip install dust_attenuation-0.5.dev12+gdf42887-py3-none-any.whl eazy-0.6.9.dev13+g5356e74.d20240716-py3-none-any.whl ray pyarrow pandas joblib jax jaxopt + +# ray server + +> conda create -n ray_node_py310 python=3.10 smart_open virtualenv +> conda activate ray_node_py310 +> pip install ray +> ray start --head --port=6379 --node-ip-address=127.0.0.1 + +# client + +# These packages is to run this script. +# ray[client] +# pyarrow +# pandas +# joblib +# astropy +# pip install https://archive.kasi.re.kr/kgmt/public/spherex_eazy/eazy-0.6.9.dev14+gd5c524c-py3-none-any.whl # This is need to save the hdf output + diff --git a/run_sphrex_eazypy_on_ray.py b/run_sphrex_eazypy_on_ray.py new file mode 100644 index 0000000..8468948 --- /dev/null +++ b/run_sphrex_eazypy_on_ray.py @@ -0,0 +1,276 @@ +# --- +# jupyter: +# jupytext: +# text_representation: +# extension: .py +# format_name: light +# format_version: '1.5' +# jupytext_version: 1.16.3 +# kernelspec: +# display_name: Python 3 (ipykernel) +# language: python +# name: python3 +# --- + +import ray + + +# + +uri_root = "https://archive.kasi.re.kr/kgmt/public/spherex_eazy/" + +ray_env_requirements = ["ray", "joblib", "jax", "jaxopt"] + +ray_env_requirements.extend([ + uri_root + "dust_attenuation-0.5.dev12+gdf42887-py3-none-any.whl", + uri_root + "eazy-0.6.9.dev14+gd5c524c-py3-none-any.whl" +]) + +runtime_env = {"pip": ray_env_requirements, + "working_dir": uri_root + "spherex_eazypy_data.zip"} + +# > ray start --head --port=6379 --node-ip-address=127.0.0.1 +# node_ip_address='192.168.10.65' +# ray.init(_node_ip_address=node_ip_address, runtime_env=runtime_env) +ray.init(runtime_env=runtime_env) + + +# + + +resources = ray.cluster_resources() +print(resources) + + +# + +@ray.remote +def check_env(nodename): + import eazy + from pathlib import Path + files = Path(".").glob("*") + return nodename, list(files) + +refs = [] +for node in ray.nodes(): + nodename = node["NodeName"] + resources = {f'node:{nodename}': 0.01} + ref = check_env.options(resources=resources).remote(nodename) + refs.append(ref) + +for nodename, files in ray.get(refs): + print(nodename, files) + +# This may take a while since the packages need to be installed and workind dir contents need to be downloaded. +# - + +from pathlib import Path +import numpy as np +from astropy.table import Table + + +# + +import ray +from joblib import parallel_backend +from joblib.parallel import register_parallel_backend +from ray.util.joblib.ray_backend import RayBackend + +class RayBackend2(RayBackend): + supports_return_generator = True + + + +# - + +class SPHERExRef2Eazy: + bands_ref = [ + 'Gaia_G', 'Gaia_BP', 'Gaia_RP', + 'WISE_W1', 'WISE_W2', 'WISE_W3', 'WISE_W4', + '2MASS_J', '2MASS_H', '2MASS_Ks', + 'PS1_g', 'PS1_r', 'PS1_i', 'PS1_z', 'PS1_y', + 'LS_g', 'LS_r', 'LS_z', + ] + + # bands_err = [b + '_error' for b in bands_ref] + + bands_eazy = [ + 'Gaia_G', 'Gaia_BP', 'Gaia_RP', + 'WISE_W1', 'WISE_W2', 'WISE_W3', 'WISE_W4', + '2MASS_J', '2MASS_H', '2MASS_Ks', + 'PS1_g', 'PS1_r', 'PS1_i', 'PS1_z', 'PS1_y', + 'DECam_g', 'DECam_r', 'DECam_z', + ] + + column_names = ["id"] + [c for b in bands_eazy for c in (f"f_{b}", f"e_{b}")] + ["z_spec"] + + def get_eazy_table(self, r): + # r = sel_phot + cols = [r['SPHERExRefID']] + n = len(cols[0]) + + for b in self.bands_ref: + # Do not use Legacy Survey, instead use PanSTARRS + if b in ['LS_g', 'LS_r', 'LS_z']: + flux = flux_err = np.zeros((n,)) - 9999 + else: + flux, flux_err = r[b], r[f"{b}_error"] + cols.append(flux) + cols.append(flux_err) + cols.append(r["z"]) + + cattab = Table(cols, names=self.column_names) + return cattab + + def get_empty_eazy_table(self): + from collections import defaultdict + d = defaultdict(list) + return self.get_eazy_table(d) + + + +def do_single_catalog_simple(catfile, cattab, tempfilt_data, nproc=-1): + + import eazy + + register_parallel_backend("ray", RayBackend2) + + # %% + + # ## 2. Parameters for eazy + # outputfile = 'SPHERExRefCat_eazypy_specz_photoz' + params = {} + params['CATALOG_FILE'] = cattab + params['MAIN_OUTPUT_FILE'] = "" # outputfile + params['FIX_ZSPEC'] = 'n' + #params['TEMPLATES_FILE'] = './templates.spherex/brown_cosmos_ebv.param' + params['TEMPLATES_FILE'] = './templates/fsps_full/tweak_fsps_QSF_12_v3.param' + params['TEMPLATE_COMBOS'] = 1 # one template at a time [NOT WORKING], default = 99 or 'a' + params['N_MIN_COLORS'] = 3 # EAZY default value : 5 + params['APPLY_PRIOR'] = 'y' + params['PRIOR_FILE'] = 'templates/prior_K_TAO.dat' + params['PRIOR_FILTER'] = 163 # 2MASS Ks band + + params['Z_MAX'] = 6.0 + params['Z_STEP'] = 0.01 + + params['VERBOSITY'] = 0 + + # %% + + ez = eazy.photoz.PhotoZ( + param_file='zphot.param.default.RefCat.eazypy', + translate_file='zphot.RefCat.translate', + zeropoint_file=None, + params=params, + load_prior=True, + load_products=False, + tempfilt_data=tempfilt_data + ) + + with parallel_backend('ray'): + ez.fit_catalog(n_proc=nproc, fitter="nnls_jax", do_tqdm=False) + + # FIXME: replace the CATALOG_FILE to a string + + return catfile, ez + + + +# + +import eazy.hdf5 + +def hdf_save(catfile, i, ez): + + print("saving", catfile, i) + ez.param.params["CATALOG_FILE"] = catfile.name + ext = f"_{i:03d}.hdf" + outname = outdir / (catfile.with_suffix("").name + ext) + eazy.hdf5.write_hdf5(ez, h5file=outname) + + + +# + +catdir = Path("./catdir") +outdir = Path("./outdir") + +catfiles = list(sorted(catdir.glob("Gaia*006741.parquet"))) + +catfiles_to_run = catfiles[:10] +maxrow = 100_000 # 400k rows take about 3 minutes on 32 cores +nproc = 16 +cpus = ray.cluster_resources()["CPU"] +MAX_NUM_PENDING_CATALOGS = int(cpus // nproc) +print("NPROC:", nproc) +print("MAX_NUM_PENDING_CATALOGS:", MAX_NUM_PENDING_CATALOGS) +print("N_catalogs:", len(catfiles_to_run)) + + +# + +#from functools import partial +#do_single_catalog = ray.remote(partial(do_single_catalog_simple, nproc=nproc)) # somehow partial does not seem to work + +@ray.remote +def do_single_catalog(catfile, cattab, tempfilt_data, nproc=nproc): # to overwrite nproc + return do_single_catalog_simple(catfile, cattab, tempfilt_data, nproc=nproc) + + + +# - + +tempfilt_data = np.load("tempfilt.npy") +tempfilt_data_r = ray.put(tempfilt_data) + + +def get_cattab(catfiles, maxrow=None): + spherex2eazy = SPHERExRef2Eazy().get_eazy_table + + for catfile in catfiles: + phot = Table.read(catfile) + + if (maxrow is None) or len(phot) < maxrow: + cattab = spherex2eazy(phot) + yield catfile, 0, cattab + else: + count = 0 + for i, istart in enumerate(range(0, len(phot), maxrow)): + cattab = spherex2eazy(phot[istart:istart+maxrow]) + yield catfile, i, cattab + count += 1 + # if TEST_N is not None and count > TEST_N: return + + + +# + +def run_with_ntask(ray_func, batches, ntask): + """run the task with arguments list of batches, with maximum simultaneously running task smaller than ntask""" + + result_refs = [] + + for args in batches: + # catfile, cattab = ray.get(ref) + if len(result_refs) >= ntask: + ready_refs, result_refs = ray.wait(result_refs, num_returns=1) + for _ in ray.get(ready_refs): + yield _ + + task = ray_func.remote(*args) + result_refs.append(task) + + while result_refs: + ready_refs, result_refs = ray.wait(result_refs, num_returns=1) + + for _ in ray.get(ready_refs): + yield _ + +def get_batch_args(catfiles, maxrow): # generator fro function arguments + for catfile, i, cattab in get_cattab(catfiles, maxrow=maxrow): + yield (catfile, i), cattab, tempfilt_data_r + +batche_args = get_batch_args(catfiles_to_run, maxrow=maxrow) +for (catfile1, i1), ez in run_with_ntask(do_single_catalog, batche_args, MAX_NUM_PENDING_CATALOGS): + hdf_save(catfile1, i1, ez) + +# - + +print(1) + +ray.shutdown() + + diff --git a/test_eazypy_save_tempfilt.py b/test_eazypy_save_tempfilt.py new file mode 100644 index 0000000..7552166 --- /dev/null +++ b/test_eazypy_save_tempfilt.py @@ -0,0 +1,36 @@ +import numpy as np +import eazy +import eazy.hdf5 +# print("eazy version: ", eazy.__version__) + +# to make an empty table +from spherex_refcat_to_eazy import SPHERExRef2Eazy + +cattab_empty = SPHERExRef2Eazy().get_empty_eazy_table() + +params = {} +params['CATALOG_FILE'] = cattab_empty +# params['MAIN_OUTPUT_FILE'] = outputfile +params['FIX_ZSPEC'] = 'n' +#params['TEMPLATES_FILE'] = './templates.spherex/brown_cosmos_ebv.param' +params['TEMPLATES_FILE'] = './templates/fsps_full/tweak_fsps_QSF_12_v3.param' +params['TEMPLATE_COMBOS'] = 1 # one template at a time [NOT WORKING], default = 99 or 'a' +params['N_MIN_COLORS'] = 3 # EAZY default value : 5 +params['APPLY_PRIOR'] = 'y' +params['PRIOR_FILE'] = 'templates/prior_K_TAO.dat' +params['PRIOR_FILTER'] = 163 # 2MASS Ks band + +params['Z_MAX'] = 6.0 +params['Z_STEP'] = 0.01 + +ez = eazy.photoz.PhotoZ( + param_file='zphot.param.default.RefCat.eazypy', + translate_file='zphot.RefCat.translate', + zeropoint_file=None, + params=params, + load_prior=True, + load_products=False, +) + +#if False: +np.save("tempfilt.npy", ez.tempfilt.tempfilt) -- GitLab