레이블이 tensorflow인 게시물을 표시합니다. 모든 게시물 표시
레이블이 tensorflow인 게시물을 표시합니다. 모든 게시물 표시

2022년 8월 6일 토요일

tensorflow keras / 영상 분류, 영상 불량 검출, 개 고양이 분류 image classification


들어가는 글

Pytorch보다는 Keras가 대중적으로 많이 사용하고 있어서 Pytorch 아래 예제를 Keras 를 이용해서 변환하는 작업을 진행하였습니다. 

https://swlock.blogspot.com/2022/07/machine-learning-pytorch.html

작업을 하면서 Keras에는 이미 ImageDataGenerator 가 따로 있는데 이것을 이용하지 않고 CustomDataLoader를 만들어 사용하다 보니 막히는 곳이 많아서 구현에 어려움이 있었습니다.

(tf.keras.preprocessing.image 모듈의 ImageDataGenerator 클래스가 준비되어 있으며 예제가 많으니 쉽게 응용이 가능할 겁니다. 여기서는 해당 클래스를 사용하지 않았습니다.)

여기에서 하는 동작은 개 이미지와 고양이 이미지를 미리 훈련해서 주어지는 이미지를 보고 고양이인지 개인지 맞추는 것입니다. 즉 불량과 양품 이미지를 학습하고 결과를 출력하는 것과 비슷하다고 생각 할 수 있습니다.

참고로 여기에서는 Regression에서 많이 사용하는 MSE loss함수를 사용하였습니다. loss 함수가 개 고양이 분류하는데 적절하지 않을 수는 있긴 하지만 그건 읽는 분 몫으로 남겨둡니다. https://keras.io/api/losses/


개 고양이 분류

캐글에서 있던 부분이고 좀 더 자세한 내용은 아래 링크를 참고 하시면 됩니다.

https://www.kaggle.com/competitions/dogs-vs-cats

여기에서 사용된 자료는 아래 링크로부터 받았습니다.

https://storage.googleapis.com/mledu-datasets/cats_and_dogs_filtered.zip

위 링크에서 직접 받을 필요는 없고 다음 python 스크립트로 자동으로 받아옵니다.


훈련 데이터 이미지 가져오기

zip파일을 다운로드 받고 폴더에 압축을 풉니다. 실행시켜도 폴더가 존재하거나 하면 다시 다운로드 하지 않습니다.

from requests import get
from os.path import exists
import zipfile


def download(url, file_name=None):
	if not file_name:
		file_name = url.split('/')[-1]

	with open(file_name, "wb") as file:
		response = get(url)
		file.write(response.content)


if __name__ == '__main__':
	zip_folder = r"cats_and_dogs_filtered"
	url = "https://storage.googleapis.com/mledu-datasets/cats_and_dogs_filtered.zip"

	if not exists(zip_folder+".zip"):
		print("download data")
		download(url)

	if not exists(zip_folder):
		print("unzip")
		zip_ref = zipfile.ZipFile(zip_folder+".zip", 'r')
		zip_ref.extractall()
		zip_ref.close()


배치(batch)와 데이터 로딩

데이터를 읽어오는 부분은 이전 포스팅을 조금 참조 부탁드립니다.

배치란 물건을 놓는것을 말하는건 아니고 일괄 작업의 단위를 말하는 것입니다.

이미지는 아래 폴더에 존재합니다.

        cats_and_dogs_filtered/train/cats
        cats_and_dogs_filtered/train/dogs
        cats_and_dogs_filtered/validation/cats
        cats_and_dogs_filtered/validation/dogs

훈련이나 검증이냐에 따라서 base_path를 지정해서 class를 호출해 줄겁니다.

즉 cats_and_dogs_filtered/train , cats_and_dogs_filtered/validation 여기까지는 호출하는 쪽에서 경로를 알주고 cats, dogs라는 경로를 붙여 파일 목록을 구해냅니다.

데이터를 넘겨야 하는 __getitem__ 에서는 cats+dogs 이미지가 포함되어 있으므로 하위순위에서는 cats 이미지를 상위 숫자에서는 dogs 이미지를 넘기도록 합니다. 목표값도 개는 0,1을 고양이는 1,0을 리턴 하도록 구현 하였습니다.

def __getitem__(self, index):
        if index >= len(self.cat_fnames):
            # dog
        else:
            # cat

transform 처리가 있는데, 학습을 위해서 이미지 그대로 학습을 할 수 없습니다. 이미지를 Tensor로 변환하는 작업도 해야하고 또 다른 중요한 이유는 학습 데이터를 늘리기 위해 이미지에 변형을 하는 작업을 합니다.

아래 부분이 transform 부분입니다. 이미지를 array로 변환하고 255로 나누어 주는 부분이 있는데 

        if self.transform:
            image = tf.keras.preprocessing.image.img_to_array(image)
            image = image.astype('float32')
            image /= 255.0
            image = self.transform(image)

이 부분을 설명하자면 이미지는 일반적으로 R,G,B 3개의 채널을 가지고 있고 하나의 채널에 대해서 0~255 까지 1byte의 숫자로 표시됩니다. 따라서 1개의 채널에 대해서 255로 나누어 주면 값 범위가 0~1 사이의 숫자로 표현됩니다. 학습에 있어서 도움이 되기 때문에 이렇게 구현하였습니다. 

아래는 인자로 넘겨준 부분에 의해서 이미지 변형이 일어나는 구간입니다.

image = self.transform(image)

호출 쪽에서는 다음과 같은 형태로 호출하게 되는데

    train_dataset = CustomDataset("cats_and_dogs_filtered/train", transform=train_data_augmentation)
    val_dataset = CustomDataset("cats_and_dogs_filtered/validation", transform=val_processing)

이 부분은 tf.keras.Sequential 을 이용해서 구현하였습니다. 

val_processing 부분과 train_data_augmentation 두개로 분리해서 검증용 일때는 random으로 이미지 변형이 되지 않도록 하였습니다.

val_processing = tf.keras.Sequential([
    layers.Resizing(IMG_SIZE, IMG_SIZE),
    layers.CenterCrop(IMG_SIZE, IMG_SIZE),
    layers.Normalization(mean=[0.485, 0.456, 0.406], variance=[0.229, 0.224, 0.225]),
])

train_data_augmentation = tf.keras.Sequential([
    layers.RandomFlip("horizontal_and_vertical"),
    val_processing,
])


다음은 CustomDataset 의 전체 소스입니다.

class CustomDataset:
    def __init__(self, base_path, transform=None, target_transform=None, base_idx=None, cnt=None, file_name=None):
        self.file_name = None
        self.base_idx = base_idx
        self.cnt = cnt
        self.base_dir = base_path
        self.transform = transform
        self.cat_fnames = []
        self.dog_fnames = []
        self.target_transform = target_transform
        # 훈련에 사용되는 고양이/개 이미지 경로
        self.cats_dir = os.path.join(self.base_dir, 'cats')
        self.dogs_dir = os.path.join(self.base_dir, 'dogs')
        if file_name is not None:
            self.file_name = file_name
            self.total_len = 1
            return
        '''
        cats_and_dogs_filtered/train/cats
        cats_and_dogs_filtered/train/dogs
        cats_and_dogs_filtered/validation/cats
        cats_and_dogs_filtered/validation/dogs
        '''
        # os.listdir() 경로 내에 있는 파일의 이름을 리스트의 형태로 반환합니다.
        # [...'cat.102.jpg', 'cat.103.jpg', .... ]
        self.cat_fnames = os.listdir(self.cats_dir)
        self.dog_fnames = os.listdir(self.dogs_dir)
        if self.base_idx is None:
            self.total_len = len(self.cat_fnames) + len(self.dog_fnames)
        else:
            self.total_len = cnt

        print(f"total images:{self.total_len}")

    def __getitem__(self, index):
        """
        주어진 인덱스 index 에 해당하는 샘플을 데이터셋에서 불러오고 반환합니다.
        cat[xxx]
        cat[xxx]
        dog[xxx]
        dog[xxx]
        """
        if self.base_idx is not None:
            index = index + self.base_idx

        if self.file_name is not None:
            y_data = np.array([0.0, 1.0])
            image = Image.open(self.file_name)
        elif index >= len(self.cat_fnames):
            # dog
            x_data = self.dog_fnames[index - len(self.cat_fnames)]
            y_data = np.array([0.0, 1.0])
            img_path = os.path.join(self.dogs_dir, x_data)
            image = Image.open(img_path)
            # plt.imshow(image)
            # print(y_data)
            # plt.show()
        else:
            # cat
            x_data = self.cat_fnames[index]
            y_data = np.array([1.0, 0.0])
            img_path = os.path.join(self.cats_dir, x_data)
            image = Image.open(img_path)
            # plt.imshow(image)
            # print(y_data)
            # plt.show()

        if self.transform:
            image = tf.keras.preprocessing.image.img_to_array(image)
            image = image.astype('float32')
            image /= 255.0
            image = self.transform(image)
            # plt.imshow(image)
            # plt.show()

        if self.target_transform:
            y_data = self.target_transform(y_data)
        return image, y_data

    def __len__(self):
        """
        데이터셋의 샘플 개수를 반환합니다.
        """
        return self.total_len


class CustomDataloader(Sequence):
    def __init__(self, _dataset, batch_size=1, shuffle=False):
        self.dataset = _dataset
        self.batch_size = batch_size
        self.total_len = math.ceil(len(self.dataset) / self.batch_size)
        self.shuffle = shuffle
        self.indexer = np.arange(len(self.dataset))
        self.on_epoch_end()

    def __getitem__(self, index):
        indexer = self.indexer[index * self.batch_size:(index + 1) * self.batch_size]
        batch_x = [self.dataset[i][0] for i in indexer]
        batch_y = [self.dataset[i][1] for i in indexer]
        batch_x = np.array(batch_x)
        batch_y = np.array(batch_y)
        return batch_x, batch_y

    def __len__(self):
        return self.total_len

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexer)


이미지 모델

이미지 모델은 keras.Sequential 을 이용해서 만들었습니다. 

Conv2D이 핵심이 되며 구조는 Pytorch때와 동일하게 구현하였습니다.

    # https://keras.io/api/layers/
    model = keras.Sequential()
    model.add(keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3)))
    model.add(layers.Conv2D(16, kernel_size=3, activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(8, 8)))
    model.add(layers.Flatten())
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.Dense(2, activation='relu'))
    model.summary()

이미지 처리에 있어서 Conv2D 이걸 빼면 학습이 잘 되지 않습니다.


LOSS함수와 optim

Loss함수도 많긴 하지만 클래스 분류에는 일반적으로 BCELoss 나 CrossEntropyLoss 함수를 많이 사용하는데 꼭 해당 함수를 사용해야 하는것은 아닙니다. 여기에서는 이전 기초에서 했던것도 있어서 단순하게 출력단을 2가지 0,1 을 목표값으로 설정해서 진행했습니다. 

    # https://keras.io/api/optimizers/
    # https://keras.io/api/losses/
    INPUT_OPTIMIZER = tf.keras.optimizers.SGD(learning_rate=lr)
    LOSS = tf.keras.losses.MeanSquaredError(reduction="auto", name="mean_squared_error")
    model.compile(optimizer=INPUT_OPTIMIZER, loss=LOSS)
    model.fit(train_loader, batch_size=batch_size, epochs=n_epochs, validation_data=val_loader)


전체소스

from tensorflow import keras
from tensorflow.keras.utils import Sequence
from tensorflow.keras import layers
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
from PIL import Image
import math

batch_size = 20
lr = 1e-3
n_epochs = 20
IMG_SIZE = 150

# fig = plt.gcf()


class CustomDataset:
    def __init__(self, base_path, transform=None, target_transform=None, base_idx=None, cnt=None, file_name=None):
        self.file_name = None
        self.base_idx = base_idx
        self.cnt = cnt
        self.base_dir = base_path
        self.transform = transform
        self.cat_fnames = []
        self.dog_fnames = []
        self.target_transform = target_transform
        # 훈련에 사용되는 고양이/개 이미지 경로
        self.cats_dir = os.path.join(self.base_dir, 'cats')
        self.dogs_dir = os.path.join(self.base_dir, 'dogs')
        if file_name is not None:
            self.file_name = file_name
            self.total_len = 1
            return
        '''
        cats_and_dogs_filtered/train/cats
        cats_and_dogs_filtered/train/dogs
        cats_and_dogs_filtered/validation/cats
        cats_and_dogs_filtered/validation/dogs
        '''
        # os.listdir() 경로 내에 있는 파일의 이름을 리스트의 형태로 반환합니다.
        # [...'cat.102.jpg', 'cat.103.jpg', .... ]
        self.cat_fnames = os.listdir(self.cats_dir)
        self.dog_fnames = os.listdir(self.dogs_dir)
        if self.base_idx is None:
            self.total_len = len(self.cat_fnames) + len(self.dog_fnames)
        else:
            self.total_len = cnt

        print(f"total images:{self.total_len}")

    def __getitem__(self, index):
        """
        주어진 인덱스 index 에 해당하는 샘플을 데이터셋에서 불러오고 반환합니다.
        cat[xxx]
        cat[xxx]
        dog[xxx]
        dog[xxx]
        """
        if self.base_idx is not None:
            index = index + self.base_idx

        if self.file_name is not None:
            y_data = np.array([0.0, 1.0])
            image = Image.open(self.file_name)
        elif index >= len(self.cat_fnames):
            # dog
            x_data = self.dog_fnames[index - len(self.cat_fnames)]
            y_data = np.array([0.0, 1.0])
            img_path = os.path.join(self.dogs_dir, x_data)
            image = Image.open(img_path)
            # plt.imshow(image)
            # print(y_data)
            # plt.show()
        else:
            # cat
            x_data = self.cat_fnames[index]
            y_data = np.array([1.0, 0.0])
            img_path = os.path.join(self.cats_dir, x_data)
            image = Image.open(img_path)
            # plt.imshow(image)
            # print(y_data)
            # plt.show()

        if self.transform:
            image = tf.keras.preprocessing.image.img_to_array(image)
            image = image.astype('float32')
            image /= 255.0
            image = self.transform(image)
            # plt.imshow(image)
            # plt.show()

        if self.target_transform:
            y_data = self.target_transform(y_data)
        return image, y_data

    def __len__(self):
        """
        데이터셋의 샘플 개수를 반환합니다.
        """
        return self.total_len


class CustomDataloader(Sequence):
    def __init__(self, _dataset, batch_size=1, shuffle=False):
        self.dataset = _dataset
        self.batch_size = batch_size
        self.total_len = math.ceil(len(self.dataset) / self.batch_size)
        self.shuffle = shuffle
        self.indexer = np.arange(len(self.dataset))
        self.on_epoch_end()

    def __getitem__(self, index):
        indexer = self.indexer[index * self.batch_size:(index + 1) * self.batch_size]
        batch_x = [self.dataset[i][0] for i in indexer]
        batch_y = [self.dataset[i][1] for i in indexer]
        batch_x = np.array(batch_x)
        batch_y = np.array(batch_y)
        return batch_x, batch_y

    def __len__(self):
        return self.total_len

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexer)


"""
# https://keras.io/api/layers/preprocessing_layers/image_augmentation/
https://www.tensorflow.org/tutorials/images/data_augmentation?hl=ko
참고: data_augmentation 데이터 증강은 테스트할 때 비활성화되므로 입력 이미지는 model.fit(model.evaluate 또는 model.predict가 아님) 호출 중에만 증강됩니다.
RandomCrop layer
RandomFlip layer
RandomTranslation layer
RandomRotation layer
RandomZoom layer
RandomHeight layer
RandomWidth layer
RandomContrast layer
RandomBrightness layer
"""


val_processing = tf.keras.Sequential([
    layers.Resizing(IMG_SIZE, IMG_SIZE),
    layers.CenterCrop(IMG_SIZE, IMG_SIZE),
    layers.Normalization(mean=[0.485, 0.456, 0.406], variance=[0.229, 0.224, 0.225]),
])

train_data_augmentation = tf.keras.Sequential([
    layers.RandomFlip("horizontal_and_vertical"),
    val_processing,
])

if __name__ == '__main__':
    train_dataset = CustomDataset("cats_and_dogs_filtered/train", transform=train_data_augmentation)
    val_dataset = CustomDataset("cats_and_dogs_filtered/validation", transform=val_processing)

    train_loader = CustomDataloader(_dataset=train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = CustomDataloader(_dataset=val_dataset, batch_size=batch_size, shuffle=True)


    # https://keras.io/api/layers/
    model = keras.Sequential()
    model.add(keras.Input(shape=(IMG_SIZE, IMG_SIZE, 3)))
    model.add(layers.Conv2D(16, kernel_size=3, activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(8, 8)))
    model.add(layers.Flatten())
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.Dense(2, activation='relu'))
    model.summary()

    # https://keras.io/api/optimizers/
    # https://keras.io/api/losses/
    INPUT_OPTIMIZER = tf.keras.optimizers.SGD(learning_rate=lr)
    LOSS = tf.keras.losses.MeanSquaredError(reduction="auto", name="mean_squared_error")
    model.compile(optimizer=INPUT_OPTIMIZER, loss=LOSS)
    model.fit(train_loader, batch_size=batch_size, epochs=n_epochs, validation_data=val_loader)
    model.save("model.keras")


훈련 결과

total images:2000
total images:1000
Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 conv2d (Conv2D)             (None, 148, 148, 16)      448       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 18, 18, 16)       0         
 )                                                               
                                                                 
 flatten (Flatten)           (None, 5184)              0         
                                                                 
 dense (Dense)               (None, 512)               2654720   
                                                                 
 dense_1 (Dense)             (None, 2)                 1026      
                                                                 
=================================================================
Total params: 2,656,194
Trainable params: 2,656,194
Non-trainable params: 0
_________________________________________________________________
Epoch 1/20
100/100 [==============================] - 29s 293ms/step - loss: 0.3131 - val_loss: 0.2775
Epoch 2/20
100/100 [==============================] - 32s 318ms/step - loss: 0.2806 - val_loss: 0.2635
Epoch 3/20
100/100 [==============================] - 61s 607ms/step - loss: 0.2680 - val_loss: 0.2556
Epoch 4/20
100/100 [==============================] - 47s 469ms/step - loss: 0.2581 - val_loss: 0.2543
Epoch 5/20
100/100 [==============================] - 33s 327ms/step - loss: 0.2535 - val_loss: 0.2477
Epoch 6/20
100/100 [==============================] - 32s 324ms/step - loss: 0.2474 - val_loss: 0.2436
Epoch 7/20
100/100 [==============================] - 33s 326ms/step - loss: 0.2450 - val_loss: 0.2413
Epoch 8/20
100/100 [==============================] - 32s 320ms/step - loss: 0.2425 - val_loss: 0.2379
Epoch 9/20
100/100 [==============================] - 32s 319ms/step - loss: 0.2360 - val_loss: 0.2377
Epoch 10/20
100/100 [==============================] - 33s 333ms/step - loss: 0.2353 - val_loss: 0.2386
Epoch 11/20
100/100 [==============================] - 34s 342ms/step - loss: 0.2317 - val_loss: 0.2347
Epoch 12/20
100/100 [==============================] - 32s 320ms/step - loss: 0.2326 - val_loss: 0.2316
Epoch 13/20
100/100 [==============================] - 33s 329ms/step - loss: 0.2298 - val_loss: 0.2310
Epoch 14/20
100/100 [==============================] - 32s 321ms/step - loss: 0.2297 - val_loss: 0.2395
Epoch 15/20
100/100 [==============================] - 32s 320ms/step - loss: 0.2215 - val_loss: 0.2326
Epoch 16/20
100/100 [==============================] - 32s 325ms/step - loss: 0.2241 - val_loss: 0.2275
Epoch 17/20
100/100 [==============================] - 34s 336ms/step - loss: 0.2231 - val_loss: 0.2281
Epoch 18/20
100/100 [==============================] - 32s 323ms/step - loss: 0.2217 - val_loss: 0.2266
Epoch 19/20
100/100 [==============================] - 31s 308ms/step - loss: 0.2207 - val_loss: 0.2265
Epoch 20/20
100/100 [==============================] - 31s 311ms/step - loss: 0.2192 - val_loss: 0.2249


Test

이제는 원하는 이미지를 넣어서 어떤 결과가 나오는지 테스트 해볼 시간입니다.

import tensorflow as tf
import trainer


if __name__ == '__main__':
    model = tf.keras.models.load_model("model.keras")
    model.summary()

    train_dataset = trainer.CustomDataset("cats_and_dogs_filtered/train", transform=trainer.val_processing)
    val_dataset = trainer.CustomDataset("cats_and_dogs_filtered/validation", transform=trainer.val_processing)

    train_loader = trainer.CustomDataloader(_dataset=train_dataset, batch_size=1, shuffle=True)
    val_loader = trainer.CustomDataloader(_dataset=val_dataset, batch_size=1, shuffle=True)

    # test
    test_images = ["cats_and_dogs_filtered/train/cats/cat.1.jpg",
                   "cats_and_dogs_filtered/train/cats/cat.2.jpg",
                   "cats_and_dogs_filtered/train/dogs/dog.1.jpg",
                   "cats_and_dogs_filtered/train/dogs/dog.2.jpg",
                   "cats_and_dogs_filtered/validation/cats/cat.2001.jpg",
                   "cats_and_dogs_filtered/validation/dogs/dog.2001.jpg",
                   "cats_and_dogs_filtered/validation/cats/cat.2004.jpg",
                   "cats_and_dogs_filtered/validation/dogs/dog.2004.jpg",
                   ]

    for test_image in test_images:
        print(test_image)
        dataset = trainer.CustomDataset("cats_and_dogs_filtered/train", transform=trainer.val_processing, file_name=test_image)
        dataloader = trainer.CustomDataloader(_dataset=dataset, batch_size=1, shuffle=False)
        x_prd = dataloader[0][0]  # batch 0 , x
        print(x_prd.shape)
        y = model.predict(x_prd)
        print("result:", y)
        if y[0][0] > y[0][1]:
            print("cat")
        else:
            print("dog")


실행 결과

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 conv2d (Conv2D)             (None, 148, 148, 16)      448       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 18, 18, 16)       0         
 )                                                               
                                                                 
 flatten (Flatten)           (None, 5184)              0         
                                                                 
 dense (Dense)               (None, 512)               2654720   
                                                                 
 dense_1 (Dense)             (None, 2)                 1026      
                                                                 
=================================================================
Total params: 2,656,194
Trainable params: 2,656,194
Non-trainable params: 0
_________________________________________________________________
total images:2000
total images:1000
cats_and_dogs_filtered/train/cats/cat.1.jpg
(1, 150, 150, 3)
1/1 [==============================] - 0s 60ms/step
result: [[0.48046866 0.43707907]]
cat
cats_and_dogs_filtered/train/cats/cat.2.jpg
(1, 150, 150, 3)
1/1 [==============================] - 0s 12ms/step
result: [[0.74900883 0.4924001 ]]
cat
cats_and_dogs_filtered/train/dogs/dog.1.jpg
(1, 150, 150, 3)
1/1 [==============================] - 0s 12ms/step
result: [[0.45707452 0.7523619 ]]
dog
cats_and_dogs_filtered/train/dogs/dog.2.jpg
(1, 150, 150, 3)
1/1 [==============================] - 0s 12ms/step
result: [[0.15739511 0.7101962 ]]
dog
cats_and_dogs_filtered/validation/cats/cat.2001.jpg
(1, 150, 150, 3)
1/1 [==============================] - 0s 10ms/step
result: [[0.63321704 0.2585028 ]]
cat
cats_and_dogs_filtered/validation/dogs/dog.2001.jpg
(1, 150, 150, 3)
1/1 [==============================] - 0s 12ms/step
result: [[0.1274486  0.65480846]]
dog
cats_and_dogs_filtered/validation/cats/cat.2004.jpg
(1, 150, 150, 3)
1/1 [==============================] - 0s 11ms/step
result: [[0.73557734 0.3226891 ]]
cat
cats_and_dogs_filtered/validation/dogs/dog.2004.jpg
(1, 150, 150, 3)
1/1 [==============================] - 0s 12ms/step
result: [[0.48342577 0.7902105 ]]
dog

cat 사진을 넣었을때 cat이 제대로 나오는지, dog를 넣었을때 dog가 제대로 나오는지 보면됩니다. 위 결과로는 모두 제대로 나오고 있습니다.

사실 Pytorch와 비슷하게 RandomZoom을 넣었었는데... (Pytorch에서는 RandomResize항목이 있었음) 학습이 제대로 안되는 경우가 많아서 해당 부분을 뺐습니다.


sourcecode/keras/02_image_class at main · donarts/sourcecode · GitHub



2022년 8월 5일 금요일

tensorflow keras / train / Regression 회귀

 

이 글은 pytorch 로 구현한 아래 내용을 tensorflow keras 로 구현한 내용입니다.

https://swlock.blogspot.com/2022/07/machine-learning-pytorch-train.html


입력 X 를 3개를 입력받고 출력은 1개인 선형 회귀 입니다.


x, y 의 생성

data_count=5000 x1,x2,x3,y 의 데이터를 5000개 생성합니다.

numpy의 random.rand 함수를 이용해서 생성합니다.

np.random.rand(m,n) n*m matrix 로 0~1 랜덤 생성 됩니다.

그 다음으로 궁극적으로 알아내야 하는 미지의 함수입니다.

y = 1 + 2 * x_one_d + .1 * np.random.randn(data_count, 1)

뒤쪽 np.random.randn(data_count, 1)은 y값의 변화를 주기 위한 내용입니다.

결국 y = 1 + 2 * x1 * x2 * x3 라고 생각하면 됩니다.

import numpy as np

####################################### 데이터 준비
data_count = 5000
np.random.seed(42)
# np.random.rand(m,n) n*m matrix 로 0~1 랜덤 생성
x = np.random.rand(data_count, 3)
'''
[[0.37454012 0.95071431 0.73199394]
 [0.59865848 0.15601864 0.15599452]
 [0.05808361 0.86617615 0.60111501]
 ...
'''
x_one_d = x[:,0:1] * x[:,1:2] * x[:,2:3]
'''
[[2.60648878e-01]  # <= [0.37454012 * 0.95071431 * 0.73199394]
 [1.45701819e-02]
 [3.02424805e-02]
 ...
'''

# y는 원하는 값 목표값인 label로 표현합니다.
y = 1 + 2 * x_one_d + .1 * np.random.randn(data_count, 1)
'''
[[1.52585494]
 [0.96398033]
 [1.27487937]
 ...
'''
# 저장전 확인
print(x.shape,type(x),x[0])
print(y.shape,type(y),y[0])

# 저장
np.savetxt("x_data.csv", x, delimiter=',')
np.savetxt("y_data.csv", y, delimiter=',')

# 저장된것 확인
x_data = np.loadtxt("x_data.csv", delimiter=',', skiprows=0, max_rows=1)
print(x_data.shape,type(x_data),x_data)
if x_data.shape==():
    x_data = np.array([x_data]) 
print(x_data.shape,type(x_data),x_data)
y_data = np.loadtxt("y_data.csv", delimiter=',', skiprows=0, max_rows=1)
print(y_data.shape,type(y_data),y_data)
if y_data.shape==():
    y_data = np.array([y_data]) 
print(y_data.shape,type(y_data),y_data)

실행하고 나면 x_data.csv, y_data.csv 파일이 생성됩니다.


file dataloader

파일에서 데이터 로딩하는 부분은 이전 포스팅을 참고 하시기 바랍니다.

https://swlock.blogspot.com/2022/08/tensorflow-keras-sequence-dataloader.html


tensorflow keras Learning basic

여기에서는 아주 간단한 Linear Model 두 개를 연결해서 만들었습니다.


전체 코드

python 3.8 환경, CPU 환경에서 시험하였습니다.


from tensorflow import keras
from tensorflow.keras.utils import Sequence
from tensorflow.keras import layers
import tensorflow as tf
import numpy as np
import math

batch_size = 100
lr = 1e-1
n_epochs = 5
train_data_ratio = 0.8


class CustomDataset:
    def __init__(self, x_tensor_filename, y_tensor_filename, base_idx=None, cnt=None):
        self.x_fn = x_tensor_filename
        self.y_fn = y_tensor_filename
        self.base_idx = base_idx
        self.cnt = cnt
        if self.base_idx is None:
            with open(y_tensor_filename, 'r') as fp:
                for count, line in enumerate(fp):
                    pass
            self.total_len = count + 1
        else:
            self.total_len = cnt

    def __getitem__(self, index):
        """
        주어진 인덱스 index 에 해당하는 샘플을 데이터셋에서 불러오고 반환합니다.
        """
        if self.base_idx is not None:
            index = index + self.base_idx
        x_data = np.loadtxt(self.x_fn, delimiter=',', skiprows=index, max_rows=1)
        # [0.37454012 0.95071431 0.73199394]
        if x_data.shape == ():
            x_data = np.array([x_data])
        y_data = np.loadtxt(self.y_fn, delimiter=',', skiprows=index, max_rows=1)
        # 1.5258549401766552
        if y_data.shape == ():
            y_data = np.array([y_data])
        # [1.5258549401766552]
        return x_data, y_data

    def __len__(self):
        """
        데이터셋의 샘플 개수를 반환합니다.
        """
        return self.total_len


class CustomDataloader(Sequence):
    def __init__(self, _dataset, batch_size=1, shuffle=False):
        self.dataset = _dataset
        self.batch_size = batch_size
        self.total_len = math.ceil(len(self.dataset) / self.batch_size)
        self.shuffle = shuffle
        self.indexer = np.arange(len(self.dataset))
        self.on_epoch_end()

    def __getitem__(self, index):
        indexer = self.indexer[index * self.batch_size:(index + 1) * self.batch_size]
        batch_x = [self.dataset[i][0] for i in indexer]
        batch_y = [self.dataset[i][1] for i in indexer]
        return np.array(batch_x), np.array(batch_y)

    def __len__(self):
        return self.total_len

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexer)
            print("shuffle")


temp_dataset = CustomDataset("x_data.csv", "y_data.csv")
train_dataset_cnt = int(len(temp_dataset) * train_data_ratio)
val_dataset_cnt = len(temp_dataset) - train_dataset_cnt
train_dataset = CustomDataset("x_data.csv", "y_data.csv", 0, train_dataset_cnt)
val_dataset = CustomDataset("x_data.csv", "y_data.csv", train_dataset_cnt, val_dataset_cnt)
print(len(temp_dataset), train_dataset_cnt, val_dataset_cnt)
train_loader = CustomDataloader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = CustomDataloader(val_dataset, batch_size=batch_size, shuffle=True)

inputs = keras.Input(shape=(None, 3))
x = layers.Dense(6, activation="linear")(inputs)
x = layers.Dense(1, activation="linear")(x)
outputs = x
model = keras.Model(inputs, outputs)
model.summary()

# https://keras.io/api/optimizers/
# https://keras.io/api/losses/
INPUT_OPTIMIZER = tf.keras.optimizers.SGD(learning_rate=lr)
LOSS = tf.keras.losses.MeanSquaredError(reduction="auto", name="mean_squared_error")
model.compile(optimizer=INPUT_OPTIMIZER, loss=LOSS)
model.fit(train_loader, batch_size=batch_size, epochs=n_epochs, validation_data=val_loader)



# y = 1 + 2*0.1*0.2*0.1
x_prd = np.array([[0.1, 0.2, 0.1]])
print("expect:", 1 + 2 * 0.1 * 0.2 * 0.1)
print(model.predict(x_prd))



실행 결과

5000 4000 1000
shuffle
shuffle
Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 input_1 (InputLayer)        [(None, None, 3)]         0         
                                                                 
 dense (Dense)               (None, None, 6)           24        
                                                                 
 dense_1 (Dense)             (None, None, 1)           7         
                                                                 
=================================================================
Total params: 31
Trainable params: 31
Non-trainable params: 0
_________________________________________________________________
Epoch 1/5
WARNING:tensorflow:Model was constructed with shape (None, None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, None, 3), dtype=tf.float32, name='input_1'), name='input_1', description="created by layer 'input_1'"), but it was called on an input with incompatible shape (None, None).
WARNING:tensorflow:Model was constructed with shape (None, None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, None, 3), dtype=tf.float32, name='input_1'), name='input_1', description="created by layer 'input_1'"), but it was called on an input with incompatible shape (None, None).
40/40 [==============================] - ETA: 0s - loss: 0.2153WARNING:tensorflow:Model was constructed with shape (None, None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, None, 3), dtype=tf.float32, name='input_1'), name='input_1', description="created by layer 'input_1'"), but it was called on an input with incompatible shape (None, None).
shuffle
40/40 [==============================] - 11s 269ms/step - loss: 0.2153 - val_loss: 0.0621
shuffle
Epoch 2/5
40/40 [==============================] - ETA: 0s - loss: 0.0514shuffle
40/40 [==============================] - 10s 260ms/step - loss: 0.0514 - val_loss: 0.0423
shuffle
Epoch 3/5
40/40 [==============================] - ETA: 0s - loss: 0.0393shuffle
40/40 [==============================] - 10s 260ms/step - loss: 0.0393 - val_loss: 0.0365
shuffle
Epoch 4/5
40/40 [==============================] - ETA: 0s - loss: 0.0355shuffle
40/40 [==============================] - 10s 264ms/step - loss: 0.0355 - val_loss: 0.0337
shuffle
Epoch 5/5
40/40 [==============================] - ETA: 0s - loss: 0.0339shuffle
40/40 [==============================] - 10s 255ms/step - loss: 0.0339 - val_loss: 0.0324
shuffle
expect: 1.004
WARNING:tensorflow:Model was constructed with shape (None, None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, None, 3), dtype=tf.float32, name='input_1'), name='input_1', description="created by layer 'input_1'"), but it was called on an input with incompatible shape (None, 3).
1/1 [==============================] - 0s 41ms/step
[[0.7554034]]


코드 설명

사용자가 조절해야 하는 parameter 입니다.

batch_size = 100
 한번에 처리하는 batch size 입니다.
 메모리에 한번에 올라가는 양이됩니다. 
 훈련값 갱신도 batch 크기 단위로 이루어 집니다.
lr = 1e-1
 learning rate(훈련 비율) 이 값이 작으면 훈련하는데 시간이 오래걸립니다.
 반대로 크면 훈련이 정상적으로 되지 않고 진동하게 됩니다.
n_epochs = 5
 epoch 라고 하는데 훈련 데이터를 전체적으로 훈련하는것을 1 epoch라고 합니다.
 수치가 높으면 훈련을 여러번하게 되는데 여러번해도 효율이 좋아지지 않는다면 수치를 낮추면 됩니다.
 즉 100으로 하나 5로 하나 차이가 없다면 5로 하면 됩니다.
train_data_ratio = 0.8
 이건 data셋을 훈련용과 test용을 나누는 비율입니다.
 0.8은 data셋 중 80%가 훈련용으로 사용하게 됩니다.

아래 class는 파일로부터 데이터를 읽어오는 코드로 file dataloader아래 링크에서 좀 더 자세한 정보를 확인하시면 됩니다.

https://swlock.blogspot.com/2022/08/tensorflow-keras-sequence-dataloader.html

아래에 있는 다음 코드들도 위 링크 설명 참고하면 됩니다.

temp_dataset = CustomDataset("x_data.csv", "y_data.csv")
train_dataset_cnt = int(len(temp_dataset) * train_data_ratio)
val_dataset_cnt = len(temp_dataset) - train_dataset_cnt
train_dataset = CustomDataset("x_data.csv", "y_data.csv", 0, train_dataset_cnt)
val_dataset = CustomDataset("x_data.csv", "y_data.csv", train_dataset_cnt, val_dataset_cnt)
print(len(temp_dataset), train_dataset_cnt, val_dataset_cnt)
train_loader = CustomDataloader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = CustomDataloader(val_dataset, batch_size=batch_size, shuffle=True)

데이터set을 train과 test 용으로 구분을 하기 위한 코드입니다.


모델

아래와 같은 형태로  model을 만듭니다. https://keras.io/api/layers/ 여기 링크에서 여러가지 layer 정보를 얻을 수 있습니다.

inputs = keras.Input(shape=(None, 3))
x = layers.Dense(6, activation="linear")(inputs)
x = layers.Dense(1, activation="linear")(x)
outputs = x
model = keras.Model(inputs, outputs)
model.summary()

모델을 훈련 시키기

keras에서는 pytorch 보다는 간단합니다. model.fit 만 호출해주면 내부 루프가 알아서 돌기 때문에 호출시 인자만 적절히 넘겨주면 됩니다.

# https://keras.io/api/optimizers/
# https://keras.io/api/losses/
INPUT_OPTIMIZER = tf.keras.optimizers.SGD(learning_rate=lr)
LOSS = tf.keras.losses.MeanSquaredError(reduction="auto", name="mean_squared_error")
model.compile(optimizer=INPUT_OPTIMIZER, loss=LOSS)
model.fit(train_loader, batch_size=batch_size, epochs=n_epochs, validation_data=val_loader)


실제 목표 값?

이제 값을 예측할 때가 되었습니다. 즉 3개의 변수 x1=0.1,x2=0.2,x3=0.1 이 주어 질 때 y는 어떻게 될까요? 당연히 계산 공식은 주어지지 않습니다. 훈련된 모델에 해당 값을 넣어서 y값을 알아 내야 합니다.

# y = 1 + 2*0.1*0.2*0.1
x_prd = np.array([[0.1, 0.2, 0.1]])
print("expect:", 1 + 2 * 0.1 * 0.2 * 0.1)
print(model.predict(x_prd))

model.predict() 함수에 인자를 넣으면 결과가 나옵니다.

expect: 1.004
WARNING:tensorflow:Model was constructed with shape (None, None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, None, 3), dtype=tf.float32, name='input_1'), name='input_1', description="created by layer 'input_1'"), but it was called on an input with incompatible shape (None, 3).
1/1 [==============================] - 0s 41ms/step
[[0.7554034]]

0.75이라... 터무니 없는 값은 아니지만 그렇다고 썩 마음에 드는 수치는 아닙니다.

이렇게 나오는 이유는 입력 데이터에 대한 데이터들이 많지 않다고 볼 수 있습니다. 즉 입력으로 넣은 0.1,0.2,0.1 이라는 데이터가 우리가 훈련은 거의 하지 않았던 영역이라고 볼 수 있는것 입니다.


모델 저장

https://www.tensorflow.org/guide/keras/save_and_serialize?hl=ko

Keras에서는 두가지 방식을 이용 가능 합니다.

  • model.save() 또는 tf.keras.models.save_model()
  • tf.keras.models.load_model()

model.fit 함수 뒤에 save 함수를 호출 하면 됩니다.

model.fit(train_loader, batch_size=batch_size, epochs=n_epochs, validation_data=val_loader)
model.save("model.keras")


load_model() 예제

import numpy as np
import tensorflow as tf

if __name__ == '__main__':
    model = tf.keras.models.load_model("model.keras")
    model.summary()

    # test
    x_prd = np.array([[0.1, 0.2, 0.1]])  # y = 1 + 2*0.1*0.2*0.1
    print("expect:", 1 + 2 * 0.1 * 0.2 * 0.1)
    print(model.predict(x_prd))


전체 소스

sourcecode/keras/01_basic at main · donarts/sourcecode · GitHub