Commit c9af3457 authored by 赵小蒙

delete useless files

parent 89518eff
import json

from magic_pdf.libs.config_reader import get_s3_config_dict
from magic_pdf.libs.commons import join_path, read_file, json_dump_path

local_json_path = "Z:/format.json"
local_jsonl_path = "Z:/format.jsonl"


def get_json_from_local_or_s3(book_name=None):
    if book_name is None:
        with open(local_json_path, "r", encoding="utf-8") as json_file:
            json_line = json_file.read()
            json_object = json.loads(json_line)
    else:
        # error_log_path & json_dump_path
        # The source json can be fetched from either of the two locations above (configurable)
        json_path = join_path(json_dump_path, book_name + ".json")
        s3_config = get_s3_config_dict(json_path)
        file_content = read_file(json_path, s3_config)
        json_str = file_content.decode("utf-8")
        # logger.info(json_str)
        json_object = json.loads(json_str)
    return json_object


def write_json_to_local(jso, book_name=None):
    if book_name is None:
        with open(local_json_path, "w", encoding="utf-8") as file:
            file.write(json.dumps(jso, ensure_ascii=False))
    else:
        pass
\ No newline at end of file
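A minimal usage sketch for the two helpers above (illustrative only; it assumes a format.json already exists at local_json_path):

if __name__ == "__main__":
    jso = get_json_from_local_or_s3()   # book_name=None reads Z:/format.json
    jso["checked"] = True               # hypothetical field, added for illustration
    write_json_to_local(jso)            # book_name=None writes back to Z:/format.json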
import json
import os

from tqdm import tqdm

from magic_pdf.libs.commons import join_path

with open('/mnt/petrelfs/share_data/ouyanglinke/OCR/OCR_validation_dataset.json', 'r') as f:
    samples = json.load(f)

pdf_model_dir = 's3://llm-pdf-text/eval_1k/layout_res/'

labels = []
det_res = []
edit_distance_list = []
for sample in tqdm(samples):
    pdf_name = sample['pdf_name']
    page_num = sample['page']
    pdf_model_path = join_path(pdf_model_dir, pdf_name)
    model_output_json = join_path(pdf_model_path, f"page_{page_num}.json")  # page numbers in the model output start from 1
    save_root_path = '/mnt/petrelfs/share_data/ouyanglinke/OCR/OCR_val_docxchain/'
    save_path = join_path(save_root_path, pdf_name)
    os.makedirs(save_path, exist_ok=True)
    # print("s3 cp {} {}".format(model_output_json, save_path))
    os.system("aws --profile langchao --endpoint-url=http://10.140.85.161:80 s3 cp {} {}".format(model_output_json, save_path))
import json
import os
import time

from loguru import logger
from pathlib import Path

from magic_pdf.libs.config_reader import get_s3_config_dict
from magic_pdf.pdf_parse_by_ocr import parse_pdf_by_ocr
from demo.demo_commons import get_json_from_local_or_s3
from magic_pdf.dict2md.ocr_mkcontent import (
    ocr_mk_mm_markdown_with_para,
    make_standard_format_with_para
)
from magic_pdf.libs.commons import join_path, read_file, formatted_time


def save_markdown(markdown_text, input_filepath):
    # Directory of the input file
    directory = os.path.dirname(input_filepath)
    # Input file name without its extension
    base_name = os.path.basename(input_filepath)
    file_name_without_ext = os.path.splitext(base_name)[0]
    # Path of the output file
    output_filepath = os.path.join(directory, f"{file_name_without_ext}.md")
    # Write the markdown text into the .md file
    with open(output_filepath, 'w', encoding='utf-8') as file:
        file.write(markdown_text)


def read_json_file(file_path):
    with open(file_path, 'r') as f:
        data = json.load(f)
    return data


def ocr_local_parse(ocr_pdf_path, ocr_json_file_path):
    try:
        ocr_pdf_model_info = read_json_file(ocr_json_file_path)
        pth = Path(ocr_json_file_path)
        book_name = pth.name
        pdf_bytes = read_file(ocr_pdf_path, None)
        ocr_parse_core(book_name, pdf_bytes, ocr_pdf_model_info)
    except Exception as e:
        logger.exception(e)


def ocr_online_parse(book_name, start_page_id=0, debug_mode=True):
    try:
        json_object = get_json_from_local_or_s3(book_name)
        # logger.info(json_object)
        s3_pdf_path = json_object["file_location"]
        s3_config = get_s3_config_dict(s3_pdf_path)
        pdf_bytes = read_file(s3_pdf_path, s3_config)
        ocr_pdf_model_info = json_object.get("doc_layout_result")
        ocr_parse_core(book_name, pdf_bytes, ocr_pdf_model_info, start_page_id=start_page_id)
    except Exception as e:
        logger.exception(e)


def ocr_parse_core(book_name, pdf_bytes, ocr_pdf_model_info, start_page_id=0):
    save_tmp_path = os.path.join(os.path.dirname(__file__), "../..", "tmp", "unittest")
    save_path = join_path(save_tmp_path, "md")
    save_path_with_bookname = os.path.join(save_path, book_name)
    text_content_save_path = f"{save_path_with_bookname}/book.md"

    pdf_info_dict, parse_time = ocr_parse_pdf_core(
        pdf_bytes, ocr_pdf_model_info, book_name, start_page_id=start_page_id, debug_mode=True
    )

    parent_dir = os.path.dirname(text_content_save_path)
    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir)

    # markdown_content = mk_nlp_markdown(pdf_info_dict)
    markdown_content = ocr_mk_mm_markdown_with_para(pdf_info_dict)
    # markdown_pagination = ocr_mk_mm_markdown_with_para_and_pagination(pdf_info_dict)
    with open(text_content_save_path, "w", encoding="utf-8") as f:
        f.write(markdown_content)

    standard_format = make_standard_format_with_para(pdf_info_dict)
    standard_format_save_path = f"{save_path_with_bookname}/standard_format.txt"
    with open(standard_format_save_path, "w", encoding="utf-8") as f:
        # Dump standard_format to JSON text and save it
        f.write(json.dumps(standard_format, ensure_ascii=False))


def ocr_parse_pdf_core(pdf_bytes, model_output_json_list, book_name, start_page_id=0, debug_mode=False):
    start_time = time.time()  # record the start time
    # Log book_name and the time parsing started
    logger.info(
        f"book_name is:{book_name},start_time is:{formatted_time(start_time)}"
    )
    pdf_info_dict = parse_pdf_by_ocr(
        pdf_bytes,
        model_output_json_list,
        "",
        book_name,
        pdf_model_profile=None,
        start_page_id=start_page_id,
        debug_mode=debug_mode,
    )
    end_time = time.time()  # record the end time
    parse_time = int(end_time - start_time)  # elapsed seconds
    # Log book_name and the cost once parsing finishes
    logger.info(
        f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}"
    )
    return pdf_info_dict, parse_time


if __name__ == '__main__':
    pdf_path = r"/home/cxu/workspace/Magic-PDF/ocr_demo/j.1540-627x.2006.00176.x.pdf"
    json_file_path = r"/home/cxu/workspace/Magic-PDF/ocr_demo/j.1540-627x.2006.00176.x.json"
    # ocr_local_parse(pdf_path, json_file_path)
    book_name = "数学新星网/edu_00001236"
    ocr_online_parse(book_name)
import json
import os
import sys
from pathlib import Path

import click
from loguru import logger

from magic_pdf.libs.commons import join_path, read_file
from magic_pdf.dict2md.mkcontent import mk_mm_markdown, mk_universal_format
from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt


def main(s3_pdf_path: str, s3_pdf_profile: str, pdf_model_path: str, pdf_model_profile: str, start_page_num=0, debug_mode=True):
    pth = Path(s3_pdf_path)
    book_name = pth.name
    # book_name = "".join(os.path.basename(s3_pdf_path).split(".")[0:-1])
    save_tmp_path = os.path.join(os.path.dirname(__file__), "../..", "..", "tmp", "unittest")
    save_path = join_path(save_tmp_path, "md")
    text_content_save_path = f"{save_path}/{book_name}/book.md"
    # metadata_save_path = f"{save_path}/{book_name}/metadata.json"
    pdf_bytes = read_file(s3_pdf_path, s3_pdf_profile)
    try:
        paras_dict = parse_pdf_by_txt(
            pdf_bytes, pdf_model_path, save_path, book_name, pdf_model_profile, start_page_num, debug_mode=debug_mode
        )
        parent_dir = os.path.dirname(text_content_save_path)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir)
        if not paras_dict.get('need_drop'):
            content_list = mk_universal_format(paras_dict)
            markdown_content = mk_mm_markdown(content_list)
        else:
            markdown_content = paras_dict['drop_reason']
        with open(text_content_save_path, "w", encoding="utf-8") as f:
            f.write(markdown_content)
    except Exception as e:
        print(f"ERROR: {s3_pdf_path}, {e}", file=sys.stderr)
        logger.exception(e)


@click.command()
@click.option("--pdf-file-path", help="path of the pdf file on s3")
@click.option("--save-path", help="parent directory for the extracted images and text")
def main_shell(pdf_file_path: str, save_path: str):
    # pdf_bin_file_path = "s3://llm-raw-snew/llm-raw-scihub/scimag07865000-07865999/10.1007/"
    pdf_bin_file_parent_path = "s3://llm-raw-snew/llm-raw-scihub/"
    pdf_bin_file_profile = "s2"
    pdf_model_parent_dir = "s3://llm-pdf-text/eval_1k/layout_res/"
    pdf_model_profile = "langchao"

    p = Path(pdf_file_path)
    pdf_parent_path = p.parent
    pdf_file_name = p.name  # pdf file name, extension included
    pdf_bin_file_path = join_path(pdf_bin_file_parent_path, pdf_parent_path)
    pdf_model_dir = join_path(pdf_model_parent_dir, pdf_parent_path)

    main(
        join_path(pdf_bin_file_path, pdf_file_name),
        pdf_bin_file_profile,
        join_path(pdf_model_dir, pdf_file_name),
        pdf_model_profile,
        save_path,
    )


@click.command()
@click.option("--pdf-dir", help="path of the local pdf files")
@click.option("--model-dir", help="path of the local model files")
@click.option("--start-page-num", default=0, help="page number to start parsing from")
def main_shell2(pdf_dir: str, model_dir: str, start_page_num: int):
    # First, collect the file names under the pdf directory
    pdf_dir = Path(pdf_dir)
    model_dir = Path(model_dir)
    if pdf_dir.is_file():
        pdf_file_names = [pdf_dir.name]
        pdf_dir = pdf_dir.parent
    else:
        pdf_file_names = [f.name for f in pdf_dir.glob("*.pdf")]
    for pdf_file in pdf_file_names:
        pdf_file_path = os.path.join(pdf_dir, pdf_file)
        # splitext instead of rstrip(".pdf"): rstrip strips characters, not a suffix
        model_file_path = os.path.splitext(os.path.join(model_dir, pdf_file))[0] + ".json"
        with open(model_file_path, "r") as json_file:
            model_list = json.load(json_file)
        main(pdf_file_path, None, model_list, None, start_page_num)


if __name__ == "__main__":
    main_shell2()
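A sketch of invoking the local batch entry point above from a shell (the directory paths are hypothetical; the module path demo/pdf2md.py matches the import used below):

# python demo/pdf2md.py --pdf-dir ./pdfs --model-dir ./models --start-page-num 0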
from pathlib import Path

import click
import json

from demo.pdf2md import main


@click.command()
@click.option("--pdf-file-path", help="path of the pdf file on s3")
@click.option("--pdf-name", help="pdf name")
def main_shell(pdf_file_path: str, pdf_name: str):
    with open('/mnt/petrelfs/share_data/ouyanglinke/OCR/OCR_validation_dataset_final_rotated_formulafix_highdpi_scihub.json', 'r') as f:
        samples = json.load(f)
    for sample in samples:
        pdf_file_path = sample['s3_path']
        pdf_bin_file_profile = "outsider"
        pdf_name = sample['pdf_name']
        pdf_model_dir = f"s3://llm-pdf-text/eval_1k/layout_res/{pdf_name}"
        pdf_model_profile = "langchao"
        p = Path(pdf_file_path)
        pdf_file_name = p.name  # pdf file name, extension included
        # pdf_model_dir = join_path(pdf_model_parent_dir, pdf_file_name)
        main(
            pdf_file_path,
            pdf_bin_file_profile,
            pdf_model_dir,
            pdf_model_profile,
            debug_mode=True,
        )


if __name__ == "__main__":
    main_shell()
import json
import os
import sys
from pathlib import Path

import click
from loguru import logger

from demo.demo_commons import get_json_from_local_or_s3, write_json_to_local, local_jsonl_path, local_json_path
from magic_pdf.dict2md.mkcontent import mk_mm_markdown, mk_universal_format
from magic_pdf.filter.pdf_classify_by_type import classify
from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
from magic_pdf.libs.commons import join_path, read_file
from magic_pdf.libs.config_reader import get_s3_config_dict
from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt
from magic_pdf.spark.spark_api import get_data_source


def demo_parse_pdf(book_name=None, start_page_id=0, debug_mode=True):
    json_object = get_json_from_local_or_s3(book_name)
    s3_pdf_path = json_object.get("file_location")
    s3_config = get_s3_config_dict(s3_pdf_path)
    pdf_bytes = read_file(s3_pdf_path, s3_config)
    model_output_json_list = json_object.get("doc_layout_result")
    data_source = get_data_source(json_object)
    file_id = json_object.get("file_id")
    junk_img_bojids = json_object["pdf_meta"]["junk_img_bojids"]
    save_path = ""

    pdf_info_dict = parse_pdf_by_txt(
        pdf_bytes,
        model_output_json_list,
        save_path,
        f"{data_source}/{file_id}",
        pdf_model_profile=None,
        start_page_id=start_page_id,
        junk_img_bojids=junk_img_bojids,
        debug_mode=debug_mode,
    )

    write_json_to_local(pdf_info_dict, book_name)

    content_list = mk_universal_format(pdf_info_dict)
    markdown_content = mk_mm_markdown(content_list)
    if book_name is not None:
        save_tmp_path = os.path.join(os.path.dirname(__file__), "../..", "tmp", "unittest", "md", book_name)
        uni_format_save_path = join_path(save_tmp_path, "book" + ".json")
        markdown_save_path = join_path(save_tmp_path, "book" + ".md")
        with open(uni_format_save_path, "w", encoding="utf-8") as f:
            f.write(json.dumps(content_list, ensure_ascii=False, indent=4))
        with open(markdown_save_path, "w", encoding="utf-8") as f:
            f.write(markdown_content)
    else:
        logger.info(json.dumps(content_list, ensure_ascii=False))


def demo_classify_by_type(book_name=None, debug_mode=True):
    json_object = get_json_from_local_or_s3(book_name)
    pdf_meta = json_object.get("pdf_meta")
    total_page = pdf_meta["total_page"]
    page_width = pdf_meta["page_width_pts"]
    page_height = pdf_meta["page_height_pts"]
    img_sz_list = pdf_meta["image_info_per_page"]
    img_num_list = pdf_meta["imgs_per_page"]
    text_len_list = pdf_meta["text_len_per_page"]
    text_layout_list = pdf_meta["text_layout_per_page"]
    is_text_pdf, results = classify(
        total_page,
        page_width,
        page_height,
        img_sz_list,
        text_len_list,
        img_num_list,
        text_layout_list,
    )
    logger.info(f"is_text_pdf: {is_text_pdf}")
    logger.info(json.dumps(results, ensure_ascii=False))
    write_json_to_local(results, book_name)


def demo_meta_scan(book_name=None, debug_mode=True):
    json_object = get_json_from_local_or_s3(book_name)
    s3_pdf_path = json_object.get("file_location")
    s3_config = get_s3_config_dict(s3_pdf_path)
    pdf_bytes = read_file(s3_pdf_path, s3_config)
    res = pdf_meta_scan(pdf_bytes)
    logger.info(json.dumps(res, ensure_ascii=False))
    write_json_to_local(res, book_name)


def demo_test5():
    with open(local_json_path, "r", encoding="utf-8") as json_file:
        json_line = json_file.read()
        jso = json.loads(json_line)
    img_list_len = len(jso["content"]["image_info_per_page"])
    logger.info(f"img_list_len: {img_list_len}")


def read_more_para_test_samples(type="scihub"):
    # Load the multi-paragraph test samples
    curr_dir = Path(__file__).parent
    files_path = ""
    if type == "gift":
        relative_path = "../tests/assets/more_para_test_samples/gift_files.txt"
        files_path = os.path.join(curr_dir, relative_path)
    if type == "scihub":
        relative_path = "../tests/assets/more_para_test_samples/scihub_files.txt"
        files_path = os.path.join(curr_dir, relative_path)
    if type == "zlib":
        relative_path = "../tests/assets/more_para_test_samples/zlib_files.txt"
        files_path = os.path.join(curr_dir, relative_path)
    # Check whether the file exists
    if not os.path.exists(files_path):
        print("File not exist!")
        sys.exit(0)
    with open(files_path, "r", encoding="utf-8") as f:
        lines = f.readlines()
    # print("lines", lines)
    return lines


def batch_test_more_para(type="scihub"):
    # Batch-test multi-paragraph splitting
    para_test_files = read_more_para_test_samples(type)
    for file in para_test_files:
        file = file.strip()
        print(file)
        demo_parse_pdf(book_name=file)


@click.command()
@click.option("--book-name", help="path of the pdf file on s3")
def main(book_name: str):
    demo_parse_pdf(book_name, start_page_id=0)


if __name__ == "__main__":
    main()
import time

from loguru import logger

from magic_pdf.libs.commons import (
    fitz,
    get_delta_time,
    get_docx_model_output,
)
from magic_pdf.libs.convert_utils import dict_to_list
from magic_pdf.libs.coordinate_transform import get_scale_ratio
from magic_pdf.libs.drop_tag import DropTag
from magic_pdf.libs.hash_utils import compute_md5
from magic_pdf.libs.ocr_content_type import ContentType
from magic_pdf.para.para_split import para_split
from magic_pdf.pre_proc.construct_page_dict import ocr_construct_page_component
from magic_pdf.pre_proc.detect_footer_by_model import parse_footers
from magic_pdf.pre_proc.detect_footnote import parse_footnotes_by_model
from magic_pdf.pre_proc.detect_header import parse_headers
from magic_pdf.pre_proc.detect_page_number import parse_pageNos
from magic_pdf.pre_proc.cut_image import ocr_cut_image_and_table
from magic_pdf.pre_proc.ocr_detect_layout import layout_detect
from magic_pdf.pre_proc.ocr_dict_merge import (
    merge_spans_to_line_by_layout, merge_lines_to_block,
)
from magic_pdf.pre_proc.ocr_span_list_modify import remove_spans_by_bboxes, remove_overlaps_min_spans, \
    adjust_bbox_for_standalone_block, modify_y_axis, modify_inline_equation, get_qa_need_list, \
    remove_spans_by_bboxes_dict
from magic_pdf.pre_proc.remove_bbox_overlap import remove_overlap_between_bbox_for_span


def parse_pdf_by_ocr(
    pdf_bytes,
    pdf_model_output,
    imageWriter,
    start_page_id=0,
    end_page_id=None,
    debug_mode=False,
):
    pdf_bytes_md5 = compute_md5(pdf_bytes)
    pdf_docs = fitz.open("pdf", pdf_bytes)
    # Initialize an empty pdf_info_dict
    pdf_info_dict = {}
    start_time = time.time()

    # "is not None" so that an explicit end_page_id of 0 is honored
    end_page_id = end_page_id if end_page_id is not None else len(pdf_docs) - 1
    for page_id in range(start_page_id, end_page_id + 1):

        # Page object of the current page
        page = pdf_docs[page_id]
        # Width and height of the current page
        page_w = page.rect.width
        page_h = page.rect.height

        if debug_mode:
            time_now = time.time()
            logger.info(
                f"page_id: {page_id}, last_page_cost_time: {get_delta_time(start_time)}"
            )
            start_time = time_now

        # Model output for the current page
        ocr_page_info = get_docx_model_output(
            pdf_model_output, page_id
        )

        """Read each page's page-number, header, footer and footnote bboxes from the json"""
        page_no_bboxes = parse_pageNos(page_id, page, ocr_page_info)
        header_bboxes = parse_headers(page_id, page, ocr_page_info)
        footer_bboxes = parse_footers(page_id, page, ocr_page_info)
        footnote_bboxes = parse_footnotes_by_model(page_id, page, ocr_page_info, debug_mode=debug_mode)

        # Build the dict of bboxes that need to be removed
        need_remove_spans_bboxes_dict = {
            DropTag.PAGE_NUMBER: page_no_bboxes,
            DropTag.HEADER: header_bboxes,
            DropTag.FOOTER: footer_bboxes,
            DropTag.FOOTNOTE: footnote_bboxes,
        }

        layout_dets = ocr_page_info["layout_dets"]
        spans = []

        # Scale ratio between model coordinates and pymupdf coordinates
        horizontal_scale_ratio, vertical_scale_ratio = get_scale_ratio(
            ocr_page_info, page
        )

        for layout_det in layout_dets:
            category_id = layout_det["category_id"]
            allow_category_id_list = [1, 7, 13, 14, 15]
            if category_id in allow_category_id_list:
                x0, y0, _, _, x1, y1, _, _ = layout_det["poly"]
                bbox = [
                    int(x0 / horizontal_scale_ratio),
                    int(y0 / vertical_scale_ratio),
                    int(x1 / horizontal_scale_ratio),
                    int(y1 / vertical_scale_ratio),
                ]
                # Drop spans whose width or height is 0
                if bbox[2] - bbox[0] == 0 or bbox[3] - bbox[1] == 0:
                    continue
                """Categories to remove"""
                # 3: 'header'
                # 4: 'page number'
                # 5: 'footnote'
                # 6: 'footer'
                """Categories joined as spans"""
                # 1: 'image'
                # 7: 'table'
                # 13: 'inline_equation'
                # 14: 'interline_equation' (display equation)
                # 15: 'text' (OCR-recognized text)
                """Layout info"""
                # 11: 'full column' (single column)
                # 12: 'sub column' (multiple columns)
                span = {
                    "bbox": bbox,
                }
                if category_id == 1:
                    span["type"] = ContentType.Image
                elif category_id == 7:
                    span["type"] = ContentType.Table
                elif category_id == 13:
                    span["content"] = layout_det["latex"]
                    span["type"] = ContentType.InlineEquation
                elif category_id == 14:
                    span["content"] = layout_det["latex"]
                    span["type"] = ContentType.InterlineEquation
                elif category_id == 15:
                    span["content"] = layout_det["text"]
                    span["type"] = ContentType.Text
                # print(span)
                spans.append(span)
            else:
                continue

        '''Among overlapping spans, drop the smaller ones'''
        spans, dropped_spans_by_span_overlap = remove_overlaps_min_spans(spans)

        '''
        Remove the bboxes listed in need_remove_spans_bboxes_dict
        and record the drop-related data
        '''
        spans, dropped_spans_by_removed_bboxes = remove_spans_by_bboxes_dict(spans, need_remove_spans_bboxes_dict)

        '''Crop screenshots of images and tables'''
        spans = ocr_cut_image_and_table(spans, page, page_id, pdf_bytes_md5, imageWriter)

        '''Adjust inline equations: align their height with the text on the same line (prefer the left side, then the right)'''
        displayed_list = []
        text_inline_lines = []
        modify_y_axis(spans, displayed_list, text_inline_lines)

        '''Interline equations the model misclassified: convert their type to inline equation'''
        spans = modify_inline_equation(spans, displayed_list, text_inline_lines)

        '''Remove bbox adhesion'''
        spans = remove_overlap_between_bbox_for_span(spans)

        '''
        Extra handling for type in ["interline_equation", "image", "table"]:
        if there is text to the left, lower the span's y0 so it is not above the text's y0
        '''
        spans = adjust_bbox_for_standalone_block(spans)

        '''Parse the layout info from ocr_page_info (sorted in natural reading order, with overlapping and interleaved bad cases repaired)'''
        layout_bboxes, layout_tree = layout_detect(ocr_page_info['subfield_dets'], page, ocr_page_info)

        '''Merge spans into lines (within each layout, top to bottom, left to right)'''
        lines, dropped_spans_by_layout = merge_spans_to_line_by_layout(spans, layout_bboxes)

        '''Merge lines into blocks'''
        blocks = merge_lines_to_block(lines)

        '''Collect the lists that QA needs externalized'''
        images, tables, interline_equations, inline_equations = get_qa_need_list(blocks)

        '''Merge the dropped span lists'''
        dropped_spans = []
        dropped_spans.extend(dropped_spans_by_span_overlap)
        dropped_spans.extend(dropped_spans_by_removed_bboxes)
        dropped_spans.extend(dropped_spans_by_layout)

        dropped_text_block = []
        dropped_image_block = []
        dropped_table_block = []
        dropped_equation_block = []
        for span in dropped_spans:
            # Classify the dropped spans
            if span['type'] == ContentType.Text:
                dropped_text_block.append(span)
            elif span['type'] == ContentType.Image:
                dropped_image_block.append(span)
            elif span['type'] == ContentType.Table:
                dropped_table_block.append(span)
            elif span['type'] in [ContentType.InlineEquation, ContentType.InterlineEquation]:
                dropped_equation_block.append(span)

        '''Construct pdf_info_dict'''
        page_info = ocr_construct_page_component(blocks, layout_bboxes, page_id, page_w, page_h, layout_tree,
                                                 images, tables, interline_equations, inline_equations,
                                                 dropped_text_block, dropped_image_block, dropped_table_block,
                                                 dropped_equation_block,
                                                 need_remove_spans_bboxes_dict)
        pdf_info_dict[f"page_{page_id}"] = page_info

    """Paragraph splitting"""
    para_split(pdf_info_dict, debug_mode=debug_mode)

    """Convert the dict into a list"""
    pdf_info_list = dict_to_list(pdf_info_dict)
    new_pdf_info_dict = {
        "pdf_info": pdf_info_list,
    }

    return new_pdf_info_dict
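A minimal driver sketch for parse_pdf_by_ocr (the local paths are hypothetical, and passing None for imageWriter assumes the image/table cropping step tolerates a missing writer):

if __name__ == "__main__":
    import json

    with open("demo.pdf", "rb") as f:
        pdf_bytes = f.read()
    with open("demo_model.json", "r", encoding="utf-8") as f:
        model_output = json.load(f)
    result = parse_pdf_by_ocr(pdf_bytes, model_output, None, debug_mode=True)
    print(f"parsed {len(result['pdf_info'])} pages")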
"""
文本型pdf转化为统一清洗格式
"""
# TODO 移动到spark/目录下
from loguru import logger
from magic_pdf.dict2md.mkcontent import mk_mm_markdown, mk_universal_format
from magic_pdf.libs.commons import join_path
from magic_pdf.libs.json_compressor import JsonCompressor
from magic_pdf.spark.base import exception_handler, get_data_source
def txt_pdf_to_standard_format(jso: dict, debug_mode=False) -> dict:
"""
变成统一的标准格式
"""
if debug_mode:
pass
else: # 如果debug没开,则检测是否有needdrop字段
if jso.get("_need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop")
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
standard_format = mk_universal_format(pdf_intermediate_dict)
jso["content_list"] = standard_format
logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']},content_list length is {len(standard_format)}",)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def txt_pdf_to_mm_markdown_format(jso: dict, debug_mode=False) -> dict:
"""
变成多模态的markdown格式
"""
if debug_mode:
pass
else: # 如果debug没开,则检测是否有needdrop字段
if jso.get("_need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop")
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
standard_format = mk_universal_format(pdf_intermediate_dict)
mm_content = mk_mm_markdown(standard_format)
jso["content"] = mm_content
logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']},content_list length is {len(standard_format)}",)
# 把无用的信息清空
to_del_keys = ["doc_layout_result", "pdf_intermediate_dict", "pdf_meta", "parsed_result"]
for key in to_del_keys:
if jso.get(key):
del jso[key]
except Exception as e:
jso = exception_handler(jso, e)
return jso
\ No newline at end of file
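A sketch of a record flowing through txt_pdf_to_standard_format, assuming JsonCompressor also exposes a compress_json counterpart to the decompress_json used above (all field values are made up):

if __name__ == "__main__":
    fake_jso = {
        "file_id": "demo_0001",  # hypothetical record
        "pdf_intermediate_dict": JsonCompressor.compress_json({"pdf_info": []}),
    }
    out = txt_pdf_to_standard_format(fake_jso, debug_mode=True)
    print(out.get("content_list"))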
import re

from loguru import logger

from magic_pdf.libs.config_reader import get_s3_config_dict
from magic_pdf.libs.drop_reason import DropReason

__re_s3_path = re.compile("^s3a?://([^/]+)(?:/(.*))?$")


def get_s3_config(path):
    bucket_name = split_s3_path(path)[0] if path else ""
    return get_s3_config_dict(bucket_name)


def split_s3_path(path: str):
    """Split bucket and key from an s3 path."""
    m = __re_s3_path.match(path)
    if m is None:
        return "", ""
    return m.group(1), (m.group(2) or "")
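Quick self-checks for split_s3_path against the regex above (illustrative values only):

if __name__ == "__main__":
    assert split_s3_path("s3://llm-pdf-text/eval_1k/a.json") == ("llm-pdf-text", "eval_1k/a.json")
    assert split_s3_path("s3a://bucket") == ("bucket", "")
    assert split_s3_path("not-an-s3-path") == ("", "")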