From 83556e3c4a54f476da944e8d805180a9bfa4e779 Mon Sep 17 00:00:00 2001 From: yuuya uezato Date: Wed, 10 Apr 2019 16:51:37 +0900 Subject: [PATCH] =?UTF-8?q?issue27=E3=81=AE=E3=83=91=E3=83=83=E3=83=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/nvm/file.rs | 2 +- src/storage/builder.rs | 25 ++++++- src/storage/journal/mod.rs | 2 +- src/storage/journal/nvm_buffer.rs | 64 ++++++++++++++++-- src/storage/journal/options.rs | 18 ++++- src/storage/journal/region.rs | 8 ++- src/storage/journal/ring_buffer.rs | 101 ++++++++++++++++++++++++++--- src/storage/mod.rs | 15 +++-- 8 files changed, 209 insertions(+), 26 deletions(-) diff --git a/src/nvm/file.rs b/src/nvm/file.rs index 0e7e1f7..12693fa 100644 --- a/src/nvm/file.rs +++ b/src/nvm/file.rs @@ -487,7 +487,7 @@ mod tests { let mut parent = dir.as_ref(); while let Some(p) = parent.parent() { parent = p; - }; + } assert!(create_parent_directories(parent).is_ok()); Ok(()) } diff --git a/src/storage/builder.rs b/src/storage/builder.rs index 25746e3..b9badf3 100644 --- a/src/storage/builder.rs +++ b/src/storage/builder.rs @@ -107,6 +107,29 @@ impl StorageBuilder { self } + /// ジャーナルバッファをdiskにflushする際に、 + /// atomicにdiskに永続化されるように安全な手順で書き出す。 + /// + /// これにより、 + /// ジャーナルバッファを書き出している途中でプロセスがクラッシュしても + /// 再起動後には書き出し直前の状態に戻ることができる。 + pub fn journal_safe_flush(&mut self, safe_flush: bool) -> &mut Self { + self.journal.buffer_options.safe_flush = safe_flush; + self + } + + /// enqueue時にGoToFrontでEndOfRecordsを上書きする必要がある際は、 + /// 先に新しいレコードとEndOfRecordsを書き込んでから、 + /// その後にGoToFrontを書き込むようにする。 + /// + /// GoToFrontを書き込んでからジャーナル領域先頭にseekするという + /// 比較的長い処理をしている間にプロセスがクラッシュしても、 + /// 再起動後には直前の状態の戻ることができる。 + pub fn journal_safe_enqueue(&mut self, safe_enqueue: bool) -> &mut Self { + self.journal.buffer_options.safe_enqueue = safe_enqueue; + self + } + /// メトリクス用の共通設定を登録する. /// /// デフォルト値は`MetricBuilder::new()`. @@ -175,7 +198,7 @@ impl StorageBuilder { header.block_size.contains(nvm.block_size()), ErrorKind::InvalidInput ); - let mut journal_options = self.journal.clone(); + let mut journal_options = self.journal; journal_options.block_size = header.block_size; // UUIDをチェック diff --git a/src/storage/journal/mod.rs b/src/storage/journal/mod.rs index 873d92a..3b4e4a5 100644 --- a/src/storage/journal/mod.rs +++ b/src/storage/journal/mod.rs @@ -1,6 +1,6 @@ pub use self::header::{JournalHeader, JournalHeaderRegion}; pub use self::nvm_buffer::JournalNvmBuffer; -pub use self::options::JournalRegionOptions; +pub use self::options::{JournalBufferOptions, JournalRegionOptions}; pub use self::record::{JournalEntry, JournalRecord}; pub use self::region::JournalRegion; diff --git a/src/storage/journal/nvm_buffer.rs b/src/storage/journal/nvm_buffer.rs index 0b699a2..68cad1c 100644 --- a/src/storage/journal/nvm_buffer.rs +++ b/src/storage/journal/nvm_buffer.rs @@ -54,6 +54,14 @@ pub struct JournalNvmBuffer { // ジャーナル領域が発行した読み込み要求を、 // 内部NVMのブロック境界に合うようにアライメントするために使用される。 read_buf: AlignedBytes, + + // バッファの安全なflushを行うかどうかを意味するフラグ + // + // trueの場合は、バッファの先頭から512バイト以降を書き出してsyncした後に、 + // 先頭から512バイトをatomicに書き出す。 + // + // falseの場合は、バッファ全体をdiskに向けて単にflushする + safe_flush: bool, } impl JournalNvmBuffer { /// 新しい`JournalNvmBuffer`インスタンスを生成する. @@ -65,7 +73,7 @@ impl JournalNvmBuffer { /// /// ただし、シーク時には、シーク地点を含まない次のブロック境界までのデータは /// 上書きされてしまうので注意が必要. - pub fn new(nvm: N) -> Self { + pub fn new(nvm: N, safe_flush: bool) -> Self { let block_size = nvm.block_size(); JournalNvmBuffer { inner: nvm, @@ -74,6 +82,7 @@ impl JournalNvmBuffer { write_buf_offset: 0, write_buf: AlignedBytes::new(0, block_size), read_buf: AlignedBytes::new(0, block_size), + safe_flush, } } @@ -95,13 +104,60 @@ impl JournalNvmBuffer { } } + /* + * バッファの内容をdiskに書き出す。 + * + * 関数名が表すように、flushを意図したものであってdiskへの同期までは考慮していない。 + * ただし、safe_syncがtrueの場合は、書き出し順をコントロールするために、 + * 内部的にパラメタ化されているNVMのsyncメソッドを呼び出すことになる。 + * ただしその場合でも、実装の都合により、メモリバッファ全体がdiskへ永続化されるとは限らない。 + */ fn flush_write_buf(&mut self) -> Result<()> { if self.write_buf.is_empty() || !self.maybe_dirty { return Ok(()); } - track_io!(self.inner.seek(SeekFrom::Start(self.write_buf_offset)))?; - track_io!(self.inner.write(&self.write_buf))?; + if self.safe_flush { + /* + * issue 27(https://github.com/frugalos/cannyls/issues/27)を考慮した + * 順序づいた書き込みを行う。 + * + * ここで順序とは次を意味する + * 1. 書き込みバッファの512バイト以降を全て書き出す。 + * 2. Diskへの同期命令を発行する。 + * 3. 書き込みバッファの先頭512バイトを書き出す。 + * + * 3.のステップで既存のEORを上書きするため、 + * これを最後に行うことにより、DiskからEORが消えた状態になることを避ける。 + * + * パフォーマンスに関する問題点: + * a. Diskへの同期命令はミリセカンド単位でのブロックを生じる。 + * b. ステップ3でシークが生じるのでシーケンシャルwriteでなくなってしまう。 + * + * 先頭512バイトについて: + * 先頭は512*nバイトであれば、DIRECT_IOとの兼ね合いとしては問題がない。 + * ただし、「多くのHDDについては」512バイト=セクタサイズであり + * 先頭部分の書き出しがatomicな書き出しになるという利点がある。 + * ただし用いているファイルシステムの実装によっては + * 実際には512バイトが分断されて書き出される可能性もあり、常にこの利点を享受できるとは限らない。 + */ + let buf: &[u8] = &self.write_buf; + assert!(buf.as_ptr() as usize % 512 == 0); + assert!(buf.len() % 512 == 0); + if buf.len() > 512 { + track_io!(self + .inner + .seek(SeekFrom::Start(self.write_buf_offset + 512)))?; + track_io!(self.inner.write(&buf[512..]))?; + track!(self.inner.sync())?; + } + track_io!(self.inner.seek(SeekFrom::Start(self.write_buf_offset)))?; + track_io!(self.inner.write(&buf[..512]))?; + } else { + track_io!(self.inner.seek(SeekFrom::Start(self.write_buf_offset)))?; + track_io!(self.inner.write(&self.write_buf))?; + } + if self.write_buf.len() > self.block_size().as_u16() as usize { // このif節では、 // バッファに末端のalignmentバイト分(= new_len)の情報を残す。 @@ -366,6 +422,6 @@ mod tests { fn new_buffer() -> JournalNvmBuffer { let nvm = MemoryNvm::new(vec![0; 10 * 1024]); - JournalNvmBuffer::new(nvm) + JournalNvmBuffer::new(nvm, false) } } diff --git a/src/storage/journal/options.rs b/src/storage/journal/options.rs index aedf7ea..ab62021 100644 --- a/src/storage/journal/options.rs +++ b/src/storage/journal/options.rs @@ -3,11 +3,12 @@ use block::BlockSize; /// ジャーナル領域の挙動を調整するためのパラメータ群. /// /// 各オプションの説明は`StorageBuilder'のドキュメントを参照のこと. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub struct JournalRegionOptions { pub gc_queue_size: usize, pub sync_interval: usize, pub block_size: BlockSize, + pub buffer_options: JournalBufferOptions, } impl Default for JournalRegionOptions { fn default() -> Self { @@ -15,6 +16,21 @@ impl Default for JournalRegionOptions { gc_queue_size: 0x1000, sync_interval: 0x1000, block_size: BlockSize::min(), + buffer_options: Default::default(), + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct JournalBufferOptions { + pub safe_flush: bool, + pub safe_enqueue: bool, +} +impl Default for JournalBufferOptions { + fn default() -> Self { + JournalBufferOptions { + safe_flush: false, + safe_enqueue: false, } } } diff --git a/src/storage/journal/region.rs b/src/storage/journal/region.rs index e355ee4..9c28e8b 100644 --- a/src/storage/journal/region.rs +++ b/src/storage/journal/region.rs @@ -80,8 +80,12 @@ where let mut header_region = JournalHeaderRegion::new(header_nvm, block_size); let header = track!(header_region.read_header())?; - let ring_buffer = - JournalRingBuffer::new(ring_buffer_nvm, header.ring_buffer_head, metric_builder); + let ring_buffer = JournalRingBuffer::new( + ring_buffer_nvm, + header.ring_buffer_head, + options.buffer_options, + metric_builder, + ); let metrics = JournalRegionMetrics::new(metric_builder, ring_buffer.metrics().clone()); let mut journal = JournalRegion { diff --git a/src/storage/journal/ring_buffer.rs b/src/storage/journal/ring_buffer.rs index 1b966c2..75a2611 100644 --- a/src/storage/journal/ring_buffer.rs +++ b/src/storage/journal/ring_buffer.rs @@ -2,7 +2,7 @@ use prometrics::metrics::MetricBuilder; use std::io::{BufReader, Read, Seek, SeekFrom}; use super::record::{EMBEDDED_DATA_OFFSET, END_OF_RECORDS_SIZE}; -use super::{JournalEntry, JournalNvmBuffer, JournalRecord}; +use super::{JournalBufferOptions, JournalEntry, JournalNvmBuffer, JournalRecord}; use lump::LumpId; use metrics::JournalQueueMetrics; use nvm::NonVolatileMemory; @@ -34,6 +34,8 @@ pub struct JournalRingBuffer { tail: u64, metrics: JournalQueueMetrics, + + safe_enqueue: bool, } impl JournalRingBuffer { pub fn head(&self) -> u64 { @@ -51,15 +53,21 @@ impl JournalRingBuffer { } /// `JournalRingBuffer`インスタンスを生成する. - pub fn new(nvm: N, head: u64, metric_builder: &MetricBuilder) -> Self { + pub fn new( + nvm: N, + head: u64, + options: JournalBufferOptions, + metric_builder: &MetricBuilder, + ) -> Self { let metrics = JournalQueueMetrics::new(metric_builder); metrics.capacity_bytes.set(nvm.capacity() as f64); JournalRingBuffer { - nvm: JournalNvmBuffer::new(nvm), + nvm: JournalNvmBuffer::new(nvm, options.safe_flush), unreleased_head: head, head, tail: head, metrics, + safe_enqueue: options.safe_enqueue, } } @@ -108,10 +116,21 @@ impl JournalRingBuffer { track!(self.nvm.sync()) } + pub fn enqueue>( + &mut self, + record: &JournalRecord, + ) -> Result> { + if self.safe_enqueue { + self.safe_enqueue(record) + } else { + self.fast_enqueue(record) + } + } + /// レコードをジャーナルの末尾に追記する. /// /// レコードが`JournalRecord::Embed`だった場合には、データを埋め込んだ位置を結果として返す. - pub fn enqueue>( + fn fast_enqueue>( &mut self, record: &JournalRecord, ) -> Result> { @@ -157,6 +176,70 @@ impl JournalRingBuffer { } } + /// レコードをジャーナルの末尾に、安全に、追記する. + /// + /// レコードが`JournalRecord::Embed`だった場合には、データを埋め込んだ位置を結果として返す. + fn safe_enqueue>( + &mut self, + record: &JournalRecord, + ) -> Result> { + // GoToFrontレコードを書き出す場所を覚えるための変数 + let mut pos_for_gotofront = None; + + // 1. 十分な空き領域が存在するかをチェック + track!(self.check_free_space(record))?; + + // 2. リングバッファの終端チェック + if self.will_overflow(record) { + // tail位置からでは空きがないので、先頭に戻って再試行 + // 後で先頭から復帰するために場所を覚えておく + pos_for_gotofront = Some(self.tail); + + self.metrics + .consumed_bytes_at_running + .add_u64(self.nvm.capacity() - self.tail); + + // 先頭に移動した上で + // 再度、十分な空き領域が存在するかをチェック + self.tail = 0; + debug_assert!(!self.will_overflow(record)); + track!(self.check_free_space(record))?; + } + + // 3. レコードを書き込む + let prev_tail = self.tail; + track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?; + track!(record.write_to(&mut self.nvm))?; + self.metrics.enqueued_records_at_running.increment(record); + + // 4. 終端を示すレコードも書き込む + self.tail = self.nvm.position(); // 次回の追記開始位置を保存 (`EndOfRecords`の直前) + self.metrics + .consumed_bytes_at_running + .add_u64(self.tail - prev_tail); + track!(JournalRecord::EndOfRecords::<[_; 0]>.write_to(&mut self.nvm))?; + + // 5. GoToFrontを書き込む必要があれば、一度末尾までジャンプして書き込んだ後に戻ってくる。 + // GoToFrontの書き出しを先に行ってしまうと、EndOfRecordsが存在しない状態が永続化される可能性がある。 + if let Some(pos_for_gotofront) = pos_for_gotofront { + track!(self.nvm.sync())?; // 新しいEndOfRecordsの書き込みを先に永続化する。 + track_io!(self.nvm.seek(SeekFrom::Start(pos_for_gotofront)))?; + track!(JournalRecord::GoToFront::<[_; 0]>.write_to(&mut self.nvm))?; // 古いEndOfRecordsに上書き + track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?; + } + + // 6. 埋め込みPUTの場合には、インデックスに位置情報を返す + if let JournalRecord::Embed(ref lump_id, ref data) = *record { + let portion = JournalPortion { + start: Address::from_u64(prev_tail + EMBEDDED_DATA_OFFSET as u64).unwrap(), + len: data.as_ref().len() as u16, + }; + Ok(Some((*lump_id, portion))) + } else { + Ok(None) + } + } + /// リングバッファの先頭からエントリ群を取り出す. /// /// `EndOfRecords`に到達した時点で走査は終了する. @@ -364,7 +447,7 @@ mod tests { #[test] fn append_and_read_records() -> TestResult { let nvm = MemoryNvm::new(vec![0; 1024]); - let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new()); + let mut ring = JournalRingBuffer::new(nvm, 0, Default::default(), &MetricBuilder::new()); let records = vec![ record_put("000", 30, 5), @@ -397,7 +480,7 @@ mod tests { #[test] fn read_embedded_data() -> TestResult { let nvm = MemoryNvm::new(vec![0; 1024]); - let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new()); + let mut ring = JournalRingBuffer::new(nvm, 0, Default::default(), &MetricBuilder::new()); track!(ring.enqueue(&record_put("000", 30, 5)))?; track!(ring.enqueue(&record_delete("111")))?; @@ -415,7 +498,7 @@ mod tests { #[test] fn go_round_ring_buffer() -> TestResult { let nvm = MemoryNvm::new(vec![0; 1024]); - let mut ring = JournalRingBuffer::new(nvm, 512, &MetricBuilder::new()); + let mut ring = JournalRingBuffer::new(nvm, 512, Default::default(), &MetricBuilder::new()); assert_eq!(ring.head, 512); assert_eq!(ring.tail, 512); @@ -433,7 +516,7 @@ mod tests { #[test] fn full() -> TestResult { let nvm = MemoryNvm::new(vec![0; 1024]); - let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new()); + let mut ring = JournalRingBuffer::new(nvm, 0, Default::default(), &MetricBuilder::new()); let record = record_put("000", 1, 2); while ring.tail <= 1024 - record.external_size() as u64 { @@ -464,7 +547,7 @@ mod tests { #[test] fn too_large_record() { let nvm = MemoryNvm::new(vec![0; 1024]); - let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new()); + let mut ring = JournalRingBuffer::new(nvm, 0, Default::default(), &MetricBuilder::new()); let record = record_embed("000", &vec![0; 997]); assert_eq!(record.external_size(), 1020); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 6266027..3a3ad4b 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -248,9 +248,9 @@ where // ジャーナル領域に範囲削除レコードを一つ書き込むため、一度のディスクアクセスが起こる。 // 削除レコードを範囲分書き込むわけ *ではない* ため、複数回のディスクアクセスは発生しない。 track!(self - .journal_region - .records_delete_range(&mut self.lump_index, range))?; - + .journal_region + .records_delete_range(&mut self.lump_index, range))?; + for lump_id in &targets { if let Some(portion) = self.lump_index.remove(lump_id) { self.metrics.delete_lumps.increment(); @@ -262,7 +262,7 @@ where self.data_region.delete(portion); } } - } + } Ok(targets) } @@ -522,9 +522,10 @@ mod tests { // マイナーバージョンを減らして、ヘッダを上書きする { - header.minor_version = header.minor_version.checked_sub(1).expect( - "このテストは`MINOR_VERSION >= 1`であることを前提としている", - ); + header.minor_version = header + .minor_version + .checked_sub(1) + .expect("このテストは`MINOR_VERSION >= 1`であることを前提としている"); let file = track_any_err!(OpenOptions::new().write(true).open(&path))?; track!(header.write_to(file))?; }