머신러닝

이야기 챗봇 구현-1

content0474 2024. 11. 25. 13:30

구현내용

4️⃣ 필수 기능 가이드

  • 챗봇 대화 기능
    • [ ] 사용자 입력에 따라 자연어 응답을 생성하고 출력합니다.
    • [ ] 사용자와의 대화 내역을 저장하고 관리합니다.
  • API 기능
    • [ ] 사용자가 입력한 데이터를 API를 통해 전송하고, 응답을 받아옵니다.
  • 대화 세션 관리
    • [ ] 각 대화 세션을 구분하여 저장하고, 불러올 수 있습니다.
  • RAG 관련 내용
    • [ ] FAISS를 Retriever로 변환할 수 있습니다.
    • [ ] 모델과 프롬프트를 연결하는 RAG 체인을 구성할 수 있습니다.
  • 문서 작성
    • [ ] SA(Software Architecture) 문서를 작성했습니다.
      • [ ] 프로젝트 계획
      • [ ] 작업 분배 방식
      • [ ] 와이어프레임 문서를 작성했습니다.
    • [ ] README.md를 작성했습니다.

5️⃣ 도전 기능 가이드

  • 실시간 데이터 업데이트: 저장된 사용자와의 대화 내역을 통해, VectorDB의 merge를 활용하여 실시간으로 모델 성능을 개선해보세요. (⭐⭐⭐)
  • 다국어 지원 기능 업데이트: 한글, 영어, 일본어 등 다양한 언어를 지원하는 모델을 개발해보세요. (⭐⭐⭐⭐)
  • 음성 입력 및 출력 기능: 이전에 AI 활용에서 배우셨던 pydub 라이브러리 elevenlabs를 이용해서 입력을 음성으로 받고, 음성으로 출력하는 기능을 개발해보세요 (⭐⭐⭐⭐⭐)

현재까지의 코드

더보기

import json
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI
from langchain.prompts.chat import ChatPromptTemplate
from langchain.chains import LLMChain
from langdetect import detect
import random
from datetime import datetime
from sentence_transformers import SentenceTransformer
import faiss
import os
from langchain.embeddings.base import Embeddings
from langchain.chains import RetrievalQA

 

model = SentenceTransformer('all-MiniLM-L6-v2', device="cpu")

def create_embeddings(texts):
    return model.encode(texts, convert_to_tensor=False)

def create_index(texts, index_file="vector_index.faiss"):
    print("Embedding을 생성 중입니다...")
    embeddings = create_embeddings(texts)
    print(f"Embedding 생성 완료. 벡터 크기: {len(embeddings)}")

    print("FAISS 인덱스 생성 중...")
    d = embeddings.shape[1] 
    index = faiss.IndexFlatL2(d) 
    index.add(embeddings)  
    faiss.write_index(index, index_file) 
    print("FAISS 인덱스 저장 완료.")

# FAISS 인덱스+디버깅프린트까지
def load_index(index_file="vector_index.faiss"):
    try:
        index = faiss.read_index(index_file)
        print("FAISS 인덱스 로드 완료.")
        return index
    except Exception as e:
        print(f"FAISS 인덱스를 로드할 수 없습니다: {e}")
        return None

def query_index(query, index, texts, top_k=3):
    print("쿼리를 Embedding으로 변환 중입니다...")
    query_embedding = create_embeddings([query])
    print("쿼리 검색 중...")
    distances, indices = index.search(query_embedding, top_k) 
    results = [texts[i] for i in indices[0]]
    return results

def retrieve_with_rag(user_query, texts, index=None, top_k=3):
    if index is None:
        print("FAISS 인덱스를 로드 중입니다...")
        index = load_index()

    if index is None:
        print("FAISS 인덱스가 없습니다. 새로 생성합니다...")
        create_index(texts)
        index = load_index()

    print(f"'{user_query}'에 대한 검색을 시작합니다...")
    results = query_index(user_query, index, texts, top_k=top_k)
    print(f"검색된 결과: {results}")
    return results

# JSON 파일 처리부분
def process_json_data(json_files):
    all_json_data = []
    for file_path in json_files:
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
            all_json_data.extend(data)
    return [f"Title: {item['title']}\nContent: {item['content']}" for item in all_json_data]


def split_texts(texts, chunk_size=1000, chunk_overlap=200):
    recursive_text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        is_separator_regex=False,
    )
    documents = [Document(page_content=text) for text in texts]
    return recursive_text_splitter.split_documents(documents)

 

def detect_language(text):
    try:
        language = detect(text)
        return language 
    except:
        return 'unknown'

 

def dividing_story(text):
    dividing = ChatOpenAI(model="gpt-4o-mini")
    contextual_prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a story analyzer. Divide the given text into four parts: introduction, development, turn, and conclusion. Return the result as a JSON object with keys: Introduction, Development, Turn, and Conclusion."),
        ("user", "{text}")
    ])
    chain = LLMChain(llm=dividing, prompt=contextual_prompt)
    
    response = chain.run({"text": text})
    # print("GPT Response:", response)  # 디버깅

    # ```json``` 없애기
    if response.startswith("```json"):
        response = response.strip("```json").strip("```")
        # print("Processed Response:", response)  # 디버깅

    try:
        story_parts = json.loads(response)
    except json.JSONDecodeError as e:
        raise ValueError(f"GPT response is not valid JSON: {e}")
    
    return story_parts

 

def storytelling(part_name, segment, user_message=None):
    host = ChatOpenAI(model="gpt-4")
    
    detected_lang = detect_language(user_message)
    if detected_lang == 'unknown':
        detected_lang = 'ko'  

    # 프롬프트 템플릿
    with open("ggoggoprompt.txt", "r", encoding="utf-8") as file:
        custom_prompt = file.read()
    
    contextual_prompt = ChatPromptTemplate.from_messages([
        ("system", custom_prompt),
        ("assistant", "We are currently discussing the '{part_name}' section of the story. Here is the content:\n\n{segment}"),
        ("user", "{user_input}")
    ])
    
    chain = LLMChain(llm=host, prompt=contextual_prompt)
    user_input = user_message
    
    response = chain.run({"part_name": part_name, "segment": segment, "user_input": user_input, "detected_lang": detected_lang})
    return response

 

def initialize_rag_chain(model, splits):
    try:
        vectorstore = FAISS.load_local(
            "vector_index",
            model,
            allow_dangerous_deserialization=True
        )
    except RuntimeError:
        print("FAISS 인덱스를 찾을 수 없습니다. 새로 생성합니다...")
        create_index(splits)  
        vectorstore = FAISS.load_local(
            "vector_index",
            model,
            allow_dangerous_deserialization=True
        )

    retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})
    llm = ChatOpenAI(model="gpt-4")
    return RetrievalQA.from_chain_type(llm=llm, retriever=retriever, return_source_documents=False)


def get_story_parts(user_query=None, json_texts=None, rag_chain=None):
    if json_texts is None:
        raise ValueError("json_texts가 제공되지 않았습니다. JSON 데이터를 먼저 로드하세요.")

    if user_query is None or user_query.strip().lower() in ["재미", "아무거나", "재밌", "암꺼나"]:
        print("재미난 이야기를 가져오는 중...")
        try:
            random_story = random.choice(json_texts)  
            print("랜덤 이야기 선택 완료!")
            return dividing_story(random_story)
        except Exception as e:
            print(f"랜덤 이야기를 선택하는 중 오류 발생: {e}")
            return None
    else:
        print("요청한 이야기를 가져오는 중...")
        if rag_chain is None:
            raise ValueError("RAG 체인이 초기화되지 않았습니다.")

        try:
            # RAG 체인 실행 및 반환값 확인
            result = rag_chain.invoke({"query": user_query})
            print(f"RAG 체인 반환값: {result}")

            # 결과가 문자열인지 확인
            if isinstance(result, dict) and "result" in result:
                story_text = result["result"]
            elif isinstance(result, str):
                story_text = result  # 반환값이 문자열인 경우
            else:
                print("RAG 체인에서 반환된 데이터 구조를 처리할 수 없습니다.")
                return None

            print("검색된 이야기:\n", story_text)
            return dividing_story(story_text)
        except Exception as e:
            print(f"이야기를 검색하는 중 오류 발생: {e}")
            return None

 

conversation_logs = {}

# 대화 저장
def save_conversation(session_id, user_message, assistant_message):
    if session_id not in conversation_logs:
        conversation_logs[session_id] = []
    conversation_logs[session_id].append({
        "timestamp": datetime.now().isoformat(),
        "user": user_message,
        "assistant": assistant_message
    })
    with open("conversation_logs.json", "w", encoding="utf-8") as file:
        json.dump(conversation_logs, file, indent=4, ensure_ascii=False)

# 대화 로드
def load_conversation(session_id):
    try:
        with open("conversation_logs.json", "r", encoding="utf-8") as file:
            data = json.load(file)
        return data.get(session_id, [])
    except FileNotFoundError:
        return []

 

def chatbot():
    session_id = str(datetime.now().timestamp())
    print(f"세션 ID가 생성되었습니다: {session_id}")
    
    json_files = ["filtered_unsolved_cases.json", "korea_crime.json"]
    texts = process_json_data(json_files)
    splits = [split.page_content for split in split_texts(texts)]

    index = load_index()
    if index is None:
        create_index(splits)
        index = load_index()

    json_texts = texts

    rag_chain = initialize_rag_chain(model, splits)

    while True:
        print("\n어떤 이야기를 들려드릴까요?")
        user_query = input("듣고 싶은 이야기를 입력 (종료하려면 'quit'): ").strip()
        if user_query.lower() in ["quit", "종료"]:
            print("대화를 종료합니다.")
            break

        try:
            story_parts = get_story_parts(user_query=user_query, json_texts=json_texts, rag_chain=rag_chain)
            if story_parts is None:
                print("이야기를 가져오는 데 실패했습니다. 다시 시도해주세요.")
                continue
        except Exception as e:
            print(f"이야기를 가져오는 중 오류 발생: {e}")
            continue

        for part_name, segment in story_parts.items():
            print(f"\n=== {part_name} ===\n")
            print(f"이야기꾼: {segment}")

            turns = 0
            while turns < 2:
                user_message = input("사용자입력: ")
                if user_message.lower() in ["잘들었어", "그만", "quit", "exit"]:
                    print("대화를 종료합니다.")
                    return
                if user_message.lower() in ["그래서", "next"]:
                    break
                response = storytelling(part_name=part_name, segment=segment, user_message=user_message)
                print(f"이야기꾼: {response}")
                save_conversation(session_id, user_message, response)
                turns += 1

        while True:
            next_action = input("\n새로운 이야기를 듣고 싶으신가요? (yes: 새 이야기 / no: 종료): ").lower()
            if next_action in ["no", "그만", "종료"]:
                print("대화를 종료합니다.")
                return
            elif next_action in ["yes", "새 이야기", "다음 이야기"]:
                break
            else:
                print("잘못된 입력입니다. 'yes' 또는 'no'로 응답해주세요.")

 

#챗봇구동

chatbot()

특징

모듈화를 하기 위해 노력함 ->이후 부가기능을 넣거나 데이터 형태에 따라 수정하기 용이함

진행상황을 알기 위해 print() 구문을 많이 넣음

예외처리 강화

 

개선사항

프롬프트 엔지니어링

속도개선

실시간 데이터 업데이트 및 음성출력기능 추가 여부 결정

 

코드흐름

더보기

chatbot

├── process_json_data (JSON 파일 → 텍스트 리스트)

├── split_texts (텍스트 리스트 → 적합한 크기로 분할)

├── load_index (기존 인덱스 로드 시도)

│               └── create_index (인덱스 없을 시 새로 생성)

├── initialize_rag_chain (RAG 체인 초기화)

└── get_story_parts (사용자 쿼리 처리)

            ├── retrieve_with_rag (FAISS 기반 검색 수행)

            │           ├── load_index (인덱스 로드)

            │           └── query_index (FAISS 검색)

            └── dividing_story (검색된 텍스트를 이야기 구조로 분할)

 

model = SentenceTransformer('all-MiniLM-L6-v2', device="cpu")

더보기

hugging face의 sentence trasformer 모델을 사용하는 코드

문장을 벡터로 변환한다.

 

device='cpu'를 명시하지 않으면 AttributeError: module 'torch.backends' has no attribute 'mps' 

찾아보니 windows에서 지원하지 않는 기능이라서 발생하는 오류라고 한다. 

내 컴퓨터는 gpu가 없으니 cpu를 사용하도록 했다.

참고로, 'all-MiniLM-L6-v2' 가벼운 모델이라서 cpu에서도 작동을 잘 한다. 하지만 이 모델은 주로 영어를 임베딩하는데 특화되어 있어서, 한국어 전용 버전을 사용하면 더 좋을거같긴 하다.

만약 여러 언어로 된 문서를 받으려면 'multi-qa-mpnet-base-dot-v1' 를 사용할 수 있다. (cpu에서 구동 느림)

 

def create_embeddings(texts):
    return model.encode(texts, convert_to_tensor=False)

더보기

모델을 사용해서 텍스트를 숫자로 바꾸는 함수

convert_to_tensor=False: 넘파이 배열로 반환함

convert_to_tensor=True: 파이토치 텐서로 반환함

gpu를 사용하거나 반든 데이터를 파이토치의 다른 모델로 전달해야하면 True로 설정하는게 낫지만, 지금은 넘파이 배열로 충분할 것 같다.


def create_index(texts, index_file="vector_index.faiss"):

    embeddings = create_embeddings(texts)

더보기

text로 faiss index를 생성하고 파일로 저장하는 함수

(원본 코드를 보면 오류가 많이 나서 중간중간 확인하느라 print() 문이 많이 들어가있다. 나중에 지워도 상관없다.)

인자로 text를 받고, 파일을 저장한 파일명은 기본값을 "vector_index.faiss" 로 주고있다.

이후 create_embeddings 함수를 호출해서 임베딩을 생성하고 embeddings에 저장

    d = embeddings.shape[1] 

더보기

임베딩은 (문장개수, 임베딩차원)으로 표현되므로 shape[1]은 임베딩 차원을 의미

d=임베딩 벡터의 차원

faiss는 차원정보를 알아야 데이터를 저장하고 검색할 수 있다.

    index = faiss.IndexFlatL2(d) 

더보기

IndexFlatL2는 faiss에서 제공하는 기본적인 벡터 인덱스로, 모든 벡터를 평면배열로 저장해서 유클리드 거리 기반 유사도를 계산함

모든 벡터를 비교하므로 작은 데이터셋에 적합

    index.add(embeddings)  

    faiss.write_index(index, index_file) 

더보기

생성된 embeddings를 index에 추가하고

index를 index_file("vector_index.faiss" )로 저장

 

trouble shooting

더보기

원래는 sentence transformer를 쓰지 않고 embeddings = OpenAIEmbeddings(model="text-embedding-ada-002"

를 이용해 임제딩을 했다. 하지만 데이터 총 토큰수가 너무 많아 오류가 낫다.

 

RateLimitError: Error code: 429 - {'error': {'message': 'Request too large for text-embedding-ada-002 in organization org-NVmA3arQT2vcxWGFQs0g0KSW on tokens per min (TPM): Limit 1000000, Requested 1817821. The input or output tokens must be reduced in order to run successfully. Visit https://platform.openai.com/account/rate-limits to learn more.', 'type': 'tokens', 'param': None, 'code': 'rate_limit_exceeded'}}

 

그래서 다른 모델을 사용했다.

hugging face에서 제공하는 sentence transformer는 무료로 사용가능하고, 로컬에서도 실행가능하다. 대신 데이터가 커지면 속도가 느려진다. 

 

다른 대안

cohere API: openai와 유사하지만 약간 품질이 낮을 수 있고 초과사용시 요금 발생

Azure ai: openai와 비슷한 품질이나 유료이고 설정이 복잡해짐

Google universal sentence encoder: 무료에 뛰어난 성능, tensorflow에서 사용 가능

 

 

def load_index(index_file="vector_index.faiss"):
    try:
        index = faiss.read_index(index_file)
        print("FAISS 인덱스 로드 완료.")
        return index
    except Exception as e:
        print(f"FAISS 인덱스를 로드할 수 없습니다: {e}")
        return None

더보기

저장한 index_file을 로드하는 코드

원래는 read_index만 있는 간단한 코드였는데, 에러가 어디서 나는지 모르겠어서 try-except와 print()를 추가했다.

 

자주 났던 에러

RuntimeError: Error in __cdecl faiss::FileIOReader::FileIOReader(const char *) at D:\a\faiss-wheels\faiss-wheels\faiss\faiss\impl\io.cpp:68: Error: 'f' failed: could not open vector_index\index.faiss for reading: No such file or directory -> index.faiss 파일을 찾지 못 함

 

 

def query_index(query, index, texts, top_k=3):

더보기

사용자가 입력하면(query) faiss에서 검색해서 비슷한 것을 반환하는 함수

인자로 query(사용자입력), 미리 생성한 faiss index, 원본텍스트, 반환할 유사텍스트 개수(여기서는 3개)

    query_embedding = create_embeddings([query])

더보기

create_embeddings 함수로 query(사용자가 입력한 텍스트)를 숫자로 바꾼다.

이 때 [query]로 리스트 형태로 전달해서 query_embeddings는 (1,출력차원) 형태로 반환된다.

왜냐면 뒤에서 쓸 index.search는 (검색하려는 쿼리, 벡터의 차원) 형태로 데이터를 기대하기 때문이다.

이 형태를 맞춰주지 않으면 shape를 맞추라는 assertionerror가 난다.

    distances, indices = index.search(query_embedding, top_k) 

더보기

index.search()로 쿼리와 가장 유사한 k개의 벡터를 검색해서

그 벡터들의 유클리드거리와 인덱스를 반환 -> ( distances, indices )

이 때 indices는 2차원 배열로 반환된다. 

    results = [texts[i] for i in indices[0]]
    return results

더보기

예를 들어

texts=["I love you", "I love her", "I will leave you", "I hate you" ]

이고 각각 인덱스0 부터 4까지가 부여되었다고 하면,

query="Do u love me?" 일 때 유사한 인덱스로 반환된 indices=[[0,3,2]] 

indices[0]=[0,3,2]

texts[i] for i in indices[0] = texts[0], texts[3], texts[2]

results=[] 이므로 이 texts들을 리스트로 반환

results= ["I love you", "I hate you", "I will leave you", ]

 

 

def retrieve_with_rag(user_query, texts, index=None, top_k=3):

더보기

사용자가 입력한 질문으로 faiss인덱스를 검색한 후 결과를 반환

    if index is None:
        index = load_index()

더보기

인덱스가 없으면 저장된 인덱스를 우선 로드함

    if index is None:
        create_index(texts)
        index = load_index()

더보기

로드에 실패해서 여전히 인덱스가 없으면

제공받은 text로 새로운 인덱스를 만들어 저장하고 다시 인덱스를 로드

    results = query_index(user_query, index, texts, top_k=top_k)
    print(f"검색된 결과: {results}")
    return results

더보기

query_index() 함수를 호출하고 결과를 print

 

 

query_index() 함수만 있어도 될거같은데 굳이 retrieve_with_rag 를 추가한 이유?

더보기

retrieve with rag는 인덱스를 직접 로드하거나 생성할 수 있어서 인덱스가 준비되지 않아도 동작한다. 

즉 검색과정 전체에 관여해서 인덱스를 불러오고 query index함수까지 호출해주는 역할

 

 


def process_json_data(json_files):
    all_json_data = []
    for file_path in json_files:
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
            all_json_data.extend(data)
    return [f"Title: {item['title']}\nContent: {item['content']}" for item in all_json_data]


def split_texts(texts, chunk_size=1000, chunk_overlap=200):
    recursive_text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        is_separator_regex=False,
    )
    documents = [Document(page_content=text) for text in texts]
    return recursive_text_splitter.split_documents(documents)

 

여기는 이전에 크롤링으로 얻은 json파일을 처리하고 recursive_text_splitter를 이용해서 자르는부분을 모듈화한 것이다. 파일 로드와 텍스트 처리는 이전 게시물에서 했으므로 설명 생략

중요한건 모듈화를 해둬서 만약 파일이 json이 아닌 다른것으로 바뀌면 json data처리함수만 손보고 split_texts 함수는 그대로 사용할 수 있다는 점이다.

 

2편에서 계속

'머신러닝' 카테고리의 다른 글

이야기 챗봇 구현-3  (2) 2024.11.27
이야기 챗봇 구현-2  (1) 2024.11.26
LLM과 RAG를 활용한 챗봇 구현-3  (3) 2024.11.20
LLM과 RAG를 활용한 챗봇 구현-2  (0) 2024.11.19
LLM과 RAG를 활용한 챗봇 구현  (1) 2024.11.18