We provide a FoF halo catalog at z = 0 from the Horizon Run 4 simulation (Kim, Park, L'Huillier & Hong 2015, JKAS, 48, 123). The basic parameters and properties of Horizon Run 4 can be found at http://sdss.kias.re.kr/eostm/halo.php.
On this webpage, we release the catalog in the parquet format, which is commonly used in big data platforms, and briefly demonstrate how to handle this large catalog using the modern big data platform Apache Spark. The download link for the parquet table is here.
For reference, Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model, or programming language. Details about Apache Parquet can be found at https://parquet.apache.org/
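For readers who only want to inspect the downloaded table locally, without a Spark cluster, the sketch below shows one possible way to peek at it with plain pandas. The local filename is a placeholder for wherever you saved the file, and we assume the released table uses the same column names as the schema defined later in this notebook.
# a quick local look at the downloaded parquet table (no Spark needed);
# the filename is a placeholder and the column names are assumed to match
# the schema defined below
import pandas as pd
halos = pd.read_parquet("hr4-fof-halo-z0.parquet.snappy",
                        columns=["haloid", "px", "py", "pz", "halomass"])
print(halos.head())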
Saving csv in parquet using Apache Spark

Before demonstrating how to handle big parquet tables, we briefly show how to save a csv file as a parquet table. Though the csv format is human-readable and convenient for reading and saving structured data sets, I strongly recommend using parquet when handling Big Data, for many good reasons; one of them, column pruning, is sketched right after the write below.
# Basic common libraries
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial import cKDTree
import gc
# Basic PySpark Libraries
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Row
from pyspark.sql.window import Window as W
# Initializing Spark Shell
# This initialization process may vary for different cloud/on-premise services.
sqlsc = SQLContext(sc)
sc.setCheckpointDir("hdfs://master:54310/tmp/spark/checkpoints")
# Enabling Apache Arrow
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# define the schema before reading the csv file
halo_schema = T.StructType([ \
T.StructField('haloid', T.IntegerType(), False), \
T.StructField('px', T.FloatType(), False), \
T.StructField('py', T.FloatType(), False), \
T.StructField('pz', T.FloatType(), False), \
T.StructField('vx', T.FloatType(), False), \
T.StructField('vy', T.FloatType(), False), \
T.StructField('vz', T.FloatType(), False), \
T.StructField('halomass', T.FloatType(), False), \
])
%%time
halodf = sqlsc.read.csv("hdfs://master:54310/data/cosmo/hr4/halo_z0.csv",\
header=False, schema = halo_schema)
halodf.show(3,truncate=True)
halodf.printSchema()
import pyarrow as pa
import pyarrow.parquet as pq
%%time
halodf \
.write.option("compression", "snappy") \
.mode("overwrite") \
.save("hdfs://master:54310/data/cosmo/hr4/hr4-fof-halo-z0.parquet.snappy")
Reading the parquet table into df

%%time
df = spark.read.parquet("hdfs://master:54310/data/cosmo/hr4/hr4-fof-halo-z0.parquet.snappy")
.describe()

We can simply call .describe().show(), but this does not look pretty.

%%time
df.describe().show()
Converting the summary to a pandas dataframe

%%time
df.describe().toPandas().set_index('summary').transpose()
The total number of halos is over 362 million.
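If you want to check this number yourself, counting the rows is a single action (note that it scans the whole table, so it can take a while):
%%time
# an action that triggers a full scan and returns the total number of halos
df.count()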
Extracting a slice slicedf, where df.pz < 2.0

%%time
slicedf = df.filter(df.pz < 2.0).select(['px','py','pz']).toPandas()
len(slicedf.index)
# plot settings
plt.rc('font', family='serif')
plt.rc('font', serif='Times New Roman')
plt.rcParams.update({'font.size': 16})
plt.rcParams['mathtext.fontset'] = 'stix'
fig = plt.figure(figsize=(15,15))
plt.axis([0,3150,0,3150])
#plt.axes().set_aspect('equal', 'datalim')
plt.title(r'Horizon Run 4 : Halo Slice where $z < 2$ $h^{-1}$ Mpc')
plt.xlabel(r'$x$ [$h^{-1}$ Mpc]')
plt.ylabel(r'$y$ [$h^{-1}$ Mpc]')
plt.scatter(slicedf.px.values,slicedf.py.values,marker=".",s=1.0,color='grey')
plt.show()
We will generate a training sample of spatial halo distributions with a sub-box size of [$100 h^{-1}$ Mpc]$^3$ from the whole halo catalog. The extracted sample of $31^3 = 29791$ sub-boxes is large enough to be used as a training sample for Deep Learning projects.
lsys = 3150.0
mlength = 100.0
mdiv = 31
subdf = df \
.withColumn('ixdiv',F.floor(df.px/mlength)) \
.withColumn('iydiv',F.floor(df.py/mlength)) \
.withColumn('izdiv',F.floor(df.pz/mlength))
# Some buggy negative coordinate values. We replace these negative indices with `31`
subdf = subdf \
.withColumn('ixdiv',F.when(subdf.ixdiv == -1, 31).otherwise(subdf.ixdiv)) \
.withColumn('iydiv',F.when(subdf.iydiv == -1, 31).otherwise(subdf.iydiv)) \
.withColumn('izdiv',F.when(subdf.izdiv == -1, 31).otherwise(subdf.izdiv))
IMPORTANT NOTE: Most Big Data platforms use a scheme called lazy evaluation, which is not easy to explain in this short notebook. Please see this link for details: https://en.wikipedia.org/wiki/Lazy_evaluation
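As a minimal illustration of lazy evaluation with the subdf defined above (the mass threshold below is arbitrary and only for demonstration):
# a transformation: returns immediately, no Spark job is launched yet
lazy = subdf.filter(subdf.halomass > 1.0e12)  # arbitrary threshold, for illustration only
# an action: this is the point where Spark actually scans the data
print(lazy.count())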
subdf.cache()
%%time
subdf.show(10,truncate=True)
Checking the min and max idiv values

%%time
ixmin = subdf.agg({"ixdiv":"min"}).collect()[0][0]
iymin = subdf.agg({"iydiv":"min"}).collect()[0][0]
izmin = subdf.agg({"izdiv":"min"}).collect()[0][0]
print("idiv code min = ", ixmin," ",iymin," ",izmin)
%%time
ixmax = subdf.agg({"ixdiv":"max"}).collect()[0][0]
iymax = subdf.agg({"iydiv":"max"}).collect()[0][0]
izmax = subdf.agg({"izdiv":"max"}).collect()[0][0]
print("idiv code max = ", ixmax," ",iymax," ",izmax)
%%time
# ixdiv, iydiv, izdiv, from 0 to 30
#from tqdm import tqdm
idmax = 31 # idmax=31 == [0, ... ,30]
traindf = pd.DataFrame()
icounter = 0
numhalo = 2000 # how many halos, selected from each sub-volume
for ix in range(idmax):
    for iy in range(idmax):
        for iz in range(idmax):
            gc.collect()
            tmppd = subdf.filter((F.col('ixdiv') == ix) & (F.col('iydiv') == iy) & (F.col('izdiv') == iz)) \
                         .select('haloid','px','py','pz','halomass').toPandas()
            tmppd = tmppd.sort_values(by='halomass', ascending=False)[0:numhalo]  # sort by halomass and keep the top numhalo halos
            tmppd['isect'] = np.full(numhalo, icounter)  # id for each sub-volume (section)
            traindf = pd.concat([traindf, tmppd])
            icounter = icounter + 1
These triple loops are an ad-hoc implementation, since I only need to run this process once. If you want to reduce the running time with more optimized code, you can learn about pandas udf, especially the GROUPED_MAP type, which can speed up extracting the sub-volumes as grouped maps; a rough sketch is given below. For details about pandas udf, see https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
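As a rough sketch only (not the code used to build the released training sample), a grouped-map version might look like the following. It assumes the Spark 2.x GROUPED_MAP pandas udf API from the Databricks post above; on Spark 3 you would use groupBy(...).applyInPandas instead. The function name top_halos and the isect encoding are illustrative choices of my own.
# sketch of the grouped-map approach, assuming the Spark 2.x GROUPED_MAP API;
# top_halos and the isect encoding are illustrative, not the original implementation
out_schema = T.StructType([
    T.StructField('haloid', T.IntegerType()),
    T.StructField('px', T.FloatType()),
    T.StructField('py', T.FloatType()),
    T.StructField('pz', T.FloatType()),
    T.StructField('halomass', T.FloatType()),
    T.StructField('isect', T.LongType()),
])

@F.pandas_udf(out_schema, F.PandasUDFType.GROUPED_MAP)
def top_halos(pdf):
    # keep the numhalo most massive halos of this sub-volume
    pdf = pdf.sort_values(by='halomass', ascending=False).head(numhalo)
    # encode (ixdiv, iydiv, izdiv) into a single section id, matching the icounter ordering of the loop above
    pdf['isect'] = (pdf['ixdiv'] * idmax + pdf['iydiv']) * idmax + pdf['izdiv']
    return pdf[['haloid', 'px', 'py', 'pz', 'halomass', 'isect']]

# restrict to the same index range (0 to 30) as the triple loop, then apply the UDF per sub-volume
traindf_udf = subdf.filter((subdf.ixdiv < idmax) & (subdf.iydiv < idmax) & (subdf.izdiv < idmax)) \
    .groupBy('ixdiv', 'iydiv', 'izdiv').apply(top_halos).toPandas()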
traindf = traindf.reset_index(drop=True)
traindf.head(3)
# Save this train sample to a parquet table
pq.write_table(pa.Table.from_pandas(traindf), \
'hdfs://master:54310/data/cosmo/hr4/hr4-z0-train.parquet.snappy', compression='snappy')
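As a quick sanity check, you can read the training sample back; this assumes pyarrow can resolve the same HDFS URI (otherwise pass an explicit filesystem object).
# read the training sample back and check its size and columns;
# assumes pyarrow resolves the hdfs:// URI used above
check = pq.read_table('hdfs://master:54310/data/cosmo/hr4/hr4-z0-train.parquet.snappy').to_pandas()
print(len(check.index), list(check.columns))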