import { HubConnectionBuilder, HubConnection } from "@microsoft/signalr";
import { MessagePackHubProtocol } from "@microsoft/signalr-protocol-msgpack";
import { Subscriber, Subscription, Stream } from "./message-hub.types";

export class MessageStreamHub<StreamName extends string> {
    private connection!: HubConnection;
    private streams = new Map<StreamName, Stream<unknown>>();
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    private subscriptions = new Map<StreamName, Subscription<any>[]>();
    private isInitialized = false;

    constructor(private url: string) {
        this.buildConnection();
    }

    public async init() {
        try {
            await this.connection.start();
            this.isInitialized = true;
        } catch (e) {}
    }

    public async destroy() {
        this.disposeSubscriptions();
        this.streams.clear();
        this.connection.stop();
    }

    public nextRetryDelayInMilliseconds() {
        return 2000;
    }

    protected subscribe<T>(streamName: StreamName, subscriber: Subscriber<T>) {
        if (!this.isInitialized) {
            throw new Error("Initialize hub before subscribe!");
        }
        const stream = this.streams.get(streamName)!;
        const disposer = stream.subscribe(this.createSubscriberShape(subscriber));
        const subscription: Subscription<T> = {
            disposer,
            subscriber,
        };
        const existingSubs = this.subscriptions.get(streamName);
        this.subscriptions.set(streamName, existingSubs ? [...existingSubs, subscription] : [subscription]);
    }

    protected createStream(streamName: StreamName) {
        const stream = this.connection.stream(streamName, null);
        this.streams.set(streamName, stream);
    }

    private reconnect = () => {
        for (const [streamName, subscription] of this.subscriptions) {
            this.createStream(streamName);
            const stream = this.streams.get(streamName)!;
            for (let data of subscription) {
                const disposer = stream.subscribe(this.createSubscriberShape(data.subscriber));
                data.disposer.dispose();
                data = {
                    disposer,
                    subscriber: data.subscriber,
                };
                data.subscriber.onReconnect?.();
            }
        }
    };

    private disposeSubscriptions() {
        this.subscriptions.forEach(([_, sub]) => sub.disposer.dispose());
    }

    private buildConnection() {
        this.connection = new HubConnectionBuilder()
            .withUrl(this.url)
            .withHubProtocol(new MessagePackHubProtocol())
            .withAutomaticReconnect(this)
            .build();
        this.connection.onreconnected(this.reconnect);
    }

    private createSubscriberShape<T>(subscriber: Subscriber<T>) {
        return {
            next: subscriber.onMessage.bind(subscriber),
            error: console.error,
            complete: console.info,
        };
    }
}
