Skip to content

Commit 91f35fe

Browse files
author
Anton Eriksson
authored
feat: New optional flow model for bipartite input
Option: --bipartite-teleportation
1 parent d8e95db commit 91f35fe

File tree

4 files changed

+227
-41
lines changed

4 files changed

+227
-41
lines changed

src/io/Config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ Config::Config(std::string flags, bool isCLI) : isCLI(isCLI)
3232

3333
api.addOptionArgument(skipAdjustBipartiteFlow, "skip-adjust-bipartite-flow", "Skip distributing all flow from the bipartite nodes to the primary nodes.", "Input", true);
3434

35+
api.addOptionArgument(bipartiteTeleportation, "bipartite-teleportation", "Teleport like the bipartite flow instead of two-step (unipartite) teleportation.", "Input", true);
36+
3537
api.addOptionArgument(weightThreshold, "weight-threshold", "Limit the number of links to read from the network. Ignore links with less weight than the threshold.", ArgType::number, "Input", true);
3638

3739
api.addOptionArgument(includeSelfLinks, 'k', "include-self-links", "Include links with the same source and target node.", "Input", true);

src/io/Config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ struct Config {
7373
unsigned int pathMarkovOrder = 1;
7474
bool bipartite = false;
7575
bool skipAdjustBipartiteFlow = false;
76+
bool bipartiteTeleportation = false;
7677
bool hardPartitions = false;
7778
bool nonBacktracking = false;
7879
bool parseWithoutIOStreams = false;
@@ -189,6 +190,7 @@ struct Config {
189190
pathMarkovOrder = other.pathMarkovOrder;
190191
bipartite = other.bipartite;
191192
skipAdjustBipartiteFlow = other.skipAdjustBipartiteFlow;
193+
bipartiteTeleportation = other.bipartiteTeleportation;
192194
hardPartitions = other.hardPartitions;
193195
nonBacktracking = other.nonBacktracking;
194196
parseWithoutIOStreams = other.parseWithoutIOStreams;

src/utils/FlowCalculator.cpp

Lines changed: 219 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -67,32 +67,56 @@ FlowCalculator::FlowCalculator(StateNetwork& network, const Config& config)
6767
unsigned int nodeIndex = 0;
6868
const auto& nodeLinkMap = network.nodeLinkMap();
6969

70-
// Store dangling nodes out-of-order,
71-
// with dangling nodes first to optimize calculation of dangilng rank
72-
for (const auto& node : network.nodes()) {
73-
const auto isDangling = nodeLinkMap.find(node.second) == nodeLinkMap.end();
74-
if (!isDangling) continue;
75-
76-
const auto& nodeId = node.second.id;
77-
nodeIndexMap[nodeId] = nodeIndex++;
78-
}
70+
if (network.isBipartite()) {
71+
// Preserve node order
72+
for (const auto& node : network.nodes()) {
73+
const auto nodeId = node.second.id;
74+
nodeIndexMap[nodeId] = nodeIndex++;
75+
}
76+
77+
auto bipartiteStartId = network.bipartiteStartId();
78+
bipartiteStartIndex = nodeIndexMap[bipartiteStartId];
79+
80+
} else {
81+
// Store dangling nodes out-of-order,
82+
// with dangling nodes first to optimize calculation of dangilng rank
83+
84+
for (const auto& node : network.nodes()) {
85+
const auto isDangling = nodeLinkMap.find(node.second) == nodeLinkMap.end();
86+
if (!isDangling) continue;
87+
88+
const auto& nodeId = node.second.id;
89+
nodeIndexMap[nodeId] = nodeIndex++;
90+
}
7991

80-
nonDanglingStartIndex = nodeIndex;
92+
nonDanglingStartIndex = nodeIndex;
8193

82-
for (const auto& node : network.nodes()) {
83-
const auto isDangling = nodeLinkMap.find(node.second) == nodeLinkMap.end();
84-
if (isDangling) continue;
94+
for (const auto& node : network.nodes()) {
95+
const auto isDangling = nodeLinkMap.find(node.second) == nodeLinkMap.end();
96+
if (isDangling) continue;
8597

86-
const auto& nodeId = node.second.id;
87-
nodeIndexMap[nodeId] = nodeIndex++;
98+
const auto& nodeId = node.second.id;
99+
nodeIndexMap[nodeId] = nodeIndex++;
100+
}
88101
}
89102

90103
numLinks = network.numLinks();
91104
flowLinks.resize(numLinks, { 0, 0, 0.0 });
92105
sumLinkWeight = network.sumLinkWeight();
93106
sumUndirLinkWeight = 2 * sumLinkWeight - network.sumSelfLinkWeight();
94107

108+
if (network.isBipartite()) {
109+
const auto bipartiteStartId = network.bipartiteStartId();
110+
111+
for (const auto& node : nodeLinkMap) {
112+
const auto sourceIsFeature = node.first.id >= bipartiteStartId;
113+
if (sourceIsFeature) continue;
114+
bipartiteLinkStartIndex += node.second.size();
115+
}
116+
}
117+
95118
unsigned int linkIndex = 0;
119+
unsigned int featureLinkIndex = bipartiteLinkStartIndex; // bipartite case
96120

97121
for (const auto& node : nodeLinkMap) {
98122
const auto sourceId = node.first.id;
@@ -107,10 +131,19 @@ FlowCalculator::FlowCalculator(StateNetwork& network, const Config& config)
107131
sumLinkOutWeight[sourceIndex] += linkWeight;
108132
nodeFlow[sourceIndex] += linkWeight / sumUndirLinkWeight;
109133

110-
flowLinks[linkIndex].source = sourceIndex;
111-
flowLinks[linkIndex].target = targetIndex;
112-
flowLinks[linkIndex].flow = linkWeight;
113-
++linkIndex;
134+
if (network.isBipartite() && sourceId >= network.bipartiteStartId()) {
135+
// Link from feature node to ordinary node
136+
flowLinks[featureLinkIndex].source = sourceIndex;
137+
flowLinks[featureLinkIndex].target = targetIndex;
138+
flowLinks[featureLinkIndex].flow = linkWeight;
139+
++featureLinkIndex;
140+
} else {
141+
// Ordinary link, or unipartite
142+
flowLinks[linkIndex].source = sourceIndex;
143+
flowLinks[linkIndex].target = targetIndex;
144+
flowLinks[linkIndex].flow = linkWeight;
145+
++linkIndex;
146+
}
114147

115148
if (sourceIndex != targetIndex) {
116149
if (config.isUndirectedFlow()) {
@@ -130,7 +163,11 @@ FlowCalculator::FlowCalculator(StateNetwork& network, const Config& config)
130163
calcUndirectedFlow();
131164
break;
132165
case FlowModel::directed:
133-
calcDirectedFlow(network, config);
166+
if (network.isBipartite() && config.bipartiteTeleportation) {
167+
calcDirectedBipartiteFlow(network, config);
168+
} else {
169+
calcDirectedFlow(network, config);
170+
}
134171
break;
135172
case FlowModel::undirdir:
136173
case FlowModel::outdirdir:
@@ -164,7 +201,7 @@ void FlowCalculator::calcDirdirFlow(const Config& config) noexcept
164201
Log() << "\n -> Using undirected links, switching to directed after steady state.";
165202

166203
//Take one last power iteration
167-
const auto nodeFlowSteadyState = std::vector<double>(nodeFlow);
204+
const std::vector<double> nodeFlowSteadyState(nodeFlow);
168205
nodeFlow.assign(numNodes, 0.0);
169206

170207
for (const auto& link : flowLinks) {
@@ -200,7 +237,8 @@ struct IterationResult {
200237
};
201238

202239
template <typename Iteration>
203-
IterationResult powerIterate(double alpha, Iteration&& iter) {
240+
IterationResult powerIterate(double alpha, Iteration&& iter)
241+
{
204242
unsigned int iterations = 0;
205243
double beta = 1.0 - alpha;
206244
double err = 0.0;
@@ -255,7 +293,7 @@ void FlowCalculator::calcDirectedFlow(const StateNetwork& network, const Config&
255293
}
256294
}
257295

258-
auto nodeFlowTmp = std::vector<double>(numNodes, 0.0);
296+
std::vector<double> nodeFlowTmp(numNodes, 0.0);
259297
double danglingRank;
260298

261299
// Calculate PageRank
@@ -317,32 +355,168 @@ void FlowCalculator::calcDirectedFlow(const StateNetwork& network, const Config&
317355
}
318356
}
319357

358+
void FlowCalculator::calcDirectedBipartiteFlow(const StateNetwork& network, const Config& config) noexcept
359+
{
360+
Log() << "\n -> Using bipartite " << (config.recordedTeleportation ? "recorded" : "unrecorded") << " teleportation to " << (config.teleportToNodes ? "nodes" : "links") << ". " << std::flush;
361+
362+
const auto bipartiteStartId = network.bipartiteStartId();
363+
364+
if (config.teleportToNodes) {
365+
for (const auto& nodeIt : network.nodes()) {
366+
auto& node = nodeIt.second;
367+
if (node.id < bipartiteStartId) {
368+
nodeTeleportRates[nodeIndexMap[node.id]] = node.weight;
369+
}
370+
}
371+
} else {
372+
// Teleport proportionally to out-degree, or in-degree if recorded teleportation.
373+
// Two-step degree: sum of products between incoming and outgoing links from bipartite nodes
374+
375+
if (config.recordedTeleportation) {
376+
for (auto link = begin(flowLinks) + bipartiteLinkStartIndex; link != end(flowLinks); ++link) {
377+
// target is an ordinary node
378+
nodeTeleportRates[link->target] += link->flow;
379+
}
380+
} else {
381+
// Unrecorded teleportation
382+
383+
for (auto link = begin(flowLinks); link != begin(flowLinks) + bipartiteLinkStartIndex; ++link) {
384+
// source is an ordinary node
385+
nodeTeleportRates[link->source] += link->flow;
386+
}
387+
}
388+
}
389+
390+
normalize(nodeTeleportRates);
391+
392+
nodeFlow = nodeTeleportRates;
393+
394+
// Normalize link weights with respect to its source nodes total out-link weight;
395+
for (auto& link : flowLinks) {
396+
if (sumLinkOutWeight[link.source] > 0) {
397+
link.flow /= sumLinkOutWeight[link.source];
398+
}
399+
}
400+
401+
std::vector<unsigned int> danglingIndices;
402+
for (size_t i = 0; i < numNodes; ++i) {
403+
if (nodeOutDegree[i] == 0) {
404+
danglingIndices.push_back(i);
405+
}
406+
}
407+
408+
std::vector<double> nodeFlowTmp(numNodes, 0.0);
409+
double danglingRank;
410+
411+
// Calculate two-step PageRank
412+
const auto iteration = [&](const auto iteration, const double alpha, const double beta) {
413+
danglingRank = 0.0;
414+
for (const auto& i : danglingIndices) {
415+
danglingRank += nodeFlow[i];
416+
}
417+
418+
// Flow from teleportation
419+
const auto teleportationFlow = alpha + beta * danglingRank;
420+
for (unsigned int i = 0; i < bipartiteStartIndex; ++i) {
421+
nodeFlowTmp[i] = teleportationFlow * nodeTeleportRates[i];
422+
}
423+
424+
for (unsigned int i = bipartiteStartIndex; i < numNodes; ++i) {
425+
nodeFlowTmp[i] = 0.0;
426+
}
427+
428+
// Flow from links
429+
// First step
430+
for (auto link = begin(flowLinks); link != begin(flowLinks) + bipartiteLinkStartIndex; ++link) {
431+
nodeFlow[link->target] += beta * link->flow * nodeFlow[link->source];
432+
}
433+
434+
// Second step back to primary nodes
435+
for (auto link = begin(flowLinks) + bipartiteLinkStartIndex; link != end(flowLinks); ++link) {
436+
nodeFlowTmp[link->target] += link->flow * nodeFlow[link->source];
437+
}
438+
439+
// Update node flow from the power iteration above and check if converged
440+
double nodeFlowDiff = -1.0;
441+
double error = 0.0;
442+
for (unsigned int i = 0; i < bipartiteStartIndex; ++i) {
443+
nodeFlowDiff += nodeFlowTmp[i];
444+
error += std::abs(nodeFlowTmp[i] - nodeFlow[i]);
445+
}
446+
447+
nodeFlow = nodeFlowTmp;
448+
449+
// Normalize if needed
450+
if (std::abs(nodeFlowDiff) > 1.0e-10) {
451+
Log() << "(Normalizing ranks after " << iteration << " power iterations with error " << nodeFlowDiff << ") ";
452+
normalize(nodeFlow, nodeFlowDiff + 1.0);
453+
}
454+
455+
return error;
456+
};
457+
458+
const auto result = powerIterate(config.teleportationProbability, iteration);
459+
460+
double sumNodeRank = 1.0;
461+
double beta = result.beta;
462+
463+
if (!config.recordedTeleportation) {
464+
//Take one last power iteration excluding the teleportation (and normalize node flow to sum 1.0)
465+
sumNodeRank = 1.0 - danglingRank;
466+
nodeFlow.assign(numNodes, 0.0);
467+
468+
for (auto link = begin(flowLinks); link != begin(flowLinks) + bipartiteLinkStartIndex; ++link) {
469+
nodeFlowTmp[link->target] += link->flow * nodeFlowTmp[link->source];
470+
}
471+
// Second step back to primary nodes
472+
for (auto link = begin(flowLinks) + bipartiteLinkStartIndex; link != end(flowLinks); ++link) {
473+
nodeFlow[link->target] += link->flow * nodeFlowTmp[link->source];
474+
}
475+
476+
beta = 1.0;
477+
}
478+
479+
// Update the links with their global flow from the PageRank values.
480+
// Note: beta is set to 1 if unrecorded teleportation
481+
for (auto& link : flowLinks) {
482+
link.flow *= beta * nodeFlowTmp[link.source] / sumNodeRank;
483+
}
484+
}
485+
320486
void FlowCalculator::finalize(StateNetwork& network, const Config& config, bool normalizeNodeFlow) noexcept
321487
{
322488
// TODO: Skip bipartite flow adjustment for directed / rawdir / .. ?
323-
if (network.isBipartite() && !config.skipAdjustBipartiteFlow) {
489+
if (network.isBipartite()) {
324490
Log() << "\n -> Using bipartite links.";
325491

326-
// Only links between ordinary nodes and feature nodes in bipartite network
327-
// Don't code feature nodes -> distribute all flow from those to ordinary nodes
328-
for (auto& link : flowLinks) {
329-
auto bipartiteStartIndex = nodeIndexMap[network.bipartiteStartId()];
330-
auto sourceIsFeature = link.source >= bipartiteStartIndex;
492+
if (!config.skipAdjustBipartiteFlow && !config.bipartiteTeleportation) {
493+
// Only links between ordinary nodes and feature nodes in bipartite network
494+
// Don't code feature nodes -> distribute all flow from those to ordinary nodes
495+
for (auto& link : flowLinks) {
496+
auto sourceIsFeature = link.source >= bipartiteStartIndex;
497+
498+
if (sourceIsFeature) {
499+
nodeFlow[link.target] += link.flow;
500+
nodeFlow[link.source] = 0.0; // Doesn't matter if done multiple times on each node.
501+
} else {
502+
nodeFlow[link.source] += link.flow;
503+
nodeFlow[link.target] = 0.0; // Doesn't matter if done multiple times on each node.
504+
}
505+
// TODO: Should flow double before moving to nodes, does it cancel out in normalization?
331506

332-
if (sourceIsFeature) {
333-
nodeFlow[link.target] += link.flow;
334-
nodeFlow[link.source] = 0.0; // Doesn't matter if done multiple times on each node.
335-
} else {
336-
nodeFlow[link.source] += link.flow;
337-
nodeFlow[link.target] = 0.0; // Doesn't matter if done multiple times on each node.
507+
// Markov time 2 on the full network will correspond to markov time 1 between the real nodes.
508+
link.flow *= 2;
338509
}
339510
// TODO: Should flow double before moving to nodes, does it cancel out in normalization?
340511

341-
// Markov time 2 on the full network will correspond to markov time 1 between the real nodes.
342-
link.flow *= 2;
343-
}
512+
normalizeNodeFlow = true;
344513

345-
normalizeNodeFlow = true;
514+
} else if (config.bipartiteTeleportation) {
515+
for (auto& link : flowLinks) {
516+
// Markov time 2 on the full network will correspond to markov time 1 between the real nodes.
517+
link.flow *= 2;
518+
}
519+
}
346520
}
347521

348522
if (config.useNodeWeightsAsFlow) {
@@ -373,11 +547,16 @@ void FlowCalculator::finalize(StateNetwork& network, const Config& config, bool
373547

374548
double sumLinkFlow = 0.0;
375549
unsigned int linkIndex = 0;
550+
auto featureLinkIndex = bipartiteLinkStartIndex;
376551

377552
for (auto& node : network.m_nodeLinkMap) {
378553
for (auto& link : node.second) {
379554
auto& linkData = link.second;
380-
linkData.flow = flowLinks[linkIndex++].flow;
555+
if (network.isBipartite() && node.first.id >= network.bipartiteStartId()) {
556+
linkData.flow = flowLinks[featureLinkIndex++].flow;
557+
} else {
558+
linkData.flow = flowLinks[linkIndex++].flow;
559+
}
381560
sumLinkFlow += linkData.flow;
382561
}
383562
}

src/utils/FlowCalculator.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,17 @@ class FlowCalculator {
6060

6161
void calcUndirectedFlow() noexcept;
6262
void calcDirectedFlow(const StateNetwork&, const Config&) noexcept;
63+
void calcDirectedBipartiteFlow(const StateNetwork&, const Config&) noexcept;
6364
void calcDirdirFlow(const Config&) noexcept;
6465
void calcRawdirFlow() noexcept;
6566

6667
void finalize(StateNetwork&, const Config&, bool) noexcept;
6768

6869
unsigned int numNodes;
6970
unsigned int numLinks;
70-
unsigned int nonDanglingStartIndex;
71+
unsigned int nonDanglingStartIndex = 0;
72+
unsigned int bipartiteStartIndex = 0;
73+
unsigned int bipartiteLinkStartIndex = 0;
7174

7275
double sumLinkWeight;
7376
double sumUndirLinkWeight;

0 commit comments

Comments
 (0)