Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 49 additions & 19 deletions ldap/servers/slapd/back-ldbm/ldbm_add.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ ldbm_back_add(Slapi_PBlock *pb)
int result_sent = 0;
int32_t parent_op = 0;
int32_t betxn_callback_fails = 0; /* if a BETXN fails we need to revert entry cache */
int32_t parent_locked = 0; /* non-zero when parent entry cache lock is held */
struct timespec parent_time;

if (slapi_pblock_get(pb, SLAPI_CONN_ID, &conn_id) < 0) {
Expand Down Expand Up @@ -219,8 +220,19 @@ ldbm_back_add(Slapi_PBlock *pb)
txn.back_txn_txn = NULL; /* ready to create the child transaction */
for (retry_count = 0; retry_count < RETRY_TIMES; retry_count++) {
if (txn.back_txn_txn && (txn.back_txn_txn != parent_txn)) {
/* Don't release SERIAL LOCK */
dblayer_txn_abort_ext(li, &txn, PR_FALSE);
/*

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is not a confusion between backend lock and db_env lock.
Initially the backend lock was held during all lmdb_add and db_env release during retry/sleep.
With the move of plugins in betxn (#351) it was changed from dblayer_txn_abort (release the db_env) to dblayer_txn_abort_ext(...FALSE) (to not release the db_env) but the comment was about 'Serial Lock' that in my mind means backend lock.
Of course we do not want to release the backend lock for a retry but it looks fine to me to release db_env.

What is not clear to me is the impact of releasing it. IIUC it allows others txn but on different backend. Is it the expected benefit ?

* Unlock entry cache locks before releasing SERIAL LOCK
* to maintain lock ordering (SERIAL LOCK -> entry locks)
* and prevent AB-BA deadlock on retry.
* Entries stay refcounted in cache, so pointers remain valid.
*/
if (parent_locked && parent_modify_c.old_entry) {
cache_unlock_entry(&inst->inst_cache, parent_modify_c.old_entry);
parent_locked = 0;
}

/* Release SERIAL LOCK */
dblayer_txn_abort(be, &txn);
noabort = 1;
slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
/* must duplicate addingentry before returning it to cache,
Expand Down Expand Up @@ -267,11 +279,19 @@ ldbm_back_add(Slapi_PBlock *pb)
}
/* dblayer_txn_begin holds SERIAL lock,
* which should be outside of locking the entry (find_entry2modify) */
if (0 == retry_count) {
/* First time, hold SERIAL LOCK */
retval = dblayer_txn_begin(be, parent_txn, &txn);
noabort = 0;
retval = dblayer_txn_begin(be, parent_txn, &txn);
if (0 != retval) {
if (LDBM_OS_ERR_IS_DISKFULL(retval)) {
disk_full = 1;
ldap_result_code = LDAP_OPERATIONS_ERROR;
goto diskfull_return;
}
ldap_result_code = LDAP_OPERATIONS_ERROR;
goto error_return;
}
noabort = 0;

if (0 == retry_count) {
if (!is_tombstone_operation) {
rc = slapi_setbit_int(rc, SLAPI_RTN_BIT_FETCH_EXISTING_DN_ENTRY);
}
Expand Down Expand Up @@ -391,24 +411,27 @@ ldbm_back_add(Slapi_PBlock *pb)
*/
/* JCMREPL - Warning: A Plugin could cause an infinite loop by always returning a result code that requires some action. */
}
} else {
/* Otherwise, no SERIAL LOCK */
retval = dblayer_txn_begin_ext(li, parent_txn, &txn, PR_FALSE);
}
if (0 != retval) {
if (LDBM_OS_ERR_IS_DISKFULL(retval)) {
disk_full = 1;
ldap_result_code = LDAP_OPERATIONS_ERROR;
goto diskfull_return;
}
ldap_result_code = LDAP_OPERATIONS_ERROR;
goto error_return;
}
noabort = 0;

/* stash the transaction for plugins */
slapi_pblock_set(pb, SLAPI_TXN, txn.back_txn_txn);

/* Re-lock parent after SERIAL LOCK to maintain lock ordering */
if (retry_count > 0 && parent_found && parent_modify_c.old_entry) {
int lock_rc = cache_lock_entry(&inst->inst_cache,
parent_modify_c.old_entry);
if (lock_rc != 0) {
slapi_log_err(SLAPI_LOG_ERR, "ldbm_back_add",
"conn=%" PRIu64 " op=%d Failed to re-lock parent "
"entry after deadlock retry (rc=%d)\n",
conn_id, op_id, lock_rc);
ldap_result_code = LDAP_OPERATIONS_ERROR;
retval = -1;
goto error_return;
}
parent_locked = 1;
}

if (0 == retry_count) { /* execute just once */
/* Nothing in this if crause modifies persistent store.
* it's called just once. */
Expand Down Expand Up @@ -439,6 +462,7 @@ ldbm_back_add(Slapi_PBlock *pb)
goto error_return;
}
modify_init(&parent_modify_c, parententry);
parent_locked = 1;
}

/* Check if the entry we have been asked to add already exists */
Expand Down Expand Up @@ -1445,6 +1469,12 @@ ldbm_back_add(Slapi_PBlock *pb)
slapi_log_err(SLAPI_LOG_BACKLDBM, "ldbm_back_add",
"conn=%" PRIu64 " op=%d modify_term: old_entry=0x%p, new_entry=0x%p\n",
conn_id, op_id, parent_modify_c.old_entry, parent_modify_c.new_entry);
/* Parent unlocked during retry but not re-locked (error path).
* Return to cache and NULL out to prevent double-unlock in modify_term. */
if (!parent_locked && parent_modify_c.old_entry) {
CACHE_RETURN(&(inst->inst_cache), &(parent_modify_c.old_entry));
parent_modify_c.old_entry = NULL;
}
myrc = modify_term(&parent_modify_c, be);
done_with_pblock_entry(pb, SLAPI_ADD_EXISTING_DN_ENTRY);
done_with_pblock_entry(pb, SLAPI_ADD_EXISTING_UNIQUEID_ENTRY);
Expand Down
110 changes: 99 additions & 11 deletions ldap/servers/slapd/back-ldbm/ldbm_delete.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ int delete_tombstone_entry = 0; /* We must remove the given tombstone entry from
Connection *pb_conn;
int32_t parent_op = 0;
int32_t betxn_callback_fails = 0; /* if a BETXN fails we need to revert entry cache */
int32_t e_locked = 0; /* non-zero when entry 'e' cache lock is held */

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should better use bool and true/false

int32_t parent_locked = 0; /* non-zero when parent entry cache lock is held */
struct timespec parent_time;

if (slapi_pblock_get(pb, SLAPI_CONN_ID, &conn_id) < 0) {
Expand Down Expand Up @@ -198,8 +200,23 @@ int delete_tombstone_entry = 0; /* We must remove the given tombstone entry from
if (txn.back_txn_txn && (txn.back_txn_txn != parent_txn)) { /* retry_count > 0 */
Slapi_Entry *ent = NULL;

/* Don't release SERIAL LOCK */
dblayer_txn_abort_ext(li, &txn, PR_FALSE);
/*
* Unlock entry cache locks before releasing SERIAL LOCK
* to maintain lock ordering (SERIAL LOCK -> entry locks)
* and prevent AB-BA deadlock on retry.
* Entries stay refcounted in cache, so pointers remain valid.
*/
if (e_locked) {
cache_unlock_entry(&inst->inst_cache, e);
e_locked = 0;
}
if (parent_locked && parent_modify_c.old_entry) {
cache_unlock_entry(&inst->inst_cache, parent_modify_c.old_entry);
parent_locked = 0;
}

/* Release SERIAL LOCK */
dblayer_txn_abort(be, &txn);
slapi_pblock_set(pb, SLAPI_TXN, parent_txn);

/* reset original entry */
Expand Down Expand Up @@ -257,13 +274,7 @@ int delete_tombstone_entry = 0; /* We must remove the given tombstone entry from
}
#endif
}
if (0 == retry_count) {
/* First time, hold SERIAL LOCK */
retval = dblayer_txn_begin(be, parent_txn, &txn);
} else {
/* Otherwise, no SERIAL LOCK */
retval = dblayer_txn_begin_ext(li, parent_txn, &txn, PR_FALSE);
}
retval = dblayer_txn_begin(be, parent_txn, &txn);
if (0 != retval) {
if (LDBM_OS_ERR_IS_DISKFULL(retval)) disk_full = 1;
ldap_result_code= LDAP_OPERATIONS_ERROR;
Expand All @@ -273,6 +284,39 @@ int delete_tombstone_entry = 0; /* We must remove the given tombstone entry from
/* stash the transaction */
slapi_pblock_set(pb, SLAPI_TXN, txn.back_txn_txn);

/* Re-lock entries after SERIAL LOCK to maintain lock ordering */
if (retry_count > 0) {
if (e) {
int lock_rc = cache_lock_entry(&inst->inst_cache, e);
if (lock_rc != 0) {
slapi_log_err(SLAPI_LOG_ERR, "ldbm_back_delete",
"conn=%" PRIu64 " op=%d Failed to re-lock entry "
"after deadlock retry (rc=%d)\n",
conn_id, op_id, lock_rc);
CACHE_RETURN(&inst->inst_cache, &e);
e = NULL;
ldap_result_code = LDAP_OPERATIONS_ERROR;
retval = -1;
goto error_return;
}
e_locked = 1;
}
if (parent_found && parent_modify_c.old_entry) {
int lock_rc = cache_lock_entry(&inst->inst_cache,
parent_modify_c.old_entry);
if (lock_rc != 0) {
slapi_log_err(SLAPI_LOG_ERR, "ldbm_back_delete",
"conn=%" PRIu64 " op=%d Failed to re-lock parent "
"entry after deadlock retry (rc=%d)\n",
conn_id, op_id, lock_rc);
ldap_result_code = LDAP_OPERATIONS_ERROR;
retval = -1;
goto error_return;
}
parent_locked = 1;
}
}

if (0 == retry_count) { /* just once */
/* find and lock the entry we are about to modify */
/*
Expand All @@ -291,6 +335,7 @@ int delete_tombstone_entry = 0; /* We must remove the given tombstone entry from
slapi_log_err(SLAPI_LOG_BACKLDBM, "ldbm_back_delete", "Deleting entry is already deleted.\n");
goto error_return; /* error result sent by find_entry2modify() */
}
e_locked = 1;
ep_id = e->ep_id;

/* JCMACL - Shouldn't the access check be before the has children check...
Expand Down Expand Up @@ -356,6 +401,7 @@ int delete_tombstone_entry = 0; /* We must remove the given tombstone entry from
CACHE_REMOVE(&inst->inst_cache, e);
}
cache_unlock_entry(&inst->inst_cache, e);
e_locked = 0;
CACHE_RETURN(&inst->inst_cache, &e);
/*
* e is unlocked and no longer in cache.
Expand Down Expand Up @@ -540,17 +586,49 @@ int delete_tombstone_entry = 0; /* We must remove the given tombstone entry from
* Its possible that the parent entry retrieved from the cache in id2entry
* could be removed before we lock it, because tombstone purging updated/replaced
* the parent. If we fail to lock the entry, just try again.
*
* Must not sleep while holding SERIAL LOCK -- abort txn
* first, sleep, then re-begin (same as deadlock retry).
*/
while (1) {
parent = id2entry(be, pid, &txn, &retval);
if (parent && (cache_retry = cache_lock_entry(&inst->inst_cache, parent))) {
/* Failed to obtain parent entry's entry lock */
if (cache_retry == RETRY_CACHE_LOCK &&
cache_retry_count < LDBM_CACHE_RETRY_COUNT) {
/* try again */
CACHE_RETURN(&(inst->inst_cache), &parent);
/* Unlock entry, release SERIAL LOCK, sleep, re-acquire */
cache_unlock_entry(&inst->inst_cache, e);
e_locked = 0;
dblayer_txn_abort(be, &txn);
slapi_pblock_set(pb, SLAPI_TXN, parent_txn);
DS_Sleep(PR_MillisecondsToInterval(100));
cache_retry_count++;
txn.back_txn_txn = NULL;
retval = dblayer_txn_begin(be, parent_txn, &txn);
if (0 != retval) {
if (LDBM_OS_ERR_IS_DISKFULL(retval))
disk_full = 1;
ldap_result_code = LDAP_OPERATIONS_ERROR;
goto error_return;
}
slapi_pblock_set(pb, SLAPI_TXN, txn.back_txn_txn);
/* Re-lock target entry */
{
int lock_rc = cache_lock_entry(&inst->inst_cache, e);
if (lock_rc != 0) {
slapi_log_err(SLAPI_LOG_ERR, "ldbm_back_delete",
"conn=%" PRIu64 " op=%d Failed to re-lock "
"entry during parent cache lock retry (rc=%d)\n",
conn_id, op_id, lock_rc);
CACHE_RETURN(&inst->inst_cache, &e);
e = NULL;
ldap_result_code = LDAP_OPERATIONS_ERROR;
retval = -1;
goto error_return;
}
e_locked = 1;
}
continue;
}
retval = -1;
Expand All @@ -576,6 +654,7 @@ int delete_tombstone_entry = 0; /* We must remove the given tombstone entry from
int isglue;
size_t haschildren = 0;
int op = PARENTUPDATE_DEL;
parent_locked = 1;

/* Unfortunately findentry doesn't tell us whether it just
* didn't find the entry, or if there was an error, so we
Expand Down Expand Up @@ -1507,7 +1586,10 @@ int delete_tombstone_entry = 0; /* We must remove the given tombstone entry from
CACHE_REMOVE(&inst->inst_cache, e);
}
}
cache_unlock_entry(&inst->inst_cache, e);
if (e_locked) {
cache_unlock_entry(&inst->inst_cache, e);
e_locked = 0;
}
CACHE_RETURN(&inst->inst_cache, &e);
/*
* e is unlocked and no longer in cache.
Expand Down Expand Up @@ -1549,6 +1631,12 @@ int delete_tombstone_entry = 0; /* We must remove the given tombstone entry from
"conn=%" PRIu64 " op=%d modify_term: old_entry=0x%p, new_entry=0x%p, in_cache=%d\n",
conn_id, op_id, parent_modify_c.old_entry, parent_modify_c.new_entry,
inst ? cache_is_in_cache(&inst->inst_cache, parent_modify_c.new_entry):-1);
/* Parent unlocked during retry but not re-locked (error path).
* Return to cache and NULL out to prevent double-unlock in modify_term. */
if (!parent_locked && parent_modify_c.old_entry) {
CACHE_RETURN(&(inst->inst_cache), &(parent_modify_c.old_entry));
parent_modify_c.old_entry = NULL;
}
myrc = modify_term(&parent_modify_c, be);
if (free_delete_existing_entry) {
done_with_pblock_entry(pb, SLAPI_DELETE_EXISTING_ENTRY);
Expand Down
Loading
Loading