Commit 6bc9df82 authored by liukaiwen

io modules

parent 17645527
...@@ -10,10 +10,10 @@ class AbsReaderWriter(ABC): ...@@ -10,10 +10,10 @@ class AbsReaderWriter(ABC):
def __init__(self, parent_path): def __init__(self, parent_path):
# 初始化代码可以在这里添加,如果需要的话 # 初始化代码可以在这里添加,如果需要的话
self.parent_path = parent_path # 对于本地目录是父目录,对于s3是会写到这个apth下。 self.parent_path = parent_path # 对于本地目录是父目录,对于s3是会写到这个path下。
@abstractmethod @abstractmethod
def read(self, path: str, mode="text"): def read(self, path: str, mode=MODE_TXT):
""" """
无论对于本地还是s3的路径,检查如果path是绝对路径,那么就不再 拼接parent_path, 如果是相对路径就拼接parent_path 无论对于本地还是s3的路径,检查如果path是绝对路径,那么就不再 拼接parent_path, 如果是相对路径就拼接parent_path
""" """
......
import os import os
from magic_pdf.io.AbsReaderWriter import AbsReaderWriter from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
from loguru import logger from loguru import logger
# Values accepted by the ``mode`` argument of the reader/writer methods.
MODE_TXT = "text"
MODE_BIN = "binary"
class DiskReaderWriter(AbsReaderWriter):
    """Read and write local files relative to a parent directory.

    Relative paths passed to the methods are joined onto the parent directory
    given at construction time; absolute paths are used unchanged.
    """

    def __init__(self, parent_path, encoding='utf-8'):
        """Remember the parent directory and the text encoding.

        Args:
            parent_path: Directory that relative paths are resolved against.
            encoding: Encoding used for text-mode reads and writes.
        """
        # Let the base class record ``parent_path`` too; ``path`` is kept
        # because the methods below read it.
        super().__init__(parent_path)
        self.path = parent_path
        self.encoding = encoding

    def _resolve(self, path):
        """Return ``path`` if it is absolute, else join it onto the parent."""
        if os.path.isabs(path):
            return path
        return os.path.join(self.path, path)

    def read(self, path, mode=MODE_TXT):
        """Return the content of the file at ``path``.

        Args:
            path: Absolute path, or a path relative to the parent directory.
            mode: ``MODE_TXT`` to get ``str`` (decoded with ``self.encoding``)
                or ``MODE_BIN`` to get ``bytes``.

        Raises:
            FileNotFoundError: If the resolved path does not exist.
            ValueError: If ``mode`` is neither ``MODE_TXT`` nor ``MODE_BIN``.
        """
        abspath = self._resolve(path)
        if not os.path.exists(abspath):
            logger.error(f"文件 {abspath} 不存在")
            # FileNotFoundError is still an Exception, so existing
            # ``except Exception`` callers keep working.
            raise FileNotFoundError(f"文件 {abspath} 不存在")
        if mode == MODE_TXT:
            with open(abspath, 'r', encoding=self.encoding) as f:
                return f.read()
        if mode == MODE_BIN:
            with open(abspath, 'rb') as f:
                return f.read()
        raise ValueError("Invalid mode. Use 'text' or 'binary'.")

    def write(self, content, path, mode=MODE_TXT):
        """Write ``content`` to the file at ``path``, replacing any old content.

        Args:
            content: ``str`` for ``MODE_TXT``, ``bytes`` for ``MODE_BIN``.
            path: Absolute path, or a path relative to the parent directory.
            mode: ``MODE_TXT`` or ``MODE_BIN``.

        Raises:
            ValueError: If ``mode`` is neither ``MODE_TXT`` nor ``MODE_BIN``.
        """
        # Reject a bad mode before touching the file system.
        if mode not in (MODE_TXT, MODE_BIN):
            raise ValueError("Invalid mode. Use 'text' or 'binary'.")
        abspath = self._resolve(path)
        # A relative path such as "sub/dir/file.txt" should not fail just
        # because "sub/dir" has not been created yet.
        parent_dir = os.path.dirname(abspath)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        if mode == MODE_TXT:
            with open(abspath, 'w', encoding=self.encoding) as f:
                f.write(content)
        else:
            with open(abspath, 'wb') as f:
                f.write(content)
        logger.info(f"内容已成功写入 {abspath}")

    def read_jsonl(self, path: str, byte_start=0, byte_end=None, encoding='utf-8'):
        """Return the text of the file at ``path``, optionally a byte range.

        Args:
            path: Absolute path, or a path relative to the parent directory.
            byte_start: Offset of the first byte to read.
            byte_end: Offset of the last byte to read (inclusive, like an
                HTTP ``Range`` header), or ``None`` to read to the end.
            encoding: Encoding used to decode a ranged read.

        Returns:
            The decoded text. With the default range the whole file is
            returned through ``read`` (text mode, ``self.encoding``).
        """
        if byte_start == 0 and byte_end is None:
            # No range requested: keep the previous whole-file behaviour.
            return self.read(path)
        abspath = self._resolve(path)
        with open(abspath, 'rb') as f:
            f.seek(byte_start)
            if byte_end is None:
                raw = f.read()
            else:
                # +1 because ``byte_end`` is inclusive; never pass a
                # negative size, which would mean "read everything".
                raw = f.read(max(byte_end - byte_start + 1, 0))
        return raw.decode(encoding)
# Usage example
if __name__ == "__main__":
    file_path = "io/example.txt"
    # Raw string: the backslashes in a Windows path must not be read as
    # escape sequences ("\p", "\M" and "\m" are invalid escapes).
    drw = DiskReaderWriter(r"D:\projects\papayfork\Magic-PDF\magic_pdf")

    # Write content to the file
    drw.write(b"Hello, World!", path=file_path, mode=MODE_BIN)

    # Read the content back from the file
    content = drw.read(path=file_path)
    if content:
        logger.info(f"从 {file_path} 读取的内容: {content}")
......
from magic_pdf.io.AbsReaderWriter import AbsReaderWriter from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key
import boto3 import boto3
from loguru import logger from loguru import logger
from boto3.s3.transfer import TransferConfig from boto3.s3.transfer import TransferConfig
from botocore.config import Config from botocore.config import Config
import os
# Values accepted by the ``mode`` argument of the reader/writer methods.
MODE_TXT = "text"
MODE_BIN = "binary"
class S3ReaderWriter(AbsReaderWriter): class S3ReaderWriter(AbsReaderWriter):
def __init__(self, ak: str, sk: str, endpoint_url: str, addressing_style: str, parent_path: str):
    """Create the S3 client and remember the parent S3 prefix.

    Args:
        ak: Access key handed to ``_get_client``.
        sk: Secret key handed to ``_get_client``.
        endpoint_url: Endpoint URL handed to ``_get_client``.
        addressing_style: Addressing style handed to ``_get_client``.
        parent_path: Prefix that relative paths given to the read/write
            methods are joined onto.
    """
    # Let the base class record ``parent_path`` too; ``path`` is kept
    # because the read/write methods read it.
    super().__init__(parent_path)
    self.client = self._get_client(ak, sk, endpoint_url, addressing_style)
    self.path = parent_path
def _get_client(self, ak: str, sk: str, endpoint_url: str, addressing_style: str): def _get_client(self, ak: str, sk: str, endpoint_url: str, addressing_style: str):
s3_client = boto3.client( s3_client = boto3.client(
...@@ -22,51 +25,83 @@ class S3ReaderWriter(AbsReaderWriter): ...@@ -22,51 +25,83 @@ class S3ReaderWriter(AbsReaderWriter):
retries={'max_attempts': 5, 'mode': 'standard'}), retries={'max_attempts': 5, 'mode': 'standard'}),
) )
return s3_client return s3_client
def read(self, s3_relative_path, mode=MODE_TXT, encoding="utf-8"):
    """Download an object and return its content.

    Args:
        s3_relative_path: Full ``s3://`` URI, or a path relative to the
            parent prefix stored in ``self.path``.
        mode: ``MODE_TXT`` to get ``str`` or ``MODE_BIN`` to get ``bytes``.
        encoding: Encoding used to decode the body in text mode.

    Raises:
        ValueError: If ``mode`` is neither ``MODE_TXT`` nor ``MODE_BIN``.
    """
    # S3 keys always use "/", so join with posixpath rather than os.path,
    # which would insert "\" on Windows.
    import posixpath

    # Reject a bad mode before paying for the download.
    if mode not in (MODE_TXT, MODE_BIN):
        raise ValueError("Invalid mode. Use 'text' or 'binary'.")
    if s3_relative_path.startswith("s3://"):
        s3_path = s3_relative_path
    else:
        s3_path = posixpath.join(self.path, s3_relative_path)
    bucket_name, key = parse_bucket_key(s3_path)
    res = self.client.get_object(Bucket=bucket_name, Key=key)
    body = res["Body"].read()
    # Text mode decodes the raw bytes; binary mode returns them untouched.
    return body.decode(encoding) if mode == MODE_TXT else body
def write(self, content, s3_relative_path, mode=MODE_TXT, encoding="utf-8"):
    """Upload ``content`` as an object.

    Args:
        content: ``str`` for ``MODE_TXT``, ``bytes`` for ``MODE_BIN``.
        s3_relative_path: Full ``s3://`` URI, or a path relative to the
            parent prefix stored in ``self.path``.
        mode: ``MODE_TXT`` or ``MODE_BIN``.
        encoding: Encoding used to encode ``content`` in text mode.

    Raises:
        ValueError: If ``mode`` is neither ``MODE_TXT`` nor ``MODE_BIN``.
    """
    # S3 keys always use "/", so join with posixpath rather than os.path,
    # which would insert "\" on Windows.
    import posixpath

    if s3_relative_path.startswith("s3://"):
        s3_path = s3_relative_path
    else:
        s3_path = posixpath.join(self.path, s3_relative_path)
    if mode == MODE_TXT:
        body = content.encode(encoding)  # Encode text data as bytes
    elif mode == MODE_BIN:
        body = content
    else:
        raise ValueError("Invalid mode. Use 'text' or 'binary'.")
    bucket_name, key = parse_bucket_key(s3_path)
    self.client.put_object(Body=body, Bucket=bucket_name, Key=key)
    logger.info(f"内容已写入 {s3_path} ")
def read_jsonl(self, path: str, byte_start=0, byte_end=None, mode=MODE_TXT, encoding='utf-8'):
    """Download a byte range of an object and return its content.

    Args:
        path: Full ``s3://`` URI, or a path relative to the parent prefix
            stored in ``self.path``.
        byte_start: Offset of the first byte to fetch.
        byte_end: Offset of the last byte to fetch, sent as the end of the
            HTTP ``Range`` header; ``None`` fetches to the end of the object.
        mode: ``MODE_TXT`` to get ``str`` or ``MODE_BIN`` to get ``bytes``.
        encoding: Encoding used to decode the body in text mode.

    Raises:
        ValueError: If ``mode`` is neither ``MODE_TXT`` nor ``MODE_BIN``.
    """
    # S3 keys always use "/", so join with posixpath rather than os.path,
    # which would insert "\" on Windows.
    import posixpath

    # Reject a bad mode before paying for the download.
    if mode not in (MODE_TXT, MODE_BIN):
        raise ValueError("Invalid mode. Use 'text' or 'binary'.")
    if path.startswith("s3://"):
        s3_path = path
    else:
        s3_path = posixpath.join(self.path, path)
    bucket_name, key = parse_bucket_key(s3_path)
    # Compare against None: ``byte_end=0`` is a valid end offset and must
    # not be mistaken for "no end given".
    if byte_end is not None:
        range_header = f'bytes={byte_start}-{byte_end}'
    else:
        range_header = f'bytes={byte_start}-'
    res = self.client.get_object(Bucket=bucket_name, Key=key, Range=range_header)
    body = res["Body"].read()
    # Text mode decodes the raw bytes; binary mode returns them untouched.
    return body.decode(encoding) if mode == MODE_TXT else body
if __name__ == "__main__":
    # Config the connection info
    ak = ""
    sk = ""
    endpoint_url = ""
    addressing_style = "auto"
    bucket_name = ""

    # Create an S3ReaderWriter object; use the configured bucket for the
    # parent prefix, matching the URIs built below.
    s3_reader_writer = S3ReaderWriter(ak, sk, endpoint_url, addressing_style, f"s3://{bucket_name}/")

    # Write text data to S3 (the payload parameter is named ``content``)
    text_data = "This is some text data"
    s3_reader_writer.write(content=text_data, s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_TXT)

    # Read text data from S3
    text_data_read = s3_reader_writer.read(s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_TXT)
    logger.info(f"Read text data from S3: {text_data_read}")

    # Write binary data to S3 (pass the bytes payload, not the text one)
    binary_data = b"This is some binary data"
    s3_reader_writer.write(content=binary_data, s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_BIN)

    # Read binary data from S3
    binary_data_read = s3_reader_writer.read(s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_BIN)
    logger.info(f"Read binary data from S3: {binary_data_read}")

    # Range-read binary data from S3
    binary_data_read = s3_reader_writer.read_jsonl(path=f"s3://{bucket_name}/ebook/test/test.json",
                                                   byte_start=0, byte_end=10, mode=MODE_BIN)
    logger.info(f"Range-read binary data from S3: {binary_data_read}")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment