Programming Guide
The MESH prototype is built on top of the open-source Apache Spark/GraphX framework. It provides a concise but expressive hypergraph API that can be used to implement a variety of hypergraph algorithms.
Download Link for MESH code
MESH API
-
hyperVertices
def hyperVertices: HyperVertexRDD[HVD]
A HyperVertexRDD containing HyperVertices and their associated attributes in this HyperGraph.
HVD : the HyperVertex data type
Example:
Count the number of HyperVertices
val vertexCount = hypergraph.hyperVertices.count
-
hyperEdges
def hyperEdges: HyperEdgeRDD[HED]
A HyperEdgeRDD containing HyperEdges and their associated attributes in this HyperGraph.
HED : the HyperEdge data type
Example:
Count the number of HyperEdges
val edgeCount = hypergraph.hyperEdges.count
-
toGraph
def toGraph[ED : ClassTag](mapHyperEdge: HED => ED)(combineEdges: (ED, ED) => ED): Graph[HVD, ED]
Transforms the HyperGraph into a Graph using the functions mapHyperEdge and combineEdges. Each HyperVertexA - HyperEdge - HyperVertexB relation becomes a VertexA - Edge - VertexB relation. mapHyperEdge converts the value of a HyperEdge into the value of an Edge, and combineEdges merges the Edge values generated on different partitions.
ED : type of the edge attribute
mapHyperEdge : the function from a HyperEdge value to an edge value
combineEdges : an associative operator used to combine edge values from different partitions
Example:
Construct a Graph consisting of a set of vertices and edges from a given HyperGraph.
val graph = hypergraph.toGraph { case (c, w) => w.toDouble / c } (_ + _)
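The transformation described above can be sketched with plain Scala collections. This is an illustrative model, not MESH code: SimpleHyperEdge and this standalone toGraph are hypothetical stand-ins, and the sketch assumes that each HyperEdge yields one edge per unordered pair of its member vertices, which may differ from what MESH actually does.

```scala
object ToGraphSketch {
  // Hypothetical stand-in for a HyperEdge: an attribute plus member vertex ids.
  final case class SimpleHyperEdge[HED](attr: HED, members: Set[Long])

  // Expand every hyperedge into pairwise edges between its members (assumption),
  // then merge parallel edges with combineEdges.
  def toGraph[HED, ED](hyperEdges: Seq[SimpleHyperEdge[HED]])
                      (mapHyperEdge: HED => ED)
                      (combineEdges: (ED, ED) => ED): Map[(Long, Long), ED] = {
    val expanded = for {
      he <- hyperEdges
      a <- he.members.toSeq
      b <- he.members.toSeq
      if a < b
    } yield ((a, b), mapHyperEdge(he.attr))
    expanded.groupBy(_._1).map { case (pair, vs) => pair -> vs.map(_._2).reduce(combineEdges) }
  }

  // Two hyperedges with (cardinality, weight) attributes that share vertices 2 and 3.
  val sample = Seq(
    SimpleHyperEdge((3, 6), Set(1L, 2L, 3L)),
    SimpleHyperEdge((2, 4), Set(2L, 3L))
  )

  // Same mapping and combiner as in the example above.
  val edges = toGraph(sample) { case (c, w) => w.toDouble / c } (_ + _)
}
```

In this model the edge between vertices 2 and 3 receives a contribution from both hyperedges, and the two contributions are merged by the combiner.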
-
mapHyperVertices
def mapHyperVertices[HVD2 : ClassTag](f: HyperVertex[HVD] => HVD2): HyperGraph[HVD2, HED]
Transforms each HyperVertex attribute in the HyperGraph using function f.
HVD2 : the new HyperVertex data type
f : the function from a HyperVertex object to a new HyperVertex value
Example:
Change the HyperVertex values from one type to another
val augmentedHg = hg.mapHyperVertices(hv => if (hv.id == sourceId) hv.attr -> 0.0 else hv.attr -> Double.PositiveInfinity)
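To make the type change concrete, here is a small sketch in plain Scala. SimpleHyperVertex and this standalone mapHyperVertices are hypothetical stand-ins for the MESH types; they only illustrate that the function sees the whole vertex (id and attribute) and that the attribute type may change while the ids stay the same.

```scala
object MapHyperVerticesSketch {
  // Hypothetical stand-in for a HyperVertex: an id plus an attribute.
  final case class SimpleHyperVertex[HVD](id: Long, attr: HVD)

  // Replace every attribute with f(vertex); ids are left untouched.
  def mapHyperVertices[HVD, HVD2](vertices: Seq[SimpleHyperVertex[HVD]])
                                 (f: SimpleHyperVertex[HVD] => HVD2): Seq[SimpleHyperVertex[HVD2]] =
    vertices.map(hv => SimpleHyperVertex(hv.id, f(hv)))

  val sourceId = 1L
  val vertices = Seq(SimpleHyperVertex(1L, "a"), SimpleHyperVertex(2L, "b"))

  // Attribute type changes from String to (String, Double).
  val augmented = mapHyperVertices(vertices) { hv =>
    if (hv.id == sourceId) hv.attr -> 0.0 else hv.attr -> Double.PositiveInfinity
  }
}
```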
-
mapHyperEdges
def mapHyperEdges[HED2 : ClassTag](f: HyperEdge[HED] => HED2): HyperGraph[HVD, HED2]
Transforms each HyperEdge attribute in the HyperGraph using function f.
HED2 : the new HyperEdge data type
f : the function from a HyperEdge object to a new HyperEdge value
Example:
Change the HyperEdge per-vertex values from one type to another
val augmentedHg = hg.mapHyperEdges(he => he.attr -> Double.PositiveInfinity)
-
degrees
def degrees(): HyperGraph[Int, Int]
Constructs a new HyperGraph in which the attribute of each HyperVertex and each HyperEdge is its degree, that is, its number of incoming edges.
Example:
Analyze degree information of HyperGraph
val hgWithDegrees = hypergraph.degrees
var hvDegreeSum = 0.0
var hvMaxDegree = 0.0
hgWithDegrees.hyperVertices.collect foreach { case HyperVertex(vid, degree) =>
  hvDegreeSum = hvDegreeSum + degree
  if (degree > hvMaxDegree) {
    hvMaxDegree = degree
  }
}
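For intuition, the degree computation can be sketched on a plain incidence map. This is not the MESH implementation: the incidence map is a hypothetical representation, and the sketch assumes that the degree of a HyperVertex is the number of HyperEdges containing it and the degree of a HyperEdge is the number of HyperVertices it contains.

```scala
object DegreesSketch {
  // Hypothetical incidence representation: hyperedge id -> member vertex ids.
  val incidence: Map[Long, Set[Long]] = Map(
    10L -> Set(1L, 2L, 3L),
    11L -> Set(2L, 3L),
    12L -> Set(3L)
  )

  // Degree of a hyperedge: number of vertices it contains (assumption).
  val hyperEdgeDegrees: Map[Long, Int] =
    incidence.map { case (he, vs) => he -> vs.size }

  // Degree of a vertex: number of hyperedges that contain it (assumption).
  val hyperVertexDegrees: Map[Long, Int] =
    incidence.toSeq
      .flatMap { case (_, vs) => vs.toSeq }
      .groupBy(identity)
      .map { case (v, occurrences) => v -> occurrences.size }

  // The same summary statistics as in the example above, without mutable state.
  val maxDegree: Int = hyperVertexDegrees.values.max
  val avgDegree: Double = hyperVertexDegrees.values.sum.toDouble / hyperVertexDegrees.size
}
```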
-
subHyperGraph
def subHyperGraph(hvPred: HyperVertex[HVD] => Boolean = hv => true, hePred: HyperEdge[HED] => Boolean = he => true, retainIsolated: Boolean = false): HyperGraph[HVD, HED]
Returns a HyperGraph containing only the HyperVertices and HyperEdges that satisfy the given predicates.
hvPred : the HyperVertex predicate, which takes a HyperVertex object and evaluates to true if the HyperVertex is to be included in the subHyperGraph
hePred : the HyperEdge predicate, which takes a HyperEdge object and evaluates to true if the HyperEdge is to be included in the subHyperGraph
retainIsolated : a Boolean option. Set to true to retain HyperVertices without any outgoing edges to HyperEdges; set to false to exclude such HyperVertices from the returned subHyperGraph. The default is false.
Example:
Generate a new HyperGraph from a given HyperGraph, satisfying certain conditions
val minHyperEdgeCardinality = 2
val subHg = hypergraph.subHyperGraph(hePred = _.attr._1 >= minHyperEdgeCardinality)
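The interaction between the two predicates and retainIsolated can be sketched as follows. SimpleHyperGraph, SimpleHyperEdge and this standalone subHyperGraph are hypothetical; the sketch assumes that retained HyperEdges are restricted to retained HyperVertices and that a HyperEdge left with no members is dropped, which the MESH documentation does not specify.

```scala
object SubHyperGraphSketch {
  // Hypothetical in-memory hypergraph: vertex attributes plus hyperedges with members.
  final case class SimpleHyperEdge[HED](id: Long, attr: HED, members: Set[Long])
  final case class SimpleHyperGraph[HVD, HED](
      vertices: Map[Long, HVD],
      hyperEdges: Seq[SimpleHyperEdge[HED]])

  def subHyperGraph[HVD, HED](
      hg: SimpleHyperGraph[HVD, HED],
      hvPred: (Long, HVD) => Boolean,
      hePred: SimpleHyperEdge[HED] => Boolean,
      retainIsolated: Boolean): SimpleHyperGraph[HVD, HED] = {
    val keptVertices = hg.vertices.filter { case (id, attr) => hvPred(id, attr) }
    // Keep matching hyperedges, restricted to the kept vertices (assumption).
    val keptEdges = hg.hyperEdges
      .filter(hePred)
      .map(he => he.copy(members = he.members.filter(keptVertices.contains)))
      .filter(_.members.nonEmpty)
    // A vertex is isolated when no retained hyperedge contains it.
    val connected = keptEdges.flatMap(_.members).toSet
    val finalVertices =
      if (retainIsolated) keptVertices
      else keptVertices.filter { case (id, _) => connected(id) }
    SimpleHyperGraph(finalVertices, keptEdges)
  }

  // Hyperedge attributes here are cardinalities; hyperedge 11 is below the threshold.
  val hg = SimpleHyperGraph(
    Map(1L -> "a", 2L -> "b", 3L -> "c", 4L -> "d"),
    Seq(SimpleHyperEdge(10L, 3, Set(1L, 2L, 3L)), SimpleHyperEdge(11L, 1, Set(4L))))

  val pruned = subHyperGraph[String, Int](hg, (_, _) => true, _.attr >= 2, retainIsolated = false)
  val withIsolated = subHyperGraph[String, Int](hg, (_, _) => true, _.attr >= 2, retainIsolated = true)
}
```

In this model, vertex 4 belongs only to the removed hyperedge, so it survives only when retainIsolated is true.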
-
dual
def dual: HyperGraph[HED, HVD]
Generates a new HyperGraph in which HyperEdges become HyperVertices, HyperVertices become HyperEdges, and the directions of the edges are reversed.
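As an illustration, the dual can be computed on a plain incidence map by inverting the membership relation. The incidence map is a hypothetical representation rather than the MESH data structure, and attributes are omitted for brevity.

```scala
object DualSketch {
  // Hypothetical incidence representation: hyperedge id -> member vertex ids.
  // The dual maps each original vertex id to the hyperedges that contained it.
  def dual(incidence: Map[Long, Set[Long]]): Map[Long, Set[Long]] =
    incidence.toSeq
      .flatMap { case (he, vs) => vs.toSeq.map(v => v -> he) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

  val incidence: Map[Long, Set[Long]] = Map(10L -> Set(1L, 2L), 11L -> Set(2L, 3L))
  val dualIncidence: Map[Long, Set[Long]] = dual(incidence)
}
```

When no hyperedge is empty, applying this dual twice returns the original incidence map.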
-
compute
def compute[ToE : ClassTag, ToV : ClassTag]( maxIters: Int, initialMessage: ToV, hvProgram: Program[HVD, ToV, ToE], heProgram: Program[HED, ToE, ToV]): HyperGraph[HVD, HED]
Execute a Pregel-like bulk-synchronous HyperVertex and HyperEdge parallel computation.
The user-defined HyperVertex program, hvProgram, is executed in parallel on each HyperVertex; it receives any inbound messages and computes a new value for the HyperVertex. The user-defined HyperEdge program, heProgram, is executed in parallel on each HyperEdge in the same way, computing a new value for the HyperEdge. Messages destined for the same HyperVertex or HyperEdge are merged by the MessageCombiner function defined in the corresponding HyperVertex program or HyperEdge program.
In the first iteration, all HyperVertices receive the initialMessage. In subsequent iterations, the HyperVertex program is not invoked on a HyperVertex that received no message, and likewise the HyperEdge program is not invoked on a HyperEdge that received no message. The computation iterates until no messages remain or until maxIters iterations have run.
ToE : the data type of a message sent to HyperEdge or received by HyperEdge
ToV : the data type of a message sent to HyperVertex or received by HyperVertex
maxIters : the maximum number of iterations
initialMessage : the message that every HyperVertex receives in the first iteration
hvProgram : User-defined HyperVertex-program
heProgram : User-defined HyperEdge-program
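The alternating HyperVertex/HyperEdge iteration can be sketched without Spark. The code below is a simplified model under stated assumptions, not the MESH compute implementation: the incidence map and maxLabelPropagation are hypothetical, every hyperedge is assumed to be non-empty, messages are combined with max, and every vertex and hyperedge is re-evaluated in each round instead of only those that received a message.

```scala
object ComputeSketch {
  // Hypothetical incidence representation: hyperedge id -> member vertex ids.
  type Incidence = Map[Long, Set[Long]]

  // Alternates a hyperedge step and a vertex step, combining inbound labels
  // with max, until no label changes or maxIters rounds have run.
  def maxLabelPropagation(incidence: Incidence, maxIters: Int): Map[Long, Long] = {
    val vertexIds = incidence.values.flatten.toSet
    var vLabels: Map[Long, Long] = vertexIds.map(v => v -> v).toMap
    var iter = 0
    var changed = true
    while (changed && iter < maxIters) {
      // Hyperedge step: combine the labels broadcast by member vertices.
      val heLabels = incidence.map { case (he, vs) => he -> vs.map(vLabels).max }
      // Vertex step: combine the labels broadcast by incident hyperedges.
      val updated = vLabels.map { case (v, old) =>
        val inbound = incidence.toSeq.collect { case (he, vs) if vs(v) => heLabels(he) }
        v -> inbound.foldLeft(old)(_ max _)
      }
      changed = updated != vLabels
      vLabels = updated
      iter += 1
    }
    vLabels
  }

  // Two components: {1, 2, 3} linked through hyperedges 10 and 11, and {4, 5}.
  val incidence: Incidence = Map(10L -> Set(1L, 2L), 11L -> Set(2L, 3L), 12L -> Set(4L, 5L))
  val labels = maxLabelPropagation(incidence, maxIters = 10)
  val afterOneRound = maxLabelPropagation(incidence, maxIters = 1)
}
```

In this model each vertex ends up labelled with the largest vertex id in its connected component, and maxIters bounds how far a label can travel.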
Examples
Label Propagation Example
def lp[HVD, HED](hg: HyperGraph[HVD, HED], maxIters: Int): HyperGraph[(HVD, Long), (HED, Long)] = {
  type Community = VertexId
  type Msg = Map[Community, Int]
  type VAttr = (HVD, Community)
  type HEAttr = (HED, Community)

  def mostFrequent(msg: Msg): Community = msg.maxBy(_.swap)._1

  val lpVProc: Procedure[VAttr, Msg, Msg] = (ss, id, attr, msg, ctx) => {
    val (vd, _) = attr
    val newCommunity = if (ss == 0) id else mostFrequent(msg)
    ctx.become((vd, newCommunity))
    ctx.broadcast(Map(newCommunity -> 1))
  }

  val lpHeProc: Procedure[HEAttr, Msg, Msg] = (ss, id, attr, msg, ctx) => {
    val (hed, _) = attr
    val newCommunity = mostFrequent(msg)
    ctx.become((hed, newCommunity))
    ctx.broadcast(Map(newCommunity -> 1))
  }

  // Augment vertices and Hyperedges with their corresponding labels
  val initialCommunity: Community = 0
  val augmentedHg = augment(hg, initialCommunity, initialCommunity)
  val initialMessage: Msg = Map(initialCommunity -> 1)
  augmentedHg.compute[Msg, Msg](maxIters, initialMessage, lpVProc, lpHeProc)
}
Shortest Path Example
def pr[HVD](hg: HyperGraph[HVD, (Int, Int)], maxIters: Int, sourceId: Int = 1): HyperGraph[(HVD, Double), ((Int, Int), Double)] = {
  // Set up initial shortest path graph
  val augmentedHg = hg.mapHyperVertices(hv =>
    if (hv.id == sourceId) hv.attr -> 0.0 else hv.attr -> Double.PositiveInfinity
  ).mapHyperEdges(he => he.attr -> Double.PositiveInfinity)

  type ToV = (Double, Double)
  type ToE = Double

  val prHvProcedure: Procedure[(HVD, Double), ToV, ToE] = (ss, id, attr, msg, ctx) => {
    if (ss == 0) {
      val (hvd, dist) = attr
      val (_, newDist) = msg
      if (dist < newDist) {
        ctx.broadcast(dist + 1.0)
      }
    } else {
      val (hvd, dist) = attr
      val (totalWeight, update) = msg
      val newDist = update / totalWeight
      if (dist > newDist) {
        ctx.become((hvd, newDist))
        ctx.broadcast(newDist + 1.0)
      }
    }
  }

  val prHeProcedure: Procedure[((Int, Int), Double), ToE, ToV] = (ss, id, attr, msg, ctx) => {
    val ((cardinality, weight), dist) = attr
    val newDist = msg
    if (dist > newDist) {
      // Pass the new attribute as one explicit tuple
      ctx.become(((cardinality, weight), newDist))
      ctx.broadcast((weight, newDist))
    }
  }

  val initialMessage = (0.0, Double.PositiveInfinity)
  augmentedHg.compute[ToE, ToV](maxIters, initialMessage, prHvProcedure, prHeProcedure)
}
Connected Component Example
def hvProgram[HVD] = new Program[(HVD, VertexId), VertexId, VertexId] {
  def messageCombiner: MessageCombiner[VertexId] = _ max _

  def procedure: Procedure[(HVD, VertexId), VertexId, VertexId] = (ss, id, attr, msg, ctx) => {
    // Set this vertex's identifier to the greatest id it's heard from
    val (oldVd, oldId) = attr
    val newId = if (ss == 0) id else msg max oldId
    val newAttr = (oldVd, newId)
    ctx.become(newAttr)
    // And if we're just getting started, or if we just changed states, then let
    // neighbors know
    if (msg > oldId || ss == 0) {
      ctx.broadcast(newId)
    }
  }
}

def heProgram[HED] = new Program[(HED, VertexId), VertexId, VertexId] {
  def messageCombiner: MessageCombiner[VertexId] = _ max _

  def procedure: Procedure[(HED, VertexId), VertexId, VertexId] = (ss, id, attr, msg, ctx) => {
    // Set this hyperedges's identifier to the greatest id it's heard from
    val (oldEd, oldId) = attr
    val newId = if (ss == 0) id else msg max oldId
    val newAttr = (oldEd, newId)
    ctx.become(newAttr)
    // And if we're just getting started, or if we just changed states, then let
    // neighbors know
    if (msg > oldId || ss == 0) {
      ctx.broadcast(newId)
    }
  }
}

def apply[HVD, HED](hg: HyperGraph[HVD, HED]): HyperGraph[(HVD, VertexId), (HED, VertexId)] = {
  hg.debugPrint("ORIGINAL hg")
  val payload: VertexId = 0
  val augmentedHg = augment(hg, payload, payload)
  augmentedHg.debugPrint("AUGMENTED hg")
  val initialMessage: VertexId = 0
  val computedHg = augmentedHg.compute(10, initialMessage, hvProgram[HVD], heProgram[HED])
  computedHg.debugPrint("COMPUTED hg")
  computedHg
}
Page Rank Example
def pr[HVD](hg: HyperGraph[HVD, (Int, Int)], maxIters: Int, alpha: Double = 0.15): HyperGraph[(HVD, Double), ((Int, Int), Double)] = {
  // Augment vertices and hyperedges with their rank
  val augmentedHg = augment(hg, 1.0, 0.0)

  type ToV = (Double, Double)
  type ToE = Double

  val prHvProcedure: Procedure[(HVD, Double), ToV, ToE] = (ss, id, attr, msg, ctx) => {
    if (ss == 0) {
      // Effectively a no-op, but based on our current implementation, we need
      // to emit something or else we terminate!
      ctx.broadcast(0.0)
    } else {
      // Receive (sum(weights), sum(ranks)) from hyperedges.
      // In order to effectively send out (rank * weight/totalWeight), we can
      // actually send rank / totalWeight, and let the hyperedge then perform
      // the final multiplication by its weight.
      val (totalWeight, rank) = msg
      val (hvd, _) = attr
      val newRank = alpha + (1.0 - alpha) * rank
      ctx.become((hvd, newRank))
      ctx.broadcast(newRank / totalWeight)
    }
  }

  val prHeProcedure: Procedure[((Int, Int), Double), ToE, ToV] = (ss, id, attr, msg, ctx) => {
    if (ss == 0) {
      // Distribute our weight to vertices so they can compute total weight of their hyperedges;
      // distribute enough rank so each edge receives 1 total.
      val ((cardinality, weight), _) = attr
      ctx.become(((cardinality, weight), 0.0))
      ctx.broadcast((weight, 1.0 / cardinality))
    } else {
      // Vertices sent us rank / totalWeight; we need to do the remaining multiplication
      // by our own weight.
      val ((cardinality, weight), _) = attr
      val rank = msg * weight
      ctx.become(((cardinality, weight), rank))
      ctx.broadcast((weight, rank / cardinality.toDouble))
    }
  }

  val initialMessage = (0.0, 0.0) // Doesn't matter; vertices no-op on ss 0 anyway
  augmentedHg.compute[ToE, ToV](maxIters, initialMessage, prHvProcedure, prHeProcedure)
}