2022년 7월 23일 토요일

Machine Learning(머신러닝) 기초, Pytorch train 기본


문제

자연계에 알 수 없는 3가지 정도의 값(현상)(x)이 주어집니다. 그 값이 주어질때 공교롭게도 결과(y)값이 나타납니다.

시간이 흘러 사람들은 x가 주어질때 y값이 궁금해지기 시작했습니다.... 어떻게 알 수 있을까요? 여기에서 대수학자가 나타납니다. "x와 y는 관계가 있는 값이다. 그렇지 않다면 x가 아닐지니..."

그렇습니다. 사실 자연계에는 x보다 더 많은 변수들이 존재했지만 y에 영향을 미치는 변수 x1,x2,x3 3개만을 이용해서 지금까지 y 변수와의 관계를 곰곰히 생각했습니다.

과거의 데이터를 x1,x2,x3 를 이용해서 y 값이 어떻게 될지 예측해보는 것이 바로 회귀 분석 입니다. 그런데 도저히 어떤 수식으로 표현하면 될지 모르는 경우 ML이 빛을 발합니다.

어떨때 ML을 사용 해야 할지 대충 감이 잡히나요?

현재는 데이터 수집해서 비슷한 데이터를 만드는 GAN 영역까지 있긴 하지만, 기본적으로 기존데이터(x)를 이용(학습)해 출럭(y)값을 예측하는 과정입니다.

(x가 시계열 데이터인 경우 또 다른 방식으로 접근하긴 하지만 여기에서는 다루지 않습니다.)


pytorch

python으로 ML을 쉽게 사용할 수 있는 패키지 입니다. tensorflow 보다 좀 더 사용하기 쉽습니다.


x, y 의 생성

data_count=5000 x1,x2,x3,y 의 데이터를 5000개 생성합니다.

numpy의 random.rand 함수를 이용해서 생성합니다.

np.random.rand(m,n) n*m matrix 로 0~1 랜덤 생성 됩니다.

그 다음으로 궁극적으로 알아내야 하는 미지의 함수입니다.

y = 1 + 2 * x_one_d + .1 * np.random.randn(data_count, 1)

뒤쪽 np.random.randn(data_count, 1)은 y값의 변화를 주기 위한 내용입니다.

결국 y = 1 + 2 * x1 * x2 * x3 라고 생각하면 됩니다.

import numpy as np

####################################### 데이터 준비
data_count = 5000
np.random.seed(42)
# np.random.rand(m,n) n*m matrix 로 0~1 랜덤 생성
x = np.random.rand(data_count, 3)
'''
[[0.37454012 0.95071431 0.73199394]
 [0.59865848 0.15601864 0.15599452]
 [0.05808361 0.86617615 0.60111501]
 ...
'''
x_one_d = x[:,0:1] * x[:,1:2] * x[:,2:3]
'''
[[2.60648878e-01]  # <= [0.37454012 * 0.95071431 * 0.73199394]
 [1.45701819e-02]
 [3.02424805e-02]
 ...
'''

# y는 원하는 값 목표값인 label로 표현합니다.
y = 1 + 2 * x_one_d + .1 * np.random.randn(data_count, 1)
'''
[[1.52585494]
 [0.96398033]
 [1.27487937]
 ...
'''
# 저장전 확인
print(x.shape,type(x),x[0])
print(y.shape,type(y),y[0])

# 저장
np.savetxt("x_data.csv", x, delimiter=',')
np.savetxt("y_data.csv", y, delimiter=',')

# 저장된것 확인
x_data = np.loadtxt("x_data.csv", delimiter=',', skiprows=0, max_rows=1)
print(x_data.shape,type(x_data),x_data)
if x_data.shape==():
    x_data = np.array([x_data]) 
print(x_data.shape,type(x_data),x_data)
y_data = np.loadtxt("y_data.csv", delimiter=',', skiprows=0, max_rows=1)
print(y_data.shape,type(y_data),y_data)
if y_data.shape==():
    y_data = np.array([y_data]) 
print(y_data.shape,type(y_data),y_data)

실행하고 나면 x_data.csv, y_data.csv 파일이 생성됩니다.


file dataloader

파일에서 데이터 로딩하는 부분은 이전 포스팅을 참고 하시기 바랍니다.

https://swlock.blogspot.com/2022/07/pytorch-dataloader.html


pytorch Learning basic

여기에서는 아주 간단한 Linear Model (모델/네트워크이란 ML에서 알고리즘 같은것으로 생각하면 됩니다. 입력과 출력간의 관계(?) 정도만 생각하면 됩니다. 여러 개를 구조를 연결해서 Layer를 만들기도 하는데 여기에서는 2개의 Linear model 연결해서 만들었습니다. 좀 더 여러가지 Network에 대한 정보를 원하시면 다음 링크를 참고하시기 바랍니다.  https://pytorch.org/docs/stable/nn.html ) 두 개를 연결해서 만들었습니다.


전체 코드

python 3.8 환경, GPU 있는 조건과 없는 조건에서도 정상 동작하는 코드입니다.

import torch
import torch.optim as optim
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split
import matplotlib.pyplot as plt

batch_size = 100
lr = 1e-1
n_epochs = 5
device = 'cuda' if torch.cuda.is_available() else 'cpu'
train_data_ratio = 0.8

class CustomDataset(Dataset):
    def __init__(self, x_tensor_filename, y_tensor_filename):
        self.x_fn = x_tensor_filename
        self.y_fn = y_tensor_filename
        with open(y_tensor_filename, 'r') as fp:
            for count, line in enumerate(fp):
                pass
        self.total_len = count + 1
    def __getitem__(self, index):
        """
        주어진 인덱스 index 에 해당하는 샘플을 데이터셋에서 불러오고 반환합니다.
        """
        x_data = np.loadtxt(self.x_fn, delimiter=',', dtype=float, skiprows=index, max_rows=1)
        # [0.37454012 0.95071431 0.73199394]
        if x_data.shape==():
            x_data = np.array([x_data]) 
        y_data = np.loadtxt(self.y_fn, delimiter=',', dtype=float, skiprows=index, max_rows=1)
        # 1.5258549401766552
        if y_data.shape==():
            y_data = np.array([y_data])
        # [1.5258549401766552]
        return (x_data, y_data)
    def __len__(self):
        """
        데이터셋의 샘플 개수를 반환합니다.
        """
        return self.total_len
        
dataset = CustomDataset("x_data.csv", "y_data.csv")
train_dataset_cnt = int(len(dataset)*train_data_ratio)
val_dataset_cnt = len(dataset) - train_dataset_cnt
train_dataset, val_dataset = random_split(dataset, [train_dataset_cnt, val_dataset_cnt])

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size)
val_loader   = DataLoader(dataset=val_dataset, batch_size=batch_size) 


class CustomModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.main = nn.Sequential(
            nn.Linear(3, 6),
            nn.Linear(6, 1)
        )
                
    def forward(self, x):
        return self.main(x)


def make_train_step(model, loss_fn, optimizer):
    # Builds function that performs a step in the train loop
    def train_step(x, y):
        # Sets model to TRAIN mode
        model.train()
        # Makes predictions
        yhat = model(x)
        # Computes loss
        loss = loss_fn(yhat, y)
        # zeroes gradients
        optimizer.zero_grad()
        # Computes gradients
        loss.backward()
        # Updates parameters
        optimizer.step()
        # Returns the loss
        return loss.item()
    
    # Returns the function that will be called inside the train loop
    return train_step

model = CustomModel().to(device)
loss_fn = nn.MSELoss(reduction='mean')
print(model.parameters())
optimizer = optim.SGD(model.parameters(), lr=lr)

# Creates the train_step function for our model, loss function and optimizer
train_step = make_train_step(model, loss_fn, optimizer)
losses = []
val_losses = []

for epoch in range(n_epochs):
    print(f"epoch ##{epoch}")
    for x_batch, y_batch in train_loader:
        x_batch = x_batch.float()
        y_batch = y_batch.float()
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        loss = train_step(x_batch, y_batch)
        losses.append(loss)
    
    with torch.no_grad():
        for x_val, y_val in val_loader:
            x_val = x_val.float()
            y_val = y_val.float()
            x_val = x_val.to(device)
            y_val = y_val.to(device)
            
            model.eval()

            yhat = model(x_val)
            val_loss = loss_fn(y_val, yhat)
            val_losses.append(val_loss.item())

print(model.state_dict())

fig, ax = plt.subplots()
ax.plot(losses)
ax.plot(val_losses)
plt.savefig("losses.png")

# test
model.eval()
x_prd = np.array([0.1, 0.2, 0.1]) # y = 1 + 2*0.1*0.2*0.1
print("expect:",1 + 2*0.1*0.2*0.1)
x_prd = torch.from_numpy(x_prd)
x_prd = x_prd.float()
x_prd = x_prd.to(device)
print(model(x_prd))
# y는 원하는 값 목표값인 label로 표현합니다.
# 아래 방정식은 모르는 상태에서 학습을 통해 알아내는 것이 머신러닝이다.

실행 결과

<generator object Module.parameters at 0x00000267BEE0AF90>
epoch ##0
epoch ##1
epoch ##2
epoch ##3
epoch ##4
OrderedDict([('main.0.weight', tensor([[ 0.5161,  0.1297,  0.5265],
        [ 0.3741,  0.0422, -0.3313],
        [ 0.3264,  0.1173,  0.1090],
        [-0.4400, -0.0018, -0.1323],
        [ 0.0353, -0.2519, -0.4348],
        [-0.0822,  0.5510, -0.1309]])), ('main.0.bias', tensor([ 0.5970,  0.4329, -0.1158,  0.3576, -0.3454,  0.1671])), ('main.1.weight', tensor([[ 0.7018,  0.0288,  0.3396, -0.0615, -0.3310,  0.4449]])), ('main.1.bias', tensor([-0.0457]))])
expect: 1.004
tensor([0.7006], grad_fn=<AddBackward0>)


코드 설명

사용자가 조절해야 하는 parameter 입니다.

batch_size = 100
 한번에 처리하는 batch size 입니다.
 메모리에 한번에 올라가는 양이됩니다. 
 훈련값 갱신도 batch 크기 단위로 이루어 집니다.
lr = 1e-1
 learning rate(훈련 비율) 이 값이 작으면 훈련하는데 시간이 오래걸립니다.
 반대로 크면 훈련이 정상적으로 되지 않고 진동하게 됩니다.
n_epochs = 5
 epoch 라고 하는데 훈련 데이터를 전체적으로 훈련하는것을 1 epoch라고 합니다.
 수치가 높으면 훈련을 여러번하게 되는데 여러번해도 효율이 좋아지지 않는다면 수치를 낮추면 됩니다.
 즉 100으로 하나 5로 하나 차이가 없다면 5로 하면 됩니다.
train_data_ratio = 0.8
 이건 data셋을 훈련용과 test용을 나누는 비율입니다.
 0.8은 data셋 중 80%가 훈련용으로 사용하게 됩니다.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
 이 값은 GPU를 사용하지 CPU를 사용할지 나타내는 변수 입니다.

아래 class는 파일로부터 데이터를 읽어오는 코드로 file dataloader아래 링크에서 좀 더 자세한 정보를 확인하시면 됩니다.

class CustomDataset(Dataset):

https://swlock.blogspot.com/2022/07/pytorch-dataloader.html

아래에 있는 다음 코드들도 위 링크 설명 참고하면 됩니다.

dataset = CustomDataset("x_data.csv", "y_data.csv")
train_dataset_cnt = int(len(dataset)*train_data_ratio)
val_dataset_cnt = len(dataset) - train_dataset_cnt
train_dataset, val_dataset = random_split(dataset, [train_dataset_cnt, val_dataset_cnt])

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size)
val_loader   = DataLoader(dataset=val_dataset, batch_size=batch_size) 

데이터set을 train과 test 용으로 구분을 하기 위한 코드입니다.


모델

앞서 설명한 모델에 대한 구성을 정의하는 곳입니다. 다양한 nn 모델에 대한 정보는 https://pytorch.org/docs/stable/nn.html 링크를 참고하면 됩니다.

간단한 설명으로는 nn.Module을 상속 받아 __init__, forward 함수를 구현합니다.

nn.Sequential() 함수는 여러개의 네트워크들을 연결해서 순차적으로 실행하게 됩니다.

class CustomModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.main = nn.Sequential(
            nn.Linear(3, 6),
            nn.Linear(6, 1)
        )
                
    def forward(self, x):
        return self.main(x)

forward() 에 있는 인자 x 와 Linear(3,6) 등에 대해서 좀 더 자세한 설명이 필요합니다.
3, 6은 각각 입력 feature 수와 출력 feature 수입니다.
조금만 생각해보면 x1,x2,x3 이 있기 때문에 입력은 3개가 됩니다. 그런데 6은 제가 임으로 넣은 수치입니다. 여기에서 2단 linear layer의 hidden 입력이 됩니다.
그림으로 그려 보면 아래와 같습니다.

모델을 훈련 시키기

모델을 훈련 시킨다는 의미는 모델의 network에서 가중치(위 그림에서 동그라미와 동그라미사이의 가중치)를 결정짓는 과정이 됩니다.

pytorch에서는 몇 가지 알아둬야 할 부분이 있는데 model.train() 으로 train을 하겠다고 설정해야합니다. 만약 train 이 아니라면 model.eval() 평가(테스트)모드로 세팅을 해줍니다.

그리고 나서 모델에 x값을 넣고 y값을 알아냅니다. 그 부분이 바로 아래 코드입니다.

        # Sets model to TRAIN mode
        model.train()
        # Makes predictions
        yhat = model(x)

그리고 우리가 생각하는 목표 y와 모델을 통과한 y값과의 차이의 평균을 구해서 얼마나 차이가 나는지 확인을 해야하는데 이것을 loss 함수라고 합니다. 당연한 얘기인데 loss가 작아지는 방향(목표 값과 차이가 작아지는 방향)으로 훈련을 하는것이 목표입니다.

여기에서는 MSELoss 함수를 이용했습니다.

loss_fn = nn.MSELoss(reduction='mean')

실제 동작은 아래와 같이 y, yhat 을 넣어서 차이를 계산합니다.

        # Computes loss
        loss = loss_fn(y, yhat)

그런 다음 model을 결국 가중치 값을 적용 해야하는데 loss가 작아지는 방향으로 업데이트 하는 과정은 아래 코드로 이루어집니다. 

        # zeroes gradients
        optimizer.zero_grad()
        # Computes gradients
        loss.backward()
        # Updates parameters
        optimizer.step()

여기에서도 optimizer 라고 하는 부분이 있는데 아래와 같이 미리 정의하며 SGD라는 함수를 사용했습니다. 이 부분도 다양한 함수가 준비되어 있습니다.

optimizer = optim.SGD(model.parameters(), lr=lr)

앞서 설명한 모든 과정을 표기해보면 아래와 같은 코드가 완성 됩니다.

def make_train_step(model, loss_fn, optimizer):
    # Builds function that performs a step in the train loop
    def train_step(x, y):
        # Sets model to TRAIN mode
        model.train()
        # Makes predictions
        yhat = model(x)
        # Computes loss
        loss = loss_fn(yhat, y)
        # zeroes gradients
        optimizer.zero_grad()
        # Computes gradients
        loss.backward()
        # Updates parameters
        optimizer.step()
        # Returns the loss
        return loss.item()
    
    # Returns the function that will be called inside the train loop
    return train_step

model = CustomModel().to(device)
loss_fn = nn.MSELoss(reduction='mean')
print(model.parameters())
optimizer = optim.SGD(model.parameters(), lr=lr)

# Creates the train_step function for our model, loss function and optimizer
train_step = make_train_step(model, loss_fn, optimizer)


훈련과 평가 루프

실제 훈련과 평가의 상위 루프는 여기에서 이루어 집니다. losses, val_losses들은 그래프를 그려보기 위해 따로 저장하고 실제 훈련은 아래 함수가 핵심이 됩니다.

loss = train_step(x_batch, y_batch)

한번 훈련한 뒤 평가가 곧장 이루어지며 아래와 같이 model(x)값을 넣으면 y값이 나오게 됩니다. 다만 훈련이 아닐때는 미리 model.eval() 평가 모드로 전환을 합니다.

            yhat = model(x_val)
            val_loss = loss_fn(y_val, yhat)

아래 부분이 전체 코드입니다.

for epoch in range(n_epochs):
    print(f"epoch ##{epoch}")
    for x_batch, y_batch in train_loader:
        x_batch = x_batch.float()
        y_batch = y_batch.float()
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        loss = train_step(x_batch, y_batch)
        losses.append(loss)
    
    with torch.no_grad():
        for x_val, y_val in val_loader:
            x_val = x_val.float()
            y_val = y_val.float()
            x_val = x_val.to(device)
            y_val = y_val.to(device)
            
            model.eval()

            yhat = model(x_val)
            val_loss = loss_fn(y_val, yhat)
            val_losses.append(val_loss.item())


그래프 및 평가

파일로 저장해 봤습니다.

fig, ax = plt.subplots()
ax.plot(losses)
ax.plot(val_losses)
plt.savefig("losses.png")

그래프를 보면서 lr이 적절한지 epoch이 적절한지 data가 적절한지 살펴봅니다.


파란색이 train의 loss 함수인데 그래프를 보면 수치가 갑자기 작아집니다. 아무래도 lr이 좀 크다고 볼 수 있고 epoch 도 크다고 볼 수 있습니다. 즉 훈련을 해도 loss가 작아지지 않는 다는 의미 입니다.

실제 목표 값?

이제 값을 예측할 때가 되었습니다. 즉 3개의 변수 x1=0.1,x2=0.2,x3=0.1 이 주어 질 때 y는 어떻게 될까요? 당연히 계산 공식은 주어지지 않습니다. 훈련된 모델에 해당 값을 넣어서 y값을 알아 내야 합니다.

model.eval()
x_prd = np.array([0.1, 0.2, 0.1]) # y = 1 + 2*0.1*0.2*0.1
print("expect:",1 + 2*0.1*0.2*0.1)
x_prd = torch.from_numpy(x_prd)
x_prd = x_prd.float()
x_prd = x_prd.to(device)
print(model(x_prd))

model.eval()로 설정하고 model(x) x에 입력 데이터를 넘겨 주면 됩니다. 결과를 한번 보겠습니다. 위에 결과를 보시면 아래와 같은 데이터가 나왔습니다.

expect: 1.004
tensor([0.7006], grad_fn=<AddBackward0>)

0.7이라... 터무니 없는 값은 아니지만 그렇다고 썩 마음에 드는 수치는 아닙니다.

이렇게 나오는 이유는 입력 데이터에 대한 데이터들이 많지 않다고 볼 수 있습니다. 즉 입력으로 넣은 0.1,0.2,0.1 이라는 데이터가 우리가 훈련은 거의 하지 않았던 영역이라고 볼 수 있는것 입니다.

ML에 중요한건 데이터와 Feature(앞에서 말한 x1,x2,x3등등)라고 할 수 있습니다.

여기에서는 가장 기본적인 ML의 컨셉을 pytorch로 구현해 봤습니다. 전문적인 지식을 skip 한부분도 있는데 사실 그런 세부적인 내용을 몰라도 된다고 생각합니다.

즉 random 함수를 호출하는데 random 내부가 어떻게 만들어졌는지 알 필요는 없다고 생각되니까요.

훈련하는데 많은 시간을 필요로 합니다. 여기에서는 데이터를 5000개 와 3개의 feature로만 훈련을 했는데... 결과를 뽑고자 매번 해서 결과를 가져올 수 는 없습니다. 

훈련한 데이터를 저장하고 훈련된 데이터로 데이터를 가져와서 결과를 뽑아내는 부분에 대해서 다룹니다.


훈련 데이터 저장하기

python의 pickle 모듈을 이용합니다. pickle 는 아래 내용 참고 합니다.

https://swlock.blogspot.com/2022/06/python-pickle-python.html

훈련이 끝난뒤 model.pickle로 class를 저장 합니다.

with open("model.pickle", "wb") as f:
    pickle.dump(model, f)

그리고 필요할 때 읽어내면 됩니다.

pytorch에서도 데이터를 저장할 수 있습니다.

torch.save(model.state_dict(), "model.pt")


읽어내는 코드

class는 정상적으로 존재해야 합니다. 그래서 CustomModel class는 그대로 복사해 왔습니다. 만약 달라진다면 그 부분도 같이 변경해야 합니다.

import torch
import torch.nn as nn
import numpy as np
import pickle

device = 'cuda' if torch.cuda.is_available() else 'cpu'


class CustomModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.main = nn.Sequential(
            nn.Linear(3, 6),
            nn.Linear(6, 1)
        )

    def forward(self, x):
        return self.main(x)


with open("model.pickle", "rb") as f:
    model = pickle.load(f)

print(model.state_dict())

# test
model.eval()
x_prd = np.array([0.1, 0.2, 0.1])  # y = 1 + 2*0.1*0.2*0.1
print("expect:", 1 + 2 * 0.1 * 0.2 * 0.1)
x_prd = torch.from_numpy(x_prd)
x_prd = x_prd.float()
x_prd = x_prd.to(device)
print(model(x_prd))

결과를 보면 0.77로 이전과 결과는 같음을 알 수 있습니다.

OrderedDict([('main.0.weight', tensor([[ 0.4500,  0.3919,  0.0700],
        [-0.3895,  0.3350,  0.6002],
        [-0.0274,  0.4683,  0.5659],
        [ 0.5995,  0.3789,  0.0616],
        [ 0.6352,  0.2591,  0.1865],
        [ 0.0891, -0.1224, -0.2626]])), ('main.0.bias', tensor([ 0.4082,  0.1782, -0.0710,  0.2986,  0.0352, -0.4244])), ('main.1.weight', tensor([[ 0.2669,  0.2842,  0.2328,  0.2258,  0.4773, -0.0648]])), ('main.1.bias', tensor([0.3285]))])
expect: 1.004
tensor([0.7761], grad_fn=<AddBackward0>)


pickle를 사용하지 않았다면 아래와 같이 하면됩니다.

model.load_state_dict(torch.load("model.pt"))


전체 소스

sourcecode/pytorch/02_basic_gen_train_test at main · donarts/sourcecode · GitHub


Pytorch 링크

pytorch Dataset과 DataLoader

pytorch 파일 기반(file based) DataLoader

Machine Learning(머신러닝) 기초, Pytorch train 기본

Machine Learning(머신러닝) 기초, Pytorch 영상 분류, 영상 불량 검출, 개 고양이 분류 (image classification)



댓글 없음:

댓글 쓰기