[TOC]

  1. Title: Web Crawler Using Selenium 2023
  2. Review Date: Thu, Jun 22, 2023

Web crawler, with ticket buying as a case study

Preparation: python venv

1
2
3
4
5
6
7
8
9
python -m venv venv
source venv/bin/activate

# install the following using pip 
selenium==4.9.1
undetected-chromedriver>=3.4.6
webdriver-manager
jupyterlab
pyopenssl
Chrome user data (profile) directory:

  * macOS: ~/Library/Application Support/Google/Chrome
  * Linux: ~/.config/google-chrome
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# load options
# Build the Chrome options and start the browser through `uc`
# (presumably `import undetected_chromedriver as uc` — the import is not part
# of this excerpt; HEADLESS and chrome_profile_path are also defined elsewhere).
options = uc.ChromeOptions()
options.page_load_strategy = 'eager'
# options.add_argument("--auto-open-devtools-for-tabs") # help to avoid Cloudflare human verify check
options.add_argument("--disable-popup-blocking")
# NOTE(review): a fixed window size and --start-maximized are both requested;
# they look contradictory — confirm which one is intended.
options.add_argument("--window-size=800,600")
options.add_argument("--start-maximized")
options.add_argument('--disable-gpu')
options.add_argument("--no-sandbox")
options.add_argument("--disable-setuid-sandbox")
options.add_argument("--disable-extensions")
options.add_argument('--disable-application-cache')
options.add_argument("--disable-dev-shm-usage")
# Only run without a visible window when the HEADLESS flag is set.
if HEADLESS:
    options.add_argument("--headless=new")
    
# NOTE: this excerpt ends before the except/finally clause that must close
# the try statement below.
try:
    # check os system
    if os.name == "posix":
        # POSIX: the profile directory is handed over via the user_data_dir
        # keyword argument.
        time.sleep(3) # wait for chrome profile to be ready
        driver = uc.Chrome(options=options,
                        driver_executable_path=ChromeDriverManager().install(),
                        user_data_dir=chrome_profile_path,
                        # version_main=VERSION,
                        user_multi_procs=True)
    elif os.name == "nt":
        # Windows: the profile directory is handed over as a Chrome
        # command-line flag instead of the user_data_dir keyword argument.
        time.sleep(3) # wait for chrome profile to be ready
        options.add_argument(f"--user-data-dir={chrome_profile_path}")
        driver = uc.Chrome(options=options,
                            driver_executable_path=ChromeDriverManager().install(),
                        #    driver_executable_path=os.path.join(os.path.abspath(os.getcwd()), 'chromedriver-win64/chromedriver-win64/chromedriver.exe'),
                           user_multi_procs=True,
                           )
    else:
        # Any other os.name value is rejected outright.
        raise Exception("Unknown OS")
    # Set the driver's implicit wait to 5 (seconds, per the Selenium API).
    driver.implicitly_wait(5)

Some known issues with Selenium

  1. To avoid memory leaks in a multiprocessing environment, import the driver inside the subprocess (e.g., inside the worker function).
  2. Waiting with `WebDriverWait(...).until(EC.presence_of_element_located(...))` is very slow; try the `sleepy_find_element` helper (below) instead.
  3. We need to set up a logging queue to avoid crashes when the logging module is used with multiprocessing.
  4. `list(tqdm(pool.imap(*), total=len(*)))` is a good way to track multiprocess progress.

Utility functions

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def sleepy_find_element(by, query, attempt_count: int = 30, sleep_duration: float = 0.3):
    '''
    Find the first web element matching a locator, polling until it appears.

    The lookup is repeated up to ``attempt_count`` times against the
    module-level ``browser`` object, sleeping ``sleep_duration`` seconds after
    every unsuccessful attempt. The first match of the first successful
    lookup is returned immediately.

    Args:
        by (selenium.webdriver.common.by.By): The method used to locate the element.
        query (str): The query string to locate the element.
        attempt_count (int, optional): The number of attempts to find the element. Default: 30.
        sleep_duration (float, optional): Seconds to sleep between attempts. Default: 0.3.

    Returns:
        selenium.webdriver.remote.webelement.WebElement: The first matching
        element, or None if nothing matched within ``attempt_count`` attempts
        (including when ``attempt_count`` is 0).
    '''
    # `browser` is a module-level object that is only read here; it must
    # provide find_elements(by, query) returning a (possibly empty) list.
    global browser
    for _count in range(attempt_count):
        matches = browser.find_elements(by, query)
        if matches:
            logging.info(f'Element {query} has found')
            return matches[0]
        logging.info(f'Element {query} is not present, attempt: {_count+1}')
        time.sleep(sleep_duration)
    # Reaching this point means every attempt came back empty (or no attempt
    # was made), so report it and return the documented "not found" value.
    logging.warning("Element not find!")
    return None
1
2
3
4
5
6
7
8
# import Action chains and the locator strategies
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
# element
# find_element_by_id() was removed in Selenium 4.3, so with the
# selenium==4.9.1 pin the lookup has to go through find_element(By.ID, ...).
source = driver.find_element(By.ID, "name")
# action chain object
action = ActionChains(driver)
# move to element operation: hover the element, click it, then run the chain
action.move_to_element(source).click().perform()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import logging
from pathlib import Path
from tqdm.auto import tqdm
import os
import pickle
import csv
import time
import threading
from glob import glob
import argparse

Parsing HTML with lxml XPath and requests

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from lxml import html

response = requests.get(complete_url)
content = response.content
tree = html.fromstring(content) # read byte content


# print("start loading")
# latitude 
latitude_xp = '//h3[contains(text(), "Coordinates")]/following-sibling::p[contains(text(), "Latitude")]'
latitude_ele = tree.xpath(latitude_xp)
if latitude_ele:
    latitude_ele = latitude_ele[0]
    latitude_info = latitude_ele.text_content().strip().split("Latitude: ")[1]
    # print('latitude_info', latitude_info)
    temp = dict()
    temp['latitude'] = latitude_info
else:
    return 

multiprocessing with tqdm

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from tqdm.contrib.concurrent import process_map
from multiprocessing import Pool, Process, Manager, Lock


  manager = Manager() # Manager dictionary must be used for multiprocessing
  # m_saved_postcode_lst = manager.list(saved_postcode_lst)
  # m_no_data_lst = manager.list(no_data_lst)
  m_chromelock = manager.Lock()
  m_nodatalock = manager.Lock()
  m_savedpstcdlock = manager.Lock()

  m_profile_path_list = manager.list(profile_path_list)



  # %%
  postcode_list_chunks = [postcode_list[x:x+CHUNK_SIZE] for x in range(0, len(postcode_list), CHUNK_SIZE)]

  # %%
  args = [(chunk, m_chromelock,m_nodatalock,m_savedpstcdlock, m_profile_path_list, loggername, loggingfile, data_index) for chunk in postcode_list_chunks]

  try:
      with mp.Pool(PROCESS_NUM) as pool:
          results = list(logging_tqdm(pool.imap(helperf, args, chunksize=1), total=len(args),
                                      loggername=loggername,
                                      loggingfile=loggingfile,
                                      desc="Progress Status in Main Process"))

  except Exception as e:
      logger.error(e)
      raise e
  finally:
      active_children = mp.active_children()
      for p in active_children:
          p.kill()
          p.join()
      logger.info("Killing all processes")

# IMPORTANT: build the complete inner dict first and assign it to the manager
# dict in a single step. Updating a nested dict through the manager proxy is
# not propagated to the shared object.
# NOTE(review): assumes postcode_info_dict is a multiprocessing Manager dict
# (its definition is outside this excerpt) — confirm.

temp = {
  "United States": "New York", 
  "Italy": "Naples", 
  "England": "London"
}
postcode_info_dict[postcode] = temp # OK: whole-value assignment goes through the proxy
postcode_info_dict[postcode]['United States'] = "New York" # BAD: only changes a local copy of the inner dict, so the update is lost