티스토리 뷰

✅ 1️⃣ AI 기반 추천 시스템의 동작 방식

💡 목표:

사용자가 특정 상품을 조회하면, AI가 해당 사용자의 이전 행동 데이터를 분석하여 최적의 상품을 추천한다.

 

[사용자 이벤트 발생 (상품 조회)]  
      ↓  
[Kafka] → [Redis (이전 본 상품 저장)]  
      ↓  
[AI 추천 시스템 (추천 상품 생성)]  
      ↓  
[추천 메시지 생성 및 전송 (SMS, 이메일, 푸시)]  

 

✅ 2️⃣ 1단계: 사용자 행동 데이터 저장 (상품 조회 이벤트 저장)

👉 사용자가 상품을 조회하면, 해당 이벤트를 Kafka에 저장하고, Redis에서 조회 이력을 관리한다.

public class UserEvent {
    private String userId;
    private String eventType;  // "PRODUCT_VIEW"
    private String productId;
    private LocalDateTime timestamp;

    public UserEvent() {}

    public UserEvent(String userId, String eventType, String productId, LocalDateTime timestamp) {
        this.userId = userId;
        this.eventType = eventType;
        this.productId = productId;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "UserEvent{" +
                "userId='" + userId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", productId='" + productId + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

사용자 이벤트 수집 API

@RestController
@RequestMapping("/events")
public class MarketingEventController {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public MarketingEventController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/track")
    public ResponseEntity<String> trackEvent(@RequestBody UserEvent event) {
        String eventJson = new Gson().toJson(event);
        kafkaTemplate.send("user-events", eventJson);
        return ResponseEntity.ok("✅ 사용자 이벤트 저장: " + event.getEventType());
    }
}

테스트 API 호출 (상품 조회 이벤트)

curl -X POST "http://localhost:8080/events/track" -H "Content-Type: application/json" -d '{
    "userId": "user123",
    "eventType": "PRODUCT_VIEW",
    "productId": "product567",
    "timestamp": "2024-03-10T10:00:00"
}'

📌 Select-String을 사용하여 user-events 메시지 필터링

docker logs -f kafka
docker logs -f kafka | Select-String "user-events"
docker logs --since 10m kafka | Select-String "user-events"

[2025-03-03 23:52:50,517] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='user-e
vents', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offli
neReplicas=[]) for partition user-events-0 in response to UpdateMetadata request sent by controller 1 epoch 1 w
ith correlation id 6 (state.change.logger)

=> 사용자 상품 조회 이벤트가 Kafka에 저장된 것을 확인할 수 있음

 

✅ 3️⃣ 2단계: Redis를 이용하여 사용자별 조회 이력 저장

👉 사용자가 최근 조회한 상품을 Redis에 저장하여 AI 추천 시스템에서 활용할 수 있도록 한다.

📌 기능 개요

👉 사용자가 상품을 조회하면 해당 이력을 Redis에 저장
👉 최대 5개까지 저장, 이후 새로운 상품이 들어오면 오래된 데이터는 삭제
👉 TTL(유효기간) 7일 설정, 7일 동안 조회 이력이 유지되도록 함

 

RedisService를 수정하여 사용자별 조회 이력 저장 기능 추가

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.TimeUnit;

/*
* Redis를 이용한 사용자 이벤트 저장 서비스
* */
@Service
public class RedisService {
    private final StringRedisTemplate redisTemplate;
    private static final int MAX_RECENT_PRODUCTS = 5; // 최근 본 상품 최대 개수
    private static final long TTL_DAYS = 7; // 7일 후 자동 삭제

    public RedisService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // ✅ 사용자별 최근 본 상품 저장 (최대 5개, 7일 유지)
    public void saveUserViewedProduct(String userId, String productId) {
        String key = "user:recent_products:" + userId;

        // 상품 추가 (중복 제거 후 저장)
        redisTemplate.opsForList().remove(key, 0, productId);
        redisTemplate.opsForList().rightPush(key, productId);

        // 리스트 길이가 5개 초과하면 가장 오래된 데이터 삭제
        if (Boolean.TRUE.equals(redisTemplate.opsForList().size(key) > MAX_RECENT_PRODUCTS)) {
            redisTemplate.opsForList().leftPop(key);
        }

        // TTL 설정 (7일 후 자동 삭제)
        redisTemplate.expire(key, TTL_DAYS, TimeUnit.DAYS);
    }

    // ✅ 사용자별 최근 본 상품 목록 가져오기
    public List<String> getUserViewedProducts(String userId) {
        String key = "user:recent_products:" + userId;
        return redisTemplate.opsForList().range(key, 0, MAX_RECENT_PRODUCTS - 1);
    }
}

 

Kafka Consumer에서 사용자 이벤트를 Redis에 저장하도록 수정

package com.example.messaging_service.messaging.service;

import com.example.messaging_service.messaging.entity.MessageLog;
import com.example.messaging_service.messaging.model.UserEvent;
import com.example.messaging_service.messaging.repository.MessageLogRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.annotation.Timed;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

/*
* Kafka 메시지 소비 및 DB 저장 (Batch 처리 + 병렬 처리 적용)
* 사용자 이벤트 Redis 저장 기능 추가
* */
@Service
public class KafkaConsumerService {

    private final MessageLogRepository messageLogRepository;
    private final Counter messageCounter;
    private final RedisService redisService;
    private final ObjectMapper objectMapper;

    public KafkaConsumerService(MessageLogRepository messageLogRepository, MeterRegistry meterRegistry, RedisService redisService, ObjectMapper objectMapper) {
        this.messageLogRepository = messageLogRepository;
        this.messageCounter = meterRegistry.counter("kafka.consumer.processed.messages"); // Kafka 메시지 처리량 카운터
        this.redisService = redisService;
        this.objectMapper = objectMapper;
    }

    //개별 메시지를 받도록 되어있으므로 Batch 처리를 지원하는 전용 팩토리 필요 (KafkaConfig.java)
    //여러 개의 메시지 예를 들어 1초에 10,000개의 메시지가 들어오더라도 싱글 스레드인 kafka의 처리 지연을 막고 병렬 소비 활성화. 여러 메시지 동시 처리 가능.
    @KafkaListener(topics = "message-topic", groupId = "messaging-group", containerFactory = "batchFactory")
    @Timed(value = "kafka.message.process.time", description = "Time taken to process messages")
    public void consumeMessages(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        long startTime = System.currentTimeMillis(); // 처리 시간 측정 시작

        System.out.println("📥 Batch Received: " + records.size() + " messages");

        List<MessageLog> logs = records.stream().map(record -> {
            String[] parts = record.value().split(":");
            MessageLog log = new MessageLog();
            log.setType(parts[0]);
            log.setRecipient(parts[1]);
            log.setContent(parts[2]);
            return log;
        }).collect(Collectors.toList());

        // 한 번에 여러 개의 메시지를 받아 Bulk Insert
        messageLogRepository.saveAll(logs);

        // Kafka 메시지 처리량 카운터 증가
        messageCounter.increment(records.size());

        long endTime = System.currentTimeMillis(); // 처리 시간 측정 종료
        System.out.println("✅ Batch 처리 시간: " + (endTime - startTime) + " ms");

        // 수동 커밋 (오프셋 관리) - 중복 처리 방지
        ack.acknowledge();
    }

    // ✅ 사용자 이벤트 처리 (Redis 저장)
    @KafkaListener(topics = "user-events", groupId = "messaging-group")
    public void consumeUserEvent(String message) {
        try {
            UserEvent event = objectMapper.readValue(message, UserEvent.class);

            if ("PRODUCT_VIEW".equals(event.getEventType())) {
                redisService.saveUserViewedProduct(event.getUserId(), event.getProductId());
                System.out.println("✅ Redis에 저장된 최근 본 상품: " + redisService.getUserViewedProducts(event.getUserId()));
            }
        } catch (Exception e) {
            System.err.println("❌ JSON 변환 오류: " + e.getMessage());
        }
    }
}

✅ 4️⃣ API를 통해 사용자의 최근 본 상품 목록을 가져오는 기능 추가

이제 특정 사용자의 최근 본 상품 목록을 조회하는 API를 추가

import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("/recommendations")
public class RecommendationController {
    private final RedisService redisService;

    public RecommendationController(RedisService redisService) {
        this.redisService = redisService;
    }

    @GetMapping("/recent/{userId}")
    public ResponseEntity<List<String>> getRecentViewedProducts(@PathVariable String userId) {
        List<String> recentProducts = redisService.getUserViewedProducts(userId);
        return ResponseEntity.ok(recentProducts);
    }
}

사용자별 최근 본 상품 조회 api 테스트

http://localhost:8080/recommendations/recent/user123으로 postman get 요청 시 kafka에서 redis로 저장되어있던 정보를 조회하는 것을 확인할 수 있음

[
    "product567"
]

 

* 정리

1) 사용자가 상품 조회 시 이벤트 발생
- 사용자 조회 시 Kafka를 통해 메시지를 user-events라는 토픽에 전송
- Kafka Producer가 실행되어 kafka에 메시지를 보낸다.


2) Kafka Consumer가 메시지를 소비하고 Redis에 저장
✅ Kafka에서 메시지를 소비하여, 사용자별 최근 본 상품을 Redis에 저장
✅ 최대 5개까지만 유지하고, TTL(유효기간) 7일 설정
✅ 새로운 상품을 조회하면 가장 오래된 데이터부터 삭제


3) 사용자별 최근 본 상품 조회

- 특정 사용자의 최근 본 상품 조회

- 최근 본 상품 조회 기반 메시지 추천

'TIL(Today I Learned)' 카테고리의 다른 글

대규모 메시징 시스템 - 2) 카카오 메시지 api, SMS, 푸시 메시지 자동 전송 기능  (0) 2025.03.04
24.04.05,08  (0) 2024.04.09
24.04.04  (0) 2024.04.05
24.04.03  (0) 2024.04.03
24.04.02  (0) 2024.04.03
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함