Commit fae882ff authored by liukaiwen

Merge branch 'master' of github.com:papayalove/Magic-PDF

parents b2019af5 8b9dde1b
......@@ -28,6 +28,7 @@ import click
from loguru import logger
from pathlib import Path
from magic_pdf.libs.MakeContentConfig import DropMode
from magic_pdf.libs.draw_bbox import draw_layout_bbox, draw_span_bbox
from magic_pdf.pipe.UNIPipe import UNIPipe
from magic_pdf.pipe.OCRPipe import OCRPipe
......@@ -78,8 +79,8 @@ def _do_parse(pdf_file_name, pdf_bytes, model_list, parse_method, image_writer,
pdf_info = pipe.pdf_mid_data['pdf_info']
draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir)
draw_span_bbox(pdf_info, pdf_bytes, local_md_dir)
md_content = pipe.pipe_mk_markdown(image_dir)
#part_file_name = datetime.now().strftime("%H-%M-%S")
md_content = pipe.pipe_mk_markdown(image_dir, drop_mode=DropMode.NONE)
md_writer.write(
content=md_content, path=f"{pdf_file_name}.md", mode=AbsReaderWriter.MODE_TXT
)
......@@ -89,7 +90,7 @@ def _do_parse(pdf_file_name, pdf_bytes, model_list, parse_method, image_writer,
mode=AbsReaderWriter.MODE_TXT,
)
try:
content_list = pipe.pipe_mk_uni_format(image_dir)
content_list = pipe.pipe_mk_uni_format(image_dir, drop_mode=DropMode.NONE)
except Exception as e:
logger.exception(e)
md_writer.write(
......
from loguru import logger
from magic_pdf.libs.MakeContentConfig import DropMode, MakeMode
from magic_pdf.libs.commons import join_path
from magic_pdf.libs.language import detect_lang
from magic_pdf.libs.markdown_utils import ocr_escape_special_markdown_char
......@@ -319,3 +320,37 @@ def ocr_mk_mm_standard_format(pdf_info_dict: list):
content = line_to_standard_format(line)
content_list.append(content)
return content_list
def union_make(pdf_info_dict: list, make_mode: str, drop_mode: str, img_buket_path: str = ""):
output_content = []
for page_info in pdf_info_dict:
if page_info.get("need_drop", False):
drop_reason = page_info.get("drop_reason")
if drop_mode == DropMode.NONE:
pass
elif drop_mode == DropMode.WHOLE_PDF:
raise Exception(f"drop_mode is {DropMode.WHOLE_PDF} , drop_reason is {drop_reason}")
elif drop_mode == DropMode.SINGLE_PAGE:
logger.warning(f"drop_mode is {DropMode.SINGLE_PAGE} , drop_reason is {drop_reason}")
continue
else:
raise Exception(f"drop_mode can not be null")
paras_of_layout = page_info.get("para_blocks")
if not paras_of_layout:
continue
if make_mode == MakeMode.MM_MD:
page_markdown = ocr_mk_markdown_with_para_core_v2(paras_of_layout, "mm", img_buket_path)
output_content.extend(page_markdown)
elif make_mode == MakeMode.NLP_MD:
page_markdown = ocr_mk_markdown_with_para_core_v2(paras_of_layout, "nlp")
output_content.extend(page_markdown)
elif make_mode == MakeMode.STANDARD_FORMAT:
for para_block in paras_of_layout:
para_content = para_to_standard_format_v2(para_block, img_buket_path)
output_content.append(para_content)
if make_mode in [MakeMode.MM_MD, MakeMode.NLP_MD]:
return '\n\n'.join(output_content)
elif make_mode == MakeMode.STANDARD_FORMAT:
return output_content
class MakeMode:
MM_MD = "mm_markdown"
NLP_MD = "nlp_markdown"
STANDARD_FORMAT = "standard_format"
class DropMode:
WHOLE_PDF = "whole_pdf"
SINGLE_PAGE = "single_page"
NONE = "none"
class DropReason:
TEXT_BLCOK_HOR_OVERLAP = "text_block_horizontal_overlap" # text blocks overlap each other horizontally, so the text order cannot be determined reliably
USEFUL_BLOCK_HOR_OVERLAP = "useful_block_horizontal_overlap" # blocks that must be kept overlap each other horizontally
COMPLICATED_LAYOUT = "complicated_layout" # complicated layout, not supported yet
TOO_MANY_LAYOUT_COLUMNS = "too_many_layout_columns" # layouts with more than 2 columns are not supported yet
COLOR_BACKGROUND_TEXT_BOX = "color_background_text_box" # PDFs containing colored background blocks; the color blocks change the reading order, so PDFs with colored-background text blocks are not supported yet
......
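For orientation, a minimal sketch of how the new union_make entry point is driven by the MakeMode/DropMode constants above; the pdf_info list and the image path are placeholders standing in for whatever the parse pipeline produced, not part of this diff:

from magic_pdf.dict2md.ocr_mkcontent import union_make
from magic_pdf.libs.MakeContentConfig import MakeMode, DropMode

pdf_info = []  # placeholder for the "pdf_info" list emitted by the parse pipeline

md_text = union_make(pdf_info, MakeMode.MM_MD, DropMode.NONE, "imgs")  # markdown string, flagged pages kept
content_list = union_make(pdf_info, MakeMode.STANDARD_FORMAT, DropMode.SINGLE_PAGE)  # list of standard-format paragraphs, flagged pages skipped with a warning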
......@@ -89,6 +89,25 @@ class MagicModel:
ret = []
MAX_DIS_OF_POINT = 10**9 + 7
# The subject bbox and the object bbox are merged into one large bbox (named: merged bbox). Select all subjects that overlap the merged bbox and whose overlap area is larger than the object's area.
# Then take the shortest distance between those selected subjects and the object.
def may_find_other_nearest_bbox(subject_idx, object_idx):
ret = float("inf")
x0 = min(all_bboxes[subject_idx]["bbox"][0], all_bboxes[object_idx]["bbox"][0])
y0 = min(all_bboxes[subject_idx]["bbox"][1], all_bboxes[object_idx]["bbox"][1])
x1 = max(all_bboxes[subject_idx]["bbox"][2], all_bboxes[object_idx]["bbox"][2])
y1 = max(all_bboxes[subject_idx]["bbox"][3], all_bboxes[object_idx]["bbox"][3])
object_area = abs(all_bboxes[object_idx]["bbox"][2] - all_bboxes[object_idx]["bbox"][0]) * abs(all_bboxes[object_idx]["bbox"][3] - all_bboxes[object_idx]["bbox"][1])
for i in range(len(all_bboxes)):
if i == subject_idx or all_bboxes[i]["category_id"] != subject_category_id:
continue
if _is_part_overlap([x0, y0, x1, y1], all_bboxes[i]["bbox"]) or _is_in(all_bboxes[i]["bbox"], [x0, y0, x1, y1]):
i_area = abs(all_bboxes[i]["bbox"][2] - all_bboxes[i]["bbox"][0]) * abs(all_bboxes[i]["bbox"][3] - all_bboxes[i]["bbox"][1])
if i_area >= object_area:
ret = min(ret, dis[i][object_idx])
return ret
subjects = self.__reduct_overlap(
list(
map(
......@@ -170,8 +189,10 @@ class MagicModel:
arr.sort(key=lambda x: x[0])
if len(arr) > 0:
candidates.append(arr[0][1])
seen.add(arr[0][1])
# bug: the object nearest to this subject may span across other subjects, e.g. [this subject] [some other subject] [the object nearest to this subject]
if may_find_other_nearest_bbox(i, j) >= arr[0][0]:
candidates.append(arr[0][1])
seen.add(arr[0][1])
# the initial seeds have been obtained
for j in set(candidates):
......
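A self-contained sketch of the geometric filter that may_find_other_nearest_bbox applies, using a simplified stand-in for _is_part_overlap/_is_in and made-up boxes; it illustrates the case the comment above describes, where another subject sits between a subject and its nearest object:

def boxes_intersect(a, b):
    # simplified stand-in for _is_part_overlap/_is_in: True if the two [x0, y0, x1, y1] boxes touch at all
    return not (a[2] < b[0] or b[2] < a[0] or a[3] < b[1] or b[3] < a[1])

def area(b):
    return abs(b[2] - b[0]) * abs(b[3] - b[1])

subject = [0, 0, 10, 10]          # the subject we start from
obj = [40, 0, 50, 10]             # its geometrically nearest object
other_subject = [20, 0, 35, 10]   # a competing subject lying between them

# merged bbox of subject and object, as in may_find_other_nearest_bbox
merged = [min(subject[0], obj[0]), min(subject[1], obj[1]),
          max(subject[2], obj[2]), max(subject[3], obj[3])]

# the competing subject overlaps the merged bbox and is at least as large as the object,
# so the pairing (subject, obj) is only kept if no such competitor is closer to obj
competes = boxes_intersect(merged, other_subject) and area(other_subject) >= area(obj)
print(competes)  # True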
import time
from loguru import logger
from magic_pdf.layout.layout_sort import get_bboxes_layout, LAYOUT_UNPROC, get_columns_cnt_of_layout
from magic_pdf.libs.convert_utils import dict_to_list
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.libs.hash_utils import compute_md5
from magic_pdf.libs.commons import fitz, get_delta_time
from magic_pdf.model.magic_model import MagicModel
from magic_pdf.pre_proc.construct_page_dict import ocr_construct_page_component_v2
from magic_pdf.pre_proc.cut_image import ocr_cut_image_and_table
from magic_pdf.pre_proc.ocr_detect_all_bboxes import ocr_prepare_bboxes_for_layout_split
from magic_pdf.pre_proc.ocr_dict_merge import sort_blocks_by_layout, fill_spans_in_blocks, fix_block_spans
from magic_pdf.pre_proc.ocr_span_list_modify import remove_overlaps_min_spans, get_qa_need_list_v2
# from magic_pdf.para.para_split import para_split
from magic_pdf.para.para_split_v2 import para_split
from magic_pdf.pre_proc.resolve_bbox_conflict import check_useful_block_horizontal_overlap
from magic_pdf.pdf_parse_union_core import pdf_parse_union
def parse_pdf_by_ocr(pdf_bytes,
model_list,
......@@ -25,114 +7,11 @@ def parse_pdf_by_ocr(pdf_bytes,
end_page_id=None,
debug_mode=False,
):
pdf_bytes_md5 = compute_md5(pdf_bytes)
pdf_docs = fitz.open("pdf", pdf_bytes)
'''initialize an empty pdf_info_dict'''
pdf_info_dict = {}
'''initialize magic_model with model_list and the docs object'''
magic_model = MagicModel(model_list, pdf_docs)
'''parse the pdf over the requested page range'''
end_page_id = end_page_id if end_page_id else len(pdf_docs) - 1
'''record the start time'''
start_time = time.time()
for page_id in range(start_page_id, end_page_id + 1):
'''in debug mode, log how long each page took to parse'''
if debug_mode:
time_now = time.time()
logger.info(
f"page_id: {page_id}, last_page_cost_time: {get_delta_time(start_time)}"
)
start_time = time_now
'''get the block information needed later from the magic_model object'''
img_blocks = magic_model.get_imgs(page_id)
table_blocks = magic_model.get_tables(page_id)
discarded_blocks = magic_model.get_discarded(page_id)
text_blocks = magic_model.get_text_blocks(page_id)
title_blocks = magic_model.get_title_blocks(page_id)
inline_equations, interline_equations, interline_equation_blocks = magic_model.get_equations(page_id)
page_w, page_h = magic_model.get_page_size(page_id)
'''gather the bboxes of all blocks'''
all_bboxes = ocr_prepare_bboxes_for_layout_split(
img_blocks, table_blocks, discarded_blocks, text_blocks, title_blocks,
interline_equations, page_w, page_h)
"""在切分之前,先检查一下bbox是否有左右重叠的情况,如果有,那么就认为这个pdf暂时没有能力处理好,这种左右重叠的情况大概率是由于pdf里的行间公式、表格没有被正确识别出来造成的 """
useful_blocks = []
for bbox in all_bboxes:
useful_blocks.append({
"bbox": bbox[:4]
})
is_useful_block_horz_overlap = check_useful_block_horizontal_overlap(useful_blocks)
if is_useful_block_horz_overlap:
logger.warning(
f"pdf: {pdf_bytes_md5}, skip this page, page_id: {page_id}, reason: {DropReason.TEXT_BLCOK_HOR_OVERLAP}")
continue
'''compute the layout from the block information'''
page_boundry = [0, 0, page_w, page_h]
layout_bboxes, layout_tree = get_bboxes_layout(all_bboxes, page_boundry, page_id)
if len(text_blocks) > 0 and len(all_bboxes) > 0 and len(layout_bboxes) == 0:
logger.warning(
f"pdf: {pdf_bytes_md5}, skip this page, page_id: {page_id}, reason: {DropReason.CAN_NOT_DETECT_PAGE_LAYOUT}")
continue
"""以下去掉复杂的布局和超过2列的布局"""
if any([lay["layout_label"] == LAYOUT_UNPROC for lay in layout_bboxes]): # complicated layout
logger.warning(
f"pdf: {pdf_bytes_md5}, skip this page, page_id: {page_id}, reason: {DropReason.COMPLICATED_LAYOUT}")
continue
layout_column_width = get_columns_cnt_of_layout(layout_tree)
if layout_column_width > 2: # drop pdfs whose layout has more than 2 columns
logger.warning(
f"pdf: {pdf_bytes_md5}, skip this page, page_id: {page_id}, reason: {DropReason.TOO_MANY_LAYOUT_COLUMNS}")
continue
'''sort the blocks to keep on this page according to the layout order'''
sorted_blocks = sort_blocks_by_layout(all_bboxes, layout_bboxes)
'''get all spans that need to be stitched together'''
spans = magic_model.get_all_spans(page_id)
'''among overlapping spans, drop the smaller ones'''
spans, dropped_spans_by_span_overlap = remove_overlaps_min_spans(spans)
'''crop images and tables'''
spans = ocr_cut_image_and_table(spans, pdf_docs[page_id], page_id, pdf_bytes_md5, imageWriter)
'''fill the spans into the sorted blocks'''
block_with_spans = fill_spans_in_blocks(sorted_blocks, spans)
'''apply fix operations to the blocks'''
fix_blocks = fix_block_spans(block_with_spans, img_blocks, table_blocks)
'''get the lists that QA needs to externalize'''
images, tables, interline_equations = get_qa_need_list_v2(fix_blocks)
'''build the pdf_info_dict'''
page_info = ocr_construct_page_component_v2(fix_blocks, layout_bboxes, page_id, page_w, page_h, layout_tree,
images, tables, interline_equations, discarded_blocks)
pdf_info_dict[f"page_{page_id}"] = page_info
"""分段"""
try:
para_split(pdf_info_dict, debug_mode=debug_mode)
except Exception as e:
logger.exception(e)
raise e
"""dict转list"""
pdf_info_list = dict_to_list(pdf_info_dict)
new_pdf_info_dict = {
"pdf_info": pdf_info_list,
}
return new_pdf_info_dict
return pdf_parse_union(pdf_bytes,
model_list,
imageWriter,
"ocr",
start_page_id=start_page_id,
end_page_id=end_page_id,
debug_mode=debug_mode,
)
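Both parse_pdf_by_ocr and parse_pdf_by_txt are now thin wrappers around pdf_parse_union. A minimal sketch of calling the shared function directly; the file paths, the one-JSON-line model file, and the DiskReaderWriter constructor argument are assumptions for illustration, not part of this diff:

import json
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.pdf_parse_union_core import pdf_parse_union

with open("/opt/data/pdf/demo.pdf", "rb") as f:   # hypothetical input pdf
    pdf_bytes = f.read()
with open("/opt/data/pdf/demo.json") as f:        # hypothetical model output, one JSON list per line
    model_list = json.loads(f.readline())

image_writer = DiskReaderWriter("/tmp/magic_pdf_images")  # assumed parent-path constructor
pdf_info = pdf_parse_union(pdf_bytes, model_list, image_writer, "ocr", debug_mode=True)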
import time
from loguru import logger
from magic_pdf.layout.layout_sort import get_bboxes_layout, LAYOUT_UNPROC, get_columns_cnt_of_layout
from magic_pdf.libs.convert_utils import dict_to_list
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.libs.hash_utils import compute_md5
from magic_pdf.libs.commons import fitz, get_delta_time
from magic_pdf.model.magic_model import MagicModel
from magic_pdf.pre_proc.construct_page_dict import ocr_construct_page_component_v2
from magic_pdf.pre_proc.cut_image import ocr_cut_image_and_table
from magic_pdf.pre_proc.ocr_detect_all_bboxes import ocr_prepare_bboxes_for_layout_split
from magic_pdf.pre_proc.ocr_dict_merge import (
sort_blocks_by_layout,
fill_spans_in_blocks,
fix_block_spans,
)
from magic_pdf.libs.ocr_content_type import ContentType
from magic_pdf.pre_proc.ocr_span_list_modify import (
remove_overlaps_min_spans,
get_qa_need_list_v2,
)
from magic_pdf.pre_proc.equations_replace import (
combine_chars_to_pymudict,
remove_chars_in_text_blocks,
replace_equations_in_textblock,
)
from magic_pdf.pre_proc.citationmarker_remove import remove_citation_marker
from magic_pdf.libs.math import float_equal
from magic_pdf.para.para_split_v2 import para_split
from magic_pdf.pre_proc.resolve_bbox_conflict import check_useful_block_horizontal_overlap
def txt_spans_extract(pdf_page, inline_equations, interline_equations):
text_raw_blocks = pdf_page.get_text("dict", flags=fitz.TEXTFLAGS_TEXT)["blocks"]
char_level_text_blocks = pdf_page.get_text("rawdict", flags=fitz.TEXTFLAGS_TEXT)[
"blocks"
]
text_blocks = combine_chars_to_pymudict(text_raw_blocks, char_level_text_blocks)
text_blocks = replace_equations_in_textblock(
text_blocks, inline_equations, interline_equations
)
text_blocks = remove_citation_marker(text_blocks)
text_blocks = remove_chars_in_text_blocks(text_blocks)
spans = []
for v in text_blocks:
for line in v["lines"]:
for span in line["spans"]:
bbox = span["bbox"]
if float_equal(bbox[0], bbox[2]) or float_equal(bbox[1], bbox[3]):
continue
spans.append(
{
"bbox": list(span["bbox"]),
"content": span["text"],
"type": ContentType.Text,
}
)
return spans
def replace_text_span(pymu_spans, ocr_spans):
return list(filter(lambda x: x["type"] != ContentType.Text, ocr_spans)) + pymu_spans
from magic_pdf.pdf_parse_union_core import pdf_parse_union
def parse_pdf_by_txt(
......@@ -77,176 +9,48 @@ def parse_pdf_by_txt(
end_page_id=None,
debug_mode=False,
):
pdf_bytes_md5 = compute_md5(pdf_bytes)
pdf_docs = fitz.open("pdf", pdf_bytes)
"""初始化空的pdf_info_dict"""
pdf_info_dict = {}
"""用model_list和docs对象初始化magic_model"""
magic_model = MagicModel(model_list, pdf_docs)
"""根据输入的起始范围解析pdf"""
end_page_id = end_page_id if end_page_id else len(pdf_docs) - 1
"""初始化启动时间"""
start_time = time.time()
for page_id in range(start_page_id, end_page_id + 1):
"""debug时输出每页解析的耗时"""
if debug_mode:
time_now = time.time()
logger.info(
f"page_id: {page_id}, last_page_cost_time: {get_delta_time(start_time)}"
)
start_time = time_now
"""从magic_model对象中获取后面会用到的区块信息"""
img_blocks = magic_model.get_imgs(page_id)
table_blocks = magic_model.get_tables(page_id)
discarded_blocks = magic_model.get_discarded(page_id)
text_blocks = magic_model.get_text_blocks(page_id)
title_blocks = magic_model.get_title_blocks(page_id)
inline_equations, interline_equations, interline_equation_blocks = (
magic_model.get_equations(page_id)
)
page_w, page_h = magic_model.get_page_size(page_id)
"""将所有区块的bbox整理到一起"""
all_bboxes = ocr_prepare_bboxes_for_layout_split(
img_blocks,
table_blocks,
discarded_blocks,
text_blocks,
title_blocks,
interline_equations,
page_w,
page_h,
)
"""在切分之前,先检查一下bbox是否有左右重叠的情况,如果有,那么就认为这个pdf暂时没有能力处理好,这种左右重叠的情况大概率是由于pdf里的行间公式、表格没有被正确识别出来造成的 """
useful_blocks = []
for bbox in all_bboxes:
useful_blocks.append({
"bbox": bbox[:4]
})
is_useful_block_horz_overlap = check_useful_block_horizontal_overlap(useful_blocks)
if is_useful_block_horz_overlap:
logger.warning(
f"pdf: {pdf_bytes_md5}, skip this page, page_id: {page_id}, reason: {DropReason.TEXT_BLCOK_HOR_OVERLAP}")
continue
'''compute the layout from the block information'''
page_boundry = [0, 0, page_w, page_h]
layout_bboxes, layout_tree = get_bboxes_layout(all_bboxes, page_boundry, page_id)
if len(text_blocks) > 0 and len(all_bboxes) > 0 and len(layout_bboxes) == 0:
logger.warning(
f"pdf: {pdf_bytes_md5}, skip this page, page_id: {page_id}, reason: {DropReason.CAN_NOT_DETECT_PAGE_LAYOUT}")
continue
"""以下去掉复杂的布局和超过2列的布局"""
if any([lay["layout_label"] == LAYOUT_UNPROC for lay in layout_bboxes]): # complicated layout
logger.warning(
f"pdf: {pdf_bytes_md5}, skip this page, page_id: {page_id}, reason: {DropReason.COMPLICATED_LAYOUT}")
continue
layout_column_width = get_columns_cnt_of_layout(layout_tree)
if layout_column_width > 2: # drop pdfs whose layout has more than 2 columns
logger.warning(
f"pdf: {pdf_bytes_md5}, skip this page, page_id: {page_id}, reason: {DropReason.TOO_MANY_LAYOUT_COLUMNS}")
continue
"""根据layout顺序,对当前页面所有需要留下的block进行排序"""
sorted_blocks = sort_blocks_by_layout(all_bboxes, layout_bboxes)
"""ocr 中文本类的 span 用 pymu spans 替换!"""
ocr_spans = magic_model.get_all_spans(page_id)
pymu_spans = txt_spans_extract(
pdf_docs[page_id], inline_equations, interline_equations
)
spans = replace_text_span(pymu_spans, ocr_spans)
"""删除重叠spans中较小的那些"""
spans, dropped_spans_by_span_overlap = remove_overlaps_min_spans(spans)
"""对image和table截图"""
spans = ocr_cut_image_and_table(
spans, pdf_docs[page_id], page_id, pdf_bytes_md5, imageWriter
)
"""将span填入排好序的blocks中"""
block_with_spans = fill_spans_in_blocks(sorted_blocks, spans)
"""对block进行fix操作"""
fix_blocks = fix_block_spans(block_with_spans, img_blocks, table_blocks)
"""获取QA需要外置的list"""
images, tables, interline_equations = get_qa_need_list_v2(fix_blocks)
"""构造pdf_info_dict"""
page_info = ocr_construct_page_component_v2(
fix_blocks,
layout_bboxes,
page_id,
page_w,
page_h,
layout_tree,
images,
tables,
interline_equations,
discarded_blocks,
)
pdf_info_dict[f"page_{page_id}"] = page_info
"""分段"""
try:
para_split(pdf_info_dict, debug_mode=debug_mode)
except Exception as e:
logger.exception(e)
raise e
"""dict转list"""
pdf_info_list = dict_to_list(pdf_info_dict)
new_pdf_info_dict = {
"pdf_info": pdf_info_list,
}
return new_pdf_info_dict
return pdf_parse_union(pdf_bytes,
model_list,
imageWriter,
"txt",
start_page_id=start_page_id,
end_page_id=end_page_id,
debug_mode=debug_mode,
)
if __name__ == "__main__":
if 1:
import fitz
import json
with open("/opt/data/pdf/20240418/25536-00.pdf", "rb") as f:
pdf_bytes = f.read()
pdf_docs = fitz.open("pdf", pdf_bytes)
with open("/opt/data/pdf/20240418/25536-00.json") as f:
model_list = json.loads(f.readline())
magic_model = MagicModel(model_list, pdf_docs)
for i in range(7):
print(magic_model.get_imgs(i))
for page_no, page in enumerate(pdf_docs):
inline_equations, interline_equations, interline_equation_blocks = (
magic_model.get_equations(page_no)
)
text_raw_blocks = page.get_text("dict", flags=fitz.TEXTFLAGS_TEXT)["blocks"]
char_level_text_blocks = page.get_text(
"rawdict", flags=fitz.TEXTFLAGS_TEXT
)["blocks"]
text_blocks = combine_chars_to_pymudict(
text_raw_blocks, char_level_text_blocks
)
text_blocks = replace_equations_in_textblock(
text_blocks, inline_equations, interline_equations
)
text_blocks = remove_citation_marker(text_blocks)
text_blocks = remove_chars_in_text_blocks(text_blocks)
pass
# if 1:
# import fitz
# import json
#
# with open("/opt/data/pdf/20240418/25536-00.pdf", "rb") as f:
# pdf_bytes = f.read()
# pdf_docs = fitz.open("pdf", pdf_bytes)
#
# with open("/opt/data/pdf/20240418/25536-00.json") as f:
# model_list = json.loads(f.readline())
#
# magic_model = MagicModel(model_list, pdf_docs)
# for i in range(7):
# print(magic_model.get_imgs(i))
#
# for page_no, page in enumerate(pdf_docs):
# inline_equations, interline_equations, interline_equation_blocks = (
# magic_model.get_equations(page_no)
# )
#
# text_raw_blocks = page.get_text("dict", flags=fitz.TEXTFLAGS_TEXT)["blocks"]
# char_level_text_blocks = page.get_text(
# "rawdict", flags=fitz.TEXTFLAGS_TEXT
# )["blocks"]
# text_blocks = combine_chars_to_pymudict(
# text_raw_blocks, char_level_text_blocks
# )
# text_blocks = replace_equations_in_textblock(
# text_blocks, inline_equations, interline_equations
# )
# text_blocks = remove_citation_marker(text_blocks)
#
# text_blocks = remove_chars_in_text_blocks(text_blocks)
import time
from loguru import logger
from magic_pdf.libs.commons import fitz, get_delta_time
from magic_pdf.layout.layout_sort import get_bboxes_layout, LAYOUT_UNPROC, get_columns_cnt_of_layout
from magic_pdf.libs.convert_utils import dict_to_list
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.libs.hash_utils import compute_md5
from magic_pdf.libs.math import float_equal
from magic_pdf.libs.ocr_content_type import ContentType
from magic_pdf.model.magic_model import MagicModel
from magic_pdf.para.para_split_v2 import para_split
from magic_pdf.pre_proc.citationmarker_remove import remove_citation_marker
from magic_pdf.pre_proc.construct_page_dict import ocr_construct_page_component_v2
from magic_pdf.pre_proc.cut_image import ocr_cut_image_and_table
from magic_pdf.pre_proc.equations_replace import remove_chars_in_text_blocks, replace_equations_in_textblock, \
combine_chars_to_pymudict
from magic_pdf.pre_proc.ocr_detect_all_bboxes import ocr_prepare_bboxes_for_layout_split
from magic_pdf.pre_proc.ocr_dict_merge import sort_blocks_by_layout, fill_spans_in_blocks, fix_block_spans
from magic_pdf.pre_proc.ocr_span_list_modify import remove_overlaps_min_spans, get_qa_need_list_v2
from magic_pdf.pre_proc.resolve_bbox_conflict import check_useful_block_horizontal_overlap
def remove_horizontal_overlap_block_which_smaller(all_bboxes):
useful_blocks = []
for bbox in all_bboxes:
useful_blocks.append({
"bbox": bbox[:4]
})
is_useful_block_horz_overlap, smaller_bbox = check_useful_block_horizontal_overlap(useful_blocks)
if is_useful_block_horz_overlap:
logger.warning(
f"skip this page, reason: {DropReason.TEXT_BLCOK_HOR_OVERLAP}")
for bbox in all_bboxes.copy():
if smaller_bbox == bbox[:4]:
all_bboxes.remove(bbox)
return is_useful_block_horz_overlap, all_bboxes
def txt_spans_extract(pdf_page, inline_equations, interline_equations):
text_raw_blocks = pdf_page.get_text("dict", flags=fitz.TEXTFLAGS_TEXT)["blocks"]
char_level_text_blocks = pdf_page.get_text("rawdict", flags=fitz.TEXTFLAGS_TEXT)[
"blocks"
]
text_blocks = combine_chars_to_pymudict(text_raw_blocks, char_level_text_blocks)
text_blocks = replace_equations_in_textblock(
text_blocks, inline_equations, interline_equations
)
text_blocks = remove_citation_marker(text_blocks)
text_blocks = remove_chars_in_text_blocks(text_blocks)
spans = []
for v in text_blocks:
for line in v["lines"]:
for span in line["spans"]:
bbox = span["bbox"]
if float_equal(bbox[0], bbox[2]) or float_equal(bbox[1], bbox[3]):
continue
if span.get('type') == ContentType.InlineEquation:
spans.append(
{
"bbox": list(span["bbox"]),
"content": span["latex"],
"type": ContentType.InlineEquation,
}
)
elif span.get('type') == ContentType.InterlineEquation:
spans.append(
{
"bbox": list(span["bbox"]),
"content": span["latex"],
"type": ContentType.InterlineEquation,
}
)
else:
spans.append(
{
"bbox": list(span["bbox"]),
"content": span["text"],
"type": ContentType.Text,
}
)
return spans
def replace_text_span(pymu_spans, ocr_spans):
return list(filter(lambda x: x["type"] != ContentType.Text, ocr_spans)) + pymu_spans
def parse_page_core(pdf_docs, magic_model, page_id, pdf_bytes_md5, imageWriter, parse_mode):
need_drop = False
drop_reason = ""
'''get the block information needed later from the magic_model object'''
img_blocks = magic_model.get_imgs(page_id)
table_blocks = magic_model.get_tables(page_id)
discarded_blocks = magic_model.get_discarded(page_id)
text_blocks = magic_model.get_text_blocks(page_id)
title_blocks = magic_model.get_title_blocks(page_id)
inline_equations, interline_equations, interline_equation_blocks = magic_model.get_equations(page_id)
page_w, page_h = magic_model.get_page_size(page_id)
spans = magic_model.get_all_spans(page_id)
'''build the spans according to parse_mode'''
if parse_mode == "txt":
"""ocr 中文本类的 span 用 pymu spans 替换!"""
pymu_spans = txt_spans_extract(
pdf_docs[page_id], inline_equations, interline_equations
)
spans = replace_text_span(pymu_spans, spans)
elif parse_mode == "ocr":
pass
else:
raise Exception("parse_mode must be txt or ocr")
'''among overlapping spans, drop the smaller ones'''
spans, dropped_spans_by_span_overlap = remove_overlaps_min_spans(spans)
'''crop images and tables'''
spans = ocr_cut_image_and_table(spans, pdf_docs[page_id], page_id, pdf_bytes_md5, imageWriter)
'''gather the bboxes of all blocks'''
all_bboxes = ocr_prepare_bboxes_for_layout_split(
img_blocks, table_blocks, discarded_blocks, text_blocks, title_blocks,
interline_equations, page_w, page_h)
'''skip this page if it has no bboxes'''
if len(all_bboxes) == 0:
logger.warning(f"skip this page, not found bbox, page_id: {page_id}")
return ocr_construct_page_component_v2([], [], page_id, page_w, page_h, [],
[], [], interline_equations, discarded_blocks,
need_drop, drop_reason)
"""在切分之前,先检查一下bbox是否有左右重叠的情况,如果有,那么就认为这个pdf暂时没有能力处理好,这种左右重叠的情况大概率是由于pdf里的行间公式、表格没有被正确识别出来造成的 """
while True: # repeatedly check for horizontal overlaps; if one exists, remove the smaller bbox, until no horizontal overlap remains
is_useful_block_horz_overlap, all_bboxes = remove_horizontal_overlap_block_which_smaller(all_bboxes)
if is_useful_block_horz_overlap:
need_drop = True
drop_reason = DropReason.USEFUL_BLOCK_HOR_OVERLAP
else:
break
'''compute the layout from the block information'''
page_boundry = [0, 0, page_w, page_h]
layout_bboxes, layout_tree = get_bboxes_layout(all_bboxes, page_boundry, page_id)
if len(text_blocks) > 0 and len(all_bboxes) > 0 and len(layout_bboxes) == 0:
logger.warning(
f"skip this page, page_id: {page_id}, reason: {DropReason.CAN_NOT_DETECT_PAGE_LAYOUT}")
need_drop = True
drop_reason = DropReason.CAN_NOT_DETECT_PAGE_LAYOUT
"""以下去掉复杂的布局和超过2列的布局"""
if any([lay["layout_label"] == LAYOUT_UNPROC for lay in layout_bboxes]): # complicated layout
logger.warning(
f"skip this page, page_id: {page_id}, reason: {DropReason.COMPLICATED_LAYOUT}")
need_drop = True
drop_reason = DropReason.COMPLICATED_LAYOUT
layout_column_width = get_columns_cnt_of_layout(layout_tree)
if layout_column_width > 2: # drop pdfs whose layout has more than 2 columns
logger.warning(
f"skip this page, page_id: {page_id}, reason: {DropReason.TOO_MANY_LAYOUT_COLUMNS}")
need_drop = True
drop_reason = DropReason.TOO_MANY_LAYOUT_COLUMNS
'''sort the blocks to keep on this page according to the layout order'''
sorted_blocks = sort_blocks_by_layout(all_bboxes, layout_bboxes)
'''fill the spans into the sorted blocks'''
block_with_spans = fill_spans_in_blocks(sorted_blocks, spans)
'''apply fix operations to the blocks'''
fix_blocks = fix_block_spans(block_with_spans, img_blocks, table_blocks)
'''get the lists that QA needs to externalize'''
images, tables, interline_equations = get_qa_need_list_v2(fix_blocks)
'''build the pdf_info_dict'''
page_info = ocr_construct_page_component_v2(fix_blocks, layout_bboxes, page_id, page_w, page_h, layout_tree,
images, tables, interline_equations, discarded_blocks,
need_drop, drop_reason)
return page_info
def pdf_parse_union(pdf_bytes,
model_list,
imageWriter,
parse_mode,
start_page_id=0,
end_page_id=None,
debug_mode=False,
):
pdf_bytes_md5 = compute_md5(pdf_bytes)
pdf_docs = fitz.open("pdf", pdf_bytes)
'''initialize an empty pdf_info_dict'''
pdf_info_dict = {}
'''initialize magic_model with model_list and the docs object'''
magic_model = MagicModel(model_list, pdf_docs)
'''parse the pdf over the requested page range'''
end_page_id = end_page_id if end_page_id else len(pdf_docs) - 1
'''record the start time'''
start_time = time.time()
for page_id in range(start_page_id, end_page_id + 1):
'''in debug mode, log how long each page took to parse'''
if debug_mode:
time_now = time.time()
logger.info(
f"page_id: {page_id}, last_page_cost_time: {get_delta_time(start_time)}"
)
start_time = time_now
'''parse each page of the pdf'''
page_info = parse_page_core(pdf_docs, magic_model, page_id, pdf_bytes_md5, imageWriter, parse_mode)
pdf_info_dict[f"page_{page_id}"] = page_info
"""分段"""
para_split(pdf_info_dict, debug_mode=debug_mode)
"""dict转list"""
pdf_info_list = dict_to_list(pdf_info_dict)
new_pdf_info_dict = {
"pdf_info": pdf_info_list,
}
return new_pdf_info_dict
if __name__ == '__main__':
pass
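Unlike the old per-parser loops, parse_page_core no longer skips problem pages; it records need_drop and drop_reason on every page dict and leaves the decision to union_make's drop_mode. A short sketch of inspecting those flags, reusing the variables from the sketch after parse_pdf_by_ocr above:

result = pdf_parse_union(pdf_bytes, model_list, image_writer, "txt")  # see the earlier sketch for the inputs
for page in result["pdf_info"]:
    if page.get("need_drop", False):
        print("page flagged for dropping:", page["drop_reason"])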
from abc import ABC, abstractmethod
from magic_pdf.dict2md.mkcontent import mk_universal_format, mk_mm_markdown
from magic_pdf.dict2md.ocr_mkcontent import make_standard_format_with_para, ocr_mk_mm_markdown_with_para
from magic_pdf.dict2md.ocr_mkcontent import make_standard_format_with_para, ocr_mk_mm_markdown_with_para, union_make
from magic_pdf.filter.pdf_classify_by_type import classify
from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
from magic_pdf.libs.MakeContentConfig import MakeMode, DropMode
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.libs.json_compressor import JsonCompressor
......@@ -41,14 +42,14 @@ class AbsPipe(ABC):
raise NotImplementedError
@abstractmethod
def pipe_mk_uni_format(self):
def pipe_mk_uni_format(self, img_parent_path, drop_mode):
"""
Stateful assembly of the unified format
"""
raise NotImplementedError
@abstractmethod
def pipe_mk_markdown(self):
def pipe_mk_markdown(self, img_parent_path, drop_mode):
"""
Stateful assembly of markdown
"""
......@@ -83,34 +84,23 @@ class AbsPipe(ABC):
return AbsPipe.PIP_OCR
@staticmethod
def mk_uni_format(compressed_pdf_mid_data: str, img_buket_path: str) -> list:
def mk_uni_format(compressed_pdf_mid_data: str, img_buket_path: str, drop_mode=DropMode.WHOLE_PDF) -> list:
"""
Generate the unified-format content_list according to the pdf type
"""
pdf_mid_data = JsonCompressor.decompress_json(compressed_pdf_mid_data)
parse_type = pdf_mid_data["_parse_type"]
pdf_info_list = pdf_mid_data["pdf_info"]
if parse_type == AbsPipe.PIP_TXT:
# content_list = mk_universal_format(pdf_info_list, img_buket_path)
content_list = make_standard_format_with_para(pdf_info_list, img_buket_path)
elif parse_type == AbsPipe.PIP_OCR:
content_list = make_standard_format_with_para(pdf_info_list, img_buket_path)
content_list = union_make(pdf_info_list, MakeMode.STANDARD_FORMAT, drop_mode, img_buket_path)
return content_list
@staticmethod
def mk_markdown(compressed_pdf_mid_data: str, img_buket_path: str) -> list:
def mk_markdown(compressed_pdf_mid_data: str, img_buket_path: str, drop_mode=DropMode.WHOLE_PDF) -> list:
"""
Generate markdown according to the pdf type
"""
pdf_mid_data = JsonCompressor.decompress_json(compressed_pdf_mid_data)
parse_type = pdf_mid_data["_parse_type"]
pdf_info_list = pdf_mid_data["pdf_info"]
if parse_type == AbsPipe.PIP_TXT:
# content_list = mk_universal_format(pdf_info_list, img_buket_path)
# md_content = mk_mm_markdown(content_list)
md_content = ocr_mk_mm_markdown_with_para(pdf_info_list, img_buket_path)
elif parse_type == AbsPipe.PIP_OCR:
md_content = ocr_mk_mm_markdown_with_para(pdf_info_list, img_buket_path)
md_content = union_make(pdf_info_list, MakeMode.MM_MD, drop_mode, img_buket_path)
return md_content
from magic_pdf.libs.MakeContentConfig import DropMode
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.json_compressor import JsonCompressor
from magic_pdf.pipe.AbsPipe import AbsPipe
......@@ -15,10 +16,10 @@ class OCRPipe(AbsPipe):
def pipe_parse(self):
self.pdf_mid_data = parse_ocr_pdf(self.pdf_bytes, self.model_list, self.image_writer, is_debug=self.is_debug)
def pipe_mk_uni_format(self, img_parent_path: str):
content_list = AbsPipe.mk_uni_format(self.get_compress_pdf_mid_data(), img_parent_path)
def pipe_mk_uni_format(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF):
content_list = AbsPipe.mk_uni_format(self.get_compress_pdf_mid_data(), img_parent_path, drop_mode)
return content_list
def pipe_mk_markdown(self, img_parent_path: str):
md_content = AbsPipe.mk_markdown(self.get_compress_pdf_mid_data(), img_parent_path)
def pipe_mk_markdown(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF):
md_content = AbsPipe.mk_markdown(self.get_compress_pdf_mid_data(), img_parent_path, drop_mode)
return md_content
from magic_pdf.libs.MakeContentConfig import DropMode
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.json_compressor import JsonCompressor
from magic_pdf.pipe.AbsPipe import AbsPipe
......@@ -15,10 +16,10 @@ class TXTPipe(AbsPipe):
def pipe_parse(self):
self.pdf_mid_data = parse_txt_pdf(self.pdf_bytes, self.model_list, self.image_writer, is_debug=self.is_debug)
def pipe_mk_uni_format(self, img_parent_path: str):
content_list = AbsPipe.mk_uni_format(self.get_compress_pdf_mid_data(), img_parent_path)
def pipe_mk_uni_format(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF):
content_list = AbsPipe.mk_uni_format(self.get_compress_pdf_mid_data(), img_parent_path, drop_mode)
return content_list
def pipe_mk_markdown(self, img_parent_path: str):
md_content = AbsPipe.mk_markdown(self.get_compress_pdf_mid_data(), img_parent_path)
def pipe_mk_markdown(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF):
md_content = AbsPipe.mk_markdown(self.get_compress_pdf_mid_data(), img_parent_path, drop_mode)
return md_content
import json
from loguru import logger
from magic_pdf.libs.MakeContentConfig import DropMode
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.libs.commons import join_path
......@@ -25,12 +27,12 @@ class UNIPipe(AbsPipe):
self.pdf_mid_data = parse_ocr_pdf(self.pdf_bytes, self.model_list, self.image_writer,
is_debug=self.is_debug)
def pipe_mk_uni_format(self, img_parent_path: str):
content_list = AbsPipe.mk_uni_format(self.get_compress_pdf_mid_data(), img_parent_path)
def pipe_mk_uni_format(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF):
content_list = AbsPipe.mk_uni_format(self.get_compress_pdf_mid_data(), img_parent_path, drop_mode)
return content_list
def pipe_mk_markdown(self, img_parent_path: str):
markdown_content = AbsPipe.mk_markdown(self.get_compress_pdf_mid_data(), img_parent_path)
def pipe_mk_markdown(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF):
markdown_content = AbsPipe.mk_markdown(self.get_compress_pdf_mid_data(), img_parent_path, drop_mode)
return markdown_content
......
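All three pipes now forward a drop_mode that defaults to DropMode.WHOLE_PDF (raise as soon as any page is flagged). A short sketch of overriding it per call, assuming pipe is an already-constructed UNIPipe/OCRPipe/TXTPipe that has run pipe_parse, as in the cli change at the top of this commit:

from magic_pdf.libs.MakeContentConfig import DropMode

md_content = pipe.pipe_mk_markdown("images", drop_mode=DropMode.NONE)             # keep going even on pages flagged need_drop
content_list = pipe.pipe_mk_uni_format("images", drop_mode=DropMode.SINGLE_PAGE)  # warn and skip flagged pages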
......@@ -55,7 +55,7 @@ def ocr_construct_page_component(blocks, layout_bboxes, page_id, page_w, page_h,
def ocr_construct_page_component_v2(blocks, layout_bboxes, page_id, page_w, page_h, layout_tree,
images, tables, interline_equations, discarded_blocks):
images, tables, interline_equations, discarded_blocks, need_drop, drop_reason):
return_dict = {
'preproc_blocks': blocks,
'layout_bboxes': layout_bboxes,
......@@ -66,5 +66,7 @@ def ocr_construct_page_component_v2(blocks, layout_bboxes, page_id, page_w, page
'tables': tables,
'interline_equations': interline_equations,
'discarded_blocks': discarded_blocks,
'need_drop': need_drop,
'drop_reason': drop_reason,
}
return return_dict
......@@ -107,10 +107,10 @@ def _is_in_or_part_overlap(box1, box2) -> bool:
or y0_1 > y1_2
) # box1在box2的下边
def remove_text_block_overlap_interline_equation_bbox(
interline_eq_bboxes, pymu_block_list
):
"""消除掉行行内公式有部分重叠的文本块的内容。
同时重新计算消除重叠之后文本块的大小"""
deleted_block = []
......@@ -191,13 +191,13 @@ def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list):
"spans": [
{
"size": 9.962599754333496,
"_type": TYPE_INTERLINE_EQUATION,
"type": TYPE_INTERLINE_EQUATION,
"flags": 4,
"font": TYPE_INTERLINE_EQUATION,
"color": 0,
"ascender": 0.9409999847412109,
"descender": -0.3050000071525574,
"text": f"\n$$\n{latex_content}\n$$\n",
"latex": latex_content,
"origin": [bbox[0], bbox[1]],
"bbox": bbox,
}
......@@ -258,6 +258,9 @@ def replace_line_v2(eqinfo, line):
last_overlap_span = -1
delete_chars = []
for i in range(0, len(line["spans"])):
if "chars" not in line["spans"][i]:
continue
if line["spans"][i].get("_type", None) is not None:
continue # ignore: this is already an inserted synthetic equation span
......@@ -309,27 +312,22 @@ def replace_line_v2(eqinfo, line):
equation_span = {
"size": 9.962599754333496,
"_type": TYPE_INLINE_EQUATION,
"type": TYPE_INLINE_EQUATION,
"flags": 4,
"font": TYPE_INLINE_EQUATION,
"color": 0,
"ascender": 0.9409999847412109,
"descender": -0.3050000071525574,
"text": "",
"latex": "",
"origin": [337.1410153102337, 216.0205245153934],
"bbox": [
337.1410153102337,
216.0205245153934,
390.4496373892022,
228.50171037628277,
],
"bbox": eqinfo["bbox"]
}
# equation_span = line['spans'][0].copy()
equation_span["text"] = f" ${eqinfo['latex']}$ "
equation_span["latex"] = eqinfo['latex']
equation_span["bbox"] = [x0, equation_span["bbox"][1], x1, equation_span["bbox"][3]]
equation_span["origin"] = [equation_span["bbox"][0], equation_span["bbox"][1]]
equation_span["chars"] = delete_chars
equation_span["_type"] = TYPE_INLINE_EQUATION
equation_span["type"] = TYPE_INLINE_EQUATION
equation_span["_eq_bbox"] = eqinfo["bbox"]
line["spans"].insert(first_overlap_span_idx + 1, equation_span) # 放入公式
......@@ -363,6 +361,11 @@ def replace_line_v2(eqinfo, line):
line["spans"].remove(first_overlap_span)
if len(tail_span_chars) > 0:
min_of_tail_span_x0 = min([chr["bbox"][0] for chr in tail_span_chars])
min_of_tail_span_y0 = min([chr["bbox"][1] for chr in tail_span_chars])
max_of_tail_span_x1 = max([chr["bbox"][2] for chr in tail_span_chars])
max_of_tail_span_y1 = max([chr["bbox"][3] for chr in tail_span_chars])
if last_overlap_span == first_overlap_span: # in this case a new span should be inserted
tail_span_txt = "".join([char["c"] for char in tail_span_chars])
last_span_to_insert = last_overlap_span.copy()
......@@ -370,12 +373,20 @@ def replace_line_v2(eqinfo, line):
last_span_to_insert["text"] = "".join(
[char["c"] for char in tail_span_chars]
)
last_span_to_insert["bbox"] = (
min([chr["bbox"][0] for chr in tail_span_chars]),
last_overlap_span["bbox"][1],
last_overlap_span["bbox"][2],
last_overlap_span["bbox"][3],
)
if equation_span["bbox"][2] >= last_overlap_span["bbox"][2]:
last_span_to_insert["bbox"] = (
min_of_tail_span_x0,
min_of_tail_span_y0,
max_of_tail_span_x1,
max_of_tail_span_y1
)
else:
last_span_to_insert["bbox"] = (
min([chr["bbox"][0] for chr in tail_span_chars]),
last_overlap_span["bbox"][1],
last_overlap_span["bbox"][2],
last_overlap_span["bbox"][3],
)
# insert after the equation object
equation_idx = line["spans"].index(equation_span)
line["spans"].insert(equation_idx + 1, last_span_to_insert) # 放入公式
......@@ -460,17 +471,16 @@ def replace_equations_in_textblock(
"""
Replace interline and inline equations with latex
"""
raw_text_blocks = remove_text_block_in_interline_equation_bbox(
interline_equation_bboxes, raw_text_blocks
) # remove overlaps, step 1: text blocks inside equations
raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
interline_equation_bboxes, raw_text_blocks
) # remove overlaps, step 2: text blocks overlapped by equations
insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
raw_text_blocks = replace_inline_equations(inline_equation_bboxes, raw_text_blocks)
return raw_text_blocks
......
......@@ -180,7 +180,12 @@ def check_useful_block_horizontal_overlap(useful_blocks: list) -> bool:
for i in range(len(useful_bboxes)):
for j in range(i + 1, len(useful_bboxes)):
area_i = (useful_bboxes[i][2] - useful_bboxes[i][0]) * (useful_bboxes[i][3] - useful_bboxes[i][1])
area_j = (useful_bboxes[j][2] - useful_bboxes[j][0]) * (useful_bboxes[j][3] - useful_bboxes[j][1])
if _is_left_overlap(useful_bboxes[i], useful_bboxes[j]) or _is_left_overlap(useful_bboxes[j], useful_bboxes[i]):
return True
if area_i > area_j:
return True, useful_bboxes[j]
else:
return True, useful_bboxes[i]
return False
return False, None