REDIS DB using PYTHON

Amit.Kumar
6 min read · Oct 20, 2022

Sharing the code for working with a Redis DB from Python. redis-py must be installed before running this code.

# Redis load / scan / query / pub-sub experiments driven from the command line.
#
# Requires the third-party ``redis`` (redis-py) package. The JSON and search
# commands used below (``redis.json()``, ``redis.ft()``) additionally need a
# server that provides the RedisJSON and RediSearch commands.

from ast import Not
import datetime
import getopt
import json
import logging
import pickle
import random
import string
import sys
import threading
import time

from redis import Redis
import redis.commands.search.aggregation as aggregations
import redis.commands.search.reducers as reducers
from redis.commands.json.path import Path
from redis.commands.search.field import TextField, NumericField, TagField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import NumericFilter, Query

# Connection settings -- replace the placeholders before running.
redis_host = '<redis-host>'
redis_pwd = '<redis-pwd>'
redis_username = '<redis-username>'
redis_port = '<redis-port>'

logging.basicConfig(level=logging.INFO)

# decode_responses=True makes every reply a ``str`` instead of ``bytes``.
redis = Redis(host=redis_host, port=redis_port, decode_responses=True,
              username=redis_username, password=redis_pwd)

# Placeholder for the JSON document elided from the original listing
# (``<json-data>``). Replace it with a real order-book record; only the keys
# that the functions below read or write are sketched here.
orderbook_value = {
    "header": {"ucc": "", "timestamp": 0},
    "data": {"client_id": "", "order_number": "", "token": ""},
}

# NOTE(review): the original listing uses the three names below without ever
# defining them, so every function touching them failed with a NameError that
# the broad ``except`` swallowed. These values are placeholders -- confirm them
# against the real data set.
order_type = ["order"]  # order_type[0] is the key prefix for --insert/--jinsert
orderbook_data_type = ["client_id", "order_number", "token"]  # hash field names
orderbook_data_value = ["<client-id>", "<order-number>", "<token>"]  # hash field values


def GenerateRandomString(lenstring):
    """Return a random string of ``lenstring`` uppercase letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(random.choices(alphabet, k=lenstring))


def _stamp_order(ucc):
    """Write ``ucc`` and the current time into the shared template; return it.

    The module-level ``orderbook_value`` dict is mutated in place (no copy is
    made), exactly as each insert function did inline before.
    """
    orderbook_value["header"]["ucc"] = ucc
    orderbook_value["data"]["client_id"] = ucc
    orderbook_value["header"]["timestamp"] = time.time()
    return orderbook_value


def StoreOrderbookData(num):
    """Insert ``num`` hashes under ``orderbook:<random>`` keys.

    Each hash is built from the parallel lists ``orderbook_data_type`` (field
    names) and ``orderbook_data_value`` (field values). The process exits with
    status 1 when the two lists differ in length.

    Returns the number of records written before any Redis error.
    """
    if len(orderbook_data_type) != len(orderbook_data_value):
        print("length of lists not equal . stop the program.")
        sys.exit(1)

    fields = dict(zip(orderbook_data_type, orderbook_data_value))
    counter = 0  # bound before the try so the final return can never fail
    try:
        pipeline = redis.pipeline()
        for _ in range(num):
            key = "orderbook:" + GenerateRandomString(6)
            # One HSET carrying every field instead of one HSET per field.
            pipeline.hset(key, mapping=fields)
            pipeline.execute()
            counter += 1
    except Exception as e:
        print("exception occurred in redis {}".format(e))
    return counter


def ScanOrderbookData():
    """Print every key's hash contents and return the number of keys visited."""
    counter = 0
    for item in redis.scan_iter():
        print(redis.hgetall(item))
        counter += 1
    return counter


def StoreJSONRedis(num):
    """Insert ``num`` pickled copies of the order template as plain string values.

    Keys are ``order_type[0] + ":" + <random 6 chars>``.
    Returns the number of records written before any Redis error.
    """
    counter = 0
    try:
        for _ in range(num):
            ucc = GenerateRandomString(6)
            key = order_type[0] + ":" + ucc
            # protocol=0 is the text pickle format; ScanRedis() re-encodes the
            # decoded reply with str.encode() before unpickling it.
            payload = pickle.dumps(_stamp_order(ucc), protocol=0)
            redis.set(key, payload)
            counter += 1
    except Exception as e:
        print("exception occurred in redis {}".format(e))
    return counter


def StoreJSONRedis2(num):
    """Insert ``num`` order records as Redis hashes and echo each one back.

    A hash holds flat field/value pairs only, so each top-level section of the
    record is stored as one field whose value is that section JSON-encoded.

    Returns the number of records written before any Redis error.
    """
    counter = 0
    try:
        for _ in range(num):
            ucc = GenerateRandomString(6)
            key = order_type[0] + ":" + ucc
            record = _stamp_order(ucc)
            # The original passed the whole dict as hset's *field name*
            # argument; field/value pairs belong in ``mapping``.
            flat = {section: json.dumps(value) for section, value in record.items()}
            redis.hset(key, mapping=flat)
            cache_value = redis.hgetall(key)
            # hgetall returns a dict, so it must be converted before joining.
            print("cache value : " + str(cache_value))
            counter += 1
    except Exception as e:
        print("exception occurred in redis {}".format(e))
    return counter


def StoreJSONRedis3(num):
    """Insert ``num`` order records as JSON documents, one command per record.

    Returns the number of records written before any Redis error.
    """
    counter = 0
    try:
        for _ in range(num):
            ucc = GenerateRandomString(6)
            key = order_type[0] + ":" + ucc
            redis.json().set(key, Path.root_path(), _stamp_order(ucc))
            counter += 1
    except Exception as e:
        print("exception occurred in redis {}".format(e))
    return counter


def StoreJSONRedis4(num, batch_size=100):
    """Insert ``num`` JSON documents under ``user:<random>`` keys via a pipeline.

    Commands are queued and sent ``batch_size`` at a time.

    Returns the number of records whose batch was executed; records that were
    only queued when an error occurred are not counted.
    """
    counter = 0
    try:
        pipeline = redis.pipeline()
        while counter < num:
            queued = 0
            while queued < batch_size and counter + queued < num:
                ucc = GenerateRandomString(6)
                key = "user" + ":" + ucc
                pipeline.json().set(key, Path.root_path(), _stamp_order(ucc))
                queued += 1
            pipeline.execute()
            # Count only after the batch has actually been sent.
            counter += queued
    except Exception as e:
        print("exception occurred in redis {}".format(e))
    return counter


def IndexRedisData():
    """Create a JSON search index over keys prefixed ``user:``.

    Indexed fields: ``$.data.order_number`` as ``order_num`` and
    ``$.data.token`` as ``token_num``. Any failure is printed, not raised.
    """
    try:
        print("----indexing the redis data----")
        schema = (
            TextField("$.data.order_number", as_name="order_num"),
            TextField("$.data.token", as_name="token_num"),
        )
        definition = IndexDefinition(prefix=["user:"], index_type=IndexType.JSON)
        redis.ft().create_index(schema, definition=definition)
    except Exception as e:
        # The original reported every failure as "index already exists";
        # that is only one possible cause, so say so and show the real error.
        # redis.ft().dropindex() deletes the index created above.
        print("index not created (it may already exist)")
        print(e)


def QueryRedisDB(query_string="@token_num:31951", repeat=1000):
    """Run ``query_string`` against the search index ``repeat`` times.

    Prints the total hit count of every run. The default query uses the
    ``token_num`` field created by IndexRedisData().

    Other query forms, for reference:
      * ``Query("@order_num:220823000295727")`` -- search a specific field
      * ``Query("Paul").add_filter(NumericFilter("age", 30, 40))``
      * ``"@published_year:[2020 2021]"`` -- numeric range
      * ``redis.json().get("user:3ARO5I")`` -- direct lookup by primary key
    """
    print("querying redis db:")
    for _ in range(repeat):
        query = Query(query_string).paging(0, 5000)
        response = redis.ft().search(query)
        print("total result : " + str(response.total))
    print("-------------------------------------------------------------------")


def ScanRedis():
    """Print every key with its unpickled string value; return the key count.

    Expects values written by StoreJSONRedis().
    """
    counter = 0
    for item in redis.scan_iter():
        print(item)
        read_dict = redis.get(item)
        # SECURITY: pickle.loads executes code embedded in the payload. Only
        # run this against a database whose contents you wrote yourself.
        yourdict = pickle.loads(str.encode(read_dict))
        print(yourdict)
        counter += 1
    return counter


def ScanRedis2():
    """Print every key with its JSON document; return the key count."""
    counter = 0
    for item in redis.scan_iter():
        print(item)
        print(redis.json().get(item))
        counter += 1
    return counter


def PublishToRedis(num_of_records=3000):
    """Publish ``num_of_records`` pickled order records.

    Each record is published on a channel named after a freshly generated
    random 6-character string, which is also stored in the record under the
    top-level ``client_id`` key.
    """
    try:
        for _ in range(num_of_records):
            ucc = GenerateRandomString(6)
            orderbook_value.update({"client_id": ucc})
            payload = pickle.dumps(orderbook_value, protocol=0)
            redis.publish(ucc, payload)
    except Exception as e:
        print("exception occurred in redis {}".format(e))


def SubscribeToRedis(ucc="KU8083"):
    """Listen on channel ``ucc`` forever, scoring every payload received.

    Each payload is printed and its score in the sorted set ``ucc_score`` is
    incremented by one. This function does not return.
    """
    sub = redis.pubsub()
    sub.subscribe(ucc)
    for message in sub.listen():
        # listen() also yields subscribe/unsubscribe confirmations whose
        # 'data' is a subscription count, not a payload -- skip those.
        if not isinstance(message, dict) or message.get('type') != 'message':
            continue
        order_data = message.get('data')
        print(order_data)
        redis.zincrby('ucc_score', 1, order_data)


def CleanRedisDB():
    """Delete all keys of the currently selected database.

    Alternatives: ``redis.flushall()`` clears every database on the server;
    ``redis.execute_command('FLUSHALL ASYNC')`` does so asynchronously.
    """
    redis.flushdb()


def StartPubSubThread():
    """Run the subscriber and the publisher in two threads and wait for both."""
    subscriber = threading.Thread(target=SubscribeToRedis)
    publisher = threading.Thread(target=PublishToRedis)
    subscriber.start()
    publisher.start()
    subscriber.join()
    publisher.join()


# option -> (store function, label used in the timing message)
_INSERT_COMMANDS = {
    "--insert": (StoreJSONRedis, "insert"),
    "-i": (StoreJSONRedis, "insert"),
    "--jinsert": (StoreJSONRedis3, "jinsert"),
    "--pinsert": (StoreJSONRedis4, "pinsert"),
    "--hinsert": (StoreOrderbookData, "hinsert"),
    "--inserti": (StoreJSONRedis2, "insert2"),
}

# option -> (scan function, label used in the timing message)
_SCAN_COMMANDS = {
    "--scan": (ScanRedis, "scan"),
    "-s": (ScanRedis, "scan"),
    "--jscan": (ScanRedis2, "json scan"),
    "--hscan": (ScanOrderbookData, " hscan"),
}


def _print_db_size():
    """Print the number of keys in the current database."""
    print("total db size : " + str(redis.dbsize()))


def _run_insert(option, raw_count):
    """Time one insert command and print its statistics."""
    store, label = _INSERT_COMMANDS[option]
    try:
        count = int(raw_count)
    except ValueError:
        print(f"{option} expects an integer record count, got {raw_count!r}")
        return

    start = time.time()
    total_inserted = store(count)
    time_diff = time.time() - start
    if store is StoreJSONRedis4:
        # Index creation is deliberately kept outside the timed section.
        IndexRedisData()
    print(f"time taken in sec for {label} : {time_diff}")
    _print_db_size()
    print("total inserted count : " + str(total_inserted))


def _run_scan(option):
    """Time one scan command and print its statistics."""
    scan, label = _SCAN_COMMANDS[option]
    start = time.time()
    total_count = scan()
    time_diff = time.time() - start
    print("total count : " + str(total_count))
    print(f"time taken in sec for {label}: {time_diff}")
    _print_db_size()


def _run_timed(label, action):
    """Call ``action`` and print how long it took."""
    start = time.time()
    action()
    time_diff = time.time() - start
    print(f"time taken in sec for {label} : {time_diff}")


def GetCommandLineOptions():
    """Parse ``sys.argv`` and run the requested commands in order.

    Options taking a record count: ``--insert``/``-i``, ``--inserti``,
    ``--jinsert``, ``--hinsert``, ``--pinsert``.
    Flags: ``--help``, ``--scan``/``-s``, ``--jscan``, ``--hscan``,
    ``--flush``, ``--query``, ``--publish``, ``--subscribe``.

    Parse errors are printed; nothing is raised.
    """
    options = "si:"
    long_options = ["help", "insert=", "inserti=", "jinsert=", "hinsert=",
                    "pinsert=", "publish", "subscribe", "scan", "jscan",
                    "hscan", "flush", "query"]
    try:
        arguments, values = getopt.getopt(sys.argv[1:], options, long_options)
    except getopt.error as err:
        print(str(err))
        return

    # The original compared with ``x in ("--help")``, which is a substring test
    # against a string, not tuple membership; ``-i`` and ``-s`` only worked
    # because they happen to be substrings of "--insert" and "--scan". The
    # lookups below make those mappings explicit.
    for option, value in arguments:
        if option == "--help":
            print("python <script_name> <--help for help>/<--insert num for insertions>/<--publish for publish>/<--scan for scan the db>/<--flush for clearing the db>/<--subscribe for subscribe for db> ")
        elif option in _INSERT_COMMANDS:
            _run_insert(option, value)
        elif option in _SCAN_COMMANDS:
            _run_scan(option)
        elif option == "--flush":
            _run_timed("flush", CleanRedisDB)
        elif option == "--query":
            _run_timed("query", QueryRedisDB)
        elif option == "--publish":
            # Advertised in --help and accepted by getopt, but previously
            # ignored without any message.
            _run_timed("publish", PublishToRedis)
        elif option == "--subscribe":
            # Blocks until the process is interrupted.
            SubscribeToRedis()


if __name__ == '__main__':
    print("program started\n")
    GetCommandLineOptions()
    print("\nprogram ends")

--

--

Amit.Kumar

I have been a coder all my life. And yes, a dreamer too. But I am also interested in understanding different aspects of life.