Unverified Commit c2d5dd2b authored by myhloli's avatar myhloli Committed by GitHub

Merge pull request #42 from icecraft/feat/add_txt_parse

Feat/add txt parse
parents a77cb36d b16599cd
...@@ -26,6 +26,8 @@ import json as json_parse ...@@ -26,6 +26,8 @@ import json as json_parse
from datetime import datetime from datetime import datetime
import click import click
from magic_pdf.pipe.UNIPipe import UNIPipe from magic_pdf.pipe.UNIPipe import UNIPipe
from magic_pdf.pipe.OCRPipe import OCRPipe
from magic_pdf.pipe.TXTPipe import TXTPipe
from magic_pdf.libs.config_reader import get_s3_config from magic_pdf.libs.config_reader import get_s3_config
from magic_pdf.libs.path_utils import ( from magic_pdf.libs.path_utils import (
parse_s3path, parse_s3path,
...@@ -33,9 +35,9 @@ from magic_pdf.libs.path_utils import ( ...@@ -33,9 +35,9 @@ from magic_pdf.libs.path_utils import (
remove_non_official_s3_args, remove_non_official_s3_args,
) )
from magic_pdf.libs.config_reader import get_local_dir from magic_pdf.libs.config_reader import get_local_dir
from magic_pdf.rw.S3ReaderWriter import S3ReaderWriter, MODE_BIN, MODE_TXT from magic_pdf.rw.S3ReaderWriter import S3ReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
parse_pdf_methods = click.Choice(["ocr", "txt", "auto"]) parse_pdf_methods = click.Choice(["ocr", "txt", "auto"])
...@@ -53,24 +55,34 @@ def prepare_env(): ...@@ -53,24 +55,34 @@ def prepare_env():
def _do_parse(pdf_bytes, model_list, parse_method, image_writer, md_writer, image_dir):
    """Run the pipeline selected by ``parse_method`` and write its outputs.

    Three files named after the current wall-clock time (``HH-MM-SS``) are
    written through ``md_writer``: the markdown (``.md``), the intermediate
    data dumped as JSON (``.json``) and the stringified uni-format content
    list (``.txt``).

    Args:
        pdf_bytes: Raw PDF content handed to the pipe constructor.
        model_list: Model output handed to the pipe constructor.
        parse_method: One of ``"auto"``, ``"txt"`` or ``"ocr"``.
        image_writer: Writer handed to the pipe constructor.
        md_writer: Writer used for the ``.md``, ``.json`` and ``.txt`` outputs.
        image_dir: Image directory handed to the pipe constructor.

    Raises:
        SystemExit: With exit code 1 when ``parse_method`` is not recognised.
    """
    if parse_method == "auto":
        pipe = UNIPipe(pdf_bytes, model_list, image_writer, image_dir, is_debug=True)
    elif parse_method == "txt":
        pipe = TXTPipe(pdf_bytes, model_list, image_writer, image_dir, is_debug=True)
    elif parse_method == "ocr":
        pipe = OCRPipe(pdf_bytes, model_list, image_writer, image_dir, is_debug=True)
    else:
        print("unknow parse method")
        # The os module has no exit() function; raising SystemExit is what
        # sys.exit(1) does and needs no extra import.
        raise SystemExit(1)

    pipe.pipe_classify()
    pipe.pipe_parse()
    md_content = pipe.pipe_mk_markdown()
    part_file_name = datetime.now().strftime("%H-%M-%S")

    md_writer.write(
        content=md_content, path=f"{part_file_name}.md", mode=AbsReaderWriter.MODE_TXT
    )
    md_writer.write(
        content=json_parse.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4),
        path=f"{part_file_name}.json",
        mode=AbsReaderWriter.MODE_TXT,
    )

    try:
        content_list = pipe.pipe_mk_uni_format()
    except Exception as e:
        # Best effort: the markdown and JSON are already written. Report the
        # failure and skip the .txt output instead of touching an unbound
        # content_list below.
        print(e)
        return
    md_writer.write(
        str(content_list), f"{part_file_name}.txt", AbsReaderWriter.MODE_TXT
    )
...@@ -106,7 +118,10 @@ def json_command(json, method): ...@@ -106,7 +118,10 @@ def json_command(json, method):
byte_start, byte_end = int(may_range_params[0]), int(may_range_params[1]) byte_start, byte_end = int(may_range_params[0]), int(may_range_params[1])
byte_end += byte_start - 1 byte_end += byte_start - 1
return s3_rw.read_jsonl( return s3_rw.read_jsonl(
remove_non_official_s3_args(s3path), byte_start, byte_end, MODE_BIN remove_non_official_s3_args(s3path),
byte_start,
byte_end,
AbsReaderWriter.MODE_BIN,
) )
jso = json_parse.loads(read_s3_path(json).decode("utf-8")) jso = json_parse.loads(read_s3_path(json).decode("utf-8"))
...@@ -119,7 +134,7 @@ def json_command(json, method): ...@@ -119,7 +134,7 @@ def json_command(json, method):
_do_parse( _do_parse(
pdf_data, pdf_data,
jso['doc_layout_result'], jso["doc_layout_result"],
method, method,
local_image_rw, local_image_rw,
local_md_rw, local_md_rw,
...@@ -148,7 +163,7 @@ def pdf_command(pdf, model, method): ...@@ -148,7 +163,7 @@ def pdf_command(pdf, model, method):
def read_fn(path): def read_fn(path):
disk_rw = DiskReaderWriter(os.path.dirname(path)) disk_rw = DiskReaderWriter(os.path.dirname(path))
return disk_rw.read(os.path.basename(path), MODE_BIN) return disk_rw.read(os.path.basename(path), AbsReaderWriter.MODE_BIN)
pdf_data = read_fn(pdf) pdf_data = read_fn(pdf)
jso = json_parse.loads(read_fn(model).decode("utf-8")) jso = json_parse.loads(read_fn(model).decode("utf-8"))
......
import time
from loguru import logger
from magic_pdf.layout.layout_sort import get_bboxes_layout
from magic_pdf.libs.convert_utils import dict_to_list
from magic_pdf.libs.hash_utils import compute_md5
from magic_pdf.libs.commons import fitz, get_delta_time
from magic_pdf.model.magic_model import MagicModel
from magic_pdf.pre_proc.construct_page_dict import ocr_construct_page_component_v2
from magic_pdf.pre_proc.cut_image import ocr_cut_image_and_table
from magic_pdf.pre_proc.ocr_detect_all_bboxes import ocr_prepare_bboxes_for_layout_split
from magic_pdf.pre_proc.ocr_dict_merge import (
sort_blocks_by_layout,
fill_spans_in_blocks,
fix_block_spans,
)
from magic_pdf.libs.ocr_content_type import ContentType
from magic_pdf.pre_proc.ocr_span_list_modify import (
remove_overlaps_min_spans,
get_qa_need_list_v2,
)
from magic_pdf.pre_proc.equations_replace import (
combine_chars_to_pymudict,
remove_chars_in_text_blocks,
replace_equations_in_textblock,
)
from magic_pdf.pre_proc.equations_replace import (
combine_chars_to_pymudict,
remove_chars_in_text_blocks,
replace_equations_in_textblock,
)
from magic_pdf.pre_proc.citationmarker_remove import remove_citation_marker
def txt_spans_extract(pdf_page, inline_equations, interline_equations):
    """Build the list of text spans for one page from its extracted text.

    The page text is read twice via ``pdf_page.get_text`` ("dict" and
    "rawdict"), merged with ``combine_chars_to_pymudict``, then passed through
    equation replacement, citation-marker removal and char removal. Every
    span that remains is emitted as a dict with ``bbox`` (as a list),
    ``content`` (the span text) and ``type`` set to ``ContentType.Text``.
    """
    text_flags = fitz.TEXTFLAGS_TEXT
    block_level = pdf_page.get_text("dict", flags=text_flags)["blocks"]
    char_level = pdf_page.get_text("rawdict", flags=text_flags)["blocks"]

    blocks = combine_chars_to_pymudict(block_level, char_level)
    blocks = replace_equations_in_textblock(
        blocks, inline_equations, interline_equations
    )
    blocks = remove_citation_marker(blocks)
    blocks = remove_chars_in_text_blocks(blocks)

    return [
        {
            "bbox": list(span["bbox"]),
            "content": span["text"],
            "type": ContentType.Text,
        }
        for block in blocks
        for line in block["lines"]
        for span in line["spans"]
    ]
def replace_text_span(pymu_spans, ocr_spans):
    """Return the non-text spans of ``ocr_spans`` followed by ``pymu_spans``.

    Spans in ``ocr_spans`` whose ``type`` equals ``ContentType.Text`` are
    dropped; everything else keeps its order. Neither input list is modified.
    """
    non_text_spans = [
        span for span in ocr_spans if span["type"] != ContentType.Text
    ]
    return non_text_spans + pymu_spans
def parse_pdf_by_txt(
    pdf_bytes,
    model_list,
    imageWriter,
    start_page_id=0,
    end_page_id=None,
    debug_mode=False,
):
    """Parse a PDF page by page into the intermediate ``pdf_info`` structure.

    Args:
        pdf_bytes: Raw bytes of the PDF; opened with ``fitz`` and hashed with
            ``compute_md5``.
        model_list: Model output used to build the ``MagicModel``.
        imageWriter: Writer passed to ``ocr_cut_image_and_table``.
        start_page_id: Index of the first page to parse.
        end_page_id: Index of the last page to parse (inclusive). ``None``
            means "up to the last page". ``0`` is honoured as "first page
            only". A value past the last page is clamped with a warning.
        debug_mode: When true, log a per-page timing line.

    Returns:
        A dict ``{"pdf_info": [...]}`` holding the per-page results converted
        to a list by ``dict_to_list``.
    """
    pdf_bytes_md5 = compute_md5(pdf_bytes)
    pdf_docs = fitz.open("pdf", pdf_bytes)

    # Per-page results, keyed "page_<id>".
    pdf_info_dict = {}

    # Build the magic model from the model output and the opened document.
    magic_model = MagicModel(model_list, pdf_docs)

    # Resolve the page range. Compare against None explicitly so that
    # end_page_id=0 is not mistaken for "not given".
    last_page_id = len(pdf_docs) - 1
    if end_page_id is None:
        end_page_id = last_page_id
    elif end_page_id > last_page_id:
        logger.warning(
            f"end_page_id {end_page_id} is out of range, use last page id {last_page_id}"
        )
        end_page_id = last_page_id

    # Reference point for the per-page timing shown in debug mode.
    start_time = time.time()

    for page_id in range(start_page_id, end_page_id + 1):
        # In debug mode, log the time elapsed since the previous page started.
        if debug_mode:
            time_now = time.time()
            logger.info(
                f"page_id: {page_id}, last_page_cost_time: {get_delta_time(start_time)}"
            )
            start_time = time_now

        # Fetch the block information used later from the magic model.
        img_blocks = magic_model.get_imgs(page_id)
        table_blocks = magic_model.get_tables(page_id)
        discarded_blocks = magic_model.get_discarded(page_id)
        text_blocks = magic_model.get_text_blocks(page_id)
        title_blocks = magic_model.get_title_blocks(page_id)
        inline_equations, interline_equations, interline_equation_blocks = (
            magic_model.get_equations(page_id)
        )
        page_w, page_h = magic_model.get_page_size(page_id)

        # Gather the bboxes of all blocks together.
        all_bboxes = ocr_prepare_bboxes_for_layout_split(
            img_blocks,
            table_blocks,
            discarded_blocks,
            text_blocks,
            title_blocks,
            interline_equation_blocks,
            page_w,
            page_h,
        )

        # Compute the layout from the block information.
        page_boundry = [0, 0, page_w, page_h]
        layout_bboxes, layout_tree = get_bboxes_layout(
            all_bboxes, page_boundry, page_id
        )

        # Sort the blocks to keep on this page by layout order.
        sorted_blocks = sort_blocks_by_layout(all_bboxes, layout_bboxes)

        # Replace the text-type OCR spans with spans extracted via PyMuPDF.
        ocr_spans = magic_model.get_all_spans(page_id)
        pymu_spans = txt_spans_extract(
            pdf_docs[page_id], inline_equations, interline_equations
        )
        spans = replace_text_span(pymu_spans, ocr_spans)

        # Among overlapping spans, drop the smaller ones.
        spans, dropped_spans_by_span_overlap = remove_overlaps_min_spans(spans)

        # Cut out screenshots for images and tables.
        spans = ocr_cut_image_and_table(
            spans, pdf_docs[page_id], page_id, pdf_bytes_md5, imageWriter
        )

        # Fill the spans into the sorted blocks.
        block_with_spans = fill_spans_in_blocks(sorted_blocks, spans)

        # Apply the fix step to the blocks.
        fix_blocks = fix_block_spans(block_with_spans, img_blocks, table_blocks)

        # Collect the lists that QA needs exposed separately.
        images, tables, interline_equations = get_qa_need_list_v2(fix_blocks)

        # Assemble this page's entry of pdf_info_dict.
        page_info = ocr_construct_page_component_v2(
            fix_blocks,
            layout_bboxes,
            page_id,
            page_w,
            page_h,
            layout_tree,
            images,
            tables,
            interline_equations,
            discarded_blocks,
        )
        pdf_info_dict[f"page_{page_id}"] = page_info

    # Paragraph splitting: placeholder, nothing is done at this stage.

    # Convert the per-page dict to a list.
    pdf_info_list = dict_to_list(pdf_info_dict)
    new_pdf_info_dict = {
        "pdf_info": pdf_info_list,
    }
    return new_pdf_info_dict
if __name__ == "__main__":
    # Ad-hoc manual check against a fixed local sample: print the image
    # blocks of the first seven pages, then run the text/equation
    # preprocessing chain on every page. The per-page result is not used.
    import fitz
    import json

    sample_pdf_path = "/opt/data/pdf/20240418/25536-00.pdf"
    sample_model_path = "/opt/data/pdf/20240418/25536-00.json"

    with open(sample_pdf_path, "rb") as f:
        pdf_bytes = f.read()
    pdf_docs = fitz.open("pdf", pdf_bytes)

    # Only the first line of the model file is parsed.
    with open(sample_model_path) as f:
        model_list = json.loads(f.readline())

    magic_model = MagicModel(model_list, pdf_docs)
    for i in range(7):
        print(magic_model.get_imgs(i))

    text_flags = fitz.TEXTFLAGS_TEXT
    for page_no, page in enumerate(pdf_docs):
        inline_equations, interline_equations, interline_equation_blocks = (
            magic_model.get_equations(page_no)
        )

        text_raw_blocks = page.get_text("dict", flags=text_flags)["blocks"]
        char_level_text_blocks = page.get_text("rawdict", flags=text_flags)["blocks"]

        text_blocks = combine_chars_to_pymudict(
            text_raw_blocks, char_level_text_blocks
        )
        text_blocks = replace_equations_in_textblock(
            text_blocks, inline_equations, interline_equations
        )
        text_blocks = remove_citation_marker(text_blocks)
        text_blocks = remove_chars_in_text_blocks(text_blocks)
""" """
对pymupdf返回的结构里的公式进行替换,替换为模型识别的公式结果 对pymupdf返回的结构里的公式进行替换,替换为模型识别的公式结果
""" """
from magic_pdf.libs.commons import fitz from magic_pdf.libs.commons import fitz
import json import json
import os import os
...@@ -17,24 +18,24 @@ def combine_chars_to_pymudict(block_dict, char_dict): ...@@ -17,24 +18,24 @@ def combine_chars_to_pymudict(block_dict, char_dict):
把block级别的pymupdf 结构里加入char结构 把block级别的pymupdf 结构里加入char结构
""" """
# 因为block_dict 被裁剪过,因此先把他和char_dict文字块对齐,才能进行补充 # 因为block_dict 被裁剪过,因此先把他和char_dict文字块对齐,才能进行补充
char_map = {tuple(item['bbox']):item for item in char_dict} char_map = {tuple(item["bbox"]): item for item in char_dict}
for i in range(len(block_dict)): # blcok for i in range(len(block_dict)): # blcok
block = block_dict[i] block = block_dict[i]
key = block['bbox'] key = block["bbox"]
char_dict_item = char_map[tuple(key)] char_dict_item = char_map[tuple(key)]
char_dict_map = {tuple(item['bbox']):item for item in char_dict_item['lines']} char_dict_map = {tuple(item["bbox"]): item for item in char_dict_item["lines"]}
for j in range(len(block['lines'])): for j in range(len(block["lines"])):
lines = block['lines'][j] lines = block["lines"][j]
with_char_lines = char_dict_map[lines['bbox']] with_char_lines = char_dict_map[lines["bbox"]]
for k in range(len(lines['spans'])): for k in range(len(lines["spans"])):
spans = lines['spans'][k] spans = lines["spans"][k]
try: try:
chars = with_char_lines['spans'][k]['chars'] chars = with_char_lines["spans"][k]["chars"]
except Exception as e: except Exception as e:
logger.error(char_dict[i]['lines'][j]) logger.error(char_dict[i]["lines"][j])
spans['chars'] = chars spans["chars"] = chars
return block_dict return block_dict
...@@ -54,23 +55,22 @@ def calculate_overlap_area_2_minbox_area_ratio(bbox1, min_bbox): ...@@ -54,23 +55,22 @@ def calculate_overlap_area_2_minbox_area_ratio(bbox1, min_bbox):
# The area of overlap area # The area of overlap area
intersection_area = (x_right - x_left) * (y_bottom - y_top) intersection_area = (x_right - x_left) * (y_bottom - y_top)
min_box_area = (min_bbox[3]-min_bbox[1])*(min_bbox[2]-min_bbox[0]) min_box_area = (min_bbox[3] - min_bbox[1]) * (min_bbox[2] - min_bbox[0])
if min_box_area==0: if min_box_area == 0:
return 0 return 0
else: else:
return intersection_area / min_box_area return intersection_area / min_box_area
def _is_xin(bbox1, bbox2):
    """Tell whether the smaller of two boxes lies mostly inside the other.

    The box with the smaller area is passed as the "min box" to
    ``calculate_overlap_area_2_minbox_area_ratio``; the result is true when
    that ratio exceeds 0.6. On equal areas ``bbox2`` is used as the min box.
    """
    first_area = abs(bbox1[2] - bbox1[0]) * abs(bbox1[3] - bbox1[1])
    second_area = abs(bbox2[2] - bbox2[0]) * abs(bbox2[3] - bbox2[1])

    if first_area < second_area:
        other_box, min_box = bbox2, bbox1
    else:
        other_box, min_box = bbox1, bbox2

    overlap_share = calculate_overlap_area_2_minbox_area_ratio(other_box, min_box)
    return overlap_share > 0.6
def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks): def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
...@@ -78,8 +78,11 @@ def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks): ...@@ -78,8 +78,11 @@ def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
for eq_bbox in interline_bboxes: for eq_bbox in interline_bboxes:
removed_txt_blk = [] removed_txt_blk = []
for text_blk in text_blocks: for text_blk in text_blocks:
text_bbox = text_blk['bbox'] text_bbox = text_blk["bbox"]
if calculate_overlap_area_2_minbox_area_ratio(eq_bbox['bbox'], text_bbox)>=0.7: if (
calculate_overlap_area_2_minbox_area_ratio(eq_bbox["bbox"], text_bbox)
>= 0.7
):
removed_txt_blk.append(text_blk) removed_txt_blk.append(text_blk)
for blk in removed_txt_blk: for blk in removed_txt_blk:
text_blocks.remove(blk) text_blocks.remove(blk)
...@@ -87,7 +90,6 @@ def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks): ...@@ -87,7 +90,6 @@ def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
return text_blocks return text_blocks
def _is_in_or_part_overlap(box1, box2) -> bool: def _is_in_or_part_overlap(box1, box2) -> bool:
""" """
两个bbox是否有部分重叠或者包含 两个bbox是否有部分重叠或者包含
...@@ -98,54 +100,78 @@ def _is_in_or_part_overlap(box1, box2) -> bool: ...@@ -98,54 +100,78 @@ def _is_in_or_part_overlap(box1, box2) -> bool:
x0_1, y0_1, x1_1, y1_1 = box1 x0_1, y0_1, x1_1, y1_1 = box1
x0_2, y0_2, x1_2, y1_2 = box2 x0_2, y0_2, x1_2, y1_2 = box2
return not (x1_1 < x0_2 or # box1在box2的左边 return not (
x0_1 > x1_2 or # box1在box2的右边 x1_1 < x0_2 # box1在box2的左边
y1_1 < y0_2 or # box1在box2的上边 or x0_1 > x1_2 # box1在box2的右边
y0_1 > y1_2) # box1在box2的下边 or y1_1 < y0_2 # box1在box2的上边
or y0_1 > y1_2
) # box1在box2的下边
def remove_text_block_overlap_interline_equation_bbox(
    interline_eq_bboxes, pymu_block_list
):
    """Remove chars that overlap an interline equation and shrink the bboxes.

    Every char whose bbox overlaps (or is contained in) any equation bbox is
    removed from its span. Spans left without chars, lines left without spans
    and blocks left without lines are removed as well. Each surviving span,
    line and block gets its ``bbox`` recomputed as the tuple enclosing its
    remaining children. The input list is modified in place and returned; a
    fresh empty list is returned when nothing is left.
    """

    def _enclosing_bbox(children):
        # Smallest (x0, y0, x1, y1) tuple covering every child's bbox.
        return (
            min(child["bbox"][0] for child in children),
            min(child["bbox"][1] for child in children),
            max(child["bbox"][2] for child in children),
            max(child["bbox"][3] for child in children),
        )

    emptied_blocks = []
    for block in pymu_block_list:
        emptied_lines = []
        for line in block["lines"]:
            emptied_spans = []
            for span in line["spans"]:
                # Collect first, remove afterwards, so the char list is not
                # mutated while it is being scanned.
                overlapping_chars = [
                    char
                    for char in span["chars"]
                    if any(
                        _is_in_or_part_overlap(char["bbox"], eq["bbox"])
                        for eq in interline_eq_bboxes
                    )
                ]
                for char in overlapping_chars:
                    span["chars"].remove(char)

                if span["chars"]:
                    span["bbox"] = _enclosing_bbox(span["chars"])
                else:
                    emptied_spans.append(span)

            for span in emptied_spans:
                line["spans"].remove(span)

            if line["spans"]:
                line["bbox"] = _enclosing_bbox(line["spans"])
            else:
                emptied_lines.append(line)

        for line in emptied_lines:
            block["lines"].remove(line)

        if block["lines"]:
            block["bbox"] = _enclosing_bbox(block["lines"])
        else:
            emptied_blocks.append(block)

    for block in emptied_blocks:
        pymu_block_list.remove(block)

    if not pymu_block_list:
        return []
    return pymu_block_list
...@@ -154,8 +180,8 @@ def remove_text_block_overlap_interline_equation_bbox(interline_eq_bboxes, pymu_ ...@@ -154,8 +180,8 @@ def remove_text_block_overlap_interline_equation_bbox(interline_eq_bboxes, pymu_
def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list): def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list):
"""在行间公式对应的地方插上一个伪造的block""" """在行间公式对应的地方插上一个伪造的block"""
for eq in interline_eq_bboxes: for eq in interline_eq_bboxes:
bbox = eq['bbox'] bbox = eq["bbox"]
latex_content = eq['latex_text'] latex_content = eq["latex"]
text_block = { text_block = {
"number": len(pymu_block_list), "number": len(pymu_block_list),
"type": 0, "type": 0,
...@@ -172,24 +198,19 @@ def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list): ...@@ -172,24 +198,19 @@ def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list):
"ascender": 0.9409999847412109, "ascender": 0.9409999847412109,
"descender": -0.3050000071525574, "descender": -0.3050000071525574,
"text": f"\n$$\n{latex_content}\n$$\n", "text": f"\n$$\n{latex_content}\n$$\n",
"origin": [ "origin": [bbox[0], bbox[1]],
bbox[0], "bbox": bbox,
bbox[1]
],
"bbox": bbox
} }
], ],
"wmode": 0, "wmode": 0,
"dir": [ "dir": [1.0, 0.0],
1.0, "bbox": bbox,
0.0
],
"bbox": bbox
} }
] ],
} }
pymu_block_list.append(text_block) pymu_block_list.append(text_block)
def x_overlap_ratio(box1, box2): def x_overlap_ratio(box1, box2):
a, _, c, _ = box1 a, _, c, _ = box1
e, _, g, _ = box2 e, _, g, _ = box2
...@@ -205,8 +226,10 @@ def x_overlap_ratio(box1, box2): ...@@ -205,8 +226,10 @@ def x_overlap_ratio(box1, box2):
return overlap_ratio return overlap_ratio
def __is_x_dir_overlap(bbox1, bbox2):
    """Return True when the x-ranges of the two boxes overlap or touch."""
    first_is_left = bbox1[2] < bbox2[0]
    if first_is_left:
        return False
    first_is_right = bbox1[0] > bbox2[2]
    return not first_is_right
def __y_overlap_ratio(box1, box2): def __y_overlap_ratio(box1, box2):
"""""" """"""
...@@ -224,6 +247,7 @@ def __y_overlap_ratio(box1, box2): ...@@ -224,6 +247,7 @@ def __y_overlap_ratio(box1, box2):
return overlap_ratio return overlap_ratio
def replace_line_v2(eqinfo, line): def replace_line_v2(eqinfo, line):
""" """
扫描这一行所有的和公式框X方向重叠的char,然后计算char的左、右x0, x1,位于这个区间内的span删除掉。 扫描这一行所有的和公式框X方向重叠的char,然后计算char的左、右x0, x1,位于这个区间内的span删除掉。
...@@ -233,54 +257,55 @@ def replace_line_v2(eqinfo, line): ...@@ -233,54 +257,55 @@ def replace_line_v2(eqinfo, line):
first_overlap_span_idx = -1 first_overlap_span_idx = -1
last_overlap_span = -1 last_overlap_span = -1
delete_chars = [] delete_chars = []
for i in range(0, len(line['spans'])): for i in range(0, len(line["spans"])):
if line['spans'][i].get("_type", None) is not None: if line["spans"][i].get("_type", None) is not None:
continue # 忽略,因为已经是插入的伪造span公式了 continue # 忽略,因为已经是插入的伪造span公式了
for char in line['spans'][i]['chars']: for char in line["spans"][i]["chars"]:
if __is_x_dir_overlap(eqinfo['bbox'], char['bbox']): if __is_x_dir_overlap(eqinfo["bbox"], char["bbox"]):
line_txt = "" line_txt = ""
for span in line['spans']: for span in line["spans"]:
span_txt = "<span>" span_txt = "<span>"
for ch in span['chars']: for ch in span["chars"]:
span_txt = span_txt + ch['c'] span_txt = span_txt + ch["c"]
span_txt = span_txt + "</span>" span_txt = span_txt + "</span>"
line_txt = line_txt + span_txt line_txt = line_txt + span_txt
if first_overlap_span_idx == -1: if first_overlap_span_idx == -1:
first_overlap_span = line['spans'][i] first_overlap_span = line["spans"][i]
first_overlap_span_idx = i first_overlap_span_idx = i
last_overlap_span = line['spans'][i] last_overlap_span = line["spans"][i]
delete_chars.append(char) delete_chars.append(char)
# 第一个和最后一个char要进行检查,到底属于公式多还是属于正常span多 # 第一个和最后一个char要进行检查,到底属于公式多还是属于正常span多
if len(delete_chars)>0: if len(delete_chars) > 0:
ch0_bbox = delete_chars[0]['bbox'] ch0_bbox = delete_chars[0]["bbox"]
if x_overlap_ratio(eqinfo['bbox'], ch0_bbox)<0.51: if x_overlap_ratio(eqinfo["bbox"], ch0_bbox) < 0.51:
delete_chars.remove(delete_chars[0]) delete_chars.remove(delete_chars[0])
if len(delete_chars)>0: if len(delete_chars) > 0:
ch0_bbox = delete_chars[-1]['bbox'] ch0_bbox = delete_chars[-1]["bbox"]
if x_overlap_ratio(eqinfo['bbox'], ch0_bbox)<0.51: if x_overlap_ratio(eqinfo["bbox"], ch0_bbox) < 0.51:
delete_chars.remove(delete_chars[-1]) delete_chars.remove(delete_chars[-1])
# 计算x方向上被删除区间内的char的真实x0, x1 # 计算x方向上被删除区间内的char的真实x0, x1
if len(delete_chars): if len(delete_chars):
x0, x1 = min([b['bbox'][0] for b in delete_chars]), max([b['bbox'][2] for b in delete_chars]) x0, x1 = min([b["bbox"][0] for b in delete_chars]), max(
[b["bbox"][2] for b in delete_chars]
)
else: else:
logger.debug(f"行内公式替换没有发生,尝试下一行匹配, eqinfo={eqinfo}") logger.debug(f"行内公式替换没有发生,尝试下一行匹配, eqinfo={eqinfo}")
return False return False
# 删除位于x0, x1这两个中间的span # 删除位于x0, x1这两个中间的span
delete_span = [] delete_span = []
for span in line['spans']: for span in line["spans"]:
span_box = span['bbox'] span_box = span["bbox"]
if x0<=span_box[0] and span_box[2]<=x1: if x0 <= span_box[0] and span_box[2] <= x1:
delete_span.append(span) delete_span.append(span)
for span in delete_span: for span in delete_span:
line['spans'].remove(span) line["spans"].remove(span)
equation_span = { equation_span = {
"size": 9.962599754333496, "size": 9.962599754333496,
...@@ -291,67 +316,91 @@ def replace_line_v2(eqinfo, line): ...@@ -291,67 +316,91 @@ def replace_line_v2(eqinfo, line):
"ascender": 0.9409999847412109, "ascender": 0.9409999847412109,
"descender": -0.3050000071525574, "descender": -0.3050000071525574,
"text": "", "text": "",
"origin": [ "origin": [337.1410153102337, 216.0205245153934],
337.1410153102337,
216.0205245153934
],
"bbox": [ "bbox": [
337.1410153102337, 337.1410153102337,
216.0205245153934, 216.0205245153934,
390.4496373892022, 390.4496373892022,
228.50171037628277 228.50171037628277,
] ],
} }
#equation_span = line['spans'][0].copy() # equation_span = line['spans'][0].copy()
equation_span['text'] = f" ${eqinfo['latex_text']}$ " equation_span["text"] = f" ${eqinfo['latex']}$ "
equation_span['bbox'] = [x0, equation_span['bbox'][1], x1, equation_span['bbox'][3]] equation_span["bbox"] = [x0, equation_span["bbox"][1], x1, equation_span["bbox"][3]]
equation_span['origin'] = [equation_span['bbox'][0], equation_span['bbox'][1]] equation_span["origin"] = [equation_span["bbox"][0], equation_span["bbox"][1]]
equation_span['chars'] = delete_chars equation_span["chars"] = delete_chars
equation_span['_type'] = TYPE_INLINE_EQUATION equation_span["_type"] = TYPE_INLINE_EQUATION
equation_span['_eq_bbox'] = eqinfo['bbox'] equation_span["_eq_bbox"] = eqinfo["bbox"]
line['spans'].insert(first_overlap_span_idx+1, equation_span) # 放入公式 line["spans"].insert(first_overlap_span_idx + 1, equation_span) # 放入公式
# logger.info(f"==>text is 【{line_txt}】, equation is 【{eqinfo['latex_text']}】") # logger.info(f"==>text is 【{line_txt}】, equation is 【{eqinfo['latex_text']}】")
# 第一个、和最后一个有overlap的span进行分割,然后插入对应的位置 # 第一个、和最后一个有overlap的span进行分割,然后插入对应的位置
first_span_chars = [char for char in first_overlap_span['chars'] if (char['bbox'][2]+char['bbox'][0])/2<x0] first_span_chars = [
tail_span_chars = [char for char in last_overlap_span['chars'] if (char['bbox'][0]+char['bbox'][2])/2>x1] char
for char in first_overlap_span["chars"]
if (char["bbox"][2] + char["bbox"][0]) / 2 < x0
]
tail_span_chars = [
char
for char in last_overlap_span["chars"]
if (char["bbox"][0] + char["bbox"][2]) / 2 > x1
]
if len(first_span_chars)>0: if len(first_span_chars) > 0:
first_overlap_span['chars'] = first_span_chars first_overlap_span["chars"] = first_span_chars
first_overlap_span['text'] = ''.join([char['c'] for char in first_span_chars]) first_overlap_span["text"] = "".join([char["c"] for char in first_span_chars])
first_overlap_span['bbox'] = (first_overlap_span['bbox'][0], first_overlap_span['bbox'][1], max([chr['bbox'][2] for chr in first_span_chars]), first_overlap_span['bbox'][3]) first_overlap_span["bbox"] = (
first_overlap_span["bbox"][0],
first_overlap_span["bbox"][1],
max([chr["bbox"][2] for chr in first_span_chars]),
first_overlap_span["bbox"][3],
)
# first_overlap_span['_type'] = "first" # first_overlap_span['_type'] = "first"
else: else:
# 删掉 # 删掉
if first_overlap_span not in delete_span: if first_overlap_span not in delete_span:
line['spans'].remove(first_overlap_span) line["spans"].remove(first_overlap_span)
if len(tail_span_chars) > 0:
if len(tail_span_chars)>0: if last_overlap_span == first_overlap_span: # 这个时候应该插入一个新的
if last_overlap_span==first_overlap_span: # 这个时候应该插入一个新的 tail_span_txt = "".join([char["c"] for char in tail_span_chars])
tail_span_txt = ''.join([char['c'] for char in tail_span_chars])
last_span_to_insert = last_overlap_span.copy() last_span_to_insert = last_overlap_span.copy()
last_span_to_insert['chars'] = tail_span_chars last_span_to_insert["chars"] = tail_span_chars
last_span_to_insert['text'] = ''.join([char['c'] for char in tail_span_chars]) last_span_to_insert["text"] = "".join(
last_span_to_insert['bbox'] = (min([chr['bbox'][0] for chr in tail_span_chars]), last_overlap_span['bbox'][1], last_overlap_span['bbox'][2], last_overlap_span['bbox'][3]) [char["c"] for char in tail_span_chars]
)
last_span_to_insert["bbox"] = (
min([chr["bbox"][0] for chr in tail_span_chars]),
last_overlap_span["bbox"][1],
last_overlap_span["bbox"][2],
last_overlap_span["bbox"][3],
)
# 插入到公式对象之后 # 插入到公式对象之后
equation_idx = line['spans'].index(equation_span) equation_idx = line["spans"].index(equation_span)
line['spans'].insert(equation_idx+1, last_span_to_insert) # 放入公式 line["spans"].insert(equation_idx + 1, last_span_to_insert) # 放入公式
else: # 直接修改原来的span else: # 直接修改原来的span
last_overlap_span['chars'] = tail_span_chars last_overlap_span["chars"] = tail_span_chars
last_overlap_span['text'] = ''.join([char['c'] for char in tail_span_chars]) last_overlap_span["text"] = "".join([char["c"] for char in tail_span_chars])
last_overlap_span['bbox'] = (min([chr['bbox'][0] for chr in tail_span_chars]), last_overlap_span['bbox'][1], last_overlap_span['bbox'][2], last_overlap_span['bbox'][3]) last_overlap_span["bbox"] = (
min([chr["bbox"][0] for chr in tail_span_chars]),
last_overlap_span["bbox"][1],
last_overlap_span["bbox"][2],
last_overlap_span["bbox"][3],
)
else: else:
# 删掉 # 删掉
if last_overlap_span not in delete_span and last_overlap_span!=first_overlap_span: if (
line['spans'].remove(last_overlap_span) last_overlap_span not in delete_span
and last_overlap_span != first_overlap_span
):
line["spans"].remove(last_overlap_span)
remain_txt = "" remain_txt = ""
for span in line['spans']: for span in line["spans"]:
span_txt = "<span>" span_txt = "<span>"
for char in span['chars']: for char in span["chars"]:
span_txt = span_txt + char['c'] span_txt = span_txt + char["c"]
span_txt = span_txt + "</span>" span_txt = span_txt + "</span>"
...@@ -364,11 +413,16 @@ def replace_line_v2(eqinfo, line): ...@@ -364,11 +413,16 @@ def replace_line_v2(eqinfo, line):
def replace_eq_blk(eqinfo, text_block): def replace_eq_blk(eqinfo, text_block):
"""替换行内公式""" """替换行内公式"""
for line in text_block['lines']: for line in text_block["lines"]:
line_bbox = line['bbox'] line_bbox = line["bbox"]
if _is_xin(eqinfo['bbox'], line_bbox) or __y_overlap_ratio(eqinfo['bbox'], line_bbox)>0.6: # 定位到行, 使用y方向重合率是因为有的时候,一个行的宽度会小于公式位置宽度:行很高,公式很窄, if (
_is_xin(eqinfo["bbox"], line_bbox)
or __y_overlap_ratio(eqinfo["bbox"], line_bbox) > 0.6
): # 定位到行, 使用y方向重合率是因为有的时候,一个行的宽度会小于公式位置宽度:行很高,公式很窄,
replace_succ = replace_line_v2(eqinfo, line) replace_succ = replace_line_v2(eqinfo, line)
if not replace_succ: # 有的时候,一个pdf的line高度从API里会计算的有问题,因此在行内span级别会替换不成功,这就需要继续重试下一行 if (
not replace_succ
): # 有的时候,一个pdf的line高度从API里会计算的有问题,因此在行内span级别会替换不成功,这就需要继续重试下一行
continue continue
else: else:
break break
...@@ -380,9 +434,9 @@ def replace_eq_blk(eqinfo, text_block): ...@@ -380,9 +434,9 @@ def replace_eq_blk(eqinfo, text_block):
def replace_inline_equations(inline_equation_bboxes, raw_text_blocks): def replace_inline_equations(inline_equation_bboxes, raw_text_blocks):
"""替换行内公式""" """替换行内公式"""
for eqinfo in inline_equation_bboxes: for eqinfo in inline_equation_bboxes:
eqbox = eqinfo['bbox'] eqbox = eqinfo["bbox"]
for blk in raw_text_blocks: for blk in raw_text_blocks:
if _is_xin(eqbox, blk['bbox']): if _is_xin(eqbox, blk["bbox"]):
if not replace_eq_blk(eqinfo, blk): if not replace_eq_blk(eqinfo, blk):
logger.error(f"行内公式没有替换成功:{eqinfo} ") logger.error(f"行内公式没有替换成功:{eqinfo} ")
else: else:
...@@ -390,22 +444,29 @@ def replace_inline_equations(inline_equation_bboxes, raw_text_blocks): ...@@ -390,22 +444,29 @@ def replace_inline_equations(inline_equation_bboxes, raw_text_blocks):
return raw_text_blocks return raw_text_blocks
def remove_chars_in_text_blocks(text_blocks):
    """Drop the "chars" entry from every span of every line, in place.

    Spans without a "chars" key are left untouched. The same ``text_blocks``
    object is returned.
    """
    every_span = (
        span
        for blk in text_blocks
        for line in blk["lines"]
        for span in line["spans"]
    )
    for span in every_span:
        span.pop("chars", None)
    return text_blocks
def replace_equations_in_textblock(raw_text_blocks, inline_equation_bboxes, interline_equation_bboxes): def replace_equations_in_textblock(
raw_text_blocks, inline_equation_bboxes, interline_equation_bboxes
):
""" """
替换行间和和行内公式为latex 替换行间和和行内公式为latex
""" """
raw_text_blocks = remove_text_block_in_interline_equation_bbox(interline_equation_bboxes, raw_text_blocks) # 消除重叠:第一步,在公式内部的 raw_text_blocks = remove_text_block_in_interline_equation_bbox(
raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(interline_equation_bboxes, raw_text_blocks) # 消重,第二步,和公式覆盖的 interline_equation_bboxes, raw_text_blocks
) # 消除重叠:第一步,在公式内部的
raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
interline_equation_bboxes, raw_text_blocks
) # 消重,第二步,和公式覆盖的
insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks) insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
raw_text_blocks = replace_inline_equations(inline_equation_bboxes, raw_text_blocks) raw_text_blocks = replace_inline_equations(inline_equation_bboxes, raw_text_blocks)
...@@ -414,34 +475,38 @@ def replace_equations_in_textblock(raw_text_blocks, inline_equation_bboxes, inte ...@@ -414,34 +475,38 @@ def replace_equations_in_textblock(raw_text_blocks, inline_equation_bboxes, inte
def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path): def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path):
""" """ """
"""
new_pdf = f"{Path(pdf_path).parent}/{Path(pdf_path).stem}.step3-消除行内公式text_block.pdf" new_pdf = f"{Path(pdf_path).parent}/{Path(pdf_path).stem}.step3-消除行内公式text_block.pdf"
with open(json_path, "r", encoding='utf-8') as f: with open(json_path, "r", encoding="utf-8") as f:
obj = json.loads(f.read()) obj = json.loads(f.read())
if os.path.exists(new_pdf): if os.path.exists(new_pdf):
os.remove(new_pdf) os.remove(new_pdf)
new_doc = fitz.open('') new_doc = fitz.open("")
doc = fitz.open(pdf_path) doc = fitz.open(pdf_path)
new_doc = fitz.open(pdf_path) new_doc = fitz.open(pdf_path)
for i in range(len(new_doc)): for i in range(len(new_doc)):
page = new_doc[i] page = new_doc[i]
inline_equation_bboxes = obj[f"page_{i}"]['inline_equations'] inline_equation_bboxes = obj[f"page_{i}"]["inline_equations"]
interline_equation_bboxes = obj[f"page_{i}"]['interline_equations'] interline_equation_bboxes = obj[f"page_{i}"]["interline_equations"]
raw_text_blocks = obj[f'page_{i}']['preproc_blocks'] raw_text_blocks = obj[f"page_{i}"]["preproc_blocks"]
raw_text_blocks = remove_text_block_in_interline_equation_bbox(interline_equation_bboxes, raw_text_blocks) # 消除重叠:第一步,在公式内部的 raw_text_blocks = remove_text_block_in_interline_equation_bbox(
raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(interline_equation_bboxes, raw_text_blocks) # 消重,第二步,和公式覆盖的 interline_equation_bboxes, raw_text_blocks
) # 消除重叠:第一步,在公式内部的
raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
interline_equation_bboxes, raw_text_blocks
) # 消重,第二步,和公式覆盖的
insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks) insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
raw_text_blocks = replace_inline_equations(inline_equation_bboxes, raw_text_blocks) raw_text_blocks = replace_inline_equations(
inline_equation_bboxes, raw_text_blocks
)
# 为了检验公式是否重复,把每一行里,含有公式的span背景改成黄色的 # 为了检验公式是否重复,把每一行里,含有公式的span背景改成黄色的
color_map = [fitz.pdfcolor['blue'],fitz.pdfcolor['green']] color_map = [fitz.pdfcolor["blue"], fitz.pdfcolor["green"]]
j = 0 j = 0
for blk in raw_text_blocks: for blk in raw_text_blocks:
for i,line in enumerate(blk['lines']): for i, line in enumerate(blk["lines"]):
# line_box = line['bbox'] # line_box = line['bbox']
# shape = page.new_shape() # shape = page.new_shape()
...@@ -450,20 +515,20 @@ def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path): ...@@ -450,20 +515,20 @@ def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path):
# shape.commit() # shape.commit()
# j = j+1 # j = j+1
for i, span in enumerate(line['spans']): for i, span in enumerate(line["spans"]):
shape_page = page.new_shape() shape_page = page.new_shape()
span_type = span.get('_type') span_type = span.get("_type")
color = fitz.pdfcolor['blue'] color = fitz.pdfcolor["blue"]
if span_type=='first': if span_type == "first":
color = fitz.pdfcolor['blue'] color = fitz.pdfcolor["blue"]
elif span_type=='tail': elif span_type == "tail":
color = fitz.pdfcolor['green'] color = fitz.pdfcolor["green"]
elif span_type==TYPE_INLINE_EQUATION: elif span_type == TYPE_INLINE_EQUATION:
color = fitz.pdfcolor['black'] color = fitz.pdfcolor["black"]
else: else:
color = None color = None
b = span['bbox'] b = span["bbox"]
shape_page.draw_rect(b) shape_page.draw_rect(b)
shape_page.finish(color=None, fill=color, fill_opacity=0.3) shape_page.finish(color=None, fill=color, fill_opacity=0.3)
...@@ -471,13 +536,13 @@ def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path): ...@@ -471,13 +536,13 @@ def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path):
new_doc.save(new_pdf) new_doc.save(new_pdf)
logger.info(f"save ok {new_pdf}") logger.info(f"save ok {new_pdf}")
final_json = json.dumps(obj, ensure_ascii=False,indent=2) final_json = json.dumps(obj, ensure_ascii=False, indent=2)
with open("equations_test/final_json.json", "w") as f: with open("equations_test/final_json.json", "w") as f:
f.write(final_json) f.write(final_json)
return new_pdf return new_pdf
if __name__=="__main__": if __name__ == "__main__":
# draw_block_on_pdf_with_txt_replace_eq_bbox(new_json_path, equation_color_pdf) # draw_block_on_pdf_with_txt_replace_eq_bbox(new_json_path, equation_color_pdf)
pass pass
...@@ -16,7 +16,7 @@ from loguru import logger ...@@ -16,7 +16,7 @@ from loguru import logger
from magic_pdf.rw import AbsReaderWriter from magic_pdf.rw import AbsReaderWriter
from magic_pdf.pdf_parse_by_ocr_v2 import parse_pdf_by_ocr from magic_pdf.pdf_parse_by_ocr_v2 import parse_pdf_by_ocr
from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt from magic_pdf.pdf_parse_by_txt_v2 import parse_pdf_by_txt
PARSE_TYPE_TXT = "txt" PARSE_TYPE_TXT = "txt"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment