Read the halo CSV catalog, filter it by halo mass, and save it as a Parquet file

In [1]:
# generate edges 
# Imports for the full notebook. NOTE(review): cKDTree, gc, and np are not
# used in the cells visible here — presumably needed by later edge-generation
# cells; confirm before removing.
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial import cKDTree
import gc

# plot settings
# Serif/Times typography with STIX math glyphs for publication-style figures.
plt.rc('font', family='serif') 
plt.rc('font', serif='Times New Roman') 
plt.rcParams.update({'font.size': 16})
plt.rcParams['mathtext.fontset'] = 'stix'
In [2]:
from pyspark import SparkContext   
from pyspark.sql import SQLContext

# NOTE(review): `sc` is not created here — the commented-out line below shows
# the local alternative. This cell assumes a SparkContext named `sc` already
# exists in the kernel (e.g. provided by the pyspark shell/driver); the
# notebook will fail on a fresh kernel without it — confirm how `sc` is
# supplied before a Restart-&-Run-All.
#sc = SparkContext(master='local[3]', appName='calgraph')
sqlsc = SQLContext(sc)
# Checkpoint directory on HDFS; earlier local/localhost variants kept for
# reference.
#sc.setCheckpointDir("./checkpoints")
#sc.setCheckpointDir("hdfs://localhost:8020/myhdfs/spark/checkpoints")
sc.setCheckpointDir("hdfs://master:54310/tmp/spark/checkpoints")

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import Row
### Read the halo CSV file into the DataFrame `halodf`
In [3]:
# Explicit schema for the halo catalog CSV: integer halo id, three float
# position columns (px, py, pz), and a float halo mass.
# Every field is declared non-nullable (third StructField argument = False).
_halo_fields = [
    ('haloid', T.IntegerType()),
    ('px', T.FloatType()),
    ('py', T.FloatType()),
    ('pz', T.FloatType()),
    ('halomass', T.FloatType()),
]
halo_schema = T.StructType(
    [T.StructField(name, dtype, False) for name, dtype in _halo_fields]
)
In [4]:
#halodf = sqlsc.read.csv("hdfs://master:54310/data/spark/multiverse/omp31/halos.z0.csv",\
#                        header=True, schema = halo_schema)

# Load the HR4 z=0 halo catalog from HDFS using the explicit schema
# (header row skipped). The commented-out cell above reads an alternative
# multiverse catalog with the same schema.
halodf = sqlsc.read.csv("hdfs://master:54310/data/spark/hr4/hr4z0.csv",\
                        header=True, schema = halo_schema)
In [5]:
# Mark the DataFrame for caching (lazy — materialized on first action).
halodf.cache()
# NOTE(review): sys.getsizeof only measures the Python wrapper object
# (the 64 bytes shown below), NOT the cached data; the real footprint is in
# the Spark executors. This check is effectively a no-op — consider removing.
sys.getsizeof(halodf)
Out[5]:
64
In [6]:
# Peek at the first 3 rows to sanity-check the schema/parse.
halodf.show(3,truncate=True)
+------+---------+---------+---------+-------------+
|haloid|       px|       py|       pz|     halomass|
+------+---------+---------+---------+-------------+
|     0|106.23875|2820.2603|310.53067|3.29161999E14|
|     1|1015.0091| 3070.103|2687.5447|   5.79631E14|
|     2|1150.7571| 656.3275|195.96417| 7.4869997E14|
+------+---------+---------+---------+-------------+
only showing top 3 rows

In [7]:
# Full-table summary stats (count/mean/stddev/min/max); %time shows the
# driver-side cost — the 43 s wall time below is the cluster scan.
%time halodf.describe().show()
+-------+--------------------+------------------+-----------------+------------------+--------------------+
|summary|              haloid|                px|               py|                pz|            halomass|
+-------+--------------------+------------------+-----------------+------------------+--------------------+
|  count|           362123180|         362123180|        362123180|         362123180|           362123180|
|   mean|       1.810615895E8|1574.9487699428876|1574.506597489074|1575.7579631810704|2.309020225382061E12|
| stddev|1.0453595787073919E8| 909.2939271893582|909.5066398131238|  909.417187135473|1.323339016543346...|
|    min|                   0|         -3.628511|        -3.603208|          0.163826|       2.70611005E11|
|    max|           362123179|         3149.9463|        3149.9321|         3154.1765|        5.6967899E15|
+-------+--------------------+------------------+-----------------+------------------+--------------------+

CPU times: user 8.57 ms, sys: 5.13 ms, total: 13.7 ms
Wall time: 43.3 s
In [8]:
# Rename 'haloid' -> 'id' (presumably the column name expected by the
# downstream edge/graph code — confirm against later cells).
halodf = halodf.withColumnRenamed('haloid','id')
In [9]:
# NOTE(review): two retained alternative filter variants (mass cut plus
# position-box cuts). Dead commented-out cells — consider deleting once the
# mass-only filter below is confirmed as the intended selection.
#halodf = halodf.filter(halodf['halomass'] > 5.0E11)\
#.filter(halodf['px'] >= 0).filter(halodf['py'] >= 0).filter(halodf['pz'] >= 0)\
#.filter(halodf['px'] < 3072.0).filter(halodf['py'] < 3072.0).filter(halodf['pz'] < 3072.0)
    
#halodf = halodf.filter(halodf['halomass'] > 5.0E11)\
#.filter(halodf['px'] < 3072.0).filter(halodf['py'] < 3072.0).filter(halodf['pz'] < 3072.0)
In [11]:
# Keep only haloes with mass above 5.0E11 (units as in the catalog);
# the describe() outputs show this cuts ~362M rows down to ~206M.
halodf = halodf.where(F.col('halomass') > 5.0E11)
In [12]:
# Re-run summary stats to verify the mass cut (min halomass now > 5.0E11).
halodf.describe().show()
+-------+--------------------+------------------+------------------+-----------------+--------------------+
|summary|                  id|                px|                py|               pz|            halomass|
+-------+--------------------+------------------+------------------+-----------------+--------------------+
|  count|           206140716|         206140716|         206140716|        206140716|           206140716|
|   mean|1.1377166839059344E8|1574.8011172244348|1574.4438209396415|1575.793809915998|3.783604754810456E12|
| stddev| 7.559122444725068E7| 909.2859419887142|  909.499292609313|909.4213531875901|1.739491312221791...|
|    min|                   0|         -3.628511|         -3.603208|         0.192092|       5.05141002E11|
|    max|           362123158|         3149.9138|         3149.9255|        3154.1765|        5.6967899E15|
+-------+--------------------+------------------+------------------+-----------------+--------------------+

In [13]:
%%time
# Persist the filtered catalog back to HDFS as Parquet (typed, compressed,
# faster to reload than CSV); 'overwrite' replaces any previous output.
halodf.write.mode('overwrite').parquet("hdfs://master:54310/data/spark/hr4/hr4z0.parquet")
CPU times: user 3.85 ms, sys: 2.72 ms, total: 6.57 ms
Wall time: 20.6 s