Measuring graph statistics of the SDSS Skeleton galaxies using python-igraph and PySpark's parallel computation capability.
We compute graph statistics for each linking length `linklen` using a PySpark UDF. For this, we will broadcast two objects to the workers: the galaxy catalog `galdf` and the k-d tree `galkdtree`.
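As an orientation, here is a minimal sketch of the broadcast-plus-UDF pattern used in the rest of this notebook; the names `bcast_df`, `stat_for_length`, and `my_udf` are illustrative placeholders, and the real versions appear in the cells below.
# minimal sketch (placeholder names) of the broadcast + UDF pattern
import pyspark.sql.functions as F
import pyspark.sql.types as T
from functools import partial

def stat_for_length(linklen, bcast_df):
    # broadcast variables expose their payload through .value
    return float(len(bcast_df.value.index)) * linklen   # placeholder computation

#bcast_df = sc.broadcast(galdf)                          # broadcast once on the driver
#my_udf = F.udf(partial(stat_for_length, bcast_df=bcast_df), T.DoubleType())
#sparkdf.withColumn('stat', my_udf('linklen'))           # evaluated per row on the workers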
# generate edges
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial import cKDTree
# plot settings
plt.rc('font', family='serif')
plt.rc('font', serif='Times New Roman')
plt.rcParams.update({'font.size': 16})
plt.rcParams['mathtext.fontset'] = 'stix'
# galaxy dataframe
galdf = pd.read_csv("./D2018Mar_SDSSvolskbfHR4.txt",delimiter=r"\s+")
galdf.describe()
galdf.head()
# how to "select" data using multiple criteria
# Key Point : use `(condition1) & (condition2)`
galdf[(galdf["pz"] < 20.0) & (galdf["pz"] > -20.0)].pz.values
#
# 3d point distribution on a sphere
#
fig = plt.figure(figsize=(12,12))
#plt.axis([-1,1,-1,1])
plt.axes().set_aspect('equal', 'datalim')
plt.title(r'SDSS Skeleton Galaxies')
plt.xlabel(r'$x$')
plt.ylabel(r'$y$')
plt.scatter(galdf[np.abs(galdf.pz.values) < 20.].px.values,\
galdf[np.abs(galdf.pz.values) < 20.].py.values,s=1.5,marker=".", color='grey')
plt.show()
#
# using earth-map api_s for utilizing various projections
#
"""
from mpl_toolkits.basemap import Basemap
fig = plt.figure(figsize=(13,12))
m = Basemap(projection='ortho',lat_0=27,lon_0=260)
x, y = m(galdf.ra.values,galdf.dec.values)
m.scatter(x,y,s=2,marker='o',color='grey',alpha=0.5)
m.drawmeridians(np.arange(0,360,30))
m.drawparallels(np.arange(-90,90,30))
plt.title(r'SDSS Skeleton Galaxies')
#fig.savefig("visSDSS.png")
plt.show()
"""
# define a spherical distance function
# angular distance between two polar coordinates : (phi,theta)
def polarangdist(p1, t1, p2, t2, debug=False):
    """Angular distance in degrees between two polar coordinates (phi, theta), given in degrees."""
    innerproduct = np.sin(np.radians(t1))*np.cos(np.radians(p1))*np.sin(np.radians(t2))*np.cos(np.radians(p2))\
        + np.sin(np.radians(t1))*np.sin(np.radians(p1))*np.sin(np.radians(t2))*np.sin(np.radians(p2))\
        + np.cos(np.radians(t1))*np.cos(np.radians(t2))
    if np.abs(innerproduct) > 1.:
        if np.abs(innerproduct) < 1.001:
            # clamp minor round-off while preserving the sign
            innerproduct = np.sign(innerproduct) * 1.0
            if debug:
                print("Minor roundoff issue in polarangdist : clamped innerproduct to +/-1.0")
        else:
            print("Funny things happen in polarangdist")
            sys.exit(1)
    return np.degrees(np.arccos(innerproduct))
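A quick sanity check with illustrative values: two points on the equator (theta = 90 degrees) separated by 90 degrees in phi should be 90 degrees apart, antipodal points 180 degrees, and identical points 0.
# quick sanity checks for polarangdist (illustrative values)
print(polarangdist(0., 90., 90., 90.))    # expected ~90.0
print(polarangdist(0., 90., 180., 90.))   # expected ~180.0
print(polarangdist(45., 30., 45., 30.))   # identical points, expected ~0.0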
# now generate a kdtree for finding nearest neighbor
galkdtree = cKDTree(galdf[['px','py','pz']])
sys.getsizeof(galkdtree)
sys.getsizeof(galdf)
# let's test how scipy/kdtree works
galkdtree.query(galdf.loc[4,['px','py','pz']],k=[2])
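Note that with k=[1] the query returns the point itself at distance 0, which is why the edge-building loops below start from `inearest = 2`; `query` returns a pair of arrays (distances, indices). A small illustrative check:
# k=[1,2,3]: the first hit is the query point itself (distance ~0),
# so the first real neighbour is the k=2 entry
dists, idxs = galkdtree.query(galdf.loc[4,['px','py','pz']], k=[1,2,3])
print(dists)   # three distances, the first ~0
print(idxs)    # three row indices into galdf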
#
# Generate a FOF network using the KDtree above
#
#
from igraph import *
#angcut = 0.1 # in degree
lengcut = 5.0 # in [1/h Mpc]
inearest = 1
#numtotal = len(galdf.index)
numtotal = 5
thisangdist = 0.
edgelists = []
for ivertex in range(numtotal):
inearest=2 # this is the first nearest neighbor in kdtree
dummy, inow = galkdtree.query(galdf.loc[ivertex,['px','py','pz']],k=[inearest])
"""
thisangdist= polarangdist(hotdf.loc[ivertex,['phi']].values[0],hotdf.loc[ivertex,['theta']].values[0],\
hotdf.loc[inow[0],['phi']].values[0],hotdf.loc[inow[0],['theta']].values[0],\
debug=True)"""
#while (thisangdist <= angcut):
while (dummy[0] <= lengcut):
#print ivertex, inow[0], " : ", dummy[0], inearest, " : sphdist = ", thisangdist
#print ivertex, inow[0], " : ", dummy[0], inearest
#print galdf.loc[ivertex,['ra']].values[0]
#print galdf.loc[ivertex,['ra','dec','px','py','pz']]
#print galdf.loc[inow[0],['ra']].values[0]
#print galdf.loc[inow[0],['ra','dec','px','py','pz']]
edgelists.append([ivertex,inow[0]])
inearest = inearest + 1 # the next nearest one
dummy, inow = galkdtree.query(galdf.loc[ivertex,['px','py','pz']],k=[inearest])
"""
thisangdist = polarangdist(hotdf.loc[ivertex,['phi']].values[0],hotdf.loc[ivertex,['theta']].values[0],\
hotdf.loc[inow[0],['phi']].values[0],hotdf.loc[inow[0],['theta']].values[0],\
debug=True)"""
print(edgelists[:20])
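For a fixed linking length, the same FOF edge list can also be built in one call with `cKDTree.query_pairs`, which returns every index pair closer than the given radius; a sketch (covering all galaxies, not just the first `numtotal`):
# alternative sketch: all pairs within lengcut from a single k-d tree call
# query_pairs returns a set of (i, j) index pairs with i < j
pair_set = galkdtree.query_pairs(lengcut)
edgelists_alt = [list(pair) for pair in pair_set]
print(len(edgelists_alt), "edges within", lengcut, "Mpc/h")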
import gc
gc.collect()
from pyspark import SparkContext
from pyspark.sql import SQLContext
#sc = SparkContext(master='local[3]', appName='calgraph')
sqlsc = SQLContext(sc)   # `sc` is the SparkContext already provided by the pyspark shell/notebook
#sc.setCheckpointDir("./checkpoints")
#sc.setCheckpointDir("hdfs://localhost:8020/myhdfs/spark/checkpoints")
sc.setCheckpointDir("hdfs://master:54310/tmp/spark/checkpoints")
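On newer Spark versions, `SQLContext` is superseded by `SparkSession`; a minimal sketch of the equivalent setup (master URL and checkpoint path are placeholders):
# sketch only: SparkSession-based setup for Spark >= 2.x (placeholder paths)
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[3]').appName('calgraph').getOrCreate()
sc = spark.sparkContext
sc.setCheckpointDir('/tmp/spark/checkpoints')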
## Define basic index and linking length dataframe
#ilink = np.arange(1,20)
#linklen = np.arange(1,20)*0.2 + 0.2
#ilink = np.arange(1,4)
#linklen = np.arange(1,4)*0.5
ilink = np.arange(1,60)
linklen = np.arange(1,60)*0.1 + 0.1
print(ilink)
print(linklen)
numdata = len(linklen)
linkdf = pd.DataFrame(list(zip(ilink, linklen)),\
columns=['ilink','linklen'])
## Now, setting for pyspark
bcastgalkdtree = sc.broadcast(galkdtree)
bcastgaldf = sc.broadcast(galdf)
sparkdf = sqlsc.createDataFrame(linkdf).persist()
sparkdf.printSchema()
sparkdf.show(5,truncate=True)
A broadcast object needs the `.value` prefix to access its own attributes.
## Define a UDF for applying this to the `linklen` column
# testPassingBcastVariable: the inputs are broadcast variables, so the payload is reached through .value
def testPassingBcastVariable(linklen, curgaldf, curtree):
    dummy, inow = curtree.value.query(curgaldf.value.loc[4,['px','py','pz']], k=[2])
    #dummy, inow = curtree.query([104.,179.,-136.],k=[2])
    return dummy[0].item()
# temp: the same query with plain (non-broadcast) objects, for local testing
def temp(linklen, curgaldf, curtree):
    dummy, inow = curtree.query(curgaldf.loc[4,['px','py','pz']], k=[2])
    #dummy, inow = curtree.query([104.,179.,-136.],k=[2])
    return dummy[0].item()
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Row
from functools import partial
testudf = F.udf(partial(testPassingBcastVariable, curtree = bcastgalkdtree, curgaldf = bcastgaldf),\
T.DoubleType())
#graphudf = F.udf(getGraphStatistics, T.DoubleType())
tmpudf = F.udf(lambda x: x*x, T.DoubleType())
#getGraphStatistics(0.5,galdf,galkdtree)
type(temp(0.5,galdf,galkdtree))
galkdtree.query([104.,179.,-136.],k=[2])
bcastgalkdtree.value.query([104.,179.,-136.],k=[2])
bcastgaldf.value.loc[4,['px','py','pz']]
sparkdf.withColumn('tmp',tmpudf('linklen')).show(5)
testdf = sparkdf.withColumn('gstat',testudf('linklen'))
testdf.persist().printSchema()
print(len(bcastgaldf.value.index))
`gStats` gets one column variable, `linklen`, from the Spark DataFrame and two broadcast objects, `curgaldf` and `curtree`: the UDF is applied to the column, with the two broadcast DataFrame/tree arguments passed in. NumPy types are not compatible with Spark UDF return types, so results are converted to generic Python types with `numpyObject.item()` before returning.
# curgaldf and curtree are broadcast variables, hence they need ".value"
def gStats(linklen, curgaldf, curtree):
    lengcut = linklen
    numtotal = len(curgaldf.value.index)
    edgelists = []
    # build the FOF edge list: link every galaxy to all neighbours within lengcut
    for ivertex in range(numtotal):
        inearest = 2  # k=1 is the point itself, so k=2 is the first real neighbour in the kdtree
        dummy, inow = curtree.value.query(curgaldf.value.loc[ivertex,['px','py','pz']], k=[inearest])
        while (dummy[0] <= lengcut):
            edgelists.append([ivertex, inow[0]])
            inearest = inearest + 1  # the next nearest one
            dummy, inow = curtree.value.query(curgaldf.value.loc[ivertex,['px','py','pz']], k=[inearest])
    # make a graph and measure its statistics
    g = Graph(edges=edgelists)
    transitivity = g.transitivity_undirected()
    diameter = np.double(g.diameter()).item()
    localcc = g.transitivity_avglocal_undirected()
    numclique = np.double(g.clique_number()).item()
    subcomp = g.components()
    gcompfrac = np.double(np.max(subcomp.sizes()))/np.double(g.vcount())
    numsubcomptwo = len(list(filter(lambda size: size >= 2, subcomp.sizes())))
    maxdg = np.max(g.degree())
    dgcent = np.double(np.sum(maxdg - g.degree()))/((g.vcount() - 2) * (g.vcount() - 1))
    return [diameter, transitivity, localcc, numclique, gcompfrac.item(),
            np.double(numsubcomptwo).item(), dgcent.item()]
"""
%%time
tmpout = gStats(1.0,bcastgaldf,bcastgalkdtree)
"""
# check the return types, which should be compatible for Spark/UDF's return types
#type(tmpout[6])
#tmpout
gstatudf = F.udf(partial(gStats, curtree = bcastgalkdtree, curgaldf = bcastgaldf),\
T.ArrayType(T.DoubleType()))
sparkdf.show()
# Let's do the jobs on workers
%time gstatdf = sparkdf.withColumn('gstat',gstatudf('linklen')).cache()
%time gstatdf.collect()
#show each DoubleType in the `gstat` ArrayType
%time gstatdf.select(F.col("gstat").getItem(1)).show()
Convert `gstatdf` (Spark DataFrame) to `gstatpd` (pandas DataFrame).
gstatpd = gstatdf.select("ilink","linklen").toPandas()
gstatpd.head()
# pull each item of the `gstat` array out as a named column and merge it into one pandas frame
gstatnames = ['diameter','transitivity','localcc','numclique','gcomp','subcomp','dgcent']
re = gstatpd
for i, name in enumerate(gstatnames):
    re = pd.merge(re,
                  gstatdf.select(F.col("gstat").getItem(i).alias(name)).toPandas(),
                  left_index=True, right_index=True)
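Alternatively (a sketch, equivalent in output to the merge loop above), all of the statistics can be pulled back in a single `toPandas()` call:
# single-pass alternative: select ilink, linklen, and every gstat item at once
statcols = ['diameter','transitivity','localcc','numclique','gcomp','subcomp','dgcent']
exprs = [F.col("gstat").getItem(i).alias(name) for i, name in enumerate(statcols)]
re_alt = gstatdf.select("ilink", "linklen", *exprs).toPandas()
re_alt.head()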
re.head()
re.describe()
import pickle
# keep dumping the current results, overwriting the pickle each time
with open('SDSSGraphSpark.pickle','wb') as f:
    pickle.dump(re, f)
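To reload the saved results later, for example in a plotting-only session:
# reload the pickled results in a later session
with open('SDSSGraphSpark.pickle','rb') as f:
    re = pickle.load(f)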
import matplotlib.pyplot as plt
# plot settings
plt.rc('font', family='serif')
plt.rc('font', serif='Times New Roman')
plt.rcParams.update({'font.size': 16})
plt.rcParams['mathtext.fontset'] = 'stix'
lenmax = 6.0
fig = plt.figure(figsize=(12,11))
plt.subplot(321)
plt.axis([0,lenmax,0,1.0])
#plt.title("Diameter")
#plt.yscale('log')
#plt.xscale('log')
plt.xlabel(r'Linking Length [$h^{-1}$ Mpc]')
plt.ylabel(r'Giant Component Fraction')
plt.scatter(re.linklen.values,re.gcomp.values,s=10)
plt.plot(re.linklen.values,re.gcomp.values)
#plt.text(0.2,0.4,'CMB COLD Spots',color='b')
plt.subplot(322)
plt.axis([0,lenmax,0,500])
#plt.title("Diameter")
#plt.yscale('log')
#plt.xscale('log')
plt.xlabel(r'Linking Length [$h^{-1}$ Mpc]')
plt.ylabel(r'Diameter')
plt.scatter(re.linklen.values,re.diameter.values,s=10)
plt.plot(re.linklen.values,re.diameter.values)
plt.subplot(323)
plt.axis([0,lenmax,0.5,0.7])
#plt.title("Diameter")
#plt.yscale('log')
#plt.xscale('log')
plt.xlabel(r'Linking Length [$h^{-1}$ Mpc]')
plt.ylabel(r'Triangular Configurations')
plt.plot(re.linklen.values,re.transitivity.values,label='transitivity')
plt.scatter(re.linklen.values,re.transitivity.values,s=10)
plt.plot(re.linklen.values,re.localcc.values,color='green',label='average lcc')
plt.scatter(re.linklen.values,re.localcc.values,color='green',s=10)
plt.legend(loc=4)
plt.subplot(324)
plt.axis([0,lenmax,0,40])
#plt.title("Diameter")
#plt.yscale('log')
#plt.xscale('log')
plt.xlabel(r'Linking Length [$h^{-1}$ Mpc]')
plt.ylabel(r'Clique Number')
plt.scatter(re.linklen.values,re.numclique.values,s=10)
plt.plot(re.linklen.values,re.numclique.values)
plt.subplot(325)
plt.axis([0,lenmax,0,25000])
#plt.title("Diameter")
#plt.yscale('log')
#plt.xscale('log')
plt.xlabel(r'Linking Length [$h^{-1}$ Mpc]')
plt.ylabel(r'Number of Subcomponents')
plt.scatter(re.linklen.values,re.subcomp.values,s=10)
plt.plot(re.linklen.values,re.subcomp.values)
plt.subplot(326)
plt.axis([0,lenmax,0,0.0012])
#plt.title("Diameter")
#plt.yscale('log')
#plt.xscale('log')
plt.xlabel(r'Linking Length [$h^{-1}$ Mpc]')
plt.ylabel(r'Degree Centralization')
plt.scatter(re.linklen.values,re.dgcent.values,s=10)
plt.plot(re.linklen.values,re.dgcent.values)
plt.tight_layout(pad=0.4, w_pad=0.5, h_pad=1.0)
fig.savefig("graphSparkSDSS.eps")
plt.show()