In Python and Trio, when consumers also act as producers, how can everything be shut down gracefully once the work runs out?

My goal is to create a basic web crawler using trio and asks. I am using a nursery to launch multiple crawlers simultaneously, and a memory channel to hold the URLs still to be visited.

Each crawler receives clones of both ends of the channel, so it can retrieve a URL (via receive_channel), read its content, discover new URLs to visit, and add them back to the channel (via send_channel).

import trio

async def main():
    send_channel, receive_channel = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())


async def crawler(send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        content = await ...
        urls_found = ...
        for u in urls_found:
            await send_channel.send(u)  # I'm a producer too!

In this scenario, the consumers actually act as producers. How can everything be stopped gracefully?

The criteria for shutting down all the tasks are:

  • The channel is empty,
  • AND
  • All crawlers are blocked at the top of their for loop, waiting for a URL that will never arrive on receive_channel.

I have tried using async with send_channel inside crawler(), but have not found a working solution. Other approaches, such as a worker pool bound to the memory channel, have also been unsuccessful.

Answer №1

It appears that there are a couple of issues here.

Firstly, the assumption about stopping when the channel is empty is problematic. Since the memory channel is allocated with a capacity of 0, it is always empty: a URL can only be handed off when a crawler is already waiting to receive it.

This leads to the second issue. If you happen to discover more URLs than the number of crawlers you have allocated, the application will end up deadlocked.

The reason for this is that if you cannot pass off all the found URLs to a crawler, the crawler will never be ready to receive a new URL to crawl because it is stuck waiting for another crawler to take one of its URLs.

Furthermore, any other crawlers that discover new URLs will also get stuck behind the crawler that is already waiting to hand off its URLs, and none of the queued URLs will ever be processed.

The Trio documentation's section on buffering in channels gives additional context on this behavior.

Assuming these issues are addressed, where should we focus on next?

It might be necessary to maintain a list or set of visited URLs to prevent revisiting them.
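A minimal, trio-free sketch of that bookkeeping (the function name and URLs are illustrative, not from the original code):

```python
def new_links(urls_found, visited):
    """Filter out URLs we have already queued, and mark the rest as visited."""
    fresh = [u for u in urls_found if u not in visited]
    visited.update(fresh)
    return fresh

visited = {"https://example.com"}
print(new_links(["https://example.com", "https://example.com/a"], visited))
# → ['https://example.com/a']
```

Calling new_links before sending ensures each URL is enqueued at most once, which is also what guarantees the crawl eventually runs out of work.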

To determine when to halt the process, instead of closing the channels, it could be simpler to cancel the nursery altogether.

If we alter the main loop like so:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    active_workers = trio.CapacityLimiter(3) # Number of workers
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            while True:
                await trio.sleep(1) # Allow the workers to initialize.
                if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
                    nursery.cancel_scope.cancel() # All tasks completed!

We must then adjust the crawler functions slightly to ensure they consume tokens appropriately.

async def crawler(active_workers, send_channel, receive_channel):
    async for url in receive_channel:  # Acting as a consumer!
        async with active_workers:  # CapacityLimiter is an async context manager
            content = await ...
            urls_found = ...
            for u in urls_found:
                await send_channel.send(u)  # Also producing!

Other factors to take into account -

In the crawler function, using send_channel.send_nowait(u) may be advisable. Because the buffer is unbounded, there is no risk of a trio.WouldBlock exception, and skipping the checkpoint ensures a given URL is fully processed and all of its new URLs are enqueued before any other task, or the parent task, gets to run.

Answer №2

After spending some time reorganizing the problem, I devised a solution that works like this:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
 
    limit = trio.CapacityLimiter(3)

    async with send_channel:
        await send_channel.send(('https://start-url', send_channel.clone()))
    #HERE1

    async with trio.open_nursery() as nursery:
        async for url, send_channel in receive_channel:  #HERE3
            nursery.start_soon(crawler, url, send_channel, limit)

async def crawler(url, send_channel, limit):
    async with limit, send_channel:
        content = await ...
        links = ...
        for link in links:
            await send_channel.send((link, send_channel.clone()))
    #HERE2

In the process, I omitted handling visited URLs.

This approach allows up to three consumers to be active at any given time, workload permitting. At #HERE1 the primary send_channel is closed, since it was opened in a context manager; from that point on, the channel is kept alive only by the clone travelling inside it.

When we reach #HERE2, the clone is also closed due to the context manager. If there are no more items in the channel, then the last remaining clone keeping it alive will also disappear. This results in the end of the for loop (#HERE3).

However, if new URLs are discovered during processing, they are added back to the channel along with additional clones of send_channel, ensuring the channel's longevity until all tasks are complete.

I find both my solution and Anders E. Andersen’s methods somewhat unconventional: one involves using sleep and statistics(), while the other relies on creating clones of send_channel within the channel itself—a bit like a software rendition of a Klein bottle. I may explore alternative strategies moving forward.
