구현내용
4️⃣ 필수 기능 가이드
- 챗봇 대화 기능
- [ ] 사용자 입력에 따라 자연어 응답을 생성하고 출력합니다.
- [ ] 사용자와의 대화 내역을 저장하고 관리합니다.
- API 기능
- [ ] 사용자가 입력한 데이터를 API를 통해 전송하고, 응답을 받아옵니다.
- 대화 세션 관리
- [ ] 각 대화 세션을 구분하여 저장하고, 불러올 수 있습니다.
- RAG 관련 내용
- [ ] FAISS를 Retriever로 변환할 수 있습니다.
- [ ] 모델과 프롬프트를 연결하는 RAG 체인을 구성할 수 있습니다.
- 문서 작성
- [ ] SA(Software Architecture) 문서를 작성했습니다.
- [ ] 프로젝트 계획
- [ ] 작업 분배 방식
- [ ] 와이어프레임 문서를 작성했습니다.
- [ ] README.md를 작성했습니다.
- [ ] SA(Software Architecture) 문서를 작성했습니다.
5️⃣ 도전 기능 가이드
- 실시간 데이터 업데이트: 저장된 사용자와의 대화 내역을 통해, VectorDB의 merge를 활용하여 실시간으로 모델 성능을 개선해보세요. (⭐⭐⭐)
- 다국어 지원 기능 업데이트: 한글, 영어, 일본어 등 다양한 언어를 지원하는 모델을 개발해보세요. (⭐⭐⭐⭐)
- 음성 입력 및 출력 기능: 이전에 AI 활용에서 배우셨던 pydub 라이브러리 elevenlabs를 이용해서 입력을 음성으로 받고, 음성으로 출력하는 기능을 개발해보세요 (⭐⭐⭐⭐⭐)
현재까지의 코드
import json
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI
from langchain.prompts.chat import ChatPromptTemplate
from langchain.chains import LLMChain
from langdetect import detect
import random
from datetime import datetime
from sentence_transformers import SentenceTransformer
import faiss
import os
from langchain.embeddings.base import Embeddings
from langchain.chains import RetrievalQA
model = SentenceTransformer('all-MiniLM-L6-v2', device="cpu")
def create_embeddings(texts):
return model.encode(texts, convert_to_tensor=False)
def create_index(texts, index_file="vector_index.faiss"):
print("Embedding을 생성 중입니다...")
embeddings = create_embeddings(texts)
print(f"Embedding 생성 완료. 벡터 크기: {len(embeddings)}")
print("FAISS 인덱스 생성 중...")
d = embeddings.shape[1]
index = faiss.IndexFlatL2(d)
index.add(embeddings)
faiss.write_index(index, index_file)
print("FAISS 인덱스 저장 완료.")
# FAISS 인덱스+디버깅프린트까지
def load_index(index_file="vector_index.faiss"):
try:
index = faiss.read_index(index_file)
print("FAISS 인덱스 로드 완료.")
return index
except Exception as e:
print(f"FAISS 인덱스를 로드할 수 없습니다: {e}")
return None
def query_index(query, index, texts, top_k=3):
print("쿼리를 Embedding으로 변환 중입니다...")
query_embedding = create_embeddings([query])
print("쿼리 검색 중...")
distances, indices = index.search(query_embedding, top_k)
results = [texts[i] for i in indices[0]]
return results
def retrieve_with_rag(user_query, texts, index=None, top_k=3):
if index is None:
print("FAISS 인덱스를 로드 중입니다...")
index = load_index()
if index is None:
print("FAISS 인덱스가 없습니다. 새로 생성합니다...")
create_index(texts)
index = load_index()
print(f"'{user_query}'에 대한 검색을 시작합니다...")
results = query_index(user_query, index, texts, top_k=top_k)
print(f"검색된 결과: {results}")
return results
# JSON 파일 처리부분
def process_json_data(json_files):
all_json_data = []
for file_path in json_files:
with open(file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
all_json_data.extend(data)
return [f"Title: {item['title']}\nContent: {item['content']}" for item in all_json_data]
def split_texts(texts, chunk_size=1000, chunk_overlap=200):
recursive_text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
is_separator_regex=False,
)
documents = [Document(page_content=text) for text in texts]
return recursive_text_splitter.split_documents(documents)
def detect_language(text):
try:
language = detect(text)
return language
except:
return 'unknown'
def dividing_story(text):
dividing = ChatOpenAI(model="gpt-4o-mini")
contextual_prompt = ChatPromptTemplate.from_messages([
("system", "You are a story analyzer. Divide the given text into four parts: introduction, development, turn, and conclusion. Return the result as a JSON object with keys: Introduction, Development, Turn, and Conclusion."),
("user", "{text}")
])
chain = LLMChain(llm=dividing, prompt=contextual_prompt)
response = chain.run({"text": text})
# print("GPT Response:", response) # 디버깅
# ```json``` 없애기
if response.startswith("```json"):
response = response.strip("```json").strip("```")
# print("Processed Response:", response) # 디버깅
try:
story_parts = json.loads(response)
except json.JSONDecodeError as e:
raise ValueError(f"GPT response is not valid JSON: {e}")
return story_parts
def storytelling(part_name, segment, user_message=None):
host = ChatOpenAI(model="gpt-4")
detected_lang = detect_language(user_message)
if detected_lang == 'unknown':
detected_lang = 'ko'
# 프롬프트 템플릿
with open("ggoggoprompt.txt", "r", encoding="utf-8") as file:
custom_prompt = file.read()
contextual_prompt = ChatPromptTemplate.from_messages([
("system", custom_prompt),
("assistant", "We are currently discussing the '{part_name}' section of the story. Here is the content:\n\n{segment}"),
("user", "{user_input}")
])
chain = LLMChain(llm=host, prompt=contextual_prompt)
user_input = user_message
response = chain.run({"part_name": part_name, "segment": segment, "user_input": user_input, "detected_lang": detected_lang})
return response
def initialize_rag_chain(model, splits):
try:
vectorstore = FAISS.load_local(
"vector_index",
model,
allow_dangerous_deserialization=True
)
except RuntimeError:
print("FAISS 인덱스를 찾을 수 없습니다. 새로 생성합니다...")
create_index(splits)
vectorstore = FAISS.load_local(
"vector_index",
model,
allow_dangerous_deserialization=True
)
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})
llm = ChatOpenAI(model="gpt-4")
return RetrievalQA.from_chain_type(llm=llm, retriever=retriever, return_source_documents=False)
def get_story_parts(user_query=None, json_texts=None, rag_chain=None):
if json_texts is None:
raise ValueError("json_texts가 제공되지 않았습니다. JSON 데이터를 먼저 로드하세요.")
if user_query is None or user_query.strip().lower() in ["재미", "아무거나", "재밌", "암꺼나"]:
print("재미난 이야기를 가져오는 중...")
try:
random_story = random.choice(json_texts)
print("랜덤 이야기 선택 완료!")
return dividing_story(random_story)
except Exception as e:
print(f"랜덤 이야기를 선택하는 중 오류 발생: {e}")
return None
else:
print("요청한 이야기를 가져오는 중...")
if rag_chain is None:
raise ValueError("RAG 체인이 초기화되지 않았습니다.")
try:
# RAG 체인 실행 및 반환값 확인
result = rag_chain.invoke({"query": user_query})
print(f"RAG 체인 반환값: {result}")
# 결과가 문자열인지 확인
if isinstance(result, dict) and "result" in result:
story_text = result["result"]
elif isinstance(result, str):
story_text = result # 반환값이 문자열인 경우
else:
print("RAG 체인에서 반환된 데이터 구조를 처리할 수 없습니다.")
return None
print("검색된 이야기:\n", story_text)
return dividing_story(story_text)
except Exception as e:
print(f"이야기를 검색하는 중 오류 발생: {e}")
return None
conversation_logs = {}
# 대화 저장
def save_conversation(session_id, user_message, assistant_message):
if session_id not in conversation_logs:
conversation_logs[session_id] = []
conversation_logs[session_id].append({
"timestamp": datetime.now().isoformat(),
"user": user_message,
"assistant": assistant_message
})
with open("conversation_logs.json", "w", encoding="utf-8") as file:
json.dump(conversation_logs, file, indent=4, ensure_ascii=False)
# 대화 로드
def load_conversation(session_id):
try:
with open("conversation_logs.json", "r", encoding="utf-8") as file:
data = json.load(file)
return data.get(session_id, [])
except FileNotFoundError:
return []
def chatbot():
session_id = str(datetime.now().timestamp())
print(f"세션 ID가 생성되었습니다: {session_id}")
json_files = ["filtered_unsolved_cases.json", "korea_crime.json"]
texts = process_json_data(json_files)
splits = [split.page_content for split in split_texts(texts)]
index = load_index()
if index is None:
create_index(splits)
index = load_index()
json_texts = texts
rag_chain = initialize_rag_chain(model, splits)
while True:
print("\n어떤 이야기를 들려드릴까요?")
user_query = input("듣고 싶은 이야기를 입력 (종료하려면 'quit'): ").strip()
if user_query.lower() in ["quit", "종료"]:
print("대화를 종료합니다.")
break
try:
story_parts = get_story_parts(user_query=user_query, json_texts=json_texts, rag_chain=rag_chain)
if story_parts is None:
print("이야기를 가져오는 데 실패했습니다. 다시 시도해주세요.")
continue
except Exception as e:
print(f"이야기를 가져오는 중 오류 발생: {e}")
continue
for part_name, segment in story_parts.items():
print(f"\n=== {part_name} ===\n")
print(f"이야기꾼: {segment}")
turns = 0
while turns < 2:
user_message = input("사용자입력: ")
if user_message.lower() in ["잘들었어", "그만", "quit", "exit"]:
print("대화를 종료합니다.")
return
if user_message.lower() in ["그래서", "next"]:
break
response = storytelling(part_name=part_name, segment=segment, user_message=user_message)
print(f"이야기꾼: {response}")
save_conversation(session_id, user_message, response)
turns += 1
while True:
next_action = input("\n새로운 이야기를 듣고 싶으신가요? (yes: 새 이야기 / no: 종료): ").lower()
if next_action in ["no", "그만", "종료"]:
print("대화를 종료합니다.")
return
elif next_action in ["yes", "새 이야기", "다음 이야기"]:
break
else:
print("잘못된 입력입니다. 'yes' 또는 'no'로 응답해주세요.")
#챗봇구동
chatbot()
특징
모듈화를 하기 위해 노력함 ->이후 부가기능을 넣거나 데이터 형태에 따라 수정하기 용이함
진행상황을 알기 위해 print() 구문을 많이 넣음
예외처리 강화
개선사항
프롬프트 엔지니어링
속도개선
실시간 데이터 업데이트 및 음성출력기능 추가 여부 결정
코드흐름
chatbot
├── process_json_data (JSON 파일 → 텍스트 리스트)
├── split_texts (텍스트 리스트 → 적합한 크기로 분할)
├── load_index (기존 인덱스 로드 시도)
│ └── create_index (인덱스 없을 시 새로 생성)
├── initialize_rag_chain (RAG 체인 초기화)
└── get_story_parts (사용자 쿼리 처리)
├── retrieve_with_rag (FAISS 기반 검색 수행)
│ ├── load_index (인덱스 로드)
│ └── query_index (FAISS 검색)
└── dividing_story (검색된 텍스트를 이야기 구조로 분할)
model = SentenceTransformer('all-MiniLM-L6-v2', device="cpu")
hugging face의 sentence trasformer 모델을 사용하는 코드
문장을 벡터로 변환한다.
device='cpu'를 명시하지 않으면 AttributeError: module 'torch.backends' has no attribute 'mps'
찾아보니 windows에서 지원하지 않는 기능이라서 발생하는 오류라고 한다.
내 컴퓨터는 gpu가 없으니 cpu를 사용하도록 했다.
참고로, 'all-MiniLM-L6-v2' 가벼운 모델이라서 cpu에서도 작동을 잘 한다. 하지만 이 모델은 주로 영어를 임베딩하는데 특화되어 있어서, 한국어 전용 버전을 사용하면 더 좋을거같긴 하다.
만약 여러 언어로 된 문서를 받으려면 'multi-qa-mpnet-base-dot-v1' 를 사용할 수 있다. (cpu에서 구동 느림)
def create_embeddings(texts):
return model.encode(texts, convert_to_tensor=False)
모델을 사용해서 텍스트를 숫자로 바꾸는 함수
convert_to_tensor=False: 넘파이 배열로 반환함
convert_to_tensor=True: 파이토치 텐서로 반환함
gpu를 사용하거나 반든 데이터를 파이토치의 다른 모델로 전달해야하면 True로 설정하는게 낫지만, 지금은 넘파이 배열로 충분할 것 같다.
def create_index(texts, index_file="vector_index.faiss"):
embeddings = create_embeddings(texts)
text로 faiss index를 생성하고 파일로 저장하는 함수
(원본 코드를 보면 오류가 많이 나서 중간중간 확인하느라 print() 문이 많이 들어가있다. 나중에 지워도 상관없다.)
인자로 text를 받고, 파일을 저장한 파일명은 기본값을 "vector_index.faiss" 로 주고있다.
이후 create_embeddings 함수를 호출해서 임베딩을 생성하고 embeddings에 저장
d = embeddings.shape[1]
임베딩은 (문장개수, 임베딩차원)으로 표현되므로 shape[1]은 임베딩 차원을 의미
d=임베딩 벡터의 차원
faiss는 차원정보를 알아야 데이터를 저장하고 검색할 수 있다.
index = faiss.IndexFlatL2(d)
IndexFlatL2는 faiss에서 제공하는 기본적인 벡터 인덱스로, 모든 벡터를 평면배열로 저장해서 유클리드 거리 기반 유사도를 계산함
모든 벡터를 비교하므로 작은 데이터셋에 적합
index.add(embeddings)
faiss.write_index(index, index_file)
생성된 embeddings를 index에 추가하고
index를 index_file("vector_index.faiss" )로 저장
trouble shooting
원래는 sentence transformer를 쓰지 않고 embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
를 이용해 임제딩을 했다. 하지만 데이터 총 토큰수가 너무 많아 오류가 낫다.
RateLimitError: Error code: 429 - {'error': {'message': 'Request too large for text-embedding-ada-002 in organization org-NVmA3arQT2vcxWGFQs0g0KSW on tokens per min (TPM): Limit 1000000, Requested 1817821. The input or output tokens must be reduced in order to run successfully. Visit https://platform.openai.com/account/rate-limits to learn more.', 'type': 'tokens', 'param': None, 'code': 'rate_limit_exceeded'}}
그래서 다른 모델을 사용했다.
hugging face에서 제공하는 sentence transformer는 무료로 사용가능하고, 로컬에서도 실행가능하다. 대신 데이터가 커지면 속도가 느려진다.
다른 대안
cohere API: openai와 유사하지만 약간 품질이 낮을 수 있고 초과사용시 요금 발생
Azure ai: openai와 비슷한 품질이나 유료이고 설정이 복잡해짐
Google universal sentence encoder: 무료에 뛰어난 성능, tensorflow에서 사용 가능
def load_index(index_file="vector_index.faiss"):
try:
index = faiss.read_index(index_file)
print("FAISS 인덱스 로드 완료.")
return index
except Exception as e:
print(f"FAISS 인덱스를 로드할 수 없습니다: {e}")
return None
저장한 index_file을 로드하는 코드
원래는 read_index만 있는 간단한 코드였는데, 에러가 어디서 나는지 모르겠어서 try-except와 print()를 추가했다.
자주 났던 에러
RuntimeError: Error in __cdecl faiss::FileIOReader::FileIOReader(const char *) at D:\a\faiss-wheels\faiss-wheels\faiss\faiss\impl\io.cpp:68: Error: 'f' failed: could not open vector_index\index.faiss for reading: No such file or directory -> index.faiss 파일을 찾지 못 함
def query_index(query, index, texts, top_k=3):
사용자가 입력하면(query) faiss에서 검색해서 비슷한 것을 반환하는 함수
인자로 query(사용자입력), 미리 생성한 faiss index, 원본텍스트, 반환할 유사텍스트 개수(여기서는 3개)
query_embedding = create_embeddings([query])
create_embeddings 함수로 query(사용자가 입력한 텍스트)를 숫자로 바꾼다.
이 때 [query]로 리스트 형태로 전달해서 query_embeddings는 (1,출력차원) 형태로 반환된다.
왜냐면 뒤에서 쓸 index.search는 (검색하려는 쿼리, 벡터의 차원) 형태로 데이터를 기대하기 때문이다.
이 형태를 맞춰주지 않으면 shape를 맞추라는 assertionerror가 난다.
distances, indices = index.search(query_embedding, top_k)
index.search()로 쿼리와 가장 유사한 k개의 벡터를 검색해서
그 벡터들의 유클리드거리와 인덱스를 반환 -> ( distances, indices )
이 때 indices는 2차원 배열로 반환된다.
results = [texts[i] for i in indices[0]]
return results
예를 들어
texts=["I love you", "I love her", "I will leave you", "I hate you" ]
이고 각각 인덱스0 부터 4까지가 부여되었다고 하면,
query="Do u love me?" 일 때 유사한 인덱스로 반환된 indices=[[0,3,2]]
indices[0]=[0,3,2]
texts[i] for i in indices[0] = texts[0], texts[3], texts[2]
results=[] 이므로 이 texts들을 리스트로 반환
results= ["I love you", "I hate you", "I will leave you", ]
def retrieve_with_rag(user_query, texts, index=None, top_k=3):
사용자가 입력한 질문으로 faiss인덱스를 검색한 후 결과를 반환
if index is None:
index = load_index()
인덱스가 없으면 저장된 인덱스를 우선 로드함
if index is None:
create_index(texts)
index = load_index()
로드에 실패해서 여전히 인덱스가 없으면
제공받은 text로 새로운 인덱스를 만들어 저장하고 다시 인덱스를 로드
results = query_index(user_query, index, texts, top_k=top_k)
print(f"검색된 결과: {results}")
return results
query_index() 함수를 호출하고 결과를 print
query_index() 함수만 있어도 될거같은데 굳이 retrieve_with_rag 를 추가한 이유?
retrieve with rag는 인덱스를 직접 로드하거나 생성할 수 있어서 인덱스가 준비되지 않아도 동작한다.
즉 검색과정 전체에 관여해서 인덱스를 불러오고 query index함수까지 호출해주는 역할
def process_json_data(json_files):
all_json_data = []
for file_path in json_files:
with open(file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
all_json_data.extend(data)
return [f"Title: {item['title']}\nContent: {item['content']}" for item in all_json_data]
def split_texts(texts, chunk_size=1000, chunk_overlap=200):
recursive_text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
is_separator_regex=False,
)
documents = [Document(page_content=text) for text in texts]
return recursive_text_splitter.split_documents(documents)
여기는 이전에 크롤링으로 얻은 json파일을 처리하고 recursive_text_splitter를 이용해서 자르는부분을 모듈화한 것이다. 파일 로드와 텍스트 처리는 이전 게시물에서 했으므로 설명 생략
중요한건 모듈화를 해둬서 만약 파일이 json이 아닌 다른것으로 바뀌면 json data처리함수만 손보고 split_texts 함수는 그대로 사용할 수 있다는 점이다.
2편에서 계속
'머신러닝' 카테고리의 다른 글
이야기 챗봇 구현-3 (2) | 2024.11.27 |
---|---|
이야기 챗봇 구현-2 (1) | 2024.11.26 |
LLM과 RAG를 활용한 챗봇 구현-3 (3) | 2024.11.20 |
LLM과 RAG를 활용한 챗봇 구현-2 (0) | 2024.11.19 |
LLM과 RAG를 활용한 챗봇 구현 (1) | 2024.11.18 |