Downloading cninfo.com.cn (巨潮资讯网) Announcements with selenium.driver


Download method: 【selenium.driver】

0. Before You Start

This article uses both requests and selenium.

If you are not familiar with either of them, read this article together with 【python 爬虫】下载上海证交所公告文件-CSDN博客 (a companion post on downloading Shanghai Stock Exchange announcement files).

I. Scraping Target

Scrape the announcement files published on cninfo.com.cn.

 

II. Using the Developer Tools (F12), Network Tab

The page submits a form, so a reasonable guess is that the data comes back as JSON.

Open the Network tab of the developer tools to check whether a JSON response appears.

As the screenshot shows, the XHR request named query responds with exactly the JSON we need.

III. Basic Request: Fetching Announcement Metadata with requests

Since the query request returns what we want, we can construct its URL directly.

1. Base URL

(1) The screenshot gives us the base (common) part of the URL.

(2) Request method: POST (we will need this when building the request).

```python
url = 'https://www.cninfo.com.cn/new/hisAnnouncement/query'
```

2. Headers

```python
headers = {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate, br, zstd",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Connection": "keep-alive",
    "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
    "Host": "www.cninfo.com.cn",
    "Origin": "https://www.cninfo.com.cn",
    "Referer": "https://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
    "X-Requested-With": "XMLHttpRequest",
    "Cookie": "JSESSIONID=YOUR_JSESSIONID; insert_cookie=YOUR_INSERT_COOKIE",  # replace with your own Cookie
}
```

3. Payload: Sending the Form Data in a POST Request

Because this is a POST request, all parameters go in the request body.

Parameter | example | purpose:

- pageNum = 1: current page number (1-based)
- pageSize = 30: records returned per page (up to 100)
- column = szse: exchange code (szse = Shenzhen Stock Exchange, sse = Shanghai Stock Exchange)
- seDate = 2024-12-30~2025-07-01: date-range filter on the announcement publication date
- stock: stock code (e.g. 000001)
- category: announcement type (e.g. category_ndbg_szsh = annual report)

```python
payload = {
    'pageNum': '1',
    'pageSize': '30',
    'column': 'szse',
    'tabName': 'fulltext',
    'plate': '',
    'stock': '',
    'searchkey': '',
    'secid': '',
    'category': '',
    'trade': '',
    'seDate': '2024-12-30~2025-07-01',
    'sortName': '',
    'sortType': '',
    'isHLtitle': 'true',
}
```

4. Sending the Request

Use the post method, passing in url, headers, and payload:

```python
response = requests.post(url, headers=headers, data=payload)
```

5. Response Content

cninfo already returns the data as JSON:

```json
{
    "classifiedAnnouncements": null,
    "totalSecurities": 0,
    "totalAnnouncement": 3208,
    "totalRecordNum": 3208,
    "announcements": [
        {
            "id": null,
            "secCode": "xxxx",
            "secName": "xxxx",
            "orgId": "xxxx",
            "announcementId": "xxxx",
            "announcementTitle": "xxxx announcement",
            "announcementTime": xxxx,
            "adjunctUrl": "xxxx.PDF",
            "adjunctSize": 234,
            "adjunctType": "PDF",
            "storageTime": null,
            "columnId": "xxxx",
            "pageColumn": "SZCY",
            "announcementType": "xxxx",
            "associateAnnouncement": null,
            "important": null,
            "batchNum": null,
            "announcementContent": "",
            "orgName": null,
            "tileSecName": "xxxx",
            "shortTitle": "xxxx announcement",
            "announcementTypeName": null,
            "secNameList": null
        },
        ... (3208 in total)
    ],
}
```
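Since the body is JSON, it can be parsed directly (with response.json() on a real response, or json.loads as the later sections do). A minimal, offline sketch using a trimmed-down copy of the sample response above:

```python
import json

# A trimmed-down sample of the JSON body shown above
body = '''
{
  "totalRecordNum": 3208,
  "announcements": [
    {"announcementId": "1224044014",
     "announcementTitle": "xxxx announcement",
     "adjunctUrl": "finalpage/2025-07-01/1224044014.PDF"}
  ]
}
'''

data = json.loads(body)  # equivalent to response.json() on a real response
for ann in data["announcements"]:
    print(ann["announcementId"], ann["adjunctUrl"])
```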

6. Complete Basic Request Code

Because pageNum is fixed at 1, this only fetches a single page of results.

```python
import requests
import json

def cninf():
    # headers
    headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "Host": "www.cninfo.com.cn",
        "Origin": "https://www.cninfo.com.cn",
        "Referer": "https://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
        "X-Requested-With": "XMLHttpRequest",
        "Cookie": "JSESSIONID=YOUR_JSESSIONID; insert_cookie=YOUR_INSERT_COOKIE",
    }

    # payload
    payload = {
        'pageNum': '1',
        'pageSize': '30',
        'column': 'szse',
        'tabName': 'fulltext',
        'plate': '',
        'stock': '',
        'searchkey': '',
        'secid': '',
        'category': '',
        'trade': '',
        'seDate': '2024-12-30~2025-07-01',
        'sortName': '',
        'sortType': '',
        'isHLtitle': 'true',
    }

    # create url
    url = 'https://www.cninfo.com.cn/new/hisAnnouncement/query'

    response = requests.post(url, headers=headers, data=payload)

    if response.status_code == 200:
        return response.text
    else:
        print(f"Request failed, status code: {response.status_code}")
        return None

if __name__ == "__main__":
    result = cninf()
    if result:
        print(result)
```

IV. Advanced Requests: the Announcement Search Filters

We can't settle for downloading only the announcement metadata. Now that it has handed us the key information for downloading the files themselves, we naturally keep going.

1. Filling Payload Fields to Download Different Categories of Data

1.1 Using the form fields

With this many form fields available, it would be a shame not to use a few.

Based on the payload fields from the basic request above, we can plug our own values into the requests call to get filtered announcement data:

```python
payload = {
    'pageNum': '1',
    'pageSize': '30',
    'column': 'szse',
    'tabName': 'fulltext',
    'plate': '',
    'stock': '',
    'searchkey': '',
    'secid': '',
    'category': '',
    'trade': '',
    'seDate': '2024-12-30~2025-07-01',
    'sortName': '',
    'sortType': '',
    'isHLtitle': 'true',
}

response = requests.post(url, headers=headers, data=payload)
```

   

attribute | fixed value? | meaning

- pageNum | no | Selects page x of the data. Each page returns different records; passing 'pageNum': 'i' fetches page i, not i pages in total.

- pageSize | yes | 'pageSize': '30'. However the other parameters change, each response always carries 30 records.

- column | no | szse: Shenzhen/Shanghai/Beijing stocks; hke: Hong Kong stocks; third: the "third board" (NEEQ); fund: funds; bond: bonds; regulator: regulators; pre_disclosure: pre-disclosure.

- plate | no | Board. Corresponds to the 【板块】 field in the form. sz: Shenzhen market; szmb: Shenzhen main board; szcy: ChiNext; sh: Shanghai market; shmb: Shanghai main board; shkcp: STAR Market; bj: Beijing Stock Exchange.

- stock | no | Stock code. Corresponds to the 【代码/简称/拼音】 (code/short name/pinyin) field in the form.

- searchKey | no | Keyword search. Corresponds to the 【标题关键字】 (title keyword) field in the form.

- category | no | Category. Corresponds to the 【选择分类】 (choose category) field in the form:

  Category (field name):
  - 年报 annual report: category_ndbg_szsh
  - 半年报 semi-annual report: category_bndbg_szsh
  - 一季报 Q1 report: category_yjdbg_szsh
  - 三季报 Q3 report: category_sjdbg_szsh
  - 业绩预告 earnings forecast: category_yjygjxz_szsh
  - 权益分派 profit distribution: category_qyfpxzcs_szsh
  - 董事会 board of directors: category_dshgg_szsh
  - 监事会 supervisory board: category_jshgg_szsh
  - 股东会 shareholders' meeting: category_gddh_szsh
  - 日常经营 daily operations: category_rcjy_szsh
  - 公司治理 corporate governance: category_gszl_szsh
  - 中介报告 intermediary report: category_zj_szsh
  - 首发 IPO: category_sf_szsh
  - 增发 secondary offering: category_zf_szsh
  - 股权激励 equity incentive: category_gqjl_szsh
  - 配股 rights issue: category_pg_szsh
  - 解禁 lock-up expiry: category_jj_szsh
  - 公司债 corporate bond: category_gszq_szsh
  - 可转债 convertible bond: category_kzzq_szsh
  - 其他融资 other financing: category_qtrz_szsh
  - 股权变动 equity change: category_gqbd_szsh
  - 补充更正 supplement & correction: category_bcgz_szsh
  - 澄清致歉 clarification & apology: category_cqdq_szsh
  - 风险提示 risk alert: category_fxts_szsh
  - 特别处理和退市 special treatment & delisting: category_tbclts_szsh
  - 退市整理期 delisting consolidation period: category_tszlq_szsh

- trade | no | Industry. Corresponds to the 【行业】 field in the form.

- seDate | no | Date range. Corresponds to the 【日期选择】 field in the form.
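For instance (a hypothetical combination of values from the tables above; the board, category, and date range are arbitrary choices), a payload filtered to annual reports on the Shenzhen main board could look like:

```python
# Hypothetical example: annual reports (category_ndbg_szsh) from the
# Shenzhen main board (szmb) published in the first half of 2025.
payload = {
    'pageNum': '1',
    'pageSize': '30',
    'column': 'szse',
    'tabName': 'fulltext',
    'plate': 'szmb',
    'stock': '',
    'searchkey': '',
    'secid': '',
    'category': 'category_ndbg_szsh',
    'trade': '',
    'seDate': '2025-01-01~2025-06-30',
    'sortName': '',
    'sortType': '',
    'isHLtitle': 'true',
}
```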

1.2 Designing the Program Flow

Menu options and flow:

A. Query by date: total announcements / already downloaded

- Input: target date
- Output:
    - number of announcements on the target date
    - number of already-downloaded announcements on the target date

B. Download announcements

(1) Enter the date range to query (a start date and an end date)

- Input:
    - Enter the start date:
    - Enter the end date:
- Output: The query range you chose is: [start date]~[end date]

(2) Choose a download mode

a. Basic download (download every announcement in the date range)

- Output: start downloading

b. Advanced download (filter announcements by keyword and market board before downloading)

- Output:
    - Enter the announcement keyword, or NO to skip
    - Enter the market board, or NO to skip
- Output:
    - Please confirm the download scope: keyword: xxx, board: xxx
- Input:
    - Y/y: output: start downloading
    - N/n: output: return to the previous menu

Q. Quit the program

1.3 The main Function

```python
def main():
    announcementDownloader = Cninfo()
    while True:
        print("\nChoose an option:")
        print("A. Query announcement counts for a given date")
        print("B. Download announcements")
        print("Q. Quit")

        choice = input("Enter an option (A/B/Q): ").upper()

        if choice == "A":
            date = input("Enter the target date (format: YYYY-MM-DD): ")
            total = announcementDownloader.query_record(date)
            downloaded = announcementDownloader.db.get_count_by_date(date)
            print(f"\nAnnouncements on the target date: {total}")
            print(f"Already downloaded for the target date: {downloaded}")

        elif choice == "B":
            print("Choose a download mode")
            print("a. Basic download (all announcements in the date range)")
            print("b. Advanced download (filter by stock code, keyword, and market board)")
            print("e. Return to the previous menu")
            print("q. Quit")

            subchoice = input("Enter an option (a/b/e/q): ").lower()  # .lower(), not .lower

            if subchoice == "a":
                print("\nEnter the announcement date range")
                start_date = input("Start date (YYYY-MM-DD): ")
                end_date = input("End date (YYYY-MM-DD): ")
                print(f"The query range you chose is: {start_date}~{end_date}")
                confirm = input("Confirm and start downloading? (Y/N): ").upper()
                if confirm == "Y":
                    print("Starting the download...")
                    announcementDownloader.query(start_date, end_date)
                    print("Download finished")
                else:
                    print("Returning to the previous menu")

            elif subchoice == "b":
                print("\nEnter the announcement date range")
                start_date = input("Start date (YYYY-MM-DD): ")
                end_date = input("End date (YYYY-MM-DD): ")
                print(f"The query range you chose is: {start_date}~{end_date}")

                # personalize
                stock_code = input("Enter a stock code, or NO to skip: ")
                if stock_code == "NO":
                    stock_code = "NoSet"

                keywords = input("Enter an announcement keyword, or NO to skip: ")
                if keywords == "NO":
                    keywords = "NoSet"

                print(
                    "Market boards:\nsz: Shenzhen \nszmb: SZ main board \nszcy: ChiNext "
                    "\nsh: Shanghai \nshmb: SH main board \nshkcp: STAR Market \nbj: Beijing Stock Exchange"
                )
                plate = input(
                    "Enter a board abbreviation (sz/szmb/szcy/sh/shmb/shkcp/bj), or NO to skip: "
                )
                if plate == "NO":
                    plate = "NoSet"

                print("\nPlease confirm the download scope:")
                print(f"Stock code: {stock_code}")
                print(f"Keyword: {keywords}")
                print(f"Board: {plate}")

                confirm = input("Confirm download? Y/y to confirm, N/n to go back (Y/N): ").upper()
                if confirm == "Y":
                    print("Starting the download...")
                    # call the advanced download
                    stock_code = stock_code if stock_code != "NoSet" else ""
                    keywords = keywords if keywords != "NoSet" else ""
                    plate = plate if plate != "NoSet" else ""
                    announcementDownloader.edit_payload(stock_code, keywords, plate)
                    announcementDownloader.query(start_date, end_date)
                    print("Download finished")
                else:
                    print("Returning to the previous menu")
                    continue
            elif subchoice == "q":
                break

            else:
                print("Returning to the previous menu")

        elif choice == "Q":
            print("Exiting")
            break

        else:
            print("Invalid option, please try again")
```

V. Downloading the Announcement Files

1. The Announcement Display Page

1.1 What the page shows

Clicking an announcement opens a display page. It is an ordinary web page, not a URL that directly serves the announcement file for download.

1.2 Download method: 【selenium.driver】

Looking at the page content, there is no direct URL to the announcement file:

```html
<button type="button" class="el-button el-button--primary el-button--mini"><!----><!----><span><i class="iconfont icongonggaoxiazai"></i> 公告下载</span></button>
```

So we construct each announcement's display-page URL, open it, and then use 【selenium.driver】 to click the 公告下载 (announcement download) button to download the file.

 

2. Building the Display-Page URL from the Response

2.1 Getting the URL pieces from the response

We need to derive each 【announcement display page】 URL from the response:

```json
"announcements": [
    {
        "id": null,
        "secCode": "xxxx",
        "secName": "xxxx",
        "orgId": "xxxx",
        "announcementId": "xxxx",
        "announcementTitle": "xxxx announcement",
```

2.2 Constructing the URL

Testing shows that base_url plus the announcement ID is the minimal URL that reaches the display page:

```python
base_url = "https://www.cninfo.com.cn/new/disclosure/detail?"

final_url = f"{base_url}announcementId={announcement_id}"
```

3. Downloading a Single Announcement

(For the selenium class, see the selenium class code section below.)

Each announcement URL actually resolves to an HTML page, so the file cannot be downloaded from the URL directly. We therefore use selenium to simulate clicking the download button.

The element to click:

```python
download_link = dc._wait_and_highlight(By.XPATH, "//button[contains(.,'公告下载')]")
```

 

```python
def save_file(self, url, download_dir='cninfo_file/announcements', max_attempt=1):
    dc = None
    download_status = False
    try:
        dc = DriverController(download_dir=download_dir)
        if not dc.driver:  # start the browser if it has not been initialized yet
            dc.start_browser()
        dc.driver.get(url)
        save_dir = download_dir
        # attempts
        for attempt in range(max_attempt):
            try:
                # record the files present before the download
                original_files = set(f for f in os.listdir(save_dir)
                                     if os.path.isfile(os.path.join(save_dir, f)))

                # click the download button
                time.sleep(0.5)
                from selenium.webdriver.common.by import By
                download_link = dc._wait_and_highlight(By.XPATH, "//button[contains(.,'公告下载')]")
                dc._reliable_click(download_link)
                dc.logger.info('file start downloading ...')

                # monitor download progress
                for _ in range(60):  # wait at most 30 seconds
                    time.sleep(0.5)  # polling interval
                    # current_files: exclude files still being downloaded
                    current_files = set(
                        f for f in os.listdir(save_dir)
                        if os.path.isfile(os.path.join(save_dir, f))
                        and not f.endswith('.crdownload')  # key fix: skip Chrome temp files
                    )
                    new_files = current_files - original_files

                    # check for new files
                    if new_files:
                        # take the most recently modified file
                        newest_file = max(new_files,
                                          key=lambda f: os.path.getmtime(os.path.join(save_dir, f)))
                        temp_path = os.path.join(save_dir, newest_file)

                        # check that the file is complete (size has stabilized)
                        size1 = os.path.getsize(temp_path)
                        time.sleep(random.uniform(0.5, 1.0))
                        size2 = os.path.getsize(temp_path)

                        if size1 == size2 and size1 > 0:
                            download_status = True
                            break

            except Exception as e:
                dc.logger.error(f"Download attempt {attempt+1} failed with error: {str(e)}")
                dc._take_screenshot("download_error")

    except Exception as e:
        dc.logger.error(f"A fatal error occurred during the download: {str(e)}")
    finally:
        if dc:
            dc.close()
    return download_status
```

4. Deduplication (Avoiding Repeated Downloads) + File Metadata: the Database

We introduce a database to manage the downloaded data: it stores the metadata and supports deduplication.

From the response we can work out the database fields we need:

- secCode: stock code
- secName: stock name
- announcementId: announcement ID
- announcementTitle: announcement title
- downloadUrl: announcement URL
- pageColumn: page column

Since announcementId is unique to each announcement, we deduplicate on announcementId.

4.1 The dedup check

A function on the db class: it returns True when the announcementId already exists.

So when downloading, we only download the announcements for which it returns False, and then store their metadata in the db.

```python
# db class
def record_exists(self, announcement_id: str) -> bool:
    """
    Check whether an announcement already exists.
    Args:
        announcement_id: announcement ID
    Returns:
        bool: whether it exists
    """
    return announcement_id in self._id_cache
```

4.2 Inserting the announcement metadata into the database from the main program

```python
record = {
    'secCode': announcement.get('secCode'),
    'secName': announcement.get('secName'),
    'announcementId': announcement_id,
    'announcementTitle': announcement.get('announcementTitle'),
    'downloadUrl': final_url,
    'pageColumn': announcement.get('pageColumn'),
}
if success:
    self.db.save_record(record)
    page_save_cnt += 1
```

5. Saving Each Page of Data

A requests call returns the metadata of all announcements on one page (pageNum), so we split the page into individual records, iterate over them, and download and store each announcement.

5.1 Basic idea

```python
def save_page():
    # build the base url
    base_url = "https://www.cninfo.com.cn/new/disclosure/detail?"

    for announcement in announcements:
        announcement_id = announcement.get("announcementId")

        final_url = f"{base_url}announcementId={announcement_id}"

        success = self.save_file(
            final_url, secName, announcementTitle, download_dir
        )

        record = {
            "secCode": announcement.get("secCode"),
            "secName": secName,
            "announcementId": announcement_id,
            "announcementTitle": announcementTitle,
            "downloadUrl": final_url,
            "pageColumn": announcement.get("pageColumn"),
        }
        if success:
            self.db.save_record(record)
            page_save_cnt += 1
        else:
            print("download failed")
            fail_cnt += 1
```

5.2 Implementation

```python
def save_page(self, data, start_date, end_date, download_dir='cninfo_file/announcements', max_fail=1):

    page_save_cnt = 0
    try:
        announcements = data.get('announcements')
        if not announcements:  # handles both null and an empty list
            print("no data has been found")
            return False, page_save_cnt

        # process the valid data
        fail_cnt = 0
        # base url
        base_url = 'https://www.cninfo.com.cn/new/disclosure/detail?'
        for announcement in announcements:
            if fail_cnt >= max_fail:
                print('reached the maximum number of failures, stopping')
                return False, page_save_cnt

            if not announcement:
                print('got no announcement')
                continue
            announcement_id = announcement.get('announcementId')
            if not announcement_id:
                print('got no announcement id')
                continue

            # dedup check
            if self.db.record_exists(announcement_id):
                print('announcement exists')
                continue

            # download: build the download url
            final_url = f'{base_url}announcementId={announcement_id}'
            success = self.save_file(final_url, download_dir)

            record = {
                'secCode': announcement.get('secCode'),
                'secName': announcement.get('secName'),
                'announcementId': announcement_id,
                'announcementTitle': announcement.get('announcementTitle'),
                'downloadUrl': final_url,
                'pageColumn': announcement.get('pageColumn'),
            }
            if success:
                self.db.save_record(record)
                page_save_cnt += 1
            else:
                print('download failed')
                fail_cnt += 1

        return True, page_save_cnt

    except Exception as e:
        print(f"save failed: {e}")
        return False, page_save_cnt
```

 

6. Handling Pagination

6.1 Getting pagination info from the response

Go back to the requests part of the basic request:

```python
payload = {
    ...
    'pageNum': '1',
    ...
}
```

In the payload, pageNum only fetches the current page:

- pageNum = 1: current page number

So a single request can only download one page. How do we get the announcement URLs for every page? The answer is in the JSON that comes back.

The JSON gives us the total number of records, totalAnnouncement, and the total number of pages, totalpages.

These are the key to handling pagination.

```json
{
    "classifiedAnnouncements": null,
    "totalSecurities": 0,
    "totalAnnouncement": 3034,
    "totalRecordNum": 3034,
    "announcements": [
        {
            "id": null,
            "secCode": "601005",
            "secName": "重庆钢铁",
            "orgId": "9900002184",
            "announcementId": "1224044014",
            "announcementTitle": "第十届董事会第十五次会议决议公告",
            "announcementTime": 1751330340000,
            "adjunctUrl": "finalpage/2025-07-01/1224044014.PDF",
            "adjunctSize": 115,
            "adjunctType": "PDF",
            "storageTime": null,
            "columnId": "250401||251302",
            "pageColumn": "SHZB",
            "announcementType": "01010503||010113||01239901",
            "associateAnnouncement": null,
            "important": null,
            "batchNum": null,
            "announcementContent": "",
            "orgName": null,
            "tileSecName": "重庆钢铁",
            "shortTitle": "第十届董事会第十五次会议决议公告",
            "announcementTypeName": null,
            "secNameList": null
        },
    ],
    "categoryList": null,
    "hasMore": true,
    "totalpages": 101
}
```

 

6.2 Design

We take totalpages from the response, then loop pageNum in the payload over every page to collect all announcement metadata:

  • First response: get totalpages
  • Then loop the request totalpages times to collect every announcement
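The server's totalpages field is what the code below relies on. As a sanity check, though, a page count can also be derived from the record count and the page size with ceiling division (a small sketch, not part of the original flow):

```python
import math

def total_pages(total_record_num: int, page_size: int = 30) -> int:
    """Number of pages needed to cover total_record_num records."""
    return math.ceil(total_record_num / page_size)

print(total_pages(3208, 30))  # 107
```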

6.3 Pagination Code

```python
def query(start_date, end_date):
    total_page = query_get(start_date, end_date)
    query_all(start_date, end_date, total_page)
```
```python
def query_get(start_date, end_date):
    # headers
    headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "Host": "www.cninfo.com.cn",
        "Origin": "https://www.cninfo.com.cn",
        "Referer": "https://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
        "X-Requested-With": "XMLHttpRequest",
        "Cookie": "JSESSIONID=; insert_cookie=",
    }

    # payload
    payload = {
        'pageNum': '1',
        'pageSize': '30',
        'column': 'szse',
        'tabName': 'fulltext',
        'plate': '',
        'stock': '',
        'searchkey': '',
        'secid': '',
        'category': '',
        'trade': '',
        'seDate': f'{start_date}~{end_date}',
        'sortName': '',
        'sortType': '',
        'isHLtitle': 'true',
    }

    # create url
    url = 'https://www.cninfo.com.cn/new/hisAnnouncement/query'

    response = requests.post(url, headers=headers, data=payload)

    if response.status_code == 200:
        data = json.loads(response.text)
        total_record = data['totalRecordNum']
        total_announcement = data['totalAnnouncement']
        total_page = data['totalpages']
        print(f'total records: {total_record}')
        print(f'total announcements: {total_announcement}')
        print(f'total pages: {total_page}')
        return total_page
    else:
        print(f"Request failed, status code: {response.status_code}")
        return None
```
```python
import time
import random

def query_all(start_date, end_date, total_page):
    # headers
    headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "Host": "www.cninfo.com.cn",
        "Origin": "https://www.cninfo.com.cn",
        "Referer": "https://www.cninfo.com.cn/new/commonUrl/pageOfSearch?url=disclosure/list/search",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
        "X-Requested-With": "XMLHttpRequest",
        "Cookie": "JSESSIONID=; insert_cookie=",
    }

    # create url
    url = 'https://www.cninfo.com.cn/new/hisAnnouncement/query'

    # payload: one request per page
    for i in range(1, total_page + 1):
        time.sleep(random.randint(1, 2))
        payload = {
            'pageNum': f'{i}',
            'pageSize': '30',
            'column': 'szse',
            'tabName': 'fulltext',
            'plate': '',
            'stock': '',
            'searchkey': '',
            'secid': '',
            'category': '',
            'trade': '',
            'seDate': f'{start_date}~{end_date}',
            'sortName': '',
            'sortType': '',
            'isHLtitle': 'true',
        }

        response = requests.post(url, headers=headers, data=payload)

        if response.status_code == 200:
            data = json.loads(response.text)
            success = save_page(data, start_date, end_date)
```

VI. The selenium Class Code

If you are not familiar with this selenium class, see 【python 爬虫】下载上海证交所公告文件-CSDN博客.

```python
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import logging
import os
import time
import random

class DriverController:
    def __init__(self, driver: webdriver.Chrome = None, download_dir: str = None, logger: logging.Logger = None):
        self.driver = driver
        self.logger = logger or self._setup_default_logger()
        self.download_dir = download_dir or "cninfo_file/announcements"  # default setting
        os.makedirs(self.download_dir, exist_ok=True)
        self._is_self_managed_driver = False

    def _setup_default_logger(self) -> logging.Logger:
        """
        - Create the default logger
        - Input: none
        - Output: a configured logger instance
        """
        logger = logging.getLogger('DriverController')
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger

    def _setup_driver_options(self, download_dir: str, headless: bool = False) -> webdriver.ChromeOptions:
        """
        - Configure the browser options
        - Input:
            - download_dir: file download directory
            - headless: whether to run headless
        - Output: the configured browser options
        """
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument("--headless")
            options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-blink-features=AutomationControlled")
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        prefs = {
            "download.default_directory": os.path.abspath(download_dir),
            "download.prompt_for_download": False,
            "plugins.always_open_pdf_externally": True,
            "download.directory_upgrade": True,
            "safebrowsing.enabled": False
        }
        options.add_experimental_option("prefs", prefs)
        return options

    def start_browser(self, headless: bool = False) -> None:
        """
        - Start the browser
        - Input:
            - headless: whether to run without a UI
        - Output: none
        """
        download_dir = self.download_dir
        if self.driver is not None:
            self.logger.warning("Browser already initialized")
            return
        options = self._setup_driver_options(download_dir=download_dir, headless=headless)
        try:
            self.driver = webdriver.Chrome(options=options)
            self.driver.maximize_window()
            self._is_self_managed_driver = True
            self.logger.info(f"Browser started with download path: {os.path.abspath(download_dir)}")
        except Exception as e:
            self.logger.error(f"Failed to start browser: {str(e)}")
            raise

    def _wait_and_highlight(self, by: str, locator: str, timeout: int = 10, highlight_color: str = "red"):
        """
        - Wait for an element and highlight it
        - Input:
            - by: locator strategy
            - locator: element locator expression
            - timeout: maximum wait time
            - highlight_color: highlight colour
        - Output: the located page element
        """
        context = self.driver
        element = WebDriverWait(context, timeout).until(EC.presence_of_element_located((by, locator)))
        self.driver.execute_script(f"arguments[0].style.border='3px solid {highlight_color}';", element)
        time.sleep(random.uniform(0.5, 1.0))
        return element

    def _reliable_click(self, element):
        """
        - Click an element reliably
        - Input:
            - element: the page element to click
        - Output: none
        """
        try:
            element.click()
        except Exception:
            try:
                ActionChains(self.driver).move_to_element(element).pause(random.uniform(0.5, 1.0)).click().perform()
            except Exception:
                self.driver.execute_script("arguments[0].click();", element)

    def _take_screenshot(self, prefix="error"):
        """
        - Take a screenshot of the current page
        - Input:
            - prefix: filename prefix for the screenshot
        - Output: the screenshot path (the file is saved to disk)
        """
        if not self.driver:
            return ""

        try:
            os.makedirs("screenshots", exist_ok=True)
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"screenshots/{prefix}_{timestamp}.png"
            self.driver.save_screenshot(filename)
            self.logger.info(f"Screenshot saved: {filename}")
            return filename
        except Exception as e:
            self.logger.error(f"Screenshot failed: {str(e)}")
            return ""

    def close(self):
        """
        - Close the browser and clean up
        - Input: none
        - Output: none
        """
        if self.driver and self._is_self_managed_driver:
            self.driver.quit()
            self.logger.info("Browser closed")
        self.driver = None
```

VII. The Database Class Code

```python
import sqlite3
from typing import Dict
import os
import logging


class CninfoAnnouncementDB:
    def __init__(self, db_path: str):
        """
        Initialize the announcement database.
        Args:
            db_path: path to the database file
        """
        os.makedirs(os.path.dirname(db_path), exist_ok=True)
        self.db_path = os.path.abspath(db_path)
        self.logger = logging.getLogger("CninfoAnnouncementDB")
        self._init_db()
        self._id_cache = set()
        self._load_id_cache()

    def _init_db(self):
        """
        Initialize the table schema:
        - secCode: stock code
        - secName: stock name
        - announcementId: announcement ID (primary key)
        - announcementTitle: announcement title
        - downloadUrl: announcement URL
        - pageColumn: page column
        """
        with self._get_connection() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS announcements (
                    secCode TEXT NOT NULL,
                    secName TEXT NOT NULL,
                    announcementId TEXT PRIMARY KEY,
                    announcementTitle TEXT NOT NULL,
                    downloadUrl TEXT NOT NULL,
                    pageColumn TEXT
                )"""
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_secCode ON announcements(secCode)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_announcementId ON announcements(announcementId)"
            )

    def _get_connection(self) -> sqlite3.Connection:
        """Get a database connection."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _load_id_cache(self):
        """Load the existing announcement IDs into the in-memory cache."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT announcementId FROM announcements")
            self._id_cache = {row["announcementId"] for row in cursor.fetchall()}

    def record_exists(self, announcement_id: str) -> bool:
        """
        Check whether an announcement already exists.
        Args:
            announcement_id: announcement ID
        Returns:
            bool: whether it exists
        """
        return announcement_id in self._id_cache

    def save_record(self, record: Dict) -> bool:
        """
        Save an announcement record to the database.
        Args:
            record: announcement dict, which must contain:
                - secCode: stock code
                - secName: stock name
                - announcementId: announcement ID
                - announcementTitle: announcement title
                - downloadUrl: announcement URL
                - pageColumn: page column
        Returns:
            bool: whether the save succeeded
        """
        required_fields = [
            "secCode",
            "secName",
            "announcementId",
            "announcementTitle",
            "downloadUrl",
            "pageColumn",
        ]
        if not all(field in record for field in required_fields):
            self.logger.error("Missing required fields")
            return False

        try:
            with self._get_connection() as conn:
                conn.execute(
                    """
                    INSERT OR REPLACE INTO announcements (
                        secCode, secName, announcementId,
                        announcementTitle, downloadUrl, pageColumn
                    ) VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (
                        record["secCode"],
                        record["secName"],
                        record["announcementId"],
                        record["announcementTitle"],
                        record["downloadUrl"],
                        record["pageColumn"],
                    ),
                )
            self._id_cache.add(record["announcementId"])
            return True
        except Exception as e:
            self.logger.error(f"Save failed: {str(e)}")
            return False

    def get_all_records(self) -> list:
        """Return all announcement records."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM announcements")
            return [dict(row) for row in cursor.fetchall()]

    def delete_record(self, announcement_id: str) -> bool:
        """
        Delete the given announcement.
        Args:
            announcement_id: ID of the announcement to delete
        Returns:
            bool: whether the deletion succeeded
        """
        try:
            with self._get_connection() as conn:
                conn.execute(
                    "DELETE FROM announcements WHERE announcementId = ?",
                    (announcement_id,),
                )
            if announcement_id in self._id_cache:
                self._id_cache.remove(announcement_id)
            return True
        except Exception as e:
            self.logger.error(f"Delete failed: {str(e)}")
            return False
```
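A condensed, self-contained sketch of the same primary-key dedup pattern (using an in-memory SQLite database and a simplified two-column schema, so it is not the full class above):

```python
import sqlite3

# In-memory database with the same primary-key dedup idea as CninfoAnnouncementDB.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE announcements (announcementId TEXT PRIMARY KEY, announcementTitle TEXT)")

id_cache = set()  # mirrors the _id_cache consulted by record_exists()

def save_record(announcement_id, title):
    conn.execute("INSERT OR REPLACE INTO announcements VALUES (?, ?)", (announcement_id, title))
    id_cache.add(announcement_id)

def record_exists(announcement_id):
    return announcement_id in id_cache

save_record("1224044014", "xxxx announcement")
print(record_exists("1224044014"))  # True
print(record_exists("0000000000"))  # False
```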

VIII. Common Errors