# KV Store
import time
import threading
from threading import *
import json
from glob import glob
class KVStore:
    """Single-threaded key/value store that keeps every past value of a key.

    Each ``set`` is stamped with a logical clock (a counter that advances by
    one per ``get_time`` call), so ``get(k, time)`` can answer "what was the
    value of ``k`` at logical time ``time``".
    """

    def __init__(self):
        self.m = {}          # key -> most recently set value
        self.timed_m = {}    # key -> list of Value(val, time) in insertion order
        self.counter = 0     # next logical timestamp returned by get_time()

    def get(self, k, time=None):
        """Return the value of ``k``, optionally as of logical time ``time``.

        With ``time=None`` the latest value is returned; otherwise the version
        with the greatest timestamp not after ``time``.

        Raises:
            ValueError: if ``k`` was never set, or every version of ``k`` is
                newer than ``time``.
        """
        if time is None:
            if k in self.m:
                return self.m[k]
            raise ValueError("Key doesn't exist")

        if k not in self.timed_m:
            raise ValueError("Key doesn't exist")
        history = self.timed_m[k]
        pos = self.search(history, time)
        if pos == -1:
            raise ValueError("Couldn't find key in time")
        return history[pos].val

    def set(self, k, v):
        """Store ``v`` under ``k`` and append a timestamped version to its history."""
        self.m[k] = v
        entry = Value(v, self.get_time())
        self.timed_m.setdefault(k, []).append(entry)

    def search(self, val_list, time):
        """Binary-search ``val_list`` (ordered by ``.time``) for ``time``.

        Returns the index of an entry whose ``.time`` equals ``time`` when the
        search lands on one, otherwise the index of the last entry with
        ``.time < time``, or -1 when every entry is newer than ``time``.
        """
        lo, hi = 0, len(val_list) - 1
        best = -1
        while lo <= hi:
            mid = (lo + hi) // 2
            stamp = val_list[mid].time
            if stamp == time:
                return mid
            if stamp < time:
                best = mid
                lo = mid + 1
            else:
                hi = mid - 1
        return best

    def get_time(self):
        """Return the current logical timestamp and advance the clock by one."""
        stamp = self.counter
        self.counter = stamp + 1
        return stamp
class ReadersWriteLock:
    """Readers-writer lock built on a single condition variable.

    Any number of readers may hold the lock together; a writer holds it
    exclusively. Readers wait while a write is in progress, and a writer waits
    until there are no readers and no other writer.

    NOTE(review): new readers are admitted whenever no write is in progress,
    so a continuous stream of readers can starve a waiting writer.
    """

    def __init__(self):
        self.cond_var = threading.Condition()
        self.write_in_progress = False  # True while a writer owns the lock
        self.readers = 0                # readers currently holding the lock

    def acquire_read_lock(self):
        """Block until no write is in progress, then register as a reader."""
        with self.cond_var:
            while self.write_in_progress:
                self.cond_var.wait()
            self.readers += 1

    def release_read_lock(self):
        """Unregister a reader, waking waiters once the last reader leaves.

        Raises:
            RuntimeError: if no reader currently holds the lock (unbalanced
                release), instead of letting the count go negative and
                blocking writers forever.
        """
        with self.cond_var:
            if self.readers <= 0:
                raise RuntimeError("release_read_lock() called without a matching acquire_read_lock()")
            self.readers -= 1
            if self.readers == 0:
                # Only a waiting writer can be blocked on readers; wake it.
                self.cond_var.notify_all()

    def acquire_write_lock(self):
        """Block until there are no readers and no writer, then take exclusive ownership."""
        with self.cond_var:
            while self.readers != 0 or self.write_in_progress:
                self.cond_var.wait()
            self.write_in_progress = True

    def release_write_lock(self):
        """Give up exclusive ownership and wake every waiting reader and writer.

        Raises:
            RuntimeError: if no write is in progress (unbalanced release).
        """
        with self.cond_var:
            if not self.write_in_progress:
                raise RuntimeError("release_write_lock() called without a matching acquire_write_lock()")
            self.write_in_progress = False
            self.cond_var.notify_all()
class Value:
    """One timestamped version of a stored value.

    The instance dict holds exactly ``val`` and ``time`` (in that order), so
    ``vars(v)`` produces a plain dict and ``Value(**vars(v))`` rebuilds it.
    """

    def __init__(self, val, time):
        self.val, self.time = val, time

    def __repr__(self):
        return f"Value(val={self.val}, time={self.time})"
class KVStoreLock:
    """Thread-safe versioned key/value store using a readers-writer lock.

    ``get`` holds the shared (read) side of ``self.lock`` and ``set`` holds the
    exclusive (write) side, so reads may overlap while writes are serialized.
    Versions are stamped with a logical counter, one tick per ``get_time``.
    """

    def __init__(self):
        self.m = {}          # key -> most recently set value
        self.timed_m = {}    # key -> list of Value(val, time) in insertion order
        self.counter = 0     # next logical timestamp returned by get_time()
        self.lock = ReadersWriteLock()

    def get(self, k, time=None):
        """Return the value of ``k``, optionally as of logical time ``time``.

        Raises:
            ValueError: if ``k`` was never set, or every version of ``k`` is
                newer than ``time``.
        """
        self.lock.acquire_read_lock()
        try:
            return self._lookup(k, time)
        finally:
            self.lock.release_read_lock()

    def _lookup(self, k, time):
        """Unlocked read path for get(); the caller holds the read lock."""
        if time is None:
            if k in self.m:
                return self.m[k]
            raise ValueError("Key doesn't exist")
        if k not in self.timed_m:
            raise ValueError("Key doesn't exist")
        history = self.timed_m[k]
        pos = self.search(history, time)
        if pos == -1:
            raise ValueError("Couldn't find key in time")
        return history[pos].val

    def set(self, k, v):
        """Store ``v`` under ``k`` and record a new version, under the write lock."""
        self.lock.acquire_write_lock()
        try:
            self.m[k] = v
            entry = Value(v, self.get_time())
            self.timed_m.setdefault(k, []).append(entry)
        finally:
            self.lock.release_write_lock()

    def search(self, val_list, time):
        """Binary-search ``val_list`` (ordered by ``.time``) for ``time``.

        Returns the index of an entry whose ``.time`` equals ``time`` when the
        search lands on one, otherwise the index of the last entry with
        ``.time < time``, or -1 when every entry is newer than ``time``.
        """
        lo, hi = 0, len(val_list) - 1
        best = -1
        while lo <= hi:
            mid = (lo + hi) // 2
            stamp = val_list[mid].time
            if stamp == time:
                return mid
            if stamp < time:
                best = mid
                lo = mid + 1
            else:
                hi = mid - 1
        return best

    def get_time(self):
        """Return the current logical timestamp and advance the clock by one."""
        stamp = self.counter
        self.counter = stamp + 1
        return stamp
def set_values(store, key, count=1_000_000):
    """Write ``count`` successive values ("0", "1", ...) to ``key`` in ``store``.

    Intended as a stress-test writer; ``store`` only needs a ``set(key, value)``
    method. ``count`` defaults to the original hard-coded one million writes.

    NOTE: this module defines ``set_values`` again further down; that later
    definition is the one bound once the module finishes loading.
    """
    for i in range(count):
        store.set(key, str(i))
def get_value(store, key):
    """Print the current value of ``key`` in ``store``.

    A ``ValueError`` raised by ``store.get`` is printed instead of propagated.
    """
    try:
        message = f"Value for {key}: {store.get(key)}"
        print(message)
    except ValueError as err:
        print(err)
class KVStoreLockWithDiskFlush:
    """Thread-safe versioned key/value store that spills old history to disk.

    Versions are stamped with wall-clock seconds (``time.time()``). A daemon
    thread started in ``__init__`` periodically moves every version older than
    ``ttl_seconds`` out of ``timed_m`` into a JSON file whose name is
    ``filename_template`` filled with the flush cutoff. ``get`` consults
    memory first and falls back to those files. ``self.m`` (latest value per
    key) is never flushed.

    Unlike the purely in-memory stores, ``get`` returns ``None`` instead of
    raising when nothing matches.

    NOTE(review): keys become JSON object keys, so only ``str`` keys can be
    found again on disk, and values must be JSON-serializable or
    ``flush_to_disk`` raises (which also stops the background flusher).
    """

    # Lower bound on the background flusher's sleep between checks, so a tiny
    # or zero ttl cannot turn the loop into a busy spin.
    _MIN_FLUSH_POLL_SECONDS = 0.05

    def __init__(self, filename_template="store_{}.json", ttl_seconds=2):
        self.m = {}            # key -> latest value (never flushed)
        self.timed_m = {}      # key -> versions not yet flushed to disk
        self.counter = 0       # not advanced by this store (see get_time)
        self.lock = ReadersWriteLock()
        self.filename_template = filename_template
        self.ttl_seconds = ttl_seconds
        self.files = []        # flushed file names, in flush (cutoff) order
        self.last_flush_time = time.time()
        self.start_flushing_thread()

    def get_filename(self, timestamp):
        """Return the file name for a flush whose cutoff is ``timestamp``."""
        return self.filename_template.format(timestamp)

    def flush_to_disk(self):
        """Move every in-memory version older than ``ttl_seconds`` to a new JSON file.

        The file holds ``{"timed_map": {key: [{"val": ..., "time": ...}, ...]}}``
        and is named after the cutoff timestamp. Nothing is written when no
        version is old enough. The write lock is held throughout, so readers
        never observe a half-moved history.
        """
        self.lock.acquire_write_lock()
        try:
            flushing_time = time.time()
            self.last_flush_time = flushing_time
            cutoff_timestamp = flushing_time - self.ttl_seconds
            old_data_timed_map = {}
            for key, val_list in self.timed_m.items():
                old_val_list = [val for val in val_list if val.time < cutoff_timestamp]
                if old_val_list:
                    old_data_timed_map[key] = old_val_list
            if not old_data_timed_map:
                # Nothing expired: don't litter the disk with empty files.
                return
            data_to_dump = {
                'timed_map': {k: [vars(val) for val in v] for k, v in old_data_timed_map.items()}
            }
            # Serialize before touching the filesystem so an unserializable
            # value cannot leave a truncated file behind.
            payload = json.dumps(data_to_dump)
            filename = self.get_filename(cutoff_timestamp)
            with open(filename, 'w') as f:
                f.write(payload)
            # Register the file (and drop the data from memory) only after
            # the write succeeded.
            self.files.append(filename)
            for key in old_data_timed_map:
                self.timed_m[key] = [val for val in self.timed_m[key] if val.time >= cutoff_timestamp]
        finally:
            self.lock.release_write_lock()

    def load_from_disk(self, key, time=None):
        """Look up ``key`` in the flushed files.

        With ``time`` given, return the value of the newest flushed version
        whose timestamp is ``<= time``; with ``time=None``, return the newest
        flushed version. Returns ``None`` when no flushed file has a match.
        Files that no longer exist are skipped.
        """
        files = list(self.files)
        if not files:
            return None
        if time is None:
            start = len(files) - 1
        else:
            # Each flush removes what it wrote from memory, so file i holds
            # versions older than its own cutoff but not older than the
            # previous file's cutoff. The newest candidate is therefore the
            # first file whose cutoff is later than ``time``; later files
            # only hold newer versions.
            start = self._first_file_after(files, time)
        for filename in reversed(files[:start + 1]):
            val_list = self._read_flushed_versions(filename, key)
            if not val_list:
                continue
            if time is None:
                return val_list[-1].val
            idx = self.search(val_list, time)
            if idx != -1:
                return val_list[idx].val
        return None

    def _first_file_after(self, files, timestamp):
        """Index of the first file whose cutoff is later than ``timestamp`` (last index if none)."""
        left, right = 0, len(files) - 1
        result = len(files) - 1
        while left <= right:
            mid = left + (right - left) // 2
            if self.extract_timestamp_from_filename(files[mid]) > timestamp:
                result = mid
                right = mid - 1
            else:
                left = mid + 1
        return result

    def _read_flushed_versions(self, filename, key):
        """Return the Value list stored for ``key`` in one flushed file ([] if absent)."""
        try:
            with open(filename, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            # Best effort, as before: a missing file is treated as empty.
            return []
        return [Value(**val) for val in data.get('timed_map', {}).get(key, [])]

    def extract_timestamp_from_filename(self, filename):
        """Parse the flush cutoff timestamp back out of a flushed file name.

        The prefix and suffix come from ``filename_template`` (split at its
        ``{}`` placeholder; a template without a bare ``{}`` falls back to the
        default ``store_``/``.json`` layout). The timestamp is parsed as a
        float because cutoffs come from ``time.time()``; integer names such as
        ``store_12.json`` still parse.

        Raises:
            ValueError: if ``filename`` does not match the template or the
                embedded timestamp is not a number.
        """
        error = "Invalid filename format: Cannot parse timestamp from '{}'".format(filename)
        prefix, placeholder, suffix = self.filename_template.partition("{}")
        if not placeholder:
            prefix, suffix = "store_", ".json"
        # Slice end computed explicitly: filename[a:-0] would be empty.
        end = len(filename) - len(suffix)
        if end < len(prefix) or not filename.startswith(prefix) or not filename.endswith(suffix):
            raise ValueError(error)
        try:
            return float(filename[len(prefix):end])
        except ValueError:
            raise ValueError(error) from None

    def get(self, k, time=None):
        """Return the value of ``k``, optionally as of wall-clock ``time``.

        Memory is checked first, then the flushed files. Returns ``None`` if
        no matching version exists anywhere.
        """
        self.lock.acquire_read_lock()
        try:
            if time is None:
                if k in self.m:
                    return self.m[k]
            else:
                val_list = self.timed_m.get(k, [])
                idx = self.search(val_list, time)
                if idx != -1:
                    # Returned even if the stored value is None; a None value
                    # is a real hit, not a reason to consult older disk data.
                    return val_list[idx].val
            # Disk reads stay under the read lock so a concurrent flush
            # (which takes the write lock) cannot move data mid-lookup.
            return self.load_from_disk(k, time)
        finally:
            self.lock.release_read_lock()

    def set(self, k, v):
        """Store ``v`` under ``k`` and record a wall-clock-stamped version."""
        self.lock.acquire_write_lock()
        try:
            self.m[k] = v
            t = self.get_time()
            value = Value(v, t)
            if k not in self.timed_m:
                self.timed_m[k] = [value]
            else:
                self.timed_m[k].append(value)
        finally:
            self.lock.release_write_lock()

    def search(self, val_list, time):
        """Binary-search ``val_list`` (ordered by ``.time``) for ``time``.

        Returns the index of an entry whose ``.time`` equals ``time`` when the
        search lands on one, otherwise the index of the last entry with
        ``.time < time``, or -1 when every entry is newer than ``time``.
        """
        lo, hi = 0, len(val_list) - 1
        res = -1
        while lo <= hi:
            mid = lo + (hi - lo) // 2
            if val_list[mid].time == time:
                return mid
            elif val_list[mid].time < time:
                res = mid
                lo = mid + 1
            else:
                hi = mid - 1
        return res

    def get_time(self):
        """Return the current wall-clock time in seconds; used to stamp versions."""
        return time.time()

    def should_flush(self):
        """True once more than ``ttl_seconds`` have passed since the last flush."""
        return time.time() - self.last_flush_time > self.ttl_seconds

    def start_flushing_thread(self):
        """Start a daemon thread that calls flush_to_disk() roughly every ``ttl_seconds``.

        NOTE(review): an exception from flush_to_disk() ends the thread.
        """
        def flush_periodically():
            while True:
                if self.should_flush():
                    self.flush_to_disk()
                # Sleep until the next flush is due instead of spinning.
                remaining = self.ttl_seconds - (time.time() - self.last_flush_time)
                time.sleep(max(remaining, self._MIN_FLUSH_POLL_SECONDS))
        threading.Thread(target=flush_periodically, daemon=True).start()
def set_values(store, key, count=1_000_000):
    """Write ``count`` successive values ("0", "1", ...) to ``key`` in ``store``.

    Intended as a stress-test writer; ``store`` only needs a ``set(key, value)``
    method. ``count`` defaults to the original hard-coded one million writes.

    NOTE: this redefinition replaces an identical ``set_values`` defined
    earlier in this module.
    """
    for i in range(count):
        store.set(key, str(i))
def get_value(store, key):
    """Print ``store``'s current value for ``key``.

    If ``store.get`` raises ``ValueError`` (the in-memory stores do this for
    unknown keys), the error message is printed instead.

    NOTE: this redefinition replaces an identical ``get_value`` defined
    earlier in this module.
    """
    try:
        line = f"Value for {key}: {store.get(key)}"
        print(line)
    except ValueError as exc:
        print(exc)
def test_concurrent_write():
    """Demo: several threads write ``k1`` concurrently, then the value is read back.

    The "v3" writer starts first; after a 2-second pause the "v1", "v2", "v4"
    and "v5" writers start back to back, so which value wins depends on
    thread scheduling.
    """
    kv_store = KVStoreLockWithDiskFlush()
    writers = {
        value: threading.Thread(target=kv_store.set, args=('k1', value))
        for value in ('v1', 'v2', 'v3', 'v4', 'v5')
    }
    writers['v3'].start()
    time.sleep(2)
    for value in ('v1', 'v2', 'v4', 'v5'):
        writers[value].start()
    for value in ('v3', 'v1', 'v2', 'v4', 'v5'):
        writers[value].join()
    # NOTE: KVStoreLockWithDiskFlush stamps versions with time.time() and
    # never advances `counter`, so this line always reports 0.
    print('counter: ', str(kv_store.counter))
    get_value(kv_store, 'k1')
def generate_dummy_data(kv_store, num_entries, time_interval, flush_every=10):
    """Write ``num_entries`` dummy entries, pausing between writes and flushing periodically.

    Entry ``i`` is stored as ``key_i -> value_i``. After each write the
    function sleeps ``time_interval`` seconds, then calls
    ``kv_store.flush_to_disk()`` when ``i`` is a multiple of ``flush_every``
    (so index 0 always triggers a flush when flushing is enabled).

    :param kv_store: store exposing ``set(key, value)`` and ``flush_to_disk()``,
        e.g. a KVStoreLockWithDiskFlush
    :param num_entries: number of entries to create
    :param time_interval: seconds to sleep after each write
    :param flush_every: flush after every ``flush_every``-th entry (default 10,
        the previously hard-coded value); ``None`` or a value <= 0 disables
        these explicit flushes
    """
    flushing = flush_every is not None and flush_every > 0
    for i in range(num_entries):
        kv_store.set(f"key_{i}", f"value_{i}")
        # Let time pass between writes; relevant for stores that stamp
        # versions with the wall clock.
        time.sleep(time_interval)
        if flushing and i % flush_every == 0:
            kv_store.flush_to_disk()
if __name__ == "__main__":
    # Worked example for the counter-based KVStore. Every set() and get_time()
    # call consumes one logical tick; the tick used or the expected result is
    # shown on the right.
    #
    #   kv_store = KVStore()
    #   kv_store.set("k1", "v1")       # t=0
    #   kv_store.set("k2", "v2")       # t=1
    #   kv_store.set("k1", "v3")       # t=2
    #   kv_store.get("k1")             # -> "v3"
    #   kv_store.get("k1", 1)          # -> "v1"
    #   kv_store.get_time()            # t=3
    #   kv_store.get_time()            # t=4
    #   kv_store.set("k1", "v4")       # t=5
    #   kv_store.get("k1")             # -> "v4"
    #   kv_store.get("k1", 0)          # -> "v1"
    #   kv_store.get("k1", 1)          # -> "v1"
    #   kv_store.get("k1", 2)          # -> "v3"
    #   kv_store.get("k1", 3)          # -> "v3"
    #   kv_store.get("k1", 4)          # -> "v3"
    #   kv_store.get("k1", 5)          # -> "v4"
    #   kv_store.get("k1", 6)          # -> "v4"
    #
    # For a repeated stress run, wrap the call below in `for i in range(50):`.
    test_concurrent_write()
# Last updated