Commit 88f5b932 authored by 赵小蒙

Refactor parse_pdf_by_txt and cut_image to use an abstract class for the write-out operations

parent 0e2d0b8b
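
The refactor routes image output through an injected writer object instead of a concrete S3 client. The abstract base class itself is not part of this diff; judging from the import `from magic_pdf.io import AbsReaderWriter` that appears later in the commit and from the only call the new code makes, imageWriter.write(data=..., path=..., mode="binary"), a minimal sketch of such an interface might look like the following (the read method and the default mode value are assumptions, not taken from the source):

# Hypothetical sketch only: the diff below calls imageWriter.write(data=..., path=..., mode="binary");
# the real AbsReaderWriter definition is not shown in this commit.
from abc import ABC, abstractmethod

class AbsReaderWriter(ABC):
    @abstractmethod
    def read(self, path: str, mode: str = "text"):
        """Return the object stored under `path` (str in text mode, bytes in binary mode)."""

    @abstractmethod
    def write(self, data, path: str, mode: str = "text"):
        """Persist `data` under `path`; subclasses decide whether that means local disk, S3, etc."""

Concrete subclasses would then decide whether write targets the local file system or an S3 bucket.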
import os
from pathlib import Path
from typing import Tuple
import io
# from app.common.s3 import get_s3_client
from magic_pdf.libs.commons import fitz
from loguru import logger
-from magic_pdf.libs.commons import parse_bucket_key, join_path
+from magic_pdf.libs.commons import join_path
from magic_pdf.libs.hash_utils import compute_sha256


-def cut_image(bbox: Tuple, page_num: int, page: fitz.Page, return_path, imageWriter, upload_switch=True):
+def cut_image(bbox: tuple, page_num: int, page: fitz.Page, return_path, imageWriter, upload=True):
    """
    Crop a jpg image out of `page` (page number `page_num`) according to `bbox`, and return the image path.
    save_path: must support both S3 and local paths. The image is stored under save_path with the file name
    {page_num}_{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}.jpg, where the bbox values are rounded to integers.
@@ -24,23 +19,25 @@ def cut_image(bbox: Tuple, page_num: int, page: fitz.Page, return_path, imageWri
    # the new version generates a flattened path
    img_hash256_path = f"{compute_sha256(img_path)}.jpg"
-    # convert the coordinates into a fitz.Rect object
-    rect = fitz.Rect(*bbox)
-    # set the zoom factor to 3x
-    zoom = fitz.Matrix(3, 3)
-    # render the clipped region
-    pix = page.get_pixmap(clip=rect, matrix=zoom)
+    if upload:
+        # convert the coordinates into a fitz.Rect object
+        rect = fitz.Rect(*bbox)
+        # set the zoom factor to 3x
+        zoom = fitz.Matrix(3, 3)
+        # render the clipped region
+        pix = page.get_pixmap(clip=rect, matrix=zoom)
-    byte_data = pix.tobytes(output='jpeg', jpg_quality=95)
+        byte_data = pix.tobytes(output='jpeg', jpg_quality=95)
-    imageWriter.write(data=byte_data, path=img_hash256_path, mode="binary")
+        imageWriter.write(data=byte_data, path=img_hash256_path, mode="binary")

    return img_hash256_path
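
For reference, the flattened image path used above is a content hash of the legacy {page_num}_{x0}_{y0}_{x1}_{y1}.jpg name described in the docstring. A rough illustration, assuming compute_sha256 is a plain SHA-256 hex digest of the UTF-8 string and that the bbox values are truncated to integers (both assumptions; the helper and the elided img_path construction are not shown in this hunk):

# Illustration only: rebuild the flat file name for one crop under the stated assumptions.
import hashlib

def sha256_hex(text: str) -> str:  # stand-in for magic_pdf.libs.hash_utils.compute_sha256
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

page_num, bbox = 3, (100.4, 200.7, 300.2, 400.9)
img_path = f"{page_num}_{int(bbox[0])}_{int(bbox[1])}_{int(bbox[2])}_{int(bbox[3])}.jpg"
img_hash256_path = f"{sha256_hex(img_path)}.jpg"  # one flat name, no per-book directories
print(img_hash256_path)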
-def save_images_by_bboxes(book_name: str, page_num: int, page: fitz.Page, save_path: str,
-                          image_bboxes: list, images_overlap_backup:list, table_bboxes: list, equation_inline_bboxes: list,
-                          equation_interline_bboxes: list, img_s3_client) -> dict:
+def save_images_by_bboxes(page_num: int, page: fitz.Page, pdf_bytes_md5: str,
+                          image_bboxes: list, images_overlap_backup: list, table_bboxes: list,
+                          equation_inline_bboxes: list,
+                          equation_interline_bboxes: list, imageWriter) -> dict:
    """
    Return a dict whose keys are the bboxes and whose values are the image paths.
    """
@@ -51,53 +48,44 @@ def save_images_by_bboxes(book_name: str, page_num: int, page: fitz.Page, save_p
    interline_eq_info = []

-    # image save paths are composed as: {s3_or_local_path}/{book_name}/{images|tables|equations}/{page_num}_{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}.jpg
-    s3_return_image_path = join_path(book_name, "images")
-    image_save_path = join_path(save_path, s3_return_image_path)
-    s3_return_table_path = join_path(book_name, "tables")
-    table_save_path = join_path(save_path, s3_return_table_path)
-    s3_return_equations_inline_path = join_path(book_name, "equations_inline")
-    equation_inline_save_path = join_path(save_path, s3_return_equations_inline_path)
-    s3_return_equation_interline_path = join_path(book_name, "equation_interline")
-    equation_interline_save_path = join_path(save_path, s3_return_equation_interline_path)
+    def return_path(type):
+        return join_path(pdf_bytes_md5, type)

    for bbox in image_bboxes:
-        if any([bbox[0]>=bbox[2], bbox[1]>=bbox[3]]):
+        if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]):
            logger.warning(f"image_bboxes: invalid box, {bbox}")
            continue
-        image_path = cut_image(bbox, page_num, page, image_save_path, s3_return_image_path, img_s3_client)
+        image_path = cut_image(bbox, page_num, page, return_path("images"), imageWriter)
        image_info.append({"bbox": bbox, "image_path": image_path})

    for bbox in images_overlap_backup:
-        if any([bbox[0]>=bbox[2], bbox[1]>=bbox[3]]):
+        if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]):
            logger.warning(f"images_overlap_backup: invalid box, {bbox}")
            continue
-        image_path = cut_image(bbox, page_num, page, image_save_path, s3_return_image_path, img_s3_client)
+        image_path = cut_image(bbox, page_num, page, return_path("images"), imageWriter)
        image_backup_info.append({"bbox": bbox, "image_path": image_path})

    for bbox in table_bboxes:
-        if any([bbox[0]>=bbox[2], bbox[1]>=bbox[3]]):
+        if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]):
            logger.warning(f"table_bboxes: invalid box, {bbox}")
            continue
-        image_path = cut_image(bbox, page_num, page, table_save_path, s3_return_table_path, img_s3_client)
+        image_path = cut_image(bbox, page_num, page, return_path("tables"), imageWriter)
        table_info.append({"bbox": bbox, "image_path": image_path})

    for bbox in equation_inline_bboxes:
-        if any([bbox[0]>=bbox[2], bbox[1]>=bbox[3]]):
+        if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]):
            logger.warning(f"equation_inline_bboxes: invalid box, {bbox}")
            continue
-        image_path = cut_image(bbox[:4], page_num, page, equation_inline_save_path, s3_return_equations_inline_path, img_s3_client, upload_switch=False)
-        inline_eq_info.append({'bbox':bbox[:4], "image_path":image_path, "latex_text":bbox[4]})
+        image_path = cut_image(bbox[:4], page_num, page, return_path("equations_inline"), imageWriter, upload=False)
+        inline_eq_info.append({'bbox': bbox[:4], "image_path": image_path, "latex_text": bbox[4]})

    for bbox in equation_interline_bboxes:
-        if any([bbox[0]>=bbox[2], bbox[1]>=bbox[3]]):
+        if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]):
            logger.warning(f"equation_interline_bboxes: invalid box, {bbox}")
            continue
-        image_path = cut_image(bbox[:4], page_num, page, equation_interline_save_path, s3_return_equation_interline_path, img_s3_client, upload_switch=False)
-        interline_eq_info.append({"bbox":bbox[:4], "image_path":image_path, "latex_text":bbox[4]})
+        image_path = cut_image(bbox[:4], page_num, page, return_path("equation_interline"), imageWriter, upload=False)
+        interline_eq_info.append({"bbox": bbox[:4], "image_path": image_path, "latex_text": bbox[4]})

-    return image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info
\ No newline at end of file
+    return image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info
\ No newline at end of file
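
Any object exposing a compatible write method can be passed in as imageWriter. A hypothetical local-disk implementation (the class name and constructor are illustrative, not the project's actual classes) could look like this:

# Hypothetical local-disk writer compatible with the imageWriter.write(...) calls above.
import os

class LocalImageWriter:
    def __init__(self, parent_dir: str):
        self.parent_dir = parent_dir

    def write(self, data, path: str, mode: str = "text"):
        full_path = os.path.join(self.parent_dir, path)
        os.makedirs(os.path.dirname(full_path) or ".", exist_ok=True)
        if mode == "binary":
            with open(full_path, "wb") as f:
                f.write(data)
        else:
            with open(full_path, "w", encoding="utf-8") as f:
                f.write(data)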
@@ -12,6 +12,7 @@ from magic_pdf.layout.bbox_sort import (
)
from magic_pdf.layout.layout_sort import LAYOUT_UNPROC, get_bboxes_layout, get_columns_cnt_of_layout, sort_text_block
from magic_pdf.libs.drop_reason import DropReason
+from magic_pdf.libs.hash_utils import compute_md5
from magic_pdf.libs.markdown_utils import escape_special_markdown_char
from magic_pdf.libs.safe_filename import sanitize_filename
from magic_pdf.libs.vis_utils import draw_bbox_on_page, draw_layout_bbox_on_page
@@ -73,41 +74,20 @@ paraMergeException_msg = ParaMergeException().message
def parse_pdf_by_txt(
    pdf_bytes,
    pdf_model_output,
+    imageWriter,
    start_page_id=0,
    end_page_id=None,
    debug_mode=False,
):
-    save_tmp_path = os.path.join(os.path.dirname(__file__), "../..", "tmp", "unittest")
-    md_bookname_save_path = ""
-    book_name = sanitize_filename(book_name)
-    if debug_mode:
-        save_path = join_path(save_tmp_path, "md")
-        pdf_local_path = join_path(save_tmp_path, "download-pdfs", book_name)
-        if not os.path.exists(os.path.dirname(pdf_local_path)):
-            # create the directory if it does not exist
-            os.makedirs(os.path.dirname(pdf_local_path))
-        md_bookname_save_path = join_path(save_tmp_path, "md", book_name)
-        if not os.path.exists(md_bookname_save_path):
-            # create the directory if it does not exist
-            os.makedirs(md_bookname_save_path)
-        with open(pdf_local_path + ".pdf", "wb") as pdf_file:
-            pdf_file.write(pdf_bytes)
+    pdf_bytes_md5 = compute_md5(pdf_bytes)

    pdf_docs = fitz.open("pdf", pdf_bytes)
    pdf_info_dict = {}
-    img_s3_client = get_img_s3_client(save_path, image_s3_config)  # renamed the function and its parameters to avoid ambiguity
-    # img_s3_client = "img_s3_client"  # do not create this object; use a string placeholder instead
    start_time = time.time()

    """Identify the main-text font by collecting statistics over the text of the whole pdf"""
    main_text_font = get_main_text_font(pdf_docs)

    end_page_id = end_page_id if end_page_id else len(pdf_docs) - 1
    for page_id in range(start_page_id, end_page_id + 1):
        page = pdf_docs[page_id]
@@ -126,26 +106,10 @@ def parse_pdf_by_txt(
        # drop the dependency on junk images and simplify the logic
        if len(page_imgs) > 1500:  # if the current page has more than 1500 images, skip it directly
-            logger.warning(f"page_id: {page_id}, img_counts: {len(page_imgs)}, drop this pdf: {book_name}")
+            logger.warning(f"page_id: {page_id}, img_counts: {len(page_imgs)}, drop this pdf")
            result = {"need_drop": True, "drop_reason": DropReason.HIGH_COMPUTATIONAL_lOAD_BY_IMGS}
            if not debug_mode:
                return result

-        # img_counts = 0
-        # for img in page_imgs:
-        #     img_bojid = img[0]
-        #     if img_bojid in junk_img_bojids:  # check whether this image is in the junk list
-        #         continue  # if it is in the junk list, skip it
-        #     else:
-        #         recs = page.get_image_rects(img, transform=True)
-        #         if recs:  # if this image is actually displayed on the current page
-        #             img_counts += 1
-        #             if img_counts >= 1500:  # if a single page still has 1500+ images after excluding junk images, drop this pdf
-        #                 logger.warning(
-        #                     f"page_id: {page_id}, img_counts: {img_counts}, drop this pdf: {book_name}, drop_reason: {DropReason.HIGH_COMPUTATIONAL_lOAD_BY_IMGS}"
-        #                 )
-        #                 result = {"need_drop": True, "drop_reason": DropReason.HIGH_COMPUTATIONAL_lOAD_BY_IMGS}
-        #                 if not debug_mode:
-        #                     return result
"""
==================================================================================================================================
@@ -227,22 +191,18 @@ def parse_pdf_by_txt(
"""
==================================================================================================================================
"""
-        if debug_mode:  # in debug mode, save the crops locally
-            save_path = join_path(save_tmp_path, "md")

        # crop the images, tables and equations, save them to storage, and use the returned image paths as the content
        image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info = save_images_by_bboxes(
-            book_name,
            page_id,
            page,
-            save_path,
+            pdf_bytes_md5,
            image_bboxes,
            images_overlap_backup,
            table_bboxes,
            equations_inline_bboxes,
            equations_interline_bboxes,
-            # pass in the img_s3_client
-            img_s3_client,
+            imageWriter
        )  # only the crops of tables and images are needed

        """"The following enters the equation-replacement stage """
@@ -255,13 +215,13 @@ def parse_pdf_by_txt(
        """Remove footnotes from the text and the images (try removing superscript marks first, then footnotes)"""
        # footnotes detected by the model
-        footnote_bboxes_by_model = parse_footnotes_by_model(page_id, page, model_output_json, md_bookname_save_path, debug_mode=debug_mode)
+        footnote_bboxes_by_model = parse_footnotes_by_model(page_id, page, model_output_json, debug_mode=debug_mode)
        # footnotes detected by rules
        footnote_bboxes_by_rule = parse_footnotes_by_rule(remain_text_blocks, page_height, page_id, main_text_font)

        """Run the pdf filter to drop some unreasonable pdfs"""
        is_good_pdf, err = pdf_filter(page, remain_text_blocks, table_bboxes, image_bboxes)
        if not is_good_pdf:
-            logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {err}")
+            logger.warning(f"page_id: {page_id}, drop this pdf: {pdf_bytes_md5}, reason: {err}")
if not debug_mode:
return err
@@ -275,7 +235,7 @@ def parse_pdf_by_txt(
        if is_text_block_horz_overlap:
            # debug_show_bbox(pdf_docs, page_id, [b['bbox'] for b in remain_text_blocks], [], [], join_path(save_path, book_name, f"{book_name}_debug.pdf"), 0)
-            logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.TEXT_BLCOK_HOR_OVERLAP}")
+            logger.warning(f"page_id: {page_id}, drop this pdf: {pdf_bytes_md5}, reason: {DropReason.TEXT_BLCOK_HOR_OVERLAP}")
            result = {"need_drop": True, "drop_reason": DropReason.TEXT_BLCOK_HOR_OVERLAP}
            if not debug_mode:
                return result
@@ -294,21 +254,21 @@ def parse_pdf_by_txt(
        layout_bboxes, layout_tree = get_bboxes_layout(all_bboxes, page_boundry, page_id)

        if len(remain_text_blocks)>0 and len(all_bboxes)>0 and len(layout_bboxes)==0:
-            logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.CAN_NOT_DETECT_PAGE_LAYOUT}")
+            logger.warning(f"page_id: {page_id}, drop this pdf: {pdf_bytes_md5}, reason: {DropReason.CAN_NOT_DETECT_PAGE_LAYOUT}")
            result = {"need_drop": True, "drop_reason": DropReason.CAN_NOT_DETECT_PAGE_LAYOUT}
            if not debug_mode:
                return result

        """The following removes complicated layouts and layouts with more than 2 columns"""
        if any([lay["layout_label"] == LAYOUT_UNPROC for lay in layout_bboxes]):  # complicated layout
-            logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.COMPLICATED_LAYOUT}")
+            logger.warning(f"page_id: {page_id}, drop this pdf: {pdf_bytes_md5}, reason: {DropReason.COMPLICATED_LAYOUT}")
            result = {"need_drop": True, "drop_reason": DropReason.COMPLICATED_LAYOUT}
            if not debug_mode:
                return result

        layout_column_width = get_columns_cnt_of_layout(layout_tree)
        if layout_column_width > 2:  # drop pdfs whose layout has more than 2 columns
-            logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.TOO_MANY_LAYOUT_COLUMNS}")
+            logger.warning(f"page_id: {page_id}, drop this pdf: {pdf_bytes_md5}, reason: {DropReason.TOO_MANY_LAYOUT_COLUMNS}")
            result = {
                "need_drop": True,
                "drop_reason": DropReason.TOO_MANY_LAYOUT_COLUMNS,
@@ -392,28 +352,11 @@ def parse_pdf_by_txt(
    for page_info in pdf_info_dict.values():
        is_good_pdf, err = pdf_post_filter(page_info)
        if not is_good_pdf:
-            logger.warning(f"page_id: {i}, drop this pdf: {book_name}, reason: {err}")
+            logger.warning(f"page_id: {i}, drop this pdf: {pdf_bytes_md5}, reason: {err}")
            if not debug_mode:
                return err
        i += 1
-    if debug_mode:
-        params_file_save_path = join_path(save_tmp_path, "md", book_name, "preproc_out.json")
-        page_draw_rect_save_path = join_path(save_tmp_path, "md", book_name, "layout.pdf")
-        # dir_path = os.path.dirname(page_draw_rect_save_path)
-        # if not os.path.exists(dir_path):
-        #     # create the directory if it does not exist
-        #     os.makedirs(dir_path)
-        with open(params_file_save_path, "w", encoding="utf-8") as f:
-            json.dump(pdf_info_dict, f, ensure_ascii=False, indent=4)
-        # first check whether page_draw_rect_save_path exists locally, and delete it if it does
-        if os.path.exists(page_draw_rect_save_path):
-            os.remove(page_draw_rect_save_path)
-        # draw the bboxes and the layout onto the pdf
-        draw_bbox_on_page(pdf_docs, pdf_info_dict, page_draw_rect_save_path)
-        draw_layout_bbox_on_page(pdf_docs, pdf_info_dict, header, footer, page_draw_rect_save_path)

    if debug_mode:
        # log the time spent in the post-processing stage
        logger.info(f"post_processing_time: {get_delta_time(start_time)}")
@@ -431,58 +374,30 @@ def parse_pdf_by_txt(
para_process_pipeline = ParaProcessPipeline()
    def _deal_with_text_exception(error_info):
-        logger.warning(f"page_id: {page_id}, drop this pdf: {book_name}, reason: {error_info}")
+        logger.warning(f"page_id: {page_id}, drop this pdf: {pdf_bytes_md5}, reason: {error_info}")
        if error_info == denseSingleLineBlockException_msg:
-            logger.warning(f"Drop this pdf: {book_name}, reason: {DropReason.DENSE_SINGLE_LINE_BLOCK}")
+            logger.warning(f"Drop this pdf: {pdf_bytes_md5}, reason: {DropReason.DENSE_SINGLE_LINE_BLOCK}")
            result = {"need_drop": True, "drop_reason": DropReason.DENSE_SINGLE_LINE_BLOCK}
            return result
        if error_info == titleDetectionException_msg:
-            logger.warning(f"Drop this pdf: {book_name}, reason: {DropReason.TITLE_DETECTION_FAILED}")
+            logger.warning(f"Drop this pdf: {pdf_bytes_md5}, reason: {DropReason.TITLE_DETECTION_FAILED}")
            result = {"need_drop": True, "drop_reason": DropReason.TITLE_DETECTION_FAILED}
            return result
        elif error_info == titleLevelException_msg:
-            logger.warning(f"Drop this pdf: {book_name}, reason: {DropReason.TITLE_LEVEL_FAILED}")
+            logger.warning(f"Drop this pdf: {pdf_bytes_md5}, reason: {DropReason.TITLE_LEVEL_FAILED}")
            result = {"need_drop": True, "drop_reason": DropReason.TITLE_LEVEL_FAILED}
            return result
        elif error_info == paraSplitException_msg:
-            logger.warning(f"Drop this pdf: {book_name}, reason: {DropReason.PARA_SPLIT_FAILED}")
+            logger.warning(f"Drop this pdf: {pdf_bytes_md5}, reason: {DropReason.PARA_SPLIT_FAILED}")
            result = {"need_drop": True, "drop_reason": DropReason.PARA_SPLIT_FAILED}
            return result
        elif error_info == paraMergeException_msg:
-            logger.warning(f"Drop this pdf: {book_name}, reason: {DropReason.PARA_MERGE_FAILED}")
+            logger.warning(f"Drop this pdf: {pdf_bytes_md5}, reason: {DropReason.PARA_MERGE_FAILED}")
            result = {"need_drop": True, "drop_reason": DropReason.PARA_MERGE_FAILED}
            return result
-    if debug_mode:
-        input_pdf_file = f"{pdf_local_path}.pdf"
-        output_dir = f"{save_path}/{book_name}"
-        output_pdf_file = f"{output_dir}/pdf_annos.pdf"
-        """
-        Call the para_process_pipeline function to process the pdf_info_dict.
-        Parameters:
-        para_debug_mode: str or None
-            If para_debug_mode is None, the para_process_pipeline will not keep any intermediate results.
-            If para_debug_mode is "simple", the para_process_pipeline will only keep the annos on the pdf and the final results as a json file.
-            If para_debug_mode is "full", the para_process_pipeline will keep all the intermediate results generated during each step.
-        """
-        pdf_info_dict, error_info = para_process_pipeline.para_process_pipeline(
-            pdf_info_dict,
-            para_debug_mode="simple",
-            input_pdf_path=input_pdf_file,
-            output_pdf_path=output_pdf_file,
-        )
-        # log the time spent in the paragraph-processing stage
-        logger.info(f"para_process_time: {get_delta_time(start_time)}")
-        # do not return the drop info in debug mode
-        if error_info is not None:
-            _deal_with_text_exception(error_info)
-        return pdf_info_dict
-    else:
-        pdf_info_dict, error_info = para_process_pipeline.para_process_pipeline(pdf_info_dict)
-        if error_info is not None:
-            return _deal_with_text_exception(error_info)

+    pdf_info_dict, error_info = para_process_pipeline.para_process_pipeline(pdf_info_dict)
+    if error_info is not None:
+        return _deal_with_text_exception(error_info)

    return pdf_info_dict
@@ -16,12 +16,21 @@
from magic_pdf.io import AbsReaderWriter
from magic_pdf.pdf_parse_by_ocr import parse_pdf_by_ocr
from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt


def parse_txt_pdf(pdf_bytes:bytes, pdf_models:list, imageWriter: AbsReaderWriter, is_debug=False, start_page=0, *args, **kwargs):
    """
    Parse a text-based pdf
    """
+    pdf_info_dict = parse_pdf_by_txt(
+        pdf_bytes,
+        pdf_models,
+        imageWriter,
+        start_page_id=start_page,
+        debug_mode=is_debug,
+    )
+    return pdf_info_dict

    pass
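
Taken together, a caller now injects the writer instead of passing S3 configuration and a book name. A hedged usage sketch, reusing the hypothetical LocalImageWriter from above and assuming the need_drop dict produced inside parse_pdf_by_txt propagates unchanged on failure:

# Illustrative end-to-end call only; pdf_models would normally come from the upstream model inference stage.
if __name__ == "__main__":
    with open("example.pdf", "rb") as f:
        pdf_bytes = f.read()

    pdf_models = []  # placeholder for the per-page model output list
    writer = LocalImageWriter("/tmp/magic_pdf_images")

    result = parse_txt_pdf(pdf_bytes, pdf_models, writer, is_debug=False)
    if isinstance(result, dict) and result.get("need_drop"):
        print("dropped:", result.get("drop_reason"))
    else:
        print("parsed pages:", len(result))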