Unverified Commit e155d322 authored by icecraft's avatar icecraft Committed by GitHub

feat: remove dummpy code, magic_pdf/cli, magic_pdf/train_utils (#291)

* feat: remove dummpy code, magic_pdf/cli, magic_pdf/train_utils

* feat: expose version in command line

---------
Co-authored-by: 's avatarshenguanlin <shenguanlin@pjlab.org.cn>
parent 15125623
......@@ -192,9 +192,30 @@ pip install magic-pdf[full]==0.6.2b1 detectron2 --extra-index-url https://wheels
### 命令行
```bash
magic-pdf -p {some_pdf} -o {some_output_dir}
magic-pdf --help
Usage: magic-pdf [OPTIONS]
Options:
-v, --version display the version and exit
-p, --path PATH local pdf filepath or directory [required]
-o, --output-dir TEXT output local directory
-m, --method [ocr|txt|auto] the method for parsing pdf.
ocr: using ocr technique to extract information from pdf,
txt: suitable for the text-based pdf only and outperform ocr,
auto: automatically choose the best method for parsing pdf
from ocr and txt.
without method specified, auto will be used by default.
--help Show this message and exit.
## show version
magic-pdf -v
## command line example
magic-pdf -p {some_pdf} -o {some_output_dir} -m auto
```
其中 `{some_pdf}` 可以使单个pdf文件,也可以是一个包含多个pdf文件的目录。
其中 `{some_pdf}` 可以是单个pdf文件,也可以是一个包含多个pdf文件的目录。
运行完命令后输出的结果会保存在`{some_output_dir}`目录下, 输出的文件列表如下
```text
......
import os
import json as json_parse
import click
from loguru import logger
from pathlib import Path
from magic_pdf.libs.version import __version__
from magic_pdf.libs.MakeContentConfig import DropMode, MakeMode
from magic_pdf.libs.draw_bbox import draw_layout_bbox, draw_span_bbox
from magic_pdf.pipe.UNIPipe import UNIPipe
from magic_pdf.pipe.OCRPipe import OCRPipe
from magic_pdf.pipe.TXTPipe import TXTPipe
from magic_pdf.libs.path_utils import (
parse_s3path,
parse_s3_range_params,
remove_non_official_s3_args,
)
from magic_pdf.libs.config_reader import (
get_local_dir,
get_s3_config,
)
from magic_pdf.rw.S3ReaderWriter import S3ReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
import csv
import copy
import magic_pdf.model as model_config
parse_pdf_methods = click.Choice(["ocr", "txt", "auto"])
def prepare_env(pdf_file_name, method):
local_parent_dir = os.path.join(get_local_dir(), "magic-pdf", pdf_file_name, method)
local_image_dir = os.path.join(str(local_parent_dir), "images")
local_md_dir = local_parent_dir
os.makedirs(local_image_dir, exist_ok=True)
os.makedirs(local_md_dir, exist_ok=True)
return local_image_dir, local_md_dir
def write_to_csv(csv_file_path, csv_data):
with open(csv_file_path, mode="a", newline="", encoding="utf-8") as csvfile:
# 创建csv writer对象
csv_writer = csv.writer(csvfile)
# 写入数据
csv_writer.writerow(csv_data)
logger.info(f"数据已成功追加到 '{csv_file_path}'")
def do_parse(
pdf_file_name,
pdf_bytes,
model_list,
parse_method,
f_draw_span_bbox=True,
f_draw_layout_bbox=True,
f_dump_md=True,
f_dump_middle_json=True,
f_dump_model_json=True,
f_dump_orig_pdf=True,
f_dump_content_list=True,
f_make_md_mode=MakeMode.MM_MD,
):
orig_model_list = copy.deepcopy(model_list)
local_image_dir, local_md_dir = prepare_env(pdf_file_name, parse_method)
image_writer, md_writer = DiskReaderWriter(local_image_dir), DiskReaderWriter(local_md_dir)
image_dir = str(os.path.basename(local_image_dir))
if parse_method == "auto":
jso_useful_key = {"_pdf_type": "", "model_list": model_list}
pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer, is_debug=True)
elif parse_method == "txt":
pipe = TXTPipe(pdf_bytes, model_list, image_writer, is_debug=True)
elif parse_method == "ocr":
pipe = OCRPipe(pdf_bytes, model_list, image_writer, is_debug=True)
else:
logger.error("unknown parse method")
exit(1)
pipe.pipe_classify()
"""如果没有传入有效的模型数据,则使用内置model解析"""
if len(model_list) == 0:
if model_config.__use_inside_model__:
pipe.pipe_analyze()
orig_model_list = copy.deepcopy(pipe.model_list)
else:
logger.error("need model list input")
exit(1)
pipe.pipe_parse()
pdf_info = pipe.pdf_mid_data["pdf_info"]
if f_draw_layout_bbox:
draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir)
if f_draw_span_bbox:
draw_span_bbox(pdf_info, pdf_bytes, local_md_dir)
md_content = pipe.pipe_mk_markdown(image_dir, drop_mode=DropMode.NONE, md_make_mode=f_make_md_mode)
if f_dump_md:
"""写markdown"""
md_writer.write(
content=md_content,
path=f"{pdf_file_name}.md",
mode=AbsReaderWriter.MODE_TXT,
)
if f_dump_middle_json:
"""写middle_json"""
md_writer.write(
content=json_parse.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4),
path=f"{pdf_file_name}_middle.json",
mode=AbsReaderWriter.MODE_TXT,
)
if f_dump_model_json:
"""写model_json"""
md_writer.write(
content=json_parse.dumps(orig_model_list, ensure_ascii=False, indent=4),
path=f"{pdf_file_name}_model.json",
mode=AbsReaderWriter.MODE_TXT,
)
if f_dump_orig_pdf:
"""写源pdf"""
md_writer.write(
content=pdf_bytes,
path=f"{pdf_file_name}_origin.pdf",
mode=AbsReaderWriter.MODE_BIN,
)
content_list = pipe.pipe_mk_uni_format(image_dir, drop_mode=DropMode.NONE)
if f_dump_content_list:
"""写content_list"""
md_writer.write(
content=json_parse.dumps(content_list, ensure_ascii=False, indent=4),
path=f"{pdf_file_name}_content_list.json",
mode=AbsReaderWriter.MODE_TXT,
)
logger.info(f"local output dir is '{local_md_dir}', you can found the result in it.")
@click.group()
@click.version_option(__version__, "--version", "-v", help="显示版本信息")
@click.help_option("--help", "-h", help="显示帮助信息")
def cli():
pass
@cli.command()
@click.option("--json", type=str, help="输入一个S3路径")
@click.option(
"--method",
type=parse_pdf_methods,
help="指定解析方法。txt: 文本型 pdf 解析方法, ocr: 光学识别解析 pdf, auto: 程序智能选择解析方法",
default="auto",
)
@click.option("--inside_model", type=click.BOOL, default=True, help="使用内置模型测试")
@click.option("--model_mode", type=click.STRING, default="full",
help="内置模型选择。lite: 快速解析,精度较低,full: 高精度解析,速度较慢")
def json_command(json, method, inside_model, model_mode):
model_config.__use_inside_model__ = inside_model
model_config.__model_mode__ = model_mode
if not json.startswith("s3://"):
logger.error("usage: magic-pdf json-command --json s3://some_bucket/some_path")
exit(1)
def read_s3_path(s3path):
bucket, key = parse_s3path(s3path)
s3_ak, s3_sk, s3_endpoint = get_s3_config(bucket)
s3_rw = S3ReaderWriter(
s3_ak, s3_sk, s3_endpoint, "auto", remove_non_official_s3_args(s3path)
)
may_range_params = parse_s3_range_params(s3path)
if may_range_params is None or 2 != len(may_range_params):
byte_start, byte_end = 0, None
else:
byte_start, byte_end = int(may_range_params[0]), int(may_range_params[1])
byte_end += byte_start - 1
return s3_rw.read_jsonl(
remove_non_official_s3_args(s3path),
byte_start,
byte_end,
AbsReaderWriter.MODE_BIN,
)
jso = json_parse.loads(read_s3_path(json).decode("utf-8"))
s3_file_path = jso.get("file_location")
if s3_file_path is None:
s3_file_path = jso.get("path")
pdf_file_name = Path(s3_file_path).stem
pdf_data = read_s3_path(s3_file_path)
do_parse(
pdf_file_name,
pdf_data,
jso["doc_layout_result"],
method,
)
@cli.command()
@click.option("--local_json", type=str, help="输入一个本地jsonl路径")
@click.option(
"--method",
type=parse_pdf_methods,
help="指定解析方法。txt: 文本型 pdf 解析方法, ocr: 光学识别解析 pdf, auto: 程序智能选择解析方法",
default="auto",
)
@click.option("--inside_model", type=click.BOOL, default=True, help="使用内置模型测试")
@click.option("--model_mode", type=click.STRING, default="full",
help="内置模型选择。lite: 快速解析,精度较低,full: 高精度解析,速度较慢")
def local_json_command(local_json, method, inside_model, model_mode):
model_config.__use_inside_model__ = inside_model
model_config.__model_mode__ = model_mode
def read_s3_path(s3path):
bucket, key = parse_s3path(s3path)
s3_ak, s3_sk, s3_endpoint = get_s3_config(bucket)
s3_rw = S3ReaderWriter(
s3_ak, s3_sk, s3_endpoint, "auto", remove_non_official_s3_args(s3path)
)
may_range_params = parse_s3_range_params(s3path)
if may_range_params is None or 2 != len(may_range_params):
byte_start, byte_end = 0, None
else:
byte_start, byte_end = int(may_range_params[0]), int(may_range_params[1])
byte_end += byte_start - 1
return s3_rw.read_jsonl(
remove_non_official_s3_args(s3path),
byte_start,
byte_end,
AbsReaderWriter.MODE_BIN,
)
with open(local_json, "r", encoding="utf-8") as f:
for json_line in f:
jso = json_parse.loads(json_line)
s3_file_path = jso.get("file_location")
if s3_file_path is None:
s3_file_path = jso.get("path")
pdf_file_name = Path(s3_file_path).stem
pdf_data = read_s3_path(s3_file_path)
do_parse(
pdf_file_name,
pdf_data,
jso["doc_layout_result"],
method,
)
@cli.command()
@click.option(
"--pdf", type=click.Path(exists=True), required=True,
help='pdf 文件路径, 支持单个文件或文件列表, 文件列表需要以".list"结尾, 一行一个pdf文件路径')
@click.option("--model", type=click.Path(exists=True), help="模型的路径")
@click.option(
"--method",
type=parse_pdf_methods,
help="指定解析方法。txt: 文本型 pdf 解析方法, ocr: 光学识别解析 pdf, auto: 程序智能选择解析方法",
default="auto",
)
@click.option("--inside_model", type=click.BOOL, default=True, help="使用内置模型测试")
@click.option("--model_mode", type=click.STRING, default="full",
help="内置模型选择。lite: 快速解析,精度较低,full: 高精度解析,速度较慢")
def pdf_command(pdf, model, method, inside_model, model_mode):
model_config.__use_inside_model__ = inside_model
model_config.__model_mode__ = model_mode
def read_fn(path):
disk_rw = DiskReaderWriter(os.path.dirname(path))
return disk_rw.read(os.path.basename(path), AbsReaderWriter.MODE_BIN)
def get_model_json(model_path, doc_path):
# 这里处理pdf和模型相关的逻辑
if model_path is None:
file_name_without_extension, extension = os.path.splitext(doc_path)
if extension == ".pdf":
model_path = file_name_without_extension + ".json"
else:
raise Exception("pdf_path input error")
if not os.path.exists(model_path):
logger.warning(
f"not found json {model_path} existed"
)
# 本地无模型数据则调用内置paddle分析,先传空list,在内部识别到空list再调用paddle
model_json = "[]"
else:
model_json = read_fn(model_path).decode("utf-8")
else:
model_json = read_fn(model_path).decode("utf-8")
return model_json
def parse_doc(doc_path):
try:
file_name = str(Path(doc_path).stem)
pdf_data = read_fn(doc_path)
jso = json_parse.loads(get_model_json(model, doc_path))
do_parse(
file_name,
pdf_data,
jso,
method,
)
except Exception as e:
logger.exception(e)
if not pdf:
logger.error(f"Error: Missing argument '--pdf'.")
exit(f"Error: Missing argument '--pdf'.")
else:
'''适配多个文档的list文件输入'''
if pdf.endswith(".list"):
with open(pdf, "r") as f:
for line in f.readlines():
line = line.strip()
parse_doc(line)
else:
'''适配单个文档的输入'''
parse_doc(pdf)
if __name__ == "__main__":
"""
python magic_pdf/cli/magicpdf.py json-command --json s3://llm-pdf-text/pdf_ebook_and_paper/manual/v001/part-660407a28beb-000002.jsonl?bytes=0,63551
"""
cli()
import time
# from anyio import Path
from magic_pdf.libs.commons import (
fitz,
get_delta_time,
get_img_s3_client,
get_docx_model_output,
)
import json
import os
from copy import deepcopy
import math
from loguru import logger
from magic_pdf.layout.bbox_sort import (
prepare_bboxes_for_layout_split,
)
from magic_pdf.layout.layout_sort import (
LAYOUT_UNPROC,
get_bboxes_layout,
get_columns_cnt_of_layout,
sort_text_block,
)
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.libs.markdown_utils import escape_special_markdown_char
from magic_pdf.libs.safe_filename import sanitize_filename
from magic_pdf.libs.vis_utils import draw_bbox_on_page, draw_layout_bbox_on_page
from magic_pdf.pre_proc.cut_image import txt_save_images_by_bboxes
from magic_pdf.pre_proc.detect_images import parse_images
from magic_pdf.pre_proc.detect_tables import parse_tables # 获取tables的bbox
from magic_pdf.pre_proc.detect_equation import parse_equations # 获取equations的bbox
from magic_pdf.pre_proc.detect_header import parse_headers # 获取headers的bbox
from magic_pdf.pre_proc.detect_page_number import parse_pageNos # 获取pageNos的bbox
from magic_pdf.pre_proc.detect_footnote import (
parse_footnotes_by_model,
parse_footnotes_by_rule,
) # 获取footnotes的bbox
from magic_pdf.pre_proc.detect_footer_by_model import parse_footers # 获取footers的bbox
from magic_pdf.post_proc.detect_para import (
ParaProcessPipeline,
TitleDetectionException,
TitleLevelException,
ParaSplitException,
ParaMergeException,
DenseSingleLineBlockException,
)
from magic_pdf.pre_proc.main_text_font import get_main_text_font
from magic_pdf.pre_proc.remove_colored_strip_bbox import remove_colored_strip_textblock
from magic_pdf.pre_proc.remove_footer_header import remove_headder_footer_one_page
from magic_pdf.train_utils.extract_caption import extract_caption_bbox
"""
from para.para_pipeline import ParaProcessPipeline
from para.exceptions import (
TitleDetectionException,
TitleLevelException,
ParaSplitException,
ParaMergeException,
DenseSingleLineBlockException,
)
"""
from magic_pdf.libs.commons import read_file, join_path
from magic_pdf.post_proc.remove_footnote import (
merge_footnote_blocks,
remove_footnote_blocks,
)
from magic_pdf.pre_proc.citationmarker_remove import remove_citation_marker
from magic_pdf.pre_proc.equations_replace import (
combine_chars_to_pymudict,
remove_chars_in_text_blocks,
replace_equations_in_textblock,
)
from magic_pdf.pre_proc.pdf_pre_filter import pdf_filter
from magic_pdf.pre_proc.detect_footer_header_by_statistics import drop_footer_header
from magic_pdf.pre_proc.construct_page_dict import construct_page_component
from magic_pdf.pre_proc.fix_image import (
combine_images,
fix_image_vertical,
fix_seperated_image,
include_img_title,
)
from magic_pdf.post_proc.pdf_post_filter import pdf_post_filter
from magic_pdf.pre_proc.remove_rotate_bbox import (
get_side_boundry,
remove_rotate_side_textblock,
remove_side_blank_block,
)
from magic_pdf.pre_proc.resolve_bbox_conflict import (
check_text_block_horizontal_overlap,
resolve_bbox_overlap_conflict,
)
from magic_pdf.pre_proc.fix_table import (
fix_table_text_block,
fix_tables,
include_table_title,
)
from magic_pdf.pre_proc.solve_line_alien import solve_inline_too_large_interval
denseSingleLineBlockException_msg = DenseSingleLineBlockException().message
titleDetectionException_msg = TitleDetectionException().message
titleLevelException_msg = TitleLevelException().message
paraSplitException_msg = ParaSplitException().message
paraMergeException_msg = ParaMergeException().message
def parse_pdf_for_train(
s3_pdf_path,
s3_pdf_profile,
pdf_model_output,
save_path,
book_name,
image_s3_config=None,
start_page_id=0,
end_page_id=None,
junk_img_bojids=[],
debug_mode=False,
):
pdf_bytes = read_file(s3_pdf_path, s3_pdf_profile)
save_tmp_path = os.path.join(os.path.dirname(__file__), "../..", "tmp", "unittest")
md_bookname_save_path = ""
book_name = sanitize_filename(book_name)
if debug_mode:
save_path = join_path(save_tmp_path, "md")
pdf_local_path = join_path(save_tmp_path, "download-pdfs", book_name)
if not os.path.exists(os.path.dirname(pdf_local_path)):
# 如果目录不存在,创建它
os.makedirs(os.path.dirname(pdf_local_path))
md_bookname_save_path = join_path(save_tmp_path, "md", book_name)
if not os.path.exists(md_bookname_save_path):
# 如果目录不存在,创建它
os.makedirs(md_bookname_save_path)
with open(pdf_local_path + ".pdf", "wb") as pdf_file:
pdf_file.write(pdf_bytes)
pdf_docs = fitz.open("pdf", pdf_bytes)
pdf_info_dict = {}
img_s3_client = get_img_s3_client(
save_path, image_s3_config
) # 更改函数名和参数,避免歧义
# img_s3_client = "img_s3_client" #不创建这个对象,直接用字符串占位
start_time = time.time()
"""通过统计pdf全篇文字,识别正文字体"""
main_text_font = get_main_text_font(pdf_docs)
end_page_id = end_page_id if end_page_id else len(pdf_docs) - 1
for page_id in range(start_page_id, end_page_id + 1):
page = pdf_docs[page_id]
page_width = page.rect.width
page_height = page.rect.height
if debug_mode:
time_now = time.time()
logger.info(
f"page_id: {page_id}, last_page_cost_time: {get_delta_time(start_time)}"
)
start_time = time_now
"""
# 通过一个规则,过滤掉单页超过1500非junkimg的pdf
# 对单页面非重复id的img数量做统计,如果当前页超过1500则直接return need_drop
"""
page_imgs = page.get_images()
img_counts = 0
for img in page_imgs:
img_bojid = img[0]
if img_bojid in junk_img_bojids: # 判断这个图片在不在junklist中
continue # 如果在junklist就不用管了,跳过
else:
recs = page.get_image_rects(img, transform=True)
if recs: # 如果这张图在当前页面有展示
img_counts += 1
if (
img_counts >= 1500
): # 如果去除了junkimg的影响,单页img仍然超过1500的话,就排除当前pdf
logger.warning(
f"page_id: {page_id}, img_counts: {img_counts}, drop this pdf: {book_name}, drop_reason: {DropReason.HIGH_COMPUTATIONAL_lOAD_BY_IMGS}"
)
result = {
"_need_drop": True,
"_drop_reason": DropReason.HIGH_COMPUTATIONAL_lOAD_BY_IMGS,
}
if not debug_mode:
return result
"""
==================================================================================================================================
首先获取基本的block数据,对pdf进行分解,获取图片、表格、公式、text的bbox
"""
# 解析pdf原始文本block
text_raw_blocks = page.get_text(
"dict",
flags=fitz.TEXTFLAGS_TEXT,
)["blocks"]
model_output_json = get_docx_model_output(
pdf_model_output, page_id
)
# 解析图片
image_bboxes = parse_images(page_id, page, model_output_json, junk_img_bojids)
image_bboxes = fix_image_vertical(
image_bboxes, text_raw_blocks
) # 修正图片的位置
image_bboxes = fix_seperated_image(image_bboxes) # 合并有边重合的图片
old_image_bboxes = deepcopy(image_bboxes)
image_bboxes = include_img_title(
text_raw_blocks, image_bboxes
) # 向图片上方和下方寻找title,使用规则进行匹配,暂时只支持英文规则
"""此时image_bboxes中可能出现这种情况,水平并列的2个图片,下方分别有各自的子标题,2个子标题下方又有大标题(形如Figxxx),会出现2个图片的bbox都包含了这个大标题,这种情况需要把图片合并"""
image_bboxes = combine_images(image_bboxes) # 合并图片
# 解析表格并对table_bboxes进行位置的微调,防止表格周围的文字被截断
table_bboxes = parse_tables(page_id, page, model_output_json)
table_bboxes = fix_tables(
page, table_bboxes, include_table_title=False, scan_line_num=2
) # 修正
table_bboxes = fix_table_text_block(
text_raw_blocks, table_bboxes
) # 修正与text block的关系,某些table修正与pymupdf获取到的table内textblock没有完全包含,因此要进行一次修正。
# debug_show_bbox(pdf_docs, page_id, table_bboxes, [], [b['bbox'] for b in text_raw_blocks], join_path(save_path, book_name, f"{book_name}_debug.pdf"), 7)
old_table_bboxes = deepcopy(table_bboxes)
table_bboxes = include_table_title(
text_raw_blocks, table_bboxes
) # 向table上方和下方寻找title,使用规则进行匹配,暂时只支持英文规则
# 解析公式
equations_inline_bboxes, equations_interline_bboxes = parse_equations(
page_id, page, model_output_json
)
# get image box and caption !
image_bboxes_with_caption = extract_caption_bbox(image_bboxes, old_image_bboxes)
# get table box and caption !
table_bboxes_with_caption = extract_caption_bbox(table_bboxes, old_table_bboxes)
"""
==================================================================================================================================
进入预处理-1阶段
-------------------
# # 解析标题
# title_bboxs = parse_titles(page_id, page, model_output_json)
# # 评估Layout是否规整、简单
# isSimpleLayout_flag, fullColumn_cnt, subColumn_cnt, curPage_loss = evaluate_pdf_layout(page_id, page, model_output_json)
接下来开始进行预处理过程
"""
# title_bboxs = parse_titles(page_id, page, model_output_json)
"""去掉每页的页码、页眉、页脚"""
page_no_bboxs = parse_pageNos(page_id, page, model_output_json)
header_bboxs = parse_headers(page_id, page, model_output_json)
footer_bboxs = parse_footers(page_id, page, model_output_json)
(
image_bboxes,
table_bboxes,
remain_text_blocks,
removed_hdr_foot_txt_block,
removed_hdr_foot_img_block,
removed_hdr_foot_table,
) = remove_headder_footer_one_page(
text_raw_blocks,
image_bboxes,
table_bboxes,
header_bboxs,
footer_bboxs,
page_no_bboxs,
page_width,
page_height,
)
"""去除页面上半部分长条色块内的文本块"""
remain_text_blocks, removed_colored_narrow_strip_background_text_block = (
remove_colored_strip_textblock(remain_text_blocks, page)
)
# debug_show_bbox(pdf_docs, page_id, footnote_bboxes_by_model, [b['bbox'] for b in remain_text_blocks], header_bboxs, join_path(save_path, book_name, f"{book_name}_debug.pdf"), 7)
"""去掉旋转的文字:水印、垂直排列的文字"""
remain_text_blocks, removed_non_horz_text_block = remove_rotate_side_textblock(
remain_text_blocks, page_width, page_height
) # 去掉水印,非水平文字
remain_text_blocks, removed_empty_side_block = remove_side_blank_block(
remain_text_blocks, page_width, page_height
) # 删除页面四周可能会留下的完全空白的textblock,这种block形成原因未知
"""出现在图片、表格上的文字块去掉,把层叠的图片单独分离出来,不参与layout的计算"""
(
image_bboxes,
table_bboxes,
equations_interline_bboxes,
equations_inline_bboxes,
remain_text_blocks,
text_block_on_image_removed,
images_overlap_backup,
interline_eq_temp_text_block,
) = resolve_bbox_overlap_conflict(
image_bboxes,
table_bboxes,
equations_interline_bboxes,
equations_inline_bboxes,
remain_text_blocks,
)
# """去掉footnote, 从文字和图片中"""
# # 通过模型识别到的footnote
# footnote_bboxes_by_model = parse_footnotes_by_model(page_id, page, model_output_json, md_bookname_save_path,
# debug_mode=debug_mode)
# # 通过规则识别到的footnote
# footnote_bboxes_by_rule = parse_footnotes_by_rule(remain_text_blocks, page_height, page_id)
"""
==================================================================================================================================
"""
if debug_mode: # debugmode截图到本地
save_path = join_path(save_tmp_path, "md")
# 把图、表、公式都进行截图,保存到存储上,返回图片路径作为内容
image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info = (
txt_save_images_by_bboxes(
book_name,
page_id,
page,
save_path,
image_bboxes,
images_overlap_backup,
table_bboxes,
equations_inline_bboxes,
equations_interline_bboxes,
# 传入img_s3_client
img_s3_client,
)
) # 只要表格和图片的截图
""""以下进入到公式替换环节 """
char_level_text_blocks = page.get_text("rawdict", flags=fitz.TEXTFLAGS_TEXT)[
"blocks"
]
remain_text_blocks = combine_chars_to_pymudict(
remain_text_blocks, char_level_text_blocks
) # 合并chars
remain_text_blocks = replace_equations_in_textblock(
remain_text_blocks, inline_eq_info, interline_eq_info
)
remain_text_blocks = remove_citation_marker(
remain_text_blocks
) # 公式替换之后去角标,防止公式无法替换成功。但是这样也会带来个问题就是把角标当公式。各有优劣。
remain_text_blocks = remove_chars_in_text_blocks(
remain_text_blocks
) # 减少中间态数据体积
# debug_show_bbox(pdf_docs, page_id, [b['bbox'] for b in inline_eq_info], [b['bbox'] for b in interline_eq_info], [], join_path(save_path, book_name, f"{book_name}_debug.pdf"), 3)
"""去掉footnote, 从文字和图片中(先去角标再去footnote试试)"""
# 通过模型识别到的footnote
footnote_bboxes_by_model = parse_footnotes_by_model(
page_id,
page,
model_output_json,
md_bookname_save_path,
debug_mode=debug_mode,
)
# 通过规则识别到的footnote
footnote_bboxes_by_rule = parse_footnotes_by_rule(
remain_text_blocks, page_height, page_id, main_text_font
)
"""进入pdf过滤器,去掉一些不合理的pdf"""
is_good_pdf, err = pdf_filter(
page, remain_text_blocks, table_bboxes, image_bboxes
)
if not is_good_pdf:
logger.warning(
f"page_id: {page_id}, drop this pdf: {book_name}, reason: {err}"
)
if not debug_mode:
return err
"""
==================================================================================================================================
进行版面布局切分和过滤
"""
"""在切分之前,先检查一下bbox是否有左右重叠的情况,如果有,那么就认为这个pdf暂时没有能力处理好,这种左右重叠的情况大概率是由于pdf里的行间公式、表格没有被正确识别出来造成的 """
is_text_block_horz_overlap = check_text_block_horizontal_overlap(
remain_text_blocks, header_bboxs, footer_bboxs
)
if is_text_block_horz_overlap:
# debug_show_bbox(pdf_docs, page_id, [b['bbox'] for b in remain_text_blocks], [], [], join_path(save_path, book_name, f"{book_name}_debug.pdf"), 0)
logger.warning(
f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.TEXT_BLCOK_HOR_OVERLAP}"
)
result = {
"_need_drop": True,
"_drop_reason": DropReason.TEXT_BLCOK_HOR_OVERLAP,
}
if not debug_mode:
return result
"""统一格式化成一个数据结构用于计算layout"""
page_y0 = 0 if len(header_bboxs) == 0 else max([b[3] for b in header_bboxs])
page_y1 = (
page_height if len(footer_bboxs) == 0 else min([b[1] for b in footer_bboxs])
)
left_x, right_x = get_side_boundry(
removed_non_horz_text_block, page_width, page_height
)
page_boundry = [
math.floor(left_x),
page_y0 + 1,
math.ceil(right_x),
page_y1 - 1,
]
# 返回的是一个数组,每个元素[x0, y0, x1, y1, block_content, idx_x, idx_y], 初始时候idx_x, idx_y都是None. 对于图片、公式来说,block_content是图片的地址, 对于段落来说,block_content是段落的内容
all_bboxes = prepare_bboxes_for_layout_split(
image_info,
image_backup_info,
table_info,
inline_eq_info,
interline_eq_info,
remain_text_blocks,
page_boundry,
page,
)
# debug_show_bbox(pdf_docs, page_id, [], [], all_bboxes, join_path(save_path, book_name, f"{book_name}_debug.pdf"), 1)
"""page_y0, page_y1能够过滤掉页眉和页脚,不会算作layout内"""
layout_bboxes, layout_tree = get_bboxes_layout(
all_bboxes, page_boundry, page_id
)
if (
len(remain_text_blocks) > 0
and len(all_bboxes) > 0
and len(layout_bboxes) == 0
):
logger.warning(
f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.CAN_NOT_DETECT_PAGE_LAYOUT}"
)
result = {
"_need_drop": True,
"_drop_reason": DropReason.CAN_NOT_DETECT_PAGE_LAYOUT,
}
if not debug_mode:
return result
"""以下去掉复杂的布局和超过2列的布局"""
if any(
[lay["layout_label"] == LAYOUT_UNPROC for lay in layout_bboxes]
): # 复杂的布局
logger.warning(
f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.COMPLICATED_LAYOUT}"
)
result = {"_need_drop": True, "_drop_reason": DropReason.COMPLICATED_LAYOUT}
if not debug_mode:
return result
layout_column_width = get_columns_cnt_of_layout(layout_tree)
if layout_column_width > 2: # 去掉超过2列的布局pdf
logger.warning(
f"page_id: {page_id}, drop this pdf: {book_name}, reason: {DropReason.TOO_MANY_LAYOUT_COLUMNS}"
)
result = {
"_need_drop": True,
"_drop_reason": DropReason.TOO_MANY_LAYOUT_COLUMNS,
"extra_info": {"column_cnt": layout_column_width},
}
if not debug_mode:
return result
"""
==================================================================================================================================
构造出下游需要的数据结构
"""
remain_text_blocks = (
remain_text_blocks + interline_eq_temp_text_block
) # 把计算layout时候临时删除的行间公式再放回去,防止行间公式替换的时候丢失。
removed_text_blocks = []
removed_text_blocks.extend(removed_hdr_foot_txt_block)
# removed_text_blocks.extend(removed_footnote_text_block)
removed_text_blocks.extend(text_block_on_image_removed)
removed_text_blocks.extend(removed_non_horz_text_block)
removed_text_blocks.extend(removed_colored_narrow_strip_background_text_block)
removed_images = []
# removed_images.extend(footnote_imgs)
removed_images.extend(removed_hdr_foot_img_block)
images_backup = []
images_backup.extend(image_backup_info)
remain_text_blocks = escape_special_markdown_char(
remain_text_blocks
) # 转义span里的text
sorted_text_remain_text_block = sort_text_block(
remain_text_blocks, layout_bboxes
)
footnote_bboxes_tmp = []
footnote_bboxes_tmp.extend(footnote_bboxes_by_model)
footnote_bboxes_tmp.extend(footnote_bboxes_by_rule)
page_info = construct_page_component(
page_id,
image_info,
table_info,
sorted_text_remain_text_block,
layout_bboxes,
inline_eq_info,
interline_eq_info,
page.get_text("dict", flags=fitz.TEXTFLAGS_TEXT)["blocks"],
removed_text_blocks=removed_text_blocks,
removed_image_blocks=removed_images,
images_backup=images_backup,
droped_table_block=[],
table_backup=[],
layout_tree=layout_tree,
page_w=page.rect.width,
page_h=page.rect.height,
footnote_bboxes_tmp=footnote_bboxes_tmp,
)
page_info["image_bboxes_with_caption"] = image_bboxes_with_caption # add by xr
page_info["table_bboxes_with_caption"] = table_bboxes_with_caption
page_info["bak_page_no_bboxes"] = page_no_bboxs
page_info["bak_header_bboxes"] = header_bboxs
page_info["bak_footer_bboxes"] = footer_bboxs
page_info["bak_footer_note_bboxes"] = footnote_bboxes_tmp
pdf_info_dict[f"page_{page_id}"] = page_info
# end page for
"""计算后处理阶段耗时"""
start_time = time.time()
"""
==================================================================================================================================
去掉页眉和页脚,这里需要用到一定的统计量,所以放到最后
页眉和页脚主要从文本box和图片box中去除,位于页面的四周。
下面函数会直接修改pdf_info_dict,从文字块中、图片中删除属于页眉页脚的内容,删除内容做相对应记录
"""
# 去页眉页脚
header, footer = drop_footer_header(
pdf_info_dict
) # TODO: using header and footer boxes here !
"""对单个layout内footnote和他下面的所有textbbox合并"""
for page_key, page_info in pdf_info_dict.items():
page_info = merge_footnote_blocks(page_info, main_text_font)
page_info = remove_footnote_blocks(page_info)
pdf_info_dict[page_key] = page_info
"""进入pdf后置过滤器,去掉一些不合理的pdf"""
i = 0
for page_info in pdf_info_dict.values():
is_good_pdf, err = pdf_post_filter(page_info)
if not is_good_pdf:
logger.warning(f"page_id: {i}, drop this pdf: {book_name}, reason: {err}")
if not debug_mode:
return err
i += 1
if debug_mode:
params_file_save_path = join_path(
save_tmp_path, "md", book_name, "preproc_out.json"
)
page_draw_rect_save_path = join_path(
save_tmp_path, "md", book_name, "layout.pdf"
)
# dir_path = os.path.dirname(page_draw_rect_save_path)
# if not os.path.exists(dir_path):
# # 如果目录不存在,创建它
# os.makedirs(dir_path)
with open(params_file_save_path, "w", encoding="utf-8") as f:
json.dump(pdf_info_dict, f, ensure_ascii=False, indent=4)
# 先检测本地 page_draw_rect_save_path 是否存在,如果存在则删除
if os.path.exists(page_draw_rect_save_path):
os.remove(page_draw_rect_save_path)
# 绘制bbox和layout到pdf
draw_bbox_on_page(pdf_docs, pdf_info_dict, page_draw_rect_save_path)
draw_layout_bbox_on_page(
pdf_docs, pdf_info_dict, header, footer, page_draw_rect_save_path
)
if debug_mode:
# 打印后处理阶段耗时
logger.info(f"post_processing_time: {get_delta_time(start_time)}")
"""
==================================================================================================================================
进入段落处理-2阶段
"""
# 处理行内文字间距较大问题
pdf_info_dict = solve_inline_too_large_interval(pdf_info_dict)
start_time = time.time()
para_process_pipeline = ParaProcessPipeline()
def _deal_with_text_exception(error_info):
logger.warning(
f"page_id: {page_id}, drop this pdf: {book_name}, reason: {error_info}"
)
if error_info == denseSingleLineBlockException_msg:
logger.warning(
f"Drop this pdf: {book_name}, reason: {DropReason.DENSE_SINGLE_LINE_BLOCK}"
)
result = {
"_need_drop": True,
"_drop_reason": DropReason.DENSE_SINGLE_LINE_BLOCK,
}
return result
if error_info == titleDetectionException_msg:
logger.warning(
f"Drop this pdf: {book_name}, reason: {DropReason.TITLE_DETECTION_FAILED}"
)
result = {
"_need_drop": True,
"_drop_reason": DropReason.TITLE_DETECTION_FAILED,
}
return result
elif error_info == titleLevelException_msg:
logger.warning(
f"Drop this pdf: {book_name}, reason: {DropReason.TITLE_LEVEL_FAILED}"
)
result = {"_need_drop": True, "_drop_reason": DropReason.TITLE_LEVEL_FAILED}
return result
elif error_info == paraSplitException_msg:
logger.warning(
f"Drop this pdf: {book_name}, reason: {DropReason.PARA_SPLIT_FAILED}"
)
result = {"_need_drop": True, "_drop_reason": DropReason.PARA_SPLIT_FAILED}
return result
elif error_info == paraMergeException_msg:
logger.warning(
f"Drop this pdf: {book_name}, reason: {DropReason.PARA_MERGE_FAILED}"
)
result = {"_need_drop": True, "_drop_reason": DropReason.PARA_MERGE_FAILED}
return result
if debug_mode:
input_pdf_file = f"{pdf_local_path}.pdf"
output_dir = f"{save_path}/{book_name}"
output_pdf_file = f"{output_dir}/pdf_annos.pdf"
"""
Call the para_process_pipeline function to process the pdf_info_dict.
Parameters:
para_debug_mode: str or None
If para_debug_mode is None, the para_process_pipeline will not keep any intermediate results.
If para_debug_mode is "simple", the para_process_pipeline will only keep the annos on the pdf and the final results as a json file.
If para_debug_mode is "full", the para_process_pipeline will keep all the intermediate results generated during each step.
"""
pdf_info_dict, error_info = para_process_pipeline.para_process_pipeline(
pdf_info_dict,
para_debug_mode="simple",
input_pdf_path=input_pdf_file,
output_pdf_path=output_pdf_file,
)
# 打印段落处理阶段耗时
logger.info(f"para_process_time: {get_delta_time(start_time)}")
# debug的时候不return drop信息
if error_info is not None:
_deal_with_text_exception(error_info)
return pdf_info_dict
else:
pdf_info_dict, error_info = para_process_pipeline.para_process_pipeline(
pdf_info_dict
)
if error_info is not None:
return _deal_with_text_exception(error_info)
return pdf_info_dict
......@@ -2,13 +2,16 @@ import os
import click
from loguru import logger
from pathlib import Path
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
import magic_pdf.model as model_config
from magic_pdf.tools.common import parse_pdf_methods, do_parse
from magic_pdf.libs.version import __version__
@click.command()
@click.version_option(__version__, "--version", "-v", help="display the version and exit")
@click.option(
"-p",
"--path",
......@@ -32,8 +35,9 @@ from magic_pdf.tools.common import parse_pdf_methods, do_parse
type=parse_pdf_methods,
help="""the method for parsing pdf.
ocr: using ocr technique to extract information from pdf.
txt: suitable for the text-based pdf only and outperform ocr.
auto: automatically choose the best method for parsing pdf from ocr and txt""",
txt: suitable for the text-based pdf only and outperform ocr.
auto: automatically choose the best method for parsing pdf from ocr and txt.
without method specified, auto will be used by default.""",
default="auto",
)
def cli(path, output_dir, method):
......
......@@ -15,6 +15,7 @@ from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
import magic_pdf.model as model_config
from magic_pdf.tools.common import parse_pdf_methods, do_parse
from magic_pdf.libs.version import __version__
def read_s3_path(s3path):
......@@ -39,6 +40,7 @@ def read_s3_path(s3path):
@click.group()
@click.version_option(__version__, "--version", "-v", help="显示版本信息")
def cli():
pass
......
def convert_to_train_format(jso: dict) -> []:
pages = []
for k, v in jso.items():
if not k.startswith("page_"):
continue
page_idx = v["page_idx"]
width, height = v["page_size"]
info = {"page_info": {"page_no": page_idx, "height": height, "width": width}}
bboxes: list[dict] = []
for img_bbox in v["image_bboxes_with_caption"]:
bbox = {"category_id": 1, "bbox": img_bbox["bbox"]}
if "caption" in img_bbox:
bbox["caption_bbox"] = img_bbox["caption"]
bboxes.append(bbox)
for tbl_bbox in v["table_bboxes_with_caption"]:
bbox = {"category_id": 7, "bbox": tbl_bbox["bbox"]}
if "caption" in tbl_bbox:
bbox["caption_bbox"] = tbl_bbox["caption"]
bboxes.append(bbox)
for bbox in v["bak_page_no_bboxes"]:
n_bbox = {"category_id": 4, "bbox": bbox}
bboxes.append(n_bbox)
for bbox in v["bak_header_bboxes"]:
n_bbox = {"category_id": 3, "bbox": bbox}
bboxes.append(n_bbox)
for bbox in v["bak_footer_bboxes"]:
n_bbox = {"category_id": 6, "bbox": bbox}
bboxes.append(n_bbox)
# 脚注, 目前没有看到例子
for para in v["para_blocks"]:
if "paras" in para:
paras = para["paras"]
for para_key, para_content in paras.items():
para_bbox = para_content["para_bbox"]
is_para_title = para_content["is_para_title"]
if is_para_title:
n_bbox = {"category_id": 0, "bbox": para_bbox}
else:
n_bbox = {"category_id": 2, "bbox": para_bbox}
bboxes.append(n_bbox)
for inline_equation in v["inline_equations"]:
n_bbox = {"category_id": 13, "bbox": inline_equation["bbox"]}
bboxes.append(n_bbox)
for inter_equation in v["interline_equations"]:
n_bbox = {"category_id": 10, "bbox": inter_equation["bbox"]}
bboxes.append(n_bbox)
for footnote_bbox in v["bak_footer_note_bboxes"]:
n_bbox = {"category_id": 5, "bbox": list(footnote_bbox)}
bboxes.append(n_bbox)
info["bboxes"] = bboxes
info["layout_tree"] = v["layout_bboxes"]
pages.append(info)
return pages
from magic_pdf.libs.boxbase import _is_in
def extract_caption_bbox(outer: list, inner: list) -> list:
"""
ret: list of {
"bbox": [1,2,3,4],
"caption": [5,6,7,8] # may existed
}
"""
found_count = 0 # for debug
print(outer, inner)
def is_float_equal(a, b):
if 0.01 > abs(a - b): # non strict float equal compare
return True
return False
outer_h = {i: outer[i] for i in range(len(outer))}
ret = []
for v in inner:
ix0, iy0, ix1, iy1 = v
found_idx = None
d = {"bbox": v[:4]}
for k in outer_h:
ox0, oy0, ox1, oy1 = outer_h[k]
equal_float_flags = [
is_float_equal(ix0, ox0),
is_float_equal(iy0, oy0),
is_float_equal(ix1, ox1),
is_float_equal(iy1, oy1),
]
if _is_in(v, outer_h[k]) and not all(equal_float_flags):
found_idx = k
break
if found_idx is not None:
found_count += 1
captions: list[list] = []
ox0, oy0, ox1, oy1 = outer_h[found_idx]
captions = [
[ox0, oy0, ix0, oy1],
[ox0, oy0, ox1, iy0],
[ox0, iy1, ox1, oy1],
[ix1, oy0, ox1, oy1],
]
captions = sorted(
captions,
key=lambda rect: abs(rect[0] - rect[2]) * abs(rect[1] - rect[3]),
) # 面积最大的框就是caption
d["caption"] = captions[-1]
outer_h.pop(
found_idx
) # 同一个 outer box 只能用于确定一个 inner box 的 caption 位置。
ret.append(d)
print("found_count: ", found_count)
return ret
import re
from magic_pdf.libs.boxbase import _is_in_or_part_overlap
from magic_pdf.libs.drop_tag import CONTENT_IN_FOOT_OR_HEADER, PAGE_NO
"""
copy from pre_proc/remove_footer_header.py
"""
def remove_headder_footer_one_page(
text_raw_blocks,
image_bboxes,
table_bboxes,
header_bboxs,
footer_bboxs,
page_no_bboxs,
page_w,
page_h,
):
"""
删除页眉页脚,页码
从line级别进行删除,删除之后观察这个text-block是否是空的,如果是空的,则移动到remove_list中
"""
if 1:
return image_bboxes, table_bboxes, text_raw_blocks, [], [], []
header = []
footer = []
if len(header) == 0:
model_header = header_bboxs
if model_header:
x0 = min([x for x, _, _, _ in model_header])
y0 = min([y for _, y, _, _ in model_header])
x1 = max([x1 for _, _, x1, _ in model_header])
y1 = max([y1 for _, _, _, y1 in model_header])
header = [x0, y0, x1, y1]
if len(footer) == 0:
model_footer = footer_bboxs
if model_footer:
x0 = min([x for x, _, _, _ in model_footer])
y0 = min([y for _, y, _, _ in model_footer])
x1 = max([x1 for _, _, x1, _ in model_footer])
y1 = max([y1 for _, _, _, y1 in model_footer])
footer = [x0, y0, x1, y1]
header_y0 = 0 if len(header) == 0 else header[3]
footer_y0 = page_h if len(footer) == 0 else footer[1]
if page_no_bboxs:
top_part = [b for b in page_no_bboxs if b[3] < page_h / 2]
btn_part = [b for b in page_no_bboxs if b[1] > page_h / 2]
top_max_y0 = max([b[1] for b in top_part]) if top_part else 0
btn_min_y1 = min([b[3] for b in btn_part]) if btn_part else page_h
header_y0 = max(header_y0, top_max_y0)
footer_y0 = min(footer_y0, btn_min_y1)
content_boundry = [0, header_y0, page_w, footer_y0]
header = [0, 0, page_w, header_y0]
footer = [0, footer_y0, page_w, page_h]
"""以上计算出来了页眉页脚的边界,下面开始进行删除"""
text_block_to_remove = []
# 首先检查每个textblock
for blk in text_raw_blocks:
if len(blk["lines"]) > 0:
for line in blk["lines"]:
line_del = []
for span in line["spans"]:
span_del = []
if span["bbox"][3] < header_y0:
span_del.append(span)
elif _is_in_or_part_overlap(
span["bbox"], header
) or _is_in_or_part_overlap(span["bbox"], footer):
span_del.append(span)
for span in span_del:
line["spans"].remove(span)
if not line["spans"]:
line_del.append(line)
for line in line_del:
blk["lines"].remove(line)
else:
# if not blk['lines']:
blk["tag"] = CONTENT_IN_FOOT_OR_HEADER
text_block_to_remove.append(blk)
"""有的时候由于pageNo太小了,总是会有一点和content_boundry重叠一点,被放入正文,因此对于pageNo,进行span粒度的删除"""
page_no_block_2_remove = []
if page_no_bboxs:
for pagenobox in page_no_bboxs:
for block in text_raw_blocks:
if _is_in_or_part_overlap(
pagenobox, block["bbox"]
): # 在span级别删除页码
for line in block["lines"]:
for span in line["spans"]:
if _is_in_or_part_overlap(pagenobox, span["bbox"]):
# span['text'] = ''
span["tag"] = PAGE_NO
# 检查这个block是否只有这一个span,如果是,那么就把这个block也删除
if len(line["spans"]) == 1 and len(block["lines"]) == 1:
page_no_block_2_remove.append(block)
else:
# 测试最后一个是不是页码:规则是,最后一个block仅有1个line,一个span,且text是数字,空格,符号组成,不含字母,并且包含数字
if len(text_raw_blocks) > 0:
text_raw_blocks.sort(key=lambda x: x["bbox"][1], reverse=True)
last_block = text_raw_blocks[0]
if len(last_block["lines"]) == 1:
last_line = last_block["lines"][0]
if len(last_line["spans"]) == 1:
last_span = last_line["spans"][0]
if (
last_span["text"].strip()
and not re.search("[a-zA-Z]", last_span["text"])
and re.search("[0-9]", last_span["text"])
):
last_span["tag"] = PAGE_NO
page_no_block_2_remove.append(last_block)
for b in page_no_block_2_remove:
text_block_to_remove.append(b)
for blk in text_block_to_remove:
if blk in text_raw_blocks:
text_raw_blocks.remove(blk)
text_block_remain = text_raw_blocks
image_bbox_to_remove = [
bbox
for bbox in image_bboxes
if not _is_in_or_part_overlap(bbox, content_boundry)
]
image_bbox_remain = [
bbox for bbox in image_bboxes if _is_in_or_part_overlap(bbox, content_boundry)
]
table_bbox_to_remove = [
bbox
for bbox in table_bboxes
if not _is_in_or_part_overlap(bbox, content_boundry)
]
table_bbox_remain = [
bbox for bbox in table_bboxes if _is_in_or_part_overlap(bbox, content_boundry)
]
# 1, 2, 3
return (
image_bbox_remain,
table_bbox_remain,
text_block_remain,
text_block_to_remove,
image_bbox_to_remove,
table_bbox_to_remove,
)
from magic_pdf.libs.commons import fitz
import os
from magic_pdf.libs.coordinate_transform import get_scale_ratio
def draw_model_output(
raw_pdf_doc: fitz.Document, paras_dict_arr: list[dict], save_path: str
):
"""
在page上画出bbox,保存到save_path
"""
"""
# {0: 'title', # 标题
# 1: 'figure', # 图片
# 2: 'plain text', # 文本
# 3: 'header', # 页眉
# 4: 'page number', # 页码
# 5: 'footnote', # 脚注
# 6: 'footer', # 页脚
# 7: 'table', # 表格
# 8: 'table caption', # 表格描述
# 9: 'figure caption', # 图片描述
# 10: 'equation', # 公式
# 11: 'full column', # 单栏
# 12: 'sub column', # 多栏
# 13: 'embedding', # 嵌入公式
# 14: 'isolated'} # 单行公式
"""
color_map = {
"body": fitz.pdfcolor["green"],
"non_body": fitz.pdfcolor["red"],
}
"""
{"layout_dets": [], "subfield_dets": [], "page_info": {"page_no": 22, "height": 1650, "width": 1275}}
"""
for i, page in enumerate(raw_pdf_doc):
v = paras_dict_arr[i]
page_idx = v["page_info"]["page_no"]
width = v["page_info"]["width"]
height = v["page_info"]["height"]
horizontal_scale_ratio, vertical_scale_ratio = get_scale_ratio(
paras_dict_arr[i], page
)
for order, block in enumerate(v["layout_dets"]):
L = block["poly"][0] / horizontal_scale_ratio
U = block["poly"][1] / vertical_scale_ratio
R = block["poly"][2] / horizontal_scale_ratio
D = block["poly"][5] / vertical_scale_ratio
# L += pageL # 有的页面,artBox偏移了。不在(0,0)
# R += pageL
# U += pageU
# D += pageU
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
bbox = [L, U, R, D]
color = color_map["body"]
if block["category_id"] in (3, 4, 5, 6, 0):
color = color_map["non_body"]
rect = fitz.Rect(bbox)
page.draw_rect(rect, fill=None, width=0.5, overlay=True, color=color)
parent_dir = os.path.dirname(save_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
raw_pdf_doc.save(save_path)
def debug_show_bbox(
raw_pdf_doc: fitz.Document,
page_idx: int,
bboxes: list,
droped_bboxes: list,
expect_drop_bboxes: list,
save_path: str,
expected_page_id: int,
):
"""
以覆盖的方式写个临时的pdf,用于debug
"""
if page_idx != expected_page_id:
return
if os.path.exists(save_path):
# 删除已经存在的文件
os.remove(save_path)
# 创建一个新的空白 PDF 文件
doc = fitz.open("")
width = raw_pdf_doc[page_idx].rect.width
height = raw_pdf_doc[page_idx].rect.height
new_page = doc.new_page(width=width, height=height)
shape = new_page.new_shape()
for bbox in bboxes:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(
color=fitz.pdfcolor["red"], fill=fitz.pdfcolor["blue"], fill_opacity=0.2
)
shape.finish()
shape.commit()
for bbox in droped_bboxes:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=None, fill=fitz.pdfcolor["yellow"], fill_opacity=0.2)
shape.finish()
shape.commit()
for bbox in expect_drop_bboxes:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor["red"], fill=None)
shape.finish()
shape.commit()
# shape.insert_textbox(fitz.Rect(200, 0, 600, 20), f"total bboxes: {len(bboxes)}", fontname="helv", fontsize=12,
# color=(0, 0, 0))
# shape.finish(color=fitz.pdfcolor['black'])
# shape.commit()
parent_dir = os.path.dirname(save_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
doc.save(save_path)
doc.close()
def debug_show_page(
page,
bboxes1: list,
bboxes2: list,
bboxes3: list,
):
save_path = "./tmp/debug.pdf"
if os.path.exists(save_path):
# 删除已经存在的文件
os.remove(save_path)
# 创建一个新的空白 PDF 文件
doc = fitz.open("")
width = page.rect.width
height = page.rect.height
new_page = doc.new_page(width=width, height=height)
shape = new_page.new_shape()
for bbox in bboxes1:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(
color=fitz.pdfcolor["red"], fill=fitz.pdfcolor["blue"], fill_opacity=0.2
)
shape.finish()
shape.commit()
for bbox in bboxes2:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=None, fill=fitz.pdfcolor["yellow"], fill_opacity=0.2)
shape.finish()
shape.commit()
for bbox in bboxes3:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor["red"], fill=None)
shape.finish()
shape.commit()
parent_dir = os.path.dirname(save_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
doc.save(save_path)
doc.close()
def draw_layout_bbox_on_page(
raw_pdf_doc: fitz.Document, paras_dict: dict, header, footer, pdf_path: str
):
"""
在page上画出bbox,保存到save_path
"""
# 检查文件是否存在
is_new_pdf = False
if os.path.exists(pdf_path):
# 打开现有的 PDF 文件
doc = fitz.open(pdf_path)
else:
# 创建一个新的空白 PDF 文件
is_new_pdf = True
doc = fitz.open("")
for k, v in paras_dict.items():
page_idx = v["page_idx"]
layouts = v["layout_bboxes"]
page = doc[page_idx]
shape = page.new_shape()
for order, layout in enumerate(layouts):
border_offset = 1
rect_box = layout["layout_bbox"]
layout_label = layout["layout_label"]
fill_color = fitz.pdfcolor["pink"] if layout_label == "U" else None
rect_box = [
rect_box[0] + 1,
rect_box[1] - border_offset,
rect_box[2] - 1,
rect_box[3] + border_offset,
]
rect = fitz.Rect(*rect_box)
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor["red"], fill=fill_color, fill_opacity=0.4)
"""
draw order text on layout box
"""
font_size = 10
shape.insert_text(
(rect_box[0] + 1, rect_box[1] + font_size),
f"{order}",
fontsize=font_size,
color=(0, 0, 0),
)
"""画上footer header"""
if header:
shape.draw_rect(fitz.Rect(header))
shape.finish(color=None, fill=fitz.pdfcolor["black"], fill_opacity=0.2)
if footer:
shape.draw_rect(fitz.Rect(footer))
shape.finish(color=None, fill=fitz.pdfcolor["black"], fill_opacity=0.2)
shape.commit()
if is_new_pdf:
doc.save(pdf_path)
else:
doc.saveIncr()
doc.close()
@DeprecationWarning
def draw_layout_on_page(
raw_pdf_doc: fitz.Document, page_idx: int, page_layout: list, pdf_path: str
):
"""
把layout的box用红色边框花在pdf_path的page_idx上
"""
def draw(shape, layout, fill_color=fitz.pdfcolor["pink"]):
border_offset = 1
rect_box = layout["layout_bbox"]
layout_label = layout["layout_label"]
sub_layout = layout["sub_layout"]
if len(sub_layout) == 0:
fill_color = fill_color if layout_label == "U" else None
rect_box = [
rect_box[0] + 1,
rect_box[1] - border_offset,
rect_box[2] - 1,
rect_box[3] + border_offset,
]
rect = fitz.Rect(*rect_box)
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor["red"], fill=fill_color, fill_opacity=0.2)
# if layout_label=='U':
# bad_boxes = layout.get("bad_boxes", [])
# for bad_box in bad_boxes:
# rect = fitz.Rect(*bad_box)
# shape.draw_rect(rect)
# shape.finish(color=fitz.pdfcolor['red'], fill=fitz.pdfcolor['red'], fill_opacity=0.2)
# else:
# rect = fitz.Rect(*rect_box)
# shape.draw_rect(rect)
# shape.finish(color=fitz.pdfcolor['blue'])
for sub_layout in sub_layout:
draw(shape, sub_layout)
shape.commit()
# 检查文件是否存在
is_new_pdf = False
if os.path.exists(pdf_path):
# 打开现有的 PDF 文件
doc = fitz.open(pdf_path)
else:
# 创建一个新的空白 PDF 文件
is_new_pdf = True
doc = fitz.open("")
page = doc[page_idx]
shape = page.new_shape()
for order, layout in enumerate(page_layout):
draw(shape, layout, fitz.pdfcolor["yellow"])
# shape.insert_textbox(fitz.Rect(200, 0, 600, 20), f"total bboxes: {len(layout)}", fontname="helv", fontsize=12,
# color=(0, 0, 0))
# shape.finish(color=fitz.pdfcolor['black'])
# shape.commit()
parent_dir = os.path.dirname(pdf_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
if is_new_pdf:
doc.save(pdf_path)
else:
doc.saveIncr()
doc.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment