-
Notifications
You must be signed in to change notification settings - Fork 0
Lauren/event similarity #173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a91b9db
1e186b4
3f6e893
b22aa9c
0fea8e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,5 +7,6 @@ yarn.lock | |
| *.DS_Store | ||
| *.pem | ||
| firebase-admin-service-account.json | ||
| dumps/ | ||
| # Firebase secrets | ||
| secrets/ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| import { | ||
| CurrentUser, | ||
| Get, | ||
| JsonController, | ||
| Param, | ||
| QueryParam, | ||
| } from "routing-controllers"; | ||
|
|
||
| import { UserModel } from "../../models/UserModel"; | ||
| import { EventTagModel } from "../../models/EventTagModel"; | ||
| import { EventService } from "../../services/EventService"; | ||
| import { EventPostSource, GetEventPostsResponse } from "../../types"; | ||
|
|
||
/**
 * HTTP endpoints for event tags, mounted under the "event/" route prefix.
 * Each handler is a thin pass-through to EventService.
 */
@JsonController("event/")
export class EventController {
  constructor(private eventService: EventService) {}

  /**
   * GET event/available-for-tagging/
   *
   * Returns the event tags reported by EventService.getAvailableEventTags().
   * The injected user is not forwarded to the service.
   * NOTE(review): presumably @CurrentUser() is here to require a signed-in caller — confirm.
   */
  @Get("available-for-tagging/")
  async getAvailableEventTags(
    @CurrentUser() user: UserModel,
  ): Promise<EventTagModel[]> {
    const availableTags = await this.eventService.getAvailableEventTags();
    return availableTags;
  }

  /**
   * GET event/:eventTagId/posts/
   *
   * @param user the requesting user, forwarded to the service
   * @param eventTagId id of the event tag taken from the route
   * @param page optional "page" query parameter, defaults to 1
   * @param limit optional "limit" query parameter, defaults to 10
   * @param source optional "source" query parameter used to filter by relationship source
   * @returns whatever EventService.getEventPosts() resolves to
   */
  @Get(":eventTagId/posts/")
  async getEventPosts(
    @CurrentUser() user: UserModel,
    @Param("eventTagId") eventTagId: string,
    @QueryParam("page", { required: false }) page: number = 1,
    @QueryParam("limit", { required: false }) limit: number = 10,
    @QueryParam("source", { required: false }) source?: EventPostSource,
  ): Promise<GetEventPostsResponse> {
    const response = await this.eventService.getEventPosts(
      user,
      eventTagId,
      page,
      limit,
      source,
    );
    return response;
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| import cron from 'node-cron'; | ||
| import { getManager } from 'typeorm'; | ||
| import { EventPostService } from '../services/EventPostService'; | ||
| import { EventTagRepository } from '../repositories/EventTagRepository'; | ||
|
|
||
/**
 * Registers a node-cron job on the schedule '0 * * * *' (minute 0 of every hour)
 * that calls EventPostService.processEventSimilarity once per event tag.
 *
 * Per the original author's note, that service call computes a centroid from
 * user-tagged post embeddings and upserts similar posts into the eventPosts table;
 * the implementation lives outside this file.
 */
export function startEventSimilarityCron() {
  // One full pass over all event tags. Tags are processed one at a time, and a
  // failure on a single tag is logged without aborting the remaining tags.
  const runSimilarityPass = async () => {
    console.log('[CRON] Starting event similarity processing...');

    try {
      const manager = getManager();
      const tagRepository = manager.getCustomRepository(EventTagRepository);
      const allTags = await tagRepository.getAllEventTags();

      if (allTags.length === 0) {
        console.log('[CRON] No event tags found, skipping similarity processing.');
        return;
      }

      const similarityService = new EventPostService(manager);

      for (const eventTag of allTags) {
        try {
          await similarityService.processEventSimilarity(eventTag.id);
          console.log(`[CRON] Similarity processed for event "${eventTag.name}" (${eventTag.id})`);
        } catch (tagError) {
          console.error(`[CRON] Error processing similarity for event "${eventTag.name}" (${eventTag.id}):`, tagError);
        }
      }

      console.log(`[CRON] Event similarity processing complete for ${allTags.length} event(s).`);
    } catch (fatalError) {
      // Anything thrown outside the per-tag loop (e.g. loading the tags) ends up here.
      console.error('[CRON] Fatal error in event similarity cron:', fatalError);
    }
  };

  cron.schedule('0 * * * *', runSimilarityPass);

  console.log('[CRON] Event similarity cron job started (runs every hour at :00)');
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| import { MigrationInterface, QueryRunner } from "typeorm"; | ||
|
|
||
/**
 * Creates the "eventPosts" table linking posts to event tags, plus lookup
 * indexes on "eventTagId" and "postId".
 *
 * Every statement in up() is guarded with IF NOT EXISTS and every statement in
 * down() with IF EXISTS, so the migration can be re-run against a database
 * where some of the objects already exist.
 */
export class CreateEventPost1769700000000 implements MigrationInterface {
  name = "CreateEventPost1769700000000";

  /**
   * Creates the table and its two indexes.
   *
   * @param queryRunner runner used to execute the raw SQL statements
   */
  public async up(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.query(`
      CREATE TABLE IF NOT EXISTS "eventPosts" (
        "id" uuid NOT NULL DEFAULT uuid_generate_v4(),
        "postId" uuid NOT NULL,
        "eventTagId" uuid NOT NULL,
        "source" character varying(20) NOT NULL,
        "relevanceScore" float DEFAULT NULL,
        "createdAt" TIMESTAMPTZ NOT NULL DEFAULT now(),
        "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT now(),
        CONSTRAINT "PK_eventPosts" PRIMARY KEY ("id"),
        CONSTRAINT "UQ_eventPost_postId_eventTagId" UNIQUE ("postId", "eventTagId"),
        CONSTRAINT "FK_eventPost_postId" FOREIGN KEY ("postId")
          REFERENCES "Post"("id") ON DELETE CASCADE,
        CONSTRAINT "FK_eventPost_eventTagId" FOREIGN KEY ("eventTagId")
          REFERENCES "EventTag"("id") ON DELETE CASCADE
      )
    `);

    // IF NOT EXISTS keeps the index statements as idempotent as the table
    // statement above; a plain CREATE INDEX would fail when the index is present.
    await queryRunner.query(`
      CREATE INDEX IF NOT EXISTS "IDX_eventPost_eventTagId" ON "eventPosts" ("eventTagId")
    `);

    await queryRunner.query(`
      CREATE INDEX IF NOT EXISTS "IDX_eventPost_postId" ON "eventPosts" ("postId")
    `);
  }

  /**
   * Drops the two indexes and then the table.
   *
   * @param queryRunner runner used to execute the raw SQL statements
   */
  public async down(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.query(`DROP INDEX IF EXISTS "IDX_eventPost_postId"`);
    await queryRunner.query(`DROP INDEX IF EXISTS "IDX_eventPost_eventTagId"`);
    await queryRunner.query(`DROP TABLE IF EXISTS "eventPosts"`);
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| import { | ||
| Column, | ||
| CreateDateColumn, | ||
| Entity, | ||
| JoinColumn, | ||
| ManyToOne, | ||
| PrimaryGeneratedColumn, | ||
| Unique, | ||
| UpdateDateColumn, | ||
| } from "typeorm"; | ||
| import { Uuid } from "../types"; | ||
| import { PostModel } from "./PostModel"; | ||
| import { EventTagModel } from "./EventTagModel"; | ||
|
|
||
/** Where a post-event relationship came from. */
export type EventPostSource = "user" | "similarity" | "nlp_context";

/**
 * One row per (post, event tag) pair, stored in the "eventPosts" table.
 *
 * The unique constraint enforces one row per post per event, so the ML layer
 * can't insert another row if the event is already user-tagged. It is named
 * explicitly so the entity metadata matches the constraint name created by the
 * CreateEventPost migration ("UQ_eventPost_postId_eventTagId") instead of a
 * TypeORM-generated name.
 */
@Entity("eventPosts")
@Unique("UQ_eventPost_postId_eventTagId", ["postId", "eventTagId"])
export class EventPostModel {
  @PrimaryGeneratedColumn("uuid")
  id: Uuid;

  // Foreign key column backing the `post` relation below.
  @Column()
  postId: Uuid;

  // Deleting the post removes this relationship row as well.
  @ManyToOne(() => PostModel, { onDelete: "CASCADE" })
  @JoinColumn({ name: "postId" })
  post: PostModel;

  // Foreign key column backing the `eventTag` relation below.
  @Column()
  eventTagId: Uuid;

  // Deleting the event tag removes this relationship row as well.
  @ManyToOne(() => EventTagModel, { onDelete: "CASCADE" })
  @JoinColumn({ name: "eventTagId" })
  eventTag: EventTagModel;

  @Column({ type: "varchar", length: 20 })
  source: EventPostSource;

  // Nullable score; NOTE(review): presumably only set for ML-derived sources — confirm.
  @Column({ type: "float", nullable: true, default: null })
  relevanceScore: number | null;

  @CreateDateColumn({ type: "timestamptz" })
  createdAt: Date;

  @UpdateDateColumn({ type: "timestamptz" })
  updatedAt: Date;
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,154 @@ | ||
| import { AbstractRepository, EntityRepository } from 'typeorm'; | ||
|
|
||
| import { EventPostModel, EventPostSource } from '../models/EventPostModel'; | ||
|
|
||
| @EntityRepository(EventPostModel) | ||
| export class EventPostRepository extends AbstractRepository<EventPostModel> { | ||
|
|
||
/**
 * Create or update a post-event relationship in one atomic statement.
 *
 * The previous read-then-save sequence let two concurrent callers both miss the
 * row and both insert, making one of them fail on the (postId, eventTagId)
 * unique constraint. A single INSERT ... ON CONFLICT lets the database
 * arbitrate instead.
 *
 * Rules applied on conflict:
 * - User source takes priority: an existing 'user' row is never overwritten by
 *   a non-user source.
 * - If neither source nor relevanceScore would change, the row is left
 *   untouched (so "updatedAt" is not bumped), as a no-op save would have done.
 *
 * NOTE(review): uses PostgreSQL syntax and the "eventPosts" table name, matching
 * the entity and migration in this change — confirm no other database is targeted.
 *
 * @param postId id of the post
 * @param eventTagId id of the event tag
 * @param source origin of the relationship
 * @param relevanceScore score to store, or null
 * @returns the stored row as it exists after the statement
 * @throws if the row cannot be loaded afterwards (e.g. it was deleted concurrently)
 */
public async upsertRelationship(
  postId: string,
  eventTagId: string,
  source: EventPostSource,
  relevanceScore: number | null,
): Promise<EventPostModel> {
  await this.repository.query(
    `INSERT INTO "eventPosts" AS ep ("postId", "eventTagId", "source", "relevanceScore")
     VALUES ($1, $2, $3, $4)
     ON CONFLICT ("postId", "eventTagId") DO UPDATE
       SET "source" = EXCLUDED."source",
           "relevanceScore" = EXCLUDED."relevanceScore",
           "updatedAt" = now()
     WHERE (ep."source" <> 'user' OR EXCLUDED."source" = 'user')
       AND (ep."source", ep."relevanceScore")
           IS DISTINCT FROM (EXCLUDED."source", EXCLUDED."relevanceScore")`,
    [postId, eventTagId, source, relevanceScore],
  );

  // Re-read through the repository so callers get a hydrated entity whether the
  // statement inserted, updated, or deliberately left the existing row alone.
  return await this.repository.findOneOrFail({ where: { postId, eventTagId } });
}
|
Comment on lines
+11
to
+35
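There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Race condition in read-modify-write pattern. Two concurrent calls with the same `postId` and `eventTagId` can both find no existing row and then both insert, so one of them fails on the unique constraint. Use TypeORM's upsert support (`INSERT ... ON CONFLICT`) so the write is atomic.
🔧 Proposed fix using TypeORM upsert
public async upsertRelationship(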
postId: string,
eventTagId: string,
source: EventPostSource,
relevanceScore: number | null,
): Promise<EventPostModel> {
- const existing = await this.repository
- .createQueryBuilder("epr")
- .where("epr.postId = :postId", { postId })
- .andWhere("epr.eventTagId = :eventTagId", { eventTagId })
- .getOne();
-
- // User source takes priority (an existing user tag is never overwritten by an ML source)
- if (existing) {
- if (existing.source === 'user' && source !== 'user') {
- return existing;
- }
- existing.source = source;
- existing.relevanceScore = relevanceScore;
- return await this.repository.save(existing);
- }
-
- const relationship = this.repository.create({ postId, eventTagId, source, relevanceScore });
- return await this.repository.save(relationship);
+ // Atomic upsert with source precedence: user tags are never overwritten by ML sources
+ await this.repository
+ .createQueryBuilder()
+ .insert()
+ .into(EventPostModel)
+ .values({ postId, eventTagId, source, relevanceScore })
+ .orUpdate(
+ ['source', 'relevanceScore', 'updatedAt'],
+ ['postId', 'eventTagId'],
+ { skipUpdateIfNoValuesChanged: true }
+ )
+ // Preserve user source: only update if existing source != 'user' OR incoming source = 'user'
+ .setParameter('source', source)
+ .execute();
+
+ // For user-source precedence, use raw SQL with conditional update:
+ // ON CONFLICT (postId, eventTagId) DO UPDATE SET source = :source, relevanceScore = :relevanceScore
+ // WHERE eventPosts.source != 'user' OR :source = 'user'
+
+ return await this.repository.findOneOrFail({ where: { postId, eventTagId } });
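}
Note: The conditional "user source takes priority" logic requires a conditional `WHERE` clause on the `ON CONFLICT ... DO UPDATE`, written as raw SQL (see the commented sketch at the end of the proposed fix).
🤖 Prompt for AI Agents |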
||
|
|
||
/**
 * Load every relationship of the given event whose source is 'user', with the
 * related post joined and selected onto each result.
 * Used as anchor points for similarity processing.
 * NOTE(review): the original comment says the post's embedding is included —
 * confirm the embedding column is selected by default on PostModel.
 *
 * @param eventTagId id of the event tag to look up
 * @returns the matching relationships, each with `post` populated
 */
public async getUserTaggedPostsForEvent(eventTagId: string): Promise<EventPostModel[]> {
  const userTaggedQuery = this.repository
    .createQueryBuilder("epr")
    .leftJoinAndSelect("epr.post", "post");

  userTaggedQuery.where("epr.eventTagId = :eventTagId", { eventTagId });
  userTaggedQuery.andWhere("epr.source = 'user'");

  return userTaggedQuery.getMany();
}
|
|
||
/**
 * Remove every relationship row of one event that has the given source.
 * Called before reprocessing to clear stale ML results.
 *
 * @param eventTagId id of the event tag whose rows are removed
 * @param source only rows with this source are removed
 */
public async deleteRelationshipsBySourceForEvent(
  eventTagId: string,
  source: EventPostSource,
): Promise<void> {
  const criteria = { eventTagId, source };
  await this.repository.delete(criteria);
}
|
|
||
/**
 * Remove the relationship row for one (post, event tag) pair, if any.
 * Called when a user removes an event tag from their post.
 *
 * @param postId id of the post
 * @param eventTagId id of the event tag
 */
public async deleteRelationship(postId: string, eventTagId: string): Promise<void> {
  const criteria = { postId, eventTagId };
  await this.repository.delete(criteria);
}
|
|
||
/** SQL sort key for the source layer: 'user' = 0, 'similarity' = 1, anything else = 2. */
private static readonly SOURCE_PRIORITY_EXPR =
  `CASE "epr"."source" WHEN 'user' THEN 0 WHEN 'similarity' THEN 1 ELSE 2 END`;

/**
 * SQL sort key within a layer: the post's creation time (epoch seconds) for
 * 'user' rows, the stored relevance score for every other source.
 */
private static readonly SCORE_OR_RECENCY_EXPR =
  `CASE WHEN "epr"."source" = 'user' THEN EXTRACT(EPOCH FROM "post"."created") ELSE "epr"."relevanceScore" END`;

/**
 * Get paginated posts for an event, optionally filtered by source.
 * Ordered by layer priority: user-tagged (by post recency) first,
 * then similarity (by relevance score). Ties are broken by relationship id so
 * that consecutive pages never repeat or drop a row.
 *
 * Rows are excluded when the post's author is inactive or appears in the
 * requesting user's "userBlockingUsers" entries.
 *
 * Runs two queries: the first selects the ids for the requested page, the
 * second loads those rows with post, author, categories and event tags.
 *
 * @param eventTagId id of the event tag
 * @param requestingUserId id matched against "blockers" in "userBlockingUsers"
 * @param source optional source filter
 * @param skip number of rows to skip, defaults to 0
 * @param limit maximum number of rows to return, defaults to 10
 * @returns the page of relationships in display order, or [] when the page is empty
 */
public async getPostsForEvent(
  eventTagId: string,
  requestingUserId: string,
  source?: EventPostSource,
  skip: number = 0,
  limit: number = 10,
): Promise<EventPostModel[]> {
  const qb = this.repository
    .createQueryBuilder("epr")
    .select("epr.id")
    .addSelect(EventPostRepository.SOURCE_PRIORITY_EXPR, "source_priority")
    .addSelect(EventPostRepository.SCORE_OR_RECENCY_EXPR, "score_or_recency")
    .innerJoin("epr.post", "post")
    .innerJoin("post.user", "author")
    .where("epr.eventTagId = :eventTagId", { eventTagId })
    .andWhere("author.isActive = true")
    .andWhere(
      `author.firebaseUid NOT IN (SELECT "blocking" FROM "userBlockingUsers" WHERE "blockers" = :requestingUserId)`,
      { requestingUserId },
    );

  if (source) {
    qb.andWhere("epr.source = :source", { source });
  }

  qb
    .orderBy("source_priority", "ASC")
    .addOrderBy("score_or_recency", "DESC", "NULLS LAST")
    // Unique final key: without it, rows with equal or NULL scores have no
    // defined order and OFFSET/LIMIT pages can overlap or skip rows.
    .addOrderBy("epr.id", "ASC")
    .skip(skip)
    .take(limit);

  const eprIds = await qb.getMany();
  const ids = eprIds.map((e) => e.id);
  if (ids.length === 0) return [];

  // Second pass: hydrate the selected rows, re-applying the same ordering
  // because an IN (...) filter does not preserve the order of the id list.
  return await this.repository
    .createQueryBuilder("epr")
    .leftJoinAndSelect("epr.post", "post")
    .leftJoinAndSelect("post.user", "user")
    .leftJoinAndSelect("post.categories", "categories")
    .leftJoinAndSelect("post.eventTags", "eventTags")
    .addSelect(EventPostRepository.SOURCE_PRIORITY_EXPR, "source_priority")
    .addSelect(EventPostRepository.SCORE_OR_RECENCY_EXPR, "score_or_recency")
    .where("epr.id IN (:...ids)", { ids })
    .orderBy("source_priority", "ASC")
    .addOrderBy("score_or_recency", "DESC", "NULLS LAST")
    .addOrderBy("epr.id", "ASC")
    .getMany();
}
|
|
||
/**
 * Count the relationships of an event, optionally restricted to one source.
 * Used for pagination totals.
 *
 * Only rows whose post author is active and is not listed in the requesting
 * user's "userBlockingUsers" entries are counted.
 *
 * @param eventTagId id of the event tag
 * @param requestingUserId id matched against "blockers" in "userBlockingUsers"
 * @param source optional source filter
 * @returns the number of matching rows
 */
public async getPostCountForEvent(
  eventTagId: string,
  requestingUserId: string,
  source?: EventPostSource,
): Promise<number> {
  const countQuery = this.repository.createQueryBuilder("epr");

  countQuery
    .innerJoin("epr.post", "post")
    .innerJoin("post.user", "author");

  countQuery.where("epr.eventTagId = :eventTagId", { eventTagId });
  countQuery.andWhere("author.isActive = true");
  countQuery.andWhere(
    `author.firebaseUid NOT IN (SELECT "blocking" FROM "userBlockingUsers" WHERE "blockers" = :requestingUserId)`,
    { requestingUserId },
  );

  // The source filter is applied only when a (truthy) source was supplied.
  if (source) {
    countQuery.andWhere("epr.source = :source", { source });
  }

  return countQuery.getCount();
}
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems fine but I'm curious why a cron job is needed here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of reprocessing similarity for all posts in an event every time a post is added or edited, we reprocess similarity for every event once an hour, if that makes sense.