File: | home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc |
Warning: | line 243, column 9 Value stored to 'total_penlty' is never read |
[?] Use j/k keys for keyboard navigation
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License |
4 | // (found in the LICENSE.Apache file in the root directory). |
5 | |
6 | #ifndef ROCKSDB_LITE |
7 | |
8 | #include "util/delete_scheduler.h" |
9 | |
10 | #include <thread> |
11 | #include <vector> |
12 | |
13 | #include "port/port.h" |
14 | #include "rocksdb/env.h" |
15 | #include "util/logging.h" |
16 | #include "util/mutexlock.h" |
17 | #include "util/sst_file_manager_impl.h" |
18 | #include "util/sync_point.h" |
19 | |
20 | namespace rocksdb { |
21 | |
22 | DeleteScheduler::DeleteScheduler(Env* env, int64_t rate_bytes_per_sec, |
23 | Logger* info_log, |
24 | SstFileManagerImpl* sst_file_manager, |
25 | double max_trash_db_ratio, |
26 | uint64_t bytes_max_delete_chunk) |
27 | : env_(env), |
28 | total_trash_size_(0), |
29 | rate_bytes_per_sec_(rate_bytes_per_sec), |
30 | pending_files_(0), |
31 | bytes_max_delete_chunk_(bytes_max_delete_chunk), |
32 | closing_(false), |
33 | cv_(&mu_), |
34 | info_log_(info_log), |
35 | sst_file_manager_(sst_file_manager), |
36 | max_trash_db_ratio_(max_trash_db_ratio) { |
37 | assert(sst_file_manager != nullptr)(static_cast<void> (0)); |
38 | assert(max_trash_db_ratio >= 0)(static_cast<void> (0)); |
39 | bg_thread_.reset( |
40 | new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this)); |
41 | } |
42 | |
43 | DeleteScheduler::~DeleteScheduler() { |
44 | { |
45 | InstrumentedMutexLock l(&mu_); |
46 | closing_ = true; |
47 | cv_.SignalAll(); |
48 | } |
49 | if (bg_thread_) { |
50 | bg_thread_->join(); |
51 | } |
52 | } |
53 | |
54 | Status DeleteScheduler::DeleteFile(const std::string& file_path, |
55 | const std::string& dir_to_sync, |
56 | const bool force_bg) { |
57 | Status s; |
58 | if (rate_bytes_per_sec_.load() <= 0 || (!force_bg && |
59 | total_trash_size_.load() > |
60 | sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) { |
61 | // Rate limiting is disabled or trash size makes up more than |
62 | // max_trash_db_ratio_ (default 25%) of the total DB size |
63 | TEST_SYNC_POINT("DeleteScheduler::DeleteFile"); |
64 | s = env_->DeleteFile(file_path); |
65 | if (s.ok()) { |
66 | sst_file_manager_->OnDeleteFile(file_path); |
67 | } |
68 | return s; |
69 | } |
70 | |
71 | // Move file to trash |
72 | std::string trash_file; |
73 | s = MarkAsTrash(file_path, &trash_file); |
74 | |
75 | if (!s.ok()) { |
76 | ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash", file_path.c_str())rocksdb::Log(InfoLogLevel::ERROR_LEVEL, info_log_, ("[%s:" "76" "] " "Failed to mark %s as trash"), RocksLogShorterFileName( "/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), file_path.c_str()); |
77 | s = env_->DeleteFile(file_path); |
78 | if (s.ok()) { |
79 | sst_file_manager_->OnDeleteFile(file_path); |
80 | } |
81 | return s; |
82 | } |
83 | |
84 | // Update the total trash size |
85 | uint64_t trash_file_size = 0; |
86 | env_->GetFileSize(trash_file, &trash_file_size); |
87 | total_trash_size_.fetch_add(trash_file_size); |
88 | |
89 | // Add file to delete queue |
90 | { |
91 | InstrumentedMutexLock l(&mu_); |
92 | queue_.emplace(trash_file, dir_to_sync); |
93 | pending_files_++; |
94 | if (pending_files_ == 1) { |
95 | cv_.SignalAll(); |
96 | } |
97 | } |
98 | return s; |
99 | } |
100 | |
101 | std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() { |
102 | InstrumentedMutexLock l(&mu_); |
103 | return bg_errors_; |
104 | } |
105 | |
106 | const std::string DeleteScheduler::kTrashExtension = ".trash"; |
107 | bool DeleteScheduler::IsTrashFile(const std::string& file_path) { |
108 | return (file_path.size() >= kTrashExtension.size() && |
109 | file_path.rfind(kTrashExtension) == |
110 | file_path.size() - kTrashExtension.size()); |
111 | } |
112 | |
113 | Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm, |
114 | const std::string& path) { |
115 | Status s; |
116 | // Check if there are any files marked as trash in this path |
117 | std::vector<std::string> files_in_path; |
118 | s = env->GetChildren(path, &files_in_path); |
119 | if (!s.ok()) { |
120 | return s; |
121 | } |
122 | for (const std::string& current_file : files_in_path) { |
123 | if (!DeleteScheduler::IsTrashFile(current_file)) { |
124 | // not a trash file, skip |
125 | continue; |
126 | } |
127 | |
128 | Status file_delete; |
129 | std::string trash_file = path + "/" + current_file; |
130 | if (sfm) { |
131 | // We have an SstFileManager that will schedule the file delete |
132 | sfm->OnAddFile(trash_file); |
133 | file_delete = sfm->ScheduleFileDeletion(trash_file, path); |
134 | } else { |
135 | // Delete the file immediately |
136 | file_delete = env->DeleteFile(trash_file); |
137 | } |
138 | |
139 | if (s.ok() && !file_delete.ok()) { |
140 | s = file_delete; |
141 | } |
142 | } |
143 | |
144 | return s; |
145 | } |
146 | |
147 | Status DeleteScheduler::MarkAsTrash(const std::string& file_path, |
148 | std::string* trash_file) { |
149 | // Sanity check of the path |
150 | size_t idx = file_path.rfind("/"); |
151 | if (idx == std::string::npos || idx == file_path.size() - 1) { |
152 | return Status::InvalidArgument("file_path is corrupted"); |
153 | } |
154 | |
155 | Status s; |
156 | if (DeleteScheduler::IsTrashFile(file_path)) { |
157 | // This is already a trash file |
158 | *trash_file = file_path; |
159 | return s; |
160 | } |
161 | |
162 | *trash_file = file_path + kTrashExtension; |
163 | // TODO(tec) : Implement Env::RenameFileIfNotExist and remove |
164 | // file_move_mu mutex. |
165 | int cnt = 0; |
166 | InstrumentedMutexLock l(&file_move_mu_); |
167 | while (true) { |
168 | s = env_->FileExists(*trash_file); |
169 | if (s.IsNotFound()) { |
170 | // We found a path for our file in trash |
171 | s = env_->RenameFile(file_path, *trash_file); |
172 | break; |
173 | } else if (s.ok()) { |
174 | // Name conflict, try again with the next numeric suffix (cnt) |
175 | *trash_file = file_path + std::to_string(cnt) + kTrashExtension; |
176 | } else { |
177 | // Error during FileExists call, we cannot continue |
178 | break; |
179 | } |
180 | cnt++; |
181 | } |
182 | if (s.ok()) { |
183 | sst_file_manager_->OnMoveFile(file_path, *trash_file); |
184 | } |
185 | return s; |
186 | } |
187 | |
188 | void DeleteScheduler::BackgroundEmptyTrash() { |
189 | TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash"); |
190 | |
191 | while (true) { |
192 | InstrumentedMutexLock l(&mu_); |
193 | while (queue_.empty() && !closing_) { |
194 | cv_.Wait(); |
195 | } |
196 | |
197 | if (closing_) { |
198 | return; |
199 | } |
200 | |
201 | // Delete all files in queue_ |
202 | uint64_t start_time = env_->NowMicros(); |
203 | uint64_t total_deleted_bytes = 0; |
204 | int64_t current_delete_rate = rate_bytes_per_sec_.load(); |
205 | while (!queue_.empty() && !closing_) { |
206 | if (current_delete_rate != rate_bytes_per_sec_.load()) { |
207 | // User changed the delete rate |
208 | current_delete_rate = rate_bytes_per_sec_.load(); |
209 | start_time = env_->NowMicros(); |
210 | total_deleted_bytes = 0; |
211 | } |
212 | |
213 | // Get new file to delete |
214 | const FileAndDir& fad = queue_.front(); |
215 | std::string path_in_trash = fad.fname; |
216 | |
217 | // We don't need to hold the lock while deleting the file |
218 | mu_.Unlock(); |
219 | uint64_t deleted_bytes = 0; |
220 | bool is_complete = true; |
221 | // Delete file from trash and add its size to total_deleted_bytes |
222 | Status s = |
223 | DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete); |
224 | total_deleted_bytes += deleted_bytes; |
225 | mu_.Lock(); |
226 | if (is_complete) { |
227 | queue_.pop(); |
228 | } |
229 | |
230 | if (!s.ok()) { |
231 | bg_errors_[path_in_trash] = s; |
232 | } |
233 | |
234 | // Apply penalty if necessary |
235 | uint64_t total_penlty; |
236 | if (current_delete_rate > 0) { |
237 | // rate limiting is enabled |
238 | total_penlty = |
239 | ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate); |
240 | while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {} |
241 | } else { |
242 | // rate limiting is disabled |
243 | total_penlty = 0; |
Value stored to 'total_penlty' is never read | |
244 | } |
245 | TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait", |
246 | &total_penlty); |
247 | |
248 | if (is_complete) { |
249 | pending_files_--; |
250 | } |
251 | if (pending_files_ == 0) { |
252 | // Unblock WaitForEmptyTrash since there are no more files waiting |
253 | // to be deleted |
254 | cv_.SignalAll(); |
255 | } |
256 | } |
257 | } |
258 | } |
259 | |
260 | Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash, |
261 | const std::string& dir_to_sync, |
262 | uint64_t* deleted_bytes, |
263 | bool* is_complete) { |
264 | uint64_t file_size; |
265 | Status s = env_->GetFileSize(path_in_trash, &file_size); |
266 | *is_complete = true; |
267 | TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile"); |
268 | if (s.ok()) { |
269 | bool need_full_delete = true; |
270 | if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) { |
271 | uint64_t num_hard_links = 2; |
272 | // We don't have to worry about a data race between linking a new |
273 | // file after the number-of-file-links check and ftruncate because |
274 | // the file is now in trash and RocksDB is not supposed to create |
275 | // hard links to trash files. |
276 | Status my_status = env_->NumFileLinks(path_in_trash, &num_hard_links); |
277 | if (my_status.ok()) { |
278 | if (num_hard_links == 1) { |
279 | std::unique_ptr<WritableFile> wf; |
280 | my_status = |
281 | env_->ReopenWritableFile(path_in_trash, &wf, EnvOptions()); |
282 | if (my_status.ok()) { |
283 | my_status = wf->Truncate(file_size - bytes_max_delete_chunk_); |
284 | if (my_status.ok()) { |
285 | TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync"); |
286 | my_status = wf->Fsync(); |
287 | } |
288 | } |
289 | if (my_status.ok()) { |
290 | *deleted_bytes = bytes_max_delete_chunk_; |
291 | need_full_delete = false; |
292 | *is_complete = false; |
293 | } else { |
294 | ROCKS_LOG_WARN(info_log_,rocksdb::Log(InfoLogLevel::WARN_LEVEL, info_log_, ("[%s:" "296" "] " "Failed to partially delete %s from trash -- %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), path_in_trash.c_str(), my_status.ToString().c_str()) |
295 | "Failed to partially delete %s from trash -- %s",rocksdb::Log(InfoLogLevel::WARN_LEVEL, info_log_, ("[%s:" "296" "] " "Failed to partially delete %s from trash -- %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), path_in_trash.c_str(), my_status.ToString().c_str()) |
296 | path_in_trash.c_str(), my_status.ToString().c_str())rocksdb::Log(InfoLogLevel::WARN_LEVEL, info_log_, ("[%s:" "296" "] " "Failed to partially delete %s from trash -- %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), path_in_trash.c_str(), my_status.ToString().c_str()); |
297 | } |
298 | } else { |
299 | ROCKS_LOG_INFO(info_log_,rocksdb::Log(InfoLogLevel::INFO_LEVEL, info_log_, ("[%s:" "302" "] " "Cannot delete %s slowly through ftruncate from trash " "as it has other links"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), path_in_trash.c_str()) |
300 | "Cannot delete %s slowly through ftruncate from trash "rocksdb::Log(InfoLogLevel::INFO_LEVEL, info_log_, ("[%s:" "302" "] " "Cannot delete %s slowly through ftruncate from trash " "as it has other links"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), path_in_trash.c_str()) |
301 | "as it has other links",rocksdb::Log(InfoLogLevel::INFO_LEVEL, info_log_, ("[%s:" "302" "] " "Cannot delete %s slowly through ftruncate from trash " "as it has other links"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), path_in_trash.c_str()) |
302 | path_in_trash.c_str())rocksdb::Log(InfoLogLevel::INFO_LEVEL, info_log_, ("[%s:" "302" "] " "Cannot delete %s slowly through ftruncate from trash " "as it has other links"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), path_in_trash.c_str()); |
303 | } |
304 | } else if (!num_link_error_printed_) { |
305 | ROCKS_LOG_INFO(rocksdb::Log(InfoLogLevel::INFO_LEVEL, info_log_, ("[%s:" "309" "] " "Cannot delete files slowly through ftruncate from trash " "as Env::NumFileLinks() returns error: %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), my_status.ToString().c_str()) |
306 | info_log_,rocksdb::Log(InfoLogLevel::INFO_LEVEL, info_log_, ("[%s:" "309" "] " "Cannot delete files slowly through ftruncate from trash " "as Env::NumFileLinks() returns error: %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), my_status.ToString().c_str()) |
307 | "Cannot delete files slowly through ftruncate from trash "rocksdb::Log(InfoLogLevel::INFO_LEVEL, info_log_, ("[%s:" "309" "] " "Cannot delete files slowly through ftruncate from trash " "as Env::NumFileLinks() returns error: %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), my_status.ToString().c_str()) |
308 | "as Env::NumFileLinks() returns error: %s",rocksdb::Log(InfoLogLevel::INFO_LEVEL, info_log_, ("[%s:" "309" "] " "Cannot delete files slowly through ftruncate from trash " "as Env::NumFileLinks() returns error: %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), my_status.ToString().c_str()) |
309 | my_status.ToString().c_str())rocksdb::Log(InfoLogLevel::INFO_LEVEL, info_log_, ("[%s:" "309" "] " "Cannot delete files slowly through ftruncate from trash " "as Env::NumFileLinks() returns error: %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), my_status.ToString().c_str()); |
310 | num_link_error_printed_ = true; |
311 | } |
312 | } |
313 | |
314 | if (need_full_delete) { |
315 | s = env_->DeleteFile(path_in_trash); |
316 | if (!dir_to_sync.empty()) { |
317 | std::unique_ptr<Directory> dir_obj; |
318 | if (s.ok()) { |
319 | s = env_->NewDirectory(dir_to_sync, &dir_obj); |
320 | } |
321 | if (s.ok()) { |
322 | s = dir_obj->Fsync(); |
323 | TEST_SYNC_POINT_CALLBACK( |
324 | "DeleteScheduler::DeleteTrashFile::AfterSyncDir", |
325 | reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync))); |
326 | } |
327 | } |
328 | *deleted_bytes = file_size; |
329 | sst_file_manager_->OnDeleteFile(path_in_trash); |
330 | } |
331 | } |
332 | if (!s.ok()) { |
333 | // Error while getting file size or while deleting |
334 | ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s",rocksdb::Log(InfoLogLevel::ERROR_LEVEL, info_log_, ("[%s:" "335" "] " "Failed to delete %s from trash -- %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), path_in_trash.c_str(), s.ToString().c_str()) |
335 | path_in_trash.c_str(), s.ToString().c_str())rocksdb::Log(InfoLogLevel::ERROR_LEVEL, info_log_, ("[%s:" "335" "] " "Failed to delete %s from trash -- %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/util/delete_scheduler.cc" ), path_in_trash.c_str(), s.ToString().c_str()); |
336 | *deleted_bytes = 0; |
337 | } else { |
338 | total_trash_size_.fetch_sub(*deleted_bytes); |
339 | } |
340 | |
341 | return s; |
342 | } |
343 | |
344 | void DeleteScheduler::WaitForEmptyTrash() { |
345 | InstrumentedMutexLock l(&mu_); |
346 | while (pending_files_ > 0 && !closing_) { |
347 | cv_.Wait(); |
348 | } |
349 | } |
350 | |
351 | } // namespace rocksdb |
352 | |
353 | #endif // ROCKSDB_LITE |