import {BrickContext, Brick, registerBrick, Service, ServiceRequestType} from 'olympe';
import {combineLatest, merge, of} from "rxjs";
import {getLogger} from "@olympeio/core";
import {tap, retry, switchMap} from "rxjs/operators";

// Brick-context key under which OpenService stores a Map of subscription id -> consumer id.
const CONSUMERS_KEY = '__consumerIds__';
// Brick-context key under which OpenService stores a Map of consumer id -> handler brick context.
const HANDLERS_KEY = '__consumerHandlers__';

/**
 * Brick that opens a named service and dispatches every incoming request to a
 * per-consumer execution of a "handler" brick.
 *
 * Inputs (in order): active flag, service name, handler brick.
 * For each consumer, a dedicated handler context is created; the values going
 * out of the handler's first two outputs are published back to that consumer.
 */
export default class OpenService extends Brick {

    /**
     * Build the observable driving this brick: while the "active" input is truthy it emits
     * `[serviceName, handler]` pairs, and it emits `null` as soon as the brick becomes inactive.
     *
     * NOTE(review): a `null` emission is presumably what makes the runtime call `clear()`
     * instead of `update()` — confirm against the Brick base class.
     *
     * @override
     */
    setupExecution($) {
        // Wait for the brick to be "active" and then listen to the service name.
        const [activeInput, serviceNameInput, handlerInput] = this.getInputs();
        return $.observe(activeInput).pipe(switchMap((isActive) => {
            return isActive
                ? combineLatest([$.observe(serviceNameInput), $.observe(handlerInput)])
                : of(null);
        }));
    }

    /**
     * Open the service and start dispatching its requests to the handler brick.
     *
     * NOTE(review): the subscription created here is never explicitly unsubscribed; this
     * presumably relies on the Service being bound to the lifecycle of `$` — confirm.
     *
     * @protected
     * @param {!BrickContext} $
     * @param {!Array} inputs `[serviceName, handler]`: the name of the service to open and
     *     the brick to run for each consumer.
     * @param {!Array} outputs unused
     * @throws {Error} if the service name is not a non-empty string
     */
    update($, [serviceName, handler], outputs) {
        if (typeof serviceName !== 'string' || serviceName === '') {
            throw new Error('Please provide a valid service name');
        }

        const logFailure = (e) => {
            getLogger('OpenService').error(`An error occurred while handling a request for service ${serviceName}: ${e}`);
        };

        const handlerInputs = handler.getInputs();
        const svc = new Service(serviceName, $).setUnsubscriptionHandler(this.clearConsumer.bind(this, $));
        svc.listen().pipe(
            tap((req) => {
                // The request is processed asynchronously: catch rejections here, otherwise
                // they would surface as unhandled promise rejections and never be logged.
                this.handleRequest($, handler, serviceName, handlerInputs, req).catch(logFailure);
            }),
            retry({
                delay: (e) => {
                    logFailure(e);
                    // `delay` must return a notifier: the retry happens when it emits.
                    // Returning nothing would make `retry` fail with a TypeError instead of retrying.
                    return of(true);
                },
                resetOnSuccess: true,
            })
        ).subscribe();
    }

    /**
     * Process a single request received by the service.
     * - SUBSCRIBE: register the consumer and feed its handler with the user and the data.
     * - PUBLISH: feed the consumer's handler with the user and the data.
     * - anything else: fail the request.
     *
     * @private
     * @param {!BrickContext} $
     * @param {!Brick} handler
     * @param {string} serviceName
     * @param {!Array} handlerInputs the inputs of the handler brick: `[userInput, dataInput]`
     * @param {*} req the request emitted by the service
     * @return {!Promise<void>}
     */
    async handleRequest($, handler, serviceName, [userInput, dataInput], req) {
        const {id, data} = req.body();
        const user = await req.userTag();
        switch (req.requestType()) {
            case ServiceRequestType.SUBSCRIBE: {
                // A new consumer asks for a subscription:
                let subId;
                try {
                    subId = await req.notifyOn(id);
                } catch (e) {
                    // Nothing to do, the consumer is probably already gone
                    return;
                }

                this.initConsumer($, handler, subId, id)
                    .set(userInput, user)
                    .set(dataInput, data);
                break;
            }
            case ServiceRequestType.PUBLISH:
                this.getConsumerHandler($, handler, id)
                    .set(userInput, user)
                    .set(dataInput, data);
                break;
            default:
                req.fail(new Error(`Messages of type ${req.requestType()} are not supported by the service ${serviceName}`));
        }
    }

    /**
     * Destroy every handler context still registered and drop both bookkeeping maps from the context.
     *
     * @override
     */
    clear($) {
        // Destroy the contexts of all the consumer's bricks still registered
        const handlers = $.get(HANDLERS_KEY);
        handlers?.forEach(($handler) => { $handler.destroy(); });

        // Clear and remove the handlers map
        handlers?.clear();
        $.remove(HANDLERS_KEY);

        // Clear and remove the consumer ids map
        $.get(CONSUMERS_KEY)?.clear();
        $.remove(CONSUMERS_KEY);
    }

    /**
     * Return the id of the consumer registered for the given subscription, or `null` if there is none.
     *
     * @private
     * @param {!BrickContext} $
     * @param {string} subId
     * @return {?string}
     */
    getConsumerId($, subId) {
        return $.get(CONSUMERS_KEY)?.get(subId) ?? null;
    }

    /**
     * Register a consumer for the given subscription and forward every value going out of
     * the first two outputs of its handler to that consumer.
     *
     * @private
     * @param {!BrickContext} $
     * @param {!Brick} handler
     * @param {string} subId
     * @param {string} consumerId
     * @return {!BrickContext} the handler context of the consumer
     */
    initConsumer($, handler, subId, consumerId) {
        const consumers = $.get(CONSUMERS_KEY) ?? new Map();
        if (!$.has(CONSUMERS_KEY)) {
            $.set(CONSUMERS_KEY, consumers);
        }
        consumers.set(subId, consumerId);
        const handler$ = this.getConsumerHandler($, handler, consumerId);

        // Listen to results or errors going out of the handler.
        const [results, errors] = handler.getOutputs().map((o) => handler$.observe(o));
        merge(results, errors).pipe(tap((result) => {
            // Publish answer
            Service.publish(consumerId, result).catch((e) => {
                getLogger('ProducerService').error('Error occurred while trying to publish answer:', e);
            });
        })).subscribe();

        return handler$;
    }

    /**
     * Return the brick context of the bricks that is currently executing a process for the specified consumer.
     * Create and initialize the context if it does not exist.
     *
     * @private
     * @param {!BrickContext} $
     * @param {!Brick} handler
     * @param {string} consumerId
     * @return {!BrickContext}
     */
    getConsumerHandler($, handler, consumerId) {
        const handlers = $.get(HANDLERS_KEY) ?? new Map();
        if (!$.has(HANDLERS_KEY)) {
            $.set(HANDLERS_KEY, handlers);
        }

        const $handler = handlers.get(consumerId) ?? $.runner(handler);
        if (!handlers.has(consumerId)) {
            handlers.set(consumerId, $handler);
        }

        return $handler;
    }

    /**
     * Release everything kept for the consumer bound to the given subscription:
     * destroy its handler context and forget both the handler and the subscription.
     * Does nothing when the subscription is unknown.
     *
     * @private
     * @param {!BrickContext} $
     * @param {string} subId
     */
    clearConsumer($, subId) {
        const consumerId = this.getConsumerId($, subId);
        if (typeof consumerId !== 'string') {
            return;
        }

        const handlers = $.get(HANDLERS_KEY);
        handlers?.get(consumerId)?.destroy();
        handlers?.delete(consumerId);

        // Forget the subscription too: otherwise the map grows forever and a duplicate
        // unsubscription could destroy a handler created later for the same consumer.
        $.get(CONSUMERS_KEY)?.delete(subId);
    }
}
// Register the OpenService class under its tag.
// NOTE(review): presumably this tag is what links the brick definition to this implementation — confirm.
registerBrick('0181eda705023f2bb683', OpenService);