diff --git a/README.md b/README.md index fd33de85..50ce7f1b 100644 --- a/README.md +++ b/README.md @@ -369,7 +369,6 @@ There are multiple ways of getting support: - tag me on [https://discuss.tchncs.de/u/danielgraf](Lemmy) - or join **#reitti** on [irc.dedicatedcode.com](https://irc.dedicatedcode.com) -- ## Support the Project Buy Me a Coffee at ko-fi.com diff --git a/src/main/java/com/dedicatedcode/reitti/config/RabbitMQConfig.java b/src/main/java/com/dedicatedcode/reitti/config/RabbitMQConfig.java index eca281dd..cfbf8f85 100644 --- a/src/main/java/com/dedicatedcode/reitti/config/RabbitMQConfig.java +++ b/src/main/java/com/dedicatedcode/reitti/config/RabbitMQConfig.java @@ -28,6 +28,9 @@ public class RabbitMQConfig { public static final String TRIGGER_PROCESSING_PIPELINE_QUEUE = "trigger-processing-queue"; public static final String TRIGGER_PROCESSING_PIPELINE_ROUTING_KEY = "trigger.processing.start"; + public static final String USER_EVENT_QUEUE = "user-event-queue"; + public static final String USER_EVENT_ROUTING_KEY = "user.event.update"; + @Bean public TopicExchange exchange() { return new TopicExchange(EXCHANGE_NAME); @@ -63,6 +66,11 @@ public class RabbitMQConfig { return new Queue(TRIGGER_PROCESSING_PIPELINE_QUEUE, false); } + @Bean + public Queue userEventQueue() { + return new Queue(USER_EVENT_QUEUE, false); + } + @Bean public Binding locationDataBinding(Queue locationDataQueue, TopicExchange exchange) { return BindingBuilder.bind(locationDataQueue).to(exchange).with(LOCATION_DATA_ROUTING_KEY); @@ -93,6 +101,11 @@ public class RabbitMQConfig { return BindingBuilder.bind(triggerProcessingQueue).to(exchange).with(TRIGGER_PROCESSING_PIPELINE_ROUTING_KEY); } + @Bean + public Binding userEventBinding(Queue userEventQueue, TopicExchange exchange) { + return BindingBuilder.bind(userEventQueue).to(exchange).with(USER_EVENT_ROUTING_KEY); + } + @Bean public Jackson2JsonMessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); diff --git a/src/main/java/com/dedicatedcode/reitti/controller/SseController.java b/src/main/java/com/dedicatedcode/reitti/controller/SseController.java new file mode 100644 index 00000000..3ddd02d8 --- /dev/null +++ b/src/main/java/com/dedicatedcode/reitti/controller/SseController.java @@ -0,0 +1,34 @@ +package com.dedicatedcode.reitti.controller; + +import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.service.UserSseEmitterService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.security.core.annotation.AuthenticationPrincipal; +import org.springframework.security.core.userdetails.UserDetails; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +@RestController +public class SseController { + private static final Logger log = LoggerFactory.getLogger(SseController.class); + private final UserSseEmitterService emitterService; + + public SseController(UserSseEmitterService userSseEmitterService) { + this.emitterService = userSseEmitterService; + } + + @GetMapping(path = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public SseEmitter handleSseForUser(@AuthenticationPrincipal UserDetails userDetails) { + if (userDetails == null) { + throw new IllegalStateException("User not authenticated for SSE endpoint."); + } + + User user = (User) userDetails; + SseEmitter emitter = emitterService.addEmitter(user.getId()); + log.info("New SSE connection from user: [{}]", user.getId()); + return emitter; + } +} \ No newline at end of file diff --git a/src/main/java/com/dedicatedcode/reitti/controller/api/IngestApiController.java b/src/main/java/com/dedicatedcode/reitti/controller/api/IngestApiController.java index 6d34d813..96c015cc 100644 --- a/src/main/java/com/dedicatedcode/reitti/controller/api/IngestApiController.java +++ b/src/main/java/com/dedicatedcode/reitti/controller/api/IngestApiController.java @@ -4,7 +4,7 @@ import com.dedicatedcode.reitti.dto.LocationDataRequest; import com.dedicatedcode.reitti.dto.OwntracksLocationRequest; import com.dedicatedcode.reitti.model.User; import com.dedicatedcode.reitti.repository.UserJdbcService; -import com.dedicatedcode.reitti.service.importer.ImportBatchProcessor; +import com.dedicatedcode.reitti.service.ImportBatchProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -14,7 +14,10 @@ import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.core.userdetails.UserDetails; import org.springframework.security.core.userdetails.UsernameNotFoundException; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; import java.io.Serializable; import java.util.Collections; diff --git a/src/main/java/com/dedicatedcode/reitti/event/SSEEvent.java b/src/main/java/com/dedicatedcode/reitti/event/SSEEvent.java new file mode 100644 index 00000000..f61002f9 --- /dev/null +++ b/src/main/java/com/dedicatedcode/reitti/event/SSEEvent.java @@ -0,0 +1,37 @@ +package com.dedicatedcode.reitti.event; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.io.Serializable; +import java.time.LocalDate; + +public class SSEEvent implements Serializable { + private final SSEType type; + private final Long userId; + private final Long changedUserId; + private final LocalDate date; + + @JsonCreator + public SSEEvent(SSEType type, Long userId, Long changedUserId, LocalDate date) { + this.type = type; + this.userId = userId; + this.changedUserId = changedUserId; + this.date = date; + } + + public SSEType getType() { + return type; + } + + public Long getUserId() { + return userId; + } + + public Long getChangedUserId() { + return changedUserId; + } + + public LocalDate getDate() { + return date; + } +} diff --git a/src/main/java/com/dedicatedcode/reitti/event/SSEType.java b/src/main/java/com/dedicatedcode/reitti/event/SSEType.java new file mode 100644 index 00000000..6343d53d --- /dev/null +++ b/src/main/java/com/dedicatedcode/reitti/event/SSEType.java @@ -0,0 +1,7 @@ +package com.dedicatedcode.reitti.event; + +public enum SSEType { + TRIPS, + VISITS, + RAW_DATA +} diff --git a/src/main/java/com/dedicatedcode/reitti/repository/UserSettingsJdbcService.java b/src/main/java/com/dedicatedcode/reitti/repository/UserSettingsJdbcService.java index f0413a8c..989352b7 100644 --- a/src/main/java/com/dedicatedcode/reitti/repository/UserSettingsJdbcService.java +++ b/src/main/java/com/dedicatedcode/reitti/repository/UserSettingsJdbcService.java @@ -138,4 +138,8 @@ public class UserSettingsJdbcService { this.jdbcTemplate.update("UPDATE user_settings SET latest_data = GREATEST(latest_data, ?) WHERE user_id = ?", Timestamp.from(instant), user.getId()); }); } + + public List findParentUserIds(User user) { + return this.jdbcTemplate.queryForList("SELECT from_user FROM connected_users WHERE to_user = ?", Long.class, user.getId()); + } } diff --git a/src/main/java/com/dedicatedcode/reitti/service/importer/ImportBatchProcessor.java b/src/main/java/com/dedicatedcode/reitti/service/ImportBatchProcessor.java similarity index 95% rename from src/main/java/com/dedicatedcode/reitti/service/importer/ImportBatchProcessor.java rename to src/main/java/com/dedicatedcode/reitti/service/ImportBatchProcessor.java index 2d8471db..9d0662db 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/importer/ImportBatchProcessor.java +++ b/src/main/java/com/dedicatedcode/reitti/service/ImportBatchProcessor.java @@ -1,4 +1,4 @@ -package com.dedicatedcode.reitti.service.importer; +package com.dedicatedcode.reitti.service; import com.dedicatedcode.reitti.config.RabbitMQConfig; import com.dedicatedcode.reitti.dto.LocationDataRequest; @@ -54,13 +54,11 @@ public class ImportBatchProcessor { } private void scheduleProcessingTrigger(String username) { - // Cancel any existing trigger for this user ScheduledFuture existingTrigger = pendingTriggers.get(username); if (existingTrigger != null && !existingTrigger.isDone()) { existingTrigger.cancel(false); } - // Schedule new trigger for 30 seconds from now ScheduledFuture newTrigger = scheduler.schedule(() -> { try { TriggerProcessingEvent triggerEvent = new TriggerProcessingEvent(username); diff --git a/src/main/java/com/dedicatedcode/reitti/service/MessageDispatcherService.java b/src/main/java/com/dedicatedcode/reitti/service/MessageDispatcherService.java index 761295c9..1ba0dcc7 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/MessageDispatcherService.java +++ b/src/main/java/com/dedicatedcode/reitti/service/MessageDispatcherService.java @@ -21,6 +21,7 @@ public class MessageDispatcherService { private final TripDetectionService tripDetectionService; private final ReverseGeocodingListener reverseGeocodingListener; private final ProcessingPipelineTrigger processingPipelineTrigger; + private final UserSseEmitterService userSseEmitterService; @Autowired public MessageDispatcherService(LocationDataIngestPipeline locationDataIngestPipeline, @@ -28,13 +29,15 @@ public class MessageDispatcherService { VisitMergingService visitMergingService, TripDetectionService tripDetectionService, ReverseGeocodingListener reverseGeocodingListener, - ProcessingPipelineTrigger processingPipelineTrigger) { + ProcessingPipelineTrigger processingPipelineTrigger, + UserSseEmitterService userSseEmitterService) { this.locationDataIngestPipeline = locationDataIngestPipeline; this.visitDetectionService = visitDetectionService; this.visitMergingService = visitMergingService; this.tripDetectionService = tripDetectionService; this.reverseGeocodingListener = reverseGeocodingListener; this.processingPipelineTrigger = processingPipelineTrigger; + this.userSseEmitterService = userSseEmitterService; } @RabbitListener(queues = RabbitMQConfig.LOCATION_DATA_QUEUE, concurrency = "${reitti.events.concurrency}") @@ -67,6 +70,12 @@ public class MessageDispatcherService { reverseGeocodingListener.handleSignificantPlaceCreated(event); } + @RabbitListener(queues = RabbitMQConfig.USER_EVENT_QUEUE) + public void handleUserNotificationEvent(SSEEvent event) { + logger.debug("Dispatching SSEEvent for user: {}", event.getUserId()); + this.userSseEmitterService.sendEventToUser(event.getUserId(), event); + } + @RabbitListener(queues = RabbitMQConfig.TRIGGER_PROCESSING_PIPELINE_QUEUE, concurrency = "${reitti.events.concurrency}") public void handleTriggerProcessingEvent(TriggerProcessingEvent event) { logger.debug("Dispatching TriggerProcessingEvent for user: {}", event.getUsername()); diff --git a/src/main/java/com/dedicatedcode/reitti/service/OwnTracksRecorderIntegrationService.java b/src/main/java/com/dedicatedcode/reitti/service/OwnTracksRecorderIntegrationService.java index 06649234..fd0b58e6 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/OwnTracksRecorderIntegrationService.java +++ b/src/main/java/com/dedicatedcode/reitti/service/OwnTracksRecorderIntegrationService.java @@ -6,7 +6,6 @@ import com.dedicatedcode.reitti.model.OwnTracksRecorderIntegration; import com.dedicatedcode.reitti.model.User; import com.dedicatedcode.reitti.repository.OwnTracksRecorderIntegrationJdbcService; import com.dedicatedcode.reitti.repository.UserJdbcService; -import com.dedicatedcode.reitti.service.importer.ImportBatchProcessor; import com.fasterxml.jackson.annotation.JsonProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/dedicatedcode/reitti/service/QueueStatsService.java b/src/main/java/com/dedicatedcode/reitti/service/QueueStatsService.java index c6af28a8..32055e55 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/QueueStatsService.java +++ b/src/main/java/com/dedicatedcode/reitti/service/QueueStatsService.java @@ -6,7 +6,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.time.LocalDateTime; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @Service @@ -22,7 +25,9 @@ public class QueueStatsService { RabbitMQConfig.STAY_DETECTION_QUEUE, RabbitMQConfig.MERGE_VISIT_QUEUE, RabbitMQConfig.SIGNIFICANT_PLACE_QUEUE, - RabbitMQConfig.DETECT_TRIP_QUEUE); + RabbitMQConfig.DETECT_TRIP_QUEUE, + RabbitMQConfig.USER_EVENT_QUEUE + ); private final Map> processingHistory = new ConcurrentHashMap<>(); @@ -124,8 +129,6 @@ public class QueueStatsService { List history = processingHistory.get(queueName); if (history.isEmpty()) { - // No processing history, base progress on queue size - // Smaller queues show higher progress if (currentMessageCount <= 5) return 80; if (currentMessageCount <= 20) return 60; if (currentMessageCount <= 100) return 40; diff --git a/src/main/java/com/dedicatedcode/reitti/service/UserNotificationQueueService.java b/src/main/java/com/dedicatedcode/reitti/service/UserNotificationQueueService.java new file mode 100644 index 00000000..1fb087c0 --- /dev/null +++ b/src/main/java/com/dedicatedcode/reitti/service/UserNotificationQueueService.java @@ -0,0 +1,83 @@ +package com.dedicatedcode.reitti.service; + +import com.dedicatedcode.reitti.config.RabbitMQConfig; +import com.dedicatedcode.reitti.dto.LocationDataRequest; +import com.dedicatedcode.reitti.event.SSEEvent; +import com.dedicatedcode.reitti.event.SSEType; +import com.dedicatedcode.reitti.model.ProcessedVisit; +import com.dedicatedcode.reitti.model.Trip; +import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.repository.UserSettingsJdbcService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Service; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +@Service +public class UserNotificationQueueService { + private static final Logger log = LoggerFactory.getLogger(UserNotificationQueueService.class); + private final UserSettingsJdbcService userSettingsJdbcService; + private final RabbitTemplate rabbitTemplate; + + public UserNotificationQueueService(UserSettingsJdbcService userSettingsJdbcService, + RabbitTemplate rabbitTemplate) { + this.userSettingsJdbcService = userSettingsJdbcService; + this.rabbitTemplate = rabbitTemplate; + } + + public void newTrips(User user, List trips) { + SSEType eventType = SSEType.TRIPS; + Set parentUserIds = new HashSet<>(this.userSettingsJdbcService.findParentUserIds(user)); + log.debug("New trips for user [{}], will be notify [{}] number or parent users", user.getId(), parentUserIds.size()); + Set dates = calculateAffectedDates(trips.stream().map(Trip::getStartTime).toList(), trips.stream().map(Trip::getEndTime).toList()); + sendToQueue(user, dates, parentUserIds, eventType); + } + + public void newVisits(User user, List processedVisits) { + SSEType eventType = SSEType.VISITS; + Set parentUserIds = new HashSet<>(this.userSettingsJdbcService.findParentUserIds(user)); + log.debug("New Visits for user [{}], will be notify [{}] number or parent users", user.getId(), parentUserIds.size()); + Set dates = calculateAffectedDates(processedVisits.stream().map(ProcessedVisit::getStartTime).toList(), processedVisits.stream().map(ProcessedVisit::getEndTime).toList()); + sendToQueue(user, dates, parentUserIds, eventType); + } + + public void newRawLocationData(User user, List filtered) { + SSEType eventType = SSEType.RAW_DATA; + Set parentUserIds = new HashSet<>(this.userSettingsJdbcService.findParentUserIds(user)); + log.debug("New RawLocationPoints for user [{}], will be notify [{}] number or parent users", user.getId(), parentUserIds.size()); + Set dates = calculateAffectedDates(filtered.stream().map(LocationDataRequest.LocationPoint::getTimestamp).map(s -> ZonedDateTime.parse(s).toInstant()).toList()); + sendToQueue(user, dates, parentUserIds, eventType); + } + + private void sendToQueue(User user, Set dates, Set parentUserIds, SSEType eventType) { + for (LocalDate date : dates) { + for (Long parentUserId : parentUserIds) { + this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.USER_EVENT_ROUTING_KEY, new SSEEvent(eventType, parentUserId, user.getId(), date)); + } + this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.USER_EVENT_ROUTING_KEY, new SSEEvent(eventType, user.getId(), user.getId(), date)); + } + } + + @SafeVarargs + private Set calculateAffectedDates(List... list) { + if (list == null) { + return new HashSet<>(); + } else { + Set result = new HashSet<>(); + for (List instants : list) { + result.addAll(instants.stream().map(instant -> instant.atZone(ZoneId.of("Z")).toLocalDate()).collect(Collectors.toSet())); + } + return result; + } + } + +} diff --git a/src/main/java/com/dedicatedcode/reitti/service/UserSseEmitterService.java b/src/main/java/com/dedicatedcode/reitti/service/UserSseEmitterService.java new file mode 100644 index 00000000..a9a69fe7 --- /dev/null +++ b/src/main/java/com/dedicatedcode/reitti/service/UserSseEmitterService.java @@ -0,0 +1,112 @@ +package com.dedicatedcode.reitti.service; + +import com.dedicatedcode.reitti.event.SSEEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.SmartLifecycle; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; + +@Service +public class UserSseEmitterService implements SmartLifecycle { + private static final Logger log = LoggerFactory.getLogger(UserSseEmitterService.class); + private final Map> userEmitters = new ConcurrentHashMap<>(); + + public SseEmitter addEmitter(Long userId) { + SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); + userEmitters.computeIfAbsent(userId, k -> new CopyOnWriteArraySet<>()).add(emitter); + emitter.onCompletion(() -> { + log.info("SSE connection completed for user: [{}]", userId); + removeEmitter(userId, emitter); + }); + + emitter.onTimeout(() -> { + log.info("SSE connection timed out for user: [{}]", userId); + emitter.complete(); // Complete the emitter on timeout + removeEmitter(userId, emitter); + }); + + emitter.onError(throwable -> { + log.error("SSE connection error for user [{}]: {}", userId, throwable.getMessage()); + removeEmitter(userId, emitter); + }); + log.info("Emitter added for user: {}. Total emitters for user: {}", userId, userEmitters.get(userId).size()); + return emitter; + } + + + public void sendEventToUser(Long userId, SSEEvent eventData) { + Set emitters = userEmitters.get(userId); + if (emitters != null) { + for (SseEmitter emitter : new CopyOnWriteArraySet<>(emitters)) { + try { + emitter.send(SseEmitter.event().data(eventData)); + log.info("Sent event to user: {}", userId); + } catch (IOException e) { + log.error("Error sending event to user {}: {}", userId, e.getMessage()); + emitter.completeWithError(e); + removeEmitter(userId, emitter); + } + } + } else { + System.out.println("No active SSE emitters for user: " + userId); + } + } + + private void removeEmitter(Long userId, SseEmitter emitter) { + Set emitters = userEmitters.get(userId); + if (emitters != null) { + emitters.remove(emitter); + if (emitters.isEmpty()) { + userEmitters.remove(userId); + } + log.info("Emitter removed for user: {}. Remaining emitters for user: {}", userId, userEmitters.containsKey(userId) ? userEmitters.get(userId).size() : 0); + } + } + + public void removeEmitter(Long userId) { + Set emitters = userEmitters.get(userId); + if (emitters != null) { + for (SseEmitter emitter : emitters) { + removeEmitter(userId, emitter); + } + userEmitters.remove(userId); + log.info("Removed all emitters for user: {}. Remaining emitters for user: {}", userId, userEmitters.containsKey(userId) ? userEmitters.get(userId).size() : 0); + } + } + + public void sendEventToAllUsers(Object eventData) { + userEmitters.forEach((userId, emitters) -> { + for (SseEmitter emitter : new CopyOnWriteArraySet<>(emitters)) { + try { + emitter.send(SseEmitter.event().data(eventData)); + } catch (IOException e) { + emitter.completeWithError(e); + removeEmitter(userId, emitter); + } + } + }); + } + + @Override + public void start() { + + } + + @Override + public void stop() { + userEmitters.values().forEach(sseEmitters -> sseEmitters.forEach(SseEmitter::complete)); + + } + + @Override + public boolean isRunning() { + return true; + } +} \ No newline at end of file diff --git a/src/main/java/com/dedicatedcode/reitti/service/importer/BaseGoogleTimelineImporter.java b/src/main/java/com/dedicatedcode/reitti/service/importer/BaseGoogleTimelineImporter.java index 68e2ca33..a7529804 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/importer/BaseGoogleTimelineImporter.java +++ b/src/main/java/com/dedicatedcode/reitti/service/importer/BaseGoogleTimelineImporter.java @@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer; import com.dedicatedcode.reitti.dto.LocationDataRequest; import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.service.ImportBatchProcessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,7 +13,6 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Optional; -import java.util.Random; public abstract class BaseGoogleTimelineImporter { diff --git a/src/main/java/com/dedicatedcode/reitti/service/importer/GeoJsonImporter.java b/src/main/java/com/dedicatedcode/reitti/service/importer/GeoJsonImporter.java index 5ce3d1a5..66b169e2 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/importer/GeoJsonImporter.java +++ b/src/main/java/com/dedicatedcode/reitti/service/importer/GeoJsonImporter.java @@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer; import com.dedicatedcode.reitti.dto.LocationDataRequest; import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.service.ImportBatchProcessor; import com.dedicatedcode.reitti.service.ImportStateHolder; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleAndroidTimelineImporter.java b/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleAndroidTimelineImporter.java index 406e7eac..6d513dc5 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleAndroidTimelineImporter.java +++ b/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleAndroidTimelineImporter.java @@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer; import com.dedicatedcode.reitti.dto.LocationDataRequest; import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.service.ImportBatchProcessor; import com.dedicatedcode.reitti.service.ImportStateHolder; import com.dedicatedcode.reitti.service.importer.dto.GoogleTimelineData; import com.dedicatedcode.reitti.service.importer.dto.SemanticSegment; diff --git a/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleIOSTimelineImporter.java b/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleIOSTimelineImporter.java index 15911e8b..caf75e03 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleIOSTimelineImporter.java +++ b/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleIOSTimelineImporter.java @@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer; import com.dedicatedcode.reitti.dto.LocationDataRequest; import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.service.ImportBatchProcessor; import com.dedicatedcode.reitti.service.ImportStateHolder; import com.dedicatedcode.reitti.service.importer.dto.ios.IOSSemanticSegment; import com.dedicatedcode.reitti.service.importer.dto.ios.IOSVisit; diff --git a/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleRecordsImporter.java b/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleRecordsImporter.java index 79a9ab26..3859bf67 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleRecordsImporter.java +++ b/src/main/java/com/dedicatedcode/reitti/service/importer/GoogleRecordsImporter.java @@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer; import com.dedicatedcode.reitti.dto.LocationDataRequest; import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.service.ImportBatchProcessor; import com.dedicatedcode.reitti.service.ImportStateHolder; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; diff --git a/src/main/java/com/dedicatedcode/reitti/service/importer/GpxImporter.java b/src/main/java/com/dedicatedcode/reitti/service/importer/GpxImporter.java index 84176d23..6eea2f87 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/importer/GpxImporter.java +++ b/src/main/java/com/dedicatedcode/reitti/service/importer/GpxImporter.java @@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer; import com.dedicatedcode.reitti.dto.LocationDataRequest; import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.service.ImportBatchProcessor; import com.dedicatedcode.reitti.service.ImportStateHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/dedicatedcode/reitti/service/processing/LocationDataIngestPipeline.java b/src/main/java/com/dedicatedcode/reitti/service/processing/LocationDataIngestPipeline.java index 582b5253..5ca0b9be 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/processing/LocationDataIngestPipeline.java +++ b/src/main/java/com/dedicatedcode/reitti/service/processing/LocationDataIngestPipeline.java @@ -6,6 +6,7 @@ import com.dedicatedcode.reitti.model.User; import com.dedicatedcode.reitti.repository.RawLocationPointJdbcService; import com.dedicatedcode.reitti.repository.UserJdbcService; import com.dedicatedcode.reitti.repository.UserSettingsJdbcService; +import com.dedicatedcode.reitti.service.UserNotificationQueueService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -22,16 +23,19 @@ public class LocationDataIngestPipeline { private final UserJdbcService userJdbcService; private final RawLocationPointJdbcService rawLocationPointJdbcService; private final UserSettingsJdbcService userSettingsJdbcService; + private final UserNotificationQueueService userNotificationQueueService; @Autowired public LocationDataIngestPipeline(GeoPointAnomalyFilter geoPointAnomalyFilter, UserJdbcService userJdbcService, RawLocationPointJdbcService rawLocationPointJdbcService, - UserSettingsJdbcService userSettingsJdbcService) { + UserSettingsJdbcService userSettingsJdbcService, + UserNotificationQueueService userNotificationQueueService) { this.geoPointAnomalyFilter = geoPointAnomalyFilter; this.userJdbcService = userJdbcService; this.rawLocationPointJdbcService = rawLocationPointJdbcService; this.userSettingsJdbcService = userSettingsJdbcService; + this.userNotificationQueueService = userNotificationQueueService; } public void processLocationData(LocationDataEvent event) { @@ -49,6 +53,7 @@ public class LocationDataIngestPipeline { List filtered = this.geoPointAnomalyFilter.filterAnomalies(points); rawLocationPointJdbcService.bulkInsert(user, filtered); userSettingsJdbcService.updateNewestData(user, filtered); + userNotificationQueueService.newRawLocationData(user, filtered); logger.info("Finished storing points [{}] for user [{}] in [{}]ms. Filtered out [{}] points.", filtered.size(), event.getUsername(), System.currentTimeMillis() - start, points.size() - filtered.size()); } } diff --git a/src/main/java/com/dedicatedcode/reitti/service/processing/TripDetectionService.java b/src/main/java/com/dedicatedcode/reitti/service/processing/TripDetectionService.java index 789d0915..c967add7 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/processing/TripDetectionService.java +++ b/src/main/java/com/dedicatedcode/reitti/service/processing/TripDetectionService.java @@ -6,6 +6,7 @@ import com.dedicatedcode.reitti.repository.ProcessedVisitJdbcService; import com.dedicatedcode.reitti.repository.RawLocationPointJdbcService; import com.dedicatedcode.reitti.repository.TripJdbcService; import com.dedicatedcode.reitti.repository.UserJdbcService; +import com.dedicatedcode.reitti.service.UserNotificationQueueService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -27,16 +28,19 @@ public class TripDetectionService { private final RawLocationPointJdbcService rawLocationPointJdbcService; private final TripJdbcService tripJdbcService; private final UserJdbcService userJdbcService; + private final UserNotificationQueueService userNotificationQueueService; private final ConcurrentHashMap userLocks = new ConcurrentHashMap<>(); public TripDetectionService(ProcessedVisitJdbcService processedVisitJdbcService, RawLocationPointJdbcService rawLocationPointJdbcService, TripJdbcService tripJdbcService, - UserJdbcService userJdbcService) { + UserJdbcService userJdbcService, + UserNotificationQueueService userNotificationQueueService) { this.processedVisitJdbcService = processedVisitJdbcService; this.rawLocationPointJdbcService = rawLocationPointJdbcService; this.tripJdbcService = tripJdbcService; this.userJdbcService = userJdbcService; + this.userNotificationQueueService = userNotificationQueueService; } public void visitCreated(ProcessedVisitCreatedEvent event) { @@ -74,7 +78,7 @@ public class TripDetectionService { } tripJdbcService.bulkInsert(user, trips); - + userNotificationQueueService.newTrips(user, trips); }); } finally { userLock.unlock(); diff --git a/src/main/java/com/dedicatedcode/reitti/service/processing/VisitMergingService.java b/src/main/java/com/dedicatedcode/reitti/service/processing/VisitMergingService.java index fd3228d1..2977f059 100644 --- a/src/main/java/com/dedicatedcode/reitti/service/processing/VisitMergingService.java +++ b/src/main/java/com/dedicatedcode/reitti/service/processing/VisitMergingService.java @@ -6,6 +6,7 @@ import com.dedicatedcode.reitti.event.SignificantPlaceCreatedEvent; import com.dedicatedcode.reitti.event.VisitUpdatedEvent; import com.dedicatedcode.reitti.model.*; import com.dedicatedcode.reitti.repository.*; +import com.dedicatedcode.reitti.service.UserNotificationQueueService; import org.locationtech.jts.geom.Coordinate; import org.locationtech.jts.geom.GeometryFactory; import org.locationtech.jts.geom.Point; @@ -36,6 +37,7 @@ public class VisitMergingService { private final RawLocationPointJdbcService rawLocationPointJdbcService; private final GeometryFactory geometryFactory; private final RabbitTemplate rabbitTemplate; + private final UserNotificationQueueService userNotificationQueueService; private final long mergeThresholdSeconds; private final long mergeThresholdMeters; private final int searchRangeExtensionInHours; @@ -48,6 +50,7 @@ public class VisitMergingService { SignificantPlaceJdbcService significantPlaceJdbcService, RawLocationPointJdbcService rawLocationPointJdbcService, GeometryFactory geometryFactory, + UserNotificationQueueService userNotificationQueueService, @Value("${reitti.visit.merge-max-stay-search-extension-days:2}") int maxStaySearchExtensionInDays, @Value("${reitti.visit.merge-threshold-seconds:300}") long mergeThresholdSeconds, @Value("${reitti.visit.merge-threshold-meters:100}") long mergeThresholdMeters) { @@ -58,6 +61,7 @@ public class VisitMergingService { this.significantPlaceJdbcService = significantPlaceJdbcService; this.rawLocationPointJdbcService = rawLocationPointJdbcService; this.geometryFactory = geometryFactory; + this.userNotificationQueueService = userNotificationQueueService; this.mergeThresholdSeconds = mergeThresholdSeconds; this.mergeThresholdMeters = mergeThresholdMeters; this.searchRangeExtensionInHours = maxStaySearchExtensionInDays * 24; @@ -119,6 +123,7 @@ public class VisitMergingService { logger.debug("Processed [{}] visits into [{}] merged visits for user: [{}]", allVisits.size(), processedVisits.size(), user.getUsername()); + this.userNotificationQueueService.newVisits(user, processedVisits); } diff --git a/src/main/resources/messages.properties b/src/main/resources/messages.properties index e7ec06c2..cb50da0a 100644 --- a/src/main/resources/messages.properties +++ b/src/main/resources/messages.properties @@ -389,3 +389,7 @@ month.9=September month.10=October month.11=November month.12=December + + +# SSE Events +sse.error.connection-lost=Connection to server lost! Try reconnecting ... diff --git a/src/main/resources/static/css/main.css b/src/main/resources/static/css/main.css index 0c67c908..2b5c05c9 100644 --- a/src/main/resources/static/css/main.css +++ b/src/main/resources/static/css/main.css @@ -352,6 +352,11 @@ nav { .htmx-request .htmx-indicator { display: inline; + height: initial; +} + +.htmx-indicator { + display: none; } button { @@ -1420,3 +1425,26 @@ button:disabled { min-width: unset; } } + +#message-container { + text-align: center; + position: fixed; + z-index: 200; + width: 100%; + pointer-events: none; +} + +#sse-message { + display: none; + background-color: var(--color-background-dark); + border-radius: 8px; + padding: 15px; + border: 1px solid var(--color-highlight); + color: var(--color-text-white); + box-shadow: 0 8px 25px rgba(245, 222, 179, 0.3); + transition: transform 0.3s ease, box-shadow 0.3s ease; +} + +#sse-message.active { + display: inline-block; +} diff --git a/src/main/resources/templates/index.html b/src/main/resources/templates/index.html index 466c9731..6cad016c 100644 --- a/src/main/resources/templates/index.html +++ b/src/main/resources/templates/index.html @@ -19,6 +19,9 @@ +
+
+
@@ -73,7 +76,10 @@ /*[[#{datepicker.months.oct}]]*/ 'Oct', /*[[#{datepicker.months.nov}]]*/ 'Nov', /*[[#{datepicker.months.dec}]]*/ 'Dec' - ] + ], + sse: { + error: /*[[#{sse.error.connection-lost}]]*/ 'Connection to server lost! Will reconnect ...', + } }; window.userSettings = /*[[${userSettings}]]*/ {} @@ -474,6 +480,30 @@ } } + const messagesDiv = document.getElementById('sse-message'); + const eventSource = new EventSource('/events'); // Connect to your SSE endpoint + + eventSource.onopen = function() { + console.log('SSE connection opened.'); + messagesDiv.classList.remove('active') + }; + + // Listen for events with the name "message" + eventSource.addEventListener('message', function(event) { + console.log('Received message event:', event.data); + }); + + + eventSource.onerror = function(error) { + console.error('EventSource failed:', error); + messagesDiv.innerHTML = `

${window.locale.sse.error}

`; + messagesDiv.classList.add('active') + }; + + eventSource.onmessage = function(event) { + console.log('Received generic event:', event.data); + }; + diff --git a/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleAndroidTimelineImporterTest.java b/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleAndroidTimelineImporterTest.java index 2ec28614..34b828b8 100644 --- a/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleAndroidTimelineImporterTest.java +++ b/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleAndroidTimelineImporterTest.java @@ -3,6 +3,7 @@ package com.dedicatedcode.reitti.service.importer; import com.dedicatedcode.reitti.config.RabbitMQConfig; import com.dedicatedcode.reitti.event.LocationDataEvent; import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.service.ImportBatchProcessor; import com.dedicatedcode.reitti.service.ImportStateHolder; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; diff --git a/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleIOSTimelineImporterTest.java b/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleIOSTimelineImporterTest.java index b443884f..19ff873e 100644 --- a/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleIOSTimelineImporterTest.java +++ b/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleIOSTimelineImporterTest.java @@ -3,6 +3,7 @@ package com.dedicatedcode.reitti.service.importer; import com.dedicatedcode.reitti.config.RabbitMQConfig; import com.dedicatedcode.reitti.event.LocationDataEvent; import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.service.ImportBatchProcessor; import com.dedicatedcode.reitti.service.ImportStateHolder; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; diff --git a/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleRecordsImporterTest.java b/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleRecordsImporterTest.java index 71784f8f..ec12edf6 100644 --- a/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleRecordsImporterTest.java +++ b/src/test/java/com/dedicatedcode/reitti/service/importer/GoogleRecordsImporterTest.java @@ -3,6 +3,7 @@ package com.dedicatedcode.reitti.service.importer; import com.dedicatedcode.reitti.config.RabbitMQConfig; import com.dedicatedcode.reitti.event.LocationDataEvent; import com.dedicatedcode.reitti.model.User; +import com.dedicatedcode.reitti.service.ImportBatchProcessor; import com.dedicatedcode.reitti.service.ImportStateHolder; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; @@ -10,7 +11,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import java.util.Map; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*;