-
-
Notifications
You must be signed in to change notification settings - Fork 402
fix(#10875): sentinel wait for transitions infodoc to be valid #11135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 7 commits
656eb5f
3ef5980
3de6f0d
2add51a
800b60f
5def568
5b419a4
08acb4c
ffc737b
34e75b0
98c7671
604abdf
395278b
feecb0b
aa6947f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -151,33 +151,61 @@ | |||||||||||||
| }; | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| const saveTransitions = change => { | ||||||||||||||
| return saveProperty(change.id, change.info, 'transitions', {}); | ||||||||||||||
| const saveTransitions = (change, clearInvalid = false) => { | ||||||||||||||
| return modifyInfoDoc(change.id, infoDoc => { | ||||||||||||||
| infoDoc.transitions = (change.info && change.info.transitions) || {}; | ||||||||||||||
|
Check warning on line 156 in shared-libs/infodoc/src/infodoc.js
|
||||||||||||||
| // If there is a mid-write flag set, clear it now to mark the infoDoc valid again | ||||||||||||||
| if (clearInvalid) { | ||||||||||||||
| delete infoDoc.invalid_rev; | ||||||||||||||
| } | ||||||||||||||
| }, change.info); | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| const saveCompletedTasks = (id, infodoc, completedTasks = []) => { | ||||||||||||||
| return saveProperty(id, infodoc, 'completed_tasks', completedTasks); | ||||||||||||||
|
witash marked this conversation as resolved.
|
||||||||||||||
| return modifyInfoDoc(id, infoDoc => { | ||||||||||||||
| infoDoc.completed_tasks = (infodoc && infodoc.completed_tasks) || completedTasks; | ||||||||||||||
| }, infodoc); | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| const setInvalidRev = (id, invalidRev) => { | ||||||||||||||
| return modifyInfoDoc(id, infoDoc => { | ||||||||||||||
| infoDoc.invalid_rev = invalidRev; | ||||||||||||||
| }); | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| const clearInvalidRev = (id) => { | ||||||||||||||
| return modifyInfoDoc(id, infoDoc => { | ||||||||||||||
| delete infoDoc.invalid_rev; | ||||||||||||||
| }); | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| const saveProperty = async (id, infodoc, property, defaultValue = {}) => { | ||||||||||||||
| let updatedInfoDoc; | ||||||||||||||
| // Fetch the infodoc. If it is missing, return `fallback` (to be created) when provided; | ||||||||||||||
| // otherwise the 404 is raised. | ||||||||||||||
| const fetchInfoDoc = async (id, fallback) => { | ||||||||||||||
| try { | ||||||||||||||
| updatedInfoDoc = await db.sentinel.get(getInfoDocId(id)); | ||||||||||||||
| updatedInfoDoc[property] = (infodoc && infodoc[property]) || defaultValue; | ||||||||||||||
| return await db.sentinel.get(getInfoDocId(id)); | ||||||||||||||
| } catch (err) { | ||||||||||||||
| if (err.status !== 404) { | ||||||||||||||
| if (err.status !== 404 || !fallback) { | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this would be easier to understand if reversed:
Suggested change
There also seems to be a behavior change here, where if fallback is not passed, the 404 would throw compared to before when it did not throw. Is there a reason for this shift?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reversed the condition for readability. the intention was to not change existing behavior for the existing functions, but allow the new functions to raise errors if the infodoc doesn't exist instead of silently creating it. But it's more consistent with the existing functions to silently create a blank infoDoc, so changed to do that instead. |
||||||||||||||
| throw err; | ||||||||||||||
| } | ||||||||||||||
| updatedInfoDoc = infodoc; | ||||||||||||||
| return fallback; | ||||||||||||||
| } | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| // Fetch the infodoc, apply `modify`, and save, retrying on conflict so the change always lands on the | ||||||||||||||
| // latest rev. | ||||||||||||||
| const modifyInfoDoc = async (id, modify, fallback) => { | ||||||||||||||
| const infoDoc = await fetchInfoDoc(id, fallback); | ||||||||||||||
|
|
||||||||||||||
| modify(infoDoc); | ||||||||||||||
|
|
||||||||||||||
| try { | ||||||||||||||
| return await db.sentinel.put(updatedInfoDoc); | ||||||||||||||
| return await db.sentinel.put(infoDoc); | ||||||||||||||
| } catch (err) { | ||||||||||||||
| if (err.status !== 409) { | ||||||||||||||
| throw err; | ||||||||||||||
| } | ||||||||||||||
| return saveProperty(id, infodoc, property, defaultValue); | ||||||||||||||
| return modifyInfoDoc(id, modify, fallback); | ||||||||||||||
| } | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -283,6 +311,8 @@ | |||||||||||||
| bulkUpdate: bulkUpdate, | ||||||||||||||
| saveTransitions: saveTransitions, | ||||||||||||||
| saveCompletedTasks: saveCompletedTasks, | ||||||||||||||
| setInvalidRev: setInvalidRev, | ||||||||||||||
| clearInvalidRev: clearInvalidRev, | ||||||||||||||
|
|
||||||||||||||
| // Used to update infodoc metadata that occurs at write time. A delete does not count as a write | ||||||||||||||
| // in this instance, as deletes resolve as infodoc cleanups once sentinel's background-cleanup | ||||||||||||||
|
|
||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -604,6 +604,62 @@ describe('infodoc', () => { | |
| assert.deepEqual(db.sentinel.put.args[20], [{ ...info, transitions: change.info.transitions }]); | ||
| }); | ||
| }); | ||
|
|
||
| it('clears invalid_rev when clearInvalid is set', () => { | ||
| const info = { _id: 'some-info', doc_id: 'some', invalid_rev: '1-abc' }; | ||
| const change = { id: 'some', info: { transitions: { one: { ok: true } } } }; | ||
| sinon.stub(db.sentinel, 'get').resolves(info); | ||
| sinon.stub(db.sentinel, 'put').resolves(); | ||
|
|
||
| return lib.saveTransitions(change, true).then(() => { | ||
| const saved = db.sentinel.put.args[0][0]; | ||
| assert.isUndefined(saved.invalid_rev); | ||
| }); | ||
| }); | ||
| }); | ||
|
|
||
| describe('setInvalidRev / clearInvalidRev', () => { | ||
| it('setInvalidRev marks the infodoc mid-write', () => { | ||
| const info = { _id: 'some-info', doc_id: 'some' }; | ||
| sinon.stub(db.sentinel, 'get').resolves(info); | ||
| sinon.stub(db.sentinel, 'put').resolves(); | ||
|
|
||
| return lib.setInvalidRev('some', '1-abc').then(() => { | ||
| assert.deepEqual(db.sentinel.get.args[0], ['some-info']); | ||
| assert.equal(db.sentinel.put.args[0][0].invalid_rev, '1-abc'); | ||
| }); | ||
| }); | ||
|
|
||
| it('clearInvalidRev removes the mid-write marker', () => { | ||
| const info = { _id: 'some-info', doc_id: 'some', invalid_rev: '1-abc' }; | ||
| sinon.stub(db.sentinel, 'get').resolves(info); | ||
| sinon.stub(db.sentinel, 'put').resolves(); | ||
|
|
||
| return lib.clearInvalidRev('some').then(() => { | ||
| assert.isUndefined(db.sentinel.put.args[0][0].invalid_rev); | ||
| }); | ||
| }); | ||
|
|
||
| it('retries on 409 conflict', () => { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for adding this test.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. stubbed db.sentinel.put to reject with 409 on the first 100 calls, then resolve on the 101st |
||
| const info = { _id: 'some-info', doc_id: 'some' }; | ||
| sinon.stub(db.sentinel, 'get').resolves(info); | ||
| const put = sinon.stub(db.sentinel, 'put'); | ||
| put.onCall(0).rejects({ status: 409 }); | ||
| put.onCall(1).resolves(); | ||
|
|
||
| return lib.setInvalidRev('some', '1-abc').then(() => { | ||
| assert.equal(db.sentinel.put.callCount, 2); | ||
| }); | ||
| }); | ||
|
|
||
| it('throws non-409 errors', () => { | ||
| sinon.stub(db.sentinel, 'get').resolves({ _id: 'some-info', doc_id: 'some' }); | ||
| sinon.stub(db.sentinel, 'put').rejects({ status: 500 }); | ||
|
|
||
| return lib.setInvalidRev('some', '1-abc') | ||
| .then(() => assert.fail('should have thrown')) | ||
| .catch(err => assert.equal(err.status, 500)); | ||
| }); | ||
| }); | ||
|
|
||
| describe('saveCompletedTasks', () => { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,13 +45,34 @@ | |
| const transitions = []; | ||
| let loadErrors = false; | ||
|
|
||
| const MAX_INFODOC_WAIT = 5; | ||
| const INFODOC_WAIT_INTERVAL = 100; | ||
|
|
||
| const isInfoDocMidWrite = infoDoc => infoDoc && infoDoc.invalid_rev !== undefined; | ||
|
Check warning on line 51 in shared-libs/transitions/src/transitions/index.js
|
||
|
|
||
| const getConsistentInfoDoc = (change, retriesLeft) => { | ||
|
witash marked this conversation as resolved.
Outdated
|
||
| return infodoc.get(change).then(infoDoc => { | ||
| if (!isInfoDocMidWrite(infoDoc) || retriesLeft <= 0) { | ||
| return infoDoc; | ||
| } | ||
| return new Promise(resolve => setTimeout(resolve, INFODOC_WAIT_INTERVAL)) | ||
| .then(() => getConsistentInfoDoc(change, retriesLeft - 1)); | ||
| }); | ||
| }; | ||
|
|
||
| // applies all loaded transitions over a change | ||
| const processChange = (change, callback) => { | ||
| lineage | ||
| .fetchHydratedDoc(change.id) | ||
| .then(doc => { | ||
| change.doc = doc; | ||
| return infodoc.get(change).then(infoDoc => { | ||
| return getConsistentInfoDoc(change, MAX_INFODOC_WAIT).then(infoDoc => { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the central change in the PR , but I don't see any test covering
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added 'should wait for transitions_started to clear before processing the change' and 'should skip processing the change when transitions_started never clears'. |
||
| if (isInfoDocMidWrite(infoDoc)) { | ||
| logger.warn( | ||
| `transitions: infodoc for ${change.id} still mid-write after ${MAX_INFODOC_WAIT} retries, skipping` | ||
| ); | ||
| return callback(); | ||
| } | ||
| change.info = infoDoc; | ||
| change.initialProcessing = !infoDoc.transitions; | ||
| // Remove transitions from doc since those | ||
|
|
@@ -107,10 +128,11 @@ | |
|
|
||
| // doc was not changed by any transition, so we save the original doc | ||
| change.doc = docs.find(doc => doc._id === change.id); | ||
| saveDoc(change, (err, result) => { | ||
| callback(null, err || result); | ||
| }); | ||
| }); | ||
| saveDoc(change).then( | ||
| result => callback(null, result), | ||
| err => callback(null, err) | ||
| ); | ||
| }, { manageInfoDocRev: true }); | ||
| }); | ||
| async.series(operations, (err, results) => { | ||
| return err ? reject(err) : resolve(results); | ||
|
|
@@ -235,46 +257,61 @@ | |
| * did nothing and saving is unnecessary. If results has a true value in | ||
| * it then a change was made. | ||
| */ | ||
| const finalize = ({ change, results }, callback) => { | ||
|
witash marked this conversation as resolved.
|
||
| const finalize = ({ change, results, manageInfoDocRev = false }, callback) => { | ||
|
witash marked this conversation as resolved.
Outdated
|
||
| finalizeChange({ change, results, manageInfoDocRev }) | ||
| .then(result => callback(null, result)) | ||
| .catch(err => { | ||
| logger.error(`error saving changes on doc ${change.id} seq ${change.seq}: %o`, err); | ||
| callback(err); | ||
| }); | ||
| }; | ||
|
|
||
| const finalizeChange = async ({ change, results, manageInfoDocRev }) => { | ||
| logger.debug(`transition results: ${JSON.stringify(results)}`); | ||
|
|
||
| const changed = _.some(results, i => Boolean(i)); | ||
| if (!changed) { | ||
| logger.debug( | ||
| `nothing changed skipping saveDoc for doc ${change.id} seq ${change.seq}` | ||
| ); | ||
| // info.transitions is how we know if a doc has been processed by Sentinel before. Even if no transitions ran, | ||
| // we still want to save transitions, so we know it's been processed. | ||
| return Promise | ||
| .resolve() | ||
| .then(() => { | ||
| if (change.initialProcessing) { | ||
| return infodoc.saveTransitions(change); | ||
| } | ||
| }) | ||
| .then(() => callback()) | ||
| .catch(err => callback(err)); | ||
| if (!_.some(results, Boolean)) { | ||
| logger.debug(`nothing changed skipping saveDoc for doc ${change.id} seq ${change.seq}`); | ||
| // info.transitions is how we know Sentinel has processed a doc; record it even when nothing ran. | ||
| if (change.initialProcessing) { | ||
| await infodoc.saveTransitions(change); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| logger.debug(`calling saveDoc on doc ${change.id} seq ${change.seq}`); | ||
| return manageInfoDocRev ? saveForApi(change) : saveForSentinel(change); | ||
| }; | ||
|
|
||
| saveDoc(change, (err, result) => { | ||
| // todo: how to handle a failed save? for now just | ||
| // waiting until next change and try again. | ||
| if (err) { | ||
| logger.error(`error saving changes on doc ${change.id} seq ${change.seq}: %o`, err); | ||
| return callback(err); | ||
| } | ||
| // Sentinel processing: save the doc, then record transitions. Sentinel never writes the invalid_rev | ||
| // marker; it only reads it (see getConsistentInfoDoc) to skip docs API is still writing. | ||
| const saveForSentinel = async change => { | ||
| const result = await saveDoc(change); | ||
| logger.info(`saved changes on doc ${change.id} seq ${change.seq}`); | ||
| await infodoc.saveTransitions(change); | ||
| return result; | ||
| }; | ||
|
|
||
| // API processing: mark the infodoc mid-write (invalid_rev) around the doc write so a concurrent | ||
| // sentinel read detects it and waits. The marker is cleared as part of the transitions write on | ||
| // success, or rolled back on any failure after it's set. | ||
| const saveForApi = async change => { | ||
| await infodoc.setInvalidRev(change.id, change.doc._rev ?? null); | ||
| try { | ||
| const result = await saveDoc(change); | ||
| logger.info(`saved changes on doc ${change.id} seq ${change.seq}`); | ||
| infodoc.saveTransitions(change) | ||
| .then(() => callback(null, result)) | ||
| .catch(err => callback(err)); | ||
| }); | ||
| await infodoc.saveTransitions(change, true); | ||
| return result; | ||
| } catch (err) { | ||
| await infodoc | ||
| .clearInvalidRev(change.id) | ||
| .catch(clearErr => logger.error(`error clearing invalid_rev on doc ${change.id}: %o`, clearErr)); | ||
| throw err; | ||
| } | ||
| }; | ||
|
|
||
| const saveDoc = (change, callback) => { | ||
| const saveDoc = change => { | ||
| lineage.minify(change.doc); | ||
| db.medic.put(change.doc, callback); | ||
| return db.medic.put(change.doc); | ||
| }; | ||
|
|
||
| /* | ||
|
|
@@ -335,7 +372,7 @@ | |
| .then(changed => callback(null, changed)); // return the promise instead | ||
| }; | ||
|
|
||
| const applyTransitions = (change, callback) => { | ||
| const applyTransitions = (change, callback, { manageInfoDocRev = false } = {}) => { | ||
| const operations = transitions | ||
| .map(transition => { | ||
| const opts = { | ||
|
|
@@ -353,7 +390,7 @@ | |
| * function. All we care about are results and whether we need to | ||
| * save or not. | ||
| */ | ||
| async.series(operations, (err, results) => finalize({ change, results }, callback)); | ||
| async.series(operations, (err, results) => finalize({ change, results, manageInfoDocRev }, callback)); | ||
| }; | ||
|
|
||
| const availableTransitions = () => { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.