Data Crawling With Asyncio — 1

Leveraging Structural Concurrency In PythonStructural Concurrency In Python With Asyncio According to pixlr.comIntroductionIn this tutorial, we will walk through the process of building an asynchronous data crawler using Python and the Hacker News (HN)…


This content originally appeared on Level Up Coding - Medium and was authored by Ichinga Samuel

Leveraging Structural Concurrency In Python

Structural Concurrency In Python With Asyncio According to pixlr.com

Introduction

In this tutorial, we will walk through the process of building an asynchronous data crawler using Python and the Hacker News (HN) API. The HN API allows developers to programmatically access a wealth of information, including stories, comments, and user data, through various endpoints. We’ll leverage Python’s asyncio library to efficiently manage multiple requests simultaneously, allowing us to collect data quickly and effectively.

Throughout this tutorial, you’ll learn how to design an API class for interacting with the HN API, implement concurrent data collection using asynchronous programming, handle large volumes of data, and manage program execution with features like timeout control and graceful shutdown. Whether you’re new to working with APIs or looking to deepen your understanding of asynchronous programming in Python, this guide will provide you with practical insights and tools to build scalable data collection systems.

By the end of this tutorial, you will have a working data crawler capable of exploring the vast landscape of the Hacker News site, efficiently navigating through its items and users, and handling real-world challenges such as timeouts and interruptions.

Hacker News API

The Hacker News (HN) API enables developers to programmatically access the site’s data through various endpoints. To facilitate interaction with the API, we’ll begin by creating an API class to manage the HN web requests. This class will include methods corresponding to each HN endpoint. All requests will be handled by a single method using the http.client library, streamlining and simplifying our communication with the API.

The get(self, *, path) method accepts a path string as an argument and appends it to the URL attribute to form a complete API endpoint. To support asynchronous execution, the request function is wrapped within asyncio.to_thread. This method returns a string, an integer, or a dictionary, depending on the endpoint.

Handling Data

The Hacker News API mainly revolves around two core data structures: Item and User. The Item structure is adaptable, with its ‘type’ field indicating whether it represents a story, job, comment, poll, or poll option. For detailed insights into the API’s design, consult the official GitHub repository. In this tutorial, we will exclude the poll and poll option item types. To manage our data, we will define four Python data classes: User, Story, Job, and Comment. Since Story, Job, and Comment are all specific Item types, they will inherit from a base Item class containing the shared fields.

Saving Our Data

In this tutorial, instead of using a real database, we will store the data in a customized dictionary. The mock database, represented as a nested dictionary, is illustrated in the code below. It includes special methods for saving both user and item data. Items are stored according to their type in the internal dictionaries for Story, Comment, Job, or User.

Navigating The API

Our primary goal is to efficiently navigate the API and gather as much data as possible by using asyncio to make multiple concurrent requests. To achieve this, we need to account for the characteristics of Hacker News' two main data structures: Item and User. An Item may link to other items through the kids and parent fields, though not all items contain these fields. Additionally, each Item references the user who created it via the by field. On the other hand, the User data structure lists all items submitted by that user through the submitted field. This interconnectivity allows us to explore thousands of items and users by starting with just one. Our objective is to design our code so that, by accessing one item, we can concurrently access all related items and users. To accomplish this, we will create a class that integrates our existing API and mock database classes, allowing us to efficiently collect and store data.

# async_gather.py
import asyncio

from api import API
from dict_db import DictDB


class AsyncGather:
def __init__(self, db=None):
self.api = API()
self.db = db or DictDB()
self.visited = set()
self.tasks: list[asyncio.Task] = []

async def traverse_item(self, *, item):
try:
if item in self.visited:
return

res = await self.api.get_item(item_id=item)
await self.db.save(data=res)
self.visited.add(item)

# saving user data
user_stories = []
if (by := res.get('by')) and by not in self.visited:
res = await self.api.get_user(user_id=by)
self.visited.add(res['id'])
await self.db.save_user(data=res)
if submissions := res.get('submitted'):
user_stories.extend(asyncio.create_task(self.traverse_item(item=item)) for item in submissions if item not in self.visited)

# saving kids data
if kids := res.get('kids'):
self.tasks.extend(asyncio.create_task(self.traverse_item(item=item)) for item in kids if item not in self.visited)

# saving parent data
if (parent := res.get('parent')) and parent not in self.visited:
self.tasks.append(asyncio.create_task(self.traverse_item(item=parent)))

# include user stories in the tasks list to be executed
self.tasks.extend(user_stories)
except Exception as err:
print(err)

The AsyncGather class is initialized with an instance of the API, a mock database (DictDB) instance, a set to track visited items, and a list to manage asynchronous tasks. The traverse_item method is an asynchronous function that accepts an item as input and performs a series of operations. It first checks whether the item has already been visited to prevent duplicate processing. If the item is new, the method fetches its data from the API, saves it to the database, and adds the item to the visited set. It then handles related data, such as user details, child items (referred to as "kids"), and parent items. For each of these, it creates new asynchronous tasks to traverse the associated items and gathers them with asyncio.gather.

Gathering It All Together

Now that we can navigate individual items, the next question is how to obtain the initial item(s). There are two approaches I want us to explore. The first approach is to use the API endpoints that return a list of items, such as /beststories, /topstories, /newstories, /jobstories, /showstories, and /askstories. This method is demonstrated in the traverse_api function, as shown in the code below.

    async def traverse_api(self):
s, j, n, t, a, b = await asyncio.gather(self.api.show_stories(), self.api.job_stories(), self.api.new_stories(),
self.api.top_stories(), self.api.ask_stories(), self.api.best_stories())
# unite in a set to avoid duplicate items.
stories = set(s) | set(j) | set(t) | set(a) | set(b) | set(n)
print(f"Total stories: {len(stories)}")
start = asyncio.get_running_loop().time()

try:
self.tasks.extend(asyncio.create_task(self.traverse_item(item=story)) for story in stories)
await asyncio.gather(*self.tasks)

except Exception as _:
print('Cancelled')

finally:
print(f"Made {len(self.visited)} API calls. Saved {len(self.db)} items"
f" in {asyncio.get_running_loop().time() - start} seconds")
print(self.db)

In the traverse_api method, we use asyncio.gather to simultaneously call all the endpoints that return lists of items. These lists are combined into a set to eliminate duplicate entries. Afterward, asyncio.gather is used again to invoke the traverse_item method for each unique item in the set.

Executing this code may result in prolonged runtimes due to the vast amount of data being processed. In some cases, it could end up traversing every item and user ever posted to the Hacker News API, which may not be the intended behaviour. Furthermore, if the program is forcibly interrupted (e.g., via a SIGINT signal), we lack persistent storage, meaning there will be no clear record of the data that has been processed up to that point. To mitigate this, two control mechanisms can be implemented: first, handling SIGINT signals to ensure a graceful shutdown and the preservation of progress upon interruption; and second, employing timing primitives to enforce a fixed runtime, after which the program will terminate execution and provide feedback on the progress made during that interval.

We will modify the traverse_api method by wrapping the main asyncio.gather task, which handles multiple items, with the asyncio.wait_for function. The asyncio.wait_for function takes two arguments: a coroutine and a timeout value in seconds. If the timeout is set to None, the coroutine will run until completion; otherwise, it will run until the specified timeout is reached, at which point the task will be cancelled and a TimeoutError will be raised. We will handle this exception with a try/exceptblock. Additionally, we will incorporate a SIGINT handler that cancels all running tasks stored in the class's tasks attribute when a SIGINT signal is detected. This will ensure that the main task is cancelled, allowing for a controlled shutdown of the program. This updated version of traverse_api includes an optional timeout parameter, which defaults to 60 seconds. Running this code with the default timeout parameter will give a similar output to that shown below.

    async def traverse_api(self, timeout=60):
s, j, n, t, a, b = await asyncio.gather(self.api.show_stories(), self.api.job_stories(), self.api.new_stories(),
self.api.top_stories(), self.api.ask_stories(), self.api.best_stories())
stories = set(s) | set(j) | set(t) | set(a) | set(b) | set(n)
print(f"Traversing {len(stories)} stories")
loop = asyncio.get_running_loop()
start = loop.time()

try:
self.tasks = [asyncio.create_task(self.traverse_item(item=story)) for story in stories]
task = asyncio.gather(*self.tasks)
await asyncio.wait_for(task, timeout)

except TimeoutError as _:
print('Timed out')

except asyncio.CancelledError as _:
print('Tasks Cancelled')

finally:
print(f"Made {len(self.visited)} API calls and "
f"saved {len(self.db)} items in {loop.time() - start:.2f} seconds.")
print(self.db)
traverse_api

A Walk Back In Time

Another approach to navigating the Hacker News API is to start from the /maxitem endpoint, which returns the current largest item ID. From there, you can work backwards through the items until reaching the desired stopping point, using the traverse_item method to explore each encountered item. The walk_back method is shown below.

    async def walk_back(self, *, amount: int = 1000, timeout: int = 60):
largest = await self.api.max_item()
print(f"Walking back from item {largest} to {largest - amount}")
loop = asyncio.get_running_loop()
start = loop.time()

try:
self.tasks = [asyncio.create_task(self.traverse_item(item=item)) for item in
range(largest, largest - amount, -1)]
for task in asyncio.as_completed(self.tasks, timeout=timeout):
try:
await task
except asyncio.CancelledError as _:
...
else:
print('Timed out or completed')

except Exception as exe:
print(f"Error: {exe}")

finally:
print(f"Made {len(self.visited)} API calls in"
f" {loop.time() - start:.2f} seconds")
print(self.db)

This method walks backwards from the largest item on the Hacker News API and processes a specified number of items (defaulting to 1,000). It also includes a timeout (defaulting to 60 seconds) to limit how long the task runs. It uses asyncio.as_completed to run the tasks concurrently, allowing the program to process each task as soon as it finishes, rather than waiting for all tasks to complete. This helps maximize efficiency by processing tasks as they complete, one by one, in whatever order they are finished.

Conclusion.

In this tutorial, we’ve built an asynchronous data crawler using Python to interact with the Hacker News API. We designed an API class to streamline API requests, implemented efficient data traversal with asyncio, and handled large-scale data collection through various techniques such as concurrent task execution and timeout management. Additionally, we learned how to manage graceful shutdowns using SIGINT handlers and control execution time with timing primitives.

While we’ve explored the use of asyncio.gather and concurrent task execution to handle data collection, another powerful and flexible approach is to utilize asyncio.Queue. This method allows for better control over task distribution, prioritization, and flow management when handling large volumes of data. In another tutorial, we shall see how this can be achieved.

Thank You 🙏

Thank you for taking the time to explore and use this Hacker News Crawler project! Your interest and support mean a lot. I hope you find this tool useful in your data crawling and exploration endeavors. If you have any suggestions, feedback, or run into any issues, feel free to reach out or contribute to the project. Stay curious and keep building! Visit the GitHub Repository


Data Crawling With Asyncio — 1 was originally published in Level Up Coding on Medium, where people are continuing the conversation by highlighting and responding to this story.


This content originally appeared on Level Up Coding - Medium and was authored by Ichinga Samuel


Print Share Comment Cite Upload Translate Updates
APA

Ichinga Samuel | Sciencx (2024-09-12T15:11:40+00:00) Data Crawling With Asyncio — 1. Retrieved from https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-1/

MLA
" » Data Crawling With Asyncio — 1." Ichinga Samuel | Sciencx - Thursday September 12, 2024, https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-1/
HARVARD
Ichinga Samuel | Sciencx Thursday September 12, 2024 » Data Crawling With Asyncio — 1., viewed ,<https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-1/>
VANCOUVER
Ichinga Samuel | Sciencx - » Data Crawling With Asyncio — 1. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-1/
CHICAGO
" » Data Crawling With Asyncio — 1." Ichinga Samuel | Sciencx - Accessed . https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-1/
IEEE
" » Data Crawling With Asyncio — 1." Ichinga Samuel | Sciencx [Online]. Available: https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-1/. [Accessed: ]
rf:citation
» Data Crawling With Asyncio — 1 | Ichinga Samuel | Sciencx | https://www.scien.cx/2024/09/12/data-crawling-with-asyncio-1/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.