
In the age of information explosion, news data is key raw material for analyzing social hot topics and public-opinion trends. This article uses Tencent News as an example to demonstrate how to write a Python crawler that scrapes news data and stores it in MongoDB. Along the way we cover key techniques such as bypassing anti-crawling mechanisms, cleaning data, and handling exceptions.


Dependencies:
- requests: sends HTTP requests
- BeautifulSoup4: parses HTML
- pymongo: MongoDB driver interface

Obtain residential proxy IPs from the Zhandaye (站大爷) IP proxy service and build a proxy pool:
```python
# proxy_pool.py
import random

PROXY_LIST = [
    "123.123.123.123:8080",  # example IP
    "124.124.124.124:8081"
]

def get_random_proxy():
    """Return a randomly chosen proxy in the format requests expects."""
    return {"http": random.choice(PROXY_LIST)}
```
```python
import random
import time

import requests

from proxy_pool import get_random_proxy

def fetch_page(url):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    max_retries = 3
    for _ in range(max_retries):
        try:
            proxy = get_random_proxy()
            response = requests.get(url, headers=headers, proxies=proxy, timeout=10)
            if response.status_code == 200:
                return response.text
            elif response.status_code == 403:  # IP has been banned
                time.sleep(random.uniform(5, 10))  # back off before retrying
                continue
        except requests.exceptions.RequestException:
            time.sleep(random.uniform(2, 5))
    return None
```
Key techniques:
- Rotate proxies on every attempt via get_random_proxy(), so one banned IP does not stall the crawl.
- Retry up to max_retries times, backing off for a random 5-10 seconds when a 403 (IP ban) is returned.
- Set a request timeout and catch RequestException so a single bad proxy cannot hang the crawler.
```python
import time

from bs4 import BeautifulSoup

def parse_news(html):
    soup = BeautifulSoup(html, 'html.parser')
    news_list = []
    # Example structure for Tencent News (adjust selectors to the actual page)
    items = soup.select('.news-item')  # CSS selector
    for item in items:
        title = item.select_one('.title a').text.strip()
        url = item.select_one('.title a')['href']
        time_str = item.select_one('.time').text.strip()
        # Data cleaning: skip ads ("广告") and promoted items ("推广")
        if "广告" in title or "推广" in title:
            continue
        news_list.append({
            "title": title,
            "url": url,
            "publish_time": time_str,
            "crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")
        })
    return news_list
```
Parsing tips:
- Use CSS selectors (soup.select) to locate the repeated news blocks, and select_one for single child elements.
- Filter out advertising and promoted entries during parsing rather than after storage.
- Record both the article's publish time and the crawl time for later trend analysis.
```python
from pymongo import MongoClient

class MongoDBStorage:
    def __init__(self, db_name="news_db", collection_name="articles"):
        self.client = MongoClient('mongodb://localhost:27017/')
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]

    def insert_batch(self, data_list):
        try:
            if data_list:  # skip empty batches
                result = self.collection.insert_many(data_list)
                print(f"Inserted {len(result.inserted_ids)} documents")
        except Exception as e:
            print(f"Insert failed: {str(e)}")
```
Optimization suggestions:
- Insert in batches with insert_many instead of calling insert_one per document.
- Create a unique index on url so duplicates are rejected at the database level (see Q3 below).
- Pass ordered=False to insert_many so one duplicate does not abort the rest of the batch, as sketched below.

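A minimal sketch of duplicate-tolerant batch insertion; insert_batch_skip_duplicates is a hypothetical helper, and it assumes the unique index on url from Q3 already exists. BulkWriteError is pymongo's exception for partially failed bulk writes:

```python
from pymongo.errors import BulkWriteError

def insert_batch_skip_duplicates(collection, data_list):
    """Insert a batch; documents violating the unique url index are skipped."""
    if not data_list:
        return 0
    try:
        result = collection.insert_many(data_list, ordered=False)
        return len(result.inserted_ids)
    except BulkWriteError as e:
        # nInserted counts the documents that did get written
        return e.details.get("nInserted", 0)
```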
The main crawl loop ties everything together:

```python
import random
import time

def main():
    base_url = "https://new.qq.com/ch/news"  # example URL
    storage = MongoDBStorage()
    for page in range(1, 6):  # crawl the first 5 pages
        url = f"{base_url}?page={page}"
        html = fetch_page(url)
        if html:
            news_data = parse_news(html)
            storage.insert_batch(news_data)
            time.sleep(random.uniform(2, 5))  # polite crawling
        else:
            print(f"Failed to fetch page {page}")

if __name__ == "__main__":
    main()
```

| Anti-crawling measure | Behavior on Tencent News | Countermeasure |
|---|---|---|
| IP rate limiting | IP banned after ~50 consecutive requests | Proxy pool rotation + 3-5 second request interval |
| User-Agent detection | Non-browser requests are rejected | Random UA pool (including mobile UAs), see sketch below |
| Behavior analysis | Mouse-movement patterns etc. are checked | Simulate real user actions with Selenium |
| Data encryption | Key data is loaded dynamically via JS | Execute the JS and read the rendered DOM (Pyppeteer) |
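The table mentions a random UA pool but the article does not show one; here is a minimal sketch. The UA strings are illustrative examples, not an exhaustive pool:

```python
import random

# A small illustrative pool; extend with real desktop and mobile UA strings
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15",
]

def get_random_headers():
    """Build request headers with a randomly chosen User-Agent."""
    return {"User-Agent": random.choice(USER_AGENTS)}
```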
Option 1: Selenium headless browser
```python
import time

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def get_dynamic_content(url):
    options = Options()
    options.add_argument("--headless")
    options.add_argument("--disable-gpu")
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        time.sleep(3)  # wait for JS to execute
        return driver.page_source
    finally:
        driver.quit()
```
Option 2: CAPTCHA recognition
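The source stops at this heading, so as a minimal sketch: simple image CAPTCHAs can sometimes be read with OCR. This assumes the pytesseract package and a local Tesseract installation; complex CAPTCHAs generally need a dedicated recognition service:

```python
from io import BytesIO

import pytesseract
import requests
from PIL import Image

def solve_image_captcha(captcha_url):
    """Download a CAPTCHA image and attempt OCR; returns the guessed text."""
    resp = requests.get(captcha_url, timeout=10)
    image = Image.open(BytesIO(resp.content)).convert("L")  # grayscale helps OCR
    return pytesseract.image_to_string(image).strip()
```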
Data cleaning: normalize the publish time and strip control characters from titles:

```python
import re
from datetime import datetime

def clean_data(raw_data):
    cleaned = []
    for item in raw_data:
        # Normalize the time format
        if item.get("publish_time"):
            try:
                item["publish_time"] = datetime.strptime(
                    item["publish_time"], "%Y-%m-%d %H:%M"
                ).strftime("%Y-%m-%d")
            except ValueError:
                item["publish_time"] = None
        # Remove control characters
        if "title" in item:
            item["title"] = re.sub(r'[\t\n\r]', '', item["title"])
        cleaned.append(item)
    return cleaned
```
Exception handling: wrap fetch_page so specific network errors are reported rather than crashing the run:

```python
import requests

def safe_fetch(url):
    try:
        return fetch_page(url)
    except requests.exceptions.SSLError:
        print(f"SSL error: {url}")
        return None
    except requests.exceptions.ConnectionError:
        print(f"Connection error: {url}")
        return None
    except Exception as e:
        print(f"Unknown error: {str(e)}")
        return None
```
To speed up collection, fetch pages concurrently with a thread pool:

```python
from concurrent.futures import ThreadPoolExecutor

def multi_thread_crawl(urls):
    results = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(fetch_page, url) for url in urls]
        for future in futures:
            html = future.result()
            if html:
                results.extend(parse_news(html))
    return results
```
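A short usage sketch; the page range and URL pattern mirror the main() example above:

```python
if __name__ == "__main__":
    urls = [f"https://new.qq.com/ch/news?page={p}" for p in range(1, 6)]
    news = multi_thread_crawl(urls)
    print(f"Collected {len(news)} articles")
```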
For larger workloads, the crawler can be scaled out into a distributed architecture:

```
Crawler node 1 →  proxy pool → task queue → result storage
Crawler node 2 →      ↑            ↓
Crawler node 3 →  task dispatch   MongoDB cluster
```
Implementation points:
- A central task queue hands URLs to crawler nodes, keeping the nodes stateless and easy to add.
- All nodes share the proxy pool, spreading requests across IPs.
- Results from every node land in a MongoDB cluster; the unique url index (Q3) keeps them deduplicated.
A minimal task-queue sketch follows.
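This sketch implements the task queue with Redis lists; the queue name news:tasks and the local Redis address are assumptions for illustration:

```python
import redis

# Assumed local Redis instance; adjust host/port for a real deployment
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
QUEUE = "news:tasks"  # hypothetical queue name

def dispatch(urls):
    """Producer: push URLs onto the shared task queue."""
    for url in urls:
        r.lpush(QUEUE, url)

def worker():
    """Consumer: block until a URL is available, then crawl and store it."""
    storage = MongoDBStorage()
    while True:
        _, url = r.brpop(QUEUE)  # blocks until an item arrives
        html = fetch_page(url)
        if html:
            storage.insert_batch(parse_news(html))
```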
Q1: What if the site bans your IP?
A: Switch to a backup proxy pool immediately. Suggestions:
- Keep the pool stocked with residential IPs and rotate on every request.
- Hold the request interval at 3-5 seconds (see the anti-crawling table above).
- Health-check proxies before use so dead IPs are dropped early, as sketched below.
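A minimal proxy health-check sketch; httpbin.org/ip is used here purely as a convenient echo endpoint:

```python
import requests

def is_proxy_alive(proxy_addr, timeout=5):
    """Return True if the proxy can complete a simple request."""
    proxies = {"http": proxy_addr, "https": proxy_addr}
    try:
        resp = requests.get("http://httpbin.org/ip", proxies=proxies, timeout=timeout)
        return resp.status_code == 200
    except requests.exceptions.RequestException:
        return False

# Usage: PROXY_LIST = [p for p in PROXY_LIST if is_proxy_alive(p)]
```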
Q2: How do you avoid legal liability?
A: Follow three principles:
- Respect robots.txt (for example, Tencent News permits crawling under the /ch/ path).
- Keep request rates modest so you do not burden the target site (the "polite crawling" delay above).
- Use the data only within what laws and the site's terms allow.

Q3: How do you prevent duplicate records?
A: MongoDB deduplication:
```python
import pymongo

# Create a unique index so the same url can only be stored once
self.collection.create_index([("url", pymongo.ASCENDING)], unique=True)

# Handle duplicates at insert time
try:
    self.collection.insert_one(data)
except pymongo.errors.DuplicateKeyError:
    print("Document already exists")
```
Q4: How do you handle dynamically loaded content?
A: It depends on the case:
- Content rendered by straightforward JS: use a headless browser (Selenium, Option 1 above).
- Data injected after page load: execute the JS and read the rendered DOM with Pyppeteer, as sketched below.
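A minimal Pyppeteer sketch; the 3-second wait is an illustrative guess, and a real page may need page.waitForSelector instead:

```python
import asyncio

from pyppeteer import launch

async def get_rendered_html(url):
    """Launch headless Chromium, load the page, and return the rendered DOM."""
    browser = await launch(headless=True)
    try:
        page = await browser.newPage()
        await page.goto(url)
        await asyncio.sleep(3)  # crude wait for JS; prefer page.waitForSelector
        return await page.content()
    finally:
        await browser.close()

# Usage: html = asyncio.get_event_loop().run_until_complete(get_rendered_html(url))
```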
Q5: What if MongoDB runs out of storage space?
A: Optimization options:
- Add a TTL index so old documents expire automatically (sketch below).
- Use WiredTiger's block compression (e.g., zlib or zstd) to shrink on-disk size.
- Archive cold data to cheaper storage and keep only a recent window online.

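A TTL-index sketch, with one assumption worth flagging: TTL indexes only work on BSON date fields, so crawl_time must be stored as a datetime rather than the formatted string used earlier:

```python
from datetime import datetime, timezone

# crawl_time must be a datetime for the TTL index to apply
doc = {"title": "...", "url": "...", "crawl_time": datetime.now(timezone.utc)}

# Expire documents 30 days after crawl_time
storage.collection.create_index("crawl_time", expireAfterSeconds=30 * 24 * 3600)
```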
```python
# Real-time analysis with Pandas
from collections import Counter

import matplotlib.pyplot as plt
import pandas as pd

def analyze_news():
    df = pd.DataFrame(list(storage.collection.find()))
    # Word-frequency statistics
    words = " ".join(df["title"].dropna()).split()
    word_counts = Counter(words)
    print(word_counts.most_common(10))
    # Publication-volume trend over time
    df["date"] = pd.to_datetime(df["publish_time"])
    daily_counts = df.groupby("date").size()
    daily_counts.plot(title="News publication trend")
    plt.show()  # render the trend chart
```
```python
import time

from fpdf import FPDF

def generate_report(data):
    pdf = FPDF()
    pdf.add_page()
    # FPDF's core fonts are latin-1 only; register a Unicode TTF font for Chinese text
    pdf.set_font("Arial", size=12)
    pdf.cell(200, 10, txt="News Collection Daily Report", ln=1, align="C")
    pdf.cell(200, 10, txt=f"Collected on: {time.strftime('%Y-%m-%d')}", ln=1)
    pdf.cell(200, 10, txt=f"Total articles: {len(data)}", ln=1)
    # Charts can be added here with matplotlib
    pdf.output("news_report.pdf")
```
Through the walkthrough in this article, we have covered:
- Fetching pages with requests behind a rotating proxy pool, with retries and backoff.
- Parsing news listings with BeautifulSoup and cleaning the extracted fields.
- Storing results in MongoDB with batch inserts and unique-index deduplication.
- Countering common anti-crawling measures and scaling out with threads or a distributed queue.

Possible future directions:
- Growing the single crawler into the distributed architecture sketched above.
- Layering real-time analysis and automated reporting on top of the stored data.

News data collection is the foundational step of big-data analysis. With these techniques you can go on to build advanced applications such as public-opinion monitoring systems and hot-topic prediction models. Remember: technology itself is neither good nor evil; what matters is how it is used. Within the bounds of law and compliance, let the data deliver greater value.
Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited.
For infringement concerns, contact cloudcommunity@tencent.com for removal.