Commit eb4625a9 authored by zhaoxiaomeng's avatar zhaoxiaomeng

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	magic_pdf/model/pdf_extract_kit.py
parents 4101c357 899c7918
......@@ -18,15 +18,12 @@ on:
workflow_dispatch:
jobs:
pdf-test:
runs-on: mineru
runs-on: ubuntu-latest
timeout-minutes: 180
strategy:
fail-fast: true
steps:
- name: config-net
run: |
source activate base
- name: PDF benchmark
uses: actions/checkout@v3
with:
......
......@@ -18,15 +18,12 @@ on:
workflow_dispatch:
jobs:
cli-test:
runs-on: mineru
runs-on: ubuntu-latest
timeout-minutes: 40
strategy:
fail-fast: true
steps:
- name: config-net
run: |
source activate base
- name: PDF cli
uses: actions/checkout@v3
with:
......@@ -34,19 +31,11 @@ jobs:
- name: check-requirements
run: |
changed_files=$(git diff --name-only -r HEAD~1 HEAD)
echo $changed_files
if [[ $changed_files =~ "requirements.txt" ]] || [[ $changed_files =~ "requirements-qa.txt" ]]; then
pip install -r requirements.txt
pip install -r requirements-qa.txt
fi
- name: config-net-reset
run: |
export http_proxy=""
export https_proxy=""
- name: test_cli
run: |
cp magic-pdf.template.json ~/magic-pdf.json
echo $GITHUB_WORKSPACE
cd $GITHUB_WORKSPACE && export PYTHONPATH=. && pytest -s -v tests/test_unit.py
cd $GITHUB_WORKSPACE && pytest -s -v tests/test_cli/test_cli.py
......
# pdf_toolbox
pdf 解析基础函数
## pdf是否是文字类型/扫描类型的区分
```shell
cat s3_pdf_path.example.pdf | parallel --colsep ' ' -j 10 "python pdf_meta_scan.py --s3-pdf-path {2} --s3-profile {1} >> {/}.jsonl"
find dir/to/jsonl/ -type f -name "*.jsonl" | parallel -j 10 "python pdf_classfy_by_type.py --json_file {} >> {/}.jsonl"
```
```shell
# 如果单独运行脚本,合并到code-clean之后需要运行,参考如下:
python -m pdf_meta_scan --s3-pdf-path "D:\pdf_files\内容排序测试_pdf\p3_图文混排 5.pdf" --s3-profile s2
```
## pdf
# 最终版:把那种text_block有重叠,且inline_formula位置在重叠部分的,认定整个页面都有问题,所有的inline_formula都改成no_check
from magic_pdf.libs import fitz
def check_inline_formula(page, inline_formula_boxes):
"""
:param page :fitz读取的当前页的内容
:param inline_formula_boxes: list类型,每一个元素是一个元祖 (L, U, R, D)
:return: inline_formula_check: list类型,每一个元素是一个类别,其顺序对应输入的inline_formula_boxes,给每个行内公式打一个标签,包括:
- nocheck_inline_formula:这个公式框没有与任何span相交,有可能存在问题
- wrong_text_block:这个公式框同时存在多个block里,可能页面的text block存在问题
- false_inline_formula:只涉及一个span并且只占据这个span的小部分面积,判断可能不是公式
- true_inline_formula:两种情况判断为公式,一是横跨多个span,二是只涉及一个span但是几乎占据了这个span大部分的面积
"""
# count = defaultdict(int)
## ------------------------ Text --------------------------------------------
blocks = page.get_text(
"dict",
flags=fitz.TEXTFLAGS_TEXT,
#clip=clip,
)["blocks"]
# iterate over the bboxes
inline_formula_check = []
for result in inline_formula_boxes:
(x1, y1, x2, y2) = (result[0], result[1], result[2], result[3])
## 逐个block##
in_block = 0
for bbox in blocks:
# image = cv2.rectangle(image, (int(bbox['bbox'][0]), int(bbox['bbox'][1])), (int(bbox['bbox'][2]), int(bbox['bbox'][3])), (0, 255, 0), 1)
if (y1 >= bbox['bbox'][1] and y2 <= bbox['bbox'][3]) and (x1 >= bbox['bbox'][0] and x2 <= bbox['bbox'][2]): # 判定公式在哪一个block
in_block += 1
intersect = []
# ## 逐个span###
for line in bbox['lines']:
if line['bbox'][1] <= ((y2 - y1) / 2) + y1 <= line['bbox'][3]: # 判断公式在哪一行
for item in line['spans']:
(t_x1, t_y1, t_x2, t_y2) = item['bbox']
if not ((t_x1 < x1 and t_x2 < x1) or (t_x1 > x2 and t_x2 > x2) or (t_y1 < y1 and t_y2 < y1) or (t_y1 > y2 and t_y2 > y2)): # 判断是否相交
intersect.append(item['bbox'])
# image = cv2.rectangle(image, (int(t_x1), int(t_y1)), (int(t_x2), int(t_y2)), (0, 255, 0), 1) # 可视化涉及到的span
# 可视化公式的分类
if len(intersect) == 0: # 没有与任何一个span有相交,这个span或者这个inline_formula_box可能有问题
# print(f'Wrong location, check {img_path}')
inline_formula_check_result = "nocheck_inline_formula"
# count['not_in_line'] += 1
elif len(intersect) == 1:
if abs((intersect[0][2] - intersect[0][0]) - (x2 - x1)) < (x2 - x1)*0.5: # 只涉及一个span但是几乎占据了这个span大部分的面积,判定为公式
# image = cv2.rectangle(image, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 1)
inline_formula_check_result = "true_inline_formula"
# count['one_span_large'] += 1
else: # 只涉及一个span并且只占据这个span的小部分面积,判断可能不是公式
# image = cv2.rectangle(image, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 1)
inline_formula_check_result = "false_inline_formula"
# count['fail'] += 1
else: # 横跨多个span,判定为公式
# image = cv2.rectangle(image, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 1)
inline_formula_check_result = "true_inline_formula"
# count['multi_span'] += 1
if in_block == 0: # 这个公式没有在任何的block里,这个公式可能有问题
# image = cv2.rectangle(image, (int(x1), int(y1)), (int(x2), int(y2)), (255, 255, 0), 1)
inline_formula_check_result = "nocheck_inline_formula"
# count['not_in_block'] += 1
elif in_block > 1: # 这个公式存在于多个block里,这个页面可能有问题
inline_formula_check_result = "wrong_text_block"
inline_formula_check.append(inline_formula_check_result)
return inline_formula_check
import sys
from typing import Tuple
import os
import boto3, json
from botocore.config import Config
from magic_pdf.libs import fitz
from loguru import logger
from pathlib import Path
from tqdm import tqdm
import numpy as np
# sys.path.insert(0, "/mnt/petrelfs/ouyanglinke/code-clean/")
# print(sys.path)
from validation import cal_edit_distance, format_gt_bbox, label_match, detect_val
# from pdf2text_recogFigure_20231107 import parse_images # 获取figures的bbox
# from pdf2text_recogTable_20231107 import parse_tables # 获取tables的bbox
# from pdf2text_recogEquation_20231108 import parse_equations # 获取equations的bbox
# from pdf2text_recogTitle_20231113 import parse_titles # 获取Title的bbox
# from pdf2text_recogPara import parse_blocks_per_page
# from bbox_sort import bbox_sort, CONTENT_IDX, CONTENT_TYPE_IDX
from magic_pdf.layout.bbox_sort import bbox_sort, CONTENT_IDX, CONTENT_TYPE_IDX
from magic_pdf.pre_proc import parse_images # 获取figures的bbox
from magic_pdf.pre_proc.detect_tables import parse_tables # 获取tables的bbox
from magic_pdf.pre_proc import parse_equations # 获取equations的bbox
# from pdf2text_recogFootnote import parse_footnotes # 获取footnotes的bbox
from magic_pdf.post_proc.detect_para import process_blocks_per_page
from magic_pdf.libs import parse_aws_param, parse_bucket_key, read_file, join_path
def cut_image(bbox: Tuple, page_num: int, page: fitz.Page, save_parent_path: str, s3_profile: str):
"""
从第page_num页的page中,根据bbox进行裁剪出一张jpg图片,返回图片路径
save_path:需要同时支持s3和本地, 图片存放在save_path下,文件名是: {page_num}_{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}.jpg , bbox内数字取整。
"""
# 拼接路径
image_save_path = join_path(save_parent_path, f"{page_num}_{int(bbox[0])}_{int(bbox[1])}_{int(bbox[2])}_{int(bbox[3])}.jpg")
try:
# 将坐标转换为fitz.Rect对象
rect = fitz.Rect(*bbox)
# 配置缩放倍数为3倍
zoom = fitz.Matrix(3, 3)
# 截取图片
pix = page.get_pixmap(clip=rect, matrix=zoom)
# 打印图片文件名
# print(f"Saved {image_save_path}")
if image_save_path.startswith("s3://"):
ak, sk, end_point, addressing_style = parse_aws_param(s3_profile)
cli = boto3.client(service_name="s3", aws_access_key_id=ak, aws_secret_access_key=sk, endpoint_url=end_point,
config=Config(s3={'addressing_style': addressing_style}))
bucket_name, bucket_key = parse_bucket_key(image_save_path)
# 将字节流上传到s3
cli.upload_fileobj(pix.tobytes(output='jpeg', jpg_quality=95), bucket_name, bucket_key)
else:
# 保存图片到本地
# 先检查一下image_save_path的父目录是否存在,如果不存在,就创建
parent_dir = os.path.dirname(image_save_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
pix.save(image_save_path, jpg_quality=95)
# 为了直接能在markdown里看,这里把地址改为相对于mardown的地址
pth = Path(image_save_path)
image_save_path = f"{pth.parent.name}/{pth.name}"
return image_save_path
except Exception as e:
logger.exception(e)
return image_save_path
def get_images_by_bboxes(book_name:str, page_num:int, page: fitz.Page, save_path:str, s3_profile:str, image_bboxes:list, table_bboxes:list, equation_inline_bboxes:list, equation_interline_bboxes:list) -> dict:
"""
返回一个dict, key为bbox, 值是图片地址
"""
ret = {}
# 图片的保存路径组成是这样的: {s3_or_local_path}/{book_name}/{images|tables|equations}/{page_num}_{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}.jpg
image_save_path = join_path(save_path, book_name, "images")
table_save_path = join_path(save_path, book_name, "tables")
equation_inline_save_path = join_path(save_path, book_name, "equations_inline")
equation_interline_save_path = join_path(save_path, book_name, "equation_interline")
for bbox in image_bboxes:
image_path = cut_image(bbox, page_num, page, image_save_path, s3_profile)
ret[bbox] = (image_path, "image") # 第二个元素是"image",表示是图片
for bbox in table_bboxes:
image_path = cut_image(bbox, page_num, page, table_save_path, s3_profile)
ret[bbox] = (image_path, "table")
# 对公式目前只截图,不返回
for bbox in equation_inline_bboxes:
cut_image(bbox, page_num, page, equation_inline_save_path, s3_profile)
for bbox in equation_interline_bboxes:
cut_image(bbox, page_num, page, equation_interline_save_path, s3_profile)
return ret
def reformat_bboxes(images_box_path_dict:list, paras_dict:dict):
"""
把bbox重新组装成一个list,每个元素[x0, y0, x1, y1, block_content, idx_x, idx_y], 初始时候idx_x, idx_y都是None. 对于图片、公式来说,block_content是图片的地址, 对于段落来说,block_content是段落的内容
"""
all_bboxes = []
for bbox, image_info in images_box_path_dict.items():
all_bboxes.append([bbox[0], bbox[1], bbox[2], bbox[3], image_info, None, None, 'image'])
paras_dict = paras_dict[f"page_{paras_dict['page_id']}"]
for block_id, kvpair in paras_dict.items():
bbox = kvpair['bbox']
content = kvpair
all_bboxes.append([bbox[0], bbox[1], bbox[2], bbox[3], content, None, None, 'text'])
return all_bboxes
def concat2markdown(all_bboxes:list):
"""
对排序后的bboxes拼接内容
"""
content_md = ""
for box in all_bboxes:
content_type = box[CONTENT_TYPE_IDX]
if content_type == 'image':
image_type = box[CONTENT_IDX][1]
image_path = box[CONTENT_IDX][0]
content_md += f"![{image_type}]({image_path})"
content_md += "\n\n"
elif content_type == 'text': # 组装文本
paras = box[CONTENT_IDX]['paras']
text_content = ""
for para_id, para in paras.items():# 拼装内部的段落文本
text_content += para['text']
text_content += "\n\n"
content_md += text_content
else:
raise Exception(f"ERROR: {content_type} is not supported!")
return content_md
def main(s3_pdf_path: str, s3_pdf_profile: str, pdf_model_path:str, pdf_model_profile:str, save_path: str, page_num: int):
"""
"""
pth = Path(s3_pdf_path)
book_name = pth.name
#book_name = "".join(os.path.basename(s3_pdf_path).split(".")[0:-1])
res_dir_path = None
exclude_bboxes = []
# text_content_save_path = f"{save_path}/{book_name}/book.md"
# metadata_save_path = f"{save_path}/{book_name}/metadata.json"
try:
pdf_bytes = read_file(s3_pdf_path, s3_pdf_profile)
pdf_docs = fitz.open("pdf", pdf_bytes)
page_id = page_num - 1
page = pdf_docs[page_id] # 验证集只需要读取特定页面即可
model_output_json = join_path(pdf_model_path, f"page_{page_num}.json") # 模型输出的页面编号从1开始的
json_from_docx = read_file(model_output_json, pdf_model_profile) # TODO 这个读取方法名字应该改一下,避免语义歧义
json_from_docx_obj = json.loads(json_from_docx)
# 解析图片
image_bboxes = parse_images(page_id, page, json_from_docx_obj)
# 解析表格
table_bboxes = parse_tables(page_id, page, json_from_docx_obj)
# 解析公式
equations_interline_bboxes, equations_inline_bboxes = parse_equations(page_id, page, json_from_docx_obj)
# # 解析标题
# title_bboxs = parse_titles(page_id, page, res_dir_path, json_from_docx_obj, exclude_bboxes)
# # 解析页眉
# header_bboxs = parse_headers(page_id, page, res_dir_path, json_from_docx_obj, exclude_bboxes)
# # 解析页码
# pageNo_bboxs = parse_pageNos(page_id, page, res_dir_path, json_from_docx_obj, exclude_bboxes)
# # 解析脚注
# footnote_bboxs = parse_footnotes(page_id, page, res_dir_path, json_from_docx_obj, exclude_bboxes)
# # 解析页脚
# footer_bboxs = parse_footers(page_id, page, res_dir_path, json_from_docx_obj, exclude_bboxes)
# # 评估Layout是否规整、简单
# isSimpleLayout_flag, fullColumn_cnt, subColumn_cnt, curPage_loss = evaluate_pdf_layout(page_id, page, res_dir_path, json_from_docx_obj, exclude_bboxes)
# 把图、表、公式都进行截图,保存到本地,返回图片路径作为内容
images_box_path_dict = get_images_by_bboxes(book_name, page_id, page, save_path, s3_pdf_profile, image_bboxes, table_bboxes, equations_inline_bboxes,
equations_interline_bboxes) # 只要表格和图片的截图
# 解析文字段落
footer_bboxes = []
header_bboxes = []
exclude_bboxes = image_bboxes + table_bboxes
paras_dict = process_blocks_per_page(page, page_id, image_bboxes, table_bboxes, equations_inline_bboxes, equations_interline_bboxes, footer_bboxes, header_bboxes)
# paras_dict = postprocess_paras_pipeline(paras_dict)
# 最后一步,根据bbox进行从左到右,从上到下的排序,之后拼接起来, 排序
all_bboxes = reformat_bboxes(images_box_path_dict, paras_dict) # 由于公式目前还没有,所以equation_bboxes是None,多数存在段落里,暂时不解析
# 返回的是一个数组,每个元素[x0, y0, x1, y1, block_content, idx_x, idx_y, type], 初始时候idx_x, idx_y都是None. 对于图片、公式来说,block_content是图片的地址, 对于段落来说,block_content是段落的内容
# sorted_bboxes = bbox_sort(all_bboxes)
# markdown_text = concat2markdown(sorted_bboxes)
# parent_dir = os.path.dirname(text_content_save_path)
# if not os.path.exists(parent_dir):
# os.makedirs(parent_dir)
# with open(text_content_save_path, "a") as f:
# f.write(markdown_text)
# f.write(chr(12)) #换页符
# end for
# 写一个小的json,记录元数据
# metadata = {"book_name": book_name, "pdf_path": s3_pdf_path, "pdf_model_path": pdf_model_path, "save_path": save_path}
# with open(metadata_save_path, "w") as f:
# json.dump(metadata, f, ensure_ascii=False, indent=4)
return all_bboxes
except Exception as e:
print(f"ERROR: {s3_pdf_path}, {e}", file=sys.stderr)
logger.exception(e)
# @click.command()
# @click.option('--pdf-file-sub-path', help='s3上pdf文件的路径')
# @click.option('--save-path', help='解析出来的图片,文本的保存父目录')
def validation(validation_dataset: str, pdf_bin_file_profile: str, pdf_model_dir: str, pdf_model_profile: str, save_path: str):
#pdf_bin_file_path = "s3://llm-raw-snew/llm-raw-scihub/scimag07865000-07865999/10.1007/"
# pdf_bin_file_parent_path = "s3://llm-raw-snew/llm-raw-scihub/"
# pdf_model_parent_dir = "s3://llm-pdf-text/layout_det/scihub/"
# p = Path(pdf_file_sub_path)
# pdf_parent_path = p.parent
# pdf_file_name = p.name # pdf文件名字,含后缀
# pdf_bin_file_path = join_path(pdf_bin_file_parent_path, pdf_parent_path)
with open(validation_dataset, 'r') as f:
samples = json.load(f)
labels = []
det_res = []
edit_distance_list = []
for sample in tqdm(samples):
pdf_name = sample['pdf_name']
s3_pdf_path = sample['s3_path']
page_num = sample['page']
gt_order = sample['order']
pre = main(s3_pdf_path, pdf_bin_file_profile, join_path(pdf_model_dir, pdf_name), pdf_model_profile, save_path, page_num)
pre_dict_list = []
for item in pre:
pre_sample = {
'box': [item[0],item[1],item[2],item[3]],
'type': item[7],
'score': 1
}
pre_dict_list.append(pre_sample)
det_res.append(pre_dict_list)
match_change_dict = { # 待确认
"figure": "image",
"svg_figure": "image",
"inline_fomula": "equations_inline",
"fomula": "equation_interline",
"figure_caption": "text",
"table_caption": "text",
"fomula_caption": "text"
}
gt_annos = sample['annotations']
matched_label = label_match(gt_annos, match_change_dict)
labels.append(matched_label)
# 判断排序函数的精度
# 目前不考虑caption与图表相同序号的问题
ignore_category = ['abandon', 'figure_caption', 'table_caption', 'formula_caption']
gt_bboxes = format_gt_bbox(gt_annos, ignore_category)
sorted_bboxes = bbox_sort(gt_bboxes)
edit_distance = cal_edit_distance(sorted_bboxes)
edit_distance_list.append(edit_distance)
label_classes = ["image", "text", "table", "equation_interline"]
detect_matrix = detect_val(labels, det_res, label_classes)
print('detect_matrix', detect_matrix)
edit_distance_mean = np.mean(edit_distance_list)
print('edit_distance_mean', edit_distance_mean)
if __name__ == '__main__':
# 输入可以用以下命令生成批量pdf
# aws s3 ls s3://llm-pdf-text/layout_det/scihub/ --profile langchao | tail -n 10 | awk '{print "s3://llm-pdf-text/layout_det/scihub/"$4}' | xargs -I{} aws s3 ls {} --recursive --profile langchao | awk '{print substr($4,19)}' | parallel -j 1 echo {//} | sort -u
pdf_bin_file_profile = "outsider"
pdf_model_dir = "s3://llm-pdf-text/eval_1k/layout_res/"
pdf_model_profile = "langchao"
# validation_dataset = "/mnt/petrelfs/share_data/ouyanglinke/OCR/OCR_validation_dataset.json"
validation_dataset = "/mnt/petrelfs/share_data/ouyanglinke/OCR/OCR_validation_dataset_subset.json" # 测试
save_path = "/mnt/petrelfs/share_data/ouyanglinke/OCR/OCR_val_result"
validation(validation_dataset, pdf_bin_file_profile, pdf_model_dir, pdf_model_profile, save_path)
from magic_pdf.libs import fitz # pyMuPDF库
def calculate_overlapRatio_between_rect1_and_rect2(L1: float, U1: float, R1: float, D1: float, L2: float, U2: float, R2: float, D2: float) -> (float, float):
# 计算两个rect,重叠面积各占2个rect面积的比例
if min(R1, R2) < max(L1, L2) or min(D1, D2) < max(U1, U2):
return 0, 0
square_1 = (R1 - L1) * (D1 - U1)
square_2 = (R2 - L2) * (D2 - U2)
if square_1 == 0 or square_2 == 0:
return 0, 0
square_overlap = (min(R1, R2) - max(L1, L2)) * (min(D1, D2) - max(U1, U2))
return square_overlap / square_1, square_overlap / square_2
def evaluate_pdf_layout(page_ID: int, page: fitz.Page, json_from_DocXchain_obj: dict):
"""
:param page_ID: int类型,当前page在当前pdf文档中是第page_D页。
:param page :fitz读取的当前页的内容
:param res_dir_path: str类型,是每一个pdf文档,在当前.py文件的目录下生成一个与pdf文档同名的文件夹,res_dir_path就是文件夹的dir
:param json_from_DocXchain_obj: dict类型,把pdf文档送入DocXChain模型中后,提取bbox,结果保存到pdf文档同名文件夹下的 page_ID.json文件中了。json_from_DocXchain_obj就是打开后的dict
"""
DPI = 72 # use this resolution
pix = page.get_pixmap(dpi=DPI)
pageL = 0
pageR = int(pix.w)
pageU = 0
pageD = int(pix.h)
#--------- 通过json_from_DocXchain来获取 title ---------#
title_bbox_from_DocXChain = []
xf_json = json_from_DocXchain_obj
width_from_json = xf_json['page_info']['width']
height_from_json = xf_json['page_info']['height']
LR_scaleRatio = width_from_json / (pageR - pageL)
UD_scaleRatio = height_from_json / (pageD - pageU)
# {0: 'title', # 标题
# 1: 'figure', # 图片
# 2: 'plain text', # 文本
# 3: 'header', # 页眉
# 4: 'page number', # 页码
# 5: 'footnote', # 脚注
# 6: 'footer', # 页脚
# 7: 'table', # 表格
# 8: 'table caption', # 表格描述
# 9: 'figure caption', # 图片描述
# 10: 'equation', # 公式
# 11: 'full column', # 单栏
# 12: 'sub column', # 多栏
# 13: 'embedding', # 嵌入公式
# 14: 'isolated'} # 单行公式
LOSS_THRESHOLD = 2000 # 经验值
fullColumn_bboxs = []
subColumn_bboxs = []
plainText_bboxs = []
#### read information of plain text
for xf in xf_json['layout_dets']:
L = xf['poly'][0] / LR_scaleRatio
U = xf['poly'][1] / UD_scaleRatio
R = xf['poly'][2] / LR_scaleRatio
D = xf['poly'][5] / UD_scaleRatio
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
if xf['category_id'] == 2:
plainText_bboxs.append((L, U, R, D))
#### read information of column
for xf in xf_json['subfield_dets']:
L = xf['poly'][0] / LR_scaleRatio
U = xf['poly'][1] / UD_scaleRatio
R = xf['poly'][2] / LR_scaleRatio
D = xf['poly'][5] / UD_scaleRatio
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
if xf['category_id'] == 11:
fullColumn_bboxs.append((L, U, R, D))
elif xf['category_id'] == 12:
subColumn_bboxs.append((L, U, R, D))
curPage_loss = 0 # 当前页的loss
fail_cnt = 0 # Text文本块没被圈到的情形。
for L, U, R, D in plainText_bboxs:
find = False
for L2, U2, R2, D2 in (fullColumn_bboxs + subColumn_bboxs):
ratio_1, _ = calculate_overlapRatio_between_rect1_and_rect2(L, U, R, D, L2, U2, R2, D2)
if ratio_1 >= 0.9:
loss_1 = (L + R) / 2 - (L2 + R2) / 2
loss_2 = L - L2
cur_loss = min(abs(loss_1), abs(loss_2))
curPage_loss += cur_loss
find = True
break
if find == False:
fail_cnt += 1
isSimpleLayout_flag = False
if fail_cnt == 0 and len(fullColumn_bboxs) <= 1 and len(subColumn_bboxs) <= 2:
if curPage_loss <= LOSS_THRESHOLD:
isSimpleLayout_flag = True
return isSimpleLayout_flag, len(fullColumn_bboxs), len(subColumn_bboxs), curPage_loss
from magic_pdf.libs import fitz
from typing import List
def show_image(item, title=""):
"""Display a pixmap.
Just to display Pixmap image of "item" - ignore the man behind the curtain.
Args:
item: any PyMuPDF object having a "get_pixmap" method.
title: a string to be used as image title
Generates an RGB Pixmap from item using a constant DPI and using matplotlib
to show it inline of the notebook.
"""
DPI = 150 # use this resolution
import numpy as np
import matplotlib.pyplot as plt
# %matplotlib inline
pix = item.get_pixmap(dpi=DPI)
img = np.ndarray([pix.h, pix.w, 3], dtype=np.uint8, buffer=pix.samples_mv)
plt.figure(dpi=DPI) # set the figure's DPI
plt.title(title) # set title of image
_ = plt.imshow(img, extent=(0, pix.w * 72 / DPI, pix.h * 72 / DPI, 0))
def calculate_overlapRatio_between_line1_and_line2(L1: float, R1: float, L2: float, R2: float) -> (float, float):
# 计算两个line,重叠line各占2个line长度的比例
if max(L1, L2) > min(R1, R2):
return 0, 0
if L1 == R1 or L2 == R2:
return 0, 0
overlap_line = min(R1, R2) - max(L1, L2)
return overlap_line / (R1 - L1), overlap_line / (R2 - L2)
def get_targetAxis_and_splitAxis(page_ID: int, page: fitz.Page, columnNumber: int, textBboxs: List[(float, float, float, float)]) -> (List[float], List[float]):
"""
param: page: fitz解析出来的格式
param: columnNumber: Text的列数
param: textBboxs: 文本块list。 [(L, U, R, D), ... ]
return:
"""
INF = 10 ** 9
pageL, pageU, pageR, pageD = INF, INF, 0, 0
for L, U, R, D in textBboxs:
assert L <= R and U <= D
pageL = min(pageL, L)
pageR = max(pageR, R)
pageU = min(pageU, U)
pageD = max(pageD, D)
pageWidth = pageR - pageL
pageHeight = pageD - pageU
pageL -= pageWidth / 10 # 10是经验值
pageR += pageWidth / 10
pageU -= pageHeight / 10
pageD += pageHeight / 10
pageWidth = pageR - pageL
pageHeight = pageD - pageU
x_targetAxis = []
x_splitAxis = []
for i in range(0, columnNumber * 2 + 1):
if i & 1:
x_targetAxis.append(pageL + pageWidth / (2 * columnNumber) * i)
else:
x_splitAxis.append(pageL + pageWidth / (2 * columnNumber) * i)
# # 可视化:分列的外框
# path_bbox = []
# N = len(x_targetAxis)
# for i in range(N):
# L, R = x_splitAxis[i], x_splitAxis[i + 1]
# path_bbox.append((L, pageU, R, pageD))
# shape = page.new_shape()
# # iterate over the bboxes
# color_map = [fitz.pdfcolor["red"], fitz.pdfcolor["blue"], fitz.pdfcolor["yellow"], fitz.pdfcolor["black"], fitz.pdfcolor["green"], fitz.pdfcolor["brown"]]
# for i, rect in enumerate(path_bbox):
# # if i < 20:
# # continue
# shape.draw_rect(rect) # draw a border
# shape.insert_text(Point(rect[0], rect[1])+(5, 15), str(i), color=fitz.pdfcolor["blue"])
# shape.finish(color=color_map[i%len(color_map)])
# # shape.finish(color=fitz.pdfcolor["blue"])
# shape.commit() # store to the page
# # if i == 3:
# # print(rect)
# # break
# # print(rect)
# show_image(page, f"Table & Header BBoxes")
return x_targetAxis, x_splitAxis
def calculate_loss(page_ID: int, x_targetAxis: List[float], x_splitAxis: List[float], textBboxs: List[(float, float, float, float)]) -> (float, bool):
INF = 10 ** 9
# page_artbox = page.artbox
# pageL, pageU, pageR, pageD = page_artbox[0], page_artbox[1], page_artbox[2], page_artbox[3]
pageL, pageU, pageR, pageD = INF, INF, 0, 0
for L, U, R, D in textBboxs:
assert L <= R and U <= D
pageL = min(pageL, L)
pageR = max(pageR, R)
pageU = min(pageU, U)
pageD = max(pageD, D)
pageWidth = pageR - pageL
pageHeight = pageD - pageU
pageL -= pageWidth / 10
pageR += pageWidth / 10
pageU -= pageHeight / 10
pageD += pageHeight / 10
pageWidth = pageR - pageL
pageHeight = pageD - pageU
col_N = len(x_targetAxis) # 列数
col_texts_mid = [[] for _ in range(col_N)]
col_texts_LR = [[] for _ in range(col_N)]
oneLocateLoss_mid = 0
oneLocateLoss_LR = 0
oneLocateCnt_mid = 0 # 完美在一列中的个数
oneLocateCnt_LR = 0
oneLocateSquare_mid = 0.0 # 完美在一列的面积
oneLocateSquare_LR = 0.0
multiLocateLoss_mid = 0
multiLocateLoss_LR = 0
multiLocateCnt_mid = 0 # 在多列中的个数
multiLocateCnt_LR = 0
multiLocateSquare_mid = 0.0 # 在多列中的面积
multiLocateSquare_LR = 0.0
allLocateLoss_mid = 0
allLocateLoss_LR = 0
allLocateCnt_mid = 0 # 横跨页面的大框的个数
allLocateCnt_LR = 0
allLocateSquare_mid = 0.0 # 横跨整个页面的个数
allLocateSquare_LR = 0.0
isSimpleCondition = True # 就1个。2种方式,只要有一种情况不规整,就是不规整。
colID_Textcnt_mid = [0 for _ in range(col_N)] # 每一列中有多少个Text块,根据mid判断的
colID_Textcnt_LR = [0 for _ in range(col_N)] # 每一列中有多少个Text块,根据区间边界判断
allLocateBboxs_mid = [] # 跨整页的,bbox
allLocateBboxs_LR = []
non_allLocateBboxs_mid = []
non_allLocateBboxs_LR = [] # 不在单独某一列,但又不是全列
for L, U, R, D in textBboxs:
if D - U < 40: # 现在还没拼接好。先简单这样过滤页眉。也会牺牲一些很窄的长条
continue
if R - L < 40:
continue
located_cols_mid = []
located_cols_LR = []
for col_ID in range(col_N):
if col_N == 1:
located_cols_mid.append(col_ID)
located_cols_LR.append(col_ID)
else:
if L <= x_targetAxis[col_ID] <= R:
located_cols_mid.append(col_ID)
if calculate_overlapRatio_between_line1_and_line2(x_splitAxis[col_ID], x_splitAxis[col_ID + 1], L, R)[0] >= 0.2:
located_cols_LR.append(col_ID)
if len(located_cols_mid) == col_N:
allLocateBboxs_mid.append((L, U, R, D))
else:
non_allLocateBboxs_mid.append((L, U, R, D))
if len(located_cols_LR) == col_N:
allLocateBboxs_LR.append((L, U, R, D))
else:
non_allLocateBboxs_LR.append((L, U, R, D))
allLocateBboxs_mid.sort(key=lambda LURD: (LURD[1], LURD[0]))
non_allLocateBboxs_mid.sort(key=lambda LURD: (LURD[1], LURD[0]))
allLocateBboxs_LR.sort(key=lambda LURD: (LURD[1], LURD[0]))
non_allLocateBboxs_LR.sort(key=lambda LURD: (LURD[1], LURD[0]))
# --------------------判断,是不是有标题类的小块,掺杂在一列的pdf页面里。-------------#
isOneClumn = False
under_cnt = 0
under_square = 0.0
before_cnt = 0
before_square = 0.0
for nL, nU, nR, nD in non_allLocateBboxs_mid:
cnt = 0
for L, U, R, D in allLocateBboxs_mid:
if nD <= U:
cnt += 1
if cnt >= 1:
before_cnt += cnt
before_square += (R - L) * (D - U) * cnt
else:
under_cnt += 1
under_square += (R - L) * (D - U) * cnt
if (before_square + under_square) != 0 and before_square / (before_square + under_square) >= 0.2:
isOneClumn = True
if isOneClumn == True and col_N != 1:
return INF, False
if isOneClumn == True and col_N == 1:
return 0, True
#### 根据边界的统计情况,再判断一次
isOneClumn = False
under_cnt = 0
under_square = 0.0
before_cnt = 0
before_square = 0.0
for nL, nU, nR, nD in non_allLocateBboxs_LR:
cnt = 0
for L, U, R, D in allLocateBboxs_LR:
if nD <= U:
cnt += 1
if cnt >= 1:
before_cnt += cnt
before_square += (R - L) * (D - U) * cnt
else:
under_cnt += 1
under_square += (R - L) * (D - U) * cnt
if (before_square + under_square) != 0 and before_square / (before_square + under_square) >= 0.2:
isOneClumn = True
if isOneClumn == True and col_N != 1:
return INF, False
if isOneClumn == True and col_N == 1:
return 0, True
for L, U, R, D in textBboxs:
assert L < R and U < D, 'There is an error on bbox of text when calculate loss!'
# 简单排除页眉、迷你小块
# if (D - U) < pageHeight / 15 < 40 or (R - L) < pageWidth / 8:
if (D - U) < 40:
continue
if (R - L) < 40:
continue
mid = (L + R) / 2
located_cols_mid = [] # 在哪一列里,根据中点来判断
located_cols_LR = [] # 在哪一列里,根据边界判断
for col_ID in range(col_N):
if col_N == 1:
located_cols_mid.append(col_ID)
else:
# 根据中点判断
if L <= x_targetAxis[col_ID] <= R:
located_cols_mid.append(col_ID)
# 根据边界判断
if calculate_overlapRatio_between_line1_and_line2(x_splitAxis[col_ID], x_splitAxis[col_ID + 1], L, R)[0] >= 0.2:
located_cols_LR.append(col_ID)
## 1列的情形
if col_N == 1:
oneLocateLoss_mid += abs(mid - x_targetAxis[located_cols_mid[0]]) * (D - U) * (R - L)
# oneLocateLoss_mid += abs(L - x_splitAxis[located_cols[0]]) * (D - U) * (R - L)
oneLocateLoss_LR += abs(L - x_splitAxis[located_cols_mid[0]]) * (D - U) * (R - L)
oneLocateCnt_mid += 1
oneLocateSquare_mid += (D - U) * (R - L)
## 多列的情形
else:
######## 根据mid判断
if len(located_cols_mid) == 1:
oneLocateLoss_mid += abs(mid - x_targetAxis[located_cols_mid[0]]) * (D - U) * (R - L)
# oneLocateLoss_mid += abs(L - x_splitAxis[located_cols[0]]) * (D - U) * (R - L)
oneLocateCnt_mid += 1
oneLocateSquare_mid += (D - U) * (R - L)
elif 1 <= len(located_cols_mid) < col_N:
ll, rr = located_cols_mid[0], located_cols_mid[-1]
# multiLocateLoss_mid += abs(mid - (x_targetAxis[ll] + x_targetAxis[rr]) / 2) * (D - U) * (R - L)
multiLocateLoss_mid += abs(mid - x_targetAxis[ll]) * (D - U) * (R - L)
# multiLocateLoss_mid += abs(mid - (pageL + pageR) / 2) * (D - U) * (R - L)
multiLocateCnt_mid += 1
multiLocateSquare_mid += (D - U) * (R - L)
isSimpleCondition = False
else:
allLocateLoss_mid += abs(mid - (pageR + pageL) / 2) * (D - U) * (R - L)
allLocateCnt_mid += 1
allLocateSquare_mid += (D - U) * (R - L)
isSimpleCondition = False
######## 根据区间的边界判断
if len(located_cols_LR) == 1:
oneLocateLoss_LR += abs(mid - x_targetAxis[located_cols_LR[0]]) * (D - U) * (R - L)
# oneLocateLoss_LR += abs(L - x_splitAxis[located_cols_LR[0]]) * (D - U) * (R - L)
oneLocateCnt_LR += 1
oneLocateSquare_LR += (D - U) * (R - L)
elif 1 <= len(located_cols_LR) < col_N:
ll, rr = located_cols_LR[0], located_cols_LR[-1]
# multiLocateLoss_LR += abs(mid - (x_targetAxis[ll] + x_targetAxis[rr]) / 2) * (D - U) * (R - L)
multiLocateLoss_LR += abs(mid - x_targetAxis[ll]) * (D - U) * (R - L)
# multiLocateLoss_LR += abs(mid - (pageL + pageR) / 2) * (D - U) * (R - L)
multiLocateCnt_LR += 1
multiLocateSquare_LR += (D - U) * (R - L)
isSimpleCondition = False
else:
allLocateLoss_LR += abs(mid - (pageR + pageL) / 2) * (D - U) * (R - L)
allLocateCnt_LR += 1
allLocateSquare_LR += (D - U) * (R - L)
isSimpleCondition = False
tot_TextCnt = oneLocateCnt_mid + multiLocateCnt_mid + allLocateCnt_mid
tot_TextSquare = oneLocateSquare_mid + multiLocateSquare_mid + allLocateSquare_mid
# 1列的情形
if tot_TextSquare != 0 and allLocateSquare_mid / tot_TextSquare >= 0.85 and col_N == 1:
return 0, True
# 多列的情形
# if col_N >= 2:
# if allLocateCnt >= 1:
# oneLocateLoss_mid += ((pageR - pageL)) * oneLocateCnt_mid
# multiLocateLoss_mid += ((pageR - pageL) ) * multiLocateCnt_mid
# else:
# if multiLocateCnt_mid >= 1:
# oneLocateLoss_mid += ((pageR - pageL)) * oneLocateCnt_mid
totLoss_mid = oneLocateLoss_mid + multiLocateLoss_mid + allLocateLoss_mid
totLoss_LR = oneLocateCnt_LR + multiLocateCnt_LR + allLocateLoss_LR
return totLoss_mid + totLoss_LR, isSimpleCondition
def get_columnNumber(page_ID: int, page: fitz.Page, textBboxs) -> (int, float):
columnNumber_loss = dict()
columnNumber_isSimpleCondition = dict()
#### 枚举列数
for columnNumber in range(1, 5):
# print('---------{}--------'.format(columnNumber))
x_targetAxis, x_splitAxis = get_targetAxis_and_splitAxis(page_ID, page, columnNumber, textBboxs)
loss, isSimpleCondition = calculate_loss(page_ID, x_targetAxis, x_splitAxis, textBboxs)
columnNumber_loss[columnNumber] = loss
columnNumber_isSimpleCondition[columnNumber] = isSimpleCondition
col_idxs = [i for i in range(1, len(columnNumber_loss) + 1)]
col_idxs.sort(key=lambda i: (columnNumber_loss[i], i))
return col_idxs, columnNumber_loss, columnNumber_isSimpleCondition
import re
from magic_pdf.libs import _is_in_or_part_overlap
from magic_pdf.libs import fitz
import collections
def calculate_overlapRatio_between_rect1_and_rect2(L1: float, U1: float, R1: float, D1: float, L2: float, U2: float, R2: float, D2: float) -> (float, float):
# 计算两个rect,重叠面积各占2个rect面积的比例
if min(R1, R2) < max(L1, L2) or min(D1, D2) < max(U1, U2):
return 0, 0
square_1 = (R1 - L1) * (D1 - U1)
square_2 = (R2 - L2) * (D2 - U2)
if square_1 == 0 or square_2 == 0:
return 0, 0
square_overlap = (min(R1, R2) - max(L1, L2)) * (min(D1, D2) - max(U1, U2))
return square_overlap / square_1, square_overlap / square_2
def calculate_overlapRatio_between_line1_and_line2(L1: float, R1: float, L2: float, R2: float) -> (float, float):
# 计算两个line,重叠区间各占2个line长度的比例
if max(L1, L2) > min(R1, R2):
return 0, 0
if L1 == R1 or L2 == R2:
return 0, 0
overlap_line = min(R1, R2) - max(L1, L2)
return overlap_line / (R1 - L1), overlap_line / (R2 - L2)
def parse_footnoteLine(page_ID: int, page: fitz.Page, json_from_DocXchain_obj, exclude_bboxes):
"""
:param page_ID: int类型,当前page在当前pdf文档中是第page_D页。
:param page :fitz读取的当前页的内容
:param res_dir_path: str类型,是每一个pdf文档,在当前.py文件的目录下生成一个与pdf文档同名的文件夹,res_dir_path就是文件夹的dir
:param json_from_DocXchain_obj: dict类型,把pdf文档送入DocXChain模型中后,提取bbox,结果保存到pdf文档同名文件夹下的 page_ID.json文件中了。json_from_DocXchain_obj就是打开后的dict
"""
DPI = 72 # use this resolution
pix = page.get_pixmap(dpi=DPI)
pageL = 0
pageR = int(pix.w)
pageU = 0
pageD = int(pix.h)
#---------------------- PyMuPDF解析text --------------------#
textSize_freq = collections.defaultdict(float) # text块中,textSize的频率
textBlock_bboxs = []
textLine_bboxs = []
text_blocks = page.get_text(
"dict",
flags=fitz.TEXTFLAGS_TEXT,
#clip=clip,
)["blocks"]
totText_list = []
for i in range(len(text_blocks)):
# print(blocks[i]) #### print
bbox = text_blocks[i]['bbox']
textBlock_bboxs.append(bbox)
# print(bbox)
cur_block_text_list = []
for tt in text_blocks[i]['lines']:
# 当前line
cur_line_text_list = []
cur_line_bbox = None # 当前line,最右侧的section的bbox
for xf in tt['spans']:
L, U, R, D = xf['bbox']
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
textLine_bboxs.append((L, U, R, D))
cur_line_text_list.append(xf['text'])
textSize_freq[xf['size']] += len(xf['text'])
cur_lines_text = ' '.join(cur_line_text_list)
cur_block_text_list.append(cur_lines_text)
totText_list.append('\n'.join(cur_block_text_list))
totText = '\n'.join(totText_list)
# print(totText) # 打印Text
textLine_bboxs.sort(key = lambda LURD: (LURD[0], LURD[1]))
textBlock_bboxs.sort(key = lambda LURD: (LURD[0], LURD[1]))
# print('------------ textSize_freq -----------')
max_sizeFreq = 0 # 出现频率最高的textSize
textSize_withMaxFreq = 0
for x, f in textSize_freq.items():
# print(x, f)
if f > max_sizeFreq:
max_sizeFreq = f
textSize_withMaxFreq = x
#**********************************************************#
#------------------ PyMuPDF读取drawings -----------------#
horizon_lines = []
drawings = page.get_cdrawings()
for drawing in drawings:
try:
rect = drawing['rect']
L, U, R, D = rect
# if (L, U, R, D) in exclude_bboxes:
# continue # 如果是Fiugre, Table, Equation。注释掉是因为,可以暂时先不消,先自我对消。最后再判读需不需要排除。
# 如果是水平线
if U <= D and D - U <= 3:
# 如果长度够
if (pageR - pageL) / 15 <= R - L:
if not(80/800 * pageD <= U <= 750/800 * pageD):
continue # 很可能是页眉和页脚的线
horizon_lines.append((L, U, R, D))
# print((L, U, R, D))
except:
pass
horizon_lines.sort(key = lambda LURD: (LURD[1]))
#********************************************************#
#----------------- 两条线可能是在表格中 ------------------#
def has_text_below_line(L: float, U: float, R: float, D: float, inLowerArea: bool) -> bool:
"""
检查线下是否紧挨着text
"""
Uu, Du = U - textSize_withMaxFreq, U # 线上的一个矩形
Lu, Ru = L, R
Ud, Dd = U, U + textSize_withMaxFreq # 线下的一个矩形
Ld, Rd = L, R
find = 0 # 在线下的文字。统计面积。
leftTextCnt = 0 # 不在线底下的文字(整体在线左侧的文字),说明不是个脚注线。统计面积。
English_alpha_cnt = 0 # 英文字母个数
nonEnglish_alpha_cnt = 0 # 非英文字母个数
punctuation_mark_cnt = 0 # 常见标点符号个数
digit_cnt = 0 # 数字个数
distance_nearest_up_line = None
distance_nearest_down_line = None
for i in range(len(text_blocks)):
# print(blocks[i]) #### print
bbox = text_blocks[i]['bbox']
L0, U0, R0, D0 = bbox
if 0< (R0 - L0) < pageR / 6 and (D0 - U0) / (R0 - L0) > 10 :
continue # 一个很窄的,竖直的长条。比如,arXiv预印本,左侧的arXiv标志信息。
textBlock_bboxs.append(bbox)
# print(bbox)
cur_block_text_list = []
for tt in text_blocks[i]['lines']:
# 当前line
cur_line_text_list = []
cur_line_bbox = None # 当前line,最右侧的section的bbox
for xf in tt['spans']:
L2, U2, R2, D2 = xf['bbox']
L2, R2 = min(L2, R2), max(L2, R2)
U2, D2 = min(U2, D2), max(U2, D2)
textLine = xf['text']
if L>0 and L2 < L and (L - L2) / L > 0.2:
leftTextCnt += abs(R2 - L2) * abs(D2 - U2)
else:
## 线下的部分
ratio_1, ratio_2 = calculate_overlapRatio_between_line1_and_line2(Ud, Dd, U2, D2)
ratio_3, ratio_4 = calculate_overlapRatio_between_line1_and_line2(Ld, Rd, L2, R2)
if U < (U2 + D2) / 2 and ratio_1 > 0 and ratio_2 > 0:
if max(ratio_3, ratio_4) > 0.8:
# if 444 <= U1 < 445 and 55 <= L2 < 56:
# print('匹配的框', L2, U2, R2, D2)
# if xf['size'] > 1.2 * textSize_withMaxFreq:
# return False # 可能是个标题。不能这样卡
find += abs(R2 - L2) * abs(D2 - U2)
distance_nearest_down_line = (U2 + D2) / 2 - U
for c in textLine:
if c == ' ':
continue
elif c.isdigit() == True:
digit_cnt += 1
elif c in ',.:!?[]()%,。、!?:【】()《》-':
punctuation_mark_cnt += 1
elif c.isalpha() == True:
English_alpha_cnt += 1
else:
nonEnglish_alpha_cnt += 1
## 线上的部分
ratio_5, ratio_6 = calculate_overlapRatio_between_line1_and_line2(Uu, Du, U2, D2)
ratio_7, ratio_8 = calculate_overlapRatio_between_line1_and_line2(Lu, Ru, L2, R2)
if (U2 + D2) / 2 < U and ratio_5 > 0 and ratio_6 > 0:
if max(ratio_7, ratio_8) > 0.8:
distance_nearest_up_line = U - (U2 + D2) / 2
# if distance_nearest_up_line < 0:
# print(Lu, Uu, Ru, Du, L2, U2, R2, D2)
# print(distance_nearest_up_line, distance_nearest_down_line)
if distance_nearest_up_line != None and distance_nearest_down_line != None:
if distance_nearest_up_line * 1.5 < distance_nearest_down_line:
return False # 如果,一根线。距离上面的文字line更近。说明是个下划线,而不是footnoteLine
## 在上面的线条,要考虑左侧的text块儿。在很靠下的线条,就暂时不考虑左侧text块儿了。
if inLowerArea == False:
if leftTextCnt >= 2000/500000 * pageR * pageD:
return False
return find >= 0 and (English_alpha_cnt + nonEnglish_alpha_cnt + digit_cnt) >= 10
## 最下面区域的线条,判断时。
# print(English_alpha_cnt, nonEnglish_alpha_cnt, digit_cnt)
if (English_alpha_cnt + nonEnglish_alpha_cnt + digit_cnt) == 0:
return False
if (English_alpha_cnt + digit_cnt) / (English_alpha_cnt + nonEnglish_alpha_cnt + digit_cnt) > 0.5:
if nonEnglish_alpha_cnt / (English_alpha_cnt + nonEnglish_alpha_cnt + digit_cnt) > 0.4:
return False
else:
return True
return True
visited = [False for _ in range(len(horizon_lines))]
for i, b1 in enumerate(horizon_lines):
for j in range(i + 1, len(horizon_lines)):
L1, U1, R1, D1 = horizon_lines[i]
L2, U2, R2, D2 = horizon_lines[j]
## 在一条水平线,且挨着
if L1 > L2:
L1, U1, R1, D1, L2, U2, R2, D2 = L2, U2, R2, D2, L1, U1, R1, D1
in_horizontal_line_flag = (max(U1, D1, U2, D2) - min(U1, D1, U2, D2) <= 5) and (L2 - R1 <= pageR/10)
if in_horizontal_line_flag == True:
visited[i] = True
visited[j] = True
## 在竖直方向上是一致的。(表格,或者有的文章就是喜欢划线)
L1, U1, R1, D1 = horizon_lines[i]
L2, U2, R2, D2 = horizon_lines[j]
ratio_1, ratio_2 = calculate_overlapRatio_between_line1_and_line2(L1, R1, L2, R2)
# print(L1, U1, R1, D1, L2, U2, R2, D2, ratio_1, ratio_2)
in_vertical_line_flag = (ratio_1 > 0.9 and ratio_2 > 0.9) or (max(ratio_1, ratio_2) > 0.95)
if in_vertical_line_flag == True:
visited[i] = True
# if (U2 < pageD * 0.8 or (U2 - U1) < pageD * 0.3) and has_text_below_line(L2, U2, R2, D2, False) == False:
# visited[j] = True # 最最底下的线先不要动
else:
if ratio_1 > 0 and (R2 - L2) / (R1 - L1) > 1:
visited[i] = True
# print(horizon_lines)
horizon_lines = [horizon_lines[i] for i in range(len(horizon_lines)) if visited[i] == False]
# print(horizon_lines)
#*****************************************************************#
#------- 靠上的,就不是脚注。用一个THRESHOLD直接卡掉位于上半页的 -------#
visited = [False for _ in range(len(horizon_lines))]
THRESHOLD = (pageD - pageU) * 0.5
for i, (L, U, R, D) in enumerate(horizon_lines):
if U < THRESHOLD:
visited[i] = True
horizon_lines = [horizon_lines[i] for i in range(len(horizon_lines)) if visited[i] == False]
#******************************************************#
#--------------- 此时,还有遮挡的,上面的丢弃 ---------------#
visited = [False for _ in range(len(horizon_lines))]
for i, (L1, U1, R1, D1) in enumerate(horizon_lines):
for j in range(i + 1, len(horizon_lines)):
L2, U2, R2, D2 = horizon_lines[j]
ratio_1, ratio_2 = calculate_overlapRatio_between_line1_and_line2(L1, R1, L2, R2)
if (ratio_1 > 0.2 and ratio_2 > 0.2) or max(ratio_1, ratio_2) > 0.7:
visited[i] = True
horizon_lines = [horizon_lines[i] for i in range(len(horizon_lines)) if visited[i] == False]
#********************************************************#
# print(horizon_lines)
## 检查,线下面有没有紧挨着的text
horizon_lines = [LURD for LURD in horizon_lines if has_text_below_line(*(LURD), True) == True]
# print(horizon_lines)
## 卡一下长度
# horizon_lines = [LURD for LURD in horizon_lines if (LURD[2] - LURD[0] >= pageR / 10)]
## 上面最多保留2条
horizon_lines = horizon_lines[max(-2, -len(horizon_lines)) :]
#----------------------------------------------------- 第2段 -----------------------------------------------------------#
#----------------------------------- 最下面的情形,用距离硬卡。还有在右侧的情形就被包含了 -----------------------------------#
#------------------ PyMuPDF读取drawings -----------------#
down_horizon_lines = []
drawings = page.get_cdrawings()
for drawing in drawings:
try:
rect = drawing['rect']
L, U, R, D = rect
# if (L, U, R, D) in exclude_bboxes:
# continue # 如果是Fiugre, Table, Equation。目前是Figure识别的比较好。但是Table和Equation识别的不好
# 如果是水平线
if U <= D and D - U <= 3 and U > pageD * 0.85:
# 如果长度够
if (pageR - pageL) / 15 <= R - L:
down_horizon_lines.append((L, U, R, D))
# print((L, U, R, D))
except:
pass
down_horizon_lines.sort(key = lambda LURD: (LURD[0], LURD[2], LURD[1]))
visited = [False for _ in range(len(down_horizon_lines))]
for i in range(len(down_horizon_lines) - 1):
L1, U1, R1, D1 = down_horizon_lines[i]
L2, U2, R2, D2 = down_horizon_lines[i + 1]
ratio_1, ratio_2 = calculate_overlapRatio_between_line1_and_line2(L1, R1, L2, R2)
if ratio_1 <= 0.1 and ratio_2 <= 0.1:
if L2 - R1 <= pageR / 3:
visited[i] = True
visited[i + 1] = True
down_horizon_lines = [down_horizon_lines[i] for i in range(len(down_horizon_lines)) if visited[i] == False]
down_horizon_lines = [LURD for LURD in down_horizon_lines if has_text_below_line(*(LURD), True) == True]
# for LURD in down_horizon_lines:
# print('第2阶段,LURD是: ', LURD)
# print(has_text_below_line(*(LURD), True))
footnoteLines = horizon_lines + down_horizon_lines
footnoteLines = list(set(footnoteLines))
footnoteLines = footnoteLines[max(-2, -len(footnoteLines)) : ]
#-------------------------- 最后再检查一遍。是否在图片、表格、公式中。 ------------------------------#
def line_in_specialBboxes(L: float, U: float, R: float, D: float, specialBboxes) -> bool:
L2, U2, R2, D2 = L, U, R, D # 当前这根线
for L1, U1, R1, D1 in specialBboxes:
if U1 <= U2 <= D2 < D1:
ratio_1, ratio_2 = calculate_overlapRatio_between_line1_and_line2(L1, R1, L2, R2)
if ratio_1 > 0 and ratio_2 > 0.6:
return True
# else:
# U1 -= min(textSize_withMaxFreq * 2, 20)
# D1 += min(textSize_withMaxFreq * 2, 20)
# if U1 <= U2 <= D2 < D1:
# ratio_1, ratio_2 = calculate_overlapRatio_between_line1_and_line2(L1, R1, L2, R2)
# if ratio_1 > 0 and ratio_2 > 0.8:
# return True
return False
footnoteLines = [LURD for LURD in footnoteLines if line_in_specialBboxes(*(LURD), exclude_bboxes) == False]
#-------------------------- 检查,线,是否在当前column的左侧,而不是在一段文字的中间 (通过DocXChain识别的column或者徐超老师写的Layout识别)------------------------------#
# #--------- 通过json_from_DocXchain来获取 column ---------#
# column_bbox_from_DocXChain = []
# xf_json = json_from_DocXchain_obj
# width_from_json = xf_json['page_info']['width']
# height_from_json = xf_json['page_info']['height']
# LR_scaleRatio = width_from_json / (pageR - pageL)
# UD_scaleRatio = height_from_json / (pageD - pageU)
# # {0: 'title', # 标题
# # 1: 'figure', # 图片
# # 2: 'plain text', # 文本
# # 3: 'header', # 页眉
# # 4: 'page number', # 页码
# # 5: 'footnote', # 脚注
# # 6: 'footer', # 页脚
# # 7: 'table', # 表格
# # 8: 'table caption', # 表格描述
# # 9: 'figure caption', # 图片描述
# # 10: 'equation', # 公式
# # 11: 'full column', # 单栏
# # 12: 'sub column', # 多栏
# # 13: 'embedding', # 嵌入公式
# # 14: 'isolated'} # 单行公式
# for xf in xf_json['layout_dets']:
# L = xf['poly'][0] / LR_scaleRatio
# U = xf['poly'][1] / UD_scaleRatio
# R = xf['poly'][2] / LR_scaleRatio
# D = xf['poly'][5] / UD_scaleRatio
# # L += pageL # 有的页面,artBox偏移了。不在(0,0)
# # R += pageL
# # U += pageU
# # D += pageU
# L, R = min(L, R), max(L, R)
# U, D = min(U, D), max(U, D)
# if (xf['category_id'] == 11 or xf['category_id'] == 12) and xf['score'] >= 0.3:
# column_bbox_from_DocXChain.append((L, U, R, D))
#---------------手写,检查,线是否是与某个column的左端对齐 ------------------#
def check_isOnTheLeftOfColumn(L: float, U: float, R: float, D: float) -> bool:
LL = L - textSize_withMaxFreq
RR = LL
UU = max(pageD * 0.02, U - 100/800 * pageD)
DD = min(U + 50/800 * pageD, pageD * 0.98)
# print(LL, UU, RR, DD)
cnt = 0
for bbox in textLine_bboxs:
L2, U2, R2, D2 = bbox
ratio_1, ratio_2 = calculate_overlapRatio_between_line1_and_line2(UU, DD, U2, D2)
ratio_3, ratio_4 = calculate_overlapRatio_between_line1_and_line2(L, R, L2, R2)
if ratio_1 > 0 and ratio_2 > 0:
if max(ratio_3, ratio_4) > 0.8:
if abs(LL - L2) <= 20/700 * pageR:
cnt += 1
# else:
# if (R2 - L2) >= 30/700 * pageR:
# print(LL, UU, RR, DD, L2, U2, R2, D2)
# return False # 不能这样卡。有些注释里面,单独的特殊符号就是一个textLineBbox
# print('cnt: ', cnt)
return cnt >= 4
# def check_isOnTheLeftOfColumn_considerLayout(L0: float, U0: float, R0: float, D0: float) -> bool:
# LL = L0 - textSize_withMaxFreq * 1.5
# RR = LL
# UU = 100/800 * pageD
# DD = 700/800 * pageD
# STEP = textSize_withMaxFreq / 2
# def check_ok(L: float, U: float, R: float, D: float) -> bool:
# for bbox in textBlock_bboxs:
# L2, U2, R2, D2 = bbox
# ratio_3, ratio_4 = calculate_overlapRatio_between_line1_and_line2(L, R, L2, R2)
# if max(ratio_3, ratio_4) > 0.8:
# if (R2 - L2) > 1/4 * pageR and L2 < LL <= RR < R2:
# if abs(LL - L2) < 50/700 * pageR or abs(RR - R2) < 50/700 * pageR:
# continue
# else:
# return False
# return True
# ## 先探上面
# u = UU
# d = U0
# while u + STEP/2 < d:
# mid = (u + d) / 2
# if check_ok(L0, mid, R0, U0) == True:
# d = mid
# else:
# u = mid + STEP
# print(mid)
# dist_up = U0 - u
# print(u)
# ## 再探下面
# u = D0
# d = DD
# while u + STEP/2 < d:
# mid = (u + d) / 2
# if check_ok(L0, mid, R0, D0) == True:
# u = mid
# else:
# d = mid - STEP
# print(u)
# print('^^^^^^^^^^^^^^')
# dist_down = u - D0
# if dist_up + dist_down < textSize_withMaxFreq * 10:
# return False
# return True
footnoteLines = [LURD for LURD in footnoteLines if check_isOnTheLeftOfColumn(*(LURD)) == True]
# footnoteLines = [LURD for LURD in footnoteLines if check_isOnTheLeftOfColumn_considerLayout(*(LURD)) == True] # 不具有泛化性。不用了。
#--------------------------------- 通过footnoteLine获取bbox -------------------------------#
def get_footnoteBbox(L: float, U: float, R: float, D: float) -> (float, float, float, float):
"""
检查线下是否紧挨着text
"""
L1, U1, R1, D1 = L, U, R, D
raw_bboxes = []
for i in range(len(text_blocks)):
bbox = text_blocks[i]['bbox']
L2, U2, R2, D2 = bbox
if (D2 - U2) / (R2 - L2) > 10 and (R2 - L2) < pageR / 6:
continue # 一个很窄的,竖直的长条。比如,arXiv预印本,左侧的arXiv标志信息。
if U2 < D2 < U1:
continue # 在线上面
under_THRESHOLD = min(D1 + textSize_withMaxFreq * 20, pageD * 0.98)
if U2 < under_THRESHOLD:
ratio_1, ratio_2 = calculate_overlapRatio_between_line1_and_line2(L1, R1, L2, R2)
if max(ratio_1, ratio_2) > 0.8:
raw_bboxes.append((L2, U2, R2, D2))
# print(L1, U1, R1, D1)
# print(raw_bboxes)
if len(raw_bboxes) == 0:
return []
raw_bboxes.sort(key = lambda LURD: (LURD[1], LURD[0]))
raw_bboxes = [LURD for LURD in raw_bboxes if (abs(LURD[0] - L1) < textSize_withMaxFreq * 6 or L1 < LURD[0])] # footnote的bbox,应该都是左端对齐的
if len(raw_bboxes) == 0:
return []
#------------------ full column和sub column混合,肯定也不行 ------------------#
LL, UU, RR, DD = raw_bboxes[0]
for L, U, R, D in raw_bboxes:
LL, UU, RR, DD = min(LL, L), min(UU, U), max(RR, R), max(DD, D)
for L, U, R, D in raw_bboxes:
if (RR - LL) > pageR*0.8 and (R - L) > pageR * 0.15 and (RR - LL) / (R - L) > 2:
return []
if abs(LL - L) > textSize_withMaxFreq * 3:
return []
#-------------------- 太高了的,full column的框。不行 ----------------------#
if UU < 650/800 * pageD and (RR - LL) > 0.5 * pageR:
return []
#-------------- 第一段字数很少。后面的段字数很多,也不行 ----------------#
if len(raw_bboxes) > 1:
bbox_square = []
for L, U, R, D in raw_bboxes:
cur_s = abs(R - L) * abs(D - U)
bbox_square.append(cur_s)
s0 = bbox_square[0]
s1n = sum(bbox_square[1: ]) / len(bbox_square[1: ])
if s1n / s0 > 10 or max(bbox_square) / s0 > 15:
return []
raw_bboxes += [(LL, UU, RR, DD)]
return raw_bboxes
# print(footnoteLines)
footnoteBboxes = []
for L, U, R, D in footnoteLines:
cur = get_footnoteBbox(L, U, R, D)
if len(cur) > 0:
footnoteBboxes.append((L, U, R, D))
footnoteBboxes += cur
footnoteBboxes = list(set(footnoteBboxes))
return footnoteBboxes
def __bbox_in(box1, box2):
"""
box1是否在box2中
"""
L1, U1, R1, D1 = box1
L2, U2, R2, D2 = box2
if int(L2) <= int(L1) and int(U2) <= int(U1) and int(R1) <= int(R2) and int(D1) <= int(D2):
return True
return False
def remove_footnote_text(raw_text_block, footnote_bboxes):
"""
:param raw_text_block: str类型,是当前页的文本内容
:param footnoteBboxes: list类型,是当前页的脚注bbox
"""
footnote_text_blocks = []
for block in raw_text_block:
text_bbox = block['bbox']
# TODO 更严谨点在line级别做
if any([_is_in_or_part_overlap(text_bbox, footnote_bbox) for footnote_bbox in footnote_bboxes]):
#if any([text_bbox[3]>=footnote_bbox[1] for footnote_bbox in footnote_bboxes]):
block['tag'] = 'footnote'
footnote_text_blocks.append(block)
#raw_text_block.remove(block)
# 移除,不能再内部移除,否则会出错
for block in footnote_text_blocks:
raw_text_block.remove(block)
return raw_text_block, footnote_text_blocks
def remove_footnote_image(image_blocks, footnote_bboxes):
"""
:param image_bboxes: list类型,是当前页的图片bbox(结构体)
:param footnoteBboxes: list类型,是当前页的脚注bbox
"""
footnote_imgs_blocks = []
for image_block in image_blocks:
if any([__bbox_in(image_block['bbox'], footnote_bbox) for footnote_bbox in footnote_bboxes]):
footnote_imgs_blocks.append(image_block)
for footnote_imgs_block in footnote_imgs_blocks:
image_blocks.remove(footnote_imgs_block)
return image_blocks, footnote_imgs_blocks
def remove_headder_footer_one_page(text_raw_blocks, image_bboxes, table_bboxes, header_bboxs, footer_bboxs, page_no_bboxs, page_w, page_h):
"""
删除页眉页脚,页码
从line级别进行删除,删除之后观察这个text-block是否是空的,如果是空的,则移动到remove_list中
"""
header = []
footer = []
if len(header)==0:
model_header = header_bboxs
if model_header:
x0 = min([x for x,_,_,_ in model_header])
y0 = min([y for _,y,_,_ in model_header])
x1 = max([x1 for _,_,x1,_ in model_header])
y1 = max([y1 for _,_,_,y1 in model_header])
header = [x0, y0, x1, y1]
if len(footer)==0:
model_footer = footer_bboxs
if model_footer:
x0 = min([x for x,_,_,_ in model_footer])
y0 = min([y for _,y,_,_ in model_footer])
x1 = max([x1 for _,_,x1,_ in model_footer])
y1 = max([y1 for _,_,_,y1 in model_footer])
footer = [x0, y0, x1, y1]
header_y0 = 0 if len(header) == 0 else header[3]
footer_y0 = page_h if len(footer) == 0 else footer[1]
if page_no_bboxs:
top_part = [b for b in page_no_bboxs if b[3] < page_h/2]
btn_part = [b for b in page_no_bboxs if b[1] > page_h/2]
top_max_y0 = max([b[1] for b in top_part]) if top_part else 0
btn_min_y1 = min([b[3] for b in btn_part]) if btn_part else page_h
header_y0 = max(header_y0, top_max_y0)
footer_y0 = min(footer_y0, btn_min_y1)
content_boundry = [0, header_y0, page_w, footer_y0]
header = [0,0, page_w, header_y0]
footer = [0, footer_y0, page_w, page_h]
"""以上计算出来了页眉页脚的边界,下面开始进行删除"""
text_block_to_remove = []
# 首先检查每个textblock
for blk in text_raw_blocks:
if len(blk['lines']) > 0:
for line in blk['lines']:
line_del = []
for span in line['spans']:
span_del = []
if span['bbox'][3] < header_y0:
span_del.append(span)
elif _is_in_or_part_overlap(span['bbox'], header) or _is_in_or_part_overlap(span['bbox'], footer):
span_del.append(span)
for span in span_del:
line['spans'].remove(span)
if not line['spans']:
line_del.append(line)
for line in line_del:
blk['lines'].remove(line)
else:
# if not blk['lines']:
blk['tag'] = 'in-foot-header-area'
text_block_to_remove.append(blk)
"""有的时候由于pageNo太小了,总是会有一点和content_boundry重叠一点,被放入正文,因此对于pageNo,进行span粒度的删除"""
page_no_block_2_remove = []
if page_no_bboxs:
for pagenobox in page_no_bboxs:
for block in text_raw_blocks:
if _is_in_or_part_overlap(pagenobox, block['bbox']): # 在span级别删除页码
for line in block['lines']:
for span in line['spans']:
if _is_in_or_part_overlap(pagenobox, span['bbox']):
#span['text'] = ''
span['tag'] = "page-no"
# 检查这个block是否只有这一个span,如果是,那么就把这个block也删除
if len(line['spans']) == 1 and len(block['lines'])==1:
page_no_block_2_remove.append(block)
else:
# 测试最后一个是不是页码:规则是,最后一个block仅有1个line,一个span,且text是数字,空格,符号组成,不含字母,并且包含数字
if len(text_raw_blocks) > 0:
text_raw_blocks.sort(key=lambda x: x['bbox'][1], reverse=True)
last_block = text_raw_blocks[0]
if len(last_block['lines']) == 1:
last_line = last_block['lines'][0]
if len(last_line['spans']) == 1:
last_span = last_line['spans'][0]
if last_span['text'].strip() and not re.search('[a-zA-Z]', last_span['text']) and re.search('[0-9]', last_span['text']):
last_span['tag'] = "page-no"
page_no_block_2_remove.append(last_block)
for b in page_no_block_2_remove:
text_block_to_remove.append(b)
for blk in text_block_to_remove:
if blk in text_raw_blocks:
text_raw_blocks.remove(blk)
text_block_remain = text_raw_blocks
image_bbox_to_remove = [bbox for bbox in image_bboxes if not _is_in_or_part_overlap(bbox, content_boundry)]
image_bbox_remain = [bbox for bbox in image_bboxes if _is_in_or_part_overlap(bbox, content_boundry)]
table_bbox_to_remove = [bbox for bbox in table_bboxes if not _is_in_or_part_overlap(bbox, content_boundry)]
table_bbox_remain = [bbox for bbox in table_bboxes if _is_in_or_part_overlap(bbox, content_boundry)]
return image_bbox_remain, table_bbox_remain, text_block_remain, text_block_to_remove, image_bbox_to_remove, table_bbox_to_remove
This source diff could not be displayed because it is too large. You can view the blob instead.
from magic_pdf.libs.commons import fitz # pyMuPDF库
def parse_titles(page_ID: int, page: fitz.Page, json_from_DocXchain_obj: dict, exclude_bboxes):
"""
:param page_ID: int类型,当前page在当前pdf文档中是第page_D页。
:param page :fitz读取的当前页的内容
:param res_dir_path: str类型,是每一个pdf文档,在当前.py文件的目录下生成一个与pdf文档同名的文件夹,res_dir_path就是文件夹的dir
:param json_from_DocXchain_obj: dict类型,把pdf文档送入DocXChain模型中后,提取bbox,结果保存到pdf文档同名文件夹下的 page_ID.json文件中了。json_from_DocXchain_obj就是打开后的dict
"""
DPI = 72 # use this resolution
pix = page.get_pixmap(dpi=DPI)
pageL = 0
pageR = int(pix.w)
pageU = 0
pageD = int(pix.h)
#--------- 通过json_from_DocXchain来获取 title ---------#
title_bbox_from_DocXChain = []
xf_json = json_from_DocXchain_obj
width_from_json = xf_json['page_info']['width']
height_from_json = xf_json['page_info']['height']
LR_scaleRatio = width_from_json / (pageR - pageL)
UD_scaleRatio = height_from_json / (pageD - pageU)
# {0: 'title', # 标题
# 1: 'figure', # 图片
# 2: 'plain text', # 文本
# 3: 'header', # 页眉
# 4: 'page number', # 页码
# 5: 'footnote', # 脚注
# 6: 'footer', # 页脚
# 7: 'table', # 表格
# 8: 'table caption', # 表格描述
# 9: 'figure caption', # 图片描述
# 10: 'equation', # 公式
# 11: 'full column', # 单栏
# 12: 'sub column', # 多栏
# 13: 'embedding', # 嵌入公式
# 14: 'isolated'} # 单行公式
for xf in xf_json['layout_dets']:
L = xf['poly'][0] / LR_scaleRatio
U = xf['poly'][1] / UD_scaleRatio
R = xf['poly'][2] / LR_scaleRatio
D = xf['poly'][5] / UD_scaleRatio
# L += pageL # 有的页面,artBox偏移了。不在(0,0)
# R += pageL
# U += pageU
# D += pageU
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
if xf['category_id'] == 0 and xf['score'] >= 0.3:
title_bbox_from_DocXChain.append((L, U, R, D))
title_final_names = []
title_final_bboxs = []
title_ID = 0
for L, U, R, D in title_bbox_from_DocXChain:
# cur_title = page.get_pixmap(clip=(L,U,R,D))
new_title_name = "title_{}_{}.png".format(page_ID, title_ID) # 标题name
# cur_title.save(res_dir_path + '/' + new_title_name) # 把标题存储在新建的文件夹,并命名
title_final_names.append(new_title_name) # 把标题的名字存在list中
title_final_bboxs.append((L, U, R, D))
title_ID += 1
title_final_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
curPage_all_title_bboxs = title_final_bboxs
return curPage_all_title_bboxs
import numpy as np
import tqdm
import json
from validation import cal_edit_distance, format_gt_bbox
from magic_pdf.layout.layout_sort import sort_with_layout
with open('/mnt/petrelfs/share_data/ouyanglinke/OCR/OCR_validation_dataset_final_rotated_formulafix_highdpi_scihub.json', 'r') as f:
samples = json.load(f)
# labels = []
# det_res = []
edit_distance_dict = []
edit_distance_list = []
for i, sample in tqdm.tqdm(enumerate(samples)):
pdf_name = sample['pdf_name']
s3_pdf_path = sample['s3_path']
page_num = sample['page']
page_width = sample['annotations']['width']
page_height = sample['annotations']['height']
# pre = main(s3_pdf_path, pdf_bin_file_profile, join_path(pdf_model_dir, pdf_name), pdf_model_profile, save_path, page_num)
# pre_dict_list = []
# for item in pre:
# pre_sample = {
# 'box': [item[0],item[1],item[2],item[3]],
# 'type': item[7],
# 'score': 1
# }
# pre_dict_list.append(pre_sample)
# det_res.append(pre_dict_list)
# match_change_dict = { # 待确认
# "figure": "image",
# "svg_figure": "image",
# "inline_fomula": "equations_inline",
# "fomula": "equation_interline",
# "figure_caption": "text",
# "table_caption": "text",
# "fomula_caption": "text"
# }
gt_annos = sample['annotations']
# matched_label = label_match(gt_annos, match_change_dict)
# labels.append(matched_label)
# 判断排序函数的精度
# 目前不考虑caption与图表相同序号的问题
ignore_category = ['abandon', 'figure_caption', 'table_caption', 'formula_caption', 'inline_fomula']
gt_bboxes = format_gt_bbox(gt_annos, ignore_category)
sorted_bboxes, _ = sort_with_layout(gt_bboxes, page_width, page_height)
if sorted_bboxes:
edit_distance = cal_edit_distance(sorted_bboxes)
edit_distance_list.append(edit_distance)
edit_distance_dict.append({
"sample_id": i,
"s3_path": s3_pdf_path,
"page_num": page_num,
"page_s2_path": sample['page_path'],
"edit_distance": edit_distance
})
# label_classes = ["image", "text", "table", "equation_interline"]
# detect_matrix = detect_val(labels, det_res, label_classes)
# print('detect_matrix', detect_matrix)
edit_distance_mean = np.mean(edit_distance_list)
print('edit_distance_mean', edit_distance_mean)
edit_distance_dict_sorted = sorted(edit_distance_dict, key=lambda x: x['edit_distance'], reverse=True)
# print(edit_distance_dict_sorted)
result = {
"edit_distance_mean": edit_distance_mean,
"edit_distance_dict_sorted": edit_distance_dict_sorted
}
with open('vali_bbox_sort_result.json', 'w') as f:
json.dump(result, f)
\ No newline at end of file
import numpy as np
from mmeval import COCODetection
import distance
def reformat_gt_and_pred(labels, det_res, label_classes):
preds = []
gts = []
for idx, (ann, pred) in enumerate(zip(labels, det_res)):
# with open(label_path, "r") as f:
# ann = json.load(f)
gt_bboxes = []
gt_labels = []
for item in ann['step_1']['result']:
if item['attribute'] in label_classes:
gt_bboxes.append([item['x'], item['y'], item['x']+item['width'], item['y']+item['height']])
gt_labels.append(label_classes.index(item['attribute']))
gts.append({
'img_id': idx,
'width': ann['width'],
'height': ann['height'],
'bboxes': np.array(gt_bboxes),
'labels': np.array(gt_labels),
'ignore_flags': [False]*len(gt_labels),
})
bboxes = []
labels = []
scores = []
for item in pred:
bboxes.append(item['box'])
labels.append(label_classes.index(item['type']))
scores.append(item['score'])
preds.append({
'img_id': idx,
'bboxes': np.array(bboxes),
'scores': np.array(scores),
'labels': np.array(labels),
})
return gts, preds
def detect_val(labels, det_res, label_classes):
# label_classes = ['inline_formula', "formula"]
meta={'CLASSES':tuple(label_classes)}
coco_det_metric = COCODetection(dataset_meta=meta, metric=['bbox'])
gts, preds = reformat_gt_and_pred(labels, det_res, label_classes)
res = coco_det_metric(predictions=preds, groundtruths=gts)
return res
def label_match(annotations, match_change_dict):
for item in annotations['step_1']['result']:
if item['attribute'] in match_change_dict.keys():
item['attribute'] = match_change_dict[item['attribute']]
return annotations
def format_gt_bbox(annotations, ignore_category):
gt_bboxes = []
for item in annotations['step_1']['result']:
if item['textAttribute'] and item['attribute'] not in ignore_category:
x0 = item['x']
y0 = item['y']
x1 = item['x'] + item['width']
y1 = item['y'] + item['height']
order = item['textAttribute']
category = item['attribute']
gt_bboxes.append([x0, y0, x1, y1, order, None, None, category])
return gt_bboxes
def cal_edit_distance(sorted_bboxes):
# order_list = [int(bbox[4]) for bbox in sorted_bboxes]
# print(sorted_bboxes[0][0][12])
order_list = [int(bbox[12]) for bbox in sorted_bboxes]
sorted_order = sorted(order_list, key=int)
distance_cal = distance.levenshtein(order_list, sorted_order)
if len(order_list) > 0:
return distance_cal / len(order_list)
else:
return 0
\ No newline at end of file
pytest
Levenshtein
nltk
rapidfuzz
......@@ -12,3 +13,4 @@ scikit-learn
tqdm
htmltabletomd
pypandoc
pyopenssl==24.0.0
\ No newline at end of file
......@@ -2,7 +2,6 @@ import os
conf = {
"code_path": os.environ.get('GITHUB_WORKSPACE'),
"pdf_dev_path" : os.environ.get('GITHUB_WORKSPACE') + "/tests/test_cli/pdf_dev",
"pdf_res_path": "/share/quyuan/mineru/data/mineru"
"pdf_res_path": "/tmp"
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment