# generate edges
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial import cKDTree
import gc
# Matplotlib defaults for all figures: 16 pt serif text, preferring
# Times New Roman, with the STIX font set for math text.
plt.rcParams.update({
    'font.family': 'serif',
    'font.serif': 'Times New Roman',
    'font.size': 16,
    'mathtext.fontset': 'stix',
})
from pyspark import SparkContext
from pyspark.sql import SQLContext
# Reuse the SparkContext provided by the pyspark shell / notebook kernel when
# one is active; otherwise create one.  Previously `sc` was assumed to exist
# already, so running this file anywhere else failed with a NameError.
# For an explicit local run use instead:
#   SparkContext(master='local[3]', appName='calgraph')
sc = SparkContext.getOrCreate()
# NOTE(review): SQLContext is deprecated since Spark 3.0 in favour of
# SparkSession; kept here so the `sqlsc` entry point stays unchanged.
sqlsc = SQLContext(sc)
# Checkpoint directory on the cluster HDFS.  Locations used previously:
#   "./checkpoints"
#   "hdfs://localhost:8020/myhdfs/spark/checkpoints"
sc.setCheckpointDir("hdfs://master:54310/tmp/spark/checkpoints")
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import Row
# Explicit schema for the halo catalogue CSV: an integer halo id, three
# float position columns and a float mass, all declared non-nullable.
# (A stray bare `halodf` expression used to sit above this point; it ran
# before the name was assigned and raised NameError on a fresh run.)
halo_schema = T.StructType([
    T.StructField('haloid', T.IntegerType(), False),
    T.StructField('px', T.FloatType(), False),
    T.StructField('py', T.FloatType(), False),
    T.StructField('pz', T.FloatType(), False),
    T.StructField('halomass', T.FloatType(), False),
])
# Read the catalogue with the schema above; the first CSV line is a header.
# Alternative input used previously:
#   "hdfs://master:54310/data/spark/multiverse/omp31/halos.z0.csv"
halodf = sqlsc.read.csv(
    "hdfs://master:54310/data/spark/hr4/hr4z0.csv",
    header=True,
    schema=halo_schema,
)
# Mark the DataFrame for caching so the actions below can reuse it.
halodf.cache()
# NOTE(review): sys.getsizeof is shallow -- it reports the size of the Python
# DataFrame wrapper object only, not of the distributed data.
sys.getsizeof(halodf)
# Print the first three rows as a quick sanity check.
halodf.show(3, truncate=True)
%time halodf.describe().show()
halodf = halodf.withColumnRenamed('haloid','id')
#halodf = halodf.filter(halodf['halomass'] > 5.0E11)\
#.filter(halodf['px'] >= 0).filter(halodf['py'] >= 0).filter(halodf['pz'] >= 0)\
#.filter(halodf['px'] < 3072.0).filter(halodf['py'] < 3072.0).filter(halodf['pz'] < 3072.0)
#halodf = halodf.filter(halodf['halomass'] > 5.0E11)\
#.filter(halodf['px'] < 3072.0).filter(halodf['py'] < 3072.0).filter(halodf['pz'] < 3072.0)
halodf = halodf.filter(halodf['halomass'] > 5.0E11)
halodf.describe().show()
%%time
halodf.write.mode('overwrite').parquet("hdfs://master:54310/data/spark/hr4/hr4z0.parquet")