Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ yarn.lock
*.DS_Store
*.pem
firebase-admin-service-account.json
dumps/
# Firebase secrets
secrets/
3 changes: 3 additions & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@ module.exports = {
roots: ["src"],
maxWorkers: 1,
verbose: true,
moduleNameMapper: {
"^firebase-admin/(.*)$": "<rootDir>/node_modules/firebase-admin/lib/$1/index.js",
},
};
39 changes: 39 additions & 0 deletions src/api/controllers/EventController.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import {
CurrentUser,
Get,
JsonController,
Param,
QueryParam,
} from "routing-controllers";

import { UserModel } from "../../models/UserModel";
import { EventTagModel } from "../../models/EventTagModel";
import { EventService } from "../../services/EventService";
import { EventPostSource, GetEventPostsResponse } from "../../types";

@JsonController("event/")
export class EventController {
private eventService: EventService;

constructor(eventService: EventService) {
this.eventService = eventService;
}

@Get("available-for-tagging/")
async getAvailableEventTags(
@CurrentUser() user: UserModel,
): Promise<EventTagModel[]> {
return this.eventService.getAvailableEventTags();
}

@Get(":eventTagId/posts/")
async getEventPosts(
@CurrentUser() user: UserModel,
@Param("eventTagId") eventTagId: string,
@QueryParam("page", { required: false }) page: number = 1,
@QueryParam("limit", { required: false }) limit: number = 10,
@QueryParam("source", { required: false }) source?: EventPostSource,
): Promise<GetEventPostsResponse> {
return this.eventService.getEventPosts(user, eventTagId, page, limit, source);
}
}
2 changes: 2 additions & 0 deletions src/api/controllers/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AuthController } from './AuthController';
import { AvailabilityController } from './AvailabilityController';
import { EventController } from './EventController';
import { FeedbackController } from './FeedbackController';
import { ImageController } from './ImageController';
import { PostController } from './PostController';
Expand All @@ -19,6 +20,7 @@ export const controllers = [
AuthTokenController,
AvailabilityController,
ChatController,
EventController,
FeedbackController,
ImageController,
NotifController,
Expand Down
2 changes: 2 additions & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import resellConnection from './utils/DB';
import { ReportService } from './services/ReportService';
import { reportToString } from './utils/Requests';
import { startTransactionConfirmationCron } from './cron/transactionCron';
import { startEventSimilarityCron } from './cron/eventSimilarityCron';

// Setup dependency injection containers
routingUseContainer(Container);
Expand Down Expand Up @@ -199,6 +200,7 @@ async function main() {
console.log(`Resell backend bartering on http://localhost:${port}`);

startTransactionConfirmationCron();
startEventSimilarityCron();
});
}

Expand Down
43 changes: 43 additions & 0 deletions src/cron/eventSimilarityCron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import cron from 'node-cron';
import { getManager } from 'typeorm';
import { EventPostService } from '../services/EventPostService';
import { EventTagRepository } from '../repositories/EventTagRepository';

/**
* Hourly cron that reprocesses similarity for every event tag.
* For each tag it computes a centroid from user-tagged post embeddings
* and upserts similar posts into the eventPosts table.
*/
Comment on lines +6 to +10
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems fine but I'm curious why a cron job is needed here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of reprocessing similarity on all posts in an event every time a post is added or edited, we just reprocess similarity every hour for every event if that makes sense?

export function startEventSimilarityCron() {
cron.schedule('0 * * * *', async () => {
console.log('[CRON] Starting event similarity processing...');

try {
const entityManager = getManager();
const eventTagRepo = entityManager.getCustomRepository(EventTagRepository);
const eventTags = await eventTagRepo.getAllEventTags();

if (eventTags.length === 0) {
console.log('[CRON] No event tags found, skipping similarity processing.');
return;
}

const eventPostService = new EventPostService(entityManager);

for (const tag of eventTags) {
try {
await eventPostService.processEventSimilarity(tag.id);
console.log(`[CRON] Similarity processed for event "${tag.name}" (${tag.id})`);
} catch (err) {
console.error(`[CRON] Error processing similarity for event "${tag.name}" (${tag.id}):`, err);
}
}

console.log(`[CRON] Event similarity processing complete for ${eventTags.length} event(s).`);
} catch (error) {
console.error('[CRON] Fatal error in event similarity cron:', error);
}
});

console.log('[CRON] Event similarity cron job started (runs every hour at :00)');
}
39 changes: 39 additions & 0 deletions src/migrations/1769700000000-CreateEventPost.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class CreateEventPost1769700000000 implements MigrationInterface {
name = "CreateEventPost1769700000000";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE IF NOT EXISTS "eventPosts" (
"id" uuid NOT NULL DEFAULT uuid_generate_v4(),
"postId" uuid NOT NULL,
"eventTagId" uuid NOT NULL,
"source" character varying(20) NOT NULL,
"relevanceScore" float DEFAULT NULL,
"createdAt" TIMESTAMPTZ NOT NULL DEFAULT now(),
"updatedAt" TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT "PK_eventPosts" PRIMARY KEY ("id"),
CONSTRAINT "UQ_eventPost_postId_eventTagId" UNIQUE ("postId", "eventTagId"),
CONSTRAINT "FK_eventPost_postId" FOREIGN KEY ("postId")
REFERENCES "Post"("id") ON DELETE CASCADE,
CONSTRAINT "FK_eventPost_eventTagId" FOREIGN KEY ("eventTagId")
REFERENCES "EventTag"("id") ON DELETE CASCADE
)
`);

await queryRunner.query(`
CREATE INDEX "IDX_eventPost_eventTagId" ON "eventPosts" ("eventTagId")
`);

await queryRunner.query(`
CREATE INDEX "IDX_eventPost_postId" ON "eventPosts" ("postId")
`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_eventPost_postId"`);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_eventPost_eventTagId"`);
await queryRunner.query(`DROP TABLE IF EXISTS "eventPosts"`);
}
}
49 changes: 49 additions & 0 deletions src/models/EventPostModel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import {
Column,
CreateDateColumn,
Entity,
JoinColumn,
ManyToOne,
PrimaryGeneratedColumn,
Unique,
UpdateDateColumn,
} from "typeorm";
import { Uuid } from "../types";
import { PostModel } from "./PostModel";
import { EventTagModel } from "./EventTagModel";

export type EventPostSource = "user" | "similarity" | "nlp_context";

@Entity("eventPosts")
// Enforces one row per post per event, so ML layer can't insert another row if event already user-tagged
@Unique(["postId", "eventTagId"])
export class EventPostModel {
@PrimaryGeneratedColumn("uuid")
id: Uuid;

@Column()
postId: Uuid;

@ManyToOne(() => PostModel, { onDelete: "CASCADE" })
@JoinColumn({ name: "postId" })
post: PostModel;

@Column()
eventTagId: Uuid;

@ManyToOne(() => EventTagModel, { onDelete: "CASCADE" })
@JoinColumn({ name: "eventTagId" })
eventTag: EventTagModel;

@Column({ type: "varchar", length: 20 })
source: EventPostSource;

@Column({ type: "float", nullable: true, default: null })
relevanceScore: number | null;

@CreateDateColumn({ type: "timestamptz" })
createdAt: Date;

@UpdateDateColumn({ type: "timestamptz" })
updatedAt: Date;
}
2 changes: 1 addition & 1 deletion src/models/EventTagModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class EventTagModel {

@Column()
name: string;

@ManyToMany(() => PostModel, (post) => post.eventTags)
posts: PostModel[];

Expand Down
2 changes: 2 additions & 0 deletions src/models/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { FcmTokenModel } from "./FcmTokenModel";
import { CategoryModel } from "./CategoryModel";
import { SearchModel } from "./SearchModel";
import { EventTagModel } from "./EventTagModel";
import { EventPostModel } from "./EventPostModel";

export const models = [
FeedbackModel,
Expand All @@ -28,4 +29,5 @@ export const models = [
CategoryModel,
SearchModel,
EventTagModel,
EventPostModel,
];
154 changes: 154 additions & 0 deletions src/repositories/EventPostRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import { AbstractRepository, EntityRepository } from 'typeorm';

import { EventPostModel, EventPostSource } from '../models/EventPostModel';

@EntityRepository(EventPostModel)
export class EventPostRepository extends AbstractRepository<EventPostModel> {

/**
* Create or update a post-event relationship.
*/
public async upsertRelationship(
postId: string,
eventTagId: string,
source: EventPostSource,
relevanceScore: number | null,
): Promise<EventPostModel> {
const existing = await this.repository
.createQueryBuilder("epr")
.where("epr.postId = :postId", { postId })
.andWhere("epr.eventTagId = :eventTagId", { eventTagId })
.getOne();

// User source takes priority (an existing user tag is never overwritten by an ML source)
if (existing) {
if (existing.source === 'user' && source !== 'user') {
return existing;
}
existing.source = source;
existing.relevanceScore = relevanceScore;
return await this.repository.save(existing);
}

const relationship = this.repository.create({ postId, eventTagId, source, relevanceScore });
return await this.repository.save(relationship);
}
Comment on lines +11 to +35
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Race condition in read-modify-write pattern.

Two concurrent calls with the same (postId, eventTagId) can both read existing = null, then both attempt to insert, causing a unique constraint violation and an unhandled exception.

Use TypeORM's upsert or a raw ON CONFLICT clause to make this atomic:

🔧 Proposed fix using TypeORM upsert
  public async upsertRelationship(
    postId: string,
    eventTagId: string,
    source: EventPostSource,
    relevanceScore: number | null,
  ): Promise<EventPostModel> {
-   const existing = await this.repository
-     .createQueryBuilder("epr")
-     .where("epr.postId = :postId", { postId })
-     .andWhere("epr.eventTagId = :eventTagId", { eventTagId })
-     .getOne();
-
-   // User source takes priority (an existing user tag is never overwritten by an ML source)
-   if (existing) {
-     if (existing.source === 'user' && source !== 'user') {
-       return existing;
-     }
-     existing.source = source;
-     existing.relevanceScore = relevanceScore;
-     return await this.repository.save(existing);
-   }
-
-   const relationship = this.repository.create({ postId, eventTagId, source, relevanceScore });
-   return await this.repository.save(relationship);
+   // Atomic upsert with source precedence: user tags are never overwritten by ML sources
+   await this.repository
+     .createQueryBuilder()
+     .insert()
+     .into(EventPostModel)
+     .values({ postId, eventTagId, source, relevanceScore })
+     .orUpdate(
+       ['source', 'relevanceScore', 'updatedAt'],
+       ['postId', 'eventTagId'],
+       { skipUpdateIfNoValuesChanged: true }
+     )
+     // Preserve user source: only update if existing source != 'user' OR incoming source = 'user'
+     .setParameter('source', source)
+     .execute();
+
+   // For user-source precedence, use raw SQL with conditional update:
+   // ON CONFLICT (postId, eventTagId) DO UPDATE SET source = :source, relevanceScore = :relevanceScore
+   // WHERE eventPosts.source != 'user' OR :source = 'user'
+
+   return await this.repository.findOneOrFail({ where: { postId, eventTagId } });
  }

Note: The conditional "user source takes priority" logic requires a WHERE clause in the ON CONFLICT DO UPDATE, which may need raw SQL depending on your TypeORM version.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/repositories/EventPostRepository.ts` around lines 11 - 35, The
upsertRelationship method has a race between the read (existing) and insert that
causes unique constraint failures; replace the read-modify-write with an atomic
upsert using TypeORM's upsert or a raw INSERT ... ON CONFLICT to perform
insert-or-update in one DB statement, and encode the "user source takes
priority" rule into the conflict update (e.g., ON CONFLICT (postId,eventTagId)
DO UPDATE SET source = EXCLUDED.source, relevanceScore = EXCLUDED.relevanceScore
WHERE NOT (existing.source = 'user' AND EXCLUDED.source <> 'user')); use the
repository.upsert or repository.createQueryBuilder().insert().onConflict(...)
targeting the same table/entity as this.repository and ensure the method returns
the persisted EventPostModel (fetching the row after upsert if necessary).


/**
* Get all user-tagged relationships for an event, with the post (including embedding) loaded.
* Used as anchor points for similarity processing.
*/
public async getUserTaggedPostsForEvent(eventTagId: string): Promise<EventPostModel[]> {
return await this.repository
.createQueryBuilder("epr")
.leftJoinAndSelect("epr.post", "post")
.where("epr.eventTagId = :eventTagId", { eventTagId })
.andWhere("epr.source = 'user'")
.getMany();
}

/**
* Delete all relationships for an event matching a given source.
* Called before reprocessing to clear stale ML results.
*/
public async deleteRelationshipsBySourceForEvent(
eventTagId: string,
source: EventPostSource,
): Promise<void> {
await this.repository.delete({ eventTagId, source });
}

/**
* Delete a specific post-event relationship.
* Called when a user removes an event tag from their post.
*/
public async deleteRelationship(postId: string, eventTagId: string): Promise<void> {
await this.repository.delete({ postId, eventTagId });
}

/**
* Get paginated posts for an event, optionally filtered by source.
* Ordered by layer priority: user-tagged (by post recency) first,
* then similarity (by relevance score).
*/
private static readonly SOURCE_PRIORITY_EXPR =
`CASE "epr"."source" WHEN 'user' THEN 0 WHEN 'similarity' THEN 1 ELSE 2 END`;
private static readonly SCORE_OR_RECENCY_EXPR =
`CASE WHEN "epr"."source" = 'user' THEN EXTRACT(EPOCH FROM "post"."created") ELSE "epr"."relevanceScore" END`;

public async getPostsForEvent(
eventTagId: string,
requestingUserId: string,
source?: EventPostSource,
skip: number = 0,
limit: number = 10,
): Promise<EventPostModel[]> {
const qb = this.repository
.createQueryBuilder("epr")
.select("epr.id")
.addSelect(EventPostRepository.SOURCE_PRIORITY_EXPR, "source_priority")
.addSelect(EventPostRepository.SCORE_OR_RECENCY_EXPR, "score_or_recency")
.innerJoin("epr.post", "post")
.innerJoin("post.user", "author")
.where("epr.eventTagId = :eventTagId", { eventTagId })
.andWhere("author.isActive = true")
.andWhere(
`author.firebaseUid NOT IN (SELECT "blocking" FROM "userBlockingUsers" WHERE "blockers" = :requestingUserId)`,
{ requestingUserId },
);

if (source) {
qb.andWhere("epr.source = :source", { source });
}

qb
.orderBy("source_priority", "ASC")
.addOrderBy("score_or_recency", "DESC", "NULLS LAST")
.skip(skip)
.take(limit);

const eprIds = await qb.getMany();
const ids = eprIds.map((e) => e.id);
if (ids.length === 0) return [];

return await this.repository
.createQueryBuilder("epr")
.leftJoinAndSelect("epr.post", "post")
.leftJoinAndSelect("post.user", "user")
.leftJoinAndSelect("post.categories", "categories")
.leftJoinAndSelect("post.eventTags", "eventTags")
.addSelect(EventPostRepository.SOURCE_PRIORITY_EXPR, "source_priority")
.addSelect(EventPostRepository.SCORE_OR_RECENCY_EXPR, "score_or_recency")
.where("epr.id IN (:...ids)", { ids })
.orderBy("source_priority", "ASC")
.addOrderBy("score_or_recency", "DESC", "NULLS LAST")
.getMany();
}

/**
* Count posts for an event, optionally filtered by source.
* Used for pagination totals.
*/
public async getPostCountForEvent(
eventTagId: string,
requestingUserId: string,
source?: EventPostSource,
): Promise<number> {
const qb = this.repository
.createQueryBuilder("epr")
.innerJoin("epr.post", "post")
.innerJoin("post.user", "author")
.where("epr.eventTagId = :eventTagId", { eventTagId })
.andWhere("author.isActive = true")
.andWhere(
`author.firebaseUid NOT IN (SELECT "blocking" FROM "userBlockingUsers" WHERE "blockers" = :requestingUserId)`,
{ requestingUserId },
);

if (source) {
qb.andWhere("epr.source = :source", { source });
}

return await qb.getCount();
}
}
Loading
Loading