Friends-of-Friends Catalog from Horizon Run 4 Simulation

Introduction

We provide a FoF halo catalog at z = 0 from the Horizon Run 4 simulation (Kim, Park, L'Huillier & Hong, 2015, JKAS, 48, 123). The basic parameters and properties of Horizon Run 4 can be found at http://sdss.kias.re.kr/eostm/halo.php.


On this webpage, we release the catalog in parquet format, a storage format commonly used on big data platforms, and briefly demonstrate how to handle this large catalog using Apache Spark. The download link for the parquet table is here.


For reference, Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model, or programming language. Details about Apache Parquet can be found at https://parquet.apache.org/.

Demonstration: Handling Big Catalogs using Apache Spark

[1] How to write a csv file as parquet using Apache Spark

Before demonstrating how to handle big parquet tables, we briefly show how to save a csv file as a parquet table. Though the csv format is human-readable and convenient for reading and saving structured data sets, I strongly recommend using parquet when handling Big Data, for many good reasons.
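For instance, because parquet stores the data column by column, a query that touches only a few columns scans just those columns instead of the whole table. A minimal sketch of this column pruning, assuming the Spark session initialized below and the parquet path written later in this notebook:

# Column pruning: Spark scans only the px, py, pz columns of the parquet files,
# something a row-oriented csv reader cannot do.
pos = spark.read.parquet("hdfs://master:54310/data/cosmo/hr4/hr4-fof-halo-z0.parquet.snappy") \
           .select('px', 'py', 'pz')
pos.show(3)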

Import basic libraries

In [1]:
# Basic common libraries
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial import cKDTree
import gc
In [2]:
# Basic PySpark Libraries 
from pyspark import SparkContext   
from pyspark.sql import SQLContext

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Row
from pyspark.sql.window import Window as W

# Initializing Spark Shell 
# This initialization process may vary for different cloud/on-premise services.
sqlsc = SQLContext(sc)
sc.setCheckpointDir("hdfs://master:54310/tmp/spark/checkpoints")
In [3]:
# Enabling Apache Arrow 
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

Read the data

In [4]:
# define the schema before reading the csv file
halo_schema = T.StructType([ \
                            T.StructField('haloid', T.IntegerType(), False), \
                            T.StructField('px', T.FloatType(), False), \
                            T.StructField('py', T.FloatType(), False), \
                            T.StructField('pz', T.FloatType(), False), \
                            T.StructField('vx', T.FloatType(), False), \
                            T.StructField('vy', T.FloatType(), False), \
                            T.StructField('vz', T.FloatType(), False), \
                            T.StructField('halomass', T.FloatType(), False), \
                          ])
In [5]:
%%time
halodf = sqlsc.read.csv("hdfs://master:54310/data/cosmo/hr4/halo_z0.csv",\
                        header=False, schema = halo_schema)
CPU times: user 4.53 ms, sys: 475 µs, total: 5.01 ms
Wall time: 1.55 s
In [6]:
halodf.show(3,truncate=True)
+---------+---------+---------+---------+----------+----------+---------+-------------+
|   haloid|       px|       py|       pz|        vx|        vy|       vz|     halomass|
+---------+---------+---------+---------+----------+----------+---------+-------------+
|322225520|106.23875|2820.2603|310.53067|   -593.39|  42.42728|117.49196|3.29162502E14|
|127093960|1015.0091| 3070.103|2687.5447|-361.36716| -34.88201|   980.29|  5.796312E14|
| 95586173|1150.7571| 656.3275|195.96417|  295.5281|-117.53244|203.30292| 7.4870011E14|
+---------+---------+---------+---------+----------+----------+---------+-------------+
only showing top 3 rows

In [7]:
halodf.printSchema()
root
 |-- haloid: integer (nullable = true)
 |-- px: float (nullable = true)
 |-- py: float (nullable = true)
 |-- pz: float (nullable = true)
 |-- vx: float (nullable = true)
 |-- vy: float (nullable = true)
 |-- vz: float (nullable = true)
 |-- halomass: float (nullable = true)

Save it as a parquet table

In [8]:
import pyarrow as pa
import pyarrow.parquet as pq
In [9]:
%%time
halodf \
    .write.option("compression", "snappy") \
    .mode("overwrite") \
    .save("hdfs://master:54310/data/cosmo/hr4/hr4-fof-halo-z0.parquet.snappy")
CPU times: user 17.7 ms, sys: 0 ns, total: 17.7 ms
Wall time: 2min 8s

[2] How to read and analyze this big parquet table

Read the parquet table into a dataframe df

In [10]:
%%time
df = spark.read.parquet("hdfs://master:54310/data/cosmo/hr4/hr4-fof-halo-z0.parquet.snappy")
CPU times: user 2.97 ms, sys: 0 ns, total: 2.97 ms
Wall time: 348 ms

Show the basic statistics using .describe()

The default PySpark output comes from .describe().show(), but it does not look pretty; converting the summary to a pandas dataframe (below) gives a cleaner table.

In [11]:
%%time
df.describe().show()
+-------+--------------------+-----------------+------------------+-----------------+-------------------+-------------------+-------------------+--------------------+
|summary|              haloid|               px|                py|               pz|                 vx|                 vy|                 vz|            halomass|
+-------+--------------------+-----------------+------------------+-----------------+-------------------+-------------------+-------------------+--------------------+
|  count|           362123180|        362123180|         362123180|        362123180|          362123180|          362123180|          362123180|           362123180|
|   mean|1.8106158949999997E8|1574.948769942889|1574.5065974890745|1575.757963181069|0.05927183167360858|0.03251674191949916|0.04197048582812971|2.309020271127732E12|
| stddev|1.0453595787073964E8|909.2939271893532| 909.5066398131257|909.4171871354725| 291.90762587818676|  292.2171983689464|  291.0346444428143|1.323339007570650...|
|    min|                   0|        -3.628511|         -3.603208|         0.163826|         -2907.2056|         -2714.5393|         -2624.5703|       2.70611202E11|
|    max|           362123179|        3149.9463|         3149.9321|        3154.1765|          2742.1318|          2738.2966|           2734.925|        5.6967899E15|
+-------+--------------------+-----------------+------------------+-----------------+-------------------+-------------------+-------------------+--------------------+

CPU times: user 18.2 ms, sys: 1.77 ms, total: 19.9 ms
Wall time: 2min 25s
In [12]:
%%time
df.describe().toPandas().set_index('summary').transpose()
/home/shong/anaconda3/lib/python3.7/site-packages/pyarrow/util.py:39: FutureWarning: pyarrow.open_stream is deprecated as of 0.17.0, please use pyarrow.ipc.open_stream instead
  warnings.warn(msg, FutureWarning)
CPU times: user 48.5 ms, sys: 961 µs, total: 49.4 ms
Wall time: 2min 26s
Out[12]:
summary        count      mean                  stddev                 min            max
haloid         362123180  1.8106158949999994E8  1.0453595787073967E8   0              362123179
px             362123180  1574.948769942889     909.2939271893531      -3.628511      3149.9463
py             362123180  1574.5065974890745    909.5066398131257      -3.603208      3149.9321
pz             362123180  1575.7579631810702    909.4171871354725      0.163826       3154.1765
vx             362123180  0.05927183167360856   291.9076258781867      -2907.2056     2742.1318
vy             362123180  0.03251674191949915   292.2171983689464      -2714.5393     2738.2966
vz             362123180  0.04197048582812967   291.0346444428142      -2624.5703     2734.925
halomass       362123180  2.309020271127733E12  1.3233390075706514E13  2.70611202E11  5.6967899E15

The total number of halos is over 362 million.

Visualize the halo distribution

Select a 2 $h^{-1}$ Mpc slice as slicedf, where df.pz < 2.0

In [13]:
%%time
slicedf = df.filter(df.pz < 2.0).select(['px','py','pz']).toPandas()
CPU times: user 20.1 ms, sys: 16.3 ms, total: 36.4 ms
Wall time: 1.91 s
In [14]:
len(slicedf.index)
Out[14]:
193071

Plot this halo slice

In [15]:
# plot settings
plt.rc('font', family='serif') 
plt.rc('font', serif='Times New Roman') 
plt.rcParams.update({'font.size': 16})
plt.rcParams['mathtext.fontset'] = 'stix'


fig = plt.figure(figsize=(15,15))

plt.axis([0,3150,0,3150])
#plt.axes().set_aspect('equal', 'datalim')
plt.title(r'Horizon Run 4 : Halo Slice where $z < 2$ $h^{-1}$ Mpc')
plt.xlabel(r'$x$ [$h^{-1}$ Mpc]')
plt.ylabel(r'$y$ [$h^{-1}$ Mpc]')
plt.scatter(slicedf.px.values,slicedf.py.values,marker=".",s=1.0,color='grey')


plt.show()

[3] Practical Example

We will extract $31^3$ sub-volumes of size $L_{sub} = 100 h^{-1}$ Mpc each from the whole HR4 catalog.

In other words, we will generate a training sample of spatial halo distributions in boxes of [$100 h^{-1}$ Mpc]$^3$ drawn from the whole halo catalog. The extracted sample of $31^3 = 29791$ sub-boxes is large enough to serve as a training set for Deep Learning projects.

[3.1] Assign a division (or grid) index to each halo.

In [16]:
lsys = 3150.0
mlength = 100.0
mdiv = 31
In [17]:
subdf = df \
        .withColumn('ixdiv',F.floor(df.px/mlength)) \
        .withColumn('iydiv',F.floor(df.py/mlength)) \
        .withColumn('izdiv',F.floor(df.pz/mlength))
In [18]:
# A few halos have slightly negative coordinate values, which give a division index of -1.
# We replace these indices with 31, a bin that is not used in the sub-volume extraction below.
subdf = subdf \
        .withColumn('ixdiv',F.when(subdf.ixdiv == -1, 31).otherwise(subdf.ixdiv)) \
        .withColumn('iydiv',F.when(subdf.iydiv == -1, 31).otherwise(subdf.iydiv)) \
        .withColumn('izdiv',F.when(subdf.izdiv == -1, 31).otherwise(subdf.izdiv))

IMPORTANT NOTE: Most Big Data platforms use a scheme called lazy evaluation, which is not easy to explain fully in this short notebook. See https://en.wikipedia.org/wiki/Lazy_evaluation for details.
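For this notebook, the practical point is that transformations such as withColumn() and filter() only build an execution plan; nothing is computed until an action (show(), count(), toPandas(), a write, ...) is triggered. A toy illustration, reusing the subdf dataframe defined above (the mass cut of 1.0e14 is an arbitrary example, not part of the pipeline):

massive = subdf.filter(subdf.halomass > 1.0e14)   # transformation: returns immediately, nothing runs yet
print(massive.count())                            # action: only here is the filter actually executed

The .cache() call in the next cell marks subdf to be kept in memory once an action materializes it, so the repeated aggregations below do not re-read the parquet files and recompute the division indices every time.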

In [19]:
subdf.cache()
Out[19]:
DataFrame[haloid: int, px: float, py: float, pz: float, vx: float, vy: float, vz: float, halomass: float, ixdiv: bigint, iydiv: bigint, izdiv: bigint]
In [20]:
%%time
subdf.show(10,truncate=True)
+---------+---------+---------+---------+----------+----------+----------+-------------+-----+-----+-----+
|   haloid|       px|       py|       pz|        vx|        vy|        vz|     halomass|ixdiv|iydiv|izdiv|
+---------+---------+---------+---------+----------+----------+----------+-------------+-----+-----+-----+
|322225520|106.23875|2820.2603|310.53067|   -593.39|  42.42728| 117.49196|3.29162502E14|    1|   28|    3|
|127093960|1015.0091| 3070.103|2687.5447|-361.36716| -34.88201|    980.29|  5.796312E14|   10|   30|   26|
| 95586173|1150.7571| 656.3275|195.96417|  295.5281|-117.53244| 203.30292| 7.4870011E14|   11|    6|    1|
|248741686|2745.5166|2020.5542|839.78326| 149.90967| -512.4188|-661.92096|1.55998296E14|   27|   20|    8|
|292585703|1129.6155|1974.3558|1253.1699|-332.87952|-214.82208| -405.1472|3.26916402E14|   11|   19|   12|
|274107747|589.67566|2289.6702|2542.1628|-151.16937| 267.27695|-358.16284|2.77196099E14|    5|   22|   25|
| 59442017|1486.9604| 2534.887| 2683.816| 119.70858| -239.6665| 208.11072|1.08109201E14|   14|   25|   26|
| 93782908|3036.3958|13.892702|2686.4326|-382.11667| -679.7706| 241.65086|1.14170901E14|   30|    0|   26|
|355104768|1077.6719|230.39247|2675.5627|-205.13126|-125.70828| -77.81846| 7.3316702E14|   10|    2|   26|
|324447992|1666.9918|658.03186| 472.5904|-160.42426| 240.42766| -164.4379|5.46760913E14|   16|    6|    4|
+---------+---------+---------+---------+----------+----------+----------+-------------+-----+-----+-----+
only showing top 10 rows

CPU times: user 1.23 ms, sys: 581 µs, total: 1.81 ms
Wall time: 3.64 s

Sanity Check for idiv values

In [21]:
%%time
ixmin = subdf.agg({"ixdiv":"min"}).collect()[0][0]
iymin = subdf.agg({"iydiv":"min"}).collect()[0][0]
izmin = subdf.agg({"izdiv":"min"}).collect()[0][0]
print("idiv code min = ", ixmin," ",iymin," ",izmin)
idiv code min =  0   0   0
CPU times: user 13.7 ms, sys: 6.69 ms, total: 20.4 ms
Wall time: 28.2 s
In [22]:
%%time
ixmax = subdf.agg({"ixdiv":"max"}).collect()[0][0]
iymax = subdf.agg({"iydiv":"max"}).collect()[0][0]
izmax = subdf.agg({"izdiv":"max"}).collect()[0][0]
print("idiv code max = ", ixmax," ",iymax," ",izmax)
idiv code max =  31   31   31
CPU times: user 14.5 ms, sys: 2.99 ms, total: 17.5 ms
Wall time: 1.25 s

[3.2] Select the most massive halos (numhalo = 2000 in the code below) from each sub-volume, then merge them all into a single dataframe.

In [23]:
%%time
#  ixdiv, iydiv, izdiv, from 0 to 30
#from tqdm import tqdm
idmax = 31 # idmax=31 == [0, ... ,30]
traindf = pd.DataFrame()
icounter = 0
numhalo = 2000 # how many halos, selected from each sub-volume
for ix in range(idmax):
    for iy in range(idmax):
        for iz in range(idmax):
            gc.collect()
            tmppd = subdf.filter((F.col('ixdiv') ==ix) & (F.col('iydiv') ==iy) & (F.col('izdiv') ==iz))\
                    .select('haloid','px','py','pz','halomass').toPandas()
            tmppd = tmppd.sort_values(by='halomass',ascending=False)[0:numhalo] #sort the DF based on halomass
            tmppd['isect'] = np.full(numhalo,icounter) #id for each section 
            
            traindf = pd.concat([traindf,tmppd])
            icounter = icounter + 1
CPU times: user 1d 2h 16min 41s, sys: 9h 19min 8s, total: 1d 11h 35min 49s
Wall time: 9h 55min 42s

These triple loops are an ad-hoc implementation, since I only need to run this process once. If you want to reduce the running time with more optimized code, look into pandas UDFs, especially the grouped map (GROUPED_MAP) variant, which can extract the sub-volumes in parallel as grouped maps; a sketch follows. For details about pandas UDFs, see https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html.
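A minimal sketch of that grouped-map approach, assuming Spark >= 2.3 and reusing subdf, numhalo, and idmax from the cells above (on Spark >= 3.0 the same logic can be written with groupBy(...).applyInPandas):

# schema of the returned pandas DataFrame; the division indices are kept so that
# an isect label can be assigned afterwards
out_schema = 'haloid int, px float, py float, pz float, halomass float, ' \
             'ixdiv bigint, iydiv bigint, izdiv bigint'

@F.pandas_udf(out_schema, F.PandasUDFType.GROUPED_MAP)
def top_halos(pdf):
    # keep the numhalo most massive halos of one sub-volume
    return pdf.sort_values('halomass', ascending=False).head(numhalo)

topdf = subdf \
        .filter((F.col('ixdiv') < idmax) & (F.col('iydiv') < idmax) & (F.col('izdiv') < idmax)) \
        .select('haloid', 'px', 'py', 'pz', 'halomass', 'ixdiv', 'iydiv', 'izdiv') \
        .groupBy('ixdiv', 'iydiv', 'izdiv') \
        .apply(top_halos)

Another option is a window function, using the Window import from the top of the notebook: rank the halos with F.row_number().over(W.partitionBy('ixdiv', 'iydiv', 'izdiv').orderBy(F.desc('halomass'))) and keep ranks <= numhalo.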

In [24]:
traindf = traindf.reset_index(drop=True)
In [25]:
traindf.head(3)
Out[25]:
      haloid         px         py         pz      halomass  isect
0  318792177  98.907692  47.874481  65.046234  3.037070e+14      0
1    7145778   0.998974  13.699450  33.076996  1.684916e+14      0
2  303802715  86.756271  74.397171  88.292831  1.273767e+14      0
In [26]:
# Save this train sample to a parquet table
pq.write_table(pa.Table.from_pandas(traindf), \
               'hdfs://master:54310/data/cosmo/hr4/hr4-z0-train.parquet.snappy', compression='snappy')