import { Inject, Injectable } from "@angular/core";
import { LoggerLocator } from "@tsng/logging";
import { Observable, of, ReplaySubject, throwError } from "rxjs";
import { catchError, finalize, first, startWith, switchMap, timeoutWith } from "rxjs/operators";
import { Util } from "../util/util";
import { DeliveryOptions } from "./delivery-options";
import { HandlerHolder } from "./handler-holder/handler-holder";
import { MESSAGE_CODEC, MessageCodec } from "./message-codec";
import { ErrorMessage, FailureObject } from "./message/error-message";
import { DEFAULT_MESSAGE_TIMEOUT, Message, MessageObject, MessageType } from "./message/message";

/**
 * In-process event bus.
 *
 * Messages are routed by address to handlers (RxJS `ReplaySubject`s) kept in a
 * `HandlerHolder` per address. Request/reply is implemented with one-shot
 * handlers registered on a generated reply address. Remote registration is
 * delegated to whoever subscribes to {@link EventBus.registrationConsumer}.
 */
@Injectable({
	providedIn: "root"
})
export class EventBus {
	private logger = LoggerLocator.getLogger("EventBus")();
	/** Registered handlers, keyed by address. Empty holders are removed. */
	private handlers: Map<string, HandlerHolder> = new Map();
	/** Sink for REGISTER/UNREGISTER messages; `null` while no registration consumer is active. */
	private registrationHandler: ReplaySubject<Message<{}>> | null = null;

	constructor(@Inject(MESSAGE_CODEC) private codec: MessageCodec<any, any>) {
		this.logger.info("Eventbus constructed");
	}

	/**
	 * Sends a message of type SEND to a single handler registered on `address`.
	 * If no handler is registered the message is dropped and an error is logged.
	 *
	 * @param address target address
	 * @param body message payload
	 * @param options optional delivery options passed to the message
	 */
	public send<T>(address: string, body: T, options?: DeliveryOptions): void {
		this.sendOrPublish(this.createMessage(address, MessageType.SEND, body, options));
	}

	/**
	 * Sends a message of type SEND and returns an observable of the single reply.
	 *
	 * @param address target address
	 * @param body message payload
	 * @param options optional delivery options; `replyTimeout` bounds the wait for the reply
	 * @returns observable that emits the reply and completes, or errors with an
	 *          `ErrorMessage` of failure type "TIMEOUT" when no reply arrives in time
	 */
	public request<T, D>(address: string, body: T, options?: DeliveryOptions): Observable<Message<D>> {
		const message = this.createMessage(address, MessageType.SEND, body, options);
		message.setReplyAddress(this.generateReplyAddress());
		// The reply handler must be registered before dispatching so a
		// synchronous reply is not lost.
		const replyHandler = this.createReplyHandler<T, D>(message, options);
		this.sendOrPublish(message);
		return replyHandler;
	}

	/**
	 * Publishes a message of type PUBLISH to every handler registered on `address`.
	 *
	 * @param address target address
	 * @param body message payload
	 * @param options optional delivery options passed to the message
	 */
	public publish<T>(address: string, body: T, options?: DeliveryOptions): void {
		this.sendOrPublish(this.createMessage(address, MessageType.PUBLISH, body, options));
	}

	/**
	 * Returns the stream of REGISTER/UNREGISTER messages produced by
	 * {@link EventBus.consumer}. Only one registration consumer may be active
	 * at a time; the slot is released when the returned observable is torn down.
	 *
	 * @returns the registration message stream, or an observable that errors
	 *          immediately when a registration consumer is already active
	 */
	public registrationConsumer(): Observable<Message<object>> {
		if (this.registrationHandler != null) {
			// Previously this path returned `undefined`, which made callers
			// crash on `.subscribe`. Report the conflict through the observable.
			const reason = "A registration consumer is already active, only one is supported at a time";
			this.logger.warning(reason);
			return throwError(new Error(reason));
		}
		const registrationHandler = new ReplaySubject<Message<{}>>();
		this.registrationHandler = registrationHandler;
		return registrationHandler.pipe(finalize(() => {
			// Only release the slot if it still belongs to this subject.
			if (this.registrationHandler === registrationHandler) {
				this.registrationHandler = null;
			}
		}));
	}

	/**
	 * Registers a purely local handler on `address` (no REGISTER message is sent).
	 * The handler is registered immediately and removed when the returned
	 * observable is torn down.
	 *
	 * @param address address to listen on
	 * @returns observable of messages delivered to `address`
	 */
	public localConsumer<T>(address: string): Observable<Message<T>> {
		const handler = new ReplaySubject<Message<T>>();
		const handlerId = this.registerHandler(address, handler);
		return handler.pipe(finalize(() => this.unregisterLocalHandler(address, handlerId)));
	}

	/**
	 * Registers a consumer on `address` by first sending a REGISTER message to
	 * the registration consumer and waiting for its reply.
	 *
	 * On a successful reply a local handler is registered; the returned
	 * observable first emits `{success: true}` and then every message delivered
	 * to `address`. Tearing it down sends an UNREGISTER message.
	 *
	 * @param address address to listen on
	 * @returns observable of messages; errors with an `ErrorMessage` when no
	 *          registration consumer is active or the registration fails
	 */
	public consumer<T>(address: string): Observable<Message<T>> {
		const message = this.createMessage(address, MessageType.REGISTER, {});
		const replyAddress = this.generateReplyAddress();
		message.setReplyAddress(replyAddress);

		const registrationHandler = this.registrationHandler;
		if (registrationHandler == null) {
			// Bail out BEFORE creating the reply handler: it is registered
			// eagerly and would otherwise never be cleaned up.
			this.logger.error(`No handlers registered for registerConsumer cannot send register message`);
			return throwError(new ErrorMessage({
				message: `No handlers registered for registerConsumer cannot send register message`,
				address: replyAddress,
				failureCode: 500,
				failureType: "NO_REGISTER_HANDLER"
			}));
		}

		const replyHandler = this.createReplyHandler(message);
		registrationHandler.next(message);
		return replyHandler.pipe(catchError(err => {
			// Funnel failures into the value channel so they are inspected below.
			return of(err);
		}), switchMap(reply => {
			if (reply instanceof ErrorMessage) {
				// Propagate the received error instance (not the class itself).
				return throwError(reply);
			}

			//todo should check if successfully registered
			const handler = new ReplaySubject<Message<T>>();
			const handlerId = this.registerHandler(address, handler);
			// todo should this use startWith?
			return handler.pipe(startWith({success: true} as any as Message<T>),
				finalize(() => this.unregisterHandler(address, handlerId))
			);
		}));
	}

	/**
	 * Builds an `ErrorMessage` from its plain-object representation.
	 *
	 * @param errorObject failure description
	 */
	public createErrorMessageFromObject<T>(errorObject: FailureObject) {
		return new ErrorMessage(errorObject);
	}

	/**
	 * Builds a `Message` bound to this bus from its plain-object representation.
	 * Headers and reply address are copied only when present as own properties.
	 *
	 * @param messageObject plain message description
	 */
	public createMessageFromObject<T>(messageObject: MessageObject<T>) {
		const options = messageObject.hasOwnProperty("headers") ? {headers: messageObject.headers} : {};
		const message = new Message(messageObject.address,
			messageObject.type,
			messageObject.body,
			this,
			this.codec,
			options
		);
		if (messageObject.hasOwnProperty("replyAddress")) {
			message.setReplyAddress(messageObject.replyAddress);
		}
		return message;
	}

	/**
	 * Creates a message bound to this bus and its codec.
	 *
	 * @param address target address
	 * @param type message type
	 * @param body message payload
	 * @param options optional delivery options
	 */
	public createMessage<T>(address: string,
		type: MessageType,
		body: T,
		options?: DeliveryOptions
	): Message<T> {
		return new Message<T>(address, type, body, this, this.codec, options);
	}

	/**
	 * Converts a plain object into a message and dispatches it. Objects that
	 * carry an own `failureCode` property are treated as failures.
	 *
	 * @param messageObject plain message or failure description
	 */
	public sendMessageObject<T>(messageObject: MessageObject<T> | FailureObject): void {
		const message = messageObject.hasOwnProperty("failureCode")
			? this.createErrorMessageFromObject(messageObject as FailureObject)
			: this.createMessageFromObject(messageObject as MessageObject<T>);
		this.sendOrPublish(message);
	}

	/**
	 * Dispatches an already built reply message.
	 *
	 * @param reply message to dispatch
	 * @param options currently unused
	 */
	public sendReply<T>(reply: Message<T>, options?: DeliveryOptions): void {
		this.sendOrPublish(reply);
	}

	/**
	 * Dispatches a reply message and waits for a reply to it.
	 * Note: this assigns a fresh reply address to `reply`.
	 *
	 * @param reply message to dispatch
	 * @param options currently unused; the default reply timeout applies
	 * @returns observable of the single answer, see {@link EventBus.createReplyHandler}
	 */
	public sendAndRequestReply<T, D>(reply: Message<T>, options?: DeliveryOptions): Observable<Message<D>> {
		reply.setReplyAddress(this.generateReplyAddress());
		const replyHandler = this.createReplyHandler<T, D>(reply);
		this.sendOrPublish(reply);
		return replyHandler;
	}

	/**
	 * Registers a one-shot handler on the reply address of `message`.
	 *
	 * The handler is registered immediately (not on subscription) and is only
	 * removed when the returned observable is torn down, so callers are
	 * expected to subscribe.
	 *
	 * @param message message whose reply address should be listened on
	 * @param options delivery options; when `replyTimeout` is missing
	 *                `DEFAULT_MESSAGE_TIMEOUT` is used
	 * @returns observable that emits the first reply and completes, or errors
	 *          with an `ErrorMessage` of failure type "TIMEOUT"
	 */
	public createReplyHandler<T, D>(message: Message<T>,
		options: DeliveryOptions = {replyTimeout: DEFAULT_MESSAGE_TIMEOUT}
	): Observable<Message<D>> {
		// Callers may pass options without a replyTimeout (e.g. headers only);
		// never hand an undefined timeout to timeoutWith.
		const replyTimeout = options != null && options.replyTimeout != null
			? options.replyTimeout
			: DEFAULT_MESSAGE_TIMEOUT;
		// Capture the address once so teardown unregisters from the same
		// address the handler was registered on, even if the message's reply
		// address is reassigned later.
		const replyAddress = message.getReplyAddress();
		const replyHandler: ReplaySubject<Message<D>> = new ReplaySubject(1);
		const handlerId = this.registerHandler(replyAddress, replyHandler);
		const timeoutError = new ErrorMessage({
			message: `Event bus timeout no response received within ${replyTimeout / 1000} seconds`,
			address: replyAddress,
			failureCode: 500,
			failureType: "TIMEOUT"
		});
		return replyHandler.pipe(
			timeoutWith(replyTimeout, throwError(timeoutError)),
			first(),
			finalize(() => this.unregisterLocalHandler(replyAddress, handlerId))
		);
	}

	/**
	 * Routes a message to the handlers registered on its address:
	 * error messages are delivered as an error to one handler, PUBLISH
	 * messages go to all handlers, everything else goes to one handler.
	 * Messages for addresses without handlers are dropped and logged.
	 */
	private sendOrPublish<T>(message: Message<T>): void {
		const holder = this.handlers.get(message.address);
		if (holder == null) {
			this.logger.error(`No handlers registered for address: '${message.address}'`, {
				message
			});
			return;
		}

		// todo check if error message.
		// todo add send interceptor here !

		if (message instanceof ErrorMessage) {
			holder.getNext().error(message);
			return;
		}

		if (message.type === MessageType.PUBLISH) {
			holder.getAll().forEach(handler => {
				// todo create clone on message with deep clone
				// Each handler gets its own object that inherits from the message.
				handler.next(Object.create(message));
			});
			return;
		}
		holder.getNext().next(message);
	}

	/**
	 * Sends an UNREGISTER message to the registration consumer and removes the
	 * local handler once that request has finished. The local handler is also
	 * removed when the request fails or no registration consumer is active, so
	 * it never outlives its subscriber.
	 */
	private unregisterHandler(address: string, handlerId: string): void {
		const registrationHandler = this.registrationHandler;
		if (registrationHandler == null) {
			// The registration consumer is already gone; nothing to notify.
			this.logger.warning(
				`No registration consumer available, unregistering handler for address: '${address}' locally only`
			);
			this.unregisterLocalHandler(address, handlerId);
			return;
		}

		const message = this.createMessage(address, MessageType.UNREGISTER, {});
		message.setReplyAddress(this.generateReplyAddress());
		this.createReplyHandler(message).subscribe({
			error: error => {
				// E.g. a reply timeout: still clean up locally instead of
				// leaving an unhandled error and a dangling handler.
				this.logger.error(`Unregister request for address: '${address}' failed`, {error});
				this.unregisterLocalHandler(address, handlerId);
			},
			complete: () => {
				this.unregisterLocalHandler(address, handlerId);
			}
		});
		registrationHandler.next(message);
	}

	/**
	 * Removes a handler from the local registry and drops the address entry
	 * when it was the last handler. Logs a warning for unknown addresses.
	 */
	private unregisterLocalHandler(address: string, handlerId: string): void {
		const holder = this.handlers.get(address);
		if (holder == null) {
			this.logger.warning(`Handler for address: '${address}' is not available and cannot be unregistered`);
			return;
		}
		holder.unregister(handlerId);
		if (holder.size === 0) {
			this.handlers.delete(address);
		}
	}

	/**
	 * Adds a handler to the registry, creating the holder for the address on
	 * first use.
	 *
	 * @returns the id assigned by the holder, needed to unregister the handler
	 */
	private registerHandler<T>(address: string, handler: ReplaySubject<Message<T>>): string {
		let holder = this.handlers.get(address);
		if (holder == null) {
			holder = new HandlerHolder();
			this.handlers.set(address, holder);
		}
		return holder.register(handler);
	}

	/**
	 * Generates a reply address: "__vertx.reply." followed by a UUID,
	 * truncated to 36 characters.
	 */
	private generateReplyAddress(): string {
		// todo how to resolve this, vertx reply + uuid is 50 chars and backend has max on 36
		return ("__vertx.reply." + Util.generateUUID()).slice(0, 36);
		// return "__vertx.reply." + Util.generateUUID();
	}

	/**
	 * Add an interceptor that gets called whenever a message is sent
	 * from the application
	 */
	// public addOutboundInterceptor(interceptor: any) {
	//   sendInterceptor
	// }

	// public removeInboundInterceptor(interceptor: any) {
	//   receiveInterceptor
	// }

	/**
	 * Add an interceptor that gets called whenever a message is sent
	 * to the application
	 */
	// public addInboundInterceptor(interceptor: any) {
	//
	// }

	// public removeOutboundInterceptor(interceptor: any) {
	//
	// }
}
