When using the billiard multiprocessing package to insert data into a MySQL database, the data is sometimes inserted three times

I have a task in Airflow where I am loading data into a table. The process goes like this:

  • query a database -> retrieve the results as a pandas DataFrame -> send the result set to multiple worker processes -> each worker process handles its rows and loads them into a different database.

Below is a simplified version of the DAG file:

from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook

# python_callable must be the function itself, not the module that contains it
from process import process

# `dag`, `start` and `end` are defined elsewhere in the DAG file
LOADING = PythonOperator(
    task_id='LOADING',
    python_callable=process,
    op_kwargs={
        'source_DB': MySqlHook(mysql_conn_id='source_DB'),
        'destination_DB': MySqlHook(mysql_conn_id='destination_DB'),
    },
    dag=dag,
)

start >> LOADING >> end

This is the code for the task:

import os
import logging
import billiard as mp

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

# Per-worker connection, set by init_worker in each pool process
conn = None


def process(source_DB, destination_DB):

    with open(f"{CUR_DIR}/path/to/get_data.sql") as f:
        get_data = f.read()

    data = source_DB.get_pandas_df(
        sql=get_data,
        parameters={}
    )

    items = [(idx, row) for idx, row in data.iterrows()]
    with mp.Pool(processes=mp.cpu_count(), initializer=init_worker,
                 initargs=(destination_DB,)) as pool:
        pool.map(load_data, items)


def init_worker(destination_DB):
    # Runs once in every worker process: each worker opens its own connection
    global conn
    conn = destination_DB.get_conn()
    conn.autocommit(True)


def load_data(args):

    index, data = args
    with open(f"{CUR_DIR}/path/to/insert.sql") as f:
        insert_sql = f.read()

    params = {
        'para1': data['para1'],
        'para2': data['para2']
    }
    # Substitute the {{name}} placeholders in the template with the row values
    for word, replacement in params.items():
        insert_sql = insert_sql.replace(
            '{{' + str(word) + '}}', str(replacement))

    destination_DB_cur = conn.cursor()
    try:
        destination_DB_cur.execute(insert_sql)
    except Exception:
        # Log with traceback instead of print; the error is still swallowed,
        # so the task reports success even if some rows fail
        logging.exception("Insert failed for row %s", index)
    finally:
        destination_DB_cur.close()
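The string replacement in `load_data` splices row values directly into the SQL text, which breaks on values that contain quotes and is open to SQL injection. A minimal sketch, assuming insert.sql uses `{{name}}` placeholders (optionally wrapped in single quotes), of converting the template into the `%(name)s` placeholders that mysqlclient (MySQLdb) accepts when parameters are passed as a dict. `to_pyformat` is a hypothetical helper name.

```python
import re

# Matches {{name}} placeholders, optionally wrapped in single quotes
# (assumed template convention; the driver adds its own quoting).
_PLACEHOLDER = re.compile(r"'?\{\{\s*(\w+)\s*\}\}'?")


def to_pyformat(template_sql):
    """Turn a {{name}} SQL template into %(name)s placeholders."""
    # Literal % signs must be doubled because the driver %-formats the query.
    escaped = template_sql.replace("%", "%%")
    return _PLACEHOLDER.sub(r"%(\1)s", escaped)


if __name__ == "__main__":
    template = "INSERT INTO target (para1, para2) VALUES ('{{para1}}', {{para2}})"
    print(to_pyformat(template))
    # -> INSERT INTO target (para1, para2) VALUES (%(para1)s, %(para2)s)
```

The converted query would then be run with the cursor's `execute(sql, params)`, letting the driver quote each value. Values that pandas returns as NumPy scalars may need converting to plain Python types first; check both that and the quoting convention against the real insert.sql.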

The job runs without any errors, but sometimes every row ends up in the destination table three times. While researching, I found conflicting advice: some attribute it to the billiard library, while others suggest using connection pooling for synchronization and coordination. Can anyone explain what causes this and suggest how to prevent the duplication?

Answer №1

  • Data duplication can arise when multiple processes insert into the same destination database at the same time without proper coordination.
  • One potential cause is that each worker process in the pool opens its own connection to the destination database and inserts rows independently, so nothing coordinates the concurrent inserts and duplicates can slip through.
  • Consider implementing a connection pool so that connections to the destination database are managed centrally.
  • It may be worth trying a different multiprocessing library: billiard is a fork of the standard multiprocessing library, but there may be subtle differences between them. Switching back to the standard multiprocessing module could potentially resolve the duplication.
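A minimal sketch of the coordination and standard-library suggestions, with sqlite3 standing in for MySQL so it runs without a server. It swaps in a single-writer pattern (a different technique from a connection pool): workers from the standard library's `multiprocessing.Pool` only build insert parameters, and the parent process performs every insert through one connection in one transaction. `build_params`, `map_params`, `insert_all` and the `target` table are hypothetical names.

```python
import multiprocessing as mp  # standard library in place of billiard
import sqlite3  # stand-in for MySQL so the sketch runs anywhere


def build_params(item):
    """Runs in a worker process: turn one (index, row) pair into insert parameters."""
    index, row = item
    return {"para1": row["para1"], "para2": row["para2"]}


def map_params(rows, processes=2):
    """Spread the per-row work over a process pool; no worker touches the database."""
    with mp.Pool(processes=processes) as pool:
        return pool.map(build_params, list(enumerate(rows)))


def insert_all(conn, params):
    """Single writer: one connection, one transaction for the whole batch."""
    with conn:  # commits on success, rolls back if any insert raises
        conn.executemany(
            "INSERT INTO target (para1, para2) VALUES (:para1, :para2)", params
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE target (para1 TEXT, para2 TEXT)")
    rows = [{"para1": f"a{i}", "para2": f"b{i}"} for i in range(5)]
    insert_all(conn, map_params(rows))
    print(conn.execute("SELECT COUNT(*) FROM target").fetchone()[0])  # 5
```

Because the batch is committed in one transaction, a failure leaves no partial load behind, at the cost of moving all database writes into a single process.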

Answer №2

Add a primary key or unique index to the destination table, so the database itself rejects duplicate rows regardless of how many times they are sent.
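A minimal sketch of this idea, using sqlite3 as a stand-in for MySQL and assuming `(para1, para2)` identifies one logical row; the table and column names are hypothetical. In MySQL the constraint would be something like `ALTER TABLE target ADD UNIQUE KEY uq_target (para1, para2)`, which fails if duplicates already exist, so those have to be cleaned up first. For the insert, `INSERT ... ON DUPLICATE KEY UPDATE` is more targeted than `INSERT IGNORE`, since the latter also downgrades some unrelated errors to warnings.

```python
import sqlite3  # stand-in for MySQL so the sketch runs without a server


def create_target(conn):
    # The UNIQUE constraint is what blocks duplicates. The key columns are an
    # assumption: use whatever identifies one logical row in the real data.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS target ("
        " para1 TEXT NOT NULL,"
        " para2 TEXT NOT NULL,"
        " UNIQUE (para1, para2))"
    )


def load(conn, rows):
    # INSERT OR IGNORE (SQLite) silently skips rows that would violate the
    # constraint; in MySQL the closest equivalents are INSERT IGNORE and
    # INSERT ... ON DUPLICATE KEY UPDATE.
    with conn:
        conn.executemany(
            "INSERT OR IGNORE INTO target (para1, para2) VALUES (?, ?)", rows
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_target(conn)
    rows = [("a", "1"), ("b", "2"), ("c", "3")]
    for _ in range(3):  # load the same batch three times
        load(conn, rows)
    print(conn.execute("SELECT COUNT(*) FROM target").fetchone()[0])  # 3
```

Loading the same batch three times still leaves three rows, which makes the load safe to repeat whatever is causing the triplication.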

Similar questions

If you have not found the answer to your question, or you are interested in this topic, look at the similar questions below or use the search.

Generate a NumPy array by assigning names from a given list

I am struggling to create multiple NumPy arrays based on a list of names. For instance, I have the following list: k=["one","two","three"] I want to generate three arrays with names corresponding to each element in the list. This means: one=np.array() t ...

Retrieve the hostnames listed in every line of the /etc/hosts file using Python

I currently have a cronjob set up that uses the AWS SDK in PHP to update the /etc/hosts file by adding the current EC2 private IP address along with a user-friendly hostname for each server. Now, I am attempting to achieve the same functionality in Python ...

Running Python on a Mac-based web server

I've got PHP and Perl up and running smoothly on my machine at the moment. Now, I'm looking to develop Python websites as well. While I have Python installed, I've run into some hurdles trying to get a simple "Hello World" message to display ...

Send a variety of arguments as a list to a function

I'm attempting to feed a series of arguments into a function I created. def pdftotext(params=[], layout='-layout'): cmd = ['pdftotext', params, '-layout'] return cmd This is how the function is invoked: text = ...

PHP MySQL outputting data in a dropdown menu format

Struggling to extract data from a database and display it as a dropdown list. I am fetching the foreign key, its unique code, and its name/title to be displayed in a dropdown list for input into a new table. $sql = "SELECT quali_code, title FROM ...

What is the process for extracting the background using cv2.BackgroundSubtractorMOG2?

Is it possible to extract the background using cv2.BackgroundSubtractorMOG2 in Python? In simpler terms, is there a method to generate an image by analyzing the past n frames of a video that can serve as the background? ...

Alternating between minimum and maximum based on user input

I'm currently facing an issue with my code where the min filtering works on element 1, but the max filtering works on element 2. Does anyone have suggestions for a more efficient way to handle this? from operator import itemgetter data = [['A&ap ...

Error message: Unable to locate Sdl2-config when trying to install pygame_sdl2

Having some trouble while attempting to set up pygame_sdl2 on my system. Each time I run the command: python setup.py install I encounter this error message: sh:1:sdl2-config not found Apart from that, I am also getting an error like this: subprocess.Cal ...

Extracting information from Trading View with the help of Selenium

I'm facing a challenge while attempting to web scrape data from a trading view chart using an infinite loop – I keep encountering the StaleElementReferenceException error. Despite my efforts to address this by making the program wait explicitly, I r ...

What is the most effective approach for thwarting XSS attacks?

What's the best approach for preventing XSS attacks? I'm currently working on some code. Will this line of code effectively protect against XSS attacks, or should each element be parsed with 'strip_tags'? Any suggestions would be great ...

Using Jupyter Notebook to Connect with Google Cloud API

Running Jupyter notebooks locally through Docker (jupyter/scipy-notebook) has been my go-to method. Typically, I have stored API credentials in the .env file located with the Dockerfile. However, as I dive into using Google Cloud, I am being provided priv ...

Is it possible to invoke a Python local function from an HTML document?

After creating a Python file with multiple functions, I am now working on designing a web page where I aim to trigger one of the functions mentioned earlier by clicking a button. However, I am unsure about how to go about this process. Is there anyone who ...

StringIO and pystache are known to produce unexpected null characters

After rendering a mustache file into a string, I attempted to process it with the csv module. To do this, I created a file-like interface using StringIO. However, when using the csv module, I encountered the following error: _csv.Error: line contains NULL ...

Encountering an issue with the PHP login page

Currently, I am working through a tutorial at mybringback. In particular, I am examining the code in register.php v.02 and have set up a Wamp server. However, when attempting to run the code, I encounter an error. I have made adjustments to $cfg['Ser ...

Encountering a NameError for a defined variable: confusing and frustrating

Currently, I am in the process of developing a basic survey by establishing a class and generating an instance of it in a separate file. Unfortunately, I have encountered an issue where I receive an error indicating that my 'question' variable is ...

MySQL Query Running Exceptionally Slow

Dealing with a database structure that is less than ideal. Table1 has 2 Ids (id1, id2) which link to rows in Table2. Want to retrieve specific columns from Table1 using id1 and id2 to get data from Table2. Is it necessary to join the same table twice to ...

A guide on transferring JSON data from a Django view to Vue.js instance methods

Looking for assistance with Vue.js - I'm new to it. How can JSON data be passed from a Django view to a Vue instance (method)? Views.py def articles(request): model = News.objects.all() # getting News objects list random_generator = rando ...

Tips for accessing the topmost heading in the Tkinter Python treeview

After working diligently on this code, I have come up with the following structure: def fee_foo(): fee_screen = tk.Tk() fee_screen.title("Fee Information") fee_screen.geometry("500x500") # Running the SELECT statement and ...

Batch files in PATH require a .bat extension to be executed in Windows Server 2012, CMD, and Anaconda CMD

My system is running on Windows Server 2012. I recently installed Anaconda on my machines, and it has automatically added the necessary paths to the PATH variable: C:\Users\user1\tools\Anaconda3;C:\Users\user1\tools&bso ...

Having difficulty with installing python-geohash using PIP on Mac command line

Struggling with the installation of python-geocode on my Mac running Anaconda 10.14.4. Every time I try using: pip install python-geohash An error pops up: warning: include path for stdlibc++ headers not found; pass '-stdlib=libc++' to use ...