143 add sse endpoint (#146)

This commit is contained in:
Daniel Graf
2025-07-24 10:35:26 +02:00
committed by GitHub
parent 3c448b350f
commit 2bb02d6480
28 changed files with 403 additions and 18 deletions

View File

@@ -369,7 +369,6 @@ There are multiple ways of getting support:
- tag me on [https://discuss.tchncs.de/u/danielgraf](Lemmy)
- or join **#reitti** on [irc.dedicatedcode.com](https://irc.dedicatedcode.com)
-
## Support the Project
<a href='https://ko-fi.com/K3K01HDAUW' target='_blank'><img height='36' style='border:0px;height:36px;' src='https://storage.ko-fi.com/cdn/kofi6.png?v=6' border='0' alt='Buy Me a Coffee at ko-fi.com' /></a>

View File

@@ -28,6 +28,9 @@ public class RabbitMQConfig {
public static final String TRIGGER_PROCESSING_PIPELINE_QUEUE = "trigger-processing-queue";
public static final String TRIGGER_PROCESSING_PIPELINE_ROUTING_KEY = "trigger.processing.start";
public static final String USER_EVENT_QUEUE = "user-event-queue";
public static final String USER_EVENT_ROUTING_KEY = "user.event.update";
@Bean
public TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME);
@@ -63,6 +66,11 @@ public class RabbitMQConfig {
return new Queue(TRIGGER_PROCESSING_PIPELINE_QUEUE, false);
}
@Bean
public Queue userEventQueue() {
return new Queue(USER_EVENT_QUEUE, false);
}
@Bean
public Binding locationDataBinding(Queue locationDataQueue, TopicExchange exchange) {
return BindingBuilder.bind(locationDataQueue).to(exchange).with(LOCATION_DATA_ROUTING_KEY);
@@ -93,6 +101,11 @@ public class RabbitMQConfig {
return BindingBuilder.bind(triggerProcessingQueue).to(exchange).with(TRIGGER_PROCESSING_PIPELINE_ROUTING_KEY);
}
@Bean
public Binding userEventBinding(Queue userEventQueue, TopicExchange exchange) {
return BindingBuilder.bind(userEventQueue).to(exchange).with(USER_EVENT_ROUTING_KEY);
}
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();

View File

@@ -0,0 +1,34 @@
package com.dedicatedcode.reitti.controller;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.UserSseEmitterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@RestController
public class SseController {
private static final Logger log = LoggerFactory.getLogger(SseController.class);
private final UserSseEmitterService emitterService;
public SseController(UserSseEmitterService userSseEmitterService) {
this.emitterService = userSseEmitterService;
}
@GetMapping(path = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSseForUser(@AuthenticationPrincipal UserDetails userDetails) {
if (userDetails == null) {
throw new IllegalStateException("User not authenticated for SSE endpoint.");
}
User user = (User) userDetails;
SseEmitter emitter = emitterService.addEmitter(user.getId());
log.info("New SSE connection from user: [{}]", user.getId());
return emitter;
}
}

View File

@@ -4,7 +4,7 @@ import com.dedicatedcode.reitti.dto.LocationDataRequest;
import com.dedicatedcode.reitti.dto.OwntracksLocationRequest;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.repository.UserJdbcService;
import com.dedicatedcode.reitti.service.importer.ImportBatchProcessor;
import com.dedicatedcode.reitti.service.ImportBatchProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -14,7 +14,10 @@ import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UsernameNotFoundException;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.Serializable;
import java.util.Collections;

View File

@@ -0,0 +1,37 @@
package com.dedicatedcode.reitti.event;
import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.Serializable;
import java.time.LocalDate;
public class SSEEvent implements Serializable {
private final SSEType type;
private final Long userId;
private final Long changedUserId;
private final LocalDate date;
@JsonCreator
public SSEEvent(SSEType type, Long userId, Long changedUserId, LocalDate date) {
this.type = type;
this.userId = userId;
this.changedUserId = changedUserId;
this.date = date;
}
public SSEType getType() {
return type;
}
public Long getUserId() {
return userId;
}
public Long getChangedUserId() {
return changedUserId;
}
public LocalDate getDate() {
return date;
}
}

View File

@@ -0,0 +1,7 @@
package com.dedicatedcode.reitti.event;
public enum SSEType {
TRIPS,
VISITS,
RAW_DATA
}

View File

@@ -138,4 +138,8 @@ public class UserSettingsJdbcService {
this.jdbcTemplate.update("UPDATE user_settings SET latest_data = GREATEST(latest_data, ?) WHERE user_id = ?", Timestamp.from(instant), user.getId());
});
}
public List<Long> findParentUserIds(User user) {
return this.jdbcTemplate.queryForList("SELECT from_user FROM connected_users WHERE to_user = ?", Long.class, user.getId());
}
}

View File

@@ -1,4 +1,4 @@
package com.dedicatedcode.reitti.service.importer;
package com.dedicatedcode.reitti.service;
import com.dedicatedcode.reitti.config.RabbitMQConfig;
import com.dedicatedcode.reitti.dto.LocationDataRequest;
@@ -54,13 +54,11 @@ public class ImportBatchProcessor {
}
private void scheduleProcessingTrigger(String username) {
// Cancel any existing trigger for this user
ScheduledFuture<?> existingTrigger = pendingTriggers.get(username);
if (existingTrigger != null && !existingTrigger.isDone()) {
existingTrigger.cancel(false);
}
// Schedule new trigger for 30 seconds from now
ScheduledFuture<?> newTrigger = scheduler.schedule(() -> {
try {
TriggerProcessingEvent triggerEvent = new TriggerProcessingEvent(username);

View File

@@ -21,6 +21,7 @@ public class MessageDispatcherService {
private final TripDetectionService tripDetectionService;
private final ReverseGeocodingListener reverseGeocodingListener;
private final ProcessingPipelineTrigger processingPipelineTrigger;
private final UserSseEmitterService userSseEmitterService;
@Autowired
public MessageDispatcherService(LocationDataIngestPipeline locationDataIngestPipeline,
@@ -28,13 +29,15 @@ public class MessageDispatcherService {
VisitMergingService visitMergingService,
TripDetectionService tripDetectionService,
ReverseGeocodingListener reverseGeocodingListener,
ProcessingPipelineTrigger processingPipelineTrigger) {
ProcessingPipelineTrigger processingPipelineTrigger,
UserSseEmitterService userSseEmitterService) {
this.locationDataIngestPipeline = locationDataIngestPipeline;
this.visitDetectionService = visitDetectionService;
this.visitMergingService = visitMergingService;
this.tripDetectionService = tripDetectionService;
this.reverseGeocodingListener = reverseGeocodingListener;
this.processingPipelineTrigger = processingPipelineTrigger;
this.userSseEmitterService = userSseEmitterService;
}
@RabbitListener(queues = RabbitMQConfig.LOCATION_DATA_QUEUE, concurrency = "${reitti.events.concurrency}")
@@ -67,6 +70,12 @@ public class MessageDispatcherService {
reverseGeocodingListener.handleSignificantPlaceCreated(event);
}
@RabbitListener(queues = RabbitMQConfig.USER_EVENT_QUEUE)
public void handleUserNotificationEvent(SSEEvent event) {
logger.debug("Dispatching SSEEvent for user: {}", event.getUserId());
this.userSseEmitterService.sendEventToUser(event.getUserId(), event);
}
@RabbitListener(queues = RabbitMQConfig.TRIGGER_PROCESSING_PIPELINE_QUEUE, concurrency = "${reitti.events.concurrency}")
public void handleTriggerProcessingEvent(TriggerProcessingEvent event) {
logger.debug("Dispatching TriggerProcessingEvent for user: {}", event.getUsername());

View File

@@ -6,7 +6,6 @@ import com.dedicatedcode.reitti.model.OwnTracksRecorderIntegration;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.repository.OwnTracksRecorderIntegrationJdbcService;
import com.dedicatedcode.reitti.repository.UserJdbcService;
import com.dedicatedcode.reitti.service.importer.ImportBatchProcessor;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -6,7 +6,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@Service
@@ -22,7 +25,9 @@ public class QueueStatsService {
RabbitMQConfig.STAY_DETECTION_QUEUE,
RabbitMQConfig.MERGE_VISIT_QUEUE,
RabbitMQConfig.SIGNIFICANT_PLACE_QUEUE,
RabbitMQConfig.DETECT_TRIP_QUEUE);
RabbitMQConfig.DETECT_TRIP_QUEUE,
RabbitMQConfig.USER_EVENT_QUEUE
);
private final Map<String, List<ProcessingRecord>> processingHistory = new ConcurrentHashMap<>();
@@ -124,8 +129,6 @@ public class QueueStatsService {
List<ProcessingRecord> history = processingHistory.get(queueName);
if (history.isEmpty()) {
// No processing history, base progress on queue size
// Smaller queues show higher progress
if (currentMessageCount <= 5) return 80;
if (currentMessageCount <= 20) return 60;
if (currentMessageCount <= 100) return 40;

View File

@@ -0,0 +1,83 @@
package com.dedicatedcode.reitti.service;
import com.dedicatedcode.reitti.config.RabbitMQConfig;
import com.dedicatedcode.reitti.dto.LocationDataRequest;
import com.dedicatedcode.reitti.event.SSEEvent;
import com.dedicatedcode.reitti.event.SSEType;
import com.dedicatedcode.reitti.model.ProcessedVisit;
import com.dedicatedcode.reitti.model.Trip;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.repository.UserSettingsJdbcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Service
public class UserNotificationQueueService {
private static final Logger log = LoggerFactory.getLogger(UserNotificationQueueService.class);
private final UserSettingsJdbcService userSettingsJdbcService;
private final RabbitTemplate rabbitTemplate;
public UserNotificationQueueService(UserSettingsJdbcService userSettingsJdbcService,
RabbitTemplate rabbitTemplate) {
this.userSettingsJdbcService = userSettingsJdbcService;
this.rabbitTemplate = rabbitTemplate;
}
public void newTrips(User user, List<Trip> trips) {
SSEType eventType = SSEType.TRIPS;
Set<Long> parentUserIds = new HashSet<>(this.userSettingsJdbcService.findParentUserIds(user));
log.debug("New trips for user [{}], will be notify [{}] number or parent users", user.getId(), parentUserIds.size());
Set<LocalDate> dates = calculateAffectedDates(trips.stream().map(Trip::getStartTime).toList(), trips.stream().map(Trip::getEndTime).toList());
sendToQueue(user, dates, parentUserIds, eventType);
}
public void newVisits(User user, List<ProcessedVisit> processedVisits) {
SSEType eventType = SSEType.VISITS;
Set<Long> parentUserIds = new HashSet<>(this.userSettingsJdbcService.findParentUserIds(user));
log.debug("New Visits for user [{}], will be notify [{}] number or parent users", user.getId(), parentUserIds.size());
Set<LocalDate> dates = calculateAffectedDates(processedVisits.stream().map(ProcessedVisit::getStartTime).toList(), processedVisits.stream().map(ProcessedVisit::getEndTime).toList());
sendToQueue(user, dates, parentUserIds, eventType);
}
public void newRawLocationData(User user, List<LocationDataRequest.LocationPoint> filtered) {
SSEType eventType = SSEType.RAW_DATA;
Set<Long> parentUserIds = new HashSet<>(this.userSettingsJdbcService.findParentUserIds(user));
log.debug("New RawLocationPoints for user [{}], will be notify [{}] number or parent users", user.getId(), parentUserIds.size());
Set<LocalDate> dates = calculateAffectedDates(filtered.stream().map(LocationDataRequest.LocationPoint::getTimestamp).map(s -> ZonedDateTime.parse(s).toInstant()).toList());
sendToQueue(user, dates, parentUserIds, eventType);
}
private void sendToQueue(User user, Set<LocalDate> dates, Set<Long> parentUserIds, SSEType eventType) {
for (LocalDate date : dates) {
for (Long parentUserId : parentUserIds) {
this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.USER_EVENT_ROUTING_KEY, new SSEEvent(eventType, parentUserId, user.getId(), date));
}
this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.USER_EVENT_ROUTING_KEY, new SSEEvent(eventType, user.getId(), user.getId(), date));
}
}
@SafeVarargs
private Set<LocalDate> calculateAffectedDates(List<Instant>... list) {
if (list == null) {
return new HashSet<>();
} else {
Set<LocalDate> result = new HashSet<>();
for (List<Instant> instants : list) {
result.addAll(instants.stream().map(instant -> instant.atZone(ZoneId.of("Z")).toLocalDate()).collect(Collectors.toSet()));
}
return result;
}
}
}

View File

@@ -0,0 +1,112 @@
package com.dedicatedcode.reitti.service;
import com.dedicatedcode.reitti.event.SSEEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@Service
public class UserSseEmitterService implements SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(UserSseEmitterService.class);
private final Map<Long, Set<SseEmitter>> userEmitters = new ConcurrentHashMap<>();
public SseEmitter addEmitter(Long userId) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
userEmitters.computeIfAbsent(userId, k -> new CopyOnWriteArraySet<>()).add(emitter);
emitter.onCompletion(() -> {
log.info("SSE connection completed for user: [{}]", userId);
removeEmitter(userId, emitter);
});
emitter.onTimeout(() -> {
log.info("SSE connection timed out for user: [{}]", userId);
emitter.complete(); // Complete the emitter on timeout
removeEmitter(userId, emitter);
});
emitter.onError(throwable -> {
log.error("SSE connection error for user [{}]: {}", userId, throwable.getMessage());
removeEmitter(userId, emitter);
});
log.info("Emitter added for user: {}. Total emitters for user: {}", userId, userEmitters.get(userId).size());
return emitter;
}
public void sendEventToUser(Long userId, SSEEvent eventData) {
Set<SseEmitter> emitters = userEmitters.get(userId);
if (emitters != null) {
for (SseEmitter emitter : new CopyOnWriteArraySet<>(emitters)) {
try {
emitter.send(SseEmitter.event().data(eventData));
log.info("Sent event to user: {}", userId);
} catch (IOException e) {
log.error("Error sending event to user {}: {}", userId, e.getMessage());
emitter.completeWithError(e);
removeEmitter(userId, emitter);
}
}
} else {
System.out.println("No active SSE emitters for user: " + userId);
}
}
private void removeEmitter(Long userId, SseEmitter emitter) {
Set<SseEmitter> emitters = userEmitters.get(userId);
if (emitters != null) {
emitters.remove(emitter);
if (emitters.isEmpty()) {
userEmitters.remove(userId);
}
log.info("Emitter removed for user: {}. Remaining emitters for user: {}", userId, userEmitters.containsKey(userId) ? userEmitters.get(userId).size() : 0);
}
}
public void removeEmitter(Long userId) {
Set<SseEmitter> emitters = userEmitters.get(userId);
if (emitters != null) {
for (SseEmitter emitter : emitters) {
removeEmitter(userId, emitter);
}
userEmitters.remove(userId);
log.info("Removed all emitters for user: {}. Remaining emitters for user: {}", userId, userEmitters.containsKey(userId) ? userEmitters.get(userId).size() : 0);
}
}
public void sendEventToAllUsers(Object eventData) {
userEmitters.forEach((userId, emitters) -> {
for (SseEmitter emitter : new CopyOnWriteArraySet<>(emitters)) {
try {
emitter.send(SseEmitter.event().data(eventData));
} catch (IOException e) {
emitter.completeWithError(e);
removeEmitter(userId, emitter);
}
}
});
}
@Override
public void start() {
}
@Override
public void stop() {
userEmitters.values().forEach(sseEmitters -> sseEmitters.forEach(SseEmitter::complete));
}
@Override
public boolean isRunning() {
return true;
}
}

View File

@@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer;
import com.dedicatedcode.reitti.dto.LocationDataRequest;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.ImportBatchProcessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -12,7 +13,6 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Optional;
import java.util.Random;
public abstract class BaseGoogleTimelineImporter {

View File

@@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer;
import com.dedicatedcode.reitti.dto.LocationDataRequest;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.ImportBatchProcessor;
import com.dedicatedcode.reitti.service.ImportStateHolder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

View File

@@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer;
import com.dedicatedcode.reitti.dto.LocationDataRequest;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.ImportBatchProcessor;
import com.dedicatedcode.reitti.service.ImportStateHolder;
import com.dedicatedcode.reitti.service.importer.dto.GoogleTimelineData;
import com.dedicatedcode.reitti.service.importer.dto.SemanticSegment;

View File

@@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer;
import com.dedicatedcode.reitti.dto.LocationDataRequest;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.ImportBatchProcessor;
import com.dedicatedcode.reitti.service.ImportStateHolder;
import com.dedicatedcode.reitti.service.importer.dto.ios.IOSSemanticSegment;
import com.dedicatedcode.reitti.service.importer.dto.ios.IOSVisit;

View File

@@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer;
import com.dedicatedcode.reitti.dto.LocationDataRequest;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.ImportBatchProcessor;
import com.dedicatedcode.reitti.service.ImportStateHolder;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;

View File

@@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service.importer;
import com.dedicatedcode.reitti.dto.LocationDataRequest;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.ImportBatchProcessor;
import com.dedicatedcode.reitti.service.ImportStateHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -6,6 +6,7 @@ import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.repository.RawLocationPointJdbcService;
import com.dedicatedcode.reitti.repository.UserJdbcService;
import com.dedicatedcode.reitti.repository.UserSettingsJdbcService;
import com.dedicatedcode.reitti.service.UserNotificationQueueService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -22,16 +23,19 @@ public class LocationDataIngestPipeline {
private final UserJdbcService userJdbcService;
private final RawLocationPointJdbcService rawLocationPointJdbcService;
private final UserSettingsJdbcService userSettingsJdbcService;
private final UserNotificationQueueService userNotificationQueueService;
@Autowired
public LocationDataIngestPipeline(GeoPointAnomalyFilter geoPointAnomalyFilter,
UserJdbcService userJdbcService,
RawLocationPointJdbcService rawLocationPointJdbcService,
UserSettingsJdbcService userSettingsJdbcService) {
UserSettingsJdbcService userSettingsJdbcService,
UserNotificationQueueService userNotificationQueueService) {
this.geoPointAnomalyFilter = geoPointAnomalyFilter;
this.userJdbcService = userJdbcService;
this.rawLocationPointJdbcService = rawLocationPointJdbcService;
this.userSettingsJdbcService = userSettingsJdbcService;
this.userNotificationQueueService = userNotificationQueueService;
}
public void processLocationData(LocationDataEvent event) {
@@ -49,6 +53,7 @@ public class LocationDataIngestPipeline {
List<LocationDataRequest.LocationPoint> filtered = this.geoPointAnomalyFilter.filterAnomalies(points);
rawLocationPointJdbcService.bulkInsert(user, filtered);
userSettingsJdbcService.updateNewestData(user, filtered);
userNotificationQueueService.newRawLocationData(user, filtered);
logger.info("Finished storing points [{}] for user [{}] in [{}]ms. Filtered out [{}] points.", filtered.size(), event.getUsername(), System.currentTimeMillis() - start, points.size() - filtered.size());
}
}

View File

@@ -6,6 +6,7 @@ import com.dedicatedcode.reitti.repository.ProcessedVisitJdbcService;
import com.dedicatedcode.reitti.repository.RawLocationPointJdbcService;
import com.dedicatedcode.reitti.repository.TripJdbcService;
import com.dedicatedcode.reitti.repository.UserJdbcService;
import com.dedicatedcode.reitti.service.UserNotificationQueueService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -27,16 +28,19 @@ public class TripDetectionService {
private final RawLocationPointJdbcService rawLocationPointJdbcService;
private final TripJdbcService tripJdbcService;
private final UserJdbcService userJdbcService;
private final UserNotificationQueueService userNotificationQueueService;
private final ConcurrentHashMap<String, ReentrantLock> userLocks = new ConcurrentHashMap<>();
public TripDetectionService(ProcessedVisitJdbcService processedVisitJdbcService,
RawLocationPointJdbcService rawLocationPointJdbcService,
TripJdbcService tripJdbcService,
UserJdbcService userJdbcService) {
UserJdbcService userJdbcService,
UserNotificationQueueService userNotificationQueueService) {
this.processedVisitJdbcService = processedVisitJdbcService;
this.rawLocationPointJdbcService = rawLocationPointJdbcService;
this.tripJdbcService = tripJdbcService;
this.userJdbcService = userJdbcService;
this.userNotificationQueueService = userNotificationQueueService;
}
public void visitCreated(ProcessedVisitCreatedEvent event) {
@@ -74,7 +78,7 @@ public class TripDetectionService {
}
tripJdbcService.bulkInsert(user, trips);
userNotificationQueueService.newTrips(user, trips);
});
} finally {
userLock.unlock();

View File

@@ -6,6 +6,7 @@ import com.dedicatedcode.reitti.event.SignificantPlaceCreatedEvent;
import com.dedicatedcode.reitti.event.VisitUpdatedEvent;
import com.dedicatedcode.reitti.model.*;
import com.dedicatedcode.reitti.repository.*;
import com.dedicatedcode.reitti.service.UserNotificationQueueService;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
@@ -36,6 +37,7 @@ public class VisitMergingService {
private final RawLocationPointJdbcService rawLocationPointJdbcService;
private final GeometryFactory geometryFactory;
private final RabbitTemplate rabbitTemplate;
private final UserNotificationQueueService userNotificationQueueService;
private final long mergeThresholdSeconds;
private final long mergeThresholdMeters;
private final int searchRangeExtensionInHours;
@@ -48,6 +50,7 @@ public class VisitMergingService {
SignificantPlaceJdbcService significantPlaceJdbcService,
RawLocationPointJdbcService rawLocationPointJdbcService,
GeometryFactory geometryFactory,
UserNotificationQueueService userNotificationQueueService,
@Value("${reitti.visit.merge-max-stay-search-extension-days:2}") int maxStaySearchExtensionInDays,
@Value("${reitti.visit.merge-threshold-seconds:300}") long mergeThresholdSeconds,
@Value("${reitti.visit.merge-threshold-meters:100}") long mergeThresholdMeters) {
@@ -58,6 +61,7 @@ public class VisitMergingService {
this.significantPlaceJdbcService = significantPlaceJdbcService;
this.rawLocationPointJdbcService = rawLocationPointJdbcService;
this.geometryFactory = geometryFactory;
this.userNotificationQueueService = userNotificationQueueService;
this.mergeThresholdSeconds = mergeThresholdSeconds;
this.mergeThresholdMeters = mergeThresholdMeters;
this.searchRangeExtensionInHours = maxStaySearchExtensionInDays * 24;
@@ -119,6 +123,7 @@ public class VisitMergingService {
logger.debug("Processed [{}] visits into [{}] merged visits for user: [{}]",
allVisits.size(), processedVisits.size(), user.getUsername());
this.userNotificationQueueService.newVisits(user, processedVisits);
}

View File

@@ -389,3 +389,7 @@ month.9=September
month.10=October
month.11=November
month.12=December
# SSE Events
sse.error.connection-lost=Connection to server lost! Try reconnecting ...

View File

@@ -352,6 +352,11 @@ nav {
.htmx-request .htmx-indicator {
display: inline;
height: initial;
}
.htmx-indicator {
display: none;
}
button {
@@ -1420,3 +1425,26 @@ button:disabled {
min-width: unset;
}
}
#message-container {
text-align: center;
position: fixed;
z-index: 200;
width: 100%;
pointer-events: none;
}
#sse-message {
display: none;
background-color: var(--color-background-dark);
border-radius: 8px;
padding: 15px;
border: 1px solid var(--color-highlight);
color: var(--color-text-white);
box-shadow: 0 8px 25px rgba(245, 222, 179, 0.3);
transition: transform 0.3s ease, box-shadow 0.3s ease;
}
#sse-message.active {
display: inline-block;
}

View File

@@ -19,6 +19,9 @@
<script src="/js/TileLayer.Grayscale.js"></script>
</head>
<body>
<div id="message-container">
<div id="sse-message"></div>
</div>
<div id="map"></div>
@@ -73,7 +76,10 @@
/*[[#{datepicker.months.oct}]]*/ 'Oct',
/*[[#{datepicker.months.nov}]]*/ 'Nov',
/*[[#{datepicker.months.dec}]]*/ 'Dec'
]
],
sse: {
error: /*[[#{sse.error.connection-lost}]]*/ 'Connection to server lost! Will reconnect ...',
}
};
window.userSettings = /*[[${userSettings}]]*/ {}
@@ -474,6 +480,30 @@
}
}
const messagesDiv = document.getElementById('sse-message');
const eventSource = new EventSource('/events'); // Connect to your SSE endpoint
eventSource.onopen = function() {
console.log('SSE connection opened.');
messagesDiv.classList.remove('active')
};
// Listen for events with the name "message"
eventSource.addEventListener('message', function(event) {
console.log('Received message event:', event.data);
});
eventSource.onerror = function(error) {
console.error('EventSource failed:', error);
messagesDiv.innerHTML = `<p><strong>${window.locale.sse.error}</strong></p>`;
messagesDiv.classList.add('active')
};
eventSource.onmessage = function(event) {
console.log('Received generic event:', event.data);
};
</script>
</body>
</html>

View File

@@ -3,6 +3,7 @@ package com.dedicatedcode.reitti.service.importer;
import com.dedicatedcode.reitti.config.RabbitMQConfig;
import com.dedicatedcode.reitti.event.LocationDataEvent;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.ImportBatchProcessor;
import com.dedicatedcode.reitti.service.ImportStateHolder;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

View File

@@ -3,6 +3,7 @@ package com.dedicatedcode.reitti.service.importer;
import com.dedicatedcode.reitti.config.RabbitMQConfig;
import com.dedicatedcode.reitti.event.LocationDataEvent;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.ImportBatchProcessor;
import com.dedicatedcode.reitti.service.ImportStateHolder;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

View File

@@ -3,6 +3,7 @@ package com.dedicatedcode.reitti.service.importer;
import com.dedicatedcode.reitti.config.RabbitMQConfig;
import com.dedicatedcode.reitti.event.LocationDataEvent;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.ImportBatchProcessor;
import com.dedicatedcode.reitti.service.ImportStateHolder;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
@@ -10,7 +11,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;