Python Business Data Mining in Practice: Crawling Web Pages and Converting Them to Markdown

雪碧有白泡泡 · 2024-07-02


Preface

In this age of information explosion, the sheer volume of text on the Internet is like an endless beach. The genuinely valuable information, however, is usually buried inside all kinds of web pages and has to be filtered and organized before it can be put to use. Fortunately, Python is a powerful language that can help us with exactly this task.

This article shows how to use Python to convert the text of a web page into Markdown, which makes the content much easier to read and process. Whether we want to save an article as a local file or convert it to another format, Markdown gives us a clean, simple layout and lets us focus on the content itself.

Regular Expressions

To convert a page to Markdown with reasonable accuracy, we can drive the conversion with regular expressions, as shown below:

import re

__all__ = ['Tomd', 'convert']

# Markdown markup (prefix, suffix) for each supported HTML tag.
MARKDOWN = {
    'h1': ('\n# ', '\n'),
    'h2': ('\n## ', '\n'),
    'h3': ('\n### ', '\n'),
    'h4': ('\n#### ', '\n'),
    'h5': ('\n##### ', '\n'),
    'h6': ('\n###### ', '\n'),
    'code': ('`', '`'),
    'ul': ('', ''),
    'ol': ('', ''),
    'li': ('- ', ''),
    'blockquote': ('\n> ', '\n'),
    'em': ('**', '**'),
    'strong': ('**', '**'),
    'block_code': ('\n```\n', '\n```\n'),
    'span': ('', ''),
    'p': ('\n', '\n'),
    'p_with_out_class': ('\n', '\n'),
    'inline_p': ('', ''),
    'inline_p_with_out_class': ('', ''),
    'b': ('**', '**'),
    'i': ('*', '*'),
    'del': ('~~', '~~'),
    'hr': ('\n---', '\n\n'),
    'thead': ('\n', '|------\n'),
    'tbody': ('\n', '\n'),
    'td': ('|', ''),
    'th': ('|', ''),
    'tr': ('', '\n')
}

# Regular-expression patterns that match block-level elements.
BLOCK_ELEMENTS = {
    'h1': r'<h1.*?>(.*?)</h1>',
    'h2': r'<h2.*?>(.*?)</h2>',
    'h3': r'<h3.*?>(.*?)</h3>',
    'h4': r'<h4.*?>(.*?)</h4>',
    'h5': r'<h5.*?>(.*?)</h5>',
    'h6': r'<h6.*?>(.*?)</h6>',
    'hr': r'<hr/>',
    'blockquote': r'<blockquote.*?>(.*?)</blockquote>',
    'ul': r'<ul.*?>(.*?)</ul>',
    'ol': r'<ol.*?>(.*?)</ol>',
    'block_code': r'<pre.*?><code.*?>(.*?)</code></pre>',
    'p': r'<p\s.*?>(.*?)</p>',
    'p_with_out_class': r'<p>(.*?)</p>',
    'thead': r'<thead.*?>(.*?)</thead>',
    'tr': r'<tr>(.*?)</tr>'
}

# Regular-expression patterns that match inline elements.
INLINE_ELEMENTS = {
    'td': r'<td>(.*?)</td>',
    'tr': r'<tr>(.*?)</tr>',
    'th': r'<th>(.*?)</th>',
    'b': r'<b>(.*?)</b>',
    'i': r'<i>(.*?)</i>',
    'del': r'<del>(.*?)</del>',
    'inline_p': r'<p\s.*?>(.*?)</p>',
    'inline_p_with_out_class': r'<p>(.*?)</p>',
    'code': r'<code.*?>(.*?)</code>',
    'span': r'<span.*?>(.*?)</span>',
    'ul': r'<ul.*?>(.*?)</ul>',
    'ol': r'<ol.*?>(.*?)</ol>',
    'li': r'<li.*?>(.*?)</li>',
    'img': r'<img.*?src="(.*?)".*?>(.*?)</img>',
    'a': r'<a.*?href="(.*?)".*?>(.*?)</a>',
    'em': r'<em.*?>(.*?)</em>',
    'strong': r'<strong.*?>(.*?)</strong>'
}

# Tags that are simply stripped from the final Markdown.
DELETE_ELEMENTS = ['<span.*?>', '</span>', '<div.*?>', '</div>']


class Element:
    """One HTML element found in the source, with its position, tag and content."""

    def __init__(self, start_pos, end_pos, content, tag, is_block=False):
        self.start_pos = start_pos
        self.end_pos = end_pos
        self.content = content
        self._elements = []
        self.is_block = is_block
        self.tag = tag
        self._result = None
        if self.is_block:
            self.parse_inline()

    def __str__(self):
        wrapper = MARKDOWN.get(self.tag)
        self._result = '{}{}{}'.format(wrapper[0], self.content, wrapper[1])
        return self._result

    def parse_inline(self):
        # Replace inline HTML tags inside this block with their Markdown equivalents.
        for tag, pattern in INLINE_ELEMENTS.items():
            if tag == 'a':
                self.content = re.sub(pattern, r'[\g<2>](\g<1>)', self.content)
            elif tag == 'img':
                self.content = re.sub(pattern, r'![\g<2>](\g<1>)', self.content)
            elif self.tag == 'ul' and tag == 'li':
                self.content = re.sub(pattern, r'- \g<1>', self.content)
            elif self.tag == 'ol' and tag == 'li':
                self.content = re.sub(pattern, r'1. \g<1>', self.content)
            elif self.tag == 'thead' and tag == 'tr':
                self.content = re.sub(pattern, r'\g<1>\n', self.content.replace('\n', ''))
            elif self.tag == 'tr' and tag == 'th':
                self.content = re.sub(pattern, r'|\g<1>', self.content.replace('\n', ''))
            elif self.tag == 'tr' and tag == 'td':
                self.content = re.sub(pattern, r'|\g<1>', self.content.replace('\n', ''))
            else:
                wrapper = MARKDOWN.get(tag)
                self.content = re.sub(pattern, r'{}\g<1>{}'.format(wrapper[0], wrapper[1]), self.content)


class Tomd:
    def __init__(self, html='', options=None):
        self.html = html
        self.options = options
        self._markdown = ''

    def convert(self, html, options=None):
        elements = []
        for tag, pattern in BLOCK_ELEMENTS.items():
            for m in re.finditer(pattern, html, re.I | re.S | re.M):
                element = Element(start_pos=m.start(),
                                  end_pos=m.end(),
                                  content=''.join(m.groups()),
                                  tag=tag,
                                  is_block=True)
                can_append = True
                # Skip matches nested inside an element we already have; drop elements
                # nested inside the new match. Iterate over a copy since we remove items.
                for e in elements[:]:
                    if e.start_pos < m.start() and e.end_pos > m.end():
                        can_append = False
                    elif e.start_pos > m.start() and e.end_pos < m.end():
                        elements.remove(e)
                if can_append:
                    elements.append(element)

        elements.sort(key=lambda element: element.start_pos)
        self._markdown = ''.join([str(e) for e in elements])

        for index, element in enumerate(DELETE_ELEMENTS):
            self._markdown = re.sub(element, '', self._markdown)
        return self._markdown

    @property
    def markdown(self):
        self.convert(self.html, self.options)
        return self._markdown


_inst = Tomd()
convert = _inst.convert

This code is a utility for converting HTML to Markdown. It uses regular expressions to parse HTML tags and converts them into the corresponding Markdown markup according to predefined conversion rules.

The code defines an Element class that represents a single HTML element. It stores the tag's start position, end position, content, and tag type, and provides a parse_inline method that parses inline elements and converts them to Markdown.

The Tomd class is the main converter. It accepts an HTML string and exposes a convert method that performs the conversion: it iterates over the predefined HTML tag patterns, matches the corresponding parts of the HTML string with regular expressions, builds the corresponding Element objects, and finally returns the converted Markdown string.

At the top of the module, the MARKDOWN dictionary defines the Markdown markup for each HTML tag. The BLOCK_ELEMENTS and INLINE_ELEMENTS dictionaries define the regular-expression patterns used to match block-level and inline elements, and the DELETE_ELEMENTS list defines the HTML fragments to strip from the result.
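As a quick, hypothetical sanity check (the HTML snippet below is made up for illustration), the module-level convert helper can be exercised like this:

sample_html = '<h1>Hello</h1><p>First paragraph.</p><blockquote>A quote</blockquote>'
print(convert(sample_html))
# Prints, roughly:
#
#   # Hello
#
#   First paragraph.
#
#   > A quote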

Now that we have a tool for converting HTML to Markdown, we can apply it to real web pages.

Performing the Conversion

First, the result_file function builds the path where results are saved. It takes a per-user folder name, a file name, and a parent folder name as parameters, creates the folder and an empty file under it if they do not exist yet, and returns the file's path.

The get_headers function reads cookies (request headers) from a text file and returns them as a dictionary. It takes the path of the text file containing the cookies as its parameter.
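For reference, the file that get_headers parses is assumed to be plain text with one "Name:value" pair per line; the file name and header values below are placeholders, not real credentials:

# headers.txt (hypothetical contents -- one "Name:value" pair per line):
#   User-Agent:Mozilla/5.0 (Windows NT 10.0; Win64; x64)
#   Cookie:uuid_tt_dd=placeholder; UserName=placeholder
headers = get_headers("headers.txt")
# headers == {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
#             'Cookie': 'uuid_tt_dd=placeholder; UserName=placeholder'}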

The delete_ele function removes the specified tags from a BeautifulSoup object. It takes a BeautifulSoup object and a list of tags to delete, selects the matching tags with the object's select method, and removes them with decompose.

The delete_ele_attr function removes the specified attributes from the tags in a BeautifulSoup object. It takes a BeautifulSoup object and a list of attributes to delete, selects every tag with find_all, and removes the attributes with Python's del statement.

The delete_blank_ele function removes empty tags from a BeautifulSoup object. It takes a BeautifulSoup object and an exception list; any tag that is not in the exception list and has no text content is removed with decompose.

The TaskQueue class is a simple task queue that stores visited and unvisited URLs and provides a set of methods to manipulate those lists.

import contextlib
import os
import re
import sys
import threading
import time

import requests
from bs4 import BeautifulSoup, Comment

# The Tomd converter defined in the previous snippet is assumed to be available here.


def result_file(folder_username, file_name, folder_name):
    # Build <script dir>/../<folder_name>/<folder_username>/<file_name>, creating it if needed.
    folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", folder_name, folder_username)
    if not os.path.exists(folder):
        try:
            os.makedirs(folder)
        except Exception:
            pass
        path = os.path.join(folder, file_name)
        file = open(path, "w")
        file.close()
    else:
        path = os.path.join(folder, file_name)
    return path


def get_headers(cookie_path: str):
    # Read request headers (e.g. the Cookie header) from a text file, one "Name:value" per line.
    cookies = {}
    with open(cookie_path, "r", encoding="utf-8") as f:
        cookie_list = f.readlines()
    for line in cookie_list:
        cookie = line.split(":")
        cookies[cookie[0]] = str(cookie[1]).strip()
    return cookies


def delete_ele(soup: BeautifulSoup, tags: list):
    # Remove every tag matching the given CSS selectors.
    for ele in tags:
        for useless_tag in soup.select(ele):
            useless_tag.decompose()


def delete_ele_attr(soup: BeautifulSoup, attrs: list):
    # Strip the given attributes from every tag.
    for attr in attrs:
        for useless_attr in soup.find_all():
            del useless_attr[attr]


def delete_blank_ele(soup: BeautifulSoup, eles_except: list):
    # Remove tags with no text content, except those in the whitelist.
    for useless_attr in soup.find_all():
        try:
            if useless_attr.name not in eles_except and useless_attr.text == "":
                useless_attr.decompose()
        except Exception:
            pass


class TaskQueue(object):
    # A simple queue of visited / unvisited articles.

    def __init__(self):
        self.VisitedList = []
        self.UnVisitedList = []

    def getVisitedList(self):
        return self.VisitedList

    def getUnVisitedList(self):
        return self.UnVisitedList

    def InsertVisitedList(self, url):
        if url not in self.VisitedList:
            self.VisitedList.append(url)

    def InsertUnVisitedList(self, url):
        if url not in self.UnVisitedList:
            self.UnVisitedList.append(url)

    def RemoveVisitedList(self, url):
        self.VisitedList.remove(url)

    def PopUnVisitedList(self, index=0):
        url = []
        if index and self.UnVisitedList:
            url = self.UnVisitedList[index]
            del self.UnVisitedList[:index]
        elif self.UnVisitedList:
            url = self.UnVisitedList.pop()
        return url

    def getUnVisitedListLength(self):
        return len(self.UnVisitedList)


class CSDN(object):
    def __init__(self, username, folder_name, cookie_path):
        # self.headers = {
        #     "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.70 Safari/537.36"
        # }
        self.headers = get_headers(cookie_path)
        self.s = requests.Session()
        self.username = username
        self.TaskQueue = TaskQueue()
        self.folder_name = folder_name
        self.url_num = 1

    def start(self):
        # Walk the paginated article list and queue every (title, url) pair.
        num = 0
        articles = [None]
        while len(articles) > 0:
            num += 1
            url = u'https://blog.csdn.net/' + self.username + '/article/list/' + str(num)
            response = self.s.get(url=url, headers=self.headers)
            html = response.text
            soup = BeautifulSoup(html, "html.parser")
            articles = soup.find_all('div', attrs={"class": "article-item-box csdn-tracking-statistics"})
            for article in articles:
                article_title = article.a.text.strip().replace(' ', ':')
                article_href = article.a['href']
                with ensure_memory(sys.getsizeof(self.TaskQueue.UnVisitedList)):
                    self.TaskQueue.InsertUnVisitedList([article_title, article_href])

    def get_md(self, url):
        # Download one article, clean it up, and convert it to Markdown.
        response = self.s.get(url=url, headers=self.headers)
        html = response.text
        soup = BeautifulSoup(html, 'lxml')
        content = soup.select_one("#content_views")
        # Remove HTML comments
        for useless_tag in content(text=lambda text: isinstance(text, Comment)):
            useless_tag.extract()
        # Remove useless tags
        tags = ["svg", "ul", ".hljs-button.signin"]
        delete_ele(content, tags)
        # Remove tag attributes
        attrs = ["class", "name", "id", "onclick", "style", "data-token", "rel"]
        delete_ele_attr(content, attrs)
        # Remove empty tags
        eles_except = ["img", "br", "hr"]
        delete_blank_ele(content, eles_except)
        # Convert to Markdown
        md = Tomd(str(content)).markdown
        return md

    def write_readme(self):
        # Write a README.md that links to every queued article.
        print("+" * 100)
        print("[++] 开始爬取 {} 的博文 ......".format(self.username))
        print("+" * 100)
        reademe_path = result_file(self.username, file_name="README.md", folder_name=self.folder_name)
        with open(reademe_path, 'w', encoding='utf-8') as reademe_file:
            readme_head = "# " + self.username + " 的博文\n"
            reademe_file.write(readme_head)
            for [article_title, article_href] in self.TaskQueue.UnVisitedList[::-1]:
                text = str(self.url_num) + '. [' + article_title + '](' + article_href + ')\n'
                reademe_file.write(text)
                self.url_num += 1
        self.url_num = 1

    def get_all_articles(self):
        # Keep popping unvisited articles and saving each one as a Markdown file.
        try:
            while True:
                [article_title, article_href] = self.TaskQueue.PopUnVisitedList()
                try:
                    file_name = re.sub(r'[\/::*?"<>|]', '-', article_title) + ".md"
                    artical_path = result_file(folder_username=self.username, file_name=file_name, folder_name=self.folder_name)
                    md_head = "# " + article_title + "\n"
                    md = md_head + self.get_md(article_href)
                    print("[++++] 正在处理URL:{}".format(article_href))
                    with open(artical_path, "w", encoding="utf-8") as artical_file:
                        artical_file.write(md)
                except Exception:
                    print("[----] 处理URL异常:{}".format(article_href))
                self.url_num += 1
        except Exception:
            pass

    def muti_spider(self, thread_num):
        # Spawn batches of worker threads until the unvisited queue is drained.
        while self.TaskQueue.getUnVisitedListLength() > 0:
            thread_list = []
            for i in range(thread_num):
                th = threading.Thread(target=self.get_all_articles)
                thread_list.append(th)
            for th in thread_list:
                th.start()
            for th in thread_list:
                # Wait for this batch to finish before checking the queue again.
                th.join()


lock = threading.Lock()
total_mem = 1024 * 1024 * 500  # 500 MB spare memory budget


@contextlib.contextmanager
def ensure_memory(size):
    # Block until `size` bytes of the budget are available, and give them back afterwards.
    global total_mem
    while 1:
        with lock:
            if total_mem > size:
                total_mem -= size
                break
        time.sleep(5)
    yield
    with lock:
        total_mem += size


def spider_user(username: str, cookie_path: str, thread_num: int = 10, folder_name: str = "articles"):
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
    csdn = CSDN(username, folder_name, cookie_path)
    csdn.start()
    th1 = threading.Thread(target=csdn.write_readme)
    th1.start()
    th2 = threading.Thread(target=csdn.muti_spider, args=(thread_num,))
    th2.start()


def spider(usernames: list, cookie_path: str, thread_num: int = 10, folder_name: str = "articles"):
    for username in usernames:
        try:
            user_thread = threading.Thread(target=spider_user, args=(username, cookie_path, thread_num, folder_name))
            user_thread.start()
            print("[++] 开启爬取 {} 博文进程成功 ......".format(username))
        except Exception:
            print("[--] 开启爬取 {} 博文进程出现异常 ......".format(username))

We can write a small driver script to run it; a folder will be created under the local path and the Markdown files will be written into it, as sketched below.
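A minimal driver might look like this; the username and cookie-file path are placeholders to be replaced with your own values:

if __name__ == "__main__":
    # Placeholder values -- use a real CSDN username and your own cookie/header file.
    spider(usernames=["your_csdn_username"],
           cookie_path="headers.txt",
           thread_num=10,
           folder_name="articles")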

