매뉴얼 활용

인공지능 주식투자 프로그램 만들기
작성자 : 김경현 등록일 : 2019-12-13 조회수 : 184
 
 
[주식투자 AI 예측 알고리즘 (Tensorflow)]

 

☞ 코딩자료 : 다운로드

 

■ 종가 예측 프로그램 - 02 해석

 

'''

종가

12일, 에포크 1250, 학습률 0.01

'''

# 과거&현재 일별 주가와 거래량 (time series형태)을 이용하여 내일의 주가를 예측한다

# 실험적 경험 교훈

# 1) LSTM셀 활성화함수는 tanh보다 softsign가 조금 더 안정적이고 좋은 결과를 얻는다

# 2) LSTM은 AdamOptimizer 최적화 함수 추천

# 3) LSTM은 구조상 DropoutWrapper와 궁합이 안맞다

# 3) RNN에서 데이터 특성에 맞는 sequence length가 중요하다! 상황에 따라...

#    아마존 주가의 경우 대체적으로 긴 기간이 더 좋으나 너무 긴 것보다 28거래일이 더 좋게 나왔다

# 4) input_data_column_cnt(입력데이터의 컬럼 개수)가 적은 경우 stacked RNN을 사용하면 빠르게 Over Fitting된다

# 5) rnn_cell_hidden_dim(각 셀의 (hidden)출력 크기)가 많은 경우 너무 많으면 오래걸리고 추가 개선은 거의 없다

 

# TO-DO 리스트

# 조기종료(early stopping) 조건을 넣어 과적합발생 시 중단로직 추천

# 학습률(learning rate)을 상황에 따라 조정할 것

 

import tensorflow as tf  구글이 2011년에 개발을 시작하여 2015년에 오픈 소스로 공개한 기계학습 라이브러리.

import numpy as np  수학 및 과학 연산을 위한 파이썬 패키지로 넘파이라 읽는다.

import pandas as pd  파이썬 언어로 작성된 데이터를 분석 및 조작하기 위한 소프트웨어 라이브러리.

import datetime  # 파이썬 언어로 날짜 및 시간 조작을 위한 클래스 제공.

import matplotlib.pyplot as plt  파이썬에서 그래프 표시하는 라이브러리.

 

# 랜덤에 의해 똑같은 결과를 재현하도록 시드 설정

# 하이퍼파라미터를 튜닝하기 위한 용도(흔들리면 무엇때문에 좋아졌는지 알기 어려움)

tf.set_random_seed(777)  # 랜덤한 값을 다른 컴퓨터에도 동일하게 얻을 수 있게 하며 강의용으로 좋다.

 

# Standardization

def data_standardization(x):  # def 로 data_standardization 이름의 표준화 함수선언. x는 함수인자. 

  # 표준화를 하면 평균 0, 표준편차 1 이라는 데이터 분포로 재 생성된다.

    x_np = np.asarray(x)  # 입력을 배열로 변환한다.

    return (x_np - x_np.mean()) / x_np.std()  # 파이썬에서 mean(), std()로 데이터평균과 표준편차 찾는다.

 

# 너무 작거나 너무 큰 값이 학습을 방해하는 것을 방지하고자 정규화한다

# x가 양수라는 가정하에 최소값과 최대값을 이용하여 0~1사이의 값으로 변환

# Min-Max scaling

def min_max_scaling(x):  def 로 min_max_scaling 이름의 정규화 함수선언. x는 함수인자. 

  # MinMax Scaling 은 최대값 1, 최소값 0 으로 그 사이에 값들이 분포한다.

    x_np = np.asarray(x)  # 입력을 배열로 변환한다.

    return (x_np - x_np.min()) / (x_np.max() - x_np.min() + 1e-7) # 1e-7은 0 으로 나누는 오류 예방차원

 

# 정규화된 값을 원래의 값으로 되돌린다

# 정규화하기 이전의 org_x값과 되돌리고 싶은 x를 입력하면 역정규화된 값을 리턴한다

def reverse_min_max_scaling(org_x, x):   def 로 reverse_min_max_scaling 이름의 역정규화 함수선언. org_x, x는 함수인자.

  MinMax Scaling 변환 값들 이전의 원래 단위의 값으로 역정규화 한다.

    org_x_np = np.asarray(org_x)  # 입력을 배열로 변환한다.

    x_np = np.asarray(x)  # 입력을 배열로 변환한다.

    return (x_np * (org_x_np.max() - org_x_np.min() + 1e-7)) + org_x_np.min()  # MinMax Scaling 을 역산해 원래 단위로 환원.

 

 

 

 

# 하이퍼파라미터

input_data_column_cnt = 6  # 입력데이터의 컬럼 개수 (Variable 개수)

output_data_column_cnt = 1 # 결과데이터의 컬럼 개수

 

seq_length = 12            # 1개 시퀀스의 길이 (시계열데이터 입력 개수)

rnn_cell_hidden_dim = 20  # 각 셀의 숨겨진 (hidden) 출력 크기

forget_bias = 1.0           # 망각편향 (기본값 1.0)

num_stacked_layers = 1   # Stacked LSTM Layers 개수

keep_prob = 1.0            # Dropout 할 때 Keep 할 비율

 

epoch_num = 1250        # 에폭횟수 (학습용 전체 데이터를 몇 회 반복 (Iteration) 해서 학할 것인가 입력)

learning_rate = 0.01       # 학습률

 

# 데이터 전처리

print("---------------------------------------------------------------")

print("DIY 연구소 인공지능 주가 예측 프로그램 (http://cafe.daum.net/diylab)")

print("현대모비스 주식 2019.11.6. 내일 종가")

print("---------------------------------------------------------------n")

 

# 출력

 

 

 

# 데이터를 로딩한다.

stock_file_name = 'Hyundai Mobis-data-test.csv' # 현대모비스 2000년부터 현재까지 주가데이터 파일

encoding = 'euc-kr' # 문자 인코딩

names = ['Date','Open','High','Low','Close','Adj Close','Volume']

raw_dataframe = pd.read_csv(stock_file_name, names=names, encoding=encoding) #판다스이용 csv파일 로딩

raw_dataframe.info() # 데이터 정보 출력

 

 

 

# 출력

 

 

 

# raw_dataframe.drop('Date', axis=1, inplace=True) # 간열을 제거하고 dataframe 재생성하지 않기

del raw_dataframe['Date'] # 위 줄과 같은 효과

stock_info = raw_dataframe.values[1:].astype(np.float) # 금액&거래량 문자열 (2번째~끝)부동소수점형으로 변환한다

print("stock_info.shape: ", stock_info.shape)  금액&거래량 문자열 (2번째~끝)의 형태출력

print("stock_info[0]: ", stock_info[0])  금액&거래량 문자열 (2번째~끝)첫번째 출력

 

# 출력

 

 

# 데이터 전처리

# 가격과 거래량 수치의 차이가 많아나서 각각 별도로 정규화한다

# 가격형태 데이터들을 정규화한다

# ['Open','High','Low','Close','Adj Close','Volume']에서 'Adj Close'까지 취함

# 곧, 마지막 열 Volume를 제외한 모든 열

price = stock_info[:,:-1]  # 마지막 열 Volume 만을 제외한 모든 열을 price 변수로 생성 

norm_price = min_max_scaling(price)  가격데이터 price 변수를 min_max_scaling 함수로 정규화 norm_price 변수생성

print("price.shape: ", price.shape)  # 정규화 price 변수형태 출력

print("price[0]: ", price[0]# 정규화 price 변수 첫째행 출력

print("norm_price[0]: ", norm_price[0])  # 정규화후 norm_price 변수 첫째행 출력

print("="*100)  # 화면상 구분용 출력

 

# 출력

 

 

# 거래량형태 데이터를 정규화한다

# ['Open','High','Low','Close','Adj Close','Volume']에서 마지막 'Volume'만 취함

# [:,-1]이 아닌 [:,-1:]이므로 주의하자! 스칼라가아닌 벡터값 산출해야만 쉽게 병합 가능

volume = stock_info[:,-1:]  마지막 열 Volume 만을 volume 변수로 생성 

norm_volume = min_max_scaling(volume# 거래량데이터 volume 변수를 min_max_scaling 정규화 norm_volume 변수생성

print("volume.shape: ", volume.shape)  # 정규화 volume 변수형태 출력

print("volume[0]: ", volume[0])  # 정규화 volume 변수 첫째행 출력

print("norm_volume[0]: ", norm_volume[0])  # 정규화후 norm_volume 변수 첫째행 출력

print("="*100)  # 화면상 구분용 출력

 

# 출력

 

 

# 행은 그대로 두고 열을 우측에 붙여 합친다

x = np.concatenate((norm_price, norm_volume), axis=1)  # 정규화후 norm_price, norm_volume axis=1 세로합친 x 변수 생성

print("x.shape: ", x.shape)  # 정규화후 x 변수형태 출력

print("x[0]: ", x[0])    # 정규화후 x 변수의 첫째행 출력

print("x[-1]: ", x[-1]# 정규화후 x 변수의 마지막행 출력

print("="*100)  # 화면상 구분용

 

# 출력

 

 

y = x[:, [4]# 정규화후 데이터 x 변수의 전체행, 5 열은 주식 종가로 y 변수 생성

print("y[0]: ",y[0])  # 정규화후 y 변수의 첫째행 출력

print("y[-1]: ",y[-1])  # 정규화후 y 변수의 첫째행 출력

 

# 출력

 

 

dataX = []  # dataX입력으로 사용될 Sequence Data 리스트 객체다.

dataY = []  # dataY출력(타겟)으로 사용될 리스트 객체다.

 

for i in range(0, len(y) - seq_length): # for in range 는 필요한 만큼 숫자를 만드는 반복 함수다. 입력으로 주어지는 인수가 2개시

# 시작과 끝 숫자를 의미한다. 단, 끝 숫자는 해당범위에 포함되지 않는다. 1개 시퀀스의 길이 (시계열데이터 입력 개수), seq_length = 12

    _x = x[i : i+seq_length# 시작 i 가 0 이면 _x = x[0:12]로 0 인 첫번째 행부터 11인 열두번째 행의 x 데이터들이 입력된다.

    _y = y[i + seq_length]  # 다음 나타날 주가(정답)로 시작 i 가 0 이면 _y = y[12]로 12인 열세번째 행의 y 데이터가 출력된다.

    if i is 0:  # 시작 i 가 0 이면

        print(_x, "->", _y)  # 첫번째 행만 출력해 봄

 

# 출력

 

 

    dataX.append(_x) # dataX 리스트에 추가

    dataY.append(_y) # dataY 리스트에 추가

        

# 학습용/테스트용 데이터 생성

# 전체 70%를 학습용 데이터로 사용

train_size = int(len(dataY) * 0.7)

# 나머지(30%)를 테스트용 데이터로 사용

test_size = len(dataY) - train_size

 

# 데이터를 잘라 학습용 데이터 생성

trainX = np.array(dataX[0:train_size])

trainY = np.array(dataY[0:train_size])

 

# 데이터를 잘라 테스트용 데이터 생성

testX = np.array(dataX[train_size:len(dataX)])

testY = np.array(dataY[train_size:len(dataY)])

 

# 텐서플로우 플레이스홀더 생성

# 입력 X, 출력 Y를 생성한다

X = tf.placeholder(tf.float32, [None, seq_length, input_data_column_cnt])

print("X: ", X)

Y = tf.placeholder(tf.float32, [None, 1])

print("Y: ", Y)

 

# 출력

 

 

# 검증용 측정지표를 산출하기 위한 targets, predictions를 생성한다

targets = tf.placeholder(tf.float32, [None, 1])

print("targets: ", targets)

 

predictions = tf.placeholder(tf.float32, [None, 1])

print("predictions: ", predictions)

 

# 출력

 

 

# 모델(LSTM 네트워크) 생성

def lstm_cell():

    # LSTM셀을 생성

    # num_units: 각 Cell 출력 크기

    # forget_bias:  to the biases of the forget gate 

    #              (default: 1)  in order to reduce the scale of forgetting in the beginning of the training.

    # state_is_tuple: True ==> accepted and returned states are 2-tuples of the c_state and m_state.

    # state_is_tuple: False ==> they are concatenated along the column axis.

    cell = tf.contrib.rnn.BasicLSTMCell(num_units=rnn_cell_hidden_dim, 

                                        forget_bias=forget_bias, state_is_tuple=True, activation=tf.nn.softsign)

    if keep_prob < 1.0:

        cell = tf.contrib.rnn.DropoutWrapper(cell, output_keep_prob=keep_prob)

    return cell

 

# num_stacked_layers개의 층으로 쌓인 Stacked RNNs 생성

stackedRNNs = [lstm_cell() for _ in range(num_stacked_layers)]

multi_cells = tf.contrib.rnn.MultiRNNCell(stackedRNNs, state_is_tuple=True) if num_stacked_layers > 1 else lstm_cell()

 

# RNN Cell(여기서는 LSTM셀임)들을 연결

hypothesis, _states = tf.nn.dynamic_rnn(multi_cells, X, dtype=tf.float32)

print("hypothesis: ", hypothesis)

 

# [:, -1]를 잘 살펴보자. LSTM RNN의 마지막 (hidden)출력만을 사용했다.

# 과거 여러 거래일의 주가를 이용해서 다음날의 주가 1개를 예측하기때문에 MANY-TO-ONE형태이다

hypothesis = tf.contrib.layers.fully_connected(hypothesis[:, -1], output_data_column_cnt, activation_fn=tf.identity)

 

 

# 손실함수로 평균제곱오차를 사용한다

loss = tf.reduce_sum(tf.square(hypothesis - Y))

# 최적화함수로 AdamOptimizer를 사용한다

optimizer = tf.train.AdamOptimizer(learning_rate)

# optimizer = tf.train.RMSPropOptimizer(learning_rate) # LSTM과 궁합 별로임

 

train = optimizer.minimize(loss)

 

# RMSE(Root Mean Square Error)

# 제곱오차의 평균을 구하고 다시 제곱근을 구하면 평균 오차가 나온다

# rmse = tf.sqrt(tf.reduce_mean(tf.square(targets-predictions))) # 아래 코드와 같다

rmse = tf.sqrt(tf.reduce_mean(tf.squared_difference(targets, predictions)))

 

 

train_error_summary = [] # 학습용 데이터의 오류를 중간 중간 기록한다

test_error_summary = [] # 테스트용 데이터의 오류를 중간 중간 기록한다

test_predict = ''           # 테스트용데이터로 예측한 결과

 

sess = tf.Session()

sess.run(tf.global_variables_initializer())

 

# 학습한다

start_time = datetime.datetime.now() # 시작시간을 기록한다

print('n학습을 시작합니다...')

 

# 출력

 

 

for epoch in range(epoch_num):

    _, _loss = sess.run([train, loss], feed_dict={X: trainX, Y: trainY})

    if ((epoch+1) % 100 == 0) or (epoch == epoch_num-1): # 100번째마다 또는 마지막 epoch인 경우

        # 학습용데이터로 rmse오차를 구한다

        train_predict = sess.run(hypothesis, feed_dict={X: trainX})

        train_error = sess.run(rmse, feed_dict={targets: trainY, predictions: train_predict})

        train_error_summary.append(train_error)

 

        # 테스트용데이터로 rmse오차를 구한다

        test_predict = sess.run(hypothesis, feed_dict={X: testX})

        test_error = sess.run(rmse, feed_dict={targets: testY, predictions: test_predict})

        test_error_summary.append(test_error)

        

        # 현재 오류를 출력한다

        print("epoch: {}, train_error(A): {}, test_error(B): {}, B-A: {}".format(epoch+1, train_error, test_error, test_error-train_error))

        

# 출력

 

end_time = datetime.datetime.now() # 종료시간을 기록한다

elapsed_time = end_time - start_time # 경과시간을 구한다

print('elapsed_time:',elapsed_time)

print('elapsed_time per epoch:',elapsed_time/epoch_num)

 

# 출력

 

 

# 하이퍼파라미터 출력

print('input_data_column_cnt:', input_data_column_cnt, end='')

print(',output_data_column_cnt:', output_data_column_cnt, end='')

 

print(',seq_length:', seq_length, end='')

print(',rnn_cell_hidden_dim:', rnn_cell_hidden_dim, end='')

print(',forget_bias:', forget_bias, end='')

print(',num_stacked_layers:', num_stacked_layers, end='')

print(',keep_prob:', keep_prob, end='')

 

print(',epoch_num:', epoch_num, end='')

print(',learning_rate:', learning_rate, end='')

 

print(',train_error:', train_error_summary[-1], end='')

print(',test_error:', test_error_summary[-1], end='')

print(',min_test_error:', np.min(test_error_summary))

 

# 출력

 

 

# 결과 그래프 출력

 

%matplotlib inline

plt.figure(1)

plt.plot(train_error_summary, 'gold')

plt.plot(test_error_summary, 'b')

plt.xlabel('Epoch(x100)')

plt.ylabel('Root Mean Square Error')

 

# 출력

 

 

plt.figure(2)

plt.plot(testY, 'r')

plt.plot(test_predict, 'b')

plt.xlabel('Time Period')

plt.ylabel('Stock Price(Hyundai Mobis Close)')

plt.show()

 

# 출력

 

 

# sequence length만큼의 가장 최근 데이터를 슬라이싱한다

recent_data = np.array([x[len(x)-seq_length : ]])

print("recent_data.shape:", recent_data.shape)

print("recent_data:", recent_data)

 

# 출력

 

 

# 내일 종가를 예측해본다

test_predict = sess.run(hypothesis, feed_dict={X: recent_data})

 

print("n현대모비스 주식 2019.11.6. 내일 종가(정규화) test_predict", test_predict[0])

test_predict = reverse_min_max_scaling(price,test_predict) # 금액데이터 역정규화한다

 

print("현대모비스 주식 2019.11.6. 내일 종가(KRW) Tomorrow's stock price", test_predict[0]) # 예측한 주가를 출력한다. 

 
# 출력