From 4d7f78ce9d379a53457c735c79044ac4fe6f12c3 Mon Sep 17 00:00:00 2001 From: Jae-Joon Lee Date: Sun, 26 Jun 2022 22:45:56 +0900 Subject: [PATCH] add a notebook --- example 1.ipynb | 452 ++++++++++++++++++++++++++++++++++++++++++++++++ ingest_csv.py | 10 +- load_duckdb.py | 110 ++++++++++++ query_duckdb.py | 58 +++---- 4 files changed, 594 insertions(+), 36 deletions(-) create mode 100644 example 1.ipynb create mode 100644 load_duckdb.py diff --git a/example 1.ipynb b/example 1.ipynb new file mode 100644 index 0000000..17e3ed1 --- /dev/null +++ b/example 1.ipynb @@ -0,0 +1,452 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "a6ed759e", + "metadata": {}, + "outputs": [], + "source": [ + "import ray\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "a12d47c0", + "metadata": {}, + "outputs": [], + "source": [ + "from ray.util import ActorPool" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "5908285e", + "metadata": {}, + "outputs": [], + "source": [ + "runtime_env = {\"working_dir\": \".\", \"excludes\": [\"data\"],\n", + " \"pip\": [\"numpy\", \"astropy\", \"pyarrow\", \"duckdb\", \"cdshealpix\"]}\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "6e89a708", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2022-06-21 19:34:27,339\tINFO packaging.py:414 -- Creating a file package for local directory '.'.\n", + "2022-06-21 19:34:27,379\tINFO packaging.py:258 -- Pushing file package 'gcs://_ray_pkg_80c096dfd4365ba6.zip' (0.05MiB) to Ray cluster...\n", + "2022-06-21 19:34:27,397\tINFO packaging.py:267 -- Successfully pushed file package 'gcs://_ray_pkg_80c096dfd4365ba6.zip'.\n" + ] + }, + { + "data": { + "text/plain": [ + "ClientContext(dashboard_url='10.100.64.142:8265', python_version='3.9.5', ray_version='1.13.0', ray_commit='e4ce38d001dbbe09cd21c497fedd03d692b2be3e', protocol_version='2022-03-16', _num_clients=1, _context_to_restore=)" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray.init(\"ray://210.219.33.214:10001\", runtime_env=runtime_env)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "cdc69bac", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "\"\"" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# check if dependencies are correctly installed\n", + "@ray.remote\n", + "def test():\n", + " import query_duckdb\n", + " return str(query_duckdb)\n", + "\n", + "ray.get(test.remote())" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "7ff0c59e", + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow.parquet as pq\n", + "from astropy.coordinates import Longitude, Latitude\n", + "import astropy.units as u\n", + "import pandas as pd\n" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "031e5670", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "number of rows : 11804\n" + ] + }, + { + "data": { + "text/plain": [ + "pyarrow.Table\n", + "RA_2000: double\n", + "DEC_2000: double\n", + "hp20: uint64\n", + "RRISA_IDX: int64\n", + "----\n", + "RA_2000: [[44.819125,44.82058333333333,46.82104166666666,48.57745833333333,45.76116666666666,...,315.026375,320.011375,314.66649999999987,313.3123999999999,313.3123999999999]]\n", + "DEC_2000: [[1.2445833333333334,1.2458055555555556,2.015083333333333,4.8295555555555545,4.166777777777779,...,-5.093388888888889,-4.959055555555556,-2.8131388888888886,-2.3554888888888894,-2.3554]]\n", + "hp20: [[634175348,634187296,1205230641,6018545421,6620828486,...,13181215315778,13182652146362,13190847277265,13192624919968,13192624919971]]\n", + "RRISA_IDX: [[7707,3981,828,4685,574,...,7727,3977,1871,7524,7454]]" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# load the data\n", + "\n", + "arrow = pq.read_table(\"./data/xmatch_log.parquet\")\n", + "print(\"number of rows : \", len(arrow))\n", + "arrow" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "2b7e5179", + "metadata": {}, + "outputs": [], + "source": [ + "# push to the shared memory area.\n", + "arrow_ref = ray.put(arrow)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "b3a8d989", + "metadata": {}, + "outputs": [], + "source": [ + "from query_duckdb import DuckDbHpxQuery\n", + "DuckDbHpxQueryActor = ray.remote(DuckDbHpxQuery)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "c65d5ad3", + "metadata": {}, + "outputs": [], + "source": [ + "hpidx_key = \"hp20\"\n", + "depth = 20\n", + "\n", + "actors = [DuckDbHpxQueryActor.remote(arrow_ref, hpidx_key, depth)\n", + " for _ in range(20)]\n", + "pool = ActorPool(actors)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "3f9186b3", + "metadata": {}, + "outputs": [], + "source": [ + "ra = Longitude(\"20 17 47.2020844\", unit=\"hourangle\")\n", + "dec = Latitude(\"+38 01 58.552724\", unit=\"deg\")\n", + "rad = 1. * u.arcmin" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "fda04c5d", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
RA_2000DEC_2000hp20RRISA_IDX
0304.4507538.03130639315088862047966
\n", + "
" + ], + "text/plain": [ + " RA_2000 DEC_2000 hp20 RRISA_IDX\n", + "0 304.45075 38.031306 3931508886204 7966" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# single call\n", + "r = actors[0].query_pandas.remote(ra, dec, rad, 11)\n", + "df = ray.get(r)\n", + "df\n" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "1a6ee469", + "metadata": {}, + "outputs": [], + "source": [ + "# This is just for a demonstration purpose.\n", + "gen = pool.map(lambda a, kw: a.query_pandas.remote(*kw),\n", + " [(ra, dec, rad, 11)] * 1000)\n", + "\n", + "dd = pd.concat(gen)" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "c1ec8747", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
RA_2000DEC_2000hp20RRISA_IDX
0304.4507538.03130639315088862047966
0304.4507538.03130639315088862047966
0304.4507538.03130639315088862047966
0304.4507538.03130639315088862047966
0304.4507538.03130639315088862047966
...............
0304.4507538.03130639315088862047966
0304.4507538.03130639315088862047966
0304.4507538.03130639315088862047966
0304.4507538.03130639315088862047966
0304.4507538.03130639315088862047966
\n", + "

1000 rows × 4 columns

\n", + "
" + ], + "text/plain": [ + " RA_2000 DEC_2000 hp20 RRISA_IDX\n", + "0 304.45075 38.031306 3931508886204 7966\n", + "0 304.45075 38.031306 3931508886204 7966\n", + "0 304.45075 38.031306 3931508886204 7966\n", + "0 304.45075 38.031306 3931508886204 7966\n", + "0 304.45075 38.031306 3931508886204 7966\n", + ".. ... ... ... ...\n", + "0 304.45075 38.031306 3931508886204 7966\n", + "0 304.45075 38.031306 3931508886204 7966\n", + "0 304.45075 38.031306 3931508886204 7966\n", + "0 304.45075 38.031306 3931508886204 7966\n", + "0 304.45075 38.031306 3931508886204 7966\n", + "\n", + "[1000 rows x 4 columns]" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dd" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "be8a36c0", + "metadata": {}, + "outputs": [], + "source": [ + "ray.shutdown()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b95ae682", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/ingest_csv.py b/ingest_csv.py index 2b95166..42f8342 100644 --- a/ingest_csv.py +++ b/ingest_csv.py @@ -18,10 +18,14 @@ def ingest(): lon, lat = (Longitude(df['RA_2000'] << u.deg), Latitude(df['DEC_2000'] << u.deg)) - hp11 = cdshealpix.lonlat_to_healpix(lon, lat, 11) - df["hp11"] = hp11 + hp20 = cdshealpix.lonlat_to_healpix(lon, lat, 20) + df["hp20"] = hp20 - df[["RA_2000", "DEC_2000", "hp11"]].to_parquet("xmatch_log.parquet") + # df[["RA_2000", "DEC_2000", "hp11"]].to_parquet("xmatch_log.parquet") + df = df[["RA_2000", "DEC_2000", "hp20", "RRISA_IDX"]] + df1 = df.sort_values(by="hp20") + + df1.to_parquet("xmatch_log.parquet", index=False) def test(): df = pd.read_parquet("xmatch_log.parquet") diff --git a/load_duckdb.py b/load_duckdb.py new file mode 100644 index 0000000..1435dd9 --- /dev/null +++ b/load_duckdb.py @@ -0,0 +1,110 @@ +# This is modified version to do some timing experiment. +# Use 'query_duckdb.py' instead. + +import duckdb +from healpix_pixrange import cone_search_pixrange +from astropy.coordinates import Longitude, Latitude +import astropy.units as u + +def get_filter(ra, dec, rad, max_depth): + """ + ra, dec, rad : value with proper units. + """ + max_depth = 11 + ddepth = 11 - max_depth + clon = Longitude(ra) + clat = Latitude(dec) + + k1, k2 = cone_search_pixrange(clon, clat, rad, max_depth) + + # shift operator does not work with numpy arrays with dtype=uint64 + filters = ['(hp11 BETWEEN {} AND {})'.format( + int(i1) << (2*ddepth), int(i2) << (2*ddepth) + ) for i1, i2 in zip(k1, k2)] + + return " OR ".join(filters) + + +def test2(): + import pyarrow as pa + import pyarrow.parquet as pq + + arrow = pq.read_table("xmatch_log.parquet") + df = arrow.to_pandas() + import pandas as pd + df2 = pd.concat([df]*100).reset_index() + # arrow = pa.Table.from_pydict({'i':[1,2,3,4], + # 'j':["one", "two", "three", "four"]}) + + arrow = pa.Table.from_pandas(df2) + + con = duckdb.connect() + + # con.execute('SELECT * FROM arrow') + # r = con.df() + + # q = """ + # CREATE TABLE igrins_tbl AS SELECT * FROM arrow; + # """ + + # r = con.execute(q).fetchall() + + ra = Longitude("20 17 47.2020844", unit="hourangle") + dec = Latitude("+38 01 58.552724", unit="deg") + + # ra, dec = 10.2 * u.deg, 20.5 * u.deg + rad = 1. * u.arcmin + + def gget(): + healpix_filter = get_filter(ra, dec, rad, 11) + + q = ("SELECT * " + # "FROM 'igrins_tbl' " + "FROM arrow " + "WHERE {} " + "LIMIT 100" + ).format(healpix_filter) + + r = con.execute(q).df() + return r + + # %timeit df = con.execute(q).df() + # 2.53 ms ± 303 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) + + # len = x10 + # 6 ms + + # len = x100 + # 36 ms + + from astropy.coordinates import SkyCoord + df = arrow.to_pandas() + df = df2 + c = SkyCoord(df["RA_2000"], df["DEC_2000"], unit="deg") + def get(): + sep = c.separation((SkyCoord(ra, dec))) + s = df[sep.arcmin < 1] + return s + + # %timeit s = get() + # 3 ms ± 276 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) + + # len = x 10 + # 15 ms + # len = x 100 + # 172ms + +def test(): + + db = duckdb.from_parquet("xmatch_log.parquet") + + q = ("SELECT * " + "FROM '{}' " + "WHERE hp20 BETWEEN 7852273965097 AND 7852273967343 " + "LIMIT 500" + ) + fn0 = fn.format(mm) + _ = duckdb.query(q.format(fn0)) + v = _.to_arrow_table() + + # query() diff --git a/query_duckdb.py b/query_duckdb.py index 462044c..8888718 100644 --- a/query_duckdb.py +++ b/query_duckdb.py @@ -1,11 +1,12 @@ import numpy as np import duckdb -from healpix_pixrange import cone_search_pixrange from astropy.coordinates import Longitude, Latitude import astropy.units as u import pyarrow as pa import pyarrow.parquet as pq +from healpix_pixrange import cone_search_pixrange + class DuckDbHpxQuery: def __init__(self, arrow, hpidx_key, depth): self._arrow = arrow @@ -48,6 +49,10 @@ class DuckDbHpxQuery: depth_approx = np.log2(nside_approx) return int(np.ceil(depth_approx + 1.)) + def query_pandas(self, ra, dec, rad, max_depth, nlimit=1024): + r = self.query(ra, dec, rad, max_depth, nlimit=nlimit) + return r.df() + def query(self, ra, dec, rad, max_depth, nlimit=1024): """ query based on healpix id. Note that it will contain some points that are outside the query radius. @@ -64,13 +69,12 @@ class DuckDbHpxQuery: ).format(self.tbl_name, healpix_filter, nlimit) r = self.con.execute(q) - return r def mytest(): arrow = pq.read_table("xmatch_log.parquet") - hpidx_key = "hp11" - depth = 11 + hpidx_key = "hp20" + depth = 20 db = DuckDbHpxQuery(arrow, hpidx_key, depth) @@ -82,44 +86,32 @@ def mytest(): df = r.df() -def get_filter(ra, dec, rad, max_depth): - """ - ra, dec, rad : value with proper units. - """ - max_depth = 11 - ddepth = 11 - max_depth - clon = Longitude(ra) - clat = Latitude(dec) +def ray_test(): + # ray example + import ray + ray.init() - k1, k2 = cone_search_pixrange(clon, clat, rad, max_depth) + from ray.util import ActorPool - # shift operator does not work with numpy arrays with dtype=uint64 - filters = ['(hp11 BETWEEN {} AND {})'.format( - int(i1) << (2*ddepth), int(i2) << (2*ddepth) - ) for i1, i2 in zip(k1, k2)] + DuckDbHpxQueryActor = ray.remote(DuckDbHpxQuery) - return " OR ".join(filters) + arrow = pq.read_table("xmatch_log.parquet") + hpidx_key = "hp20" + depth = 20 -def test2(): + arrow_ref = ray.put(arrow) - arrow = pq.read_table("xmatch_log.parquet") + actors = [DuckDbHpxQueryActor.remote(arrow_ref, hpidx_key, depth) + for _ in range(4)] + pool = ActorPool(actors) - con = duckdb.connect() ra = Longitude("20 17 47.2020844", unit="hourangle") dec = Latitude("+38 01 58.552724", unit="deg") - - # ra, dec = 10.2 * u.deg, 20.5 * u.deg rad = 1. * u.arcmin - healpix_filter = get_filter(ra, dec, rad, 11) - - q = ("SELECT * " - # "FROM 'igrins_tbl' " - "FROM arrow " - "WHERE {} " - "LIMIT 100" - ).format(healpix_filter) - - r = con.execute(q).df() + r = actors[0].query_pandas.remote(ra, dec, rad, 11) + df = ray.get(r) + gen = pool.map(lambda a, kw: a.query_pandas.remote(*kw), + [(ra, dec, rad, 11)] * 100) -- GitLab