티스토리 뷰
✅ 1️⃣ AI 기반 추천 시스템의 동작 방식
💡 목표:
사용자가 특정 상품을 조회하면, AI가 해당 사용자의 이전 행동 데이터를 분석하여 최적의 상품을 추천한다.
[사용자 이벤트 발생 (상품 조회)]
↓
[Kafka] → [Redis (이전 본 상품 저장)]
↓
[AI 추천 시스템 (추천 상품 생성)]
↓
[추천 메시지 생성 및 전송 (SMS, 이메일, 푸시)]
✅ 2️⃣ 1단계: 사용자 행동 데이터 저장 (상품 조회 이벤트 저장)
👉 사용자가 상품을 조회하면, 해당 이벤트를 Kafka에 저장하고, Redis에서 조회 이력을 관리한다.
public class UserEvent {
private String userId;
private String eventType; // "PRODUCT_VIEW"
private String productId;
private LocalDateTime timestamp;
public UserEvent() {}
public UserEvent(String userId, String eventType, String productId, LocalDateTime timestamp) {
this.userId = userId;
this.eventType = eventType;
this.productId = productId;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "UserEvent{" +
"userId='" + userId + '\'' +
", eventType='" + eventType + '\'' +
", productId='" + productId + '\'' +
", timestamp=" + timestamp +
'}';
}
}
사용자 이벤트 수집 API
@RestController
@RequestMapping("/events")
public class MarketingEventController {
private final KafkaTemplate<String, String> kafkaTemplate;
public MarketingEventController(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@PostMapping("/track")
public ResponseEntity<String> trackEvent(@RequestBody UserEvent event) {
String eventJson = new Gson().toJson(event);
kafkaTemplate.send("user-events", eventJson);
return ResponseEntity.ok("✅ 사용자 이벤트 저장: " + event.getEventType());
}
}
✅ 테스트 API 호출 (상품 조회 이벤트)
curl -X POST "http://localhost:8080/events/track" -H "Content-Type: application/json" -d '{
"userId": "user123",
"eventType": "PRODUCT_VIEW",
"productId": "product567",
"timestamp": "2024-03-10T10:00:00"
}'
📌 Select-String을 사용하여 user-events 메시지 필터링
docker logs -f kafka
docker logs -f kafka | Select-String "user-events"
docker logs --since 10m kafka | Select-String "user-events"
[2025-03-03 23:52:50,517] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='user-e
vents', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offli
neReplicas=[]) for partition user-events-0 in response to UpdateMetadata request sent by controller 1 epoch 1 w
ith correlation id 6 (state.change.logger)
=> 사용자 상품 조회 이벤트가 Kafka에 저장된 것을 확인할 수 있음
✅ 3️⃣ 2단계: Redis를 이용하여 사용자별 조회 이력 저장
👉 사용자가 최근 조회한 상품을 Redis에 저장하여 AI 추천 시스템에서 활용할 수 있도록 한다.
📌 기능 개요
👉 사용자가 상품을 조회하면 해당 이력을 Redis에 저장
👉 최대 5개까지 저장, 이후 새로운 상품이 들어오면 오래된 데이터는 삭제
👉 TTL(유효기간) 7일 설정, 7일 동안 조회 이력이 유지되도록 함
RedisService를 수정하여 사용자별 조회 이력 저장 기능 추가
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.TimeUnit;
/*
* Redis를 이용한 사용자 이벤트 저장 서비스
* */
@Service
public class RedisService {
private final StringRedisTemplate redisTemplate;
private static final int MAX_RECENT_PRODUCTS = 5; // 최근 본 상품 최대 개수
private static final long TTL_DAYS = 7; // 7일 후 자동 삭제
public RedisService(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
// ✅ 사용자별 최근 본 상품 저장 (최대 5개, 7일 유지)
public void saveUserViewedProduct(String userId, String productId) {
String key = "user:recent_products:" + userId;
// 상품 추가 (중복 제거 후 저장)
redisTemplate.opsForList().remove(key, 0, productId);
redisTemplate.opsForList().rightPush(key, productId);
// 리스트 길이가 5개 초과하면 가장 오래된 데이터 삭제
if (Boolean.TRUE.equals(redisTemplate.opsForList().size(key) > MAX_RECENT_PRODUCTS)) {
redisTemplate.opsForList().leftPop(key);
}
// TTL 설정 (7일 후 자동 삭제)
redisTemplate.expire(key, TTL_DAYS, TimeUnit.DAYS);
}
// ✅ 사용자별 최근 본 상품 목록 가져오기
public List<String> getUserViewedProducts(String userId) {
String key = "user:recent_products:" + userId;
return redisTemplate.opsForList().range(key, 0, MAX_RECENT_PRODUCTS - 1);
}
}
Kafka Consumer에서 사용자 이벤트를 Redis에 저장하도록 수정
package com.example.messaging_service.messaging.service;
import com.example.messaging_service.messaging.entity.MessageLog;
import com.example.messaging_service.messaging.model.UserEvent;
import com.example.messaging_service.messaging.repository.MessageLogRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.annotation.Timed;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.stream.Collectors;
/*
* Kafka 메시지 소비 및 DB 저장 (Batch 처리 + 병렬 처리 적용)
* 사용자 이벤트 Redis 저장 기능 추가
* */
@Service
public class KafkaConsumerService {
private final MessageLogRepository messageLogRepository;
private final Counter messageCounter;
private final RedisService redisService;
private final ObjectMapper objectMapper;
public KafkaConsumerService(MessageLogRepository messageLogRepository, MeterRegistry meterRegistry, RedisService redisService, ObjectMapper objectMapper) {
this.messageLogRepository = messageLogRepository;
this.messageCounter = meterRegistry.counter("kafka.consumer.processed.messages"); // Kafka 메시지 처리량 카운터
this.redisService = redisService;
this.objectMapper = objectMapper;
}
//개별 메시지를 받도록 되어있으므로 Batch 처리를 지원하는 전용 팩토리 필요 (KafkaConfig.java)
//여러 개의 메시지 예를 들어 1초에 10,000개의 메시지가 들어오더라도 싱글 스레드인 kafka의 처리 지연을 막고 병렬 소비 활성화. 여러 메시지 동시 처리 가능.
@KafkaListener(topics = "message-topic", groupId = "messaging-group", containerFactory = "batchFactory")
@Timed(value = "kafka.message.process.time", description = "Time taken to process messages")
public void consumeMessages(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
long startTime = System.currentTimeMillis(); // 처리 시간 측정 시작
System.out.println("📥 Batch Received: " + records.size() + " messages");
List<MessageLog> logs = records.stream().map(record -> {
String[] parts = record.value().split(":");
MessageLog log = new MessageLog();
log.setType(parts[0]);
log.setRecipient(parts[1]);
log.setContent(parts[2]);
return log;
}).collect(Collectors.toList());
// 한 번에 여러 개의 메시지를 받아 Bulk Insert
messageLogRepository.saveAll(logs);
// Kafka 메시지 처리량 카운터 증가
messageCounter.increment(records.size());
long endTime = System.currentTimeMillis(); // 처리 시간 측정 종료
System.out.println("✅ Batch 처리 시간: " + (endTime - startTime) + " ms");
// 수동 커밋 (오프셋 관리) - 중복 처리 방지
ack.acknowledge();
}
// ✅ 사용자 이벤트 처리 (Redis 저장)
@KafkaListener(topics = "user-events", groupId = "messaging-group")
public void consumeUserEvent(String message) {
try {
UserEvent event = objectMapper.readValue(message, UserEvent.class);
if ("PRODUCT_VIEW".equals(event.getEventType())) {
redisService.saveUserViewedProduct(event.getUserId(), event.getProductId());
System.out.println("✅ Redis에 저장된 최근 본 상품: " + redisService.getUserViewedProducts(event.getUserId()));
}
} catch (Exception e) {
System.err.println("❌ JSON 변환 오류: " + e.getMessage());
}
}
}
✅ 4️⃣ API를 통해 사용자의 최근 본 상품 목록을 가져오는 기능 추가
이제 특정 사용자의 최근 본 상품 목록을 조회하는 API를 추가
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("/recommendations")
public class RecommendationController {
private final RedisService redisService;
public RecommendationController(RedisService redisService) {
this.redisService = redisService;
}
@GetMapping("/recent/{userId}")
public ResponseEntity<List<String>> getRecentViewedProducts(@PathVariable String userId) {
List<String> recentProducts = redisService.getUserViewedProducts(userId);
return ResponseEntity.ok(recentProducts);
}
}
사용자별 최근 본 상품 조회 api 테스트
http://localhost:8080/recommendations/recent/user123으로 postman get 요청 시 kafka에서 redis로 저장되어있던 정보를 조회하는 것을 확인할 수 있음
* 정리
1) 사용자가 상품 조회 시 이벤트 발생
- 사용자 조회 시 Kafka를 통해 메시지를 user-events라는 토픽에 전송
- Kafka Producer가 실행되어 kafka에 메시지를 보낸다.
2) Kafka Consumer가 메시지를 소비하고 Redis에 저장
✅ Kafka에서 메시지를 소비하여, 사용자별 최근 본 상품을 Redis에 저장
✅ 최대 5개까지만 유지하고, TTL(유효기간) 7일 설정
✅ 새로운 상품을 조회하면 가장 오래된 데이터부터 삭제
3) 사용자별 최근 본 상품 조회
- 특정 사용자의 최근 본 상품 조회
- 최근 본 상품 조회 기반 메시지 추천
'TIL(Today I Learned)' 카테고리의 다른 글
대규모 메시징 시스템 - 2) 카카오 메시지 api, SMS, 푸시 메시지 자동 전송 기능 (0) | 2025.03.04 |
---|---|
24.04.05,08 (0) | 2024.04.09 |
24.04.04 (0) | 2024.04.05 |
24.04.03 (0) | 2024.04.03 |
24.04.02 (0) | 2024.04.03 |