diff --git a/src/dtx/dtx_coll.c b/src/dtx/dtx_coll.c
index 7e7c8199155..4ff3d06d6ad 100644
--- a/src/dtx/dtx_coll.c
+++ b/src/dtx/dtx_coll.c
@@ -81,9 +81,10 @@ dtx_coll_prep_ult(void *arg)
 	}
 
 	if (dcpa->dcpa_result != 0) {
-		if (dcpa->dcpa_result != -DER_INPROGRESS && dcpa->dcpa_result != -DER_NONEXIST)
+		if (dcpa->dcpa_result < 0 && dcpa->dcpa_result != -DER_INPROGRESS &&
+		    dcpa->dcpa_result != -DER_NONEXIST)
 			D_ERROR("Failed to load mbs for "DF_DTI", opc %u: "DF_RC"\n",
-				DP_DTI(&dci->dci_xid), opc, DP_RC(rc));
+				DP_DTI(&dci->dci_xid), opc, DP_RC(dcpa->dcpa_result));
 		goto out;
 	}
 
diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c
index ecb156729ed..190020102cf 100644
--- a/src/dtx/dtx_common.c
+++ b/src/dtx/dtx_common.c
@@ -1271,7 +1271,6 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
 	int			 status = -1;
 	int			 rc = 0;
 	bool			 aborted = false;
-	bool			 unpin = false;
 
 	D_ASSERT(cont != NULL);
 
@@ -1339,7 +1338,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
 	 * it persistently. Otherwise, the subsequent DTX resync may not find it as
 	 * to regard it as failed transaction and abort it.
 	 */
-	if (result == 0 && !dth->dth_active && !dth->dth_prepared && !dth->dth_solo &&
+	if (!dth->dth_active && !dth->dth_prepared &&
 	    (dth->dth_dist || dth->dth_modification_cnt > 0)) {
 		result = vos_dtx_attach(dth, true, dth->dth_ent != NULL ? true : false);
 		if (unlikely(result < 0)) {
@@ -1363,14 +1362,12 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
 	if (DAOS_FAIL_CHECK(DAOS_DTX_MISS_COMMIT))
 		dth->dth_sync = 1;
 
-	/* For synchronous DTX, do not add it into CoS cache, otherwise,
-	 * we may have no way to remove it from the cache.
-	 */
 	if (dth->dth_sync)
 		goto sync;
 
 	D_ASSERT(dth->dth_mbs != NULL);
 
+cache:
 	if (dlh->dlh_coll) {
 		rc = dtx_cos_add(cont, dlh->dlh_coll_entry, &dth->dth_leader_oid,
 				 dth->dth_dkey_hash, dth->dth_epoch, DCF_EXP_CMT | DCF_COLL);
@@ -1378,38 +1375,47 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
 		size = sizeof(*dte) + sizeof(*mbs) + dth->dth_mbs->dm_data_size;
 		D_ALLOC(dte, size);
 		if (dte == NULL) {
-			dth->dth_sync = 1;
-			goto sync;
-		}
-
-		mbs = (struct dtx_memberships *)(dte + 1);
-		memcpy(mbs, dth->dth_mbs, size - sizeof(*dte));
-
-		dte->dte_xid = dth->dth_xid;
-		dte->dte_ver = dth->dth_ver;
-		dte->dte_refs = 1;
-		dte->dte_mbs = mbs;
+			rc = -DER_NOMEM;
+		} else {
+			mbs = (struct dtx_memberships *)(dte + 1);
+			memcpy(mbs, dth->dth_mbs, size - sizeof(*dte));
+
+			dte->dte_xid = dth->dth_xid;
+			dte->dte_ver = dth->dth_ver;
+			dte->dte_refs = 1;
+			dte->dte_mbs = mbs;
+
+			if (!(mbs->dm_flags & DMF_SRDG_REP))
+				flags = DCF_EXP_CMT;
+			else if (dth->dth_modify_shared)
+				flags = DCF_SHARED;
+			else
+				flags = 0;
 
-		if (!(mbs->dm_flags & DMF_SRDG_REP))
-			flags = DCF_EXP_CMT;
-		else if (dth->dth_modify_shared)
-			flags = DCF_SHARED;
-		else
-			flags = 0;
+			rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash,
+					 dth->dth_epoch, flags);
+			dtx_entry_put(dte);
+		}
+	}
 
-		rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash,
-				 dth->dth_epoch, flags);
-		dtx_entry_put(dte);
+	/*
+	 * NOTE: If we fail to add the committable DTX into the CoS cache, then we also have
+	 *	 no way to commit (or abort) the DTX because of the OOM. Such a DTX will
+	 *	 finally be committed via the next DTX resync (after recovering from OOM).
+	 *
+	 *	 Here, we only log a warning about the trouble instead of failing the transaction.
+	 */
+	if (rc != 0) {
+		D_WARN(DF_UUID": Fail to cache %s DTX "DF_DTI": "DF_RC"\n",
+		       DP_UUID(cont->sc_uuid), dlh->dlh_coll ? "collective" : "regular",
+		       DP_DTI(&dth->dth_xid), DP_RC(rc));
+		D_GOTO(out, result = 0);
 	}
 
-	if (rc == 0) {
-		if (!DAOS_FAIL_CHECK(DAOS_DTX_NO_COMMITTABLE)) {
-			vos_dtx_mark_committable(dth);
-			if (cont->sc_dtx_committable_count > DTX_THRESHOLD_COUNT || dlh->dlh_coll)
-				sched_req_wakeup(dss_get_module_info()->dmi_dtx_cmt_req);
-		}
-	} else {
-		dth->dth_sync = 1;
+	if (!DAOS_FAIL_CHECK(DAOS_DTX_NO_COMMITTABLE)) {
+		vos_dtx_mark_committable(dth);
+		if (cont->sc_dtx_committable_count > DTX_THRESHOLD_COUNT || dlh->dlh_coll)
+			sched_req_wakeup(dss_get_module_info()->dmi_dtx_cmt_req);
 	}
 
 sync:
@@ -1428,10 +1434,13 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
 			rc = dtx_commit(cont, &dte, NULL, 1, false);
 	}
 
-	if (rc != 0)
+	if (rc != 0) {
 		D_WARN(DF_UUID": Fail to sync %s commit DTX "DF_DTI": "DF_RC"\n",
 		       DP_UUID(cont->sc_uuid), dlh->dlh_coll ? "collective" : "regular",
 		       DP_DTI(&dth->dth_xid), DP_RC(rc));
+		dth->dth_sync = 0;
+		goto cache;
+	}
 
 	/*
 	 * NOTE: The semantics of 'sync' commit does not guarantee that all
@@ -1451,7 +1460,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
 	 * to locally retry for avoiding related forwarded RPC timeout, instead,
 	 * The leader will trigger retry globally without abort 'prepared' ones.
 	 */
-	if (unpin || (result < 0 && result != -DER_AGAIN && !dth->dth_solo)) {
+	if (result < 0 && result != -DER_AGAIN && !dth->dth_solo) {
		/* 1. Drop partial modification for distributed transaction.
 		 * 2. Remove the pinned DTX entry.
 		 */
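Note: the dtx_leader_end() rework above inverts the old fallback direction. Previously a CoS-cache failure forced a synchronous commit; now a failed synchronous commit falls back to the CoS cache (the new `cache:` label), and a cache failure under memory pressure is tolerated because the next DTX resync can still commit the entry. A minimal standalone sketch of that control flow; `try_sync_commit` and `try_cache_committable` are hypothetical stand-ins, not DAOS APIs:

```c
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins for the two completion paths. */
static int try_sync_commit(void)       { return -1; /* pretend it failed */ }
static int try_cache_committable(void) { return 0; }

/* Sketch: prefer the synchronous commit when requested; on failure fall
 * back to caching the committable entry, and treat a cache failure (e.g.
 * OOM) as non-fatal, since a later resync can still commit the entry. */
static int finish_tx(bool want_sync)
{
	int rc;

	if (want_sync) {
		rc = try_sync_commit();
		if (rc == 0)
			return 0;
		fprintf(stderr, "sync commit failed (%d), fall back to caching\n", rc);
	}

	rc = try_cache_committable();
	if (rc != 0)
		fprintf(stderr, "cannot cache entry (%d), leave it to resync\n", rc);

	return 0; /* the transaction itself still succeeds */
}

int main(void) { return finish_tx(true); }
```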
diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c
index 2ccbfec2734..c203c861ff1 100644
--- a/src/dtx/dtx_rpc.c
+++ b/src/dtx/dtx_rpc.c
@@ -1657,8 +1657,9 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d
 	}
 
 	D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE,
-		 "Collectively commit DTX "DF_DTI": %d/%d/%d\n",
-		 DP_DTI(&dce->dce_xid), rc, rc1, rc2);
+		 "Collectively commit DTX "DF_DTI" in "DF_UUID"/"DF_UUID": %d/%d/%d\n",
+		 DP_DTI(&dce->dce_xid), DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid),
+		 rc, rc1, rc2);
 
 	return rc != 0 ? rc : rc1 != 0 ? rc1 : rc2;
 }
@@ -1717,8 +1718,9 @@ dtx_coll_abort(struct ds_cont_child *cont, struct dtx_coll_entry *dce, daos_epoc
 		rc2 = 0;
 
 	D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE,
-		 "Collectively abort DTX "DF_DTI": %d/%d/%d\n",
-		 DP_DTI(&dce->dce_xid), rc, rc1, rc2);
+		 "Collectively abort DTX "DF_DTI" in "DF_UUID"/"DF_UUID": %d/%d/%d\n",
+		 DP_DTI(&dce->dce_xid), DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid),
+		 rc, rc1, rc2);
 
 	return rc != 0 ? rc : rc1 != 0 ? rc1 : rc2;
 }
@@ -1766,8 +1768,9 @@ dtx_coll_check(struct ds_cont_child *cont, struct dtx_coll_entry *dce, daos_epoc
 	}
 
 	D_CDEBUG((rc < 0 && rc != -DER_NONEXIST) || (rc1 < 0 && rc1 != -DER_NONEXIST), DLOG_ERR,
-		 DB_TRACE, "Collectively check DTX "DF_DTI": %d/%d/\n",
-		 DP_DTI(&dce->dce_xid), rc, rc1);
+		 DB_TRACE, "Collectively check DTX "DF_DTI" in "DF_UUID"/"DF_UUID": %d/%d\n",
+		 DP_DTI(&dce->dce_xid), DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid),
+		 rc, rc1);
 
 	return dce->dce_ranks != NULL ? rc : rc1;
 }
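Note: all three collective helpers end with the same `rc != 0 ? rc : rc1 != 0 ? rc1 : rc2` idiom, returning the first non-zero result in phase order. A trivial sketch of that combiner (the helper name is illustrative, not part of the patch):

```c
#include <assert.h>

/* Sketch: mirror the nested ternary in dtx_coll_commit()/dtx_coll_abort():
 * the earliest phase's error wins; later codes are only consulted when
 * all earlier ones are zero. */
static int first_error(int rc, int rc1, int rc2)
{
	if (rc != 0)
		return rc;
	return rc1 != 0 ? rc1 : rc2;
}

int main(void)
{
	assert(first_error(0, 0, 0) == 0);
	assert(first_error(0, -5, -7) == -5); /* rc1 masks rc2 */
	assert(first_error(-3, -5, 0) == -3); /* rc masks rc1 */
	return 0;
}
```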
diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h
index f99f4df14e3..9fc615c2a8b 100644
--- a/src/include/daos_srv/container.h
+++ b/src/include/daos_srv/container.h
@@ -108,6 +108,9 @@ struct ds_cont_child {
 	uint32_t		 sc_dtx_committable_count;
 	uint32_t		 sc_dtx_committable_coll_count;
 
+	/* Last timestamp when EC aggregation reported -DER_INPROGRESS. */
+	uint64_t		 sc_ec_agg_busy_ts;
+
 	/* The global minimum EC aggregation epoch, which will be upper
 	 * limit for VOS aggregation, i.e. EC object VOS aggregation can
 	 * not cross this limit. For simplification purpose, all objects
diff --git a/src/object/cli_obj.c b/src/object/cli_obj.c
index 2c0c0a952ea..55866d65b51 100644
--- a/src/object/cli_obj.c
+++ b/src/object/cli_obj.c
@@ -4834,11 +4834,14 @@ obj_comp_cb(tse_task_t *task, void *data)
 			D_ASSERT(daos_handle_is_inval(obj_auxi->th));
 			D_ASSERT(obj_is_modification_opc(obj_auxi->opc));
 
-			if (task->dt_result == -DER_TX_ID_REUSED && obj_auxi->retry_cnt != 0)
-				/* XXX: it is must because miss to set "RESEND" flag, that is bug. */
-				D_ASSERTF(0,
-					  "Miss 'RESEND' flag (%x) when resend the RPC for task %p: %u\n",
-					  obj_auxi->flags, task, obj_auxi->retry_cnt);
+			if (task->dt_result == -DER_TX_ID_REUSED && obj_auxi->retry_cnt != 0) {
+				D_ERROR("Server complained about TX ID reuse for unknown reason, "
+					"task %p, opc %u, flags %x, retry_cnt %u\n",
+					task, obj_auxi->opc, obj_auxi->flags, obj_auxi->retry_cnt);
+				task->dt_result = -DER_IO;
+				obj_auxi->io_retry = 0;
+				goto args_fini;
+			}
 
 			if (obj_auxi->opc == DAOS_OBJ_RPC_UPDATE) {
 				daos_obj_rw_t	*api_args = dc_task_get_args(obj_auxi->obj_task);
@@ -4864,6 +4867,7 @@ obj_comp_cb(tse_task_t *task, void *data)
 		}
 	}
 
+args_fini:
 	if (obj_auxi->opc == DAOS_OBJ_RPC_COLL_PUNCH)
 		obj_coll_oper_args_fini(&obj_auxi->p_args.pa_coa);
 
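Note: the cli_obj.c change above demotes what used to be a fatal `D_ASSERTF(0, ...)` into a logged hard failure of the task: the operation completes with -DER_IO and retry is suppressed, so one confused server reply no longer brings down the whole client. A minimal sketch of the demote-assert-to-error pattern; all names and the error value are illustrative:

```c
#include <stdio.h>

#define ERR_IO (-1005) /* hypothetical stand-in for -DER_IO */

struct task {
	int result;
	int retry_cnt;
	int io_retry; /* nonzero: the completion path would retry */
};

/* Sketch: instead of asserting on an "impossible" reply, log it, fail
 * the task with a generic I/O error, and disable further retries. */
static void complete_task(struct task *t, int unexpected)
{
	if (unexpected && t->retry_cnt != 0) {
		fprintf(stderr, "unexpected reply after %d retries, failing task\n",
			t->retry_cnt);
		t->result = ERR_IO;
		t->io_retry = 0;
		return;
	}
	/* ... normal completion handling ... */
}

int main(void)
{
	struct task t = { .result = 0, .retry_cnt = 2, .io_retry = 1 };

	complete_task(&t, 1);
	return t.result == ERR_IO ? 0 : 1;
}
```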
diff --git a/src/object/srv_ec_aggregate.c b/src/object/srv_ec_aggregate.c
index 71c630fa947..50b513d1612 100644
--- a/src/object/srv_ec_aggregate.c
+++ b/src/object/srv_ec_aggregate.c
@@ -2667,8 +2667,13 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
 	struct ec_agg_param	 *ec_agg_param = agg_param->ap_data;
 	vos_iter_param_t	  iter_param = { 0 };
 	struct vos_iter_anchors   anchors = { 0 };
+	struct dtx_handle	 *dth = NULL;
+	struct dtx_share_peer	 *dsp;
+	struct dtx_id		  dti = { 0 };
+	struct dtx_epoch	  epoch = { 0 };
+	daos_unit_oid_t		  oid = { 0 };
+	int			  blocks = 0;
 	int			  rc = 0;
-	int			  blocks = 0;
 
 	/*
 	 * Avoid calling into vos_aggregate() when aborting aggregation
@@ -2715,8 +2720,32 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
 		agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL);
 
 retry:
+	epoch.oe_value = epr->epr_hi;
+	rc = dtx_begin(cont->sc_hdl, &dti, &epoch, 0, cont->sc_pool->spc_map_version, &oid,
+		       NULL, 0, 0, NULL, &dth);
+	if (rc != 0)
+		goto update_hae;
+
 	rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, agg_iterate_pre_cb,
-			 agg_iterate_post_cb, ec_agg_param, NULL);
+			 agg_iterate_post_cb, ec_agg_param, dth);
+	if (rc == -DER_INPROGRESS && !d_list_empty(&dth->dth_share_tbd_list)) {
+		uint64_t	now = daos_gettime_coarse();
+
+		/* Report the warning at most once per 10 seconds to avoid flooding the log. */
+		if (now - cont->sc_ec_agg_busy_ts > 10) {
+			while ((dsp = d_list_pop_entry(&dth->dth_share_tbd_list,
+						       struct dtx_share_peer, dsp_link)) != NULL) {
+				D_WARN(DF_CONT ": EC aggregate hit non-committed DTX " DF_DTI "\n",
+				       DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
+				       DP_DTI(&dsp->dsp_xid));
+				dtx_dsp_free(dsp);
+			}
+
+			cont->sc_ec_agg_busy_ts = now;
+		}
+	}
+
+	dtx_end(dth, cont, rc);
 
 	/* Post_cb may not being executed in some cases */
 	agg_clear_extents(&ec_agg_param->ap_agg_entry);
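Note: both this hunk (sc_ec_agg_busy_ts) and the vos_dtx.c change below (vc_agg_busy_ts) rate-limit a recurring warning to at most one burst per 10 seconds by keeping the last-report timestamp next to the resource. A self-contained sketch of the throttle, assuming a coarse monotonic second clock as a stand-in for daos_gettime_coarse() (CLOCK_MONOTONIC_COARSE is Linux-specific):

```c
#include <stdint.h>
#include <stdio.h>
#include <time.h>

static uint64_t now_sec(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC_COARSE, &ts);
	return (uint64_t)ts.tv_sec;
}

/* Sketch: emit the message only when 10s have elapsed since the last
 * report, then advance the per-resource timestamp. */
static void warn_throttled(uint64_t *busy_ts, const char *msg)
{
	uint64_t now = now_sec();

	if (now - *busy_ts > 10) {
		fprintf(stderr, "WARN: %s\n", msg);
		*busy_ts = now;
	}
}

int main(void)
{
	uint64_t ts = 0;

	warn_throttled(&ts, "aggregation blocked");	/* printed */
	warn_throttled(&ts, "aggregation blocked");	/* suppressed */
	return 0;
}
```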
"non-leader" : "relay-engine"), rpc, - DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id, ocpi->ocpi_epoch, + DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id, + DP_UUID(ocpi->ocpi_po_uuid), DP_UUID(ocpi->ocpi_co_hdl), + DP_UUID(ocpi->ocpi_co_uuid), ocpi->ocpi_epoch, ocpi->ocpi_map_ver, max_ver, DP_DTI(&ocpi->ocpi_xid), ocpi->ocpi_bulk_tgt_sz, ocpi->ocpi_bulk_tgt_nr, (unsigned int)ocpi->ocpi_tgts.ca_count, ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth, ocpi->ocpi_flags); diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 0f87bfd4d0e..1fab6c734da 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -312,16 +312,38 @@ dtx_act_ent_update(struct btr_instance *tins, struct btr_record *rec, if (unlikely(!dae_old->dae_aborted)) { /* - * XXX: There are two possible reasons for that: - * - * 1. Client resent the RPC but without set 'RESEND' flag. - * 2. Client reused the DTX ID for different modifications. - * - * Currently, the 1st case is more suspected. + * If the new entry and the old entry are for the same transaction, then the RPC + * for the new one will take 'RESEND' flag, that will cause the old one has been + * aborted before arriving at here. So it is quite possible that the new one and + * the old one are for different transactions. */ - D_ERROR("The TX ID "DF_DTI" may be reused for epoch "DF_X64" vs "DF_X64"\n", - DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new)); - return -DER_TX_ID_REUSED; + if (DAE_EPOCH(dae_old) < DAE_EPOCH(dae_new)) { + D_ERROR("The TX ID "DF_DTI" may be reused for epoch "DF_X64" vs "DF_X64"\n", + DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new)); + return -DER_TX_ID_REUSED; + } + + /* + * If the old entry has higher epoch, it is quite possible that the resent RPC + * was handled before the original RPC (corresponding to 'dae_new'). Returning + * -DER_INPROGRESS to make the RPC sponsor to retry the RPC with 'RESEND' flag, + * then related RPC handler logic will handle such case. + */ + if (DAE_EPOCH(dae_old) > DAE_EPOCH(dae_new)) { + D_ERROR("Resent RPC may be handled before original one for DTX "DF_DTI + " with epoch "DF_X64" vs "DF_X64"\n", + DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new)); + return -DER_INPROGRESS; + } + + /* + * The two entries uses the same epoch, then it may be caused by repeated RPCs + * from different sources, such as multiple relay engines forward the same RPC + * to current target. We need to notify related caller to avoid such case. + */ + D_ERROR("Receive repeated DTX "DF_DTI" with epoch "DF_X64"\n", + DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old)); + return -DER_MISC; } rec->rec_off = umem_ptr2off(&tins->ti_umm, dae_new); @@ -1172,16 +1194,19 @@ vos_dtx_check_availability(daos_handle_t coh, uint32_t entry, } if (intent == DAOS_INTENT_PURGE) { - uint32_t age = d_hlc_age2sec(DAE_XID(dae).dti_hlc); + uint64_t now = daos_gettime_coarse(); /* * The DTX entry still references related data record, * then we cannot (vos) aggregate related data record. + * Report warning per each 10 seconds to avoid log flood. 
diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c
index 0f87bfd4d0e..1fab6c734da 100644
--- a/src/vos/vos_dtx.c
+++ b/src/vos/vos_dtx.c
@@ -312,16 +312,38 @@ dtx_act_ent_update(struct btr_instance *tins, struct btr_record *rec,
 
 	if (unlikely(!dae_old->dae_aborted)) {
 		/*
-		 * XXX: There are two possible reasons for that:
-		 *
-		 *	1. Client resent the RPC but without set 'RESEND' flag.
-		 *	2. Client reused the DTX ID for different modifications.
-		 *
-		 *	Currently, the 1st case is more suspected.
+		 * If the new entry and the old entry belong to the same transaction, then the
+		 * RPC for the new one will carry the 'RESEND' flag, which would have caused
+		 * the old one to be aborted before we arrive here. So it is quite possible
+		 * that the new one and the old one belong to different transactions.
 		 */
-		D_ERROR("The TX ID "DF_DTI" may be reused for epoch "DF_X64" vs "DF_X64"\n",
-			DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new));
-		return -DER_TX_ID_REUSED;
+		if (DAE_EPOCH(dae_old) < DAE_EPOCH(dae_new)) {
+			D_ERROR("The TX ID "DF_DTI" may be reused for epoch "DF_X64" vs "DF_X64"\n",
+				DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new));
+			return -DER_TX_ID_REUSED;
+		}
+
+		/*
+		 * If the old entry has the higher epoch, it is quite possible that the resent
+		 * RPC was handled before the original RPC (corresponding to 'dae_new'). Return
+		 * -DER_INPROGRESS to make the RPC sponsor retry with the 'RESEND' flag; the
+		 * related RPC handler logic will then sort out such a case.
+		 */
+		if (DAE_EPOCH(dae_old) > DAE_EPOCH(dae_new)) {
+			D_ERROR("Resent RPC may be handled before original one for DTX "DF_DTI
+				" with epoch "DF_X64" vs "DF_X64"\n",
+				DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new));
+			return -DER_INPROGRESS;
+		}
+
+		/*
+		 * The two entries use the same epoch, which may be caused by repeated RPCs
+		 * from different sources, such as multiple relay engines forwarding the same
+		 * RPC to the current target. Notify the caller so it can avoid such a case.
+		 */
+		D_ERROR("Received repeated DTX "DF_DTI" with epoch "DF_X64"\n",
+			DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old));
+		return -DER_MISC;
 	}
 
 	rec->rec_off = umem_ptr2off(&tins->ti_umm, dae_new);
@@ -1172,16 +1194,19 @@ vos_dtx_check_availability(daos_handle_t coh, uint32_t entry,
 	}
 
 	if (intent == DAOS_INTENT_PURGE) {
-		uint32_t	age = d_hlc_age2sec(DAE_XID(dae).dti_hlc);
+		uint64_t	now = daos_gettime_coarse();
 
 		/*
 		 * The DTX entry still references related data record,
 		 * then we cannot (vos) aggregate related data record.
+		 * Report the warning at most once per 10 seconds to avoid flooding the log.
 		 */
-		if (age >= DAOS_AGG_THRESHOLD)
-			D_WARN("DTX "DF_DTI" (state:%u, age:%u) still references the data, "
-			       "cannot be (VOS) aggregated\n",
-			       DP_DTI(&DAE_XID(dae)), vos_dtx_status(dae), age);
+		D_CDEBUG(now - cont->vc_agg_busy_ts > 10, DLOG_WARN, DB_TRACE,
+			 "DTX "DF_DTI" (state:%u, flags:%x, age:%u) still references the data, "
+			 "cannot be (VOS) aggregated\n", DP_DTI(&DAE_XID(dae)), vos_dtx_status(dae),
+			 DAE_FLAGS(dae), (unsigned int)d_hlc_age2sec(DAE_XID(dae).dti_hlc));
+		if (now - cont->vc_agg_busy_ts > 10)
+			cont->vc_agg_busy_ts = now;
 
 		return ALB_AVAILABLE_DIRTY;
 	}
@@ -1909,8 +1934,13 @@ vos_dtx_check(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch,
 			daos_epoch_t	e = *epoch;
 
 			*epoch = DAE_EPOCH(dae);
-			if (e != 0 && e != DAE_EPOCH(dae))
-				return -DER_MISMATCH;
+			if (e != 0) {
+				if (e > DAE_EPOCH(dae))
+					return -DER_MISMATCH;
+
+				if (e < DAE_EPOCH(dae))
+					return -DER_TX_RESTART;
+			}
 		}
 
 		return vos_dae_is_prepare(dae) ? DTX_ST_PREPARED : DTX_ST_INITED;
diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h
index 7d4dd3ac166..5aa0a1fadbb 100644
--- a/src/vos/vos_internal.h
+++ b/src/vos/vos_internal.h
@@ -353,6 +353,8 @@ struct vos_container {
 	daos_epoch_range_t	vc_epr_aggregation;
 	/* Current ongoing discard EPR */
 	daos_epoch_range_t	vc_epr_discard;
+	/* Last timestamp when VOS aggregation reported -DER_TX_BUSY */
+	uint64_t		vc_agg_busy_ts;
 	/* Last timestamp when VOS aggregation reporting ENOSPACE */
 	uint64_t		vc_agg_nospc_ts;
 	/* Last timestamp when IO reporting ENOSPACE */
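Note: the new three-way comparison in vos_dtx_check() distinguishes who is stale: a caller-pinned epoch above the stored one is a true mismatch, while a stored epoch above the caller's means the transaction was restarted elsewhere and the caller should restart too. A minimal sketch of that decision table; the error constants are illustrative placeholders, not the real DER values:

```c
#include <assert.h>
#include <stdint.h>

enum {
	CHECK_OK     =  0,
	ERR_MISMATCH = -1, /* placeholder for -DER_MISMATCH */
	ERR_RESTART  = -2, /* placeholder for -DER_TX_RESTART */
};

/* Sketch: compare the caller-pinned epoch against the stored DTX epoch.
 * 0 means "not pinned" and always passes. */
static int check_epoch(uint64_t pinned, uint64_t stored)
{
	if (pinned == 0 || pinned == stored)
		return CHECK_OK;
	return pinned > stored ? ERR_MISMATCH : ERR_RESTART;
}

int main(void)
{
	assert(check_epoch(0, 7) == CHECK_OK);     /* caller did not pin */
	assert(check_epoch(9, 7) == ERR_MISMATCH); /* caller is ahead */
	assert(check_epoch(5, 7) == ERR_RESTART);  /* stored entry is newer */
	return 0;
}
```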