中文关键字检索分析-导出到csv或者excel-多文件或文件夹-使用python和asyncio和pandas的dataframe

cnblogs 2024-09-10 08:13:00 阅读 86

  • 1.02版本

    • 把原来的tab一个个拼接成文件输出,改成pandas的dataframe
    • 使用asyncio库来使用协程,但是测试下来速度好像是差不多的。可能速度太快了,没能很好的测出来差异。
  • 原来的最初的代码是java版本的,现在用python重写一遍

    • java版本使用completableFuture来异步IO,主要是文件输出的时候,但是好像文件的顺序并没有发生变化。
    • java版本没有使用什么特殊的类和库,结果打印到system.out,可以是控制台,或者是文件
  • 代码的功能

    • 打印出关键字
    • 打印出检索到的文件名,行号,正则表达式命中的结果,多个结果就多行。同一行命中了几个正则表达式。当前行的原来的内容。
    • 可以同时检索多个正则表达式
    • 对检索的文件路径和文件名也做匹配和排除
    • 同一个文件的结果需要在一起,同一行的多次命中也需要在一起
    • 检索结果通过tab或者excel表格,每一列展示同一个正则表达式的命中结果
    • 原来的java版本,可以设置参数,如果命中,把下一行也返回回来。
  • 代码和本地代码的区别

    • 路径修改成【您的检索路径】和【您的输出文件】
  • 有什么用

    • 同时检索多个关键字,能够把命中结果和行原来的内容一起输出到表格中。然后可以进行筛选和分析。这是现有IDE工具没有的定制化的功能
    • 检索结果是对代码或者文件这些内容的直观分析,不需要再次打开文件查看和分析。
  • 实际使用的案例

    • 考试背题库,题解中有官网文档的链接,全部拷贝出来,通过检索【https:/这个url的正则表达式】的关键字,一次性抽取到同一列,然后贴在旁边。这样就可以快速看题解中找到的官网链接了。而且题解很多链接写错的地方,也能发现,调整后把个别错误的也修改好了。
    • 而且这个脚本的功能是实际工作经验中,一点点的发现的痛点,然后把工具的功能降到最基础和简单。最一开始是用vba,后来用java,现在再用python。不过确实上述的提起官网链接,这个通过某些IDE的搜索功能也能实现。有一些最新版的IDE也有了更多的功能。但我这个是每天承受巨大压力,实战中得出的功能,和自由定制化。功能又很接地。
  • 不能解决的问题

    • 如果搜索到的代码,是一个赋值,变量名称不一样。那么需要再次检索,或者把变量名称作为关键字再次加入进来来检索。所以需要代码比较规范,或者变量名称统一。
    • 检索中文的时候,没有tokenizer的分词器功能的情况下,那么只是所有一个字,需要把这个字的前一个字和后一个字都检索出来,才能找到完整的结果,比如【知识】和【识别】,那么命中结果会不美观和直接。和AI的word词汇的处理没有什么关系。一开始的初衷是处理代码的。
  • 没有上传到github和gitee的原因

    • 基本的文本处理工具,很实用,就一段代码
    • 比较仓促,容易找不到,可以上传到github,然后把输入的文件,和输出的两种文件也上传,但是这个工具就在于手搓和轻便,有需要的时候,随时加功能
  • 效果图

    • 荷塘月色,就一篇文章的分析结果
    • image

    • image

  • 源代码

    • 草草上传,未做很多修改,用的时候再改。

<code># %任务,直接转变成 使用dataframe来操作

# % 然后,。。啥,就完成上面这个就行 完成file操作的async

# 打印检索结果的开头和title,条件,限制条件

# 然后提供一个api,自己curl测试一下。

# 然后用docker打包一下。把文件内容传送过去,然后返回处理的结果。

# 然后做一个页面,把内容填写进去,然后点击按钮,另外一边显示出来。

# 做一个备忘录,或者白板功能

# 做一个小程序界面

# 做一个搜索功能,把关键字写进去,然后能够查询出结果

# java也重新写一写

# 3大功能

# 搜索文件夹和文件名称

# (vscode是comman + P)

# 妙:可以选择多个文件夹,添加子文件夹排除条件

# 搜索文件内容

# (vscode command + shift + F)

# 妙: 可以检索多个正则表达式,然后先匹配上的作为变量向下复制,同一行,或者同文件,

# 但是因为没有结束匹配。所以开头和结尾的地方会出现数据不匹配。如果能添加一个结束匹配的正则表达式。那么默认是文件内作为变量

# 提取表格中的列的内容,然后命中分类

# 妙:同一行抽取多个特征,一个正则表达式提取多个token

# 妙:多个正则表达式的命中,同时分类

# 同一行多个命中,分成多行。

# 配置常量

import datetime

from datetime import date

from operator import concat

import os

import asyncio

from asyncio import Lock

import pandas as pd

import numpy as np

# from traceback import print_list

# import tornado

# from threading import Thread

import re

# from typing import Concatenate

def getChildFiles(basePath):

return [f for f in os.listdir(basePath) if os.path.isfile(basePath + f)]

def getChildFolders(basePath):

return [f for f in os.listdir(basePath) if os.path.isdir(basePath + f)]

isFirstExcelOutput = True

# mac的设置里面一旦访问过了,就会有允许和不允许,下面的是可移除卷宗,然后网络卷宗现在vscode是没有勾选上

# async def使用方法

# https://superfastpython.com/asyncio-async-def/

# https://docs.python.org/3/library/index.html

# 正则表达式 中文例子 https://www.jb51.net/article/177521.htm

# https://blog.csdn.net/weixin_40907382/article/details/79654372

# 官网 正则表达式 https://docs.python.org/3/library/re.html

async def writeToFile(filout, finalStrArr, lock: Lock, oneFileData: pd.DataFrame):

async with lock:

# for finalStr in finalStrArr:

# filout.wirte(oneFileData.)

# note 输出的是,有的是多个空格的字符

# oneFileData.to_string(filout)

#

# filout.write("\n\n")

# filout.write("".join(finalStrArr))

# 不包含表头,表头已经打印出来了。

oneFileData.to_csv(filout, sep='\t', index=False, header=None)code>

# 不写到excel文件了。因为excel文件不知道什么位置是文件末尾。没办法append。

# 如果要append,需要用到pd.ExcelWriter mode=append 然后sheet名称,开始的行数是maxrow

file_path = '您的输出文件/output/test01.xlsx'

global isFirstExcelOutput

oneFileData = oneFileData.fillna(" ")

if isFirstExcelOutput:

oneFileData.index.name = "No"

oneFileData.columns.name = "No2" # 这个设置了好像就显示不出来了。

oneFileData.index = oneFileData.index + 1

# oneFileData.rename(columns={"result1":"result1"+"\nresult1_1"}, inplace=True)

multHd = []

multHd.append((t_hitNos,""))

resultNoCnt = 1

for kw in searchKwsArr:

multHd.append((t_result_tmp+str(resultNoCnt),kw))

resultNoCnt+=1

multHd.append((t_hitNos,""))

multHd.append((t_hitKws,""))

multHd.append((t_lineContent,""))

oneFileData.columns = pd.MultiIndex.from_tuples(multHd,names=["titles","keywords"])

# oneFileData.columns = pd.MultiIndex.from_tuples([("lineNo",""),("result1",""),("result2", "result1_1"),("result3",""),("result4",""),("result5",""),("hitNos",""),("hitKws",""),("lineContent","")])

# oneFileData.columns[2] = ("result1",r"(在|到).+里")

oneFileData.to_excel(file_path)

isFirstExcelOutput = False

else:

with pd.ExcelWriter(file_path, mode='a', if_sheet_exists='overlay') as writer:code>

oneFileData.index = oneFileData.index + 1 - 1 + writer.sheets['Sheet1'].max_row

oneFileData.to_excel(writer, sheet_name='Sheet1', startrow=writer.sheets['Sheet1'].max_row, header=None)code>

# with pd.ExcelWriter(file_path) as writer:

# oneFileData.to_excel(writer, sheet_name='Sheet1', startrow=writer.sheets['Sheet1'].max_row, header=None)code>

# oneFileData.to_excel(writer, sheet_name='Sheet1', startrow=writer.sheets['Sheet1'].max_row, index=False, header=None)code>

# oneFileData.to_excel("您的输出文件output/test01.xlsx")

# print("fileout" + str(datetime.datetime.now()))

# def writeToFile(filout, finalStr):

# filout.write(finalStr)

def multiMatch(content, kwsArr):

for kw in kwsArr:

if re.match(kw, content):

return True

return False

excFileType = [

r"^\._.*",

r".*\.xls.*"

]

incFileType = [

r"^[^\.]+\.[^\.]+"

]

searchKwsArr = [

r"(在|到)[^,。]+里",

r"忽然[^,。]+",

r"[^,。]+一般",

r"像[^,。]+",

r"是[^,。]+"

]

t_lineNo="lineNo"code>

t_result_tmp="result"code>

t_hitNos="hitNos"code>

t_hitKws="hitKws"code>

t_lineContent="lineContent"code>

async def searchInFile(f, basePath, filout, lock: Lock):

print("filename: " + f)

# if not re.match(r"^\._.*", f) and not re.match(r".*\.xls.*", f):

if not multiMatch(f,excFileType):

# if not re.match(r"^\.", f):

col_title=[t_lineNo]

resultNoCnt = 1

for kw in searchKwsArr:

col_title.append(t_result_tmp+str(resultNoCnt))

resultNoCnt+=1

# col_title.extend([t_hitNos,t_hitKws,t_lineContent])

col_title.append(t_hitNos)

col_title.append(t_hitKws)

col_title.append(t_lineContent)

with open(basePath + f, "r") as file:

one_file_result = pd.DataFrame(columns=

col_title)

finalStrArr = []

# ["lineNo","result1","result2","result3","result4","result5","hitNos","hitKws","lineContent"])

# one_file_result =

# note 明明可以看到append,但是提示没有这个append,说是一种方法是降低版本,但是因为和很多其他裤捆绑,所以不建议

# pip install pandas==1.3.4

# 大多数还是说用concat来代替

# one_file_result = pd.concat([one_file_result,pd.DataFrame({"lineNo":[5],"result1":["tmp"],"result2":["tmp"],"result3":["tmp"],"result4":["tmp"]

# ,"result5":["tmp"],"hitNos":["tmp"],"hitKws":["tmp"],"lineContent":["tmp"]})], ignore_index=True)

# one_file_result.add(pd.DataFrame({"lineNo":5,"result1":"tmp","result2":"tmp","result3":"tmp","result4":"tmp","result5":"tmp"

# ,"hitNos":"tmp","hitKws":"tmp","lineContent":"tmp"}), ignore_index=True)

# one_file_result = pd.append([one_file_result,pd.DataFrame({"lineNo":5,"result1":"tmp","result2":"tmp","result3":"tmp","result4":"tmp","result5":"tmp"

# ,"hitNos":"tmp","hitKws":"tmp","lineContent":"tmp"})], ignore_index=True)

# print(one_file_result)

linNo = 0

lines = file.readlines()

for line in lines:

linNo += 1

ptStrs = list()

resultPD_key = pd.DataFrame(columns=col_title)

ptStrTmp = str(linNo) + "\t"

resultPD_tmp = pd.DataFrame(columns=col_title)

resultPD_tmp.loc[0,t_lineNo]=linNo

maxFnd = 0

hitKws = []

hitNos = []

kwsSeq = 0

# for pp in [r"https://hXXXXXXXXXXXXXXXXXXl/[0-9]+\.html"]:

# for pp in [r"(在|到).+里", r"忽然[^,。]+", r"[^,。]+一般", r"像[^,。]+", r"是[^,。]+"]:

for pp in searchKwsArr:

kwsSeq = kwsSeq + 1

# for pp in [r".风.", r".香", r"一.", r".{2,4}(地)" , r"荷.", r".塘", r"月.", r".色"]:

lastFnd = "\t"

findCnt = 0

for m in re.finditer(

pp

, line

, flags=re.IGNORECASE):

findCnt += 1

if findCnt > maxFnd:

maxFnd = findCnt

ptStrs.append(ptStrTmp)

resultPD_key = pd.concat([resultPD_key,resultPD_tmp], ignore_index=True)

ptStrs[findCnt-1] = ptStrs[findCnt-1] + pp + ": " + m.group() + "\t"

# resultPD_key.loc[findCnt-1,t_result_tmp+str(kwsSeq)] =pp + ": " + m.group()

resultPD_key.loc[findCnt-1,t_result_tmp+str(kwsSeq)] =m.group()

lastFnd = pp + ": " + m.group() + "\t"

hitNos.append(str(kwsSeq))

hitKws.append(pp)

if False:

ptStrTmp = ptStrTmp + lastFnd

else:

ptStrTmp = ptStrTmp + "\t"

# pd这里单个key搜索就不用填充了

notfnd = 0

for fnd in ptStrs:

notfnd += 1

if notfnd > findCnt:

ptStrs[notfnd-1] = ptStrs[notfnd-1] + "\t"

# 统计一行的命中结果

fndNo = 0

for fnd in ptStrs:

fndNo += 1

ptStrs[fndNo-1] = ptStrs[fndNo-1] + ";"+";".join(hitNos) +";"+ "\t" +";"+ ";".join(hitKws) +";"+ "\t"

# for i in range(0,maxFnd-1):

# 这里是单行搜索,单行的多个结果拼接到一起

if maxFnd > 0:

finalStr = ""

for st in (ptStrs): finalStr = finalStr + st + line # + "\n"

finalStrArr.append(finalStr)

resultPD_key[t_hitNos]=";".join(hitNos)

resultPD_key[t_hitKws]="【"+"】;【".join(hitKws)+"】"

resultPD_key[t_lineContent]=line.replace("\n","").replace("\r","")

one_file_result = pd.concat([one_file_result,resultPD_key], ignore_index=True)

# one_file_result = one_file_result.fillna({t_result_tmp+str(1):"b"})

# writeToFile(filout, finalStr)

# print(one_file_result)

# one_file_result.columns[2].

await asyncio.create_task(writeToFile(filout, finalStrArr, lock, one_file_result))

async def searchInFolder(basePath, filout, lock: Lock):

tasklist = []

for fo in getChildFolders(basePath):

asyncio.create_task(searchInFolder(basePath + fo + "/", filout, lock))

files = getChildFiles(basePath)

for f in files:

tasklist.append(asyncio.create_task(searchInFile(f, basePath, filout, lock)))

# if f

await asyncio.wait(tasklist)

async def main():

lock = Lock()

starttime =datetime.datetime.now()

basePaths = ['/Volumes/SDCARD_01/tmp/']

filout = open("/Volumes/SDCARD_01/output/"+"output.txt","w")

filout.write("excFileType:" + "\n")

filout.write("\t" + "\n\t".join(excFileType) + "\n")

filout.write("incFileType:" + "\n")

filout.write("\t" + "\n\t".join(incFileType) + "\n")

filout.write("searchKwsArr:" + "\n")

filout.write("\t" + "\n\t".join(searchKwsArr) + "\n")

filout.write("basePaths:" + "\n")

filout.write("\t" + "\n\t".join(basePaths) + "\n")

titleStr = "lineNo\t"

titleStrDes = "\t"

resultNo = 1

for kw in searchKwsArr:

titleStr = titleStr + "result" + str(resultNo) + "\t"

titleStrDes = titleStrDes + kw + "\t"

resultNo = resultNo + 1

titleStr = titleStr + "hitNos" + "\t" + "hitKws" + "\t" + "lineContent" + "\t"

filout.write(titleStr + "\n")

filout.write(titleStrDes + "\n")

task_fol_list = []

for basePath in basePaths:

task_fol_list.append(asyncio.create_task(searchInFolder(basePath, filout, lock)))

await asyncio.wait(task_fol_list)

# await coro

print('search complete!')

print("start" + str(starttime))

print("end " + str(datetime.datetime.now()))

# 2024-03-04 21:53:57.998985

# 2024-03-04 21:53:58.041339

# 2024-03-04 22:10:00.298639

# 2024-03-04 22:10:00.443002

# async

# 2024-03-04 21:55:17.430653

# 2024-03-04 21:55:17.490983

# lock

# 2024-03-04 22:07:11.735860

# 2024-03-04 22:07:11.850801

# 2024-03-04 22:11:36.540289

# 2024-03-04 22:11:36.595845

# create task

# start2024-03-04 22:40:18.462565

# end 2024-03-04 22:40:18.653983

if __name__ == "__main__":

# loop = asyncio.get_event_loop()

# result = loop.run_until_complete(main())

asyncio.run(main())

# print(date.ctime())

def foldersSample():

basePath = '您的检索文件夹的路径/'

print("当前目录下的文件夹名称为:", getChildFolders(basePath))

# print("当前目录下的文件夹名称为:", getChildFolders(basePath))

files = getChildFiles(basePath)

print("当前目录下的文件名称为:", getChildFiles(basePath))

# TODO 觉得可以修改一下快捷键 ctrl + K

# TODO 读取文件,按照行读取,哪个好

# TODO 文件名可以先用正则表达式筛选一下。如果是多次匹配来试一下比如a有两个,测试的时候print一下

# foldersSample()

def sample():

pattern = re.compile("(d)[o|a](g)")

matc = pattern.search("abcdogabcdagabc") # Match at index 0

matc = pattern.search("abcdogabcdagabc",3) # Match at index 0

matcs = re.findall(pattern, "abcdogabcdagabc", flags=0)

print(re.findall(re.compile("c(d([o|a])g)"), "abcdogabcdagabc", flags=0))

iter = re.finditer(re.compile("c(d([o|a])g)"), "abcdogabcdagabc", flags=0)

for m in re.finditer(

"c(d([o|a])g)"

, "abcdogabcdagabc"

, flags=re.IGNORECASE):

print(m.group())

for g in m.groups():

print(g)

print(m.span())

# 应该用findall就能满足了。就是没有all的index,

print(re.match(r'l','liuyan1').group())

print(re.match(r'y','liuyan1'))

print(re.search(r'y','liuyan1').groups())

pattern.search("dog", 1) # No match; search doesn't include the "d"

# sample()

# 协程使用方法

# asyncio walkthrough

# https://realpython.com/async-io-python/

# Coroutines and Tasks官网文档

# https://docs.python.org/3/library/asyncio-task.html

# async def main2():

# print('hello')

# await asyncio.sleep(1)

# print('world')

# loop = asyncio.get_event_loop()

# result = loop.run_until_complete(main2())



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。