182 feature request create update channel to post events back to connect instances (#216)

This commit is contained in:
Daniel Graf
2025-09-04 19:59:31 +02:00
committed by GitHub
parent bb9dedadc1
commit 6ef4a565d3
34 changed files with 874 additions and 150 deletions

View File

@@ -205,10 +205,11 @@ The included `docker-compose.yml` provides a complete setup with:
| `REDIS_PORT` | Redis port | 6379 | 6379 |
| `REDIS_USERNAME` | Redis username (optional) | | username |
| `REDIS_PASSWORD` | Redis password (optional) | | password |
| `ADVERTISE_URI` | Routable URL of the instance. Used for federation of multiple instances. (optional) | | https://reitti.lab |
| `OIDC_ENABLED` | Whether to enable OIDC sign-ins | false | true |
| `OIDC_CLIENT_ID` | Your OpenID Connect Client ID (from your provider) | | google |
| `OIDC_CLIENT_SECRET` | Your OpenID Connect Client secret (from your provider) | | F0oxfg8b2rp5X97YPS92C2ERxof1oike |
| `OIDC_ISSUER_URI` | Your OpenID Connect Provider Discovery URI (don't include the /.well-known/openid-configuration part of the URI) | | https://github.com/login/oauth |
| `OIDC_ISSUER_URI` | Your OpenID Connect Provider Discovery URI (don't include the /.well-known/openid-configuration part of the URI) | | https://github.com/login/oauth |
| `OIDC_SCOPE` | Your OpenID Connect scopes for your user (optional) | openid,profile | openid,profile |
| `PHOTON_BASE_URL` | Base URL for Photon geocoding service | | |
| `PROCESSING_WAIT_TIME` | How many seconds to wait after the last data input before starting to process all unprocessed data. (⚠️ This needs to be lower than your integrated app reports data in Reitti) | 15 | 15 |

View File

@@ -0,0 +1,42 @@
# GPX Sender Tool
A CLI utility to send GPX track data to a Reitti instance via the Owntracks ingest API.
## Building
```bash
cd docs/tools/gpx-sender
mvn clean package
```
## Usage
```bash
java -jar target/gpx-sender-1.0.0.jar <gpx-file> --url <reitti-url> --token <api-token> [--interval <seconds>]
```
### Parameters
- `gpx-file`: Path to the GPX file containing track points (positional parameter)
- `--url`: Base URL of the Reitti instance (e.g., `http://localhost:8080`)
- `--token`: API token for authentication
- `--interval`: Optional interval between sending points (default: 15 seconds)
### Example
```bash
java -jar target/gpx-sender-1.0.0.jar my-track.gpx --url http://localhost:8080 --token your-api-token --interval 10
```
## How it works
1. Parses the GPX file to extract track points with coordinates and timestamps
2. Starts sending from the current time, with each subsequent point sent at the specified interval
3. Converts each point to Owntracks format and sends via HTTP POST to `/api/v1/ingest/owntracks`
4. Waits the specified interval between each point transmission
## Requirements
- Java 17 or higher
- Valid API token for the Reitti instance
- GPX file with track points

View File

@@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dedicatedcode.reitti.tools</groupId>
<artifactId>gpx-sender</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.dedicatedcode.reitti.tools.GpxSender</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,209 @@
package com.dedicatedcode.reitti.tools;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
public class GpxSender {
private static class TrackPoint {
public final double latitude;
public final double longitude;
public final Instant timestamp;
public TrackPoint(double latitude, double longitude, Instant timestamp) {
this.latitude = latitude;
this.longitude = longitude;
this.timestamp = timestamp;
}
}
private static class OwntracksMessage {
public String _type = "location";
public double acc;
public double lat;
public double lon;
public long tst;
public OwntracksMessage(double lat, double lon, long tst, double acc) {
this.lat = lat;
this.lon = lon;
this.tst = tst;
}
}
public static void main(String[] args) {
if (args.length < 1) {
System.err.println("Usage: java -jar gpx-sender.jar <gpx-file> --url <reitti-url> --token <api-token> [--interval <seconds>]");
System.err.println("Example: java -jar gpx-sender.jar track.gpx --url http://localhost:8080 --token your-api-token --interval 15");
System.exit(1);
}
String gpxFile = args[0];
String reittiUrl = null;
String apiToken = null;
int intervalSeconds = 15;
// Parse named parameters
for (int i = 1; i < args.length; i++) {
switch (args[i]) {
case "--url":
if (i + 1 < args.length) {
reittiUrl = args[++i];
}
break;
case "--token":
if (i + 1 < args.length) {
apiToken = args[++i];
}
break;
case "--interval":
if (i + 1 < args.length) {
intervalSeconds = Integer.parseInt(args[++i]);
}
break;
default:
System.err.println("Unknown parameter: " + args[i]);
System.exit(1);
}
}
if (reittiUrl == null || apiToken == null) {
System.err.println("Both --url and --token parameters are required");
System.exit(1);
}
try {
List<TrackPoint> trackPoints = parseGpxFile(gpxFile);
if (trackPoints.isEmpty()) {
System.err.println("No track points found in GPX file");
System.exit(1);
}
System.out.println("Loaded " + trackPoints.size() + " track points from " + gpxFile);
System.out.println("Sending to: " + reittiUrl);
System.out.println("Interval: " + intervalSeconds + " seconds");
sendTrackPoints(trackPoints, reittiUrl, apiToken, intervalSeconds);
} catch (Exception e) {
System.err.println("Error: " + e.getMessage());
e.printStackTrace();
System.exit(1);
}
}
private static List<TrackPoint> parseGpxFile(String gpxFile) throws Exception {
List<TrackPoint> trackPoints = new ArrayList<>();
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
Document document = builder.parse(new File(gpxFile));
NodeList trkptNodes = document.getElementsByTagName("trkpt");
for (int i = 0; i < trkptNodes.getLength(); i++) {
Element trkpt = (Element) trkptNodes.item(i);
double lat = Double.parseDouble(trkpt.getAttribute("lat"));
double lon = Double.parseDouble(trkpt.getAttribute("lon"));
NodeList timeNodes = trkpt.getElementsByTagName("time");
Instant timestamp = null;
if (timeNodes.getLength() > 0) {
String timeStr = timeNodes.item(0).getTextContent();
timestamp = Instant.parse(timeStr);
}
trackPoints.add(new TrackPoint(lat, lon, timestamp));
}
return trackPoints;
}
private static void sendTrackPoints(List<TrackPoint> trackPoints, String reittiUrl, String apiToken, int intervalSeconds) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
// Start from current time and send points with their original intervals
Instant startTime = Instant.now();
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
for (int i = 0; i < trackPoints.size(); i++) {
TrackPoint point = trackPoints.get(i);
// Calculate adjusted timestamp - start from now and preserve original intervals
Instant adjustedTime;
if (i == 0) {
// First point gets current time
adjustedTime = startTime;
} else if (i > 0 && trackPoints.get(i-1).timestamp != null && point.timestamp != null) {
// Calculate time difference from previous point and add to previous adjusted time
TrackPoint prevPoint = trackPoints.get(i-1);
long durationFromPrev = point.timestamp.getEpochSecond() - prevPoint.timestamp.getEpochSecond();
adjustedTime = startTime.plusSeconds((long) i * intervalSeconds);
} else {
// Fallback: distribute points evenly from start time
adjustedTime = startTime.plusSeconds((long) i * intervalSeconds);
}
// Create Owntracks message
OwntracksMessage message = new OwntracksMessage(
point.latitude,
point.longitude,
adjustedTime.getEpochSecond(),
10.0
);
// Send HTTP request
String url = reittiUrl + "/api/v1/ingest/owntracks";
HttpPost post = new HttpPost(url);
post.setHeader("Authorization", "Bearer " + apiToken);
post.setHeader("Content-Type", "application/json");
String json = objectMapper.writeValueAsString(message);
post.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
System.out.printf("Sending point %d/%d: lat=%.6f, lon=%.6f, time=%s%n",
i + 1, trackPoints.size(), point.latitude, point.longitude,
adjustedTime.toString());
try {
httpClient.execute(post, response -> {
int statusCode = response.getCode();
if (statusCode >= 200 && statusCode < 300) {
System.out.println("✓ Sent successfully");
} else {
System.err.println("✗ Failed with status: " + statusCode);
}
return null;
});
} catch (Exception e) {
System.err.println("✗ Error sending point: " + e.getMessage());
}
// Wait before sending next point (except for the last one)
if (i < trackPoints.size() - 1) {
Thread.sleep(intervalSeconds * 1000L);
}
}
}
System.out.println("Finished sending all track points");
}
}

View File

@@ -16,7 +16,6 @@ public class AppConfig {
return new GeometryFactory(new PrecisionModel(), 4326);
}
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();

View File

@@ -34,6 +34,7 @@ public class SecurityConfig {
.requestMatchers("/login").permitAll()
.requestMatchers("/css/**", "/js/**", "/images/**").permitAll()
.requestMatchers("/actuator/health").permitAll()
.requestMatchers("/api/v1/reitti-integration/notify/**").permitAll()
.anyRequest().authenticated()
)
.addFilterBefore(bearerTokenAuthFilter, AuthorizationFilter.class)

View File

@@ -2,11 +2,11 @@ package com.dedicatedcode.reitti.controller;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.UserSseEmitterService;
import com.dedicatedcode.reitti.service.integration.ReittiIntegrationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -15,20 +15,19 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
public class SseController {
private static final Logger log = LoggerFactory.getLogger(SseController.class);
private final UserSseEmitterService emitterService;
private final ReittiIntegrationService reittiIntegrationService;
public SseController(UserSseEmitterService userSseEmitterService) {
public SseController(UserSseEmitterService userSseEmitterService,
ReittiIntegrationService reittiIntegrationService) {
this.emitterService = userSseEmitterService;
this.reittiIntegrationService = reittiIntegrationService;
}
@GetMapping(path = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSseForUser(@AuthenticationPrincipal UserDetails userDetails) {
if (userDetails == null) {
throw new IllegalStateException("User not authenticated for SSE endpoint.");
}
User user = (User) userDetails;
SseEmitter emitter = emitterService.addEmitter(user.getId());
public SseEmitter handleSseForUser(@AuthenticationPrincipal User user) {
SseEmitter emitter = emitterService.addEmitter(user);
reittiIntegrationService.registerSubscriptionsForUser(user);
log.info("New SSE connection from user: [{}]", user.getId());
return emitter;
}
}
}

View File

@@ -1,33 +1,56 @@
package com.dedicatedcode.reitti.controller.api;
import com.dedicatedcode.reitti.dto.ReittiRemoteInfo;
import com.dedicatedcode.reitti.dto.SubscriptionRequest;
import com.dedicatedcode.reitti.dto.SubscriptionResponse;
import com.dedicatedcode.reitti.dto.TimelineEntry;
import com.dedicatedcode.reitti.model.NotificationData;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.repository.UserJdbcService;
import com.dedicatedcode.reitti.service.TimelineService;
import com.dedicatedcode.reitti.service.UserNotificationService;
import com.dedicatedcode.reitti.service.VersionService;
import com.dedicatedcode.reitti.service.integration.ReittiIntegrationService;
import com.dedicatedcode.reitti.service.integration.ReittiSubscription;
import com.dedicatedcode.reitti.service.integration.ReittiSubscriptionService;
import jakarta.validation.Valid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;
@RestController
@RequestMapping("/api/v1/reitti-integration")
public class ReittiIntegrationApiController {
private static final Logger log = LoggerFactory.getLogger(ReittiIntegrationApiController.class);
private final VersionService versionService;
private final TimelineService timelineService;
private final ReittiSubscriptionService subscriptionService;
private final ReittiIntegrationService integrationService;
private final UserNotificationService userNotificationService;
private final UserJdbcService userJdbcService;
public ReittiIntegrationApiController(VersionService versionService,
TimelineService timelineService) {
TimelineService timelineService,
ReittiSubscriptionService subscriptionService,
ReittiIntegrationService integrationService,
UserNotificationService userNotificationService,
UserJdbcService userJdbcService) {
this.versionService = versionService;
this.timelineService = timelineService;
this.subscriptionService = subscriptionService;
this.integrationService = integrationService;
this.userNotificationService = userNotificationService;
this.userJdbcService = userJdbcService;
}
@GetMapping("/info")
@@ -45,10 +68,37 @@ public class ReittiIntegrationApiController {
LocalDate selectedDate = LocalDate.parse(date);
ZoneId userTimezone = ZoneId.of(timezone);
// Convert LocalDate to start and end Instant for the selected date in user's timezone
Instant startOfDay = selectedDate.atStartOfDay(userTimezone).toInstant();
Instant endOfDay = selectedDate.plusDays(1).atStartOfDay(userTimezone).toInstant().minusMillis(1);
return this.timelineService.buildTimelineEntries(user, userTimezone, selectedDate, startOfDay, endOfDay);
}
@PostMapping("/subscribe")
public ResponseEntity<SubscriptionResponse> subscribe(@AuthenticationPrincipal User user,
@Valid @RequestBody SubscriptionRequest request) {
SubscriptionResponse response = subscriptionService.createSubscription(user, request.getCallbackUrl());
return ResponseEntity.ok(response);
}
@PostMapping("/notify/{subscriptionId}")
public ResponseEntity<Void> notify(@PathVariable String subscriptionId,
@RequestBody NotificationData notificationData) {
try {
Optional<Long> userId = this.integrationService.getUserIdForSubscription(subscriptionId);
if (userId.isEmpty()) {
log.warn("Subscription with id {} not found", subscriptionId);
return ResponseEntity.notFound().build();
}
this.userJdbcService.findById(userId.get()).ifPresentOrElse(user -> {
this.userNotificationService.sendToQueue(user, notificationData.getAffectedDates(), notificationData.getEventType());
}, () -> log.warn("Unable to find user for [{}]", subscriptionId));
return ResponseEntity.ok().build();
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
}

View File

@@ -0,0 +1,18 @@
package com.dedicatedcode.reitti.dto;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Pattern;
public class SubscriptionRequest {
@NotBlank
@Pattern(regexp = "^https?://.*", message = "Callback URL must be a valid HTTP/HTTPS URL")
private String callbackUrl;
public String getCallbackUrl() {
return callbackUrl;
}
public void setCallbackUrl(String callbackUrl) {
this.callbackUrl = callbackUrl;
}
}

View File

@@ -0,0 +1,27 @@
package com.dedicatedcode.reitti.dto;
import java.time.Instant;
public class SubscriptionResponse {
private final String subscriptionId;
private final String status;
private final Instant createdAt;
public SubscriptionResponse(String subscriptionId, String status, Instant createdAt) {
this.subscriptionId = subscriptionId;
this.status = status;
this.createdAt = createdAt;
}
public String getSubscriptionId() {
return subscriptionId;
}
public String getStatus() {
return status;
}
public Instant getCreatedAt() {
return createdAt;
}
}

View File

@@ -7,6 +7,10 @@ public record GeoPoint(double latitude, double longitude) {
return new GeoPoint(point.getY(), point.getX());
}
public static GeoPoint from(double latitude, double longitude) {
return new GeoPoint(latitude, longitude);
}
public boolean near(GeoPoint point) {
return GeoUtils.distanceInMeters(this, point) < 100;
}

View File

@@ -0,0 +1,30 @@
package com.dedicatedcode.reitti.model;
import com.dedicatedcode.reitti.event.SSEType;
import java.time.LocalDate;
import java.util.Set;
public class NotificationData {
private final SSEType eventType;
private final Long userId;
private final Set<LocalDate> affectedDates;
public NotificationData(SSEType eventType, Long userId, Set<LocalDate> affectedDates) {
this.eventType = eventType;
this.userId = userId;
this.affectedDates = affectedDates;
}
public SSEType getEventType() {
return eventType;
}
public Long getUserId() {
return userId;
}
public Set<LocalDate> getAffectedDates() {
return affectedDates;
}
}

View File

@@ -1,7 +1,5 @@
package com.dedicatedcode.reitti.model;
import org.locationtech.jts.geom.Point;
import java.io.Serializable;
import java.util.Objects;
@@ -13,24 +11,26 @@ public class SignificantPlace implements Serializable {
private final String countryCode;
private final Double latitudeCentroid;
private final Double longitudeCentroid;
private final Point geom;
private final PlaceType type;
private final boolean geocoded;
private final Long version;
public static SignificantPlace create(Double latitude, Double longitude, Point point) {
return new SignificantPlace(null, null, latitude, longitude, point, PlaceType.OTHER, null);
public static SignificantPlace create(Double latitude, Double longitude) {
return new SignificantPlace(null, null, latitude, longitude, PlaceType.OTHER, null);
}
public SignificantPlace() {
this(null, null, null, null, null, null, null, false, null);
}
private SignificantPlace(String name,
String address,
Double latitudeCentroid,
Double longitudeCentroid,
Point geom,
PlaceType type,
String countryCode) {
this(null, name, address, countryCode, latitudeCentroid, longitudeCentroid, geom, type, false, 1L);
this(null, name, address, countryCode, latitudeCentroid, longitudeCentroid, type, false, 1L);
}
public SignificantPlace(Long id,
@@ -39,7 +39,6 @@ public class SignificantPlace implements Serializable {
String countryCode,
Double latitudeCentroid,
Double longitudeCentroid,
Point geom,
PlaceType type,
boolean geocoded,
Long version) {
@@ -49,7 +48,6 @@ public class SignificantPlace implements Serializable {
this.countryCode = countryCode;
this.latitudeCentroid = latitudeCentroid;
this.longitudeCentroid = longitudeCentroid;
this.geom = geom;
this.type = type;
this.geocoded = geocoded;
this.version = version;
@@ -84,11 +82,7 @@ public class SignificantPlace implements Serializable {
return type;
}
public Point getGeom() {
return geom;
}
public boolean isGeocoded() {
public boolean isGeocoded() {
return geocoded;
}
@@ -98,27 +92,27 @@ public class SignificantPlace implements Serializable {
// Wither methods
public SignificantPlace withGeocoded(boolean geocoded) {
return new SignificantPlace(this.id, this.name, this.address, this.countryCode, this.latitudeCentroid, this.longitudeCentroid, this.geom, this.type, geocoded, this.version);
return new SignificantPlace(this.id, this.name, this.address, this.countryCode, this.latitudeCentroid, this.longitudeCentroid, this.type, geocoded, this.version);
}
public SignificantPlace withName(String name) {
return new SignificantPlace(this.id, name, this.address, this.countryCode, this.latitudeCentroid, this.longitudeCentroid, this.geom, this.type, this.geocoded, this.version);
return new SignificantPlace(this.id, name, this.address, this.countryCode, this.latitudeCentroid, this.longitudeCentroid, this.type, this.geocoded, this.version);
}
public SignificantPlace withAddress(String address) {
return new SignificantPlace(this.id, this.name, address, this.countryCode, this.latitudeCentroid, this.longitudeCentroid, this.geom, this.type, this.geocoded, this.version);
return new SignificantPlace(this.id, this.name, address, this.countryCode, this.latitudeCentroid, this.longitudeCentroid, this.type, this.geocoded, this.version);
}
public SignificantPlace withCountryCode(String countryCode) {
return new SignificantPlace(this.id, this.name, this.address, countryCode, this.latitudeCentroid, this.longitudeCentroid, this.geom, this.type, this.geocoded, this.version);
return new SignificantPlace(this.id, this.name, this.address, countryCode, this.latitudeCentroid, this.longitudeCentroid, this.type, this.geocoded, this.version);
}
public SignificantPlace withType(PlaceType type) {
return new SignificantPlace(this.id, this.name, this.address, this.countryCode, this.latitudeCentroid, this.longitudeCentroid, this.geom, type, this.geocoded, this.version);
return new SignificantPlace(this.id, this.name, this.address, this.countryCode, this.latitudeCentroid, this.longitudeCentroid, type, this.geocoded, this.version);
}
public SignificantPlace withId(Long id) {
return new SignificantPlace(id, this.name, address, this.countryCode, this.latitudeCentroid, this.longitudeCentroid, this.geom, this.type, this.geocoded, this.version);
return new SignificantPlace(id, this.name, address, this.countryCode, this.latitudeCentroid, this.longitudeCentroid, this.type, this.geocoded, this.version);
}
@Override
@@ -138,7 +132,6 @@ public class SignificantPlace implements Serializable {
return "SignificantPlace{" +
"id=" + id +
", name='" + name + '\'' +
", geom=" + geom +
'}';
}

View File

@@ -6,6 +6,7 @@ import org.springframework.security.core.userdetails.UserDetails;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
public class User implements UserDetails {
@@ -101,4 +102,16 @@ public class User implements UserDetails {
public User withRole(Role role) {
return new User(this.id, this.username, this.password, this.displayName, role, this.version);
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
User user = (User) o;
return Objects.equals(id, user.id);
}
@Override
public int hashCode() {
return Objects.hashCode(id);
}
}

View File

@@ -1,5 +1,6 @@
package com.dedicatedcode.reitti.repository;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.io.ParseException;
@@ -10,8 +11,11 @@ import org.springframework.stereotype.Component;
public class PointReaderWriter {
private final WKTReader wktReader;
private final GeometryFactory geometryFactory;
public PointReaderWriter(GeometryFactory geometryFactory) {
this.wktReader = new WKTReader(geometryFactory);
this.geometryFactory = geometryFactory;
}
public Point read(String wkt) {
@@ -20,6 +24,9 @@ public class PointReaderWriter {
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
public String write(double x, double y) {
return geometryFactory.createPoint(new Coordinate(x, y)).toString();
}
}

View File

@@ -20,9 +20,11 @@ import java.util.Optional;
public class SignificantPlaceJdbcService {
private final JdbcTemplate jdbcTemplate;
private final PointReaderWriter pointReaderWriter;
public SignificantPlaceJdbcService(JdbcTemplate jdbcTemplate, PointReaderWriter pointReaderWriter) {
this.jdbcTemplate = jdbcTemplate;
this.pointReaderWriter = pointReaderWriter;
this.significantPlaceRowMapper = (rs, _) -> new SignificantPlace(
rs.getLong("id"),
rs.getString("name"),
@@ -30,7 +32,6 @@ public class SignificantPlaceJdbcService {
rs.getString("country_code"),
rs.getDouble("latitude_centroid"),
rs.getDouble("longitude_centroid"),
pointReaderWriter.read(rs.getString("geom")),
SignificantPlace.PlaceType.valueOf(rs.getString("type")),
rs.getBoolean("geocoded"),
rs.getLong("version"));
@@ -69,7 +70,7 @@ public class SignificantPlaceJdbcService {
place.getName(),
place.getLatitudeCentroid(),
place.getLongitudeCentroid(),
place.getGeom().toString()
this.pointReaderWriter.write(place.getLongitudeCentroid(), place.getLatitudeCentroid())
);
return place.withId(id);
}
@@ -84,7 +85,7 @@ public class SignificantPlaceJdbcService {
place.getType().name(),
place.getLatitudeCentroid(),
place.getLongitudeCentroid(),
place.getGeom().toString(),
this.pointReaderWriter.write(place.getLongitudeCentroid(), place.getLatitudeCentroid()),
place.isGeocoded(),
place.getId()
);

View File

@@ -2,6 +2,7 @@ package com.dedicatedcode.reitti.service;
import com.dedicatedcode.reitti.config.RabbitMQConfig;
import com.dedicatedcode.reitti.event.*;
import com.dedicatedcode.reitti.repository.UserJdbcService;
import com.dedicatedcode.reitti.service.geocoding.ReverseGeocodingListener;
import com.dedicatedcode.reitti.service.processing.*;
import org.slf4j.Logger;
@@ -22,6 +23,7 @@ public class MessageDispatcherService {
private final ReverseGeocodingListener reverseGeocodingListener;
private final ProcessingPipelineTrigger processingPipelineTrigger;
private final UserSseEmitterService userSseEmitterService;
private final UserJdbcService userJdbcService;
@Autowired
public MessageDispatcherService(LocationDataIngestPipeline locationDataIngestPipeline,
@@ -30,7 +32,8 @@ public class MessageDispatcherService {
TripDetectionService tripDetectionService,
ReverseGeocodingListener reverseGeocodingListener,
ProcessingPipelineTrigger processingPipelineTrigger,
UserSseEmitterService userSseEmitterService) {
UserSseEmitterService userSseEmitterService,
UserJdbcService userJdbcService) {
this.locationDataIngestPipeline = locationDataIngestPipeline;
this.visitDetectionService = visitDetectionService;
this.visitMergingService = visitMergingService;
@@ -38,6 +41,7 @@ public class MessageDispatcherService {
this.reverseGeocodingListener = reverseGeocodingListener;
this.processingPipelineTrigger = processingPipelineTrigger;
this.userSseEmitterService = userSseEmitterService;
this.userJdbcService = userJdbcService;
}
@RabbitListener(queues = RabbitMQConfig.LOCATION_DATA_QUEUE, concurrency = "${reitti.events.concurrency}")
@@ -73,7 +77,7 @@ public class MessageDispatcherService {
@RabbitListener(queues = RabbitMQConfig.USER_EVENT_QUEUE)
public void handleUserNotificationEvent(SSEEvent event) {
logger.debug("Dispatching SSEEvent for user: {}", event.getUserId());
this.userSseEmitterService.sendEventToUser(event.getUserId(), event);
this.userJdbcService.findById(event.getUserId()).ifPresentOrElse(user -> this.userSseEmitterService.sendEventToUser(user, event), () -> logger.warn("User not found for user: {}", event.getUserId()));
}
@RabbitListener(queues = RabbitMQConfig.TRIGGER_PROCESSING_PIPELINE_QUEUE, concurrency = "${reitti.events.concurrency}")

View File

@@ -4,10 +4,11 @@ import com.dedicatedcode.reitti.config.RabbitMQConfig;
import com.dedicatedcode.reitti.dto.LocationDataRequest;
import com.dedicatedcode.reitti.event.SSEEvent;
import com.dedicatedcode.reitti.event.SSEType;
import com.dedicatedcode.reitti.model.NotificationData;
import com.dedicatedcode.reitti.model.ProcessedVisit;
import com.dedicatedcode.reitti.model.Trip;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.repository.UserSettingsJdbcService;
import com.dedicatedcode.reitti.service.integration.ReittiSubscriptionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -23,15 +24,15 @@ import java.util.Set;
import java.util.stream.Collectors;
@Service
public class UserNotificationQueueService {
private static final Logger log = LoggerFactory.getLogger(UserNotificationQueueService.class);
private final UserSettingsJdbcService userSettingsJdbcService;
public class UserNotificationService {
private static final Logger log = LoggerFactory.getLogger(UserNotificationService.class);
private final RabbitTemplate rabbitTemplate;
private final ReittiSubscriptionService reittiSubscriptionService;
public UserNotificationQueueService(UserSettingsJdbcService userSettingsJdbcService,
RabbitTemplate rabbitTemplate) {
this.userSettingsJdbcService = userSettingsJdbcService;
public UserNotificationService(RabbitTemplate rabbitTemplate,
ReittiSubscriptionService reittiSubscriptionService) {
this.rabbitTemplate = rabbitTemplate;
this.reittiSubscriptionService = reittiSubscriptionService;
}
public void newTrips(User user, List<Trip> trips) {
@@ -39,6 +40,7 @@ public class UserNotificationQueueService {
log.debug("New trips for user [{}]", user.getId());
Set<LocalDate> dates = calculateAffectedDates(trips.stream().map(Trip::getStartTime).toList(), trips.stream().map(Trip::getEndTime).toList());
sendToQueue(user, dates, eventType);
notifyReittiSubscriptions(user, eventType, dates);
}
public void newVisits(User user, List<ProcessedVisit> processedVisits) {
@@ -46,6 +48,7 @@ public class UserNotificationQueueService {
log.debug("New Visits for user [{}]", user.getId());
Set<LocalDate> dates = calculateAffectedDates(processedVisits.stream().map(ProcessedVisit::getStartTime).toList(), processedVisits.stream().map(ProcessedVisit::getEndTime).toList());
sendToQueue(user, dates, eventType);
notifyReittiSubscriptions(user, eventType, dates);
}
public void newRawLocationData(User user, List<LocationDataRequest.LocationPoint> filtered) {
@@ -53,14 +56,24 @@ public class UserNotificationQueueService {
log.debug("New RawLocationPoints for user [{}]", user.getId());
Set<LocalDate> dates = calculateAffectedDates(filtered.stream().map(LocationDataRequest.LocationPoint::getTimestamp).map(s -> ZonedDateTime.parse(s).toInstant()).toList());
sendToQueue(user, dates, eventType);
notifyReittiSubscriptions(user, eventType, dates);
}
private void sendToQueue(User user, Set<LocalDate> dates, SSEType eventType) {
public void sendToQueue(User user, Set<LocalDate> dates, SSEType eventType) {
for (LocalDate date : dates) {
this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.USER_EVENT_ROUTING_KEY, new SSEEvent(eventType, user.getId(), user.getId(), date));
}
}
private void notifyReittiSubscriptions(User user, SSEType eventType, Set<LocalDate> dates) {
try {
NotificationData notificationData = new NotificationData(eventType, user.getId(), dates);
reittiSubscriptionService.notifyAllSubscriptions(user, notificationData);
} catch (Exception e) {
log.error("Failed to notify Reitti subscriptions for user: {}", user.getId(), e);
}
}
@SafeVarargs
private Set<LocalDate> calculateAffectedDates(List<Instant>... list) {
if (list == null) {

View File

@@ -1,6 +1,8 @@
package com.dedicatedcode.reitti.service;
import com.dedicatedcode.reitti.event.SSEEvent;
import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.service.integration.ReittiIntegrationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
@@ -16,68 +18,63 @@ import java.util.concurrent.CopyOnWriteArraySet;
@Service
public class UserSseEmitterService implements SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(UserSseEmitterService.class);
private final Map<Long, Set<SseEmitter>> userEmitters = new ConcurrentHashMap<>();
private final ReittiIntegrationService reittiIntegrationService;
private final Map<User, Set<SseEmitter>> userEmitters = new ConcurrentHashMap<>();
public SseEmitter addEmitter(Long userId) {
public UserSseEmitterService(ReittiIntegrationService reittiIntegrationService) {
this.reittiIntegrationService = reittiIntegrationService;
}
public SseEmitter addEmitter(User user) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
userEmitters.computeIfAbsent(userId, k -> new CopyOnWriteArraySet<>()).add(emitter);
userEmitters.computeIfAbsent(user, _ -> new CopyOnWriteArraySet<>()).add(emitter);
emitter.onCompletion(() -> {
log.info("SSE connection completed for user: [{}]", userId);
removeEmitter(userId, emitter);
log.info("SSE connection completed for user: [{}]", user);
removeEmitter(user, emitter);
});
emitter.onTimeout(() -> {
log.info("SSE connection timed out for user: [{}]", userId);
emitter.complete(); // Complete the emitter on timeout
removeEmitter(userId, emitter);
log.info("SSE connection timed out for user: [{}]", user);
emitter.complete();
removeEmitter(user, emitter);
});
emitter.onError(throwable -> {
log.error("SSE connection error for user [{}]: {}", userId, throwable.getMessage());
removeEmitter(userId, emitter);
log.error("SSE connection error for user [{}]: {}", user, throwable.getMessage());
removeEmitter(user, emitter);
});
log.info("Emitter added for user: {}. Total emitters for user: {}", userId, userEmitters.get(userId).size());
log.info("Emitter added for user: {}. Total emitters for user: {}", user, userEmitters.get(user).size());
return emitter;
}
public void sendEventToUser(Long userId, SSEEvent eventData) {
Set<SseEmitter> emitters = userEmitters.get(userId);
public void sendEventToUser(User user, SSEEvent eventData) {
Set<SseEmitter> emitters = userEmitters.get(user);
if (emitters != null) {
for (SseEmitter emitter : new CopyOnWriteArraySet<>(emitters)) {
try {
emitter.send(SseEmitter.event().data(eventData));
log.debug("Sent event to user: {}", userId);
log.debug("Sent event to user: {}", user);
} catch (IOException e) {
log.error("Error sending event to user {}: {}", userId, e.getMessage());
log.error("Error sending event to user {}: {}", user, e.getMessage());
emitter.completeWithError(e);
removeEmitter(userId, emitter);
removeEmitter(user, emitter);
}
}
} else {
log.debug("No active SSE emitters for user: {}", userId);
log.debug("No active SSE emitters for user: {}", user);
}
}
private void removeEmitter(Long userId, SseEmitter emitter) {
Set<SseEmitter> emitters = userEmitters.get(userId);
private void removeEmitter(User user, SseEmitter emitter) {
Set<SseEmitter> emitters = userEmitters.get(user);
if (emitters != null) {
emitters.remove(emitter);
if (emitters.isEmpty()) {
userEmitters.remove(userId);
}
log.info("Emitter removed for user: {}. Remaining emitters for user: {}", userId, userEmitters.containsKey(userId) ? userEmitters.get(userId).size() : 0);
}
}
public void removeEmitter(Long userId) {
Set<SseEmitter> emitters = userEmitters.get(userId);
if (emitters != null) {
for (SseEmitter emitter : emitters) {
removeEmitter(userId, emitter);
}
userEmitters.remove(userId);
log.info("Removed all emitters for user: {}. Remaining emitters for user: {}", userId, userEmitters.containsKey(userId) ? userEmitters.get(userId).size() : 0);
userEmitters.remove(user);
reittiIntegrationService.unsubscribeFromIntegrations(user);
}
log.info("Emitter removed for user: {}. Remaining emitters for user: {}", user, userEmitters.containsKey(user) ? userEmitters.get(user).size() : 0);
}
}

View File

@@ -1,9 +1,6 @@
package com.dedicatedcode.reitti.service.integration;
import com.dedicatedcode.reitti.dto.LocationDataRequest;
import com.dedicatedcode.reitti.dto.ReittiRemoteInfo;
import com.dedicatedcode.reitti.dto.TimelineEntry;
import com.dedicatedcode.reitti.dto.UserTimelineData;
import com.dedicatedcode.reitti.dto.*;
import com.dedicatedcode.reitti.model.ReittiIntegration;
import com.dedicatedcode.reitti.model.RemoteUser;
import com.dedicatedcode.reitti.model.User;
@@ -14,6 +11,7 @@ import com.dedicatedcode.reitti.service.RequestFailedException;
import com.dedicatedcode.reitti.service.RequestTemporaryFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
@@ -28,19 +26,24 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class ReittiIntegrationService {
private static final Logger log = LoggerFactory.getLogger(ReittiIntegrationService.class);
private static final List<ReittiIntegration.Status> VALID_INTEGRATION_STATUS = List.of(ReittiIntegration.Status.ACTIVE, ReittiIntegration.Status.RECOVERABLE);
private final String advertiseUri;
private final ReittiIntegrationJdbcService jdbcService;
private final RestTemplate restTemplate;
private final AvatarService avatarService;
private final Map<Long, String> integrationSubscriptions = new ConcurrentHashMap<>();
private final Map<String, Long> userForSubscriptions = new ConcurrentHashMap<>();
public ReittiIntegrationService(ReittiIntegrationJdbcService jdbcService,
RestTemplate restTemplate, AvatarService avatarService) {
public ReittiIntegrationService(@Value("${reitti.server.advertise-uri}") String advertiseUri, ReittiIntegrationJdbcService jdbcService,
RestTemplate restTemplate,
AvatarService avatarService) {
this.advertiseUri = advertiseUri;
this.jdbcService = jdbcService;
this.restTemplate = restTemplate;
this.avatarService = avatarService;
@@ -56,7 +59,7 @@ public class ReittiIntegrationService {
try {
RemoteUser remoteUser = handleRemoteUser(integration);
List<TimelineEntry> timelineEntries = loadTimeLineEntries(integration, selectedDate, userTimezone);
update(integration.withStatus(ReittiIntegration.Status.ACTIVE).withLastUsed(LocalDateTime.now()));
integration = update(integration.withStatus(ReittiIntegration.Status.ACTIVE).withLastUsed(LocalDateTime.now()));
return new UserTimelineData("remote:" + integration.getId(),
remoteUser.getDisplayName(),
this.avatarService.generateInitials(remoteUser.getDisplayName()),
@@ -155,6 +158,7 @@ public class ReittiIntegrationService {
.findFirst().orElse(Collections.emptyList());
}
private ReittiIntegration update(ReittiIntegration integration) {
try {
return this.jdbcService.update(integration).orElseThrow();
@@ -232,4 +236,133 @@ public class ReittiIntegrationService {
return persisted.get();
}
}
public void registerSubscriptionsForUser(User user) {
log.info("Registering subscriptions for user: [{}]", user.getId());
if (advertiseUri == null || advertiseUri.isEmpty()) {
log.warn("Advertise URI is null or empty, remote updates are disabled. Consider setting 'reitti.server.advertise-uri'");
return;
}
List<ReittiIntegration> activeIntegrations = getActiveIntegrationsForUser(user);
for (ReittiIntegration integration : activeIntegrations) {
try {
registerSubscriptionOnIntegration(integration, user);
log.debug("Successfully registered subscription for integration: [{}]", integration.getId());
} catch (Exception | RequestFailedException e) {
log.error("couldn't fetch user info for [{}]", integration, e);
update(integration.withStatus(ReittiIntegration.Status.FAILED).withLastUsed(LocalDateTime.now()).withEnabled(false));
} catch (RequestTemporaryFailedException e) {
log.warn("couldn't temporarily fetch user info for [{}]", integration, e);
update(integration.withStatus(ReittiIntegration.Status.RECOVERABLE).withLastUsed(LocalDateTime.now()));
}
}
}
public List<ReittiIntegration> getActiveIntegrationsForUser(User user) {
return this.jdbcService
.findAllByUser(user)
.stream()
.filter(integration -> integration.isEnabled() && VALID_INTEGRATION_STATUS.contains(integration.getStatus()))
.toList();
}
private void registerSubscriptionOnIntegration(ReittiIntegration integration, User user) throws RequestFailedException, RequestTemporaryFailedException {
if (advertiseUri == null || advertiseUri.isEmpty()) {
log.warn("No advertise URI configured, skipping subscription registration for integration: [{}]", integration.getId());
return;
}
HttpHeaders headers = new HttpHeaders();
headers.set("X-API-TOKEN", integration.getToken());
headers.setContentType(MediaType.APPLICATION_JSON);
SubscriptionRequest subscriptionRequest = new SubscriptionRequest();
subscriptionRequest.setCallbackUrl(advertiseUri);
HttpEntity<SubscriptionRequest> entity = new HttpEntity<>(subscriptionRequest, headers);
String subscribeUrl = integration.getUrl().endsWith("/") ?
integration.getUrl() + "api/v1/reitti-integration/subscribe" :
integration.getUrl() + "/api/v1/reitti-integration/subscribe";
try {
ResponseEntity<SubscriptionResponse> response = restTemplate.exchange(
subscribeUrl,
HttpMethod.POST,
entity,
SubscriptionResponse.class
);
if (response.getStatusCode().is2xxSuccessful() && response.getBody() != null) {
log.debug("Successfully subscribed to integration: [{}]", integration.getId());
synchronized (integrationSubscriptions) {
this.integrationSubscriptions.put(integration.getId(), response.getBody().getSubscriptionId());
this.userForSubscriptions.put(response.getBody().getSubscriptionId(), user.getId());
}
} else if (response.getStatusCode().is4xxClientError()) {
throw new RequestFailedException(subscribeUrl, response.getStatusCode(), response.getBody());
} else {
throw new RequestTemporaryFailedException(subscribeUrl, response.getStatusCode(), response.getBody());
}
} catch (RestClientException ex) {
throw new RequestFailedException(subscribeUrl, HttpStatusCode.valueOf(500), "Connection refused");
}
}
public void unsubscribeFromIntegrations(User user) {
log.info("Unsubscribing from integrations for user: [{}]", user.getId());
List<ReittiIntegration> activeIntegrations = getActiveIntegrationsForUser(user);
for (ReittiIntegration integration : activeIntegrations) {
String subscriptionId = integrationSubscriptions.get(integration.getId());
if (subscriptionId != null) {
try {
unsubscribeFromIntegration(integration, subscriptionId);
integrationSubscriptions.remove(integration.getId());
userForSubscriptions.remove(subscriptionId);
log.debug("Successfully unsubscribed from integration: [{}]", integration.getId());
} catch (Exception | RequestFailedException e) {
log.warn("Failed to unsubscribe from integration: [{}]", integration.getId(), e);
update(integration.withStatus(ReittiIntegration.Status.FAILED).withLastUsed(LocalDateTime.now()).withEnabled(false));
} catch (RequestTemporaryFailedException e) {
update(integration.withStatus(ReittiIntegration.Status.RECOVERABLE).withLastUsed(LocalDateTime.now()));
}
}
}
}
private void unsubscribeFromIntegration(ReittiIntegration integration, String subscriptionId) throws RequestFailedException, RequestTemporaryFailedException {
HttpHeaders headers = new HttpHeaders();
headers.set("X-API-TOKEN", integration.getToken());
HttpEntity<String> entity = new HttpEntity<>(headers);
String unsubscribeUrl = integration.getUrl().endsWith("/") ?
integration.getUrl() + "api/v1/reitti-integration/subscribe/" + subscriptionId :
integration.getUrl() + "/api/v1/reitti-integration/subscribe/" + subscriptionId;
try {
ResponseEntity<Void> response = restTemplate.exchange(
unsubscribeUrl,
HttpMethod.DELETE,
entity,
Void.class
);
if (!response.getStatusCode().is2xxSuccessful()) {
if (response.getStatusCode().is4xxClientError()) {
throw new RequestFailedException(unsubscribeUrl, response.getStatusCode(), null);
} else {
throw new RequestTemporaryFailedException(unsubscribeUrl, response.getStatusCode(), null);
}
}
} catch (RestClientException ex) {
throw new RequestFailedException(unsubscribeUrl, HttpStatusCode.valueOf(500), "Connection refused");
}
}
public Optional<Long> getUserIdForSubscription(String subscriptionId) {
return Optional.ofNullable(this.userForSubscriptions.get(subscriptionId));
}
}

View File

@@ -0,0 +1,25 @@
package com.dedicatedcode.reitti.service.integration;
public class ReittiSubscription {
private final String subscriptionId;
private final Long userId;
private final String callbackUrl;
public ReittiSubscription(String subscriptionId, Long userId, String callbackUrl) {
this.subscriptionId = subscriptionId;
this.userId = userId;
this.callbackUrl = callbackUrl;
}
public String getSubscriptionId() {
return subscriptionId;
}
public Long getUserId() {
return userId;
}
public String getCallbackUrl() {
return callbackUrl;
}
}

View File

@@ -0,0 +1,66 @@
package com.dedicatedcode.reitti.service.integration;
import com.dedicatedcode.reitti.dto.SubscriptionResponse;
import com.dedicatedcode.reitti.model.NotificationData;
import com.dedicatedcode.reitti.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class ReittiSubscriptionService {
private static final Logger log = LoggerFactory.getLogger(ReittiSubscriptionService.class);
private final Map<String, ReittiSubscription> subscriptions = new ConcurrentHashMap<>();
private final RestTemplate restTemplate;
public ReittiSubscriptionService() {
this.restTemplate = new RestTemplate();
}
public SubscriptionResponse createSubscription(User user, String callbackUrl) {
String subscriptionId = "sub_" + UUID.randomUUID().toString().replace("-", "").substring(0, 12);
Instant now = Instant.now();
ReittiSubscription subscription = new ReittiSubscription(subscriptionId, user.getId(), callbackUrl);
subscriptions.put(subscriptionId, subscription);
return new SubscriptionResponse(subscriptionId, "active", now);
}
public ReittiSubscription getSubscription(String subscriptionId) {
return subscriptions.get(subscriptionId);
}
public void notifyAllSubscriptions(User user, NotificationData notificationData) {
subscriptions.values().stream()
.filter(subscription -> subscription.getUserId().equals(user.getId()))
.forEach(subscription -> sendNotificationToCallback(subscription, notificationData));
}
private void sendNotificationToCallback(ReittiSubscription subscription, NotificationData notificationData) {
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Object> request = new HttpEntity<>(notificationData, headers);
String notifyUrl = subscription.getCallbackUrl().endsWith("/") ?
subscription.getCallbackUrl() + "api/v1/reitti-integration/notify/" + subscription.getSubscriptionId() :
subscription.getCallbackUrl() + "/api/v1/reitti-integration/notify/" + subscription.getSubscriptionId();
restTemplate.postForEntity(notifyUrl, request, String.class);
log.debug("Notification sent successfully to subscription: {}", subscription.getSubscriptionId());
} catch (Exception e) {
log.error("Failed to send notification to subscription: {}, callback URL: {}",
subscription.getSubscriptionId(), subscription.getCallbackUrl(), e);
this.subscriptions.remove(subscription.getSubscriptionId());
}
}
}

View File

@@ -6,7 +6,7 @@ import com.dedicatedcode.reitti.model.User;
import com.dedicatedcode.reitti.repository.RawLocationPointJdbcService;
import com.dedicatedcode.reitti.repository.UserJdbcService;
import com.dedicatedcode.reitti.repository.UserSettingsJdbcService;
import com.dedicatedcode.reitti.service.UserNotificationQueueService;
import com.dedicatedcode.reitti.service.UserNotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -23,19 +23,19 @@ public class LocationDataIngestPipeline {
private final UserJdbcService userJdbcService;
private final RawLocationPointJdbcService rawLocationPointJdbcService;
private final UserSettingsJdbcService userSettingsJdbcService;
private final UserNotificationQueueService userNotificationQueueService;
private final UserNotificationService userNotificationService;
@Autowired
public LocationDataIngestPipeline(GeoPointAnomalyFilter geoPointAnomalyFilter,
UserJdbcService userJdbcService,
RawLocationPointJdbcService rawLocationPointJdbcService,
UserSettingsJdbcService userSettingsJdbcService,
UserNotificationQueueService userNotificationQueueService) {
UserNotificationService userNotificationService) {
this.geoPointAnomalyFilter = geoPointAnomalyFilter;
this.userJdbcService = userJdbcService;
this.rawLocationPointJdbcService = rawLocationPointJdbcService;
this.userSettingsJdbcService = userSettingsJdbcService;
this.userNotificationQueueService = userNotificationQueueService;
this.userNotificationService = userNotificationService;
}
public void processLocationData(LocationDataEvent event) {
@@ -53,7 +53,7 @@ public class LocationDataIngestPipeline {
List<LocationDataRequest.LocationPoint> filtered = this.geoPointAnomalyFilter.filterAnomalies(points);
rawLocationPointJdbcService.bulkInsert(user, filtered);
userSettingsJdbcService.updateNewestData(user, filtered);
userNotificationQueueService.newRawLocationData(user, filtered);
userNotificationService.newRawLocationData(user, filtered);
logger.info("Finished storing points [{}] for user [{}] in [{}]ms. Filtered out [{}] points.", filtered.size(), event.getUsername(), System.currentTimeMillis() - start, points.size() - filtered.size());
}
}

View File

@@ -6,7 +6,7 @@ import com.dedicatedcode.reitti.repository.ProcessedVisitJdbcService;
import com.dedicatedcode.reitti.repository.RawLocationPointJdbcService;
import com.dedicatedcode.reitti.repository.TripJdbcService;
import com.dedicatedcode.reitti.repository.UserJdbcService;
import com.dedicatedcode.reitti.service.UserNotificationQueueService;
import com.dedicatedcode.reitti.service.UserNotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -28,19 +28,19 @@ public class TripDetectionService {
private final RawLocationPointJdbcService rawLocationPointJdbcService;
private final TripJdbcService tripJdbcService;
private final UserJdbcService userJdbcService;
private final UserNotificationQueueService userNotificationQueueService;
private final UserNotificationService userNotificationService;
private final ConcurrentHashMap<String, ReentrantLock> userLocks = new ConcurrentHashMap<>();
public TripDetectionService(ProcessedVisitJdbcService processedVisitJdbcService,
RawLocationPointJdbcService rawLocationPointJdbcService,
TripJdbcService tripJdbcService,
UserJdbcService userJdbcService,
UserNotificationQueueService userNotificationQueueService) {
UserNotificationService userNotificationService) {
this.processedVisitJdbcService = processedVisitJdbcService;
this.rawLocationPointJdbcService = rawLocationPointJdbcService;
this.tripJdbcService = tripJdbcService;
this.userJdbcService = userJdbcService;
this.userNotificationQueueService = userNotificationQueueService;
this.userNotificationService = userNotificationService;
}
public void visitCreated(ProcessedVisitCreatedEvent event) {
@@ -78,7 +78,7 @@ public class TripDetectionService {
}
tripJdbcService.bulkInsert(user, trips);
userNotificationQueueService.newTrips(user, trips);
userNotificationService.newTrips(user, trips);
});
} finally {
userLock.unlock();

View File

@@ -6,7 +6,7 @@ import com.dedicatedcode.reitti.event.SignificantPlaceCreatedEvent;
import com.dedicatedcode.reitti.event.VisitUpdatedEvent;
import com.dedicatedcode.reitti.model.*;
import com.dedicatedcode.reitti.repository.*;
import com.dedicatedcode.reitti.service.UserNotificationQueueService;
import com.dedicatedcode.reitti.service.UserNotificationService;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
@@ -37,7 +37,7 @@ public class VisitMergingService {
private final RawLocationPointJdbcService rawLocationPointJdbcService;
private final GeometryFactory geometryFactory;
private final RabbitTemplate rabbitTemplate;
private final UserNotificationQueueService userNotificationQueueService;
private final UserNotificationService userNotificationService;
private final long mergeThresholdSeconds;
private final long mergeThresholdMeters;
private final int searchRangeExtensionInHours;
@@ -50,7 +50,7 @@ public class VisitMergingService {
SignificantPlaceJdbcService significantPlaceJdbcService,
RawLocationPointJdbcService rawLocationPointJdbcService,
GeometryFactory geometryFactory,
UserNotificationQueueService userNotificationQueueService,
UserNotificationService userNotificationService,
@Value("${reitti.visit.merge-max-stay-search-extension-days:2}") int maxStaySearchExtensionInDays,
@Value("${reitti.visit.merge-threshold-seconds:300}") long mergeThresholdSeconds,
@Value("${reitti.visit.merge-threshold-meters:100}") long mergeThresholdMeters) {
@@ -61,7 +61,7 @@ public class VisitMergingService {
this.significantPlaceJdbcService = significantPlaceJdbcService;
this.rawLocationPointJdbcService = rawLocationPointJdbcService;
this.geometryFactory = geometryFactory;
this.userNotificationQueueService = userNotificationQueueService;
this.userNotificationService = userNotificationService;
this.mergeThresholdSeconds = mergeThresholdSeconds;
this.mergeThresholdMeters = mergeThresholdMeters;
this.searchRangeExtensionInHours = maxStaySearchExtensionInDays * 24;
@@ -123,7 +123,7 @@ public class VisitMergingService {
logger.debug("Processed [{}] visits into [{}] merged visits for user: [{}]",
allVisits.size(), processedVisits.size(), user.getUsername());
this.userNotificationQueueService.newVisits(user, processedVisits);
this.userNotificationService.newVisits(user, processedVisits);
}
@@ -219,7 +219,7 @@ public class VisitMergingService {
private SignificantPlace createSignificantPlace(User user, Visit visit) {
Point point = geometryFactory.createPoint(new Coordinate(visit.getLongitude(), visit.getLatitude()));
SignificantPlace significantPlace = SignificantPlace.create(visit.getLatitude(), visit.getLongitude(), point);
SignificantPlace significantPlace = SignificantPlace.create(visit.getLatitude(), visit.getLongitude());
significantPlace = this.significantPlaceJdbcService.create(user, significantPlace);
publishSignificantPlaceCreatedEvent(significantPlace);
return significantPlace;

View File

@@ -10,3 +10,8 @@ reitti.events.concurrency=4-12
reitti.import.batch-size=1000
spring.thymeleaf.cache=false
reitti.server.advertise-uri=http://localhost:8080
reitti.import.processing-idle-start-time=5

View File

@@ -22,6 +22,8 @@ spring.security.oauth2.client.registration.oauth.client-secret=${OIDC_CLIENT_SEC
spring.security.oauth2.client.provider.oauth.issuer-uri=${OIDC_ISSUER_URI:}
spring.security.oauth2.client.registration.oauth.scope=${OIDC_SCOPE:openid,profile}
reitti.server.advertise-uri=${ADVERTISE_URI:}
reitti.data-management.enabled=${DANGEROUS_LIFE:false}
reitti.import.processing-idle-start-time=${PROCESSING_WAIT_TIME:15}

View File

@@ -46,11 +46,14 @@ spring.servlet.multipart.max-file-size=5GB
spring.servlet.multipart.max-request-size=5GB
server.tomcat.max-part-count=100
# Application specific settings
reitti.server.advertise-uri=
# OAuth configuration
# For now, we only support having one OIDC provider. If you need multiple, create a ticket in the reitti github.
reitti.security.oidc.enabled=false
# Application specific settings
reitti.import.batch-size=1000
# How many seconds should we wait after the last data input before starting to process all unprocessed data?
reitti.import.processing-idle-start-time=15

View File

@@ -105,10 +105,5 @@
</div>
</div>
</div>
<style>
</style>
</body>
</html>

View File

@@ -215,33 +215,57 @@
path.remove();
}
for (const element of timelineContainer) {
let bounds = L.latLngBounds();
const fetchPromises = [];
for (let i = 0; i < timelineContainer.length; i++) {
const element = timelineContainer[i];
const rawLocationPointsUrl = element?.dataset.rawLocationPointsUrl;
const color = element?.dataset.baseColor;
if (rawLocationPointsUrl) {
// Fetch raw location points for map display
fetch(rawLocationPointsUrl).then(response => {
// Create fetch promise for raw location points with index to maintain order
const fetchPromise = fetch(rawLocationPointsUrl).then(response => {
if (!response.ok) {
console.warn('Could not fetch raw location points');
return { points: [] };
return { points: [], index: i, color: color };
}
return response.json();
}).then(rawPointsData => {
updateMapWithRawPoints(rawPointsData, color);
}).then(() => {
window.originalBounds = bounds;
map.fitBounds(bounds, fitToBoundsConfig)
return { ...rawPointsData, index: i, color: color };
}).catch(error => {
console.warn('Error fetching raw location points:', error);
return { points: [], index: i, color: color }; // Return empty data with index on error
});
fetchPromises.push(fetchPromise);
}
}
// Wait for all fetch operations to complete, then update map in correct order
Promise.all(fetchPromises).then(results => {
// Sort results by original index to maintain order
results.sort((a, b) => a.index - b.index);
// Process results in order
results.forEach(result => {
const fetchBounds = updateMapWithRawPoints(result, result.color);
if (fetchBounds.isValid()) {
bounds.extend(fetchBounds);
}
});
// Update map bounds after all fetch operations are complete
if (bounds.isValid()) {
window.originalBounds = bounds;
map.fitBounds(bounds, fitToBoundsConfig);
}
});
}
// Function to update map with raw location points
function updateMapWithRawPoints(rawPointsData, color) {
const bounds = L.latLngBounds();
const rawPointsPath = L.geodesic([], {
color: color == null ? '#f1ba63' : color,
weight: 6,
@@ -264,6 +288,8 @@
addPulsatingMarker(latestPoint.latitude, latestPoint.longitude, color);
}
}
return bounds;
}
const selectedPath = L.geodesic([], {
@@ -282,7 +308,6 @@
document.body.addEventListener('htmx:afterSwap', function(event) {
if (event.detail.target.classList.contains('timeline-container')) {
// Timeline content has been updated, update map markers
bounds = L.latLngBounds()
loadTimelineData(getSelectedDate())
updateMapFromTimeline();
// Initialize scroll indicator after timeline is updated
@@ -293,9 +318,10 @@
window.timelineScrollIndicator.init();
}
});
let bounds = L.latLngBounds();
// Function to update map markers from timeline entries
function updateMapFromTimeline() {
const bounds = L.latLngBounds();
// Clear existing markers and paths (except tile layer)
map.eachLayer(layer => {
if (!layer._url) {
@@ -303,8 +329,6 @@
}
});
window.originalBounds = L.latLngBounds();
let hasValidCoords = false;
// Group places by coordinates to avoid duplicate markers
@@ -404,6 +428,7 @@
});
}
return bounds;
}
// Helper function to parse duration text (simple implementation)
@@ -688,7 +713,6 @@
// Schedule reload after 5 seconds of idle time
reloadTimeoutId = setTimeout(() => {
debugger
if (autoUpdateMode && pendingEvents.length > 0) {
console.log(`Auto-update: Reloading timeline data after ${pendingEvents.length} accumulated events`);
document.body.dispatchEvent(new CustomEvent('dateChanged'));

View File

@@ -1,5 +1,6 @@
package com.dedicatedcode.reitti.repository;
import com.dedicatedcode.reitti.IntegrationTest;
import com.dedicatedcode.reitti.TestingService;
import com.dedicatedcode.reitti.model.GeocodingResponse;
import com.dedicatedcode.reitti.model.SignificantPlace;
@@ -17,7 +18,7 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest
@IntegrationTest
@ActiveProfiles("test")
@Transactional
class GeocodingResponseJdbcServiceTest {
@@ -26,7 +27,6 @@ class GeocodingResponseJdbcServiceTest {
private GeocodingResponseJdbcService geocodingResponseJdbcService;
@Autowired
private SignificantPlaceJdbcService placeService;
@Autowired
private GeometryFactory geometryFactory;
@@ -40,7 +40,7 @@ class GeocodingResponseJdbcServiceTest {
double latitudeCentroid = 53.863149;
double longitudeCentroid = 10.700927;
Point point = geometryFactory.createPoint(new Coordinate(longitudeCentroid, latitudeCentroid));
SignificantPlace place = placeService.create(testingService.admin(), SignificantPlace.create(latitudeCentroid, longitudeCentroid, point));
SignificantPlace place = placeService.create(testingService.admin(), SignificantPlace.create(latitudeCentroid, longitudeCentroid));
GeocodingResponse response = new GeocodingResponse(
place.getId(),
@@ -72,7 +72,7 @@ class GeocodingResponseJdbcServiceTest {
double latitudeCentroid = 53.863149;
double longitudeCentroid = 10.700927;
Point point = geometryFactory.createPoint(new Coordinate(longitudeCentroid, latitudeCentroid));
SignificantPlace place = placeService.create(testingService.admin(), SignificantPlace.create(latitudeCentroid, longitudeCentroid, point));
SignificantPlace place = placeService.create(testingService.admin(), SignificantPlace.create(latitudeCentroid, longitudeCentroid));
// When
List<GeocodingResponse> found = geocodingResponseJdbcService.findBySignificantPlace(place);
@@ -88,7 +88,7 @@ class GeocodingResponseJdbcServiceTest {
double latitudeCentroid = 53.863149;
double longitudeCentroid = 10.700927;
Point point = geometryFactory.createPoint(new Coordinate(longitudeCentroid, latitudeCentroid));
SignificantPlace place = placeService.create(testingService.admin(), SignificantPlace.create(latitudeCentroid, longitudeCentroid, point));
SignificantPlace place = placeService.create(testingService.admin(), SignificantPlace.create(latitudeCentroid, longitudeCentroid));
GeocodingResponse response = new GeocodingResponse(
place.getId(),

View File

@@ -124,7 +124,6 @@ class SignificantPlaceJdbcServiceTest {
"DE",
53.863149,
10.700927,
created.getGeom(),
SignificantPlace.PlaceType.RESTAURANT,
true,
created.getVersion()
@@ -207,7 +206,6 @@ class SignificantPlaceJdbcServiceTest {
"DE",
created1.getLatitudeCentroid(),
created1.getLongitudeCentroid(),
created1.getGeom(),
SignificantPlace.PlaceType.HOME,
true, // geocoded = true
created1.getVersion()
@@ -254,7 +252,6 @@ class SignificantPlaceJdbcServiceTest {
}
private SignificantPlace createTestPlace(String name, double latitude, double longitude) {
Point point = geometryFactory.createPoint(new Coordinate(longitude, latitude));
return new SignificantPlace(
null,
name,
@@ -262,7 +259,6 @@ class SignificantPlaceJdbcServiceTest {
null,
latitude,
longitude,
point,
SignificantPlace.PlaceType.OTHER,
false,
0L
@@ -270,7 +266,6 @@ class SignificantPlaceJdbcServiceTest {
}
private SignificantPlace createTestPlaceForUser(User user, String name, double latitude, double longitude) {
Point point = geometryFactory.createPoint(new Coordinate(longitude, latitude));
return new SignificantPlace(
null,
name,
@@ -278,7 +273,6 @@ class SignificantPlaceJdbcServiceTest {
null,
latitude,
longitude,
point,
SignificantPlace.PlaceType.OTHER,
false,
0L

View File

@@ -59,7 +59,7 @@ class DefaultGeocodeServiceManagerTest {
.thenReturn(Collections.emptyList());
// When
Optional<GeocodeResult> result = geocodeServiceManager.reverseGeocode(SignificantPlace.create(53.863149, 10.700927, null));
Optional<GeocodeResult> result = geocodeServiceManager.reverseGeocode(SignificantPlace.create(53.863149, 10.700927));
// Then
assertThat(result).isEmpty();
@@ -100,7 +100,7 @@ class DefaultGeocodeServiceManagerTest {
.thenReturn(mockResponse);
// When
Optional<GeocodeResult> result = geocodeServiceManager.reverseGeocode(SignificantPlace.create(latitude, longitude, null));
Optional<GeocodeResult> result = geocodeServiceManager.reverseGeocode(SignificantPlace.create(latitude, longitude));
// Then
assertThat(result).isPresent();
@@ -135,7 +135,7 @@ class DefaultGeocodeServiceManagerTest {
.thenReturn(mockResponse);
// When
Optional<GeocodeResult> result = geocodeServiceManager.reverseGeocode(SignificantPlace.create(latitude, longitude, null));
Optional<GeocodeResult> result = geocodeServiceManager.reverseGeocode(SignificantPlace.create(latitude, longitude));
// Then
assertThat(result).isPresent();
@@ -217,7 +217,7 @@ class DefaultGeocodeServiceManagerTest {
.thenReturn(mockResponse);
// When
Optional<GeocodeResult> result = geocodeServiceManager.reverseGeocode(SignificantPlace.create(latitude, longitude, null));
Optional<GeocodeResult> result = geocodeServiceManager.reverseGeocode(SignificantPlace.create(latitude, longitude));
// Then
assertThat(result).isPresent();
@@ -269,7 +269,7 @@ class DefaultGeocodeServiceManagerTest {
.thenReturn(photonResponse);
// When
Optional<GeocodeResult> result = managerWithFixedService.reverseGeocode(SignificantPlace.create(latitude, longitude, null));
Optional<GeocodeResult> result = managerWithFixedService.reverseGeocode(SignificantPlace.create(latitude, longitude));
// Then
assertThat(result).isPresent();
@@ -300,7 +300,7 @@ class DefaultGeocodeServiceManagerTest {
.thenThrow(new RuntimeException("Service unavailable"));
// When
Optional<GeocodeResult> result = geocodeServiceManager.reverseGeocode(SignificantPlace.create(latitude, longitude, null));
Optional<GeocodeResult> result = geocodeServiceManager.reverseGeocode(SignificantPlace.create(latitude, longitude));
// Then
assertThat(result).isEmpty();

View File

@@ -130,7 +130,7 @@ public class ProcessingPipelineTest {
private static void assertVisit(ProcessedVisit processedVisit, String startTime, String endTime, GeoPoint location) {
assertEquals(Instant.parse(startTime), processedVisit.getStartTime());
assertEquals(Instant.parse(endTime), processedVisit.getEndTime());
GeoPoint currentLocation = GeoPoint.from(processedVisit.getPlace().getGeom());
GeoPoint currentLocation = new GeoPoint(processedVisit.getPlace().getLatitudeCentroid(), processedVisit.getPlace().getLongitudeCentroid());
assertTrue(location.near(currentLocation), "Locations are not near to each other. \nExpected [" + currentLocation + "] to be in range \nto [" + location + "]");
}
@@ -146,11 +146,11 @@ public class ProcessingPipelineTest {
assertEquals(Instant.parse(startTime), trip.getStartTime());
assertEquals(Instant.parse(endTime), trip.getEndTime());
GeoPoint actualStartLocation = GeoPoint.from(trip.getStartVisit().getPlace().getGeom());
GeoPoint actualStartLocation = GeoPoint.from(trip.getStartVisit().getPlace().getLatitudeCentroid(), trip.getStartVisit().getPlace().getLongitudeCentroid());
assertTrue(startLocation.near(actualStartLocation),
"Start locations are not near to each other. \nExpected [" + actualStartLocation + "] to be in range \nto [" + startLocation + "]");
GeoPoint actualEndLocation = GeoPoint.from(trip.getEndVisit().getPlace().getGeom());
GeoPoint actualEndLocation = GeoPoint.from(trip.getEndVisit().getPlace().getLatitudeCentroid(), trip.getEndVisit().getPlace().getLongitudeCentroid());
assertTrue(endLocation.near(actualEndLocation),
"End locations are not near to each other. \nExpected [" + actualEndLocation + "] to be in range \nto [" + endLocation + "]");
}