Data Crawling With Asyncio — 2

Optimize Concurrent Task Execution with Asynchronous QueuesIntroductionpixlr.aiIn our previous tutorial, we built an asynchronous data crawler using Python’s asyncio.gather to handle concurrent API requests and efficiently explore data from the Hacker …


This content originally appeared on Level Up Coding - Medium and was authored by Ichinga Samuel

Optimize Concurrent Task Execution with Asynchronous Queues

Introduction

pixlr.ai

In our previous tutorial, we built an asynchronous data crawler using Python’s asyncio.gather to handle concurrent API requests and efficiently explore data from the Hacker News API. We focused on managing multiple tasks simultaneously, enabling our crawler to retrieve stories, comments, jobs, and user data with speed and precision.

Now, we’re going to explore an alternative and even more robust method for managing asynchronous tasks: asyncio.Queue. While asyncio.gather provided a simple and effective way to execute tasks concurrently, asyncio.Queue introduces more flexibility and control over task distribution. By leveraging the producer-consumer pattern, we can optimize our crawler to handle large datasets more efficiently, avoiding bottlenecks and overloading the system with too many tasks at once.

In this article, we’ll dive into how asyncio.Queue works, compare it with the previous approach using asyncio.gather, and implement a new version of our data crawler that maximizes scalability and task management.

Asyncio Queues

The queues in the asyncio module shares similarities with the queues found in Python’s queue module, as both support a multi-producer, multi-consumer model. However, asyncio.Queue is specifically designed to work with asynchronous code using async/await syntax. There are three types of queues in asyncio:

  1. asyncio.Queue: A standard first-in, first-out (FIFO) queue, where tasks are processed in the order they are added.
  2. asyncio.LifoQueue: A last-in, first-out (LIFO) queue, where the most recently added tasks are processed first.
  3. asyncio.PriorityQueue: A priority queue where tasks are retrieved in priority order, with the lowest priority tasks being processed first.

These specialized queues are highly useful for managing task flow and workload distribution in asynchronous programming scenarios. We will develop our program to work with any of these queues, but our primary focus will be on asyncio.PriorityQueue, as it provides greater control for prioritizing tasks—key to achieving our desired outcomes.

API Requests, Data Models, and Data Storage

This tutorial is a follow-up to our previous guide, where we developed an API class for handling requests, data classes for managing returned items and users, and a mock database. Since there have been no changes to those sections of the code, we will leave them as they are. If you’d like a refresher, you can revisit the previous tutorial, but for convenience, I will include the code here for easy reference.

A Queue Item

A queue typically consumes an item from one end and produces it from the other, without concern for the nature or type of the item. This behavior is especially true for FIFO and LIFO queues. However, the exception is the PriorityQueue, which requires each item to be provided as a tuple, where the first element represents the priority and the second element is the item to be processed. Additionally, a priority queue item must be both sortable and hashable.

Since we want our program to work seamlessly with different types of queues, we’ll need to account for these differences during development and adjust accordingly. The first class we will build is the QueueItem class, which will handle these distinctions and provide the foundation for managing items in the queue.

class QueueItem:
def __init__(self, coroutine: Callable | Coroutine, must_complete=False, *args, **kwargs):
self.coroutine = coroutine
self.args = args
self.kwargs = kwargs
self.must_complete = must_complete
self.time = asyncio.get_event_loop().time()

def __hash__(self):
return id(self)

def __lt__(self, other):
return self.time < other.time

async def run(self):
try:
await self.coroutine(*self.args, **self.kwargs)

except Exception as err:
print(f"An error occurred while running coroutine: {err}")

The QueueItem class is designed to encapsulate a coroutine along with its arguments and metadata, making it suitable for use in different types of asyncio queues, including PriorityQueue. By storing the time when the item is added to the event loop, the class allows for comparison and sorting, which is essential for prioritizing tasks in a PriorityQueue. It also ensures that each QueueItem instance is hashable, enabling its use in sets or as dictionary keys.

When the QueueItem is processed in the run method, it asynchronously executes the encapsulated coroutine with the arguments and/or keyword arguments passed in during instantiation, handling any errors that may occur. This class provides a standardized way to manage tasks in asynchronous queues, allowing for flexible task scheduling and prioritization, depending on the queue type.

A Task Queue

We are now going to develop a task management system for our queue. Just like in the previous article, we want control over our tasks, so that we can exit gracefully as we wish, run our program for a specific amount of time etc.

class TaskQueue:
def __init__(self, size: int = 0, workers: int = 50, timeout: int = 60,
queue: asyncio.Queue = None, on_exit: Literal['cancel', 'complete_priority'] = 'complete_priority'):
self.queue = queue or asyncio.PriorityQueue(maxsize=size)
self.workers = workers
self.tasks = []
self.priority_tasks = set() # tasks that must complete
self.timeout = timeout
self.stop = False
self.on_exit = on_exit
signal(SIGINT, self.sigint_handle)

def add(self, *, item: QueueItem, priority=3):
try:
if not self.stop:
if isinstance(self.queue, asyncio.PriorityQueue):
self.priority_tasks.add(item) if item.must_complete else ...
item = (priority, item)
self.queue.put_nowait(item)

except asyncio.QueueFull:
...

async def worker(self):
counter = 5
while True:
try:
if isinstance(self.queue, asyncio.PriorityQueue):
_, item = self.queue.get_nowait()
else:
item = self.queue.get_nowait()
if not self.stop or item.must_complete:
await item.run()
self.queue.task_done()
self.priority_tasks.discard(item)

except asyncio.QueueEmpty:
if counter:
await asyncio.sleep(counter)
counter -= 1
else:
break

def sigint_handle(self, sig, frame):
print('SIGINT received, cleaning up...')

if self.on_exit == 'complete_priority':
print(f'Completing {len(self.priority_tasks)} priority tasks...')
self.stop = True

else:
self.cancel()

self.on_exit = 'cancel' # force cancel on exit if SIGINT is received again

async def run(self, timeout: int = 0):
loop = asyncio.get_running_loop()
start = loop.time()

try:
self.tasks.extend(asyncio.create_task(self.worker()) for _ in range(self.workers))
task = asyncio.create_task(self.queue.join())
self.tasks.append(task)
await asyncio.wait_for(task, timeout = timeout or self.timeout)

except TimeoutError:
print(f"Timed out after {loop.time() - start} seconds. {self.queue.qsize()} tasks remaining")

if self.on_exit == 'complete_priority' and self.priority_tasks:
print(f'Completing {len(self.priority_tasks)} priority tasks...')
self.stop = True
await self.queue.join()

else:
self.cancel()

except asyncio.CancelledError:
print('Tasks cancelled')

finally:
print(f'Exiting queue after {(loop.time() - start)} seconds.'
f'{self.queue.qsize()} tasks remaining, {len(self.priority_tasks)} are priority tasks')

self.cancel()

def cancel(self):
cancelled = [task.cancel() for task in self.tasks if not task.done()]
print(f'Cancelled {len(cancelled)} worker tasks') if cancelled else ...
self.tasks.clear()

The TaskQueue class is a simple task manager for asynchronous queues, allowing for efficient task scheduling and management in an asynchronous environment. It supports both regular FIFO/LIFO queues and priority queues giving it flexibility for various task handling needs. The class handles a set number of worker coroutines that process tasks from the queue and can prioritize tasks that are marked as essential (those that must complete).

Key Features:

  1. Queue Flexibility: By default, the class uses a PriorityQueue, but it can handle other queue types as well. Tasks can be enqueued with priorities when using a PriorityQueue, and these tasks are processed in priority order. Tasks are exppected to be QueueItem objects.
  2. Task Management: It maintains a set of tasks that need to be completed, especially focusing on those marked as high priority (must_complete). The workers continuously process items from the queue, and when a SIGINT (Ctrl+C) signal is received, the class can either complete all remaining priority tasks or cancel everything based on the on_exit setting. It does this by setting the stop attribute to True. When stop is true only tasks with must_complete set to true are allowed to run. The rest are discarded with runing them.
  3. Timeout Handling: The class enforces a timeout to ensure that tasks do not run indefinitely. If the timeout is reached, the class will either cancel the worker tasks running the queue or attempt to finish priority tasks before shutting down, based on the on_exit attribute. Timeout is handled by wrapping queue.join with asyncyio.wait_for .
  4. Worker Coroutines: The workers dequeue tasks and execute them asynchronously. The system can dynamically manage how many workers are running, and in the event of a queue being emptied, the workers will automatically stop after a series of retry attempts.

Usage

  • Adding Tasks: Tasks are added using the add method, which places them in the queue. If using a PriorityQueue, tasks can be added with a specified priority which defaults to three if not specified.
  • Running the Queue: The run method initiates the worker coroutines and processes tasks in the queue for a specified timeout duration. It handles timeouts gracefully and attempts to finish all priority tasks before exiting.
  • SIGINT Handling: On receiving a SIGINT, the sigint_handle`method can either complete all remaining priority tasks or cancel everything, depending on the on_exit attribute. If sigint_handle receives another signal before completing priority tasks, it cancels all tasks and exits irrespective of the on_exit attribute.

Overall, the TaskQueue class provides convenient way for managing asynchronous tasks with built-in priority handling, timeout control, and a robust shutdown mechanism.

Crawling With The TaskQueue

Finally, we are going to apply our wonderful task queue to the problem at hand. Using the already discussed paradigms from our previous tutorial we are going to navigate the Hacker News API, by either walking back from the largest item or starting from a single set of item ids returned by the relevant endpoints.

The AsyncQueue class orchestrates the crawling and data collection from the Hacker News API using asyncio and a TaskQueue to manage concurrent tasks. It manages items and users, ensuring they are processed efficiently and stored in a mock database (DictDB).

Key Components:

  1. Task Management: The TaskQueue handles task prioritization and scheduling, with tasks wrapped in QueueItem objects. This ensures that critical tasks (such as saving data) are prioritized, while other tasks (like fetching related items or users) are processed concurrently.
  2. Data Collection
  • get_user: Fetches user data and adds tasks for processing user submissions.
  • get_item: Fetches individual items (stories, comments, etc.), ensures related data (such as user and parent) is processed, and handles child items (kids)

3. API Traversal:

  • traverse_api: Collects a wide range of stories (e.g., top, new, best) from the API, enqueues them for processing, and executes the tasks using the task queue.
  • walk_back: Starts from the largest item in the API and walks back through a defined number of items, adding each to the task queue for processing.

4. Concurrency and Prioritization: The class leverages the TaskQueue to process tasks asynchronously, ensuring priority tasks like saving critical data (must_complete=True) are completed even during shutdowns or interruptions.

Conclusion

In this tutorial, we successfully built an efficient asynchronous data crawler using asyncio.Queue to manage and prioritize tasks while collecting data from the Hacker News API. By implementing a flexible task management system, we ensured that critical tasks were completed, even during interruptions or timeouts. We also explored how to seamlessly handle different queue types, including PriorityQueue, to enhance control over task execution.

This approach provides a powerful and scalable way to handle large datasets in real-time while maintaining efficiency and system stability. With the techniques and concepts learned here, you can apply similar methodologies to any data collection or asynchronous task management scenario in Python, further optimizing performance in complex workflows.

Thank you for following along! If you’re interested in exploring the code further or contributing to the project, check out the GitHub repository. Your feedback and contributions are greatly appreciated!


Data Crawling With Asyncio — 2 was originally published in Level Up Coding on Medium, where people are continuing the conversation by highlighting and responding to this story.


This content originally appeared on Level Up Coding - Medium and was authored by Ichinga Samuel


Print Share Comment Cite Upload Translate Updates
APA

Ichinga Samuel | Sciencx (2024-09-12T15:11:37+00:00) Data Crawling With Asyncio — 2. Retrieved from https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-2/

MLA
" » Data Crawling With Asyncio — 2." Ichinga Samuel | Sciencx - Thursday September 12, 2024, https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-2/
HARVARD
Ichinga Samuel | Sciencx Thursday September 12, 2024 » Data Crawling With Asyncio — 2., viewed ,<https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-2/>
VANCOUVER
Ichinga Samuel | Sciencx - » Data Crawling With Asyncio — 2. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-2/
CHICAGO
" » Data Crawling With Asyncio — 2." Ichinga Samuel | Sciencx - Accessed . https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-2/
IEEE
" » Data Crawling With Asyncio — 2." Ichinga Samuel | Sciencx [Online]. Available: https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-2/. [Accessed: ]
rf:citation
» Data Crawling With Asyncio — 2 | Ichinga Samuel | Sciencx | https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-2/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.