diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs
index ff7731b666e..f7433d2f653 100644
--- a/components/raftstore/src/store/fsm/peer.rs
+++ b/components/raftstore/src/store/fsm/peer.rs
@@ -858,7 +858,10 @@ where
                     }
                 }
             } else if key.term <= compacted_term
-                && (key.idx < compacted_idx || key.idx == compacted_idx && !is_applying_snap)
+                && (key.idx < compacted_idx
+                    || key.idx == compacted_idx
+                        && !is_applying_snap
+                        && !self.fsm.peer.pending_remove)
             {
                 info!(
                     "deleting applied snap file";
diff --git a/tests/failpoints/cases/test_snap.rs b/tests/failpoints/cases/test_snap.rs
index 0d98c587701..6574f0f2011 100644
--- a/tests/failpoints/cases/test_snap.rs
+++ b/tests/failpoints/cases/test_snap.rs
@@ -235,7 +235,6 @@ fn test_destroy_peer_on_pending_snapshot() {
     sleep_ms(100);
 
     fail::remove(apply_snapshot_fp);
-    fail::remove(before_handle_normal_3_fp);
 
     cluster.must_put(b"k120", b"v1");
 
@@ -243,6 +242,143 @@ fn test_destroy_peer_on_pending_snapshot() {
     must_get_equal(&cluster.get_engine(3), b"k120", b"v1");
 }
 
+// Peer 3 in store 3 is isolated for a while and then recovered.
+// While it is applying a snapshot, the peer is destroyed and applying the snapshot is canceled.
+// Before the destroy finishes, the machine is restarted.
+// After the restart, the snapshot should be applied successfully
+// and new data should be written to store 3 successfully.
+#[test]
+fn test_destroy_peer_on_pending_snapshot_and_restart() {
+    let mut cluster = new_server_cluster(0, 3);
+    configure_for_snapshot(&mut cluster);
+    let pd_client = Arc::clone(&cluster.pd_client);
+    pd_client.disable_default_operator();
+
+    let r1 = cluster.run_conf_change();
+    pd_client.must_add_peer(r1, new_peer(2, 2));
+    pd_client.must_add_peer(r1, new_peer(3, 3));
+
+    cluster.must_put(b"k1", b"v1");
+    // Ensure peer 3 is initialized.
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+
+    cluster.must_transfer_leader(1, new_peer(1, 1));
+    let destroy_peer_fp = "destroy_peer_after_pending_move";
+    fail::cfg(destroy_peer_fp, "return(true)").unwrap();
+
+    cluster.add_send_filter(IsolationFilterFactory::new(3));
+
+    for i in 0..20 {
+        cluster.must_put(format!("k1{}", i).as_bytes(), b"v1");
+    }
+
+    // Skip applying the snapshot into RocksDB to keep the peer status in Applying.
+    let apply_snapshot_fp = "apply_pending_snapshot";
+    fail::cfg(apply_snapshot_fp, "return()").unwrap();
+
+    cluster.clear_send_filters();
+    // Wait for the leader to send the snapshot.
+    sleep_ms(100);
+
+    // Don't send check stale msg to PD.
+    let peer_check_stale_state_fp = "peer_check_stale_state";
+    fail::cfg(peer_check_stale_state_fp, "return()").unwrap();
+
+    pd_client.must_remove_peer(r1, new_peer(3, 3));
+    // Without it, pd_client.must_remove_peer does not trigger destroy_peer!
+    pd_client.must_add_peer(r1, new_peer(3, 4));
+
+    let before_handle_normal_3_fp = "before_handle_normal_3";
+    // Pause ApplyTaskRes::Destroy so that peer gc can finish.
+    fail::cfg(before_handle_normal_3_fp, "pause").unwrap();
+    // Wait for the leader to send messages to peer 3,
+    // then destroy peer 3.
+    sleep_ms(100);
+
+    fail::remove(before_handle_normal_3_fp); // Allow the destroy to run.
+
+    // Restart node 3.
+    cluster.stop_node(3);
+    fail::remove(apply_snapshot_fp);
+    fail::remove(peer_check_stale_state_fp);
+    fail::remove(destroy_peer_fp);
+    cluster.run_node(3).unwrap();
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+    // After peer 3 has applied the snapshot, the data should be readable.
+    must_get_equal(&cluster.get_engine(3), b"k119", b"v1");
+
+    // In the end the snapshot file should be gc-ed anyway, either by the new peer or by the store.
+    let now = Instant::now();
+    loop {
+        let mut snap_files = vec![];
+        let snap_dir = cluster.get_snap_dir(3);
+        // Snap files should be gc-ed.
+        snap_files.extend(fs::read_dir(snap_dir).unwrap().map(|p| p.unwrap().path()));
+        if snap_files.is_empty() {
+            break;
+        }
+        if now.saturating_elapsed() > Duration::from_secs(5) {
+            panic!("snap files are not gc-ed");
+        }
+        sleep_ms(20);
+    }
+
+    cluster.must_put(b"k120", b"v1");
+    // New data should be replicated to peer 4 in store 3.
+    must_get_equal(&cluster.get_engine(3), b"k120", b"v1");
+}
+
+// This test reproduces issue #11618.
+// Basically it aborts a snapshot and waits for an election to finish
+// (without the fix, raft will panic).
+// The test first partitions peer 3 from the rest of the cluster,
+// then recovers from the partition so the leader tries to send a snapshot to peer 3.
+// The snapshot is aborted and then we wait for an election to happen;
+// without the fix, raft is expected to panic here.
+#[test]
+fn test_abort_snapshot_and_wait_election() {
+    let mut cluster = new_server_cluster(0, 3);
+    cluster.cfg.raft_store.raft_base_tick_interval = ReadableDuration::millis(10);
+    cluster.cfg.raft_store.raft_election_timeout_ticks = 25; // > lease 240ms
+    cluster.cfg.raft_store.hibernate_regions = false;
+    configure_for_snapshot(&mut cluster);
+    let pd_client = Arc::clone(&cluster.pd_client);
+    pd_client.disable_default_operator();
+
+    let r1 = cluster.run_conf_change();
+    pd_client.must_add_peer(r1, new_peer(2, 2));
+    pd_client.must_add_peer(r1, new_peer(3, 1003));
+
+    cluster.must_put(b"k1", b"v1");
+    let region = cluster.get_region(b"k1");
+    // Ensure peer 3 is initialized.
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+
+    cluster.must_transfer_leader(1, new_peer(1, 1));
+
+    let apply_snapshot_fp = "region_apply_snap_abort";
+    fail::cfg(apply_snapshot_fp, "return()").unwrap();
+
+    cluster.add_send_filter(IsolationFilterFactory::new(3));
+    for i in 0..20 {
+        cluster.must_put(format!("k1{}", i).as_bytes(), b"v1");
+    }
+
+    // Wait for the leader to send the snapshot.
+    let (sx, rx) = mpsc::sync_channel::<bool>(10);
+    let recv_snapshot_filter = RegionPacketFilter::new(region.get_id(), 3)
+        .direction(Direction::Recv)
+        .msg_type(MessageType::MsgSnapshot)
+        .allow(1)
+        .set_msg_callback(Arc::new(move |_| {
+            sx.send(true).unwrap();
+        }));
+    cluster.add_recv_filter(CloneFilterFactory(recv_snapshot_filter));
+
+    cluster.clear_send_filters(); // Allow the snapshot to be sent to peer 3.
+    rx.recv().unwrap(); // Got the snapshot message.
+    cluster.add_send_filter(IsolationFilterFactory::new(3)); // Partition peer 3 again.
+    sleep_ms(500); // Wait for an election to happen; without the fix, raft would panic.
+    cluster.clear_send_filters();
+    cluster.clear_recv_filters();
+}
+
 #[test]
 fn test_shutdown_when_snap_gc() {
     let mut cluster = new_node_cluster(0, 2);