-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlambda_marketing_content_generator.py
145 lines (124 loc) · 4.44 KB
/
lambda_marketing_content_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import json
import pymysql
import boto3
import base64
import os
from datetime import datetime
# Connection settings for RDS and S3, read from the process environment.
RDS_HOST = os.environ.get("RDS_HOST")
RDS_USER = os.environ.get("RDS_USER")
RDS_PASSWORD = os.environ.get("RDS_PASSWORD")
RDS_DB = os.environ.get("RDS_DB")
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME")

# Identifier of the Bedrock model used for text generation.
MODEL_ID = "anthropic.claude-3-5-sonnet-20240620-v1:0"

# AWS clients are created once, when the module is imported.
s3_client = boto3.client("s3")
bedrock_runtime = boto3.client("bedrock-runtime", region_name="us-east-1")
def connect_to_rds():
    """Open a new PyMySQL connection using the module-level RDS settings.

    Returns:
        A ``pymysql`` connection configured with ``DictCursor`` as its
        cursor class. The caller is responsible for closing it.
    """
    connection_settings = {
        "host": RDS_HOST,
        "user": RDS_USER,
        "password": RDS_PASSWORD,
        "database": RDS_DB,
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**connection_settings)
def upload_image_to_s3(image_data, category):
    """Decode a base64-encoded image and store it in S3 as a JPEG object.

    Args:
        image_data: Base64 text (``str`` or ASCII ``bytes``) of the image.
            A leading ``data:...;base64,`` prefix and embedded whitespace
            are tolerated.
        category: Key prefix under which the object is stored.

    Returns:
        The ``https://<bucket>.s3.amazonaws.com/<key>`` URL of the new object.

    Raises:
        ValueError: If ``image_data`` or ``category`` is missing, or if
            ``image_data`` does not decode to a non-empty base64 payload.
    """
    import uuid  # local import: only needed here to build unique keys

    if not image_data:
        raise ValueError("Image data is required and cannot be None")
    if not category:
        # Without this check the object would be stored under a "None/" prefix.
        raise ValueError("Category is required and cannot be None")

    if isinstance(image_data, (bytes, bytearray)):
        image_data = bytes(image_data).decode("ascii")
    elif not isinstance(image_data, str):
        raise ValueError("Image data must be a base64 string")

    # Drop a data-URL header such as "data:image/jpeg;base64," if present.
    if image_data.startswith("data:"):
        image_data = image_data.partition(",")[2]

    # Line-wrapped base64 is legal input; strip whitespace so that strict
    # validation only rejects genuinely invalid characters.
    compact = "".join(image_data.split())
    try:
        # validate=True: the lenient default silently discards bad characters
        # and would upload a corrupted image instead of failing.
        image_bytes = base64.b64decode(compact, validate=True)
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        raise ValueError("Image data is not valid base64") from exc
    if not image_bytes:
        raise ValueError("Image data is required and cannot be None")

    # The timestamp alone only has one-second resolution, so a random suffix
    # keeps simultaneous uploads in one category from overwriting each other.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    image_key = f"{category}/image_{timestamp}_{uuid.uuid4().hex[:8]}.jpg"

    s3_client.put_object(
        Bucket=S3_BUCKET_NAME,
        Key=image_key,
        Body=image_bytes,
        ContentType='image/jpeg',
    )
    return f"https://{S3_BUCKET_NAME}.s3.amazonaws.com/{image_key}"
# Fetch a response from the Bedrock model.
def claude_model_get_response(reviews, additional_requests):
    """Ask the Bedrock-hosted Claude model for Instagram marketing copy.

    Builds a Korean prompt from the reviews, sends it with ``invoke_model``
    and returns the text of the first content item of the reply.

    Args:
        reviews: Customer reviews; interpolated into the prompt via f-string
            formatting.
        additional_requests: Optional theme to emphasize. When falsy, the
            emphasis clause is omitted so the prompt does not contain the
            literal text "None".

    Returns:
        The generated text, or ``None`` if the request or response parsing
        fails (the error is printed).
    """
    # With a value present, the prompt is identical to the previous wording.
    emphasis = (
        f"{additional_requests}를 강조하거나 주제로 포함하여 "
        if additional_requests
        else ""
    )
    prompt = f"""
{reviews}는 우리 가게의 손님들이 남긴 후기입니다. 이 리뷰들을 바탕으로, {emphasis}인스타그램 마케팅용 제목과 본문을 각각 3개씩 작성해 주세요. 제목은 30자 이내, 본문은 200자 이내로 작성해 주시고, 한국어로 작성해 주세요.
답변 형식은 다음과 같습니다:
"first_title" = "...",
"first_content" = "...",
"second_title" = "...",
"second_content" = "...",
"third_title" = "...",
"third_content" = "...",
"""
    request_payload = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1000,
        "temperature": 0.99,
        "top_p": 0.99,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": prompt,
                    }
                ],
            }
        ],
    }
    try:
        response = bedrock_runtime.invoke_model(
            modelId=MODEL_ID,
            contentType="application/json",
            accept="application/json",
            # Serialize exactly once: encoding an already-encoded string sends
            # a JSON string literal instead of the request object.
            body=json.dumps(request_payload),
        )
        response_body = json.loads(response["body"].read())
        return response_body["content"][0]["text"]
    except Exception as e:
        # Best-effort by contract: callers receive None when generation fails.
        print(f"Bedrock invocation failed: {e!r}")
        return None
def _error_response(status_code, message):
    """Build an error response dict whose body is a JSON-encoded string."""
    return {
        'statusCode': status_code,
        'body': json.dumps(f"Error: {message}"),
        'headers': {
            'Content-Type': 'application/json'
        }
    }


def lambda_handler(event, context):
    """Upload the posted image, load matching reviews and generate copy.

    The JSON request body is read from ``event['body']``. The fields used
    are ``category``, ``image`` (base64 image data) and the optional
    ``additional_requests``; other fields such as ``posting_time`` are
    accepted but not used.

    Args:
        event: Invocation event carrying the JSON request in ``'body'``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode``, ``body`` and ``headers``:
        200 with the reviews, image URL, additional requests and model
        response; 400 when the body is missing or not a JSON object;
        500 when any processing step raises.
    """
    # Parse defensively: a missing or malformed body previously escaped as an
    # unhandled exception instead of producing a response.
    try:
        body = json.loads(event['body'])
    except (KeyError, TypeError, ValueError):
        return _error_response(400, "Request body must be valid JSON")
    if not isinstance(body, dict):
        return _error_response(400, "Request body must be a JSON object")

    category = body.get('category')
    image_data = body.get('image')
    additional_requests = body.get('additional_requests')

    connection = None
    try:
        # 1. Upload the image to S3.
        s3_image_url = upload_image_to_s3(image_data, category)

        # 2. Load the reviews for this category from RDS.
        connection = connect_to_rds()
        with connection.cursor() as cursor:
            query = """
                SELECT * FROM review
                WHERE category = %s
            """
            cursor.execute(query, (category,))
            reviews = cursor.fetchall()

        # 3. Generate the marketing copy.
        res = claude_model_get_response(reviews, additional_requests)

        # default=str: review rows may hold values json cannot encode
        # (presumably datetime/Decimal columns; the schema is not visible
        # here), so stringify them rather than failing the whole request.
        response_json = json.dumps({
            'reviews': reviews,
            's3_image_url': s3_image_url,
            'additional_requests': additional_requests,
            'model_response': res
        }, ensure_ascii=False, default=str)

        # 4. Return the response payload.
        return {
            'statusCode': 200,
            'body': response_json,
            'headers': {
                'Content-Type': 'application/json'
            }
        }
    except Exception as e:
        # Top-level boundary: report the failure to the caller and the log.
        print(f"lambda_handler failed: {e!r}")
        return _error_response(500, str(e))
    finally:
        if connection:
            connection.close()