Unverified Commit 049104a4 authored by drunkpig's avatar drunkpig Committed by GitHub

Merge pull request #16 from papayalove/master

更新了read_jsonl方法
parents db54796a b5b58d64
......@@ -10,10 +10,10 @@ class AbsReaderWriter(ABC):
def __init__(self, parent_path):
# 初始化代码可以在这里添加,如果需要的话
self.parent_path = parent_path # 对于本地目录是父目录,对于s3是会写到这个apth下。
self.parent_path = parent_path # 对于本地目录是父目录,对于s3是会写到这个path下。
@abstractmethod
def read(self, path: str, mode="text"):
def read(self, path: str, mode=MODE_TXT):
"""
无论对于本地还是s3的路径,检查如果path是绝对路径,那么就不再 拼接parent_path, 如果是相对路径就拼接parent_path
"""
......
import os
from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
from loguru import logger
MODE_TXT = "text"
MODE_BIN = "binary"
class DiskReaderWriter(AbsReaderWriter):
def __init__(self, parent_path, encoding='utf-8'):
    """Create a reader/writer rooted at a local directory.

    Args:
        parent_path: Directory that relative paths given to ``read`` and
            ``write`` are joined onto.
        encoding: Text encoding used when files are opened in text mode.
    """
    # Run the base-class initializer so ``self.parent_path`` is populated too.
    super().__init__(parent_path)
    self.path = parent_path
    self.encoding = encoding
def read(self, mode="text"):
if not os.path.exists(self.path):
logger.error(f"文件 {self.path} 不存在")
raise Exception(f"文件 {self.path} 不存在")
if mode == "text":
with open(self.path, 'r', encoding = self.encoding) as f:
def read(self, path, mode=MODE_TXT):
if os.path.isabs(path):
abspath = path
else:
abspath = os.path.join(self.path, path)
if not os.path.exists(abspath):
logger.error(f"文件 {abspath} 不存在")
raise Exception(f"文件 {abspath} 不存在")
if mode == MODE_TXT:
with open(abspath, 'r', encoding = self.encoding) as f:
return f.read()
elif mode == "binary":
with open(self.path, 'rb') as f:
elif mode == MODE_BIN:
with open(abspath, 'rb') as f:
return f.read()
else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.")
def write(self, data, mode="text"):
if mode == "text":
with open(self.path, 'w', encoding=self.encoding) as f:
f.write(data)
logger.info(f"内容已成功写入 {self.path}")
def write(self, content, path, mode=MODE_TXT):
if os.path.isabs(path):
abspath = path
else:
abspath = os.path.join(self.path, path)
if mode == MODE_TXT:
with open(abspath, 'w', encoding=self.encoding) as f:
f.write(content)
logger.info(f"内容已成功写入 {abspath}")
elif mode == "binary":
with open(self.path, 'wb') as f:
f.write(data)
logger.info(f"内容已成功写入 {self.path}")
elif mode == MODE_BIN:
with open(abspath, 'wb') as f:
f.write(content)
logger.info(f"内容已成功写入 {abspath}")
else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.")
def read_jsonl(self, path: str, byte_start=0, byte_end=None, encoding='utf-8'):
    """Read a file, or an inclusive byte range of it, as text.

    Args:
        path: Absolute path, or a path relative to ``self.path``.
        byte_start: Offset of the first byte to read.
        byte_end: Offset of the last byte to read (inclusive); ``None``
            reads through to the end of the file.
        encoding: Encoding used to decode a ranged read.

    Returns:
        The decoded text of the requested range.

    Raises:
        Exception: If the file does not exist (``FileNotFoundError`` for a
            ranged read).
    """
    if byte_start == 0 and byte_end is None:
        # Whole-file request: keep the existing behaviour of delegating to read().
        return self.read(path)

    if os.path.isabs(path):
        abspath = path
    else:
        abspath = os.path.join(self.path, path)

    # Byte offsets are only meaningful on the raw bytes, so read in binary
    # mode and decode afterwards.
    with open(abspath, 'rb') as f:
        f.seek(byte_start)
        if byte_end is None:
            raw = f.read()
        else:
            # byte_end is inclusive, hence the +1; never pass a negative size.
            raw = f.read(max(byte_end - byte_start + 1, 0))
    return raw.decode(encoding)
# 使用示例
if __name__ == "__main__":
file_path = "example.txt"
drw = DiskReaderWriter(file_path)
file_path = "io/example.txt"
# Raw string: the backslashes in this Windows path are literal, not escapes.
drw = DiskReaderWriter(r"D:\projects\papayfork\Magic-PDF\magic_pdf")
# 写入内容到文件
drw.write(b"Hello, World!", mode="binary")
drw.write(b"Hello, World!", path="io/example.txt", mode="binary")
# 从文件读取内容
content = drw.read()
content = drw.read(path=file_path)
if content:
logger.info(f"从 {file_path} 读取的内容: {content}")
......
from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key
import boto3
from loguru import logger
from boto3.s3.transfer import TransferConfig
from botocore.config import Config
import os
MODE_TXT = "text"
MODE_BIN = "binary"
class S3ReaderWriter(AbsReaderWriter):
def __init__(self, ak: str, sk: str, endpoint_url: str, addressing_style: str):
def __init__(self, ak: str, sk: str, endpoint_url: str, addressing_style: str, parent_path: str):
    """Create a reader/writer backed by an S3 client.

    Args:
        ak, sk, endpoint_url, addressing_style: Passed unchanged to
            ``_get_client`` to build the S3 client.
        parent_path: Prefix (for example ``"s3://bucket_name/"``) that
            paths not starting with ``s3://`` are joined onto.
    """
    # Run the base-class initializer so ``self.parent_path`` is populated too.
    super().__init__(parent_path)
    self.client = self._get_client(ak, sk, endpoint_url, addressing_style)
    self.path = parent_path
def _get_client(self, ak: str, sk: str, endpoint_url: str, addressing_style: str):
s3_client = boto3.client(
......@@ -22,51 +25,83 @@ class S3ReaderWriter(AbsReaderWriter):
retries={'max_attempts': 5, 'mode': 'standard'}),
)
return s3_client
def read(self, s3_path, mode="text", encoding="utf-8"):
bucket_name, bucket_key = parse_bucket_key(s3_path)
res = self.client.get_object(Bucket=bucket_name, Key=bucket_key)
def read(self, s3_relative_path, mode=MODE_TXT, encoding="utf-8"):
if s3_relative_path.startswith("s3://"):
s3_path = s3_relative_path
else:
s3_path = os.path.join(self.path, s3_relative_path)
bucket_name, key = parse_bucket_key(s3_path)
res = self.client.get_object(Bucket=bucket_name, Key=key)
body = res["Body"].read()
if mode == 'text':
if mode == MODE_TXT:
data = body.decode(encoding) # Decode bytes to text
elif mode == 'binary':
elif mode == MODE_BIN:
data = body
else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.")
return data
def write(self, data, s3_path, mode="text", encoding="utf-8"):
if mode == 'text':
body = data.encode(encoding) # Encode text data as bytes
elif mode == 'binary':
body = data
def write(self, content, s3_relative_path, mode=MODE_TXT, encoding="utf-8"):
if s3_relative_path.startswith("s3://"):
s3_path = s3_relative_path
else:
s3_path = os.path.join(self.path, s3_relative_path)
if mode == MODE_TXT:
body = content.encode(encoding) # Encode text data as bytes
elif mode == MODE_BIN:
body = content
else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.")
bucket_name, bucket_key = parse_bucket_key(s3_path)
self.client.put_object(Body=body, Bucket=bucket_name, Key=bucket_key)
bucket_name, key = parse_bucket_key(s3_path)
self.client.put_object(Body=body, Bucket=bucket_name, Key=key)
logger.info(f"内容已写入 {s3_path} ")
def read_jsonl(self, path: str, byte_start=0, byte_end=None, mode=MODE_TXT, encoding='utf-8'):
    """Read an inclusive byte range of an S3 object.

    Args:
        path: Full ``s3://`` URL, or a path relative to ``self.path``.
        byte_start: Offset of the first byte to fetch.
        byte_end: Offset of the last byte to fetch (inclusive, as in an
            HTTP ``Range`` header); ``None`` reads to the end of the object.
        mode: ``MODE_TXT`` to return decoded text, ``MODE_BIN`` for bytes.
        encoding: Encoding used to decode the body in text mode.

    Returns:
        ``str`` in text mode, ``bytes`` in binary mode.

    Raises:
        ValueError: If ``mode`` is neither ``MODE_TXT`` nor ``MODE_BIN``.
    """
    # S3 paths always use "/" as the separator, whatever the host OS is.
    import posixpath

    if path.startswith("s3://"):
        s3_path = path
    else:
        s3_path = posixpath.join(self.path, path)
    bucket_name, key = parse_bucket_key(s3_path)

    # Compare against None explicitly: byte_end == 0 is a valid request for
    # just the first byte and must not fall through to the open-ended form.
    if byte_end is not None:
        range_header = f'bytes={byte_start}-{byte_end}'
    else:
        range_header = f'bytes={byte_start}-'

    res = self.client.get_object(Bucket=bucket_name, Key=key, Range=range_header)
    body = res["Body"].read()
    if mode == MODE_TXT:
        return body.decode(encoding)  # Decode bytes to text
    if mode == MODE_BIN:
        return body
    raise ValueError("Invalid mode. Use 'text' or 'binary'.")
if __name__ == "__main__":
# Config the connection info
ak = ""
sk = ""
endpoint_url = ""
addressing_style = ""
addressing_style = "auto"
bucket_name = ""
# Create an S3ReaderWriter object
s3_reader_writer = S3ReaderWriter(ak, sk, endpoint_url, addressing_style)
s3_reader_writer = S3ReaderWriter(ak, sk, endpoint_url, addressing_style, "s3://bucket_name/")
# Write text data to S3
text_data = "This is some text data"
s3_reader_writer.write(data=text_data, s3_path = "s3://bucket_name/ebook/test/test.json", mode='text')
# write() names its payload parameter "content"; "data=" would raise TypeError.
s3_reader_writer.write(content=text_data, s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_TXT)
# Read text data from S3
text_data_read = s3_reader_writer.read(s3_path = "s3://bucket_name/ebook/test/test.json", mode='text')
text_data_read = s3_reader_writer.read(s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_TXT)
logger.info(f"Read text data from S3: {text_data_read}")
# Write binary data to S3
binary_data = b"This is some binary data"
s3_reader_writer.write(data=text_data, s3_path = "s3://bucket_name/ebook/test/test2.json", mode='binary')
# Binary mode sends the payload as-is, so pass the bytes object (binary_data)
# through the "content" parameter rather than the str text_data.
s3_reader_writer.write(content=binary_data, s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_BIN)
# Read binary data from S3
binary_data_read = s3_reader_writer.read(s3_path = "s3://bucket_name/ebook/test/test2.json", mode='binary')
binary_data_read = s3_reader_writer.read(s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_BIN)
logger.info(f"Read binary data from S3: {binary_data_read}")
# Range Read text data from S3
binary_data_read = s3_reader_writer.read_jsonl(path=f"s3://{bucket_name}/ebook/test/test.json",
byte_start=0, byte_end=10, mode=MODE_BIN)
logger.info(f"Read binary data from S3: {binary_data_read}")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment