Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
MongoNetworkError,
MongoNetworkTimeoutError,
MongoServerClosedError,
MongoServerError,
MongoUnexpectedServerResponseError,
needsRetryableWriteLabel
} from '../error';
Expand Down Expand Up @@ -385,7 +386,7 @@ function calculateRoundTripTime(oldRtt: number, duration: number): number {
return alpha * duration + (1 - alpha) * oldRtt;
}

function markServerUnknown(server: Server, error?: MongoError) {
function markServerUnknown(server: Server, error?: MongoServerError) {
// Load balancer servers can never be marked unknown.
if (server.loadBalanced) {
return;
Expand All @@ -397,11 +398,7 @@ function markServerUnknown(server: Server, error?: MongoError) {

server.emit(
Server.DESCRIPTION_RECEIVED,
new ServerDescription(server.description.hostAddress, undefined, {
error,
topologyVersion:
error && error.topologyVersion ? error.topologyVersion : server.description.topologyVersion
})
new ServerDescription(server.description.hostAddress, undefined, { error })
);
}

Expand Down
104 changes: 37 additions & 67 deletions src/sdam/server_description.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Document, Long, ObjectId } from '../bson';
import type { MongoError } from '../error';
import { MongoRuntimeError, MongoServerError } from '../error';
import { arrayStrictEqual, compareObjectId, errorStrictEqual, HostAddress, now } from '../utils';
import type { ClusterTime } from './common';
import { ServerType } from './common';
Expand Down Expand Up @@ -31,14 +31,11 @@ export type TagSet = { [key: string]: string };
/** @internal */
export interface ServerDescriptionOptions {
/** An Error used for better reporting debugging */
error?: MongoError;
error?: MongoServerError;

/** The round trip time to ping this server (in ms) */
roundTripTime?: number;

/** The topologyVersion */
topologyVersion?: TopologyVersion;

/** If the client is in load balancing mode. */
loadBalanced?: boolean;
}
Expand All @@ -50,28 +47,25 @@ export interface ServerDescriptionOptions {
* @public
*/
export class ServerDescription {
private _hostAddress: HostAddress;
address: string;
type: ServerType;
hosts: string[];
passives: string[];
arbiters: string[];
tags: TagSet;

error?: MongoError;
topologyVersion?: TopologyVersion;
error: MongoServerError | null;
topologyVersion: TopologyVersion | null;
minWireVersion: number;
maxWireVersion: number;
roundTripTime: number;
lastUpdateTime: number;
lastWriteDate: number;

me?: string;
primary?: string;
setName?: string;
setVersion?: number;
electionId?: ObjectId;
logicalSessionTimeoutMinutes?: number;
me: string | null;
primary: string | null;
setName: string | null;
setVersion: number | null;
electionId: ObjectId | null;
logicalSessionTimeoutMinutes: number | null;

// NOTE: does this belong here? It seems we should gossip the cluster time at the CMAP level
$clusterTime?: ClusterTime;
Expand All @@ -83,14 +77,19 @@ export class ServerDescription {
* @param address - The address of the server
* @param hello - An optional hello response for this server
*/
constructor(address: HostAddress | string, hello?: Document, options?: ServerDescriptionOptions) {
if (typeof address === 'string') {
this._hostAddress = new HostAddress(address);
this.address = this._hostAddress.toString();
} else {
this._hostAddress = address;
this.address = this._hostAddress.toString();
constructor(
address: HostAddress | string,
hello?: Document,
options: ServerDescriptionOptions = {}
) {
if (address == null || address === '') {
throw new MongoRuntimeError('ServerDescription must be provided with a non-empty address');
}

this.address =
typeof address === 'string'
? HostAddress.fromString(address).toString(false) // Use HostAddress to normalize
: address.toString(false);
this.type = parseServerType(hello, options);
this.hosts = hello?.hosts?.map((host: string) => host.toLowerCase()) ?? [];
this.passives = hello?.passives?.map((host: string) => host.toLowerCase()) ?? [];
Expand All @@ -101,50 +100,20 @@ export class ServerDescription {
this.roundTripTime = options?.roundTripTime ?? -1;
this.lastUpdateTime = now();
this.lastWriteDate = hello?.lastWrite?.lastWriteDate ?? 0;

if (options?.topologyVersion) {
this.topologyVersion = options.topologyVersion;
} else if (hello?.topologyVersion) {
// TODO(NODE-2674): Preserve int64 sent from MongoDB
this.topologyVersion = hello.topologyVersion;
}

if (options?.error) {
this.error = options.error;
}

if (hello?.primary) {
this.primary = hello.primary;
}

if (hello?.me) {
this.me = hello.me.toLowerCase();
}

if (hello?.setName) {
this.setName = hello.setName;
}

if (hello?.setVersion) {
this.setVersion = hello.setVersion;
}

if (hello?.electionId) {
this.electionId = hello.electionId;
}

if (hello?.logicalSessionTimeoutMinutes) {
this.logicalSessionTimeoutMinutes = hello.logicalSessionTimeoutMinutes;
}

if (hello?.$clusterTime) {
this.$clusterTime = hello.$clusterTime;
}
this.error = options.error ?? null;
// TODO(NODE-2674): Preserve int64 sent from MongoDB
this.topologyVersion = this.error?.topologyVersion ?? hello?.topologyVersion ?? null;
this.setName = hello?.setName ?? null;
this.setVersion = hello?.setVersion ?? null;
this.electionId = hello?.electionId ?? null;
this.logicalSessionTimeoutMinutes = hello?.logicalSessionTimeoutMinutes ?? null;
this.primary = hello?.primary ?? null;
this.me = hello?.me?.toLowerCase() ?? null;
this.$clusterTime = hello?.$clusterTime ?? null;
}

get hostAddress(): HostAddress {
if (this._hostAddress) return this._hostAddress;
else return new HostAddress(this.address);
return HostAddress.fromString(this.address);
}

get allHosts(): string[] {
Expand Down Expand Up @@ -181,7 +150,8 @@ export class ServerDescription {
* in the {@link https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#serverdescription|SDAM spec}
*/
equals(other?: ServerDescription | null): boolean {
// TODO(NODE-4510): Check ServerDescription equality logic for nullish topologyVersion meaning "greater than"
// Despite using the comparator that would determine a nullish topologyVersion as greater than
// for equality we should only always perform direct equality comparison
const topologyVersionsEqual =
this.topologyVersion === other?.topologyVersion ||
compareTopologyVersion(this.topologyVersion, other?.topologyVersion) === 0;
Expand Down Expand Up @@ -271,8 +241,8 @@ function tagsStrictEqual(tags: TagSet, tags2: TagSet): boolean {
* ```
*/
export function compareTopologyVersion(
currentTv?: TopologyVersion,
newTv?: TopologyVersion
currentTv?: TopologyVersion | null,
newTv?: TopologyVersion | null
): 0 | -1 | 1 {
if (currentTv == null || newTv == null) {
return -1;
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return this.description.commonWireVersion;
}

get logicalSessionTimeoutMinutes(): number | undefined {
get logicalSessionTimeoutMinutes(): number | null {
return this.description.logicalSessionTimeoutMinutes;
}

Expand Down
87 changes: 42 additions & 45 deletions src/sdam/topology_description.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ObjectId } from '../bson';
import * as WIRE_CONSTANTS from '../cmap/wire_protocol/constants';
import { MongoError, MongoRuntimeError } from '../error';
import { MongoRuntimeError, MongoServerError } from '../error';
import { compareObjectId, shuffle } from '../utils';
import { ServerType, TopologyType } from './common';
import { ServerDescription } from './server_description';
Expand Down Expand Up @@ -32,29 +32,29 @@ export interface TopologyDescriptionOptions {
*/
export class TopologyDescription {
type: TopologyType;
setName?: string;
maxSetVersion?: number;
maxElectionId?: ObjectId;
setName: string | null;
maxSetVersion: number | null;
maxElectionId: ObjectId | null;
servers: Map<string, ServerDescription>;
stale: boolean;
compatible: boolean;
compatibilityError?: string;
logicalSessionTimeoutMinutes?: number;
logicalSessionTimeoutMinutes: number | null;
heartbeatFrequencyMS: number;
localThresholdMS: number;
commonWireVersion?: number;
commonWireVersion: number;

/**
* Create a TopologyDescription
*/
constructor(
topologyType: TopologyType,
serverDescriptions?: Map<string, ServerDescription>,
setName?: string,
maxSetVersion?: number,
maxElectionId?: ObjectId,
commonWireVersion?: number,
options?: TopologyDescriptionOptions
serverDescriptions: Map<string, ServerDescription> | null = null,
setName: string | null = null,
maxSetVersion: number | null = null,
maxElectionId: ObjectId | null = null,
commonWireVersion: number | null = null,
options: TopologyDescriptionOptions | null = null
) {
options = options ?? {};

Expand All @@ -64,22 +64,10 @@ export class TopologyDescription {
this.compatible = true;
this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 0;
this.localThresholdMS = options.localThresholdMS ?? 15;

if (setName) {
this.setName = setName;
}

if (maxSetVersion) {
this.maxSetVersion = maxSetVersion;
}

if (maxElectionId) {
this.maxElectionId = maxElectionId;
}

if (commonWireVersion) {
this.commonWireVersion = commonWireVersion;
}
this.setName = setName ?? null;
this.maxElectionId = maxElectionId ?? null;
this.maxSetVersion = maxSetVersion ?? null;
this.commonWireVersion = commonWireVersion ?? 0;

// determine server compatibility
for (const serverDescription of this.servers.values()) {
Expand Down Expand Up @@ -108,12 +96,12 @@ export class TopologyDescription {
// value among ServerDescriptions of all data-bearing server types. If any have a null
// logicalSessionTimeoutMinutes, then TopologyDescription.logicalSessionTimeoutMinutes MUST be
// set to null.
this.logicalSessionTimeoutMinutes = undefined;
this.logicalSessionTimeoutMinutes = null;
for (const [, server] of this.servers) {
if (server.isReadable) {
if (server.logicalSessionTimeoutMinutes == null) {
// If any of the servers have a null logicalSessionsTimeout, then the whole topology does
this.logicalSessionTimeoutMinutes = undefined;
this.logicalSessionTimeoutMinutes = null;
break;
}

Expand Down Expand Up @@ -200,11 +188,6 @@ export class TopologyDescription {
// potentially mutated values
let { type: topologyType, setName, maxSetVersion, maxElectionId, commonWireVersion } = this;

if (serverDescription.setName && setName && serverDescription.setName !== setName) {
// TODO(NODE-4159): servers with an incorrect setName should be removed not marked Unknown
serverDescription = new ServerDescription(address, undefined);
}

const serverType = serverDescription.type;
const serverDescriptions = new Map(this.servers);

Expand All @@ -217,6 +200,19 @@ export class TopologyDescription {
}
}

if (
typeof serverDescription.setName === 'string' &&
typeof setName === 'string' &&
serverDescription.setName !== setName
) {
if (topologyType === TopologyType.Single) {
// "Single" Topology with setName mismatch is direct connection usage, mark unknown do not remove
serverDescription = new ServerDescription(address);
} else {
serverDescriptions.delete(address);
}
}

// update the actual server description
serverDescriptions.set(address, serverDescription);

Expand Down Expand Up @@ -311,15 +307,16 @@ export class TopologyDescription {
);
}

get error(): MongoError | undefined {
get error(): MongoServerError | null {
const descriptionsWithError = Array.from(this.servers.values()).filter(
(sd: ServerDescription) => sd.error
);

if (descriptionsWithError.length > 0) {
return descriptionsWithError[0].error;
}
return;

return null;
}

/**
Expand Down Expand Up @@ -366,10 +363,10 @@ function topologyTypeForServerType(serverType: ServerType): TopologyType {
function updateRsFromPrimary(
serverDescriptions: Map<string, ServerDescription>,
serverDescription: ServerDescription,
setName?: string,
maxSetVersion?: number,
maxElectionId?: ObjectId
): [TopologyType, string?, number?, ObjectId?] {
setName: string | null = null,
maxSetVersion: number | null = null,
maxElectionId: ObjectId | null = null
): [TopologyType, string | null, number | null, ObjectId | null] {
setName = setName || serverDescription.setName;
if (setName !== serverDescription.setName) {
serverDescriptions.delete(serverDescription.address);
Expand Down Expand Up @@ -436,7 +433,7 @@ function updateRsFromPrimary(
function updateRsWithPrimaryFromMember(
serverDescriptions: Map<string, ServerDescription>,
serverDescription: ServerDescription,
setName?: string
setName: string | null = null
): TopologyType {
if (setName == null) {
// TODO(NODE-3483): should be an appropriate runtime error
Expand All @@ -456,10 +453,10 @@ function updateRsWithPrimaryFromMember(
function updateRsNoPrimaryFromMember(
serverDescriptions: Map<string, ServerDescription>,
serverDescription: ServerDescription,
setName?: string
): [TopologyType, string?] {
setName: string | null = null
): [TopologyType, string | null] {
const topologyType = TopologyType.ReplicaSetNoPrimary;
setName = setName || serverDescription.setName;
setName = setName ?? serverDescription.setName;
if (setName !== serverDescription.setName) {
serverDescriptions.delete(serverDescription.address);
return [topologyType, setName];
Expand Down
Loading