2022년 8월 5일 금요일

tensorflow keras / Sequence / DataLoader

 Pytorch에서는 사용이 편한 DataLoader Class가 있어서 batch 작업을 하기 편했습니다. tensorflow에도 유사한 것을 찾아보았습니다.

kreas 에서는 Sequence를 조금만 응용하면 비슷하게 사용이 가능합니다.

기본 개념은 https://swlock.blogspot.com/2022/07/pytorch-dataloader.html pytorch것을 이용해서 비슷하게 만들어 볼 예정입니다.

pytorch에는 Dataset/DataLoader라는 두개의 클래스가 존재하는데 kreas 에서는 Sequence 만 존재합니다. Sequence == DataLoader 라고 생각하면 됩니다.

여기에서 Dataset/DataLoader(Sequence)의 차이점을 알고 나면 구현해야 할 것이 좀 더 명확해집니다.


입력 데이터

pytorch 와 같은데... 아래와 같이 실행해서 파일로 저장합니다. 실행 시키면 x_data.csv, y_data.csv 파일이 생성됩니다.

import numpy as np

####################################### 데이터 준비
np.random.seed(42)
# np.random.rand(m,n) n*m matrix 로 0~1 랜덤 생성
x = np.random.rand(100, 3)
'''
[[0.37454012 0.95071431 0.73199394]
 [0.59865848 0.15601864 0.15599452]
 [0.05808361 0.86617615 0.60111501]
 ...
'''
x_one_d = x[:,0:1] * x[:,1:2] * x[:,2:3]
'''
[[2.60648878e-01]  # <= [0.37454012 * 0.95071431 * 0.73199394]
 [1.45701819e-02]
 [3.02424805e-02]
 ...
'''

# y는 원하는 값 목표값인 label로 표현합니다.
y = 1 + 2 * x_one_d + .1 * np.random.randn(100, 1)
'''
[[1.52585494]
 [0.96398033]
 [1.27487937]
 ...
'''
# 저장전 확인
print(x.shape,type(x),x[0])
print(y.shape,type(y),y[0])

# 저장
np.savetxt("x_data.csv", x, delimiter=',')
np.savetxt("y_data.csv", y, delimiter=',')

# 저장된것 확인
x_data = np.loadtxt("x_data.csv", delimiter=',', skiprows=0, max_rows=1)
print(x_data.shape,type(x_data),x_data)
if x_data.shape==():
    x_data = np.array([x_data]) 
print(x_data.shape,type(x_data),x_data)
y_data = np.loadtxt("y_data.csv", delimiter=',', skiprows=0, max_rows=1)
print(y_data.shape,type(y_data),y_data)
if y_data.shape==():
    y_data = np.array([y_data]) 
print(y_data.shape,type(y_data),y_data)


용어의 정리

Dataset : index가 주어지면 데이터를 하나 (x,y) 로딩해서 넘겨주는 class, input format이 바뀌는경우 수정을 해야 함

DataLoader : batch 단위의 묶음을 처리하는 class


Dataset/DataLoader 두개의 class 공통으로 구현해야 하는 부분은 __len__, __getitem__ 입니다.

* __len__ 은 class의 instance의 길이를 측정하는 len()이라는 함수를 호출하면 넘겨주는 리턴 값입니다.

* __getitem__ 은 [ ] 를 사용할 때 처리하는 함수 입니다.


Dataset

__init__에서는 파일을 열어서 전체 라인을 체크한 후 data크기를 total_len에 기록해 둡니다.

( __len__ 함수에서 리턴할때 사용할 겁니다. 여기에서는 x, y 의 갯수가 같기 때문에 x 파일을 열어서 전체 라인을 측정하였습니다.)

base_idx 는 얼마만큼 건너 뛸지 결정하는 index가 됩니다. 이것의 용도는 전체 data에서 일부분만 data를 사용하고픈 경우가 있기 때문입니다.

__getitem__ 에서는 np.loadtxt를 이용하여 data를 읽어오는데 앞에서 설명한 base_idx가 있는경우 해당 숫자 만큼 더해서 데이터를 읽어오도록 하였습니다.

class CustomDataset:
    def __init__(self, x_tensor_filename, y_tensor_filename, base_idx=None, cnt=None):
        self.x_fn = x_tensor_filename
        self.y_fn = y_tensor_filename
        self.base_idx = base_idx
        self.cnt = cnt
        if self.base_idx is None:
            with open(y_tensor_filename, 'r') as fp:
                for count, line in enumerate(fp):
                    pass
            self.total_len = count + 1
        else:
            self.total_len = cnt

    def __getitem__(self, index):
        """
        주어진 인덱스 index 에 해당하는 샘플을 데이터셋에서 불러오고 반환합니다.
        """
        if self.base_idx is not None:
            index = index + self.base_idx
        x_data = np.loadtxt(self.x_fn, delimiter=',', skiprows=index, max_rows=1)
        # [0.37454012 0.95071431 0.73199394]
        if x_data.shape == ():
            x_data = np.array([x_data])
        y_data = np.loadtxt(self.y_fn, delimiter=',', skiprows=index, max_rows=1)
        # 1.5258549401766552
        if y_data.shape == ():
            y_data = np.array([y_data])
        # [1.5258549401766552]
        return x_data, y_data

    def __len__(self):
        """
        데이터셋의 샘플 개수를 반환합니다.
        """
        return self.total_len


DataLoader 

전체 count는 batch 단위로 나눠준 숫자가 됩니다. 그런데 그냥 나눠주면 나머지가 남기 때문에 match.ceil 을 이용해서 나머지가 남으면 숫자를 하나 올려주는 올림으로 count를 계산합니다. 

특이한 on_epoch_end() 함수가 있는데 이건 Sequence 의 멤버 함수로 epoch가 끝나면 호출하게 되는 함수입니다. 그래서 여기에서는 끝날때마다 shuffle 호출하도록 하였습니다.

index에 해당하는 dataset을 읽는것은 아래 코드에 의해 구현되었습니다. 

        indexer = self.indexer[index * self.batch_size:(index + 1) * self.batch_size]

그리고 indexer에 들어있는 실제 data를 가져오는건 self.dataset[index] 형태로 되는데, 여기에서는 [0]은 x, [1]은 y값을 읽도록 dataset class가 구현이 되어있습니다. 이걸 for loop에 의해서 batch묶음을 만들어서 list 로 만들어 줍니다.

        batch_x = [self.dataset[i][0] for i in indexer]
        batch_y = [self.dataset[i][1] for i in indexer]

아래는 전체 코드입니다.

class CustomDataloader(Sequence):
    def __init__(self, _dataset, batch_size=1, shuffle=False):
        self.dataset = _dataset
        self.batch_size = batch_size
        self.total_len = math.ceil(len(self.dataset) / self.batch_size)
        self.shuffle = shuffle
        self.indexer = np.arange(len(self.dataset))
        self.on_epoch_end()

    def __getitem__(self, index):
        indexer = self.indexer[index * self.batch_size:(index + 1) * self.batch_size]
        batch_x = [self.dataset[i][0] for i in indexer]
        batch_y = [self.dataset[i][1] for i in indexer]
        return np.array(batch_x), np.array(batch_y)

    def __len__(self):
        return self.total_len

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexer)
            print("shuffle")


전체 코드

epoch 루프내에서 train_loader를 이용해서 batch 단위로 데이터를 읽어내는 예제입니다.

from tensorflow.keras.utils import Sequence
import numpy as np
import math

batch_size = 9
lr = 1e-1
n_epochs = 5
train_data_ratio = 0.8


class CustomDataset:
    def __init__(self, x_tensor_filename, y_tensor_filename, base_idx=None, cnt=None):
        self.x_fn = x_tensor_filename
        self.y_fn = y_tensor_filename
        self.base_idx = base_idx
        self.cnt = cnt
        if self.base_idx is None:
            with open(y_tensor_filename, 'r') as fp:
                for count, line in enumerate(fp):
                    pass
            self.total_len = count + 1
        else:
            self.total_len = cnt

    def __getitem__(self, index):
        """
        주어진 인덱스 index 에 해당하는 샘플을 데이터셋에서 불러오고 반환합니다.
        """
        if self.base_idx is not None:
            index = index + self.base_idx
        x_data = np.loadtxt(self.x_fn, delimiter=',', skiprows=index, max_rows=1)
        # [0.37454012 0.95071431 0.73199394]
        if x_data.shape == ():
            x_data = np.array([x_data])
        y_data = np.loadtxt(self.y_fn, delimiter=',', skiprows=index, max_rows=1)
        # 1.5258549401766552
        if y_data.shape == ():
            y_data = np.array([y_data])
        # [1.5258549401766552]
        return x_data, y_data

    def __len__(self):
        """
        데이터셋의 샘플 개수를 반환합니다.
        """
        return self.total_len


class CustomDataloader(Sequence):
    def __init__(self, _dataset, batch_size=1, shuffle=False):
        self.dataset = _dataset
        self.batch_size = batch_size
        self.total_len = math.ceil(len(self.dataset) / self.batch_size)
        self.shuffle = shuffle
        self.indexer = np.arange(len(self.dataset))
        self.on_epoch_end()

    def __getitem__(self, index):
        indexer = self.indexer[index * self.batch_size:(index + 1) * self.batch_size]
        batch_x = [self.dataset[i][0] for i in indexer]
        batch_y = [self.dataset[i][1] for i in indexer]
        return np.array(batch_x), np.array(batch_y)

    def __len__(self):
        return self.total_len

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexer)
            print("shuffle")


# temp_dataset 을 사용하는 이유는 전체 크기를 알기 위함입니다.
temp_dataset = CustomDataset("x_data.csv", "y_data.csv")
# 전체 dataset에서 train_dataset_cnt, val_dataset_cnt 갯수를 각각 분리
train_dataset_cnt = int(len(temp_dataset) * train_data_ratio)
val_dataset_cnt = len(temp_dataset) - train_dataset_cnt
# CustomDataset 인가 없으면 전체 dataset을 사용합니다. 여기에서는 정해진 크기까지만 사용하기 위해서 base_idx 시작 인덱스, cnt 전체크기 인자를 넘겼습니다.
train_dataset = CustomDataset("x_data.csv", "y_data.csv", 0, train_dataset_cnt)
val_dataset = CustomDataset("x_data.csv", "y_data.csv", train_dataset_cnt, val_dataset_cnt)
print(len(temp_dataset), train_dataset_cnt, val_dataset_cnt)
# CustomDataloader를 이용하면 한번에 batch_size 크기만큼 데이터가 올라오게 됩니다.
train_loader = CustomDataloader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = CustomDataloader(val_dataset, batch_size=batch_size, shuffle=True)


for epoch in range(n_epochs):
    print(f"epoch ##{epoch}")
    batch_no = 0
    for x_batch, y_batch in train_loader:
        print(f"train batch #{batch_no}, epoch #{epoch}, size {len(x_batch)}")
        print(x_batch, "\n", y_batch)
        batch_no += 1


실행 결과


100 80 20
shuffle
shuffle
epoch ##0
train batch #0, epoch #0, size 9
[[0.66350177 0.00506158 0.16080805]
 [0.90826589 0.23956189 0.14489487]
 [0.35846573 0.11586906 0.86310343]
 [0.70807258 0.02058449 0.96990985]
 [0.30461377 0.09767211 0.68423303]
 [0.52273283 0.42754102 0.02541913]
 [0.89204656 0.63113863 0.7948113 ]
 [0.32320293 0.51879062 0.70301896]
 [0.08413996 0.16162871 0.89855419]] 
 [[0.945062  ]
 [1.22165584]
 [1.11608009]
 [1.09166539]
 [0.92068533]
 [0.98610501]
 [1.94967527]
 [1.17573523]
 [1.08740254]]
train batch #1, epoch #0, size 9
...
...
train batch #8, epoch #4, size 8
[[0.16122129 0.92969765 0.80812038]
 [0.11986537 0.33761517 0.9429097 ]
 [0.72821635 0.36778313 0.63230583]
 [0.48945276 0.98565045 0.24205527]
 [0.66252228 0.31171108 0.52006802]
 [0.5107473  0.417411   0.22210781]
 [0.18657006 0.892559   0.53934224]
 [0.94888554 0.96563203 0.80839735]] 
 [[1.25532762]
 [1.096222  ]
 [1.14348671]
 [1.10976758]
 [1.14947133]
 [1.14091336]
 [1.03604149]
 [2.5680029 ]]



댓글 없음:

댓글 쓰기