
5 posts tagged with "db"


rocksdb

· 7 min read

rocksdb get usage
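The backtrace below was captured while stepping through a point lookup driven by RocksDB's bundled c_simple_example.c, which goes through the C API (rocksdb_get). A roughly equivalent program against the C++ API looks like the following; this is a minimal sketch (the DB path is arbitrary), not the exact example from the trace:

#include <cassert>
#include <iostream>
#include <string>

#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;

  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/rocksdb_get_demo", &db);
  assert(s.ok());

  // Write a key first so the subsequent Get() is served from the memtable,
  // which is exactly the path shown in the backtrace below.
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "key", &value);  // DB::Get, frame #10 below
  assert(s.ok());
  std::cout << value << std::endl;  // prints "value"

  delete db;
  return 0;
}

Breaking inside the lookup path and running bt in gdb gives: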

(gdb) bt
#0 std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::assign (__n=6, __s=0x555555db8135 "value", this=0x7fffffffde90) at /usr/include/c++/12/bits/basic_string.h:1063
#1 rocksdb::SaveValue (arg=0x7fffffffd690, entry=<optimized out>) at db/memtable.cc:1068
#2 0x000055555588eed2 in rocksdb::(anonymous namespace)::SkipListRep::Get (this=<optimized out>, k=..., callback_args=0x7fffffffd690, callback_func=0x555555769fe0 <rocksdb::SaveValue(void*, char const*)>)
at memtable/skiplistrep.cc:90
#3 0x00005555557683c3 in rocksdb::MemTable::GetFromTable (this=this@entry=0x555555db7d70, key=..., max_covering_tombstone_seq=<optimized out>, do_merge=do_merge@entry=true, callback=callback@entry=0x0,
is_blob_index=is_blob_index@entry=0x0, value=0x7fffffffde90, columns=0x0, timestamp=0x0, s=0x7fffffffd950, merge_context=0x7fffffffd990, seq=0x7fffffffd8f8, found_final_value=0x7fffffffd7b6,
merge_in_progress=0x7fffffffd7b7) at db/memtable.cc:1329
#4 0x0000555555769215 in rocksdb::MemTable::Get (this=0x555555db7d70, key=..., value=0x7fffffffde90, columns=0x0, timestamp=timestamp@entry=0x0, s=s@entry=0x7fffffffd950, merge_context=<optimized out>,
max_covering_tombstone_seq=<optimized out>, seq=<optimized out>, read_opts=..., immutable_memtable=<optimized out>, callback=<optimized out>, is_blob_index=<optimized out>, do_merge=<optimized out>)
at db/memtable.cc:1285
#5 0x000055555565e189 in rocksdb::MemTable::Get (do_merge=true, is_blob_index=<optimized out>, callback=<optimized out>, immutable_memtable=false, read_opts=..., max_covering_tombstone_seq=0x7fffffffd8f0,
merge_context=0x7fffffffd990, s=0x7fffffffd950, timestamp=0x0, columns=<optimized out>, value=<optimized out>, key=..., this=<optimized out>) at ./db/memtable.h:279
#6 rocksdb::DBImpl::GetImpl (this=0x555555d7db40, read_options=..., key=..., get_impl_options=...) at db/db_impl/db_impl.cc:2293
#7 0x000055555565522b in rocksdb::DBImpl::GetImpl (this=this@entry=0x555555d7db40, read_options=..., column_family=column_family@entry=0x555555da3ba0, key=..., value=value@entry=0x7fffffffddc0,
timestamp=<optimized out>) at db/db_impl/db_impl.cc:2025
#8 0x0000555555655492 in rocksdb::DBImpl::Get (this=0x555555d7db40, _read_options=..., column_family=0x555555da3ba0, key=..., value=0x7fffffffddc0, timestamp=<optimized out>) at db/db_impl/db_impl.cc:2013
#9 0x000055555564cc57 in rocksdb::DBImpl::Get (this=<optimized out>, read_options=..., column_family=<optimized out>, key=..., value=<optimized out>) at db/db_impl/db_impl.cc:1985
#10 0x0000555555642bf2 in rocksdb::DB::Get (this=this@entry=0x555555d7db40, options=..., column_family=0x555555da3ba0, key=..., value=value@entry=0x7fffffffde90) at ./include/rocksdb/db.h:562
#11 0x0000555555626ed2 in rocksdb::DB::Get (value=0x7fffffffde90, key=..., options=..., this=0x555555d7db40) at ./include/rocksdb/db.h:573
#12 rocksdb_get (db=<optimized out>, options=0x555555dbdc70, key=<optimized out>, keylen=<optimized out>, vallen=0x7fffffffdf00, errptr=0x7fffffffdf10) at db/c.cc:1293
#13 0x00005555556240d7 in main (argc=1, argv=0x7fffffffe078) at c_simple_example.c:67
(gdb) up
#1 rocksdb::SaveValue (arg=0x7fffffffd690, entry=<optimized out>) at db/memtable.cc:1068

In SaveValue (frame #1), execution takes this branch:

      case kTypeValue: {                        // (1) plain key-value type
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadLock();
        }

        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);  // (2) read the value

        *(s->status) = Status::OK();

        if (!s->do_merge) {
          // Preserve the value with the goal of returning it as part of
          // raw merge operands to the user
          // TODO(yanqin) update MergeContext so that timestamps information
          // can also be retained.

          merge_context->PushOperand(
              v, s->inplace_update_support == false /* operand_pinned */);
        } else if (*(s->merge_in_progress)) {
          assert(s->do_merge);

          if (s->value || s->columns) {
            // `op_failure_scope` (an output parameter) is not provided (set to
            // nullptr) since a failure must be propagated regardless of its
            // value.
            *(s->status) = MergeHelper::TimedFullMerge(
                merge_operator, s->key->user_key(),
                MergeHelper::kPlainBaseValue, v, merge_context->GetOperands(),
                s->logger, s->statistics, s->clock,
                /* update_num_ops_stats */ true, s->value, s->columns,
                /* op_failure_scope */ nullptr);
          }
        } else if (s->value) {
          s->value->assign(v.data(), v.size());  // (3) copy the retrieved value into s->value
        } else if (s->columns) {
          s->columns->SetPlainValue(v);
        }
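The entry handed to SaveValue is a raw memtable entry: a varint32 length for the internal key, the internal key itself (user key plus an 8-byte sequence/type tag), then a varint32-length-prefixed value. GetLengthPrefixedSlice at step (2) decodes that second prefix. The following is a minimal sketch of the decoding, hand-rolled here instead of using RocksDB's coding helpers:

#include <cstdint>
#include <string>

// Decode a little-endian base-128 varint32 and return the pointer just past it.
// Simplified version of what RocksDB's varint helpers do.
const char* DecodeVarint32(const char* p, uint32_t* value) {
  uint32_t result = 0;
  for (int shift = 0; shift <= 28; shift += 7) {
    uint32_t byte = static_cast<unsigned char>(*p++);
    result |= (byte & 0x7f) << shift;
    if ((byte & 0x80) == 0) {
      break;
    }
  }
  *value = result;
  return p;
}

// Read a length-prefixed blob starting at p; *next is set to the byte after it.
// SaveValue effectively does this twice: once for the internal key, once for the value.
std::string GetLengthPrefixed(const char* p, const char** next) {
  uint32_t len = 0;
  p = DecodeVarint32(p, &len);
  *next = p + len;
  return std::string(p, len);
}

Frame #6 of the backtrace is DBImpl::GetImpl, which orchestrates the whole lookup (mutable memtable, then immutable memtables, then SST files):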

Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.merge_operands != nullptr ||
get_impl_options.columns != nullptr);

assert(get_impl_options.column_family);

if (read_options.timestamp) {
const Status s = FailIfTsMismatchCf(get_impl_options.column_family,
*(read_options.timestamp));
if (!s.ok()) {
return s;
}
} else {
const Status s = FailIfCfHasTs(get_impl_options.column_family);
if (!s.ok()) {
return s;
}
}

// Clear the timestamps for returning results so that we can distinguish
// between tombstone or key that has never been written
if (get_impl_options.timestamp) {
get_impl_options.timestamp->clear();
}

GetWithTimestampReadCallback read_cb(0); // Will call Refresh

PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);

auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
get_impl_options.column_family);
auto cfd = cfh->cfd();

if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
// TODO: maybe handle the tracing status?
tracer_->Get(get_impl_options.column_family, key).PermitUncheckedError();
}
}

if (get_impl_options.get_merge_operands_options != nullptr) {
for (int i = 0; i < get_impl_options.get_merge_operands_options
->expected_max_number_of_operands;
++i) {
get_impl_options.merge_operands[i].Reset();
}
}

// Acquire SuperVersion
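// (A SuperVersion bundles this column family's active memtable, its list of
// immutable memtables, and the current Version, i.e. the current set of SST
// files. Holding one reference pins a consistent view of all three for the
// duration of this read.)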
SuperVersion* sv = GetAndRefSuperVersion(cfd);
if (read_options.timestamp && read_options.timestamp->size() > 0) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
}
}

TEST_SYNC_POINT_CALLBACK("DBImpl::GetImpl:AfterAcquireSv", nullptr);
TEST_SYNC_POINT("DBImpl::GetImpl:1");
TEST_SYNC_POINT("DBImpl::GetImpl:2");

SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
if (get_impl_options.callback) {
// Already calculated based on read_options.snapshot
snapshot = get_impl_options.callback->max_visible_seq();
} else {
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
}
} else {
// Note that the snapshot is assigned AFTER referencing the super
// version because otherwise a flush happening in between may compact away
// data for the snapshot, so the reader would see neither data that was
// visible to the snapshot before compaction nor the newer data inserted
// afterwards.
snapshot = GetLastPublishedSequence();
if (get_impl_options.callback) {
// The unprep_seqs are not published for write unprepared, so it could be
// that max_visible_seq is larger. Seek to the std::max of the two.
// However, we still want our callback to contain the actual snapshot so
// that it can do the correct visibility filtering.
get_impl_options.callback->Refresh(snapshot);

// Internally, WriteUnpreparedTxnReadCallback::Refresh would set
// max_visible_seq = max(max_visible_seq, snapshot)
//
// Currently, the commented out assert is broken by
// InvalidSnapshotReadCallback, but if write unprepared recovery followed
// the regular transaction flow, then this special read callback would not
// be needed.
//
// assert(callback->max_visible_seq() >= snapshot);
snapshot = get_impl_options.callback->max_visible_seq();
}
}
// If timestamp is used, we use read callback to ensure <key,t,s> is returned
// only if t <= read_opts.timestamp and s <= snapshot.
// HACK: temporarily overwrite input struct field but restore
SaveAndRestore<ReadCallback*> restore_callback(&get_impl_options.callback);
const Comparator* ucmp = get_impl_options.column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() > 0) {
assert(!get_impl_options
.callback); // timestamp with callback is not supported
read_cb.Refresh(snapshot);
get_impl_options.callback = &read_cb;
}
TEST_SYNC_POINT("DBImpl::GetImpl:3");
TEST_SYNC_POINT("DBImpl::GetImpl:4");

// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq = 0;

Status s;
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
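// (A LookupKey packs the user key together with the snapshot sequence number
// and, if enabled, the read timestamp, in the encodings expected by the
// memtable and SST readers below.)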
LookupKey lkey(key, snapshot, read_options.timestamp);
PERF_TIMER_STOP(get_snapshot_time);

bool skip_memtable = (read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
std::string* timestamp =
ucmp->timestamp_size() > 0 ? get_impl_options.timestamp : nullptr;
if (!skip_memtable) {
// Get value associated with key
if (get_impl_options.get_value) {
if (sv->mem->Get(
lkey,
get_impl_options.value ? get_impl_options.value->GetSelf()
: nullptr,
get_impl_options.columns, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;

if (get_impl_options.value) {
get_impl_options.value->PinSelf();
}

RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey,
get_impl_options.value
? get_impl_options.value->GetSelf()
: nullptr,
get_impl_options.columns, timestamp, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;

if (get_impl_options.value) {
get_impl_options.value->PinSelf();
}

RecordTick(stats_, MEMTABLE_HIT);
}
} else {
// Get Merge Operands associated with key, Merge Operands should not be
// merged and raw values should be returned to the user.
if (sv->mem->Get(lkey, /*value=*/nullptr, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */, nullptr, nullptr,
false)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->GetMergeOperands(lkey, &s, &merge_context,
&max_covering_tombstone_seq,
read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
}
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
}
}
TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:0");
TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:1");
PinnedIteratorsManager pinned_iters_mgr;
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
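// (The key was not resolved from the memtables, so fall through to the SST
// files. Version::Get goes through the table/block caches and searches L0
// files from newest to oldest, then the deeper levels.)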
sv->current->Get(
read_options, lkey, get_impl_options.value, get_impl_options.columns,
timestamp, &s, &merge_context, &max_covering_tombstone_seq,
&pinned_iters_mgr,
get_impl_options.get_value ? get_impl_options.value_found : nullptr,
nullptr, nullptr,
get_impl_options.get_value ? get_impl_options.callback : nullptr,
get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
get_impl_options.get_value);
RecordTick(stats_, MEMTABLE_MISS);
}

{
PERF_TIMER_GUARD(get_post_process_time);

RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = 0;
if (s.ok()) {
const auto& merge_threshold = read_options.merge_operand_count_threshold;
if (merge_threshold.has_value() &&
merge_context.GetNumOperands() > merge_threshold.value()) {
s = Status::OkMergeOperandThresholdExceeded();
}

if (get_impl_options.get_value) {
if (get_impl_options.value) {
size = get_impl_options.value->size();
} else if (get_impl_options.columns) {
size = get_impl_options.columns->serialized_size();
}
} else {
// Return all merge operands for get_impl_options.key
*get_impl_options.number_of_operands =
static_cast<int>(merge_context.GetNumOperands());
if (*get_impl_options.number_of_operands >
get_impl_options.get_merge_operands_options
->expected_max_number_of_operands) {
s = Status::Incomplete(
Status::SubCode::KMergeOperandsInsufficientCapacity);
} else {
// Each operand depends on one of the following resources: `sv`,
// `pinned_iters_mgr`, or `merge_context`. It would be crazy expensive
// to reference `sv` for each operand relying on it because `sv` is
// (un)ref'd in all threads using the DB. Furthermore, we do not track
// on which resource each operand depends.
//
// To solve this, we bundle the resources in a `GetMergeOperandsState`
// and manage them with a `SharedCleanablePtr` shared among the
// `PinnableSlice`s we return. This bundle includes one `sv` reference
// and ownership of the `merge_context` and `pinned_iters_mgr`
// objects.
bool ref_sv = ShouldReferenceSuperVersion(merge_context);
if (ref_sv) {
assert(!merge_context.GetOperands().empty());
SharedCleanablePtr shared_cleanable;
GetMergeOperandsState* state = nullptr;
state = new GetMergeOperandsState();
state->merge_context = std::move(merge_context);
state->pinned_iters_mgr = std::move(pinned_iters_mgr);

sv->Ref();

state->sv_handle = new SuperVersionHandle(
this, &mutex_, sv,
immutable_db_options_.avoid_unnecessary_blocking_io);

shared_cleanable.Allocate();
shared_cleanable->RegisterCleanup(CleanupGetMergeOperandsState,
state /* arg1 */,
nullptr /* arg2 */);
for (size_t i = 0; i < state->merge_context.GetOperands().size();
++i) {
const Slice& sl = state->merge_context.GetOperands()[i];
size += sl.size();

get_impl_options.merge_operands->PinSlice(
sl, nullptr /* cleanable */);
if (i == state->merge_context.GetOperands().size() - 1) {
shared_cleanable.MoveAsCleanupTo(
get_impl_options.merge_operands);
} else {
shared_cleanable.RegisterCopyWith(
get_impl_options.merge_operands);
}
get_impl_options.merge_operands++;
}
} else {
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
}
}
}
}
RecordTick(stats_, BYTES_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
}

ReturnAndCleanupSuperVersion(cfd, sv);

RecordInHistogram(stats_, BYTES_PER_READ, size);
}
return s;
}


Compiling and using milvus

· One min read

Background

# Clone github repository.
$ git clone https://github.com/milvus-io/milvus.git

# Install third-party dependencies.
$ cd milvus/
$ ./scripts/install_deps.sh

# Compile Milvus.
$ make

Common errors

  • Could NOT find BLAS (missing: BLAS_LIBRARIES). Solution:
    sudo apt-get update
    sudo apt-get install -y libopenblas-dev
  • ./milvus: error while loading shared libraries: libtbbmalloc.so.2: cannot open shared object file: No such file or directory. Solution:
    sudo apt-get install libtbb2

btree

· One min read

tree

A free tree T is an undirected graph that is connected and acyclic.

A tree therefore has three properties:

  • undirected
  • connected
  • acyclic

Definition

A B-tree is described by two parameters: k, which bounds the fan-out of each node, and h, the height of the tree.

Properties

1. Every leaf has the same height h.
2.1 Every node other than the leaves and the root has at least k+1 children.
2.2 The root is either a leaf or has at least two children.
3. Every node has at most 2k+1 children.

Core property

For a node p with v = Parent(p), every key $x$ stored in the subtree rooted at p satisfies $a_i < x < a_{i+1}$, where $a_i$ and $a_{i+1}$ are the separator keys in v that bracket the pointer to p.
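Put together, the invariants above suggest a node layout like the following. This is a minimal C++ sketch; the field names and the in-memory std::vector representation are illustrative, not how any real storage engine lays out its pages:

#include <cstdint>
#include <vector>

// One B-tree node (page). Invariants, with k and h as above:
//  - every node other than the root and the leaves has at least k+1 children;
//  - the root is a leaf or has at least two children;
//  - every node has at most 2k+1 children (hence at most 2k keys);
//  - all leaves sit at the same height h;
//  - keys[i-1] < every key under children[i] < keys[i].
struct BTreeNode {
  bool is_leaf = true;
  std::vector<int64_t> keys;         // sorted separator keys
  std::vector<BTreeNode*> children;  // empty for leaves; keys.size() + 1 entries otherwise
};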

Insertion

There are a couple of choices for where a new key is placed:

1. insert it into the same page as the first node greater than it;
2. insert it into the same page as the smallest node that is greater than it.

Deletion

Lookup
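Lookup descends from the root, binary-searching the separator keys of each node, so a point query touches O(h) nodes. A sketch of the standard algorithm, reusing the BTreeNode type from the definition above:

#include <algorithm>
#include <cstdint>

// Standard B-tree point lookup. Returns true if `key` is present.
bool BTreeSearch(const BTreeNode* node, int64_t key) {
  while (node != nullptr) {
    // First separator key that is >= key.
    auto it = std::lower_bound(node->keys.begin(), node->keys.end(), key);
    if (it != node->keys.end() && *it == key) {
      return true;  // found in this node
    }
    if (node->is_leaf) {
      return false;  // a leaf was reached without finding the key
    }
    // Descend into the child whose key range contains `key`.
    node = node->children[it - node->keys.begin()];
  }
  return false;
}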

mvcc

· One min read

MVCC paper: http://www.cs.cmu.edu/~pavlo/papers/p781-wu.pdf

Related Zhihu post: https://zhuanlan.zhihu.com/p/45734268

What is MVCC?

MVCC is multi-version concurrency control: writes create new versions of a record instead of overwriting it in place, and each reader sees only the versions visible at its snapshot.
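As a concrete picture, a logical record under MVCC is a chain of physical versions, each stamped with the range of time during which it was live. A minimal sketch (the field names follow the newest-to-oldest version-chain design discussed in the paper above, not any particular engine):

#include <cstdint>
#include <string>

// One physical version of a logical record; a record is a linked list
// (version chain) of these, newest first.
struct Version {
  uint64_t begin_ts;    // commit timestamp of the transaction that created it
  uint64_t end_ts;      // timestamp at which it was superseded (UINT64_MAX if still current)
  std::string payload;  // record contents for this version
  Version* next;        // older version, or nullptr
};

// A reader running at snapshot read_ts walks the chain and returns the first
// version whose lifetime contains the snapshot.
const Version* Visible(const Version* v, uint64_t read_ts) {
  for (; v != nullptr; v = v->next) {
    if (v->begin_ts <= read_ts && read_ts < v->end_ts) {
      return v;
    }
  }
  return nullptr;  // the record did not exist at read_ts
}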

Mathematical background:

  • partial order: only some pairs of elements are required to be comparable
  • total order: every pair of elements is comparable (both notions are formalized right after this list)
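In symbols, the standard definitions (added here for reference):

$\le$ is a partial order on a set $S$ iff for all $a, b, c \in S$: $a \le a$ (reflexive), $a \le b \wedge b \le a \Rightarrow a = b$ (antisymmetric), and $a \le b \wedge b \le c \Rightarrow a \le c$ (transitive).
$\le$ is a total order iff it is a partial order and, in addition, $a \le b \vee b \le a$ for all $a, b \in S$.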

Complete multiversion history

A complete MV history satisfies the following properties:

  • $H = h\left(\bigcup_{i=0}^{n} T_i\right)$ for some translation function $h$
  • for each $T_i$ and all operations $p_i$, $q_i$ in $T_i$: if $p_i <_i q_i$, then $h(p_i) < h(q_i)$
