MLOps/MinIO

MinIO Client with Python

ai-notes 2024. 7. 17. 17:00
반응형

지금까지 진행한 MinIO 설치과정은 모두 Python에서 사용하기 위해서 였습니다.

Python에서 업로드와 다운로드를 수행하는 코드를 마지막으로 마무리하고자 합니다.


warp을 이용한 부하테스트(벤치마크)가 아니라 실제 파일을 가지고 업로드 테스트를 했을때는 속도가 생각만큼 나오진 않았다.
*이전 업로드 테스트에서 학습데이터 낱개파일을 모두 올리는건 시간이 너무 오래 걸려 압축 파일로 관리하기로 약속했다.

Nodeport를 이용한 Tiny ImageNet 업로드

현재 서버 구성은 500Mbps로 추정되는 3대 서버와 2대의 100Mbps 서버로 구성되어 있다. 이전글에서 언급했듯, 물리적 거리 또한 떨어져 있는데, 이번에는 NginX에서 리버스 프록시를 2대의 서버를 제외하고 로드밸런싱 하도록 수정하고 동일 테스트를 수행했다.

  minio.conf: |
    upstream minio_s3 {
        least_conn;
        # server minio-0.minio-service.minio-data.svc.cluster.local:9000; <- 100Mbps
        # server minio-1.minio-service.minio-data.svc.cluster.local:9000; <- 100Mbps     
        server minio-2.minio-service.minio-data.svc.cluster.local:9000;
        server minio-3.minio-service.minio-data.svc.cluster.local:9000;
        server minio-4.minio-service.minio-data.svc.cluster.local:9000;
    }

NginX 결과

여러번의 테스트를 진행했고, 유의미한 차이를 찾아볼 수 있었다. 이 결과를 토대로 MinIO의 접근은 NginX를 통해 하기로 결정했다.

 

업로드와 다운로드는 하나의 코드로 통일했으며, 다운로드를 받는 경우가 더 많으므로 --upload 인자를 통해 업로드와 다운로드를 구분했다.

업로드에서는 progress를 이용한 업로드 진행 상황을 보여주지만, 다운로드의 경우 따로 지원하지 않아 Stream을 통해 구현했다.

# python client.py {bucket_name} {zip_file_path} {minio_path}
## Upload
python client.py —upload image tiny-imagenet-200.zip classification

upload의 경우 minio_path/업로드파일명.zip으로 저장되도록 코딩했다.

결과적으로 image/classification/tiny-imagenet-200.zip 으로 MinIO 서버에 올라가게 된다.

업로드하려는 파일이 존재하면 업로드가 안되게 했는데 --force인자를 통해 덮어쓰기를 가능하게 했지만, 버저닝을 하는것으로 권장했다.

## Download
python client.py image local_file_name.zip classification/tiny-imagenet-200.zip

위의 예제 파일이 업로드가 되어있다고 가정했을때, 업로드와 달리 다운로드에서는 MinIO 서버에 있는 파일 경로를 fullpath로 지정하게 했다.

local_file_name은 로컬 폴더에 저장할 파일명으로 경로를 지정해도 된다.

 

최종 코드는 아래와 같다.

from minio import Minio
from pathlib import Path
from minio.error import S3Error

import sys
import time
import argparse
from tqdm import tqdm
from queue import Empty, Queue
from threading import Thread

_BAR_SIZE = 20
_KILOBYTE = 1024
_FINISHED_BAR = '#'
_REMAINING_BAR = '-'

_UNKNOWN_SIZE = '?'
_STR_MEGABYTE = ' MB'

_HOURS_OF_ELAPSED = '%d:%02d:%02d'
_MINUTES_OF_ELAPSED = '%02d:%02d'

_RATE_FORMAT = '%5.2f'
_PERCENTAGE_FORMAT = '%3d%%'
_HUMANINZED_FORMAT = '%0.2f'

_DISPLAY_FORMAT = '|%s| %s/%s %s [elapsed: %s left: %s, %s MB/sec]'

_REFRESH_CHAR = '\r'


class Progress(Thread):
    """
        Constructs a :class:`Progress` object.
        :param interval: Sets the time interval to be displayed on the screen.
        :param stdout: Sets the standard output

        :return: :class:`Progress` object
    """

    def __init__(self, interval=1, stdout=sys.stdout):
        Thread.__init__(self)
        self.daemon = True
        self.total_length = 0
        self.interval = interval
        self.object_name = None

        self.last_printed_len = 0
        self.current_size = 0

        self.display_queue = Queue()
        self.initial_time = time.time()
        self.stdout = stdout
        self.start()

    def set_meta(self, total_length, object_name):
        """
        Metadata settings for the object. This method called before uploading
        object
        :param total_length: Total length of object.
        :param object_name: Object name to be showed.
        """
        self.total_length = total_length
        self.object_name = object_name
        self.prefix = self.object_name + ': ' if self.object_name else ''

    def run(self):
        displayed_time = 0
        while True:
            try:
                # display every interval secs
                task = self.display_queue.get(timeout=self.interval)
            except Empty:
                elapsed_time = time.time() - self.initial_time
                if elapsed_time > displayed_time:
                    displayed_time = elapsed_time
                self.print_status(current_size=self.current_size,
                                  total_length=self.total_length,
                                  displayed_time=displayed_time,
                                  prefix=self.prefix)
                continue

            current_size, total_length = task
            displayed_time = time.time() - self.initial_time
            self.print_status(current_size=current_size,
                              total_length=total_length,
                              displayed_time=displayed_time,
                              prefix=self.prefix)
            self.display_queue.task_done()
            if current_size == total_length:
                # once we have done uploading everything return
                self.done_progress()
                return

    def update(self, size):
        """
        Update object size to be showed. This method called while uploading
        :param size: Object size to be showed. The object size should be in
                     bytes.
        """
        if not isinstance(size, int):
            raise ValueError('{} type can not be displayed. '
                             'Please change it to Int.'.format(type(size)))

        self.current_size += size
        self.display_queue.put((self.current_size, self.total_length))

    def done_progress(self):
        self.total_length = 0
        self.object_name = None
        self.last_printed_len = 0
        self.current_size = 0

    def print_status(self, current_size, total_length, displayed_time, prefix):
        formatted_str = prefix + format_string(
            current_size, total_length, displayed_time)
        self.stdout.write(_REFRESH_CHAR + formatted_str + ' ' *
                          max(self.last_printed_len - len(formatted_str), 0))
        self.stdout.flush()
        self.last_printed_len = len(formatted_str)


def seconds_to_time(seconds):
    """
    Consistent time format to be displayed on the elapsed time in screen.
    :param seconds: seconds
    """
    minutes, seconds = divmod(int(seconds), 60)
    hours, m = divmod(minutes, 60)
    if hours:
        return _HOURS_OF_ELAPSED % (hours, m, seconds)
    else:
        return _MINUTES_OF_ELAPSED % (m, seconds)


def format_string(current_size, total_length, elapsed_time):
    """
    Consistent format to be displayed on the screen.
    :param current_size: Number of finished object size
    :param total_length: Total object size
    :param elapsed_time: number of seconds passed since start
    """

    n_to_mb = current_size / _KILOBYTE / _KILOBYTE
    elapsed_str = seconds_to_time(elapsed_time)

    rate = _RATE_FORMAT % (
        n_to_mb / elapsed_time) if elapsed_time else _UNKNOWN_SIZE
    frac = float(current_size) / total_length
    bar_length = int(frac * _BAR_SIZE)
    bar = (_FINISHED_BAR * bar_length +
           _REMAINING_BAR * (_BAR_SIZE - bar_length))
    percentage = _PERCENTAGE_FORMAT % (frac * 100)
    left_str = (
        seconds_to_time(
            elapsed_time / current_size * (total_length - current_size))
        if current_size else _UNKNOWN_SIZE)

    humanized_total = _HUMANINZED_FORMAT % (
        total_length / _KILOBYTE / _KILOBYTE) + _STR_MEGABYTE
    humanized_n = _HUMANINZED_FORMAT % n_to_mb + _STR_MEGABYTE

    return _DISPLAY_FORMAT % (bar, humanized_n, humanized_total, percentage,
                              elapsed_str, left_str, rate)

def download_to_local(server_url, access_key, secret_key, bucket_name, zip_file_path, minio_path):
    client = Minio(
        server_url,
        access_key=access_key,
        secret_key=secret_key,
        secure=False  # Use True if your server uses HTTPS
    )
    bucket_name = bucket_name.lower()
    minio_path = '/'.join([s.lower() for s in minio_path.split('/')])
    zip_file_path = '/'.join([s.lower() for s in zip_file_path.split('/')])
    
    stat = client.stat_object(bucket_name, minio_path)
    response = client.get_object(bucket_name, minio_path)
    with open(zip_file_path,'wb') as file:
        with tqdm(total=stat.size, unit='B', unit_scale=True, desc=f'{zip_file_path}') as progress_bar:
            for data in response.stream(20*1024*1024):  # 20MB 단위로 읽기
                file.write(data)
                progress_bar.update(len(data))
    print(f'File {bucket_name}/{minio_path} downloaded to {zip_file_path} successfully.') 

def file_exists_in_minio(minio_client, bucket_name, object_name):
    try:
        minio_client.stat_object(bucket_name, object_name)
        return True
    except S3Error as exc:
        if exc.code == 'NoSuchKey':
            return False
        else:
            raise

def upload_to_minio(server_url, access_key, secret_key, bucket_name, zip_file_path, minio_path, force=False):
    # Initialize the MinIO client
    client = Minio(
        server_url,
        access_key=access_key,
        secret_key=secret_key,
        secure=False  # Use True if your server uses HTTPS
    )
    bucket_name = bucket_name.lower()
    minio_path = '/'.join([s.lower() for s in minio_path.split('/')])

    object_name = str(Path(minio_path) / Path(zip_file_path).name.lower())
    # Check if the bucket exists
    if not client.bucket_exists(bucket_name):
        print("*" * 100)
        print("Bucket is not exist, Please Check Bucket Name")
        print("*" * 100)
        raise

    if file_exists_in_minio(client, bucket_name, object_name):
        if not force:
            print("*" * 100)
            print("Zip file already exists in MinIO, please delete or change the name")
            print("*" * 100)
            raise
        else:
            print("*" * 100)
            print("Zip file already exists in MinIO, but '--force' option is enabled. Overwriting the file.")
            print("*" * 100)

    client.fput_object(bucket_name, object_name, zip_file_path, progress=Progress())
    print(f'File {zip_file_path} uploaded to {bucket_name}/{object_name} successfully.') 

def main():
    # ArgumentParser 설정
    parser = argparse.ArgumentParser(description="Upload a folder to MinIO server")
    parser.add_argument("bucket_name", help="Name of the bucket to upload to")
    parser.add_argument("zip_file_path", help="Path to the local zip file to upload")
    parser.add_argument("minio_path", help="Path in MinIO where files will be stored")
    parser.add_argument("--force", action="store_true", help="Force upload even if the file already exists in MinIO")
    parser.add_argument("--upload", action="store_true", help="If true, Upload the specified file from local to the MinIO.")
    args = parser.parse_args()
    # Example usage:
    server_url = '<IP>:<Port>'
    access_key = '<ID>'
    secret_key = '<PASSWORD>'
    if not args.upload:
        download_to_local(server_url, access_key, secret_key, args.bucket_name, args.zip_file_path, args.minio_path)
    else:
        upload_to_minio(server_url, access_key, secret_key, args.bucket_name, args.zip_file_path, args.minio_path, args.force)

if __name__ == "__main__":
    main()

 

마치며

MinIO에 대한 과정은 이 글에서 마무리될 것 같습니다.

앞으로는 사용중에 문제가 발생하거나 특이사항이 생기면 글을 적게 될 것 같습니다.

반응형