Notification Service with threading
Automation/System Design
Implementing bulk notification services for niche applications or bots can be done in a fairly straightforward way.
For example, suppose you have an application that sends notifications for specific events. In my case, it was sending notifications when new anime episodes or manga chapters are released.
A common approach is to loop through all the notifications in the main thread. This works fine, but if you have many data entries to process, it can cause significant delays. You can mitigate this issue by using threading, batch jobs, message queues, or similar techniques.
Let’s say you have data for guilds and the anime they are subscribed to, and the task you’re looping over will generate an RSS result. Now, consider doing this with more than 30,000 rows.
rss_result = [
'Demon Slayer',
'My Hero Academia',
'Naruto',
'Attack on Titan',
'Jujutsu Kaisen',
'Nana'
]
user_anime_database = [
{'guild_id': 392115, 'anime_name': 'Demon Slayer'},
{'guild_id': 754600, 'anime_name': 'My Hero Academia'},
{'guild_id': 682307, 'anime_name': 'Naruto'},
{'guild_id': 992163, 'anime_name': 'Attack on Titan'}
]
You need to pass the data to an executor function along with the function you will use to execute it.
Generally, you would pass the user_anime_database
to the main thread, which would loop over the data and execute the rest of your code logic.
However, in this case, you will pass each entry from the user_anime_database
to different threads.
You can use either ProcessPoolExecutor or ThreadPoolExecutor. In this example, I will be using ThreadPoolExecutor. You will pass the function that checks if there’s any new anime in the RSS feed, and it will return the relevant data.
import concurrent.futures
import time
from threading import current_thread
def executor_function(user_anime,rss_result):
for anime in rss_result:
if user_anime['anime_name'] in anime:
print(current_thread())
return user_anime
return None
Now, start_concurrent_processes()
def start_concurrent_processes(user_anime_database,rss_result):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_tasks = {
executor.submit(executor_function, user_anime,rss_result): user_anime for user_anime in user_anime_database
}
for future in concurrent.futures.as_completed(future_tasks):
user_anime = future_tasks[future]
try:
data = future.result()
if data:
print("Data: ",data)
except Exception as error:
print('Exception occured',error,user_anime)
If you checkout the future_tasks
its mapping the object to the data.
<Future at 0x1ecaf3dc050 state=finished returned dict>: {'guild_id': 392115, 'anime_name': 'Demon Slayer'}
You might assume that
for future in concurrent.futures.as_completed(future_tasks)
is just looping over the tasks, so how is this any different?
Parallel processing isn’t restricted by this—it’s simply waiting for task completion.
This means the tasks are being processed in parallel, and as_completed()
allows you to process the results of each task as soon as it’s finished,
without waiting for all tasks to complete.
You can also implement your send_notification()
within the executor_function.
After processing, it can send the notification directly, allowing multiple notifications to be sent concurrently.
You can get the thread identifier using current_thread(), which helps visualize which thread is responsible for processing a specific task.
<Thread(ThreadPoolExecutor-0_0, started 28948)>
<Thread(ThreadPoolExecutor-0_0, started 28948)>
<Thread(ThreadPoolExecutor-0_0, started 28948)>
<Thread(ThreadPoolExecutor-0_1, started 15452)>
<Thread(ThreadPoolExecutor-0_2, started 29784)>
<Thread(ThreadPoolExecutor-0_6, started 12520)>
✌️Peace.
đź“’NOTE
I haven't implemented this approach in my bot yet, as it’s a bit complex and requires changes to the system design. However, I plan to use the following blueprint as a starting point.