Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ class AuthenticationError extends Error {
}
}

class ReplicationLimitError extends Error {
}

module.exports = {
PublicError,
NotFoundError,
PermissionError,
AuthenticationError,
ReplicationLimitError
};
21 changes: 20 additions & 1 deletion api/src/services/replication/authorization.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ const request = require('@medic/couch-request');
const environment = require('@medic/environment');
const nouveau = require('@medic/nouveau');
const { DOC_IDS, PREFIXES, DOC_TYPES } = require('@medic/constants');
const { ReplicationLimitError } = require('../../errors');


const REPLICATION_LIMITS = {
SUBJECTS_COUNT: 100 * 1000,
SUBJECT_HITS: 500 * 1000,
UNPURGED_DOCS_COUNT: 50 * 1000

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

REP_LIMIT = 100k

SUBJECTS_COUNT = n1
SUBJECT_HITS = n
5

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Support disabling limits

};
const ALL_KEY = '_all'; // key in the docs_by_replication_key view for records everyone can access
const UNASSIGNED_KEY = '_unassigned'; // key in the docs_by_replication_key view for unassigned records
const MEDIC_CLIENT_DDOC = '_design/medic-client';
Expand Down Expand Up @@ -629,14 +636,25 @@ const getDocsByReplicationKeyNouveau = async (authorizationContext) => {
uri: `${environment.couchUrl}/_design/medic/_nouveau/docs_by_replication_key`,
body: {
q: `key:(${chunk.map(nouveau.escapeKeys).join(' OR ')})`,
limit: nouveau.RESULTS_LIMIT,
limit: REPLICATION_LIMITS.SUBJECT_HITS,
}
});

if (!response.hits || !response.hits.length) {
continue;
}

// A single doc can have multiple hits (for different subjects), so this is not a strict "doc limit", but
// it is our "give up" limit
if ((hits.length + response.hits) > REPLICATION_LIMITS.SUBJECT_HITS) {
authorizationContext.userCtx.replicationLimitExceeded = true;
throw new ReplicationLimitError(
`User "${authorizationContext.userCtx.name}" exceeds the subject hits limit with at least [${
hits.length + response.hits
}] subject hits.`,
);
}

for (const hit of response.hits) {
hits.push({
id: hit.id,
Expand Down Expand Up @@ -724,6 +742,7 @@ const getViewResults = (doc) => {
};

module.exports = {
REPLICATION_LIMITS,
DEFAULT_DDOCS,
updateContext,
getDefaultDocs,
Expand Down
70 changes: 50 additions & 20 deletions api/src/services/replication/replication.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,63 @@
const db = require('../../db');
const authorization = require('./authorization');
const { REPLICATION_LIMITS } = require('./authorization');
const purgedDocs = require('./purged-docs');
const _ = require('lodash');
const replicationLimitLog = require('./replication-limit-log');
const dataContext = require('../data-context');
const config = require('../../config');
const { users, updateUser } = require('@medic/user-management')(config, db, dataContext);
const { ReplicationLimitError } = require('../../errors');

const assertUserAllowedToReplicate = async ({ name }) => {
const { replication_blocked } = await users.getUserDoc(name);
if (replication_blocked === true) {
throw new Error(`User "${name}" has been banned from replicating.`);
}
};

const getContext = async (userCtx) => {
await assertUserAllowedToReplicate(userCtx);
const info = await db.medic.info();
const authContext = await authorization.getAuthorizationContext(userCtx);
userCtx.subjectsCount = authContext.subjectIds.length;
const docsByReplicationKey = await authorization.getDocsByReplicationKey(authContext);

const allowedIds = authorization.filterAllowedDocIds(authContext, docsByReplicationKey);
userCtx.docsCount = allowedIds.length;
const unpurgedIds = await purgedDocs.getUnPurgedIds(userCtx, allowedIds);
userCtx.unpurgedDocsCount = unpurgedIds.length;

const excludeTasks = { includeTasks: false };
const warnIds = authorization.filterAllowedDocIds(authContext, docsByReplicationKey, excludeTasks);
const unpurgedWarnIds = _.intersection(unpurgedIds, warnIds);

await replicationLimitLog.put(userCtx.name, unpurgedIds.length, allowedIds.length);

return {
docIds: unpurgedIds,
warnDocIds: unpurgedWarnIds,
warn: unpurgedWarnIds.length >= replicationLimitLog.DOC_IDS_WARN_LIMIT,
limit: replicationLimitLog.DOC_IDS_WARN_LIMIT,
lastSeq: info.update_seq,
};
try {
if (userCtx.subjectsCount > REPLICATION_LIMITS.SUBJECTS_COUNT) {
throw new ReplicationLimitError(
`User "${userCtx.name}" exceeds the subject limit with [${userCtx.subjectsCount}] subjects.`
);
}
const docsByReplicationKey = await authorization.getDocsByReplicationKey(authContext);

const allowedIds = authorization.filterAllowedDocIds(authContext, docsByReplicationKey);
userCtx.docsCount = allowedIds.length;
const unpurgedIds = await purgedDocs.getUnPurgedIds(userCtx, allowedIds);
userCtx.unpurgedDocsCount = unpurgedIds.length;
if (userCtx.unpurgedDocsCount > REPLICATION_LIMITS.UNPURGED_DOCS_COUNT) {
throw new ReplicationLimitError(
`User "${userCtx.name}" exceeds the document replication limit with [${userCtx.unpurgedDocsCount}] documents.`
);
}

const excludeTasks = { includeTasks: false };
const warnIds = authorization.filterAllowedDocIds(authContext, docsByReplicationKey, excludeTasks);
const unpurgedWarnIds = _.intersection(unpurgedIds, warnIds);

await replicationLimitLog.put(userCtx.name, unpurgedIds.length, allowedIds.length);

return {
docIds: unpurgedIds,
warnDocIds: unpurgedWarnIds,
warn: unpurgedWarnIds.length >= replicationLimitLog.DOC_IDS_WARN_LIMIT,
limit: replicationLimitLog.DOC_IDS_WARN_LIMIT,
lastSeq: info.update_seq,
};
} catch (e) {
if (e instanceof ReplicationLimitError) {
await updateUser(userCtx.name, { replication_blocked: true }, true);
}
throw e;
}
};

const getDocIdsRevPairs = async (docIds) => {
Expand Down
2 changes: 2 additions & 0 deletions shared-libs/user-management/src/users.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const USER_EDITABLE_FIELDS = RESTRICTED_USER_EDITABLE_FIELDS.concat([
'type',
'roles',
'oidc_username',
'replication_blocked'
]);

const RESTRICTED_SETTINGS_EDITABLE_FIELDS = [
Expand Down Expand Up @@ -374,6 +375,7 @@ const mapUser = (user, setting, facilities) => {
external_id: setting.external_id,
known: user.known,
oidc_username: user.oidc_username,
replication_blocked: user.replication_blocked
};
};

Expand Down
Loading