
Web Scraping With Python | Advanced Guide

13 mins read Created Date: October 03, 2024   Updated Date: October 04, 2024

This guide dives straight into actionable solutions for advanced web scraping in Python, focusing on efficiency, scalability, and overcoming common challenges. We’ll leverage Python’s rich ecosystem of libraries and tools to scrape data from websites that load content dynamically or actively employ anti-scraping techniques such as CAPTCHAs and IP blocking. Additionally, we’ll demonstrate how Scrape.do can simplify and enhance your scraping workflows.

Setting Up the Python Environment for Advanced Web Scraping

To efficiently manage multiple projects, we recommend using Python 3.x and managing environments with tools like pyenv or conda. Here’s how to get started:

# Install pyenv for environment management
curl https://pyenv.run | bash

# Install a specific version of Python (e.g., 3.9.7)
pyenv install 3.9.7
pyenv global 3.9.7

# Create and activate a virtual environment with venv
python -m venv scraping-env
source scraping-env/bin/activate

Essential Libraries for Web Scraping

To handle a variety of scraping tasks, you’ll need to install several powerful libraries:

  • requests: For simple HTTP requests.
  • httpx: Supports asynchronous HTTP requests.
  • beautifulsoup4: Ideal for parsing HTML content.
  • lxml: Faster and more memory-efficient for parsing with XPath.
  • selenium: Handles dynamic content and JavaScript-rendered pages.
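  • playwright: A modern alternative to Selenium for faster, headless scraping.
  • scrapy: A crawling framework for large-scale scraping projects.
  • scrapy-rotating-proxies: A Scrapy middleware that rotates proxies.
  • webdriver-manager, pandas, pymongo: Helpers used in the Selenium, export, and MongoDB examples below.

Commands:

pip install requests httpx beautifulsoup4 lxml selenium webdriver-manager playwright scrapy scrapy-rotating-proxies pandas pymongo

# Download the browser binaries that Playwright drives
playwright install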

Parsing HTML: Best Practices

Efficient Parsing with BeautifulSoup and lxml

For parsing HTML, lxml is generally faster than Python’s built-in html.parser, which BeautifulSoup uses when you pass 'html.parser', and the difference matters most in large-scale scraping. Here’s how to parse the same document with each:

from bs4 import BeautifulSoup
from lxml import etree

html_content = "<html><body><h1>Hello World!</h1></body></html>"

# BeautifulSoup with default parser
soup = BeautifulSoup(html_content, 'html.parser')

# Parsing with lxml for speed and efficiency
parser = etree.HTMLParser()
tree = etree.fromstring(html_content, parser=parser)

Handling Complex Page Structures with XPath

XPath is a powerful tool that allows precise selection of elements within deeply nested HTML structures, making it highly effective for scraping data from complex pages. Here’s a minimal query against the tree parsed in the previous example:

# Using XPath to locate a specific node
elements = tree.xpath('//h1/text()')

The query above returns the text of every h1 element as a list of strings. XPath can be extended to handle more complex queries, such as locating elements by their position or class.
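As a rough illustration of position- and class-based queries, the sketch below runs three XPath expressions with lxml. The markup, ids, and class names are hypothetical and exist only for this example.

```python
from lxml import html

# Hypothetical markup, used only to illustrate the queries
snippet = '''
<ul id="menu">
  <li class="entry">Home</li>
  <li class="entry active">Products</li>
  <li class="entry">Contact</li>
</ul>
'''
doc = html.fromstring(snippet)

# By position: XPath indexes start at 1, so li[2] is the second <li>
second = doc.xpath('//ul[@id="menu"]/li[2]/text()')

# By class: this idiom matches one class among several, whereas
# @class="active" would only match an attribute equal to "active"
active = doc.xpath(
    '//li[contains(concat(" ", normalize-space(@class), " "), " active ")]/text()'
)

# Both combined: the last <li> whose class attribute is exactly "entry"
last_plain = doc.xpath('(//li[@class="entry"])[last()]/text()')

print(second, active, last_plain)
```

Wrapping the path in parentheses before [last()] applies the position test to the whole result set rather than to each parent separately.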

Comprehensive Example Using lxml and XPath

This example finds the last element with a specific class and examines its siblings, showing how you can navigate element hierarchies:

from lxml import html

# Sample HTML content
html_content = '''
<div class="container">
  <div class="item">Item 1</div>
  <div class="item">Item 2</div>
  <div class="item">Item 3</div>
  <div class="item">Item 4</div>
  <div class="item">Item 5</div>
</div>
'''

# Parse the HTML content using lxml
tree = html.fromstring(html_content)

# Find the last element with the class 'item'
# (the parentheses make last() apply to the whole result set, not to each parent)
last_item = tree.xpath('(//div[@class="item"])[last()]')[0]
print(f"Last item text: {last_item.text}")  # Output: Item 5

# Find all 'item' elements under the same parent (this includes the last item itself)
siblings = last_item.xpath('../div[@class="item"]')

# Find the middle element of that group
if siblings:
    middle_index = len(siblings) // 2  # Index of the middle element
    middle_item = siblings[middle_index]
    print(f"Middle sibling text: {middle_item.text}")  # Output: Item 3

Handling Dynamic Content

Scraping JavaScript-rendered Content with Selenium

For pages where content is dynamically loaded via JavaScript, Selenium is a powerful tool:

from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

# Set up Chrome options
options = Options()
options.add_argument('--headless')  # Enable headless mode
options.add_argument('--disable-gpu')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')

# Initialize WebDriver
driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=options)

# Navigate to the target website
driver.get('https://www.scrapingcourse.com/javascript-rendering')

# Wait until product elements are fully loaded
WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CLASS_NAME, 'product-name')))

# Extract product names and prices
products = driver.find_elements(By.CLASS_NAME, 'product-name')
prices = driver.find_elements(By.CLASS_NAME, 'product-price')

# Combine and display product details along with the 'itemprop' attributes
for product, price in zip(products, prices):
    product_name = product.text
    product_name_itemprop = product.get_attribute('itemprop')

    product_price = price.text
    product_price_itemprop = price.get_attribute('itemprop')

    print(f"Product Name: {product_name} (Itemprop: {product_name_itemprop})")
    print(f"Product Price: {product_price} (Itemprop: {product_price_itemprop})")
    print('-' * 50)

# Close the browser
driver.quit()

Running this code against the JavaScript-rendered page prints the following product information:

Product Name: Chaz Kangeroo Hoodie (Itemprop: name)
Product Price: $52 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Teton Pullover Hoodie (Itemprop: name)
Product Price: $70 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Bruno Compete Hoodie (Itemprop: name)
Product Price: $63 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Frankie Sweatshirt (Itemprop: name)
Product Price: $60 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Hollister Backyard Sweatshirt (Itemprop: name)
Product Price: $52 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Stark Fundamental Hoodie (Itemprop: name)
Product Price: $42 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Hero Hoodie (Itemprop: name)
Product Price: $54 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Oslo Trek Hoodie (Itemprop: name)
Product Price: $42 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Abominable Hoodie (Itemprop: name)
Product Price: $69 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Mach Street Sweatshirt (Itemprop: name)
Product Price: $62 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Grayson Crewneck Sweatshirt (Itemprop: name)
Product Price: $64 (Itemprop: priceCurrency)
--------------------------------------------------
Product Name: Ajax Full-Zip Sweatshirt (Itemprop: name)
Product Price: $69 (Itemprop: priceCurrency)
--------------------------------------------------

Playwright for Modern JavaScript-heavy Sites

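Playwright is often faster and more reliable than Selenium on JavaScript-heavy websites, in part because its actions wait automatically for elements to become actionable. The example below scrolls an infinite-scroll page until no new products appear: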

from playwright.sync_api import sync_playwright
import json
import time

# Function to extract product data using improved selectors
def extract_product_data(page):
    # Extract the product containers
    products = page.query_selector_all(".flex.flex-col.items-center.rounded-lg")

    # Create an empty list to store the extracted data
    data_list = []

    # Loop through the product containers to extract names, prices, and images
    for product in products:
        # Create a dictionary to collect each product container's data
        data = {}

        # Extract the product name using more precise selectors
        name_element = product.query_selector(".self-start.text-left.w-full > span:first-child")
        if name_element:
            data["name"] = name_element.inner_text()

        # Extract the price using a more accurate selector
        price_element = product.query_selector(".text-slate-600")
        if price_element:
            data["price"] = price_element.inner_text()

        # Extract the image source
        image_source = product.query_selector("img")
        if image_source:
            data["image"] = image_source.get_attribute("src")

        # Append the extracted data if it has a name
        if data.get("name"):
            data_list.append(data)

    return data_list

# Main function for scraping with infinite scrolling logic
def scrape_infinite_scroll(url):
    with sync_playwright() as p:
        # Start measuring time
        start_time = time.time()

        # Launch the browser in headless mode
        browser = p.chromium.launch(headless=True, args=["--no-sandbox", "--disable-setuid-sandbox"])
        page = browser.new_page(viewport={"width": 1280, "height": 720})

        try:
            # Navigate to the URL
            page.goto(url, wait_until="networkidle")

            # Wait for the initial page load
            page.wait_for_selector('#product-grid')

            # Initialize variables for infinite scrolling
            previous_product_count = 0
            scroll_count = 0
            stable_iteration_count = 0  # To count how many times we haven't seen an increase in products

            # Container to hold the product data
            all_products = []

            while True:
                # Use mouse wheel to scroll down
                page.mouse.wheel(0, 15000)  # Scroll down by 15000 pixels
                time.sleep(5)  # Adjust this time to allow content to load

                # Ensure the #sentinel element is present
                sentinel_element = page.query_selector("#sentinel")
                if sentinel_element is None:
                    print("Sentinel element is missing. Ending scroll.")
                    break

                # Extract the new product data using the improved selectors
                new_products = extract_product_data(page)

                # Add only unique products to the list
                for product in new_products:
                    if product not in all_products:
                        all_products.append(product)
                        # Print each new product as it's found
                        print(f"Scroll Count: {scroll_count}")
                        print(f"Product Name: {product['name']}")
                        print(f"Price: {product['price']}")
                        print(f"Image: {product['image']}")
                        print(f"-------------------------------------------")

                # Check if more products were loaded
                current_product_count = len(all_products)

                if current_product_count == previous_product_count:
                    stable_iteration_count += 1
                else:
                    stable_iteration_count = 0  # Reset if new products were loaded

                # If no new products were found after several checks, break
                if stable_iteration_count > 3:
                    print("No new products loaded after multiple attempts. Ending scroll.")
                    break

                previous_product_count = current_product_count
                scroll_count += 1

            # Output the final JSON data
            print(f"Total products scraped: {len(all_products)}")
            json_output = json.dumps(all_products, indent=4)
            print(json_output)

            # Save the JSON data to a file
            with open("scraped_products.json", "w") as file:
                file.write(json_output)

            # Display total scraping time
            end_time = time.time()
            total_time = end_time - start_time
            print(f"Total Time Taken: {total_time:.2f} seconds")

        except Exception as e:
            print(f"An error occurred: {e}")

        # Close the browser
        browser.close()

# Run the function with the target URL
scrape_infinite_scroll('https://www.scrapingcourse.com/infinite-scrolling')

The code prints each item in the terminal as it is scraped and, once finished, writes all scraped items to scraped_products.json. The output looks like this:

...
Scroll Count: 14
Product Name: Breathe-Easy Tank
Price: $34
Image: https://scrapingcourse.com/ecommerce/wp-content/uploads/2024/03/wt09-white_main.jpg
-------------------------------------------
No new products loaded after multiple attempts. Ending scroll.
Total products scraped: 147
[
    {
        "name": "Chaz Kangeroo Hoodie",
        "price": "$52",
        "image": "https://scrapingcourse.com/ecommerce/wp-content/uploads/2024/03/mh01-gray_main.jpg"
    },
    {
        "name": "Teton Pullover Hoodie",
        "price": "$70",
        "image": "https://scrapingcourse.com/ecommerce/wp-content/uploads/2024/03/mh02-black_main.jpg"
    },
    {
        "name": "Bruno Compete Hoodie",
        "price": "$63",
        "image": "https://scrapingcourse.com/ecommerce/wp-content/uploads/2024/03/mh03-black_main.jpg"
    },
...
]
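The script above deduplicates by checking each product against a growing list, which gets slower as the list grows. A minimal sketch of a set-based alternative follows; the helper name merge_unique and the choice of (name, image) as the identity key are assumptions made for illustration, not part of the original script.

```python
def merge_unique(all_products, new_products, seen_keys):
    """Append products whose (name, image) key has not been seen yet.

    Returns the number of products added in this call.
    """
    added = 0
    for product in new_products:
        # Assumed identity key; adjust if name and image do not identify a product
        key = (product.get("name"), product.get("image"))
        if key not in seen_keys:
            seen_keys.add(key)
            all_products.append(product)
            added += 1
    return added


# Two overlapping batches, as successive scrolls would produce
all_products, seen = [], set()
batch1 = [
    {"name": "A", "price": "$1", "image": "a.jpg"},
    {"name": "B", "price": "$2", "image": "b.jpg"},
]
batch2 = [
    {"name": "B", "price": "$2", "image": "b.jpg"},
    {"name": "C", "price": "$3", "image": "c.jpg"},
]
added_first = merge_unique(all_products, batch1, seen)
added_second = merge_unique(all_products, batch2, seen)
print(added_first, added_second, len(all_products))
```

The return value can also replace the before/after count comparison: a scroll that adds zero products is a "stable" iteration.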

Handling Anti-Scraping Techniques

CSRF Challenges

CSRF (Cross-Site Request Forgery) protection is a common hurdle when scraping pages behind a login. Websites embed a per-session token in each form and reject submissions that do not send it back, which ensures that requests originate from a page the server actually issued and prevents unauthorized actions on a user’s behalf. To access protected content, a scraper has to load the form, read the token, and submit it together with the other fields in the same session. The following Playwright script does this for a demo login page:

from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
import time

# Define custom headers to simulate a real browser
CUSTOM_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
}

def extract_credentials(soup):
    """Extracts email and password from the demo credentials alert."""
    credentials_element = soup.find('div', id='demo-credentials-alert')
    if credentials_element:
        credentials_text = credentials_element.get_text(strip=True)
        email = credentials_text.split("Email: ")[1].split(" | ")[0].strip()
        password = credentials_text.split("Password: ")[1].strip()
        return email, password
    return None, None

def extract_csrf_token(soup):
    """Extracts CSRF token from the page."""
    csrf_token_element = soup.find('input', {'name': '_token'})
    return csrf_token_element['value'] if csrf_token_element else None

def fill_login_form(page, email, password, csrf_token):
    """Fills the login form with email, password, and CSRF token, then submits."""
    page.fill('input[name="email"]', email)
    page.fill('input[name="password"]', password)
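    # Pass the token as an argument instead of interpolating it into the JavaScript source
    page.evaluate(
        'token => { document.querySelector("input[name=_token]").value = token; }',
        csrf_token,
    )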
    page.click('#submit-button')
    print("Form submitted, waiting for page redirection...")
    time.sleep(10)  # Wait for page navigation or processing time

def extract_product_data(page):
    """Extracts product data from the product grid."""
    products_list = []
    page.wait_for_selector('#product-grid')  # Wait for the product grid to be visible
    product_items = page.query_selector_all('.product-item')

    for product in product_items:
        product_name = product.query_selector('.product-name').inner_text().strip()
        product_price = product.query_selector('.product-price').inner_text().strip()
        products_list.append({'name': product_name, 'price': product_price})

    return products_list

def check_catalog_section(page):
    """Check if #content-container exists and then find the .catalog and its first .text-right element containing <a>."""
    container_element = page.query_selector('#content-container')
    if container_element:
        catalog_element = container_element.query_selector('.catalog')
        if catalog_element:
            first_text_right = catalog_element.query_selector('.text-right')
            if first_text_right:
                a_element = first_text_right.query_selector('a')
                if a_element:
                    print("The .text-right element inside .catalog contains an <a> element.")
                    return True
                else:
                    print("The .text-right element inside .catalog does NOT contain an <a> element.")
            else:
                print("No .text-right element found inside the .catalog section.")
        else:
            print("No .catalog section found in the #content-container.")
    else:
        print("No #content-container found.")
    return False

def scrape_login_page(url):
    with sync_playwright() as p:
        # Launch Chrome with required settings
        browser = p.chromium.launch(
            channel="chrome",
            headless=True,
            args=["--ignore-certificate-errors"]
        )

        # Use custom headers and set them in the page context
        page = browser.new_page(ignore_https_errors=True, extra_http_headers=CUSTOM_HEADERS)

        try:
            page.goto(url, wait_until="networkidle")
            time.sleep(5)  # Ensure enough time for all content to load
            print(f"Loaded page via proxy: {url}")
        except Exception as e:
            print(f"Error navigating to the page via proxy: {e}")
            browser.close()
            return

        # Extract the page content and parse with BeautifulSoup
        soup = BeautifulSoup(page.content(), 'html.parser')
        email, password = extract_credentials(soup)

        if not email or not password:
            print("Demo credentials not found on the page.")
            browser.close()
            return

        print(f"Extracted Credentials:\nEmail: {email}\nPassword: {password}")
        csrf_token = extract_csrf_token(soup)

        if csrf_token:
            fill_login_form(page, email, password, csrf_token)
        else:
            print("CSRF token not found on the page.")
            browser.close()
            return

        # Check if login was successful
        if "/dashboard" in page.url:
            print("Dashboard page loaded!")
            time.sleep(5)  # Wait for the dashboard content to load

            # Check for the presence of .catalog and its first .text-right containing an <a> element
            if check_catalog_section(page):
                print("The catalog section check passed.")

                # Proceed with product scraping if needed
                products_list = extract_product_data(page)

                # Display scraped product data
                if products_list:
                    print("Scraped Product Data:")
                    for product in products_list:
                        print(f"Product Name: {product['name']}, Price: {product['price']}")
                    print("Product scraping completed.")
                else:
                    print("No products found.")

                # Perform logout action with explicit wait
                try:
                    page.wait_for_selector('a[href*="/logout"]', timeout=30000)  # Wait up to 30 seconds (timeout is in milliseconds)
                    page.click('a[href*="/logout"]')
                    page.wait_for_load_state("networkidle")
                    time.sleep(2)  # Allow some time for the logout to process
                    print("Logout successful!")
                except Exception as e:
                    print(f"Logout action failed: {e}")
                return products_list
            else:
                print("Catalog section check failed. The expected structure was not found.")
        else:
            print("Login failed! Redirect to the dashboard was not successful.")

        browser.close()

# URL of the login page
login_page_url = "https://www.scrapingcourse.com/login/csrf"
scraped_products = scrape_login_page(login_page_url)
print(scraped_products if scraped_products else "No products were scraped.")

The example code navigates to the page, finds the login form, fills it with the required data including the CSRF token, scrapes the products behind the login, and prints the following output:

Loaded page via proxy: https://www.scrapingcourse.com/login/csrf
Extracted Credentials:
Email: ...
Password: ...
Form submitted, waiting for page redirection...
Dashboard page loaded!
The .text-right element inside .catalog contains an <a> element.
The catalog section check passed.
Scraped Product Data:
Product Name: Chaz Kangeroo Hoodie, Price: $52
Product Name: Teton Pullover Hoodie, Price: $70
...
Product scraping completed.
Logout successful!
[{'name': 'Chaz Kangeroo Hoodie', 'price': '$52'}, {'name': 'Teton Pullover Hoodie', 'price': '$70'}, ...]
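When a login form does not depend on JavaScript, a full browser may be unnecessary. The sketch below shows one way to build the POST payload with BeautifulSoup by copying every hidden input (which is where a CSRF token usually lives) and adding the visible fields. The helper name build_form_payload and the sample form are hypothetical.

```python
from bs4 import BeautifulSoup


def build_form_payload(html, **visible_fields):
    """Collect hidden inputs from the first <form> and merge in the given fields."""
    soup = BeautifulSoup(html, "html.parser")
    form = soup.find("form")
    if form is None:
        raise ValueError("no <form> element found")
    payload = {
        field["name"]: field.get("value", "")
        for field in form.find_all("input", attrs={"type": "hidden"})
        if field.get("name")
    }
    payload.update(visible_fields)
    return payload


# Hypothetical login form for illustration
sample_html = '''
<form method="post" action="/login">
  <input type="hidden" name="_token" value="abc123">
  <input type="email" name="email">
  <input type="password" name="password">
</form>
'''
payload = build_form_payload(sample_html, email="demo-user", password="secret")
print(payload)
```

In practice the form would be fetched and submitted through the same requests.Session, since the token is normally tied to the session cookie set when the form was served.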

Rotating Proxies and User Agents

To avoid being blocked, you need to rotate proxies and user agents. Use libraries like scrapy-rotating-proxies or integrate with Scrape.do’s proxy service:

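import requests
import urllib3

# Suppress the warnings triggered by verify=False below
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

url = "https://www.scrapingcourse.com/ecommerce/"
token = "YOUR_TOKEN"

# Assumed proxy-mode format: http://<token>:<parameters>@proxy.scrape.do:8080
# Confirm the host and the parameters (render=true is an assumption) in the Scrape.do documentation
proxyModeUrl = "http://{}:waitSelector=.woocommerce&render=true@proxy.scrape.do:8080".format(token)
proxies = {
    "http": proxyModeUrl,
    "https": proxyModeUrl,
}

response = requests.request("GET", url, proxies=proxies, verify=False)
print(response.text)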
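The snippet above covers proxies; rotating user agents can be sketched just as briefly. The user-agent strings below are illustrative examples rather than a vetted list, and next_headers is a hypothetical helper name.

```python
import itertools

# Example user-agent strings; replace them with current, real browser values
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0",
]
_ua_cycle = itertools.cycle(USER_AGENTS)


def next_headers():
    """Return request headers carrying the next user agent in the rotation."""
    return {
        "User-Agent": next(_ua_cycle),
        "Accept-Language": "en-US,en;q=0.9",
    }


# Each call yields the next user agent and wraps around at the end of the list
for _ in range(4):
    print(next_headers()["User-Agent"])
```

The returned dictionary can be passed as the headers argument of a requests call, alongside the proxies argument.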

CAPTCHA Handling

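CAPTCHAs can be a significant roadblock. Services like 2Captcha or Anti-Captcha can solve CAPTCHAs programmatically. The first step is to submit the CAPTCHA as a task:

import requests

captcha_api_key = 'YOUR_2CAPTCHA_API_KEY'
site_key = 'RECAPTCHA_SITE_KEY'  # The data-sitekey value found on the target page
page_url = 'URL_OF_PAGE_WITH_CAPTCHA'

# Submit the reCAPTCHA task; googlekey is the site key, pageurl is the page showing the CAPTCHA
response = requests.post('https://2captcha.com/in.php', data={
    'key': captcha_api_key, 'method': 'userrecaptcha',
    'googlekey': site_key, 'pageurl': page_url,
})
# On success the body is "OK|<task_id>"; the solution is retrieved later with that ID
print(response.text)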
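Submitting the task is only half of the flow: the solution has to be polled for afterwards. The sketch below shows the polling loop, assuming the solver's plain-text response format (CAPCHA_NOT_READY while the task is pending, OK|<token> once solved). The fetch_status callable is hypothetical; in real use it would request the solver's result endpoint with the API key and task ID.

```python
import time


def poll_for_solution(fetch_status, task_id, interval=5.0, max_attempts=24):
    """Poll until the solver answers 'OK|<token>' or the attempts run out.

    fetch_status is a hypothetical callable that takes the task ID and
    returns the raw response text of the solver's result endpoint.
    """
    for _ in range(max_attempts):
        text = fetch_status(task_id)
        if text == "CAPCHA_NOT_READY":  # Spelling as returned by the service
            time.sleep(interval)
            continue
        if text.startswith("OK|"):
            return text.split("|", 1)[1]
        raise RuntimeError(f"solver returned an error: {text}")
    raise TimeoutError("CAPTCHA was not solved in time")


# Simulated solver: not ready twice, then a solution
fake_responses = iter(["CAPCHA_NOT_READY", "CAPCHA_NOT_READY", "OK|example-token"])
token = poll_for_solution(lambda task_id: next(fake_responses), "12345", interval=0)
print(token)
```

Injecting the fetch function keeps the loop testable without network access and makes it easy to swap solver services.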

Rate Limiting and Throttling

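Respect rate limits by adding delays between requests and capping concurrency. Asynchronous requests raise throughput, so they need an explicit limit:

import asyncio
import httpx

async def fetch(client, url, semaphore):
    async with semaphore:  # Cap the number of requests in flight
        response = await client.get(url)
        await asyncio.sleep(1)  # Pause before releasing the slot
        return response.text

async def main(urls):
    semaphore = asyncio.Semaphore(2)  # At most two concurrent requests
    async with httpx.AsyncClient() as client:
        # gather() must be awaited inside a coroutine; asyncio.run() only accepts coroutines
        return await asyncio.gather(*(fetch(client, url, semaphore) for url in urls))

urls = ['https://example.com/page1', 'https://example.com/page2']
results = asyncio.run(main(urls))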
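For sequential scrapers, a delay between requests can be enforced with a small helper. The Throttle class below is a minimal sketch, not a library API; the 0.05-second interval is only chosen to keep the demonstration short.

```python
import time


class Throttle:
    """Enforce a minimum interval between consecutive calls to wait()."""

    def __init__(self, min_interval):
        self.min_interval = min_interval
        self._last = None  # Time of the previous call, None before the first one

    def wait(self):
        now = time.monotonic()
        if self._last is not None:
            remaining = self.min_interval - (now - self._last)
            if remaining > 0:
                time.sleep(remaining)
        self._last = time.monotonic()


throttle = Throttle(min_interval=0.05)
start = time.monotonic()
for _ in range(3):
    throttle.wait()  # A request would be sent right after this call
elapsed = time.monotonic() - start
print(f"Three calls took {elapsed:.2f} seconds")
```

The first call returns immediately, and each later call sleeps only for the part of the interval that has not already passed.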

Distributed Scraping with Scrapy

For large-scale, distributed scraping, Scrapy is an excellent choice. It is a powerful and flexible web scraping framework that allows you to build and manage scraping projects efficiently. Scrapy offers built-in support for handling requests, managing responses, and extracting data using CSS or XPath selectors. Its modular design makes it easy to extend and customize, allowing you to handle complex scraping tasks.

By deploying Scrapy spiders across multiple servers or cloud instances, you can scale your scraping operations to handle high traffic, access multiple websites simultaneously, and achieve faster data extraction. Integrating with tools like Scrapyd for scheduling and managing your spiders or combining it with cloud-based proxy services ensures smooth and uninterrupted scraping, even for websites with advanced anti-bot measures.

Basic Instructions for Getting Started with Scrapy

Start by installing Scrapy using pip, then create a new Scrapy project with the following commands:

pip install scrapy
scrapy startproject myproject
cd myproject

Generate a Spider

Create your first spider that will crawl a website:

scrapy genspider example example.com

Edit the Spider

Open the generated spider file (example.py) in the spiders directory and define the logic for crawling and extracting data:

import scrapy

class ExampleSpider(scrapy.Spider):
    name = "example"
    start_urls = ['http://example.com']

    def parse(self, response):
        for item in response.css('div.item'):
            yield {
                'title': item.css('h2::text').get(),
                'link': item.css('a::attr(href)').get(),
            }

Execute the spider to start scraping:

scrapy crawl example

# Export the scraped data to a JSON or CSV file:
scrapy crawl example -o output.json

By following these steps, you can set up a Scrapy project and begin building distributed web scraping applications effectively.
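Throttling and proxy rotation in Scrapy are configured in the project's settings.py. The excerpt below is a sketch with illustrative values; the proxy addresses are placeholders, and the middleware entries assume the scrapy-rotating-proxies package is installed.

```python
# settings.py (excerpt)

# Throttling: a base delay plus AutoThrottle, which adapts the delay to server latency
DOWNLOAD_DELAY = 1
CONCURRENT_REQUESTS_PER_DOMAIN = 4
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 30
RETRY_TIMES = 3

# Proxy rotation with scrapy-rotating-proxies (placeholder proxy addresses)
ROTATING_PROXY_LIST = [
    "proxy1.example.com:8000",
    "proxy2.example.com:8031",
]
DOWNLOADER_MIDDLEWARES = {
    "rotating_proxies.middlewares.RotatingProxyMiddleware": 610,
    "rotating_proxies.middlewares.BanDetectionMiddleware": 620,
}
```

Suitable values depend on the target site, so starting conservatively and raising concurrency only while responses stay healthy is a reasonable default.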

Data Storage and Export

Exporting to JSON, CSV, and Databases

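Export scraped records to CSV or JSON with pandas:

import pandas as pd

data = {'name': ['Product1', 'Product2'], 'price': [100, 200]}
df = pd.DataFrame(data)

# One row per product, without the index column
df.to_csv('products.csv', index=False)

# A JSON array with one object per product
df.to_json('products.json', orient='records', indent=4)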
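For the database case, Python's built-in sqlite3 module is enough for a first version. The sketch below uses an in-memory database; the table layout and the choice of name as the primary key are assumptions for illustration.

```python
import sqlite3

products = [
    {"name": "Product1", "price": 100},
    {"name": "Product2", "price": 200},
]

# ":memory:" keeps the demo self-contained; use a file path such as "products.db" to persist
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS products (name TEXT PRIMARY KEY, price REAL)")

# INSERT OR REPLACE keeps a single row per name when the scraper runs again
conn.executemany(
    "INSERT OR REPLACE INTO products (name, price) VALUES (:name, :price)",
    products,
)
conn.commit()

rows = conn.execute("SELECT name, price FROM products ORDER BY name").fetchall()
print(rows)
conn.close()
```

Named placeholders let the scraped dictionaries be passed straight to executemany without being reshaped into tuples.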

Storing Data in NoSQL Databases (MongoDB)

Store data directly in MongoDB:

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
db = client['scraping_db']
collection = db['products']
collection.insert_one({'name': 'Product1', 'price': 100})

Handling Errors and Logging

Implement robust error handling and logging:

import logging

logging.basicConfig(filename='scraping.log', level=logging.INFO)
logging.info('Scraping started...')
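The snippet above sets up logging but does not handle failures. One way to combine the two is a retry wrapper with exponential backoff, sketched below. The function name fetch_with_retries and the injected fetch callable are hypothetical; any function that takes a URL and raises on failure would fit.

```python
import logging
import time

logger = logging.getLogger("scraper")


def fetch_with_retries(fetch, url, retries=3, base_delay=1.0):
    """Call fetch(url), retrying failed attempts with exponential backoff."""
    for attempt in range(1, retries + 1):
        try:
            result = fetch(url)
            logger.info("Fetched %s on attempt %d", url, attempt)
            return result
        except Exception as exc:
            logger.warning("Attempt %d for %s failed: %s", attempt, url, exc)
            if attempt == retries:
                logger.error("Giving up on %s", url)
                raise
            # Wait base_delay, then 2x, then 4x, ... before the next attempt
            time.sleep(base_delay * 2 ** (attempt - 1))


# Simulated fetcher that fails twice before succeeding
calls = {"count": 0}


def flaky_fetch(url):
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "<html>ok</html>"


page = fetch_with_retries(flaky_fetch, "https://example.com", retries=3, base_delay=0)
print(page)
```

In a real scraper it is usually better to catch only the exceptions worth retrying, such as timeouts and connection errors, and let everything else fail fast.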

Conclusion

Web scraping is a powerful tool for extracting data from websites, but it often involves overcoming various challenges such as CAPTCHAs, dynamic content, rate limiting, and anti-scraping mechanisms. This guide has demonstrated how to use a variety of Python libraries and tools like BeautifulSoup, lxml, Selenium, Playwright, and Scrapy to handle these obstacles effectively. Additionally, integrating proxy services, rotating user agents, and using anti-CAPTCHA solutions like 2Captcha can significantly enhance your scraping efficiency and success rate.

By applying these techniques, you can tackle even the most sophisticated websites, extract valuable data, and scale your scraping projects to handle large-scale data extraction.

Recommended Links and Resources

  • Scrapy Documentation: https://docs.scrapy.org/en/latest/
  • BeautifulSoup Documentation: https://www.crummy.com/software/BeautifulSoup/bs4/doc/
  • lxml Documentation: https://lxml.de/
  • Playwright Documentation: https://playwright.dev/python/docs/intro
  • Selenium Documentation: https://www.selenium.dev/documentation/en/
  • 2Captcha API: https://2captcha.com
  • Scrape.do: https://scrape.do
  • Python Official Documentation: https://docs.python.org/3/

These resources will help you deepen your knowledge and skills in web scraping, allowing you to build more sophisticated and efficient scraping solutions.