Download Method - [selenium.driver]
0. Before You Read
This article uses requests and selenium.
If you are not familiar with those two libraries, read this article together with [Python scraping] Downloading announcement files from the Shanghai Stock Exchange - CSDN blog.
I. Scraping Target
Scrape announcement files from cninfo (巨潮资讯网, www.cninfo.com.cn).

II. Using DevTools (F12): the Network Panel
The page submits a form, so we can guess the data comes back as JSON.
Open the Network panel in DevTools to check whether a JSON response appears.

As the screenshot shows,
the XHR request named query responds with the JSON we need.

III. Basic Request - Fetching Announcement Metadata with requests
Since the query request returns what we want, we can construct its URL directly.
1. Base URL
(1) The screenshot gives us the base (common) part of the URL.
(2) Request method: POST (we will use this when constructing the request).

```python
url = 'https://www.cninfo.com.cn/new/hisAnnouncement/query'
```
2. Headers

```python
headers = {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate, br, zstd",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Connection": "keep-alive",
    "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
    "Host": "www.cninfo.com.cn",
    "Origin": "https://www.cninfo.com.cn",
    "Referer": "https://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
    "X-Requested-With": "XMLHttpRequest",
    "Cookie": "JSESSIONID=<your JSESSIONID>; insert_cookie=<your insert_cookie>",  # replace with your own cookie
}
```
3. Payload - POST Request (Form Data)
Because this is a POST request, the parameters go in the request body.

| Parameter | Example | Purpose |
|---|---|---|
| pageNum | 1 | Current page number (starts at 1) |
| pageSize | 30 | Records per page (up to 100) |
| column | szse | Exchange code (szse = Shenzhen, sse = Shanghai) |
| seDate | 2024-12-30~2025-07-01 | Filter by announcement publication date range |
| stock | (empty) | Stock code (e.g. 000001) |
| category | (empty) | Announcement type (e.g. category_ndbg_szsh = annual report) |

```python
payload = {
    'pageNum': '1',
    'pageSize': '30',
    'column': 'szse',
    'tabName': 'fulltext',
    'plate': '',
    'stock': '',
    'searchkey': '',
    'secid': '',
    'category': '',
    'trade': '',
    'seDate': '2024-12-30~2025-07-01',
    'sortName': '',
    'sortType': '',
    'isHLtitle': 'true',
}
```
4. Sending the Request
Use the post method, passing in the url, headers, and payload:

```python
response = requests.post(url, headers=headers, data=payload)
```

5. Response Content
cninfo returns the data already in JSON format:
```json
{
    "classifiedAnnouncements": null,
    "totalSecurities": 0,
    "totalAnnouncement": 3208,
    "totalRecordNum": 3208,
    "announcements": [
        {
            "id": null,
            "secCode": "xxxx",
            "secName": "xxxx",
            "orgId": "xxxx",
            "announcementId": "xxxx",
            "announcementTitle": "xxxx announcement",
            "announcementTime": xxxx,
            "adjunctUrl": "xxxx.PDF",
            "adjunctSize": 234,
            "adjunctType": "PDF",
            "storageTime": null,
            "columnId": "xxxx",
            "pageColumn": "SZCY",
            "announcementType": "xxxx",
            "associateAnnouncement": null,
            "important": null,
            "batchNum": null,
            "announcementContent": "",
            "orgName": null,
            "tileSecName": "xxxx",
            "shortTitle": "xxxx announcement",
            "announcementTypeName": null,
            "secNameList": null
        },
        ...(3208 in total)
    ]
}
```
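To work with this response in code, parse the text as JSON and pull out the fields you need. A minimal sketch; the sample below is hand-made to mirror the response shape, and its values are invented:

```python
import json

# a trimmed sample shaped like cninfo's response (values are made up)
sample = '''{
    "totalRecordNum": 2,
    "announcements": [
        {"secCode": "000001", "announcementTitle": "Announcement A", "adjunctUrl": "finalpage/a.PDF"},
        {"secCode": "000002", "announcementTitle": "Announcement B", "adjunctUrl": "finalpage/b.PDF"}
    ]
}'''

data = json.loads(sample)
# collect (code, title, attachment path) for every announcement on the page
rows = [(a["secCode"], a["announcementTitle"], a["adjunctUrl"])
        for a in data["announcements"]]
print(rows[0])  # → ('000001', 'Announcement A', 'finalpage/a.PDF')
```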
6. Basic Request Code
Since pageNum is fixed at 1 here, only a single page of results is returned.
```python
import requests
import json

def cninf():
    # headers
    headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "Host": "www.cninfo.com.cn",
        "Origin": "https://www.cninfo.com.cn",
        "Referer": "https://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
        "X-Requested-With": "XMLHttpRequest",
        "Cookie": "JSESSIONID=<your JSESSIONID>; insert_cookie=<your cookie>",
    }
    # payload
    payload = {
        'pageNum': '1',
        'pageSize': '30',
        'column': 'szse',
        'tabName': 'fulltext',
        'plate': '',
        'stock': '',
        'searchkey': '',
        'secid': '',
        'category': '',
        'trade': '',
        'seDate': '2024-12-30~2025-07-01',
        'sortName': '',
        'sortType': '',
        'isHLtitle': 'true',
    }
    # build the url
    url = 'https://www.cninfo.com.cn/new/hisAnnouncement/query'
    response = requests.post(url, headers=headers, data=payload)
    if response.status_code == 200:
        return response.text
    else:
        print(f"Request failed, status code: {response.status_code}")
        return None

if __name__ == "__main__":
    result = cninf()
    if result:
        print(result)
```
IV. Advanced Requests - The "Announcement Quick Search" Filter Form
Why stop at downloading announcement metadata? Now that we have the key information needed to download the files, let's keep going.
1. Filling Payload Fields to Download Different Categories of Data
1.1 Using the Form Fields
With this many form fields available, we should put a few of them to use.
Using the payload fields from the basic request above, we can fill in the values we want when constructing the request, so the response contains only the filtered announcement data.
```python
payload = {
    'pageNum': '1',
    'pageSize': '30',
    'column': 'szse',
    'tabName': 'fulltext',
    'plate': '',
    'stock': '',
    'searchkey': '',
    'secid': '',
    'category': '',
    'trade': '',
    'seDate': '2024-12-30~2025-07-01',
    'sortName': '',
    'sortType': '',
    'isHLtitle': 'true',
}
response = requests.post(url, headers=headers, data=payload)
```

| Attribute | Fixed value? | Meaning |
|---|---|---|
| pageNum | no | Selects page x of the data. Each page holds different records; passing 'pageNum': 'i' returns page i, not i pages in total. |
| pageSize | yes | 'pageSize': '30' - no matter how the other parameters change, each response contains 30 records. |
| column | no | szse: Shenzhen/Shanghai/Beijing; hke: Hong Kong; third: NEEQ; fund: funds; bond: bonds; regulator: regulators; pre_disclosure: pre-disclosure |
| plate | no | Market board, matches the form's 板块 field. sz: Shenzhen; szmb: SZ main board; szcy: ChiNext; sh: Shanghai; shmb: SH main board; shkcp: STAR Market; bj: Beijing Stock Exchange |
| stock | no | Stock code, matches the form's 代码/简称/拼音 field |
| searchkey | no | Keyword search, matches the form's 标题关键字 field |
| category | no | Category, matches the form's 选择分类 field |
| trade | no | Industry, matches the form's 行业 field |
| seDate | no | Date range, matches the form's 日期选择 field |
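The defaults-plus-overrides pattern described above can be wrapped in a small helper. `BASE_PAYLOAD` and `build_payload` below are hypothetical names, not part of the site or of this project's code; the field names match the payload shown earlier:

```python
# hypothetical helper: the default payload plus selected overrides
BASE_PAYLOAD = {
    'pageNum': '1', 'pageSize': '30', 'column': 'szse', 'tabName': 'fulltext',
    'plate': '', 'stock': '', 'searchkey': '', 'secid': '', 'category': '',
    'trade': '', 'seDate': '', 'sortName': '', 'sortType': '', 'isHLtitle': 'true',
}

def build_payload(**overrides):
    # copy the defaults so repeated calls do not share state
    payload = dict(BASE_PAYLOAD)
    payload.update(overrides)
    return payload

# e.g. ChiNext announcements matching a keyword in a date range
p = build_payload(plate='szcy', searchkey='年度报告', seDate='2024-12-30~2025-07-01')
```

The resulting dict can be passed straight to `requests.post(url, headers=headers, data=p)` as in the basic request.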
1.2 Function Flow Design

| Menu option | Flow |
|---|---|
| A. Query by date: total announcements / downloaded count | Input: target date. Output: number of announcements on that date, and the number already downloaded. |
| B. Download announcements | (1) Enter the announcement date range ([start date] and [end date]); the program echoes "Your query date range is: [start date]~[end date]". (2) Choose a download mode: a. basic download (all announcements in the range); b. advanced download (filter by [keyword] and [market board], entering NO to skip a filter), then confirm the filters: Y/y starts the download, N/n returns to the previous menu. |
| Q. Exit | |
1.3 The Main Function

```python
def main():
    announcementDownloader = Cninfo()
    while True:
        print("\nSelect an option:")
        print("A. Query downloaded announcement counts for a given date")
        print("B. Download announcements")
        print("Q. Exit")
        choice = input("Enter an option (A/B/Q): ").upper()
        if choice == "A":
            date = str(input("Enter the target date (format: YYYY-MM-DD): "))
            total = announcementDownloader.query_record(date)
            downloaded = announcementDownloader.db.get_count_by_date(date)
            print(f"\nAnnouncements on that date: {total}")
            print(f"Already downloaded for that date: {downloaded}")
        elif choice == "B":
            print("Select a download mode")
            print("a. Basic download (all announcements in the date range)")
            print(
                "b. Advanced download (filter by stock code, keyword, and market board)"
            )
            print("e. Back to previous menu")
            print("q. Exit")
            subchoice = input("Enter an option (a/b/e/q): ").lower()
            if subchoice == "a":
                print("\nEnter the announcement date range to query")
                start_date = input("Enter the start date (format: YYYY-MM-DD): ")
                end_date = input("Enter the end date (format: YYYY-MM-DD): ")
                print(f"Your query date range is: {start_date}~{end_date}")
                confirm = input("Confirm download (Y/N): ").upper()
                if confirm == "Y":
                    print("Starting download...")
                    announcementDownloader.query(start_date, end_date)
                    print("Download finished")
                else:
                    print("Back to previous menu")
            elif subchoice == "b":
                print("\nEnter the announcement date range to query")
                start_date = input("Enter the start date (format: YYYY-MM-DD): ")
                end_date = input("Enter the end date (format: YYYY-MM-DD): ")
                print(f"Your query date range is: {start_date}~{end_date}")
                # personalize
                stock_code = input("Enter a stock code, or NO to skip: ")
                if stock_code == "NO":
                    stock_code = "NoSet"
                keywords = input("Enter an announcement keyword, or NO to skip: ")
                if keywords == "NO":
                    keywords = "NoSet"
                print(
                    "Market boards:\nsz: Shenzhen\nszmb: SZ main board\nszcy: ChiNext\nsh: Shanghai\nshmb: SH main board\nshkcp: STAR Market\nbj: Beijing Stock Exchange"
                )
                plate = input(
                    "Enter a board abbreviation (sz/szmb/szcy/sh/shmb/shkcp/bj), or NO to skip: "
                )
                if plate == "NO":
                    plate = "NoSet"
                print("\nPlease confirm the download filters:")
                print(f"Stock code: {stock_code}")
                print(f"Keyword: {keywords}")
                print(f"Market board: {plate}")
                confirm = input("Confirm download? Y/y to confirm, N/n to go back: ").upper()
                if confirm == "Y":
                    print("Starting download...")
                    # call the advanced download functions
                    stock_code = stock_code if stock_code != "NoSet" else ""
                    keywords = keywords if keywords != "NoSet" else ""
                    plate = plate if plate != "NoSet" else ""
                    announcementDownloader.edit_payload(stock_code, keywords, plate)
                    announcementDownloader.query(start_date, end_date)
                    print("Download finished")
                else:
                    print("Back to previous menu")
                    continue
            elif subchoice == "q":
                break
            else:
                print("Back to previous menu")
        elif choice == "Q":
            print("Exiting")
            break
        else:
            print("Invalid option, try again")
```
V. Downloading Announcement Files
1. The Announcement Display Page
1.1 What the Page Shows
Clicking an announcement opens a display page: it is an ordinary web page, not a URL from which the file itself can be downloaded.

1.2 Download Method - [selenium.driver]
Looking at the page content, there is no direct URL for the announcement file:

```html
<button type="button" class="el-button el-button--primary el-button--mini"><!----><!----><span><i class="iconfont icongonggaoxiazai"></i> 公告下载</span></button>
```

So we construct each announcement display page's URL, open the page, and use selenium.driver to click the 公告下载 (announcement download) button.

2. Building the Display-Page URL from the Response
2.1 Getting the URL Fields from the Response
We need the components of the display page's URL from the response:
```json
"announcements": [
    {
        "id": null,
        "secCode": "xxxx",
        "secName": "xxxx",
        "orgId": "xxxx",
        "announcementId": "xxxx",
        "announcementTitle": "xxxx announcement",
```
2.2 Constructing the URL
Testing shows that base_url + announcementId is the minimal URL that reaches the display page:

```python
base_url = "https://www.cninfo.com.cn/new/disclosure/detail?"
final_url = f"{base_url}announcementId={announcement_id}"
```
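Applied to a full page of results, the same construction might look like this. A sketch: `build_detail_urls` is a hypothetical helper, and `announcements` is the list from the parsed response:

```python
base_url = "https://www.cninfo.com.cn/new/disclosure/detail?"

def build_detail_urls(announcements):
    # skip records that are missing an announcementId
    return [f"{base_url}announcementId={a['announcementId']}"
            for a in announcements if a.get("announcementId")]

urls = build_detail_urls([{"announcementId": "1224044014"}, {"secCode": "000001"}])
print(urls)  # only the record with an announcementId produces a URL
```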

3. Downloading a Single Announcement
The selenium class is listed in the selenium class code section below.
Each announcement URL actually returns HTML, so the file cannot be downloaded directly from the URL. Instead, we use selenium to simulate clicking the download button.
The element to click:

```python
download_link = dc._wait_and_highlight(By.XPATH, "//button[contains(.,'公告下载')]")
```
```python
def save_file(self, url, download_dir='cninfo_file/announcements', max_attempt=1):
    dc = None
    download_status = False
    try:
        dc = DriverController(download_dir=download_dir)
        if not dc.driver:  # start the browser if it is not running yet
            dc.start_browser()
        dc.driver.get(url)
        save_dir = download_dir
        # retry loop
        for attempt in range(max_attempt):
            try:
                # record the files present before the download starts
                original_files = set(f for f in os.listdir(save_dir)
                                     if os.path.isfile(os.path.join(save_dir, f)))
                # click the download button
                time.sleep(0.5)
                from selenium.webdriver.common.by import By
                download_link = dc._wait_and_highlight(By.XPATH, "//button[contains(.,'公告下载')]")
                dc._reliable_click(download_link)
                dc.logger.info('file started downloading ...')
                # monitor download progress
                for _ in range(60):  # wait up to 30 seconds
                    time.sleep(0.5)  # polling interval
                    # current files, excluding Chrome's in-progress temp files
                    current_files = set(
                        f for f in os.listdir(save_dir)
                        if os.path.isfile(os.path.join(save_dir, f))
                        and not f.endswith('.crdownload')  # key fix: skip partial downloads
                    )
                    new_files = current_files - original_files
                    # check for new files
                    if new_files:
                        # take the most recently modified file
                        newest_file = max(new_files,
                                          key=lambda f: os.path.getmtime(os.path.join(save_dir, f)))
                        temp_path = os.path.join(save_dir, newest_file)
                        # check the file is complete (its size has stabilized)
                        size1 = os.path.getsize(temp_path)
                        time.sleep(random.uniform(0.5, 1.0))
                        size2 = os.path.getsize(temp_path)
                        if size1 == size2 and size1 > 0:
                            download_status = True
                            break
            except Exception as e:
                dc.logger.error(f"Download attempt {attempt+1} failed with error: {str(e)}")
                dc._take_screenshot("download_error")
    except Exception as e:
        dc.logger.error(f"Fatal error during download: {str(e)}")
    finally:
        if dc:
            dc.close()
    return download_status
```
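The heart of the monitoring loop above, detecting a finished download by diffing the directory contents and waiting for the newest file's size to stabilize, can be pulled out into a standalone helper. This is a sketch of the same idea, independent of selenium; `wait_for_new_file` is a hypothetical name:

```python
import os
import time

def wait_for_new_file(save_dir, original_files, timeout=30.0, poll=0.5):
    """Return the name of a newly finished file in save_dir, or None on timeout."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        # current files, excluding Chrome's in-progress .crdownload temp files
        current = {f for f in os.listdir(save_dir)
                   if os.path.isfile(os.path.join(save_dir, f))
                   and not f.endswith('.crdownload')}
        new_files = current - original_files
        if new_files:
            newest = max(new_files,
                         key=lambda f: os.path.getmtime(os.path.join(save_dir, f)))
            path = os.path.join(save_dir, newest)
            # the file is considered complete once its size stops changing
            size1 = os.path.getsize(path)
            time.sleep(poll)
            if os.path.getsize(path) == size1 and size1 > 0:
                return newest
        time.sleep(poll)
    return None
```

Snapshot `original_files` with `set(os.listdir(save_dir))` just before clicking the download button, then call this helper to wait for the result.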
4. Deduplication (Avoiding Repeat Downloads) + File Metadata - Database
We introduce a database to manage the downloads: it stores the metadata and supports dedup checks.
From the response, these are the database fields we need:
- secCode: stock code
- secName: stock name
- announcementId: announcement ID
- announcementTitle: announcement title
- downloadUrl: announcement URL
- pageColumn: page column
Since announcementId is unique for each announcement, we use it for deduplication.
4.1 The Dedup Check
A function on the db class: it returns True when the announcementId already exists.
So when downloading, we only download announcements for which it returns False, then save their info to the db.
```python
# db class
def record_exists(self, announcement_id: str) -> bool:
    """
    Check whether an announcement already exists.
    Args:
        announcement_id: announcement ID
    Returns:
        bool: whether it exists
    """
    return announcement_id in self._id_cache
```
4.2 Inserting Announcement Metadata into the Database (Main Program)

```python
record = {
    'secCode': announcement.get('secCode'),
    'secName': announcement.get('secName'),
    'announcementId': announcement_id,
    'announcementTitle': announcement.get('announcementTitle'),
    'downloadUrl': final_url,
    'pageColumn': announcement.get('pageColumn'),
}
if success:
    self.db.save_record(record)
    page_save_cnt += 1
```
5. Saving Each Page of Data
Each requests call returns the metadata for every announcement on one pageNum, so we iterate over the individual records, downloading and storing each one.
5.1 Basic Idea
```python
def save_page():
    # build the base url
    base_url = "https://www.cninfo.com.cn/new/disclosure/detail?"
    for announcement in announcements:
        announcement_id = announcement.get("announcementId")
        final_url = f"{base_url}announcementId={announcement_id}"
        success = self.save_file(final_url, download_dir)
        record = {
            "secCode": announcement.get("secCode"),
            "secName": announcement.get("secName"),
            "announcementId": announcement_id,
            "announcementTitle": announcement.get("announcementTitle"),
            "downloadUrl": final_url,
            "pageColumn": announcement.get("pageColumn"),
        }
        if success:
            self.db.save_record(record)
            page_save_cnt += 1
        else:
            print("download failed")
            fail_cnt += 1
```
5.2 Implementation

```python
def save_page(self, data, start_date, end_date, download_dir='cninfo_file/announcements', max_fail=1):
    page_save_cnt = 0
    try:
        announcements = data.get('announcements')
        if not announcements:  # handles both null and an empty list
            print("no data found")
            return False, page_save_cnt
        # process the valid records
        fail_cnt = 0
        # base url
        base_url = 'https://www.cninfo.com.cn/new/disclosure/detail?'
        for announcement in announcements:
            if fail_cnt >= max_fail:
                print('reached maximum failures, stopping')
                return False, page_save_cnt
            announcement_id = announcement.get('announcementId')
            if not announcement or not announcement_id:
                print('got no announcement')
                continue
            # dedup check
            if self.db.record_exists(announcement_id):
                print('announcement exists')
                continue
            # download: build the download url
            final_url = f'{base_url}announcementId={announcement_id}'
            success = self.save_file(final_url, download_dir)
            record = {
                'secCode': announcement.get('secCode'),
                'secName': announcement.get('secName'),
                'announcementId': announcement_id,
                'announcementTitle': announcement.get('announcementTitle'),
                'downloadUrl': final_url,
                'pageColumn': announcement.get('pageColumn'),
            }
            if success:
                self.db.save_record(record)
                page_save_cnt += 1
            else:
                print('download failed')
                fail_cnt += 1
        return True, page_save_cnt
    except Exception as e:
        print(f"save failed: {e}")
        return False, page_save_cnt
```
6. Pagination
6.1 Finding Pagination Info in the Response
Back in the requests part of the basic request:

```python
payload = {
    ...
    'pageNum': '1',
    ...
}
```

In the payload, pageNum only fetches a single page.

| Parameter | Example | Purpose |
|---|---|---|
| pageNum | 1 | Current page number |

Since pageNum only gives us one page of content at a time, how do we get the announcement URLs for every page?
The answer is in the JSON response, which contains the total record count totalAnnouncement and the total page count totalpages.
These are the keys to handling pagination.
```json
{
    "classifiedAnnouncements": null,
    "totalSecurities": 0,
    "totalAnnouncement": 3034,
    "totalRecordNum": 3034,
    "announcements": [
        {
            "id": null,
            "secCode": "601005",
            "secName": "重庆钢铁",
            "orgId": "9900002184",
            "announcementId": "1224044014",
            "announcementTitle": "第十届董事会第十五次会议决议公告",
            "announcementTime": 1751330340000,
            "adjunctUrl": "finalpage/2025-07-01/1224044014.PDF",
            "adjunctSize": 115,
            "adjunctType": "PDF",
            "storageTime": null,
            "columnId": "250401||251302",
            "pageColumn": "SHZB",
            "announcementType": "01010503||010113||01239901",
            "associateAnnouncement": null,
            "important": null,
            "batchNum": null,
            "announcementContent": "",
            "orgName": null,
            "tileSecName": "重庆钢铁",
            "shortTitle": "第十届董事会第十五次会议决议公告",
            "announcementTypeName": null,
            "secNameList": null
        },
    ],
    "categoryList": null,
    "hasMore": true,
    "totalpages": 101
}
```
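One caveat worth checking: with pageSize=30 and totalRecordNum=3034, simple arithmetic gives ceil(3034 / 30) = 102 pages, while this response reports totalpages: 101. Whether the site's totalpages already accounts for a partial last page is worth verifying against a live response; looping up to the computed ceiling is the safer upper bound. A sketch of the arithmetic:

```python
import math

total_record_num = 3034  # from the sample response above
page_size = 30           # 'pageSize' used in the payload

# upper bound on the number of pages needed to cover every record
pages_needed = math.ceil(total_record_num / page_size)
print(pages_needed)  # → 102
```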
6.2 Design
We get totalpages from the response, then loop pageNum in the payload to fetch every page of announcement data:
- First request: get totalpages
- Then loop totalpages times: fetch all announcement info
6.3 Pagination Code

```python
import requests
import json
import time
import random

def query(start_date, end_date):
    total_page = query_get(start_date, end_date)
    query_all(start_date, end_date, total_page)

def query_get(start_date, end_date):
    # headers
    headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "Host": "www.cninfo.com.cn",
        "Origin": "https://www.cninfo.com.cn",
        "Referer": "https://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
        "X-Requested-With": "XMLHttpRequest",
        "Cookie": "JSESSIONID=; insert_cookie=",
    }
    # payload
    payload = {
        'pageNum': '1',
        'pageSize': '30',
        'column': 'szse',
        'tabName': 'fulltext',
        'plate': '',
        'stock': '',
        'searchkey': '',
        'secid': '',
        'category': '',
        'trade': '',
        'seDate': f'{start_date}~{end_date}',
        'sortName': '',
        'sortType': '',
        'isHLtitle': 'true',
    }
    # build the url
    url = 'https://www.cninfo.com.cn/new/hisAnnouncement/query'
    response = requests.post(url, headers=headers, data=payload)
    if response.status_code == 200:
        data = json.loads(response.text)
        total_record = data['totalRecordNum']
        total_announcement = data['totalAnnouncement']
        total_page = data['totalpages']
        print(f'total records: {total_record}')
        print(f'total announcements: {total_announcement}')
        print(f'total pages: {total_page}')
        return total_page
    else:
        print(f"Request failed, status code: {response.status_code}")
        return None

def query_all(start_date, end_date, total_page):
    # headers
    headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "Host": "www.cninfo.com.cn",
        "Origin": "https://www.cninfo.com.cn",
        "Referer": "https://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
        "X-Requested-With": "XMLHttpRequest",
        "Cookie": "JSESSIONID=; insert_cookie=",
    }
    # build the url
    url = 'https://www.cninfo.com.cn/new/hisAnnouncement/query'
    # loop over every page
    for i in range(1, total_page + 1):
        time.sleep(random.randint(1, 2))
        payload = {
            'pageNum': f'{i}',
            'pageSize': '30',
            'column': 'szse',
            'tabName': 'fulltext',
            'plate': '',
            'stock': '',
            'searchkey': '',
            'secid': '',
            'category': '',
            'trade': '',
            'seDate': f'{start_date}~{end_date}',
            'sortName': '',
            'sortType': '',
            'isHLtitle': 'true',
        }
        response = requests.post(url, headers=headers, data=payload)
        if response.status_code == 200:
            data = json.loads(response.text)
            success = save_page(data, start_date, end_date)
```
VI. The selenium Class Code
If you are not familiar with selenium, see: [Python scraping] Downloading announcement files from the Shanghai Stock Exchange - CSDN blog.
```python
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import logging
import os
import time
import random

class DriverController:
    def __init__(self, driver: webdriver.Chrome = None, download_dir: str = None, logger: logging.Logger = None):
        self.driver = driver
        self.logger = logger or self._setup_default_logger()
        self.download_dir = download_dir or "cninfo_file/announcements"  # default setting
        os.makedirs(self.download_dir, exist_ok=True)
        self._is_self_managed_driver = False

    def _setup_default_logger(self) -> logging.Logger:
        """
        - Create the default logger
        - Input: none
        - Output: a configured logger instance
        """
        logger = logging.getLogger('DriverController')
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger

    def _setup_driver_options(self, download_dir: str, headless: bool = False) -> webdriver.ChromeOptions:
        """
        - Configure browser options
        - Input:
            - download_dir: file download directory
            - headless: whether to run headless
        - Output: configured browser options
        """
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument("--headless")
            options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-blink-features=AutomationControlled")
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        prefs = {
            "download.default_directory": os.path.abspath(download_dir),
            "download.prompt_for_download": False,
            "plugins.always_open_pdf_externally": True,
            "download.directory_upgrade": True,
            "safebrowsing.enabled": False
        }
        options.add_experimental_option("prefs", prefs)
        return options

    def start_browser(self, headless: bool = False) -> None:
        """
        - Start the browser
        - Input:
            - headless: whether to run without a UI
        - Output: none
        """
        download_dir = self.download_dir
        if self.driver is not None:
            self.logger.warning("Browser already initialized")
            return
        options = self._setup_driver_options(download_dir=download_dir, headless=headless)
        try:
            self.driver = webdriver.Chrome(options=options)
            self.driver.maximize_window()
            self._is_self_managed_driver = True
            self.logger.info(f"Browser started with download path: {os.path.abspath(download_dir)}")
        except Exception as e:
            self.logger.error(f"Failed to start browser: {str(e)}")
            raise

    def _wait_and_highlight(self, by: str, locator: str, timeout: int = 10, highlight_color: str = "red"):
        """
        - Wait for an element and highlight it
        - Input:
            - by: locator strategy
            - locator: locator expression
            - timeout: maximum wait time
            - highlight_color: highlight color
        - Output: the element found
        """
        context = self.driver
        element = WebDriverWait(context, timeout).until(EC.presence_of_element_located((by, locator)))
        self.driver.execute_script(f"arguments[0].style.border='3px solid {highlight_color}';", element)
        time.sleep(random.uniform(0.5, 1.0))
        return element

    def _reliable_click(self, element):
        """
        - Click an element reliably
        - Input:
            - element: the element to click
        - Output: none
        """
        try:
            element.click()
        except Exception:
            try:
                ActionChains(self.driver).move_to_element(element).pause(random.uniform(0.5, 1.0)).click().perform()
            except Exception:
                self.driver.execute_script("arguments[0].click();", element)

    def _take_screenshot(self, prefix="error"):
        """
        - Take a screenshot of the current page
        - Input:
            - prefix: screenshot filename prefix
        - Output: the screenshot filename ("" on failure)
        """
        if not self.driver:
            return ""
        try:
            os.makedirs("screenshots", exist_ok=True)
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"screenshots/{prefix}_{timestamp}.png"
            self.driver.save_screenshot(filename)
            self.logger.info(f"Screenshot saved: {filename}")
            return filename
        except Exception as e:
            self.logger.error(f"Failed to take screenshot: {str(e)}")
            return ""

    def close(self):
        """
        - Close the browser and clean up
        - Input: none
        - Output: none
        """
        if self.driver and self._is_self_managed_driver:
            self.driver.quit()
            self.logger.info("Browser closed")
        self.driver = None
```
VII. The Database Class Code
```python
import sqlite3
from typing import Dict
import os
import logging

class CninfoAnnouncementDB:
    def __init__(self, db_path: str):
        """
        Initialize the announcement database.
        Args:
            db_path: path to the database file
        """
        os.makedirs(os.path.dirname(db_path), exist_ok=True)
        self.db_path = os.path.abspath(db_path)
        self.logger = logging.getLogger("CninfoAnnouncementDB")
        self._init_db()
        self._id_cache = set()
        self._load_id_cache()

    def _init_db(self):
        """
        Initialize the table schema.
        Columns:
        - secCode: stock code
        - secName: stock name
        - announcementId: announcement ID (primary key)
        - announcementTitle: announcement title
        - downloadUrl: announcement URL
        - pageColumn: page column
        """
        with self._get_connection() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS announcements (
                    secCode TEXT NOT NULL,
                    secName TEXT NOT NULL,
                    announcementId TEXT PRIMARY KEY,
                    announcementTitle TEXT NOT NULL,
                    downloadUrl TEXT NOT NULL,
                    pageColumn TEXT
                )"""
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_secCode ON announcements(secCode)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_announcementId ON announcements(announcementId)"
            )

    def _get_connection(self) -> sqlite3.Connection:
        """Get a database connection."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _load_id_cache(self):
        """Load the existing announcement IDs into the in-memory cache."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT announcementId FROM announcements")
            self._id_cache = {row["announcementId"] for row in cursor.fetchall()}

    def record_exists(self, announcement_id: str) -> bool:
        """
        Check whether an announcement already exists.
        Args:
            announcement_id: announcement ID
        Returns:
            bool: whether it exists
        """
        return announcement_id in self._id_cache

    def save_record(self, record: Dict) -> bool:
        """
        Save an announcement record to the database.
        Args:
            record: announcement dict, must contain:
                - secCode: stock code
                - secName: stock name
                - announcementId: announcement ID
                - announcementTitle: announcement title
                - downloadUrl: announcement URL
                - pageColumn: page column
        Returns:
            bool: whether the save succeeded
        """
        required_fields = [
            "secCode",
            "secName",
            "announcementId",
            "announcementTitle",
            "downloadUrl",
            "pageColumn",
        ]
        if not all(field in record for field in required_fields):
            self.logger.error("missing required fields")
            return False
        try:
            with self._get_connection() as conn:
                conn.execute(
                    """
                    INSERT OR REPLACE INTO announcements (
                        secCode, secName, announcementId,
                        announcementTitle, downloadUrl, pageColumn
                    ) VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (
                        record["secCode"],
                        record["secName"],
                        record["announcementId"],
                        record["announcementTitle"],
                        record["downloadUrl"],
                        record["pageColumn"],
                    ),
                )
            self._id_cache.add(record["announcementId"])
            return True
        except Exception as e:
            self.logger.error(f"save failed: {str(e)}")
            return False

    def get_all_records(self) -> list:
        """Get every announcement record."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM announcements")
            return [dict(row) for row in cursor.fetchall()]

    def delete_record(self, announcement_id: str) -> bool:
        """
        Delete the given announcement.
        Args:
            announcement_id: ID of the announcement to delete
        Returns:
            bool: whether the deletion succeeded
        """
        try:
            with self._get_connection() as conn:
                conn.execute(
                    "DELETE FROM announcements WHERE announcementId = ?",
                    (announcement_id,),
                )
            if announcement_id in self._id_cache:
                self._id_cache.remove(announcement_id)
            return True
        except Exception as e:
            self.logger.error(f"delete failed: {str(e)}")
            return False
```
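The dedup pattern the class relies on (an in-memory ID set backed by an INSERT OR REPLACE table) can be exercised on its own. A minimal self-contained sketch against an in-memory SQLite database; the function and column names here are simplified, not the class's actual API:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE announcements (announcementId TEXT PRIMARY KEY, title TEXT)")

# load any existing IDs into the in-memory cache, as _load_id_cache does
id_cache = {row[0] for row in conn.execute("SELECT announcementId FROM announcements")}

def save_record(ann_id, title):
    # skip anything already seen (the record_exists check), otherwise insert and cache
    if ann_id in id_cache:
        return False
    conn.execute("INSERT OR REPLACE INTO announcements VALUES (?, ?)", (ann_id, title))
    id_cache.add(ann_id)
    return True

first = save_record("1224044014", "board meeting resolution")
dup = save_record("1224044014", "board meeting resolution")
print(first, dup)  # → True False
```

The set makes the hot-path check O(1) without a database round trip, which matters when iterating over thousands of announcements per run.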
VIII. Common Errors