Asynchronous Python Programming

Umut Tekin
Analytics Vidhya
Published in
3 min readOct 21, 2020

--

Today, I am going to show you something different. The topic that I am still trying to use almost on every suitable subject and as a result to master on it is asynchronous programming with Python.

In my company, we administer 195 read — write Oracle databases(as well as with their local and remote standbys )and in some cases we have to query each of them. Of course, we have scheduled jobs to consolidate data on the our monitoring database. Yet, our inventory is not enough to compensate our and/or our customer’s demands.

Either, we can gather the data serially from all the databases or concurrently from all the databases.

In order to compare fairly, to measure serial processing time I will use Python.

Let’ s have a look,

import time
import cx_Oracle
import concurrent.futures
t1 = time.perf_counter()query= f"""select distinct db_name, tns_host, tns_port, tns_service from inv_schema.inv_table"""]mapdsn = cx_Oracle.makedsn('inv_db_dns', 'inv_db_port',service_name='inv_db_service')
con1 = cx_Oracle.connect("inv_db_user", "inv_db_passwd", mapdsn, encoding="UTF-8")
cursor1 = con1.cursor()
cursor1.execute(query)
strmappeddsn = [(element[0], f"(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={element[1]})(PORT={element[2]}))(CONNECT_DATA=(SERVICE_NAME={element[3]})))") for element in cursor1.fetchall()]def connect_and_execute(dsn):
querylist = ["""select count(*) from dba_role_privs"""]
try:
con = cx_Oracle.connect("inv_db_user", "inv_db_passwd", dsn[1], encoding="UTF-8")
if not con: return
else:
cursor = con.cursor()
x = []
for query in querylist:
cursor.execute(query)
try:
x.append(cursor.fetchall())
except Exception as e:
pass
con.close()
print (str(dsn[0]), str(x))

except Exception as e:
print(str(dsn[0]) + "\t" + str(e))
pass
t2 = time.perf_counter()

for x in strmappeddsn:
connect_and_execute(x)

t3 = time.perf_counter()

print(t2-t1)
print(t3-t2)

As you can see below, only serial processing time (t3 — t2) took ~100,5 seconds for 195 databases.

On the other hand,

import time
import cx_Oracle
import concurrent.futures
t1 = time.perf_counter()query= f"""select distinct db_name, tns_host, tns_port, tns_service from inv_schema.inv_table"""]mapdsn = cx_Oracle.makedsn('inv_db_dns', 'inv_db_port',service_name='inv_db_service')
con1 = cx_Oracle.connect("inv_db_user", "inv_db_passwd", mapdsn, encoding="UTF-8")
cursor1 = con1.cursor()
cursor1.execute(query)
strmappeddsn = [(element[0], f"(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={element[1]})(PORT={element[2]}))(CONNECT_DATA=(SERVICE_NAME={element[3]})))") for element in cursor1.fetchall()]def connect_and_execute(dsn):
querylist = ["""select count(*) from dba_role_privs"""]
try:
con = cx_Oracle.connect("inv_db_user", "inv_db_passwd", dsn[1], encoding="UTF-8")
if not con: return
else:
cursor = con.cursor()
x = []
for query in querylist:
cursor.execute(query)
try:
x.append(cursor.fetchall())
except Exception as e:
pass
con.close()
return (str(dsn[0]), str(x))

except Exception as e:
print(str(dsn[0]) + "\t" + str(e))
pass
t2 = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(connect_and_execute, strmappeddsn)
for result in results:
output.append(result)
t3 = time.perf_counter()[print(_) for _ in output]
print(t2-t1)
print(t3-t2)

In both Python script I have used same function to connect Oracle database and to execute SQL with one difference that in the asynchronous version I have returned the output of the SQL instead print.

For asynchronous programming I used concurrent library. According to the https://docs.python.org/3/library/concurrent.futures.html, after creating an executor we can use map function to invoke asynchronous calls. This map function differs from built — in map function in those are:

  • the iterables are collected immediately rather than lazily.
  • func is executed asynchronously and several calls to func may be made concurrently.

Also, in the official Python documentation for Python version 3.7.1 says that default maximum number of worker(calculation of maximum worker number has not been changed since Python version 3.5 until 3.8) is “the number of processors on the machine, multiplied by 5”. So,

>>> import os
>>> print(os.cpu_count())
8
>>>

my maximum number of workers is 40.

Okay, but, what is the result? I will not arouse curiosity anymore. The time is ~3.68 seconds for 195 databases.

100,4983596 / 3,6819677 = 27,2947398750945

Approximately, 26 times better :). It’ s cool, huh?

On the next story, I will try to optimize max worker count and push the limits Oracle Exadata.

Thanks for reading!

Umut Tekin

--

--