# KV Store
import time
import threading
from threading import *
import json
from glob import glob
class KVStore:
    """In-memory key/value store that also keeps every historical version.

    Each ``set`` is stamped with a logical timestamp from ``get_time`` (an
    integer counter starting at 0 and advancing by one per call), so
    ``get(k, t)`` can answer "what was k's value at logical time t?".
    """

    def __init__(self):
        self.m = {}          # key -> latest value
        self.timed_m = {}    # key -> list of Value, in the order they were set
        self.counter = 0     # next logical timestamp handed out by get_time()

    def get(self, k, time=None):
        """Return the value for *k*, optionally as of logical time *time*.

        With ``time=None`` the latest value is returned. Otherwise the value
        of the newest version whose timestamp is <= *time* is returned.

        Raises:
            ValueError: if *k* was never set, or every version of *k* is
                newer than *time*.
        """
        if time is not None:
            if k not in self.timed_m:
                raise ValueError("Key doesn't exist")
            versions = self.timed_m[k]
            pos = self.search(versions, time)
            if pos == -1:
                raise ValueError("Couldn't find key in time")
            return versions[pos].val
        if k not in self.m:
            raise ValueError("Key doesn't exist")
        return self.m[k]

    def set(self, k, v):
        """Store *v* as the latest value of *k* and append it to *k*'s history."""
        self.m[k] = v
        stamped = Value(v, self.get_time())
        self.timed_m.setdefault(k, []).append(stamped)

    def search(self, val_list, time):
        """Binary-search *val_list* (ascending by ``.time``) for *time*.

        Returns the index of an entry whose time equals *time* if one is
        probed, otherwise the index of the last entry with a smaller time,
        or -1 when every entry is newer than *time*.
        """
        lo, hi = 0, len(val_list) - 1
        best = -1
        while lo <= hi:
            mid = (lo + hi) // 2
            stamp = val_list[mid].time
            if stamp == time:
                return mid
            if stamp < time:
                best = mid
                lo = mid + 1
            else:
                hi = mid - 1
        return best

    def get_time(self):
        """Return the current logical timestamp and advance the clock by one."""
        now = self.counter
        self.counter = now + 1
        return now
class ReadersWriteLock:
    """Readers-writer lock built on a single ``threading.Condition``.

    Any number of readers may hold the lock at once; a writer holds it
    exclusively. Readers are admitted whenever no write is in progress, even
    if a writer is already waiting, so a steady stream of readers can starve
    writers.
    """

    def __init__(self):
        self.cond_var = threading.Condition()
        self.write_in_progress = False  # True while a writer holds the lock
        self.readers = 0                # number of readers currently holding it

    def acquire_read_lock(self):
        """Block until no write is in progress, then register as a reader."""
        with self.cond_var:
            while self.write_in_progress:
                self.cond_var.wait()
            self.readers += 1

    def release_read_lock(self):
        """Unregister a reader; the last reader out wakes any waiting writer."""
        with self.cond_var:
            self.readers -= 1
            # Value comparison, not identity: `is 0` only worked thanks to
            # CPython's small-int cache and raises a SyntaxWarning.
            if self.readers == 0:
                self.cond_var.notify_all()

    def acquire_write_lock(self):
        """Block until there are no readers and no writer, then take the lock."""
        with self.cond_var:
            while self.readers != 0 or self.write_in_progress:
                self.cond_var.wait()
            self.write_in_progress = True

    def release_write_lock(self):
        """Release the write lock and wake every waiting reader and writer."""
        with self.cond_var:
            self.write_in_progress = False
            self.cond_var.notify_all()
class Value:
    """A single timestamped version of a stored value."""

    def __init__(self, val, time):
        # Plain instance attributes, assigned val-then-time, so that
        # vars(instance) is {'val': ..., 'time': ...} in that order.
        self.val, self.time = val, time

    def __repr__(self):
        return f"Value(val={self.val}, time={self.time})"
class KVStoreLock:
    """Versioned key/value store guarded by a ``ReadersWriteLock``.

    ``set`` stamps each write with an integer logical timestamp from
    ``get_time`` and records it in the key's history; ``get(k, t)`` returns
    the newest version at or before ``t``. ``get`` holds the read lock and
    ``set`` holds the write lock for the duration of the call.
    """

    def __init__(self):
        self.m = {}          # key -> latest value
        self.timed_m = {}    # key -> list of Value, in the order they were set
        self.counter = 0     # next logical timestamp handed out by get_time()
        self.lock = ReadersWriteLock()

    def get(self, k, time=None):
        """Return *k*'s latest value, or its value as of logical time *time*.

        Raises:
            ValueError: if *k* was never set, or every version of *k* is
                newer than *time*. The read lock is released either way.
        """
        self.lock.acquire_read_lock()
        try:
            if time is not None:
                if k not in self.timed_m:
                    raise ValueError("Key doesn't exist")
                versions = self.timed_m[k]
                pos = self.search(versions, time)
                if pos == -1:
                    raise ValueError("Couldn't find key in time")
                return versions[pos].val
            if k not in self.m:
                raise ValueError("Key doesn't exist")
            return self.m[k]
        finally:
            self.lock.release_read_lock()

    def set(self, k, v):
        """Store *v* under *k* and append a stamped version, under the write lock."""
        self.lock.acquire_write_lock()
        try:
            self.m[k] = v
            stamped = Value(v, self.get_time())
            self.timed_m.setdefault(k, []).append(stamped)
        finally:
            self.lock.release_write_lock()

    def search(self, val_list, time):
        """Binary-search *val_list* (ascending by ``.time``).

        Returns the index of an entry whose time equals *time* if one is
        probed, otherwise the index of the last entry with a smaller time,
        or -1 when every entry is newer than *time*.
        """
        lo, hi = 0, len(val_list) - 1
        best = -1
        while lo <= hi:
            mid = (lo + hi) // 2
            stamp = val_list[mid].time
            if stamp == time:
                return mid
            if stamp < time:
                best = mid
                lo = mid + 1
            else:
                hi = mid - 1
        return best

    def get_time(self):
        """Return the current logical timestamp and advance the clock by one."""
        now = self.counter
        self.counter = now + 1
        return now
def set_values(store, key, count=1000000):
    """Write ``str(i)`` to *key* in *store* for i = 0 .. count-1.

    Works with any object exposing ``set(key, value)``. *count* defaults to
    the original one million writes; pass a smaller value for quick runs.
    """
    for i in range(count):
        store.set(key, str(i))
def get_value(store, key):
    """Print "Value for <key>: <value>"; if the lookup raises ValueError, print the error."""
    try:
        message = f"Value for {key}: {store.get(key)}"
    except ValueError as err:
        message = err
    print(message)
class KVStoreLockWithDiskFlush:
    """Versioned key/value store that spills old versions to JSON files.

    Every ``set`` is stamped with wall-clock time from ``get_time``
    (``time.time()``). A daemon thread started by ``__init__`` calls
    ``flush_to_disk`` roughly every ``ttl_seconds``; each flush moves the
    versions older than ``flush_time - ttl_seconds`` out of ``timed_m`` into
    a JSON file named after that cutoff. Timed reads check memory first and
    then the flushed files. ``m`` (latest value per key) is never flushed.

    ``get`` holds ``self.lock`` as a reader; ``set`` and ``flush_to_disk``
    hold it as a writer.
    """

    def __init__(self, filename_template="store_{}.json", ttl_seconds=2):
        self.m = {}            # key -> latest value (never flushed)
        self.timed_m = {}      # key -> in-memory Value versions, oldest first
        # Not advanced by this class: get_time() uses wall-clock time.
        self.counter = 0
        self.lock = ReadersWriteLock()
        # str.format template; "{}" is replaced by the flush cutoff timestamp.
        self.filename_template = filename_template
        self.ttl_seconds = ttl_seconds
        self.files = []        # successfully written flush files, oldest first
        self.last_flush_time = time.time()
        self.start_flushing_thread()

    def get_filename(self, timestamp):
        """Return the file name for a flush whose cutoff is *timestamp*."""
        return self.filename_template.format(timestamp)

    def flush_to_disk(self):
        """Move versions older than ``now - ttl_seconds`` from memory to a JSON file.

        The file is named ``get_filename(cutoff)`` and contains
        ``{"timed_map": {key: [{"val": ..., "time": ...}, ...]}}``. Nothing is
        written when no version is old enough. The file is registered in
        ``self.files`` and memory is pruned only after the write succeeds.
        """
        self.lock.acquire_write_lock()
        try:
            flushing_time = time.time()
            self.last_flush_time = flushing_time
            cutoff_timestamp = flushing_time - self.ttl_seconds
            old_data_timed_map = {}
            for key, val_list in self.timed_m.items():
                old_val_list = [val for val in val_list if val.time < cutoff_timestamp]
                if old_val_list:
                    old_data_timed_map[key] = old_val_list
            if not old_data_timed_map:
                # Nothing to spill; don't litter the disk with empty files.
                return
            data_to_dump = {
                'timed_map': {k: [vars(val) for val in v] for k, v in old_data_timed_map.items()}
            }
            filename = self.get_filename(cutoff_timestamp)
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data_to_dump, f)
            # Register only after a successful write, so lookups never chase a
            # file that was never created.
            self.files.append(filename)
            for key in old_data_timed_map:
                self.timed_m[key] = [val for val in self.timed_m[key] if val.time >= cutoff_timestamp]
        finally:
            self.lock.release_write_lock()

    def load_from_disk(self, key, time=None):
        """Look up *key* in the flushed files; return its value or None.

        With ``time=None`` the newest flushed version of *key* is returned.
        Otherwise the newest flushed version with timestamp <= *time* is
        returned.

        A flush removes every in-memory version older than its cutoff, so
        (assuming the wall clock never goes backwards) the file with cutoff
        C_i holds versions timestamped in [C_{i-1}, C_i). The answer for
        *time* is therefore in the first file whose cutoff exceeds *time*,
        or in an earlier one; files are scanned newest-first from there.
        """
        json_files = self.files
        if not json_files:
            return None
        if time is None:
            start = len(json_files) - 1
        else:
            # Binary search for the first file whose cutoff is > time.
            left, right = 0, len(json_files)
            while left < right:
                mid = (left + right) // 2
                if self.extract_timestamp_from_filename(json_files[mid]) <= time:
                    left = mid + 1
                else:
                    right = mid
            start = min(left, len(json_files) - 1)
        for filename in reversed(json_files[:start + 1]):
            val_list = self._read_versions(filename, key)
            if not val_list:
                continue
            if time is None:
                return val_list[-1].val
            idx = self.search(val_list, time)
            if idx != -1:
                return val_list[idx].val
        return None

    def _read_versions(self, filename, key):
        """Return *key*'s versions stored in *filename* ([] if absent or the file is missing)."""
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            # A flushed file removed out from under us is treated as empty.
            return []
        return [Value(**val) for val in data['timed_map'].get(key, [])]

    def extract_timestamp_from_filename(self, filename):
        """Parse the cutoff timestamp embedded in *filename* by ``get_filename``.

        The text before and after ``{}`` in ``filename_template`` is stripped
        (falling back to ``store_`` / ``.json`` if the template has no
        ``{}``). Integral text is returned as an int, anything else as a
        float. Flush cutoffs come from ``time.time()`` and are normally
        fractional.

        Raises:
            ValueError: if the remaining text is not a number.
        """
        prefix, placeholder, suffix = self.filename_template.partition('{}')
        if not placeholder:
            prefix, suffix = 'store_', '.json'
        timestamp_str = filename[len(prefix):len(filename) - len(suffix)]
        try:
            return int(timestamp_str)
        except ValueError:
            pass
        try:
            return float(timestamp_str)
        except ValueError:
            raise ValueError("Invalid filename format: Cannot parse timestamp from '{}'".format(filename)) from None

    def get(self, k, time=None):
        """Return the value for *k* (as of *time* if given), or None if not found.

        With ``time=None`` the latest in-memory value is returned. Otherwise
        the newest in-memory version at or before *time* is used, falling
        back to the flushed files. A stored ``None`` is returned as-is rather
        than being treated as "missing".
        """
        self.lock.acquire_read_lock()
        try:
            if time is None:
                if k in self.m:
                    return self.m[k]
            else:
                val_list = self.timed_m.get(k, [])
                idx = self.search(val_list, time)
                if idx != -1:
                    return val_list[idx].val
            return self.load_from_disk(k, time)
        finally:
            self.lock.release_read_lock()

    def set(self, k, v):
        """Store *v* under *k* and append a wall-clock-stamped version."""
        self.lock.acquire_write_lock()
        try:
            self.m[k] = v
            value = Value(v, self.get_time())
            self.timed_m.setdefault(k, []).append(value)
        finally:
            self.lock.release_write_lock()

    def search(self, val_list, time):
        """Binary-search *val_list* (ascending by ``.time``).

        Returns the index of an entry whose time equals *time* if one is
        probed, otherwise the index of the last entry with a smaller time,
        or -1 when every entry is newer than *time*.
        """
        l, r = 0, len(val_list) - 1
        res = -1
        while l <= r:
            m = l + (r - l) // 2
            if val_list[m].time == time:
                return m
            elif val_list[m].time < time:
                res = m
                l = m + 1
            else:
                r = m - 1
        return res

    def get_time(self):
        """Return the current wall-clock time (seconds since the epoch, float)."""
        return time.time()

    def should_flush(self):
        """True once more than ``ttl_seconds`` have passed since the last flush."""
        return time.time() - self.last_flush_time > self.ttl_seconds

    def start_flushing_thread(self):
        """Start a daemon thread that calls ``flush_to_disk`` whenever ``should_flush`` is true."""
        def flush_periodically():
            while True:
                if self.should_flush():
                    # NOTE(review): an exception here (e.g. OSError) ends this thread.
                    self.flush_to_disk()
                # Sleep until the next flush is due instead of spinning; the
                # floor keeps a tiny or zero ttl from becoming a busy loop.
                remaining = self.ttl_seconds - (time.time() - self.last_flush_time)
                time.sleep(max(remaining, 0.05))
        threading.Thread(target=flush_periodically, daemon=True).start()
def set_values(store, key, count=1000000):
    """Write ``str(i)`` to *key* in *store* for i = 0 .. count-1.

    Works with any object exposing ``set(key, value)``. *count* defaults to
    the original one million writes; pass a smaller value for quick runs.
    """
    for i in range(count):
        store.set(key, str(i))
def get_value(store, key):
    """Print "Value for <key>: <value>"; if the lookup raises ValueError, print the error."""
    try:
        message = f"Value for {key}: {store.get(key)}"
    except ValueError as err:
        message = err
    print(message)
def test_concurrent_write():
    """Race five ``set`` calls on one key of a disk-flushing store, then print it.

    The thread writing 'v3' starts first and gets a two-second head start;
    the threads writing 'v1', 'v2', 'v4' and 'v5' are then started back to
    back. All five are joined before the store is inspected.
    """
    kv_store = KVStoreLockWithDiskFlush()
    first = threading.Thread(target=kv_store.set, args=('k1', 'v3'))
    rest = [
        threading.Thread(target=kv_store.set, args=('k1', value))
        for value in ('v1', 'v2', 'v4', 'v5')
    ]
    first.start()
    time.sleep(2)
    for worker in rest:
        worker.start()
    for worker in [first, *rest]:
        worker.join()
    # This store stamps writes with wall-clock time, so counter is not advanced.
    print('counter: ', str(kv_store.counter))
    get_value(kv_store, 'k1')
def generate_dummy_data(kv_store, num_entries, time_interval, flush_every=10):
    """
    Generates dummy data and triggers flush_to_disk method.
    :param kv_store: An instance of KVStoreLockWithDiskFlush (any object with
        ``set(key, value)`` and ``flush_to_disk()`` works)
    :param num_entries: Number of entries to create ("key_i" -> "value_i")
    :param time_interval: Time interval (in seconds) between entries
    :param flush_every: Call ``flush_to_disk`` after entries 0, flush_every,
        2*flush_every, ... ; ``None`` or a value <= 0 disables these flushes
    """
    for i in range(num_entries):
        kv_store.set(f"key_{i}", f"value_{i}")
        # Simulate time passage so entries get distinct wall-clock stamps.
        time.sleep(time_interval)
        if flush_every is not None and flush_every > 0 and i % flush_every == 0:
            kv_store.flush_to_disk()
if __name__ == "__main__":
    # Demo: concurrent writers against the disk-flushing store.
    # For the plain KVStore, get(k, t) returns the newest value written at or
    # before logical time t; e.g. after set k1=v1 (t=0), k2=v2 (t=1),
    # k1=v3 (t=2), two get_time() calls (t=3, 4) and k1=v4 (t=5),
    # get("k1", 4) returns "v3" and get("k1", 5) returns "v4".
    test_concurrent_write()
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
using namespace std;
// Readers-writer lock: any number of concurrent readers, or exactly one writer.
// Once a writer has claimed the lock (writeInProgress), newly arriving readers
// wait until it releases, so writers are not starved by a stream of readers.
class ReadersWriterLock {
public:
    ReadersWriterLock() : writeInProgress(false), readersCount(0) {}

    // Acquire shared access; blocks while a writer holds or is claiming the lock.
    void acquireReadLock() {
        std::unique_lock<std::mutex> lock(mutex_);
        readCondVar.wait(lock, [this]() { return !writeInProgress; });
        ++readersCount;
    }

    // Release shared access; the last reader out wakes the claiming writer.
    void releaseReadLock() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (--readersCount == 0 && writeInProgress) {
            // notify_all, not notify_one: writers queued behind another writer
            // wait on the same condition variable and could swallow the wakeup
            // meant for the writer draining readers.
            writeCondVar.notify_all();
        }
    }

    // Acquire exclusive access.
    void acquireWriteLock() {
        std::unique_lock<std::mutex> lock(mutex_);
        // Claim the writer slot only when no other writer holds it; setting the
        // flag unconditionally would let two writers in once readers drained.
        writeCondVar.wait(lock, [this]() { return !writeInProgress; });
        writeInProgress = true;  // from here on new readers are held back
        writeCondVar.wait(lock, [this]() { return readersCount == 0; });
    }

    // Release exclusive access and wake all waiting readers and writers.
    void releaseWriteLock() {
        std::lock_guard<std::mutex> lock(mutex_);
        writeInProgress = false;
        readCondVar.notify_all();
        writeCondVar.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable readCondVar;   // readers wait for !writeInProgress
    std::condition_variable writeCondVar;  // writers wait for the slot / for readers to drain
    bool writeInProgress;                  // a writer holds or is claiming the lock
    int readersCount;                      // readers currently holding the lock
};
// One timestamped version of a stored value.
class Value {
public:
    std::string val;  // the stored value
    int time;         // logical timestamp at which it was written
    // Takes the string by value and moves it into the member, so callers that
    // pass an rvalue pay for no copy and lvalues pay for exactly one.
    Value(std::string v, int t): val(std::move(v)), time(t) {}
};
// KV Store with timestamp
class KVStore {
public:
unordered_map<string, string> m;
unordered_map<string, vector<Value>> timed_m;
int counter = 0;
string get(string k) {
if (!m.count(k)) {
throw runtime_error("Key doesn't exist");
}
return m[k];
}
void set(string k, string v) {
m[k] = v;
// set k, v in timed_m
int t = getTime();
Value value = Value(v, t);
if (!timed_m.count(k)) {
timed_m[k] = {value};
} else {
timed_m[k].push_back(value);
}
}
string get(string k, int time) {
if (!timed_m.count(k)) {
throw runtime_error("Key doesn't exist");
}
vector<Value> valList = timed_m[k];
int idx = search(valList, time);
if (idx == -1) {
throw runtime_error("Couldn't find key in time");
}
return valList[idx].val;
}
void debug(vector<Value>& valList) {
for (int i = 0; i < valList.size(); i++) {
cout << valList[i].val << " ";
}
cout << endl;
}
int search(vector<Value>& valList, int time) {
int n = valList.size();
int l = 0, r = n-1;
int res = -1;
while (l <= r) {
int m = l + (r-l) / 2;
if (valList[m].time == time) {
return m;
} else if (valList[m].time < time) {
res = m;
l = m+1;
} else {
r = m-1;
}
}
return res;
}
int getTime() {
// const auto now = std::chrono::system_clock::now();
// const std::time_t t_c = std::chrono::system_clock::to_time_t(now);
return counter++;
}
};
int main() {
    KVStore kvStore;
    kvStore.set("k1", "v1");  // t=0
    kvStore.set("k2", "v2");  // t=1
    kvStore.set("k1", "v3");  // t=2
    std::cout << kvStore.get("k1") << std::endl;     // v3
    std::cout << kvStore.get("k1", 1) << std::endl;  // v1
    kvStore.getTime();        // consumes t=3
    kvStore.getTime();        // consumes t=4
    kvStore.set("k1", "v4");  // t=5
    std::cout << kvStore.get("k1") << std::endl;     // v4
    // History of k1 is now [(v1, 0), (v3, 2), (v4, 5)].
    for (int t = 0; t <= 6; ++t) {
        std::cout << kvStore.get("k1", t) << std::endl;  // v1 v1 v3 v3 v3 v4 v4
    }
    return 0;
}
Last updated