The 'selectExpr' attribute cannot be found in the 'NoneType' object

import logging
import findspark
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType



def setup_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS my_spark_data
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)

    print("Keyspace set up successfully!")


def setup_table(session):
    session.execute("""
    CREATE TABLE IF NOT EXISTS my_spark_data.stream_users (
        id UUID PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT);
    """)

    print("Table created successfully!")


def add_data(session, **kwargs):
    print("inserting data...")

    user_id = kwargs.get('id')
    first_name = kwargs.get('first_name')
    last_name = kwargs.get('last_name')
    gender = kwargs.get('gender')
    address = kwargs.get('address')
    postcode = kwargs.get('post_code')
    email = kwargs.get('email')
    username = kwargs.get('username')
    dob = kwargs.get('dob')
    registered_date = kwargs.get('registered_date')
    phone = kwargs.get('phone')
    picture = kwargs.get('picture')

    try:
        session.execute("""
            INSERT INTO my_spark_data.stream_users(id, first_name, last_name, gender, address, 
                post_code, email, username, dob, registered_date, phone, picture)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (user_id, first_name, last_name, gender, address,
              postcode, email, username, dob, registered_date, phone, picture))
        logging.info(f"Data inserted for {first_name} {last_name}")

    except Exception as e:
        logging.error(f'could not insert data due to {e}')


def establish_spark_connection():
    s_conn = None

    try:
        s_conn = SparkSession.builder \
            .appName('StreamingProcessing') \
            .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1,"
                                           "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1") \
            .config('spark.cassandra.connection.host', 'localhost') \
            .getOrCreate()

        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection established successfully!")
    except Exception as e:
        logging.error(f"Failed to create the spark session due to exception {e}")

    return s_conn


def link_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'localhost:9092') \
            .option('subscribe', 'users_streamed') \
            .option('startingOffsets', 'earliest') \
            .load()
        logging.info("kafka dataframe linked successfully")
    except Exception as e:
        logging.warning(f"unable to link kafka dataframe because: {e}")

    return spark_df


def connect_cassandra():
    try:
        # connecting to the cassandra cluster
        cluster = Cluster(['localhost:9042'])

        cas_session = cluster.connect()

        return cas_session
    except Exception as e:
        logging.error(f"Could not establish cassandra connection due to {e}")
        return None


def prepare_selection_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")
    print(sel)

    return sel


if __name__ == "__main__":
    # establish spark connection
    spark_conn = establish_spark_connection()

    if spark_conn is not None:
        # link to kafka with spark connection
        spark_df = link_to_kafka(spark_conn)
        selection_df = prepare_selection_df_from_kafka(spark_df)
        session = connect_cassandra()

        if session is not None:
            setup_keyspace(session)
            setup_table(session)

            logging.info("Start streaming...")

            streaming_query = (selection_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'my_spark_data')
                               .option('table', 'stream_users')
                               .start())

            streaming_query.awaitTermination()

I encountered an issue when running " spark-submit --master spark://localhost:7077 .\spark_stream.py"

A worker is created in the Spark UI but it doesn't fetch the data coming from Kafka.

I am using Airflow to trigger a DAG from an API to stream data into Kafka, process it in Spark, and save it into Cassandra.

The problem seems to be with communication between Spark and Kafka, even though each component works fine individually.

Answer №1

It appears that the 'selectExpr' attribute is not available for a 'NoneType' object.

This error indicates that when using x.selectExpr, the variable x is actually set to None.

Within your code, specifically inside the function create_selection_df_from_kafka, you have the following line:

spark_df.selectExpr("CAST(value AS STRING)")

This suggests that the spark_df argument being passed in is actually None.

Additionally, within the global scope of your code, inside the if __name__ == '__main__': block, you make a call to:

selection_df = create_selection_df_from_kafka(spark_df)

Subsequently, the spark_df variable within this context is also found to be None.

Prior to this, you initialize the spark_df variable with the result of:

spark_df = connect_to_kafka(spark_conn)

Here as well, it seems that calling connect_to_kafka(spark_conn) returns a value of None.

Examining the connect_to_kafka function itself, the return statement is simply:

return spark_df

However, there is an instance where spark_df may not be assigned properly, particularly within the except-block of a try-except clause. This specific scenario needs to be addressed.

Answer №2

It seems like we are both taking the same course because I have also run into this error. While working on the create_selection_df_from_kafka function, I encountered the following issue within the try-catch clause:

WARNING:root:kafka dataframe could not be created because: An error occurred while calling o36.load.
: java.lang.NoClassDefFoundError: scala/$less$colon$less

This problem arises due to missing jar files and leads to compilation errors. After some investigation, I identified three essential packages that were absent:

  • spark-cassandra-connector
  • spark-sql-kafka
  • kafka-clients

To address this, consider the following steps:

  1. Review the versions of Kafka, Spark, Cassandra, and Scala specified in the docker-compose.yml file.

  2. Locate the correct jar package versions for Spark Session configuration from Maven and download them.

For instance, my setup involves Spark 3.5.1, Cassandra 5.0, Confluent Platform 7.6.0, compatible with Kafka 3.6.x. Therefore, I obtained the appropriate jar files for these packages:

  • spark-cassandra-connector_2.12:3.5.0
  • spark-sql-kafka-0-10_2.12:3.5.1
  • kafka-clients:3.7.0
  1. Navigate to the

    venv/lib/python3.11/site-packages/pyspark/jars
    directory and include the three jar files in the jars folder.

  2. Rerun your spark-submit command, and the issue should be resolved.

Additional resources:

Similar questions

If you have not found the answer to your question or you are interested in this topic, then look at other similar questions below or use the search

generate various instances in model serializer generate function

I have created a DRF API View in my Django project to manage Withdraw records. Below is the implementation: class WithdrawListCreateAPIView(PartnerAware, WithdrawQuerySetViewMixin, generics.ListCreateAPIView): permission_classes = (TokenMatchesOASRequi ...

Setting up your YAML configuration to utilize both PHP and Python with AJAX in a unified project on App Engine

Here is my project idea https://i.stack.imgur.com/oGOam.jpg $.ajax({ url: '../server/python/python_file.py', dataType: 'json', type: 'POST', success:function(data) { //Perform additional AJAX requests using PHP f ...

Utilizing Cross-Validation post feature transformation: A comprehensive guide

My dataset contains a mix of categorical and non-categorical values. To handle this, I used OneHotEncoder for the categorical values and StandardScaler for the continuous values. transformerVectoriser = ColumnTransformer(transformers=[('Vector Cat&apo ...

Display the information from every column in a pandas dataframe

My file is called `params.csv` which I loaded into a pandas `dataframe` using the following code in `ipython qtconsole`: import pandas paramdata = pandas.read_csv('params.csv', names=paramnames) The `paramnames` variable is a list of string obj ...

Is there a way to overlay TextItems above candlestick charts in pyqtgraph?

My goal is to visualize candlesticks with TextItems displaying every price level for each candle. I took inspiration from the 'custom graphics' example and made modifications to the CandlestickItem method as shown below: class CandlestickItem(p ...

Extracting Features: 0% complete, still in progress indefinitely... using Python for a deep learning project comparing dogs vs. cats

import cv2 import numpy as np import os from random import shuffle from tqdm import tqdm ​ TRAIN_DIR = '/home/ajmal/Dogs vs cat/train' TEST_DIR = '/home/ajmal/Dogs vs cat/test' IMG_SIZE = 50 LR = 1e-3 CNN = 'dogsvscat ...

Identifying analogous terms within rasa_nlu

My setup is Rasa NLU with Python 3.6.5 on macOS High Sierra. I got the tutorial working but struggling with synonyms integration. This snippet from my training file, first-model.md: ## intent:select - what is the [max](operator) rating? ## synonym:max - ...

Python : Grep causing UnicodeEncodeError in my code

In my program written in Python, I am utilizing a basic script to retrieve reservation results for a specific CID using the file simple.py: data = {"minorRev":"current minorRev #","cid":"xxx","apiKey":"xxx","customerIpAddress":" ","creationDateStart":"03 ...

When saving data to a CSV file in Python, the information is separated by commas, ensuring each character is properly recorded

Trying to read data from a CSV file and then write it to another one. However, the written data is separated by commas. Below is the code snippet: with open(filenameInput, 'r') as csvfile: dfi = pd.read_csv (filenameInput) dfi - dfi.ilo ...

Tips for extracting file paths correctly from the tkinterDND2 event object

Looking to add a "drop files here" feature to my GUI application built with tkinter in Python. I discovered the TkinterDnD2 library after finding this useful Stack Overflow response. After dropping a file, event.data returns the file name within curly br ...

Using Python with Selenium to interact with a drop-down menu element within a

I am a beginner in the field of selenium and I'm stuck on how to select an item from a drop down menu. Despite my research efforts, I have not been able to find a solution. <div class="Select-value"> <span class="Select-value-label" role="o ...

Exploring JSON Object Traversal

Currently, I am facing an issue with the parsing of JSON data. My code looks like this: url = f"https://www.twitter.com/search?q={hashtag}" response = requests.get(url, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ( ...

"Resolving Python errors on a Linux platform with Twilio

I am encountering some errors with my Raspberry Pi running Raspbian when trying to run a Python application with Selenium. Here is the application code snippet: from selenium import webdriver from selenium.webdriver.common.keys import Keys from time impo ...

Transferring cookie data between requests in CrawlSpider

My current project involves scraping a bridge website to gather data from recent tournaments. I have previously asked for help on this issue here. Thanks to assistance from @alecxe, the scraper now successfully logs in while rendering JavaScript with Phant ...

PythonAnywhere completed the task of copying 0 static files using the command `python manage.py collectstatic`. Out of those, 175 were

What could be causing the issue with my static files not loading properly? CMD /PruebaApettito.FINAL/PruebaApettito.FINAL-master/Prueba2/proyectoapettito (master)$ python manage.py collectstatic You are trying to collect static files at a specific desti ...

Excluding specific elements using BeautifulSoup 4 in Python

Attempting to eliminate (rather than extract) data from a span. Here is the HTML snippet: <li><span>Type:</span> Cardiac Ultrasound</li> This is the code being used: item_description_infos = listing_soup.find(class_='item_desc ...

Failure during the installation of a Python package due to an ImportError

I followed the installation guide for django-wiki from However, when I run 'python manage.py migrate', I encounter this error: Error message here... Strangely, when I import it using the python shell... >>> from html5lib.constants im ...

Using an integer variable to iterate a string within a function in Python

Being a complete novice, I've tried delving into several related topics but I just can't seem to grasp it. My goal is to write a function that will iterate through the string s exactly "n" times. s="hello" n=2 When I use s[::n] it works fine ...

Updating a div element dynamically using AJAX in DJANGO

Currently in the process of developing a chat application. Using jquery $.post() to add chat messages has been successful thus far. My next step is to fetch the most recent chat message from the table and update the list on the chat page. As a beginner in ...

Tips on personalizing the BROWSERSTACK_BUILD_NAME within Jenkins

Currently, I am in the process of incorporating BrowserStack into my Selenium Python framework and utilizing Jenkins for running tests. However, I am encountering difficulties when trying to personalize the build name on the BrowserStack dashboard. ...