This commit is contained in:
Nathan Bierema 2022-06-12 20:43:48 -04:00
parent 88b1b1017a
commit ebf3625311
3 changed files with 177 additions and 141 deletions

View File

@ -1,4 +1,4 @@
import socketCluster, { SCClientSocket } from 'socketcluster-client'; import socketCluster, { AGClientSocket } from 'socketcluster-client';
import { stringify } from 'jsan'; import { stringify } from 'jsan';
import { Dispatch, MiddlewareAPI } from 'redux'; import { Dispatch, MiddlewareAPI } from 'redux';
import * as actions from '../constants/socketActionTypes'; import * as actions from '../constants/socketActionTypes';
@ -25,11 +25,16 @@ import {
import { nonReduxDispatch } from '../utils/monitorActions'; import { nonReduxDispatch } from '../utils/monitorActions';
import { StoreState } from '../reducers'; import { StoreState } from '../reducers';
let socket: SCClientSocket; let socket: AGClientSocket;
let store: MiddlewareAPI<Dispatch<StoreAction>, StoreState>; let store: MiddlewareAPI<Dispatch<StoreAction>, StoreState>;
function emit({ message: type, id, instanceId, action, state }: EmitAction) { function emit({ message: type, id, instanceId, action, state }: EmitAction) {
socket.emit(id ? `sc-${id}` : 'respond', { type, action, state, instanceId }); socket.transmit(id ? `sc-${id}` : 'respond', {
type,
action,
state,
instanceId,
});
} }
function startMonitoring(channel: string) { function startMonitoring(channel: string) {
@ -120,7 +125,7 @@ function monitoring(request: MonitoringRequest) {
instanceId === instances.selected && instanceId === instances.selected &&
(request.type === 'ACTION' || request.type === 'STATE') (request.type === 'ACTION' || request.type === 'STATE')
) { ) {
socket.emit('respond', { socket.transmit('respond', {
type: 'SYNC', type: 'SYNC',
state: stringify(instances.states[instanceId]), state: stringify(instances.states[instanceId]),
id: request.id, id: request.id,
@ -134,58 +139,78 @@ function subscribe(
subscription: typeof UPDATE_STATE | typeof UPDATE_REPORTS subscription: typeof UPDATE_STATE | typeof UPDATE_REPORTS
) { ) {
const channel = socket.subscribe(channelName); const channel = socket.subscribe(channelName);
if (subscription === UPDATE_STATE) channel.watch(monitoring); if (subscription === UPDATE_STATE) {
else { void (async () => {
for await (const data of channel) {
monitoring(data as MonitoringRequest);
}
})();
} else {
const watcher = (request: UpdateReportsRequest) => { const watcher = (request: UpdateReportsRequest) => {
store.dispatch({ type: subscription, request }); store.dispatch({ type: subscription, request });
}; };
channel.watch(watcher); void (async () => {
socket.on(channelName, watcher); for await (const data of channel) {
watcher(data as UpdateReportsRequest);
}
})();
} }
} }
function handleConnection() { function handleConnection() {
socket.on('connect', (status) => { void (async () => {
store.dispatch({ for await (const data of socket.listener('connect')) {
type: actions.CONNECT_SUCCESS, store.dispatch({
payload: { type: actions.CONNECT_SUCCESS,
id: status.id, payload: {
authState: socket.authState, id: data.id,
socketState: socket.state, authState: socket.authState,
}, socketState: socket.state,
error: status.authError, },
}); // TODO Fix
if (socket.authState !== actions.AUTHENTICATED) { // @ts-expect-error Because
store.dispatch({ type: actions.AUTH_REQUEST }); error: data.authError,
});
if (socket.authState !== actions.AUTHENTICATED) {
store.dispatch({ type: actions.AUTH_REQUEST });
}
} }
}); })();
socket.on('disconnect', (code) => { void (async () => {
store.dispatch({ type: actions.DISCONNECTED, code }); for await (const data of socket.listener('disconnect')) {
}); store.dispatch({ type: actions.DISCONNECTED, code: data.code });
}
})();
socket.on('subscribe', (channel) => { void (async () => {
store.dispatch({ type: actions.SUBSCRIBE_SUCCESS, channel }); for await (const data of socket.listener('subscribe')) {
}); store.dispatch({
socket.on('unsubscribe', (channel) => { type: actions.SUBSCRIBE_SUCCESS,
socket.unsubscribe(channel); channel: data.channel,
socket.unwatch(channel); });
socket.off(channel); }
store.dispatch({ type: actions.UNSUBSCRIBE, channel }); })();
}); void (async () => {
socket.on('subscribeFail', (error) => { for await (const data of socket.listener('unsubscribe')) {
store.dispatch({ socket.unsubscribe(data.channel);
type: actions.SUBSCRIBE_ERROR, store.dispatch({ type: actions.UNSUBSCRIBE, channel: data.channel });
error, }
status: 'subscribeFail', })();
}); void (async () => {
}); for await (const data of socket.listener('subscribeFail')) {
socket.on('dropOut', (error) => { store.dispatch({
store.dispatch({ type: actions.SUBSCRIBE_ERROR, error, status: 'dropOut' }); type: actions.SUBSCRIBE_ERROR,
}); error: data.error,
status: 'subscribeFail',
});
}
})();
socket.on('error', (error) => { void (async () => {
store.dispatch({ type: actions.CONNECT_ERROR, error }); for await (const data of socket.listener('error')) {
}); store.dispatch({ type: actions.CONNECT_ERROR, error: data.error });
}
})();
} }
function connect() { function connect() {
@ -205,43 +230,42 @@ function connect() {
function disconnect() { function disconnect() {
if (socket) { if (socket) {
socket.disconnect(); socket.disconnect();
socket.off();
} }
} }
function login() { function login() {
socket.emit('login', {}, (error: Error, baseChannel: string) => { void (async () => {
if (error) { try {
store.dispatch({ type: actions.AUTH_ERROR, error }); const baseChannel = (await socket.invoke('login', {})) as string;
return; store.dispatch({ type: actions.AUTH_SUCCESS, baseChannel });
store.dispatch({
type: actions.SUBSCRIBE_REQUEST,
channel: baseChannel,
subscription: UPDATE_STATE,
});
store.dispatch({
type: actions.SUBSCRIBE_REQUEST,
channel: 'report',
subscription: UPDATE_REPORTS,
});
} catch (error) {
store.dispatch({ type: actions.AUTH_ERROR, error: error as Error });
} }
store.dispatch({ type: actions.AUTH_SUCCESS, baseChannel }); })();
store.dispatch({
type: actions.SUBSCRIBE_REQUEST,
channel: baseChannel,
subscription: UPDATE_STATE,
});
store.dispatch({
type: actions.SUBSCRIBE_REQUEST,
channel: 'report',
subscription: UPDATE_REPORTS,
});
});
} }
function getReport(reportId: unknown) { function getReport(reportId: unknown) {
socket.emit( void (async () => {
'getReport', try {
reportId, const data = (await socket.invoke('getReport', reportId)) as {
(error: Error, data: { payload: string }) => { payload: string;
if (error) { };
store.dispatch({ type: GET_REPORT_ERROR, error });
return;
}
store.dispatch({ type: GET_REPORT_SUCCESS, data }); store.dispatch({ type: GET_REPORT_SUCCESS, data });
store.dispatch(importState(data.payload)); store.dispatch(importState(data.payload));
} catch (error) {
store.dispatch({ type: GET_REPORT_ERROR, error: error as Error });
} }
); })();
} }
export function api(inStore: MiddlewareAPI<Dispatch<StoreAction>, StoreState>) { export function api(inStore: MiddlewareAPI<Dispatch<StoreAction>, StoreState>) {

View File

@ -71,7 +71,7 @@ export default function (argv: { [arg: string]: any }): Promise<{
const channel = action.receiver; const channel = action.receiver;
const data = action.data; const data = action.data;
if ( if (
channel.substr(0, 3) === 'sc-' || channel.substring(0, 3) === 'sc-' ||
channel === 'respond' || channel === 'respond' ||
channel === 'log' channel === 'log'
) { ) {
@ -82,23 +82,21 @@ export default function (argv: { [arg: string]: any }): Promise<{
data: data, data: data,
}); });
} }
} else if (action.type === action.SUBSCRIBE) {
if (action.channel === 'report') {
store
.list()
.then(function (data) {
void agServer.exchange.transmitPublish('report', {
type: 'list',
data: data,
});
})
.catch(function (error) {
console.error(error); // eslint-disable-line no-console
});
}
} }
// TODO
// } else if (action.type === action.SUBSCRIBE) {
// if (action.channel === 'report') {
// store
// .list()
// .then(function (data) {
// action.socket.emit(action.channel, {
// type: 'list',
// data: data,
// });
// })
// .catch(function (error) {
// console.error(error); // eslint-disable-line no-console
// });
// }
// }
action.allow(); action.allow();
} }
} }
@ -117,12 +115,6 @@ export default function (argv: { [arg: string]: any }): Promise<{
channelToWatch = 'log'; channelToWatch = 'log';
channelToEmit = 'respond'; channelToEmit = 'respond';
} }
// TODO
// agServer.exchange
// .subscribe('sc-' + socket.id)
// .watch(function (msg) {
// socket.emit(channelToWatch, msg);
// });
request.end(channelToWatch); request.end(channelToWatch);
} }
})(); })();
@ -141,11 +133,10 @@ export default function (argv: { [arg: string]: any }): Promise<{
})(); })();
void (async () => { void (async () => {
for await (const data of socket.receiver('disconnect')) { for await (const data of socket.receiver('disconnect')) {
const channel = agServer.exchange.channel('sc-' + socket.id);
channel.unsubscribe();
// TODO // TODO
// const channel = agServer.exchange.channel('sc-' + socket.id); // void agServer.exchange.transmitPublish(channelToEmit, {
// channel.unsubscribe();
// channel.destroy();
// agServer.exchange.publish(channelToEmit, {
// id: socket.id, // id: socket.id,
// type: 'DISCONNECTED', // type: 'DISCONNECTED',
// }); // });

View File

@ -1,5 +1,5 @@
import { stringify, parse } from 'jsan'; import { stringify, parse } from 'jsan';
import socketCluster, { SCClientSocket } from 'socketcluster-client'; import socketCluster, { AGClientSocket } from 'socketcluster-client';
import configureStore from './configureStore'; import configureStore from './configureStore';
import { defaultSocketOptions } from './constants'; import { defaultSocketOptions } from './constants';
import getHostForRN from 'rn-host-detect'; import getHostForRN from 'rn-host-detect';
@ -179,7 +179,7 @@ class DevToolsEnhancer<S, A extends Action<unknown>> {
store!: EnhancedStore<S, A, {}>; store!: EnhancedStore<S, A, {}>;
filters: LocalFilter | undefined; filters: LocalFilter | undefined;
instanceId?: string; instanceId?: string;
socket?: SCClientSocket; socket?: AGClientSocket;
sendTo?: string; sendTo?: string;
instanceName: string | undefined; instanceName: string | undefined;
appInstanceId!: string; appInstanceId!: string;
@ -241,7 +241,8 @@ class DevToolsEnhancer<S, A extends Action<unknown>> {
) { ) {
const message: MessageToRelay = { const message: MessageToRelay = {
type, type,
id: this.socket!.id, // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
id: this.socket!.id!,
name: this.instanceName, name: this.instanceName,
instanceId: this.appInstanceId, instanceId: this.appInstanceId,
}; };
@ -279,7 +280,8 @@ class DevToolsEnhancer<S, A extends Action<unknown>> {
} else if (action) { } else if (action) {
message.action = action as ActionCreatorObject[]; message.action = action as ActionCreatorObject[];
} }
this.socket!.emit(this.socket!.id ? 'log' : 'log-noid', message); // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.socket!.transmit(this.socket!.id ? 'log' : 'log-noid', message);
} }
dispatchRemotely( dispatchRemotely(
@ -300,7 +302,9 @@ class DevToolsEnhancer<S, A extends Action<unknown>> {
if ( if (
message.type === 'IMPORT' || message.type === 'IMPORT' ||
(message.type === 'SYNC' && (message.type === 'SYNC' &&
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.socket!.id && this.socket!.id &&
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
message.id !== this.socket!.id) message.id !== this.socket!.id)
) { ) {
this.store.liftedStore.dispatch({ this.store.liftedStore.dispatch({
@ -387,15 +391,22 @@ class DevToolsEnhancer<S, A extends Action<unknown>> {
} }
login() { login() {
this.socket!.emit('login', 'master', (err: Error, channelName: string) => { void (async () => {
if (err) { try {
console.log(err); // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
return; const channelName = (await this.socket!.invoke(
'login',
'master'
)) as string;
this.channel = channelName;
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
for await (const data of this.socket!.subscribe(channelName)) {
this.handleMessages(data as Message<S, A>);
}
} catch (error) {
console.log(error);
} }
this.channel = channelName; })();
this.socket!.subscribe(channelName).watch(this.handleMessages);
this.socket!.on(channelName, this.handleMessages);
});
this.started = true; this.started = true;
this.relay('START'); this.relay('START');
} }
@ -404,11 +415,9 @@ class DevToolsEnhancer<S, A extends Action<unknown>> {
this.started = false; this.started = false;
this.isMonitored = false; this.isMonitored = false;
if (!this.socket) return; if (!this.socket) return;
this.socket.destroyChannel(this.channel!); this.socket.unsubscribe(this.channel!);
if (keepConnected) { this.socket.closeChannel(this.channel!);
this.socket.off(this.channel, this.handleMessages); if (!keepConnected) {
} else {
this.socket.off();
this.socket.disconnect(); this.socket.disconnect();
} }
}; };
@ -422,34 +431,46 @@ class DevToolsEnhancer<S, A extends Action<unknown>> {
this.socket = socketCluster.create(this.socketOptions); this.socket = socketCluster.create(this.socketOptions);
this.socket.on('error', (err) => { void (async () => {
// if we've already had this error before, increment it's counter, otherwise assign it '1' since we've had the error once. // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
// eslint-disable-next-line no-prototype-builtins for await (const data of this.socket!.listener('error')) {
this.errorCounts[err.name] = this.errorCounts.hasOwnProperty(err.name) // if we've already had this error before, increment it's counter, otherwise assign it '1' since we've had the error once.
? this.errorCounts[err.name] + 1 // eslint-disable-next-line no-prototype-builtins,@typescript-eslint/no-unsafe-argument
: 1; this.errorCounts[data.error.name] = this.errorCounts.hasOwnProperty(
data.error.name
)
? this.errorCounts[data.error.name] + 1
: 1;
if (this.suppressConnectErrors) { if (this.suppressConnectErrors) {
if (this.errorCounts[err.name] === 1) { if (this.errorCounts[data.error.name] === 1) {
console.log( console.log(
'remote-redux-devtools: Socket connection errors are being suppressed. ' + 'remote-redux-devtools: Socket connection errors are being suppressed. ' +
'\n' + '\n' +
"This can be disabled by setting suppressConnectErrors to 'false'." "This can be disabled by setting suppressConnectErrors to 'false'."
); );
console.log(err); console.log(data.error);
}
} else {
console.log(data.error);
} }
} else {
console.log(err);
} }
}); })();
this.socket.on('connect', () => {
console.log('connected to remotedev-server'); void (async () => {
this.errorCounts = {}; // clear the errorCounts object, so that we'll log any new errors in the event of a disconnect // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.login(); for await (const data of this.socket!.listener('connect')) {
}); console.log('connected to remotedev-server');
this.socket.on('disconnect', () => { this.errorCounts = {}; // clear the errorCounts object, so that we'll log any new errors in the event of a disconnect
this.stop(true); this.login();
}); }
})();
void (async () => {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
for await (const data of this.socket!.listener('disconnect')) {
this.stop(true);
}
})();
}; };
checkForReducerErrors = (liftedState = this.getLiftedStateRaw()) => { checkForReducerErrors = (liftedState = this.getLiftedStateRaw()) => {