notify.ts 4.03 KB
import { service as http, API_BASE_URL } from "@/utils/request";
import type { AxiosPromise } from "axios";

// Helper to get token for Authorization header
function getToken(): string {
  return localStorage.getItem("auth_token") || localStorage.getItem("token") || "";
}

function authHeaders(): { Authorization?: string } {
  const t = getToken();
  return t ? { Authorization: `Bearer ${t}` } : {};
}

export interface NotificationDTO {
  id: number;
  type: string;
  sourceId?: number;
  title: string;
  content: string;
  payloadJson?: string;
  status: number; // 0: 未读, 1: 已读
  createdAt: string;
}

export interface PagedResultNotificationDTO {
  items: NotificationDTO[];
  total: number;
  page: number;
  size: number;
}

/** 标记通知为未读 */
export function markAsUnread(id: number | string): AxiosPromise<{ message: string }> {
  return http.post(`/notifications/${id}/unread`);
}

/** 确认通知 (标记为已读) */
export function acknowledgeNotification(id: number | string): AxiosPromise<{ message: string }> {
  return http.post(`/notifications/${id}/ack`);
}

/** 确认所有通知 */
export function acknowledgeAllNotifications(): AxiosPromise<{ message: string }> {
  return http.post(`/notifications/ack-all`);
}

/** 获取未读通知 */
export function getUnreadNotifications(limit: number = 50): AxiosPromise<NotificationDTO[]> {
  return http.get("/notifications/unread", { params: { limit } });
}

/** 分页检索当前用户消息 */
export function getNotificationsPage(params: {
  page?: number;
  size?: number;
  type?: string;
  read?: boolean;
}): AxiosPromise<PagedResultNotificationDTO> {
  return http.get("/notifications/page", { params });
}

/**
 * 获取通知流(SSE)
 */
export function streamNotifications({
  onMessage,
  onOpen,
  onError,
  onEnd,
  signal,
}: {
  onMessage?: (data: any) => void;
  onOpen?: () => void;
  onError?: (err: any) => void;
  onEnd?: () => void;
  signal?: AbortSignal;
}): { close: () => void } {
  const url = `${API_BASE_URL}/notifications/stream`;
  const controller = new AbortController();
  let closed = false;

  if (signal) {
    if (signal.aborted) controller.abort();
    else signal.addEventListener("abort", () => controller.abort(), { once: true });
  }

  (async () => {
    let reader: ReadableStreamDefaultReader | undefined;
    const decoder = new TextDecoder("utf-8");
    let buf = "";

    try {
      const extraHeaders = authHeaders();
      const res = await fetch(url, {
        method: "GET",
        headers: {
          Accept: "text/event-stream",
          "X-Accel-Buffering": "no",
          ...extraHeaders,
        },
        credentials: "include",
        signal: controller.signal,
      });

      if (!res.ok || !res.body) throw new Error(`SSE failed: ${res.status} ${res.statusText}`);
      onOpen && onOpen();
      reader = res.body.getReader();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buf += decoder.decode(value, { stream: true });

        let idx;
        while ((idx = buf.indexOf("\n")) !== -1) {
          const line = buf.slice(0, idx).trim();
          buf = buf.slice(idx + 1);
          if (!line || line.startsWith(":")) continue;
          if (line.startsWith("data:")) {
            const data = line.replace(/^data:\s?/, "");
            if (data === "[DONE]") {
              closed = true;
              break;
            }
            if (data) {
              try {
                const payload = JSON.parse(data);
                onMessage && onMessage(payload);
              } catch (_) {
                onMessage && onMessage(data);
              }
            }
          }
        }
      }
      onEnd && onEnd();
    } catch (err: any) {
      if (!closed && err?.name !== "AbortError") {
        onError && onError(err);
      } else {
        onEnd && onEnd();
      }
    } finally {
      try {
        reader && reader.releaseLock && reader.releaseLock();
      } catch {}
    }
  })();

  return {
    close: () => {
      closed = true;
      controller.abort();
    },
  };
}