Skip to content

Commit a800b57

Browse files
Merge branch 'newarchitecture' into refactor/human_readable_logs_in_console
2 parents 33d66be + 5399905 commit a800b57

File tree

7 files changed

+287
-42
lines changed

7 files changed

+287
-42
lines changed

plugins/views/api/aggregator.js

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ var plugins = require('../../pluginManager.ts'),
33
const UnifiedEventSource = require('../../../api/eventSource/UnifiedEventSource.js');
44
const log = require('../../../api/utils/log.js')('views:aggregator');
55
const crypto = require('crypto');
6+
const viewsUtils = require('./parts/viewsUtils.js');
67

78
(function() {
8-
99
var forbiddenSegValues = [];
1010

1111
for (let i = 1; i < 32; i++) {
@@ -44,6 +44,36 @@ const crypto = require('crypto');
4444
"referrer": true
4545
};
4646

47+
48+
plugins.register("/batcher/fail", function(ob) {
49+
if (ob.db === "countly" && (ob.collection.indexOf("app_viewdata") === 0)) {
50+
//omit segment using app_id and segment name
51+
if (ob.data && ob.data.updateOne && ob.data.updateOne.update && ob.data.updateOne.update.$set) {
52+
var appId = ob.data.updateOne.update.$set.a;
53+
var segment = ob.data.updateOne.update.$set.s;
54+
if (appId && segment) {
55+
log.d("calling segment omiting for " + appId + " - " + segment);
56+
viewsUtils.ommit_segments({extend: true, db: common.db, omit: [segment], appId: appId, params: {"qstring": {}, "user": {"_id": "SYSTEM", "username": "SYSTEM"}}}, function(err) {
57+
if (err) {
58+
log.e(err);
59+
}
60+
});
61+
}
62+
}
63+
}
64+
else if (ob.db === "countly" && ob.collection === "views") {
65+
//Failed to update root document
66+
if (ob.data && ob.data.updateOne && ob.data.updateOne.filter) {
67+
var _id = ob.data.updateOne.filter._id;
68+
if (_id) {
69+
log.d("Failed to update root document for app " + _id + ". There are too many segments/values stored. Run cleanup for core document.");
70+
}
71+
viewsUtils.cleanupRootDocument(common.db, _id);
72+
}
73+
}
74+
});
75+
76+
4777
//Recording views
4878
plugins.register("/aggregator", async function() {
4979
const eventSource = new UnifiedEventSource('views-insert', {
@@ -121,7 +151,7 @@ const crypto = require('crypto');
121151
if (escapedViewSegments[segKey] || (viewMeta.omit && viewMeta.omit.indexOf(segKey) !== -1)) {
122152
continue;
123153
}
124-
else if (segKey !== 'platform' && (!viewMeta.segments[segKey] && Object.keys(viewMeta.segments).length > plugins.getConfig("views").segment_limit)) {
154+
else if (segKey !== 'platform' && (!viewMeta.segments[segKey] && Object.keys(viewMeta.segments).length >= plugins.getConfig("views").segment_limit)) {
125155
continue;
126156
}
127157
segments.push(segKey);
@@ -178,7 +208,7 @@ const crypto = require('crypto');
178208
escapedMetricVal = "[CLY]" + escapedMetricVal;
179209
}
180210

181-
if (viewMeta.segments[segments[i]] && (!viewMeta.segments[segments[i]][escapedMetricVal] && Object.keys(viewMeta.segments[segments[i]]).length > plugins.getConfig("views").segment_value_limit)) {
211+
if (viewMeta.segments[segments[i]] && (!viewMeta.segments[segments[i]][escapedMetricVal] && Object.keys(viewMeta.segments[segments[i]]).length >= plugins.getConfig("views").segment_value_limit)) {
182212
continue;
183213
}
184214

@@ -201,16 +231,19 @@ const crypto = require('crypto');
201231
tmpTimeObjZero["d." + time.month + "." + escapedMetricVal + prop] = update[prop];
202232
}
203233
}
204-
205-
common.manualWriteBatcher.add("app_viewdata", tmpMonthId, {"$inc": tmpTimeObjMonth, "$set": {"n": next.n, "vw": next.a + "_" + view_id, "m": dateIds.month}}, "countly", {token: token});
206-
common.manualWriteBatcher.add("app_viewdata", tmpZeroId, {"$inc": tmpTimeObjZero, "$set": {"n": next.n, "vw": next.a + "_" + view_id, "m": dateIds.zero}}, "countly", {token: token});
234+
common.manualWriteBatcher.add("app_viewdata", tmpMonthId, {"$inc": tmpTimeObjMonth, "$set": {"a": next.a, "n": next.n, "vw": next.a + "_" + view_id, "m": dateIds.month, "s": segments[i]}}, "countly", {token: token});
235+
common.manualWriteBatcher.add("app_viewdata", tmpZeroId, {"$inc": tmpTimeObjZero, "$set": {"a": next.a, "n": next.n, "vw": next.a + "_" + view_id, "m": dateIds.zero, "s": segments[i]}}, "countly", {token: token});
207236
}
208237
if (Object.keys(meta_update).length > 0) {
209-
common.db.collection("views").updateOne({_id: common.db.ObjectID(next.a)}, {"$set": meta_update}, {upsert: true}, function(err3) {
210-
if (err3) {
211-
log.e(err3);
238+
//Flush meta document
239+
try {
240+
await common.db.collection("views").updateOne({_id: common.db.ObjectID(next.a)}, {"$set": meta_update}, {upsert: true});
241+
}
242+
catch (err3) {
243+
if (err3.errorResponse && err3.errorResponse.code === 17419) {
244+
viewsUtils.cleanupRootDocument(common.db, next.a);
212245
}
213-
});
246+
}
214247
}
215248
var dd = {view: next.n, "a": next.a};
216249
if (Object.keys(update_doc).length > 0) {
@@ -329,8 +362,8 @@ const crypto = require('crypto');
329362
tmpTimeObjZero["d." + time.month + "." + escapedMetricVal + prop] = update[prop];
330363
}
331364

332-
common.manualWriteBatcher.add("app_viewdata", tmpMonthId, {"$inc": tmpTimeObjMonth, "$set": {"n": next.n, "vw": next.a + "_" + view_id, "m": dateIds.month}}, "countly", {token: token});
333-
common.manualWriteBatcher.add("app_viewdata", tmpZeroId, {"$inc": tmpTimeObjZero, "$set": {"n": next.n, "vw": next.a + "_" + view_id, "m": dateIds.zero}}, "countly", {token: token});
365+
common.manualWriteBatcher.add("app_viewdata", tmpMonthId, {"$inc": tmpTimeObjMonth, "$set": {"a": next.a, "n": next.n, "vw": next.a + "_" + view_id, "m": dateIds.month, "s": segments[i]}}, "countly", {token: token});
366+
common.manualWriteBatcher.add("app_viewdata", tmpZeroId, {"$inc": tmpTimeObjZero, "$set": {"a": next.a, "n": next.n, "vw": next.a + "_" + view_id, "m": dateIds.zero, "s": segments[i]}}, "countly", {token: token});
334367
}
335368
}
336369

plugins/views/api/api.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,11 +1332,14 @@ const FEATURE_NAME = 'views';
13321332
else if (params.qstring.method === "get_view_segments") {
13331333
validateRead(params, FEATURE_NAME, function() {
13341334
var res = {segments: [], domains: []};
1335-
common.db.collection("views").findOne({'_id': common.db.ObjectID(params.app_id)}, function(err1, res1) {
1335+
common.db.collection("views").findOne({'_id': common.db.ObjectID(params.qstring.app_id)}, function(err1, res1) {
1336+
if (err1) {
1337+
log.e(err1);
1338+
}
13361339
if (res1 && res1.segments) {
13371340
res.segments = res1.segments;
13381341
for (var k in res1.segments) {
1339-
res1.segments[k] = Object.keys(res1.segments[k]) || [];
1342+
res.segments[k] = Object.keys(res1.segments[k]) || [];
13401343
}
13411344
}
13421345
if (res1 && res1.omit) {

plugins/views/api/parts/viewsUtils.js

Lines changed: 94 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,21 @@ var plugins = require('../../../pluginManager.ts');
33
var log = common.log('views:api');
44

55
module.exports = {
6-
ommit_segments: function(options, callback) {
6+
/**
7+
* Omits specified segments from views
8+
* @param {Object} options - options for omitting
9+
* @param {Object} [options.db] - database connection (optional, defaults to common.db)
10+
* @param {Array<string>} [options.omit] - array of segment names to omit
11+
* @param {string} options.appId - application ID
12+
* @param {Object} options.params - params object
13+
* @param {Object} [options.params.qstring] - query string object
14+
* @param {Object} [options.params.user] - user object
15+
* @param {string} [options.params.user._id] - user ID
16+
* @param {string} [options.params.user.username] - username
17+
* @param {boolean} [options.extend] - whether to extend existing omit list or replace it
18+
* @param {function(string=): void} callback - callback function called with optional error message
19+
*/
20+
ommit_segments: async function(options, callback) {
721
var db = options.db || common.db;
822
var omit = options.omit || [];
923
var appId = options.appId;
@@ -21,40 +35,92 @@ module.exports = {
2135
else {
2236
updateOp = {$set: {omit: omit}, "$unset": unset};
2337
}
24-
db.collection('views').updateOne({"_id": db.ObjectID(appId)}, updateOp, function(err5) {
25-
if (err5) {
26-
log.e(err5);
27-
callback("Updating database failed");
38+
39+
try {
40+
await db.collection('views').updateOne({"_id": db.ObjectID(appId)}, updateOp);
41+
plugins.dispatch("/systemlogs", {params: params, action: "view_segments_ommit", data: { update: omit}});
42+
43+
var errCn = 0;
44+
for (var z = 0; z < omit.length; z++) {
45+
var colName = "app_viewdata";
46+
try {
47+
await db.collection(colName).deleteMany({"_id": {"$regex": "^" + appId + "_" + omit + "_.*"}});
48+
}
49+
catch (err) {
50+
if (err.code !== 26) { //if error is not collection not found.(Because it is possible for it to not exist)
51+
log.e(JSON.stringify(err));
52+
errCn++;
53+
}
54+
}
55+
}
56+
log.d("Segments omittion compleated for:" + JSON.stringify(omit));
57+
if (errCn > 0) {
58+
plugins.dispatch("/systemlogs", {params: params, action: "view_segments_ommit_complete", data: { app_id: appId, update: omit, error: "Failed to delete some(" + errCn + ") collections. Please call omiting again."}});
2859
}
2960
else {
30-
plugins.dispatch("/systemlogs", {params: params, action: "view_segments_ommit", data: { update: omit}});
31-
32-
var promises = [];
33-
var errCn = 0;
34-
for (var z = 0; z < omit.length; z++) {
35-
var colName = "app_viewdata";
36-
promises.push(new Promise(function(resolve2) {
37-
common.db.collection(colName).deleteMany({"_id": {"$regex": "^" + appId + "_" + omit + "_.*"}}, function(err) {
38-
if (err && err.code !== 26) { //if error is not collection not found.(Because it is possible for it to not exist)
39-
log.e(JSON.stringify(err));
40-
errCn++;
61+
plugins.dispatch("/systemlogs", {params: params, action: "view_segments_ommit_complete", data: { app_id: appId, update: omit}});
62+
}
63+
callback();
64+
}
65+
catch (error) {
66+
log.e(error);
67+
callback("Updating database failed");
68+
}
69+
70+
},
71+
cleanupRootDocument: async function(db, appId) {
72+
try {
73+
var doc = await db.collection('views').findOne({"_id": db.ObjectID(appId)});
74+
if (!doc) {
75+
log.d("No root document found for app " + appId + ", skipping cleanup.");
76+
return;
77+
}
78+
var viewsConfig = plugins.getConfig("views") || {};
79+
var segment_value_limit = typeof viewsConfig.segment_value_limit === "number" ? viewsConfig.segment_value_limit : 10;
80+
var changes_collected = false;
81+
var omitted = doc.omit || [];
82+
83+
//Remove still stored omited segments(that should not be in doc anymore)
84+
for (var z = 0; z < omitted.length; z++) {
85+
if (doc.segments && doc.segments[omitted[z]]) {
86+
changes_collected = true;
87+
try {
88+
await this.ommit_segments({db, omit: [omitted[z]], appId, params: {qstring: {}, user: {_id: "system", username: "system"}}, extend: true}, function(err) {
89+
if (err) {
90+
log.e("Failed to omit segment " + omitted[z] + ": " + err);
4191
}
42-
resolve2();
4392
});
44-
}));
45-
}
46-
Promise.all(promises).then(function() {
47-
log.d("Segments omittion compleated for:" + JSON.stringify(omit));
48-
if (errCn > 0) {
49-
plugins.dispatch("/systemlogs", {params: params, action: "view_segments_ommit_complete", data: { app_id: appId, update: omit, error: "Failed to delete some(" + errCn + ") collections. Please call omiting again."}});
93+
5094
}
51-
else {
52-
plugins.dispatch("/systemlogs", {params: params, action: "view_segments_ommit_complete", data: { app_id: appId, update: omit}});
95+
catch (err) {
96+
log.e("Failed to omit segment " + omitted[z] + ": " + err);
5397
}
54-
});
55-
callback();
98+
}
5699
}
57-
});
58100

101+
//Look for any segment having more values than limit and omit it if it is not already omitted.
102+
if (!changes_collected && doc.segments) {
103+
for (var seg in doc.segments) {
104+
if (Object.keys(doc.segments[seg]).length > segment_value_limit && !omitted.includes(seg)) {
105+
try {
106+
await this.ommit_segments({db, omit: [seg], appId, params: {qstring: {}, user: {_id: "system", username: "system"}}, extend: true}, function(err) {
107+
if (err) {
108+
log.e("Failed to omit segment " + seg + ": " + err);
109+
}
110+
});
111+
112+
}
113+
catch (err) {
114+
log.e("Failed to omit segment " + seg + ": " + err);
115+
}
116+
}
117+
}
118+
}
119+
log.d("Cleanup of root document for app " + appId + " completed.");
120+
121+
}
122+
catch (error) {
123+
log.e("Failed to cleanup root document for app " + appId + ": " + error);
124+
}
59125
}
60126
};

plugins/views/scripts/fixViews.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/**
22
Path to file: {COUNTLY DIR}/plugins/views/scripts/fixViews.js
33
Script fixes indexes for views collections and merges views if there are views marked to be merged.
4+
SCRIPT IS DEPRECATED FOR NEWARCH. Needs to be rewritten to work with the new architecture
45
**/
56

67
var pluginManager = require('../../pluginManager.ts'),

plugins/views/scripts/omitViewSegments.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/** SCRIPT IS DEPRECATED FOR NEWARCH. Needs to be rewritten to work with the new architecture */
12
//Put script in ./countly/plugins/views/scripts/omitViewSegments.js
23
/*
34
The script deletes data for specific segments in aggregated data. It also can set an omitting list in the database to ensure that segments are also omitted on incoming data.

plugins/views/scripts/renameViews.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/** SCRIPT IS DEPRECATED FOR NEWARCH. Needs to be rewritten to work with the new architecture */
12
/**
23
Path to file: {COUNTLY DIR}/plugins/views/scripts/renameViews.js
34
Script renames views for app.

0 commit comments

Comments
 (0)