Scrapy 扩展 Item Pipeline：同步/异步批量提交 item 到 MySQL
0.参考
https://doc.scrapy.org/en/latest/topics/item-pipeline.html?highlight=mongo#write-items-to-mongodb
20180721新增:异步版本
https://twistedmatrix.com/documents/15.3.0/core/howto/rdbms.html
https://twistedmatrix.com/documents/18.7.0/api/twisted.python.failure.Failure.html
https://twistedmatrix.com/documents/12.1.0/core/howto/time.html
1.主要实现
(1) 连接超时自动重连 MySQL server
(2) 通过 item_list 收集 item,达到阈值后批量提交至数据库
(3) 通过解析异常,自动移除存在异常的数据行,重新提交 item_list
(4) shutdown 之前在 close_spider() 中提交当前 item_list
(5) 20180721新增:异步版本
2.同步版本
保存至 /site-packages/my_pipelines.py
'''
遇到问题没人解答?小编创建了一个Python学习交流QQ群:857662006
寻找有志同道合的小伙伴,互帮互助,群里还有不错的视频学习教程和PDF电子书!
'''
from socket import gethostname
import time
import re
from html import escape
import pymysql
pymysql.install_as_MySQLdb()
from pymysql import OperationalError, InterfaceError, DataError, IntegrityError
class MyMySQLPipeline(object):
    """Scrapy item pipeline that buffers items and bulk-inserts them into MySQL (blocking).

    Items passed to ``process_item`` are collected in ``self.item_list`` and
    flushed through ``sql()`` once MYSQL_ITEM_LIST_LIMIT of them have
    accumulated, and once more in ``close_spider``.

    Subclasses must implement ``sql()``, which should write ``self.item_list``
    using ``self.curs`` (a pymysql ``DictCursor``).

    Settings read: MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, MYSQL_DB,
    MYSQL_CHARSET, MYSQL_RECONNECT_WAIT (seconds to sleep between connection
    attempts) and MYSQL_ITEM_LIST_LIMIT (batch size).
    """

    # Name of the machine running the crawler. Not used by this class itself;
    # presumably meant for subclasses (e.g. to store in a column) -- confirm.
    hostname = gethostname()

    # Error codes after which the batch is re-sent on a fresh connection:
    # MySQL client-side errors 2000-2999 (e.g. 2003 can't connect, 2006 server
    # has gone away, 2013 lost connection during query) plus the transient
    # lock errors 1205 (lock wait timeout) and 1213 (deadlock). Any other
    # server error would fail identically on retry, so it is not retried.
    _RETRY_ERROR_CODES = frozenset(range(2000, 3000)) | {1205, 1213}

    def __init__(self, settings):
        """Read connection and batching options from the Scrapy *settings*."""
        self.mysql_host = settings.get('MYSQL_HOST', '127.0.0.1')
        self.mysql_port = settings.get('MYSQL_PORT', 3306)
        self.mysql_user = settings.get('MYSQL_USER', 'username')
        self.mysql_passwd = settings.get('MYSQL_PASSWD', 'password')
        self.mysql_reconnect_wait = settings.get('MYSQL_RECONNECT_WAIT', 60)
        self.mysql_db = settings.get('MYSQL_DB')
        self.mysql_charset = settings.get('MYSQL_CHARSET', 'utf8')  # or 'utf8mb4'
        self.mysql_item_list_limit = settings.get('MYSQL_ITEM_LIST_LIMIT', 30)
        self.item_list = []

    @classmethod
    def from_crawler(cls, crawler):
        """Scrapy factory hook: build the pipeline from the crawler's settings."""
        return cls(
            settings=crawler.settings
        )

    def open_spider(self, spider):
        """Connect to MySQL, retrying every MYSQL_RECONNECT_WAIT seconds until it succeeds.

        Also used to reconnect after a lost connection. On success sets
        ``self.conn`` and ``self.curs`` (a DictCursor) and shares the cursor
        as ``spider.curs``.
        """
        while True:
            try:
                self.conn = pymysql.connect(
                    host=self.mysql_host,
                    port=self.mysql_port,
                    user=self.mysql_user,
                    passwd=self.mysql_passwd,
                    db=self.mysql_db,
                    charset=self.mysql_charset,
                )
            except Exception as err:
                # Retry forever, as before, but in a loop: the previous
                # recursion grew the stack on every failed attempt.
                spider.logger.warning('MySQL: FAIL to connect {} {}'.format(err.__class__, err))
                time.sleep(self.mysql_reconnect_wait)
            else:
                break
        spider.logger.info('MySQL: connected')
        self.curs = self.conn.cursor(pymysql.cursors.DictCursor)
        spider.curs = self.curs

    def close_spider(self, spider):
        """Flush the remaining buffered items, then close the connection."""
        try:
            self.insert_item_list(spider)
        finally:
            self.conn.close()
        spider.logger.info('MySQL: closed')

    def process_item(self, item, spider):
        """Buffer *item*; flush the buffer once it holds MYSQL_ITEM_LIST_LIMIT items."""
        self.item_list.append(item)
        if len(self.item_list) >= self.mysql_item_list_limit:
            self.insert_item_list(spider)
        return item

    def sql(self):
        """Write ``self.item_list`` to the database; must be overridden.

        NOTE(review): nothing in this class commits, so the override
        presumably commits itself -- confirm.
        """
        raise NotImplementedError('Subclass of MyMySQLPipeline must implement the sql() method')

    def insert_item_list(self, spider):
        """Flush ``self.item_list`` through ``sql()``, repairing the batch on errors.

        - Connection loss / transient lock errors: reconnect and re-send.
        - Errors naming a bad row or a NULL column: drop the offending
          item(s) and re-send the rest.
        - Anything else: log the error with the whole batch and give up.

        The buffer is always empty when this returns.
        """
        try:
            while True:
                spider.logger.info('insert_item_list: {}'.format(len(self.item_list)))
                try:
                    self.sql()
                except Exception as err:
                    if self._is_retryable_error(err):
                        # e.g. (2013, 'Lost connection to MySQL server during query ...')
                        spider.logger.info('MySQL: exception {} err {}'.format(err.__class__, err))
                        self._reconnect(spider)
                        continue
                    if self._drop_bad_items(spider, err):
                        continue
                    # Retrying unchanged would fail the same way; give up on this batch.
                    spider.logger.error('MySQL: {} {} unhandled exception from item_list: \n{}'.format(
                        err.__class__, err, self.item_list))
                break
        finally:
            self.item_list.clear()

    def _is_retryable_error(self, err):
        """Return True if *err* should be handled by reconnecting and re-sending the batch."""
        if isinstance(err, InterfaceError):
            # e.g. (0, '') from using a connection that is already closed.
            return True
        return (isinstance(err, OperationalError) and bool(err.args)
                and err.args[0] in self._RETRY_ERROR_CODES)

    def _reconnect(self, spider):
        """Close the current connection (best effort) and open a new one."""
        try:
            self.conn.close()
        except Exception:
            # Already closed or broken: there is nothing left to release.
            pass
        self.open_spider(spider)

    def _drop_bad_items(self, spider, err):
        """Remove the item(s) a MySQL error message blames; return True if any were removed.

        Expects ``err.args == (code, message)`` and understands two shapes:
        "... at row N" drops the N-th buffered item (1-based), and
        "Column 'x' cannot be null" drops every item whose 'x' is None or unset.
        Returning False means the error could not be pinned on an item.
        """
        if len(err.args) != 2 or not isinstance(err.args[1], str):
            return False
        message = err.args[1]
        # e.g. (1264, "Out of range value for column 'position_id' at row 2")
        #      (1292, "Incorrect date value: '1977-06-31' for column 'release_day' at row 26")
        # NOTE(review): assumes sql() sends the batch as one multi-row INSERT,
        # so N indexes self.item_list -- confirm for each subclass.
        m_row = re.search(r'at\s+row\s+(\d+)$', message)
        if m_row:
            index = int(m_row.group(1)) - 1
            if not 0 <= index < len(self.item_list):
                return False
            item = self.item_list.pop(index)
            spider.logger.warning('MySQL: {} {} exception from item {}'.format(err.__class__, err, item))
            return True
        # e.g. (1048, "Column 'name' cannot be null")
        m_column = re.search(r"Column\s'([^']+)'", message)
        if m_column:
            column = m_column.group(1)
            dropped = [item for item in self.item_list if item.get(column) is None]
            if not dropped:
                # Nothing to remove: re-sending would loop on the same error.
                return False
            self.item_list[:] = [item for item in self.item_list if item.get(column) is not None]
            for item in dropped:
                spider.logger.warning('MySQL: {} {} exception from item {}'.format(err.__class__, err, item))
            return True
        return False
3.调用方法
Scrapy 项目 project_name
MySQL 数据库 database_name, 表 table_name
(1) 项目 pipelines.py 添加代码:
'''
遇到问题没人解答?小编创建了一个Python学习交流QQ群:857662006
寻找有志同道合的小伙伴,互帮互助,群里还有不错的视频学习教程和PDF电子书!
'''
from my_pipelines import MyMySQLPipeline
class MySQLPipeline(MyMySQLPipeline):
def sql(self):
self.curs.executemany("""
INSERT INTO table_name (
position_id, crawl_time)
VALUES (
%(position_id)s, %(crawl_time)s)
alt="img" />
在 self.item_list.append(item) 之后添加代码 spider.logger.info('process_item: {}'.format(len(self.item_list)))，打印添加 item 后当前 item_list 的元素个数。
连续 yield 5个 item,累计3个则触发 insert,红框 insert 部分将会阻塞绿框中后续 yield 部分:
5.异步版本
(1) 保存至 /site-packages/my_pipelines.py
'''
遇到问题没人解答?小编创建了一个Python学习交流QQ群:857662006
寻找有志同道合的小伙伴,互帮互助,群里还有不错的视频学习教程和PDF电子书!
'''
# -*- coding: utf-8 -*-
from socket import gethostname
import time
import re
# https://twistedmatrix.com/documents/15.3.0/core/howto/rdbms.html
# twisted.enterprise.adbapi: Twisted RDBMS support
from twisted.enterprise import adbapi
import pymysql
from pymysql import OperationalError, InterfaceError, DataError, InternalError, IntegrityError
class MyMySQLPipeline(object):
hostname = gethostname()
def __init__(self, spider, settings):
    """Keep a reference to the spider and build a Twisted adbapi pool of pymysql connections.

    Connection parameters come from the MYSQL_* settings; result rows are
    dicts (DictCursor). MYSQL_RECONNECT_WAIT is stored for the error
    handling (presumably handle_error's retry delay -- confirm) and
    MYSQL_ITEM_LIST_LIMIT is the batch size used by process_item.
    """
    self.spider = spider
    connect_kwargs = dict(
        host=settings.get('MYSQL_HOST', '127.0.0.1'),
        port=settings.get('MYSQL_PORT', 3306),
        user=settings.get('MYSQL_USER', 'username'),
        passwd=settings.get('MYSQL_PASSWD', 'password'),
        db=settings.get('MYSQL_DB', 'test'),
        charset=settings.get('MYSQL_CHARSET', 'utf8'),  # or 'utf8mb4'
        cursorclass=pymysql.cursors.DictCursor,
    )
    # adbapi imports the DB-API module named 'pymysql' and forwards these
    # keyword arguments to its connect() for every pooled connection.
    self.dbpool = adbapi.ConnectionPool('pymysql', **connect_kwargs)
    self.mysql_reconnect_wait = settings.get('MYSQL_RECONNECT_WAIT', 60)
    self.mysql_item_list_limit = settings.get('MYSQL_ITEM_LIST_LIMIT', 30)
    self.item_list = []
@classmethod
def from_crawler(cls, crawler):
return cls(
spider = crawler.spider,
settings = crawler.settings
)
def close_spider(self, spider):
self._sql(list(self.item_list))
def process_item(self, item, spider):
self.item_list.append(item)
if len(self.item_list) >= self.mysql_item_list_limit:
spider.log('item_list: %s'%len(self.item_list))
self._sql(list(self.item_list))
self.item_list.clear()
return item
def sql(self, txn, item_list):
raise NotImplementedError('Subclass of MyMySQLPipeline must implement the sql() method')
def _sql(self, item_list, retrying=False):
d = self.dbpool.runInteraction(self.sql, item_list)
d.addCallback(self.handle_result, item_list)
d.addErrback(self.handle_error, item_list, retrying)
def handle_result(self, result, item_list):
self.spider.logger.info('{} items inserted with retcode {}'.format(len(item_list), result))
def handle_error(self, failure, item_list, retrying):
# https://twistedmatrix.com/documents/18.7.0/api/twisted.python.failure.Failure.html
# r = failure.trap(pymysql.err.InternalError)
args = failure.value.args
# <class 'pymysql.err.OperationalError'> (1045, "Access denied for user 'username'@'localhost' (using password: YES)")
# <class 'pymysql.err.OperationalError'> (2013, 'Lost connection to MySQL server during query ([Errno 110] Connection timed out)')
# <class 'pymysql.err.OperationalError'> (2003, "Can't connect to MySQL server alt="img" />
另外可见使用了连接池