Unverified Commit 31f3d4cc authored by drunkpig, committed by GitHub

Merge pull request #26 from myhloli/master

Fix image-cropping logic; refactor the Pipe classes
parents ef03c906 435ab922
from loguru import logger
from magic_pdf.libs.commons import join_path
from magic_pdf.libs.language import detect_lang
from magic_pdf.libs.markdown_utils import ocr_escape_special_markdown_char
......@@ -151,6 +153,7 @@ def para_to_standard_format(para, img_buket_path):
for span in line['spans']:
language = ''
span_type = span.get('type')
content = ""
if span_type == ContentType.Text:
content = span['content']
language = detect_lang(content)
......
from loguru import logger
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.commons import fitz
......@@ -20,10 +19,6 @@ def cut_image(bbox: tuple, page_num: int, page: fitz.Page, return_path, imageWri
# The new version generates a flat image path
img_hash256_path = f"{compute_sha256(img_path)}.jpg"
if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]):
logger.warning(f"image_bboxes: 错误的box, {bbox}")
return img_hash256_path
# Convert the coordinates to a fitz.Rect object
rect = fitz.Rect(*bbox)
# Set the zoom factor to 3x
......
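The collapsed remainder of this hunk presumably performs the actual crop at the configured 3x zoom. As a hedged sketch only (assuming PyMuPDF's standard rendering API; crop_region_png is a hypothetical helper, not code from this commit), a clipped render could look like:
import fitz  # PyMuPDF

def crop_region_png(page: fitz.Page, bbox: tuple, zoom: float = 3.0) -> bytes:
    # Guard against degenerate boxes, mirroring the check added above
    if bbox[0] >= bbox[2] or bbox[1] >= bbox[3]:
        raise ValueError(f"invalid bbox: {bbox}")
    rect = fitz.Rect(*bbox)                           # convert the coordinates to a fitz.Rect
    matrix = fitz.Matrix(zoom, zoom)                  # 3x zoom by default
    pix = page.get_pixmap(matrix=matrix, clip=rect)   # render only the clipped region
    return pix.tobytes("png")                         # PNG bytes; the real pipeline writes JPEGs via its imageWriter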
......@@ -160,6 +160,9 @@ def parse_pdf_by_ocr(
'''Remove overlaps (adhesion) between bboxes'''
spans = remove_overlap_between_bbox(spans)
'''Compute the layout from the existing bboxes'''
'''
Extra handling for type=["interline_equation", "image", "table"]:
if there is text to the left, adjust the span's bbox y0 so that it is no higher than the text's y0
......
from abc import ABC, abstractmethod
from magic_pdf.dict2md.mkcontent import mk_universal_format, mk_mm_markdown
from magic_pdf.dict2md.ocr_mkcontent import make_standard_format_with_para, ocr_mk_mm_markdown_with_para
from magic_pdf.filter.pdf_classify_by_type import classify
from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.libs.json_compressor import JsonCompressor
class AbsPipe(ABC):
"""
Abstract base class for txt and ocr processing
"""
def __init__(self, pdf_bytes: bytes, model_list: list, image_writer: AbsReaderWriter):
self.pdf_bytes = pdf_bytes
self.model_list = model_list
self.image_writer = image_writer
@abstractmethod
def pipe_classify(self):
"""
Stateful classification
"""
raise NotImplementedError
@abstractmethod
def pipe_parse(self):
"""
Stateful parsing
"""
raise NotImplementedError
@abstractmethod
def pipe_mk_uni_format(self):
"""
Stateful assembly of the unified format
"""
raise NotImplementedError
@abstractmethod
def pipe_mk_markdown(self):
"""
Stateful assembly of markdown
"""
raise NotImplementedError
@staticmethod
def classify(pdf_bytes: bytes) -> str:
"""
Determine from the pdf metadata whether this is a text pdf or an ocr pdf
"""
pdf_meta = pdf_meta_scan(pdf_bytes)
if pdf_meta.get("_need_drop", False):  # if a drop flag was returned, raise an exception
raise Exception(f"pdf meta_scan need_drop,reason is {pdf_meta['_drop_reason']}")
else:
is_encrypted = pdf_meta["is_encrypted"]
is_needs_password = pdf_meta["is_needs_password"]
if is_encrypted or is_needs_password:  # do not process encrypted, password-protected, or page-less pdfs
raise Exception(f"pdf meta_scan need_drop,reason is {DropReason.ENCRYPTED}")
else:
is_text_pdf, results = classify(
pdf_meta["total_page"],
pdf_meta["page_width_pts"],
pdf_meta["page_height_pts"],
pdf_meta["image_info_per_page"],
pdf_meta["text_len_per_page"],
pdf_meta["imgs_per_page"],
pdf_meta["text_layout_per_page"],
)
if is_text_pdf:
return "txt"
else:
return "ocr"
@staticmethod
def mk_uni_format(compressed_pdf_mid_data: str, img_buket_path: str) -> list:
"""
Generate the unified-format content_list according to the pdf type
"""
pdf_mid_data = JsonCompressor.decompress_json(compressed_pdf_mid_data)
parse_type = pdf_mid_data["_parse_type"]
pdf_info_list = pdf_mid_data["pdf_info"]
if parse_type == "txt":
content_list = mk_universal_format(pdf_info_list, img_buket_path)
elif parse_type == "ocr":
content_list = make_standard_format_with_para(pdf_info_list, img_buket_path)
return content_list
@staticmethod
def mk_markdown(compressed_pdf_mid_data: str, img_buket_path: str) -> list:
"""
Generate markdown according to the pdf type
"""
pdf_mid_data = JsonCompressor.decompress_json(compressed_pdf_mid_data)
parse_type = pdf_mid_data["_parse_type"]
pdf_info_list = pdf_mid_data["pdf_info"]
if parse_type == "txt":
content_list = mk_universal_format(pdf_info_list, img_buket_path)
md_content = mk_mm_markdown(content_list)
elif parse_type == "ocr":
md_content = ocr_mk_mm_markdown_with_para(pdf_info_list, img_buket_path)
return md_content
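A hedged usage sketch (assumed driver code, not part of this commit) of how a concrete AbsPipe subclass is meant to be driven, based on the abstract methods declared above:
def run_pipe(pipe: AbsPipe):
    pipe.pipe_classify()                       # stateful classification (may be a no-op)
    pipe.pipe_parse()                          # parse and cache the compressed mid data
    content_list = pipe.pipe_mk_uni_format()   # unified-format content list
    md_content = pipe.pipe_mk_markdown()       # markdown string
    return content_list, md_content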
from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.json_compressor import JsonCompressor
from magic_pdf.pipe.AbsPipe import AbsPipe
from magic_pdf.user_api import parse_ocr_pdf
class OCRPipe(AbsPipe):
def __init__(self, pdf_bytes: bytes, model_list: list, image_writer: AbsReaderWriter, img_bucket_path: str):
self.compressed_pdf_mid_data = None
self.pdf_mid_data = None
self.pdf_bytes = pdf_bytes
self.model_list = model_list
self.image_writer = image_writer
self.img_bucket_path = img_bucket_path
def pipe_classify(self):
pass
def pipe_parse(self):
self.pdf_mid_data = parse_ocr_pdf(self.pdf_bytes, self.model_list, self.image_writer)
self.compressed_pdf_mid_data = JsonCompressor.compress_json(self.pdf_mid_data)
def pipe_mk_uni_format(self):
content_list = AbsPipe.mk_uni_format(self.compressed_pdf_mid_data, self.img_bucket_path)
return content_list
def pipe_mk_markdown(self):
md_content = AbsPipe.mk_markdown(self.compressed_pdf_mid_data, self.img_bucket_path)
return md_content
from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.json_compressor import JsonCompressor
from magic_pdf.pipe.AbsPipe import AbsPipe
from magic_pdf.user_api import parse_txt_pdf
class TXTPipe(AbsPipe):
def __init__(self, pdf_bytes: bytes, model_list: list, image_writer: AbsReaderWriter, img_bucket_path: str):
self.compressed_pdf_mid_data = None
self.pdf_mid_data = None
self.pdf_bytes = pdf_bytes
self.model_list = model_list
self.image_writer = image_writer
self.img_bucket_path = img_bucket_path
def pipe_classify(self):
pass
def pipe_parse(self):
self.pdf_mid_data = parse_txt_pdf(self.pdf_bytes, self.model_list, self.image_writer)
self.compressed_pdf_mid_data = JsonCompressor.compress_json(self.pdf_mid_data)
def pipe_mk_uni_format(self):
content_list = AbsPipe.mk_uni_format(self.compressed_pdf_mid_data, self.img_bucket_path)
return content_list
def pipe_mk_markdown(self):
md_content = AbsPipe.mk_markdown(self.compressed_pdf_mid_data, self.img_bucket_path)
return md_content
import json
from loguru import logger
from magic_pdf.dict2md.mkcontent import mk_universal_format, mk_mm_markdown
from magic_pdf.dict2md.ocr_mkcontent import make_standard_format_with_para, ocr_mk_mm_markdown_with_para
from magic_pdf.filter.pdf_classify_by_type import classify
......@@ -9,118 +8,63 @@ from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.libs.commons import join_path
from magic_pdf.libs.detect_language_from_model import get_language_from_model
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.libs.json_compressor import JsonCompressor
from magic_pdf.pipe.AbsPipe import AbsPipe
from magic_pdf.user_api import parse_union_pdf, parse_ocr_pdf
class UNIPipe:
def __init__(self):
pass
class UNIPipe(AbsPipe):
@staticmethod
def classify(pdf_bytes: bytes) -> str:
"""
Determine from the pdf metadata whether this is a text pdf or an ocr pdf
"""
pdf_meta = pdf_meta_scan(pdf_bytes)
if pdf_meta.get("_need_drop", False):  # if a drop flag was returned, raise an exception
raise Exception(f"pdf meta_scan need_drop,reason is {pdf_meta['_drop_reason']}")
else:
is_encrypted = pdf_meta["is_encrypted"]
is_needs_password = pdf_meta["is_needs_password"]
if is_encrypted or is_needs_password:  # do not process encrypted, password-protected, or page-less pdfs
raise Exception(f"pdf meta_scan need_drop,reason is {DropReason.ENCRYPTED}")
else:
is_text_pdf, results = classify(
pdf_meta["total_page"],
pdf_meta["page_width_pts"],
pdf_meta["page_height_pts"],
pdf_meta["image_info_per_page"],
pdf_meta["text_len_per_page"],
pdf_meta["imgs_per_page"],
pdf_meta["text_layout_per_page"],
)
if is_text_pdf:
return "txt"
else:
return "ocr"
def __init__(self, pdf_bytes: bytes, model_list: list, image_writer: AbsReaderWriter, img_bucket_path: str):
self.pdf_type = "ocr"
self.compressed_pdf_mid_data = None
self.pdf_mid_data = None
self.pdf_bytes = pdf_bytes
self.model_list = model_list
self.image_writer = image_writer
self.img_bucket_path = img_bucket_path
def parse(self, pdf_bytes: bytes, image_writer, jso_useful_key) -> dict:
"""
Parse the pdf according to its type
"""
text_language = get_language_from_model(jso_useful_key['model_list'])
allow_language = ["zh", "en"]  # allowed languages; currently only Simplified Chinese and English
logger.info(f"pdf text_language is {text_language}")
if text_language not in allow_language:  # drop if the language is not in the allowed list
raise Exception(f"pdf meta_scan need_drop,reason is {DropReason.NOT_ALLOW_LANGUAGE}")
else:
if jso_useful_key['_pdf_type'] == "txt":
pdf_mid_data = parse_union_pdf(pdf_bytes, jso_useful_key['model_list'], image_writer)
elif jso_useful_key['_pdf_type'] == "ocr":
pdf_mid_data = parse_ocr_pdf(pdf_bytes, jso_useful_key['model_list'], image_writer)
else:
raise Exception(f"pdf type is not txt or ocr")
return JsonCompressor.compress_json(pdf_mid_data)
def pipe_classify(self):
self.pdf_type = UNIPipe.classify(self.pdf_bytes)
@staticmethod
def mk_uni_format(pdf_mid_data: str, img_buket_path: str) -> list:
"""
Generate the unified-format content_list according to the pdf type
"""
pdf_mid_data = JsonCompressor.decompress_json(pdf_mid_data)
parse_type = pdf_mid_data["_parse_type"]
pdf_info_list = pdf_mid_data["pdf_info"]
if parse_type == "txt":
content_list = mk_universal_format(pdf_info_list, img_buket_path)
elif parse_type == "ocr":
content_list = make_standard_format_with_para(pdf_info_list, img_buket_path)
return content_list
def pipe_parse(self):
if self.pdf_type == "txt":
self.pdf_mid_data = parse_union_pdf(self.pdf_bytes, self.model_list, self.image_writer)
elif self.pdf_type == "ocr":
self.pdf_mid_data = parse_ocr_pdf(self.pdf_bytes, self.model_list, self.image_writer)
self.compressed_pdf_mid_data = JsonCompressor.compress_json(self.pdf_mid_data)
@staticmethod
def mk_markdown(pdf_mid_data: str, img_buket_path: str) -> list:
"""
Generate markdown according to the pdf type
"""
pdf_mid_data = JsonCompressor.decompress_json(pdf_mid_data)
parse_type = pdf_mid_data["_parse_type"]
pdf_info_list = pdf_mid_data["pdf_info"]
if parse_type == "txt":
content_list = mk_universal_format(pdf_info_list, img_buket_path)
md_content = mk_mm_markdown(content_list)
elif parse_type == "ocr":
md_content = ocr_mk_mm_markdown_with_para(pdf_info_list, img_buket_path)
return md_content
def pipe_mk_uni_format(self):
content_list = AbsPipe.mk_uni_format(self.compressed_pdf_mid_data, self.img_bucket_path)
return content_list
def pipe_mk_markdown(self):
markdown_content = AbsPipe.mk_markdown(self.compressed_pdf_mid_data, self.img_bucket_path)
return markdown_content
if __name__ == '__main__':
# test
# file_path = r"tmp/unittest/download-pdfs/数学新星网/edu_00001236.pdf"
drw = DiskReaderWriter(r"D:/project/20231108code-clean")
# pdf_bytes = drw.read(path=file_path, mode=AbsReaderWriter.MODE_BIN)
# pdf_type = UNIPipe.classify(pdf_bytes)
# logger.info(f"pdf_type is {pdf_type}")
pdf_file_path = r"linshixuqiu\25536-00.pdf"
model_file_path = r"linshixuqiu\25536-00.json"
pdf_file_path = r"linshixuqiu\19983-00.pdf"
model_file_path = r"linshixuqiu\19983-00.json"
pdf_bytes = drw.read(pdf_file_path, AbsReaderWriter.MODE_BIN)
model_json_txt = drw.read(model_file_path, AbsReaderWriter.MODE_TXT)
model_list = json.loads(model_json_txt)
write_path = r"D:\project\20231108code-clean\linshixuqiu\19983-00"
img_bucket_path = "imgs"
img_writer = DiskReaderWriter(join_path(write_path, img_bucket_path))
pdf_type = UNIPipe.classify(pdf_bytes)
logger.info(f"pdf_type is {pdf_type}")
jso_useful_key = {
"_pdf_type": pdf_type,
"model_list": json.loads(model_json_txt),
}
pipe = UNIPipe()
write_path = r"D:\project\20231108code-clean\linshixuqiu\25536-00"
img_buket_path = "imgs"
img_writer = DiskReaderWriter(join_path(write_path, img_buket_path))
pdf_mid_data = pipe.parse(pdf_bytes, img_writer, jso_useful_key)
pipe = UNIPipe(pdf_bytes, model_list, img_writer, img_bucket_path)
pipe.pipe_classify()
pipe.pipe_parse()
md_content = pipe.pipe_mk_markdown()
try:
content_list = pipe.pipe_mk_uni_format()
except Exception as e:
logger.exception(e)
md_content = pipe.mk_markdown(pdf_mid_data, "imgs")
md_writer = DiskReaderWriter(write_path)
md_writer.write(md_content, "25536-00.md", AbsReaderWriter.MODE_TXT)
md_writer.write(json.dumps(JsonCompressor.decompress_json(pdf_mid_data), ensure_ascii=False, indent=4), "25536-00.json", AbsReaderWriter.MODE_TXT)
md_writer.write(md_content, "19983-00.md", AbsReaderWriter.MODE_TXT)
md_writer.write(json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4), "19983-00.json", AbsReaderWriter.MODE_TXT)
md_writer.write(str(content_list), "19983-00.txt", AbsReaderWriter.MODE_TXT)
from loguru import logger
from magic_pdf.libs.commons import join_path
from magic_pdf.libs.ocr_content_type import ContentType
from magic_pdf.libs.pdf_image_tools import cut_image
......@@ -10,9 +12,13 @@ def ocr_cut_image_and_table(spans, page, page_id, pdf_bytes_md5, imageWriter):
for span in spans:
span_type = span['type']
if span_type == ContentType.Image:
if not check_img_bbox(span['bbox']):
continue
span['image_path'] = cut_image(span['bbox'], page_id, page, return_path=return_path('images'),
imageWriter=imageWriter)
elif span_type == ContentType.Table:
if not check_img_bbox(span['bbox']):
continue
span['image_path'] = cut_image(span['bbox'], page_id, page, return_path=return_path('tables'),
imageWriter=imageWriter)
......@@ -38,15 +44,28 @@ def txt_save_images_by_bboxes(page_num: int, page, pdf_bytes_md5: str,
return join_path(pdf_bytes_md5, type)
for bbox in image_bboxes:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path("images"), imageWriter)
image_info.append({"bbox": bbox, "image_path": image_path})
for bbox in images_overlap_backup:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path("images"), imageWriter)
image_backup_info.append({"bbox": bbox, "image_path": image_path})
for bbox in table_bboxes:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path("tables"), imageWriter)
table_info.append({"bbox": bbox, "image_path": image_path})
return image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info
def check_img_bbox(bbox) -> bool:
if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]):
logger.warning(f"image_bboxes: 错误的box, {bbox}")
return False
return True
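As a hedged illustration (made-up coordinate values, not from this commit), the guard skips degenerate boxes where x0 >= x1 or y0 >= y1 and keeps the rest:
boxes = [(10, 10, 5, 40), (10, 10, 200, 120)]    # the first box has x0 >= x1
valid = [b for b in boxes if check_img_bbox(b)]  # -> [(10, 10, 200, 120)]; a warning is logged for the first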