This commit is contained in:
Nathan Bierema 2022-06-12 17:10:32 -04:00
parent e1219d5ef7
commit 88b1b1017a
3 changed files with 123 additions and 97 deletions

View File

@ -88,10 +88,13 @@ if (argv.injectserver) {
}
// eslint-disable-next-line @typescript-eslint/no-floating-promises
server(argv).then(function (r) {
server(argv).then(async function (r) {
if (argv.open && argv.open !== 'false') {
r.on('ready', async function () {
await openApp(argv.open as string, options);
});
await r
.listener('ready')
.once()
.then(async () => {
await openApp(argv.open as string, options);
});
}
});

View File

@ -1,6 +1,10 @@
import express from 'express';
import http from 'http';
import getPort from 'getport';
import SocketCluster from 'socketcluster';
import socketClusterServer from 'socketcluster-server';
import getOptions, { Options } from './options';
import routes from './routes';
import createStore from './store';
// var LOG_LEVEL_NONE = 0;
const LOG_LEVEL_ERROR = 1;
@ -8,16 +12,14 @@ const LOG_LEVEL_WARN = 2;
const LOG_LEVEL_INFO = 3;
export interface ExtendedOptions extends Options {
workerController: string;
allowClientPublish: boolean;
}
export default function (argv: { [arg: string]: any }): Promise<{
portAlreadyUsed?: boolean;
on: (status: 'ready', cb: (() => void) | (() => Promise<void>)) => void;
listener: (eventName: 'ready') => { once(): Promise<unknown> };
}> {
const options = Object.assign(getOptions(argv), {
workerController: __dirname + '/worker.js',
allowClientPublish: false,
});
const port = options.port;
@ -39,8 +41,12 @@ export default function (argv: { [arg: string]: any }): Promise<{
}
resolve({
portAlreadyUsed: true,
on: function (status: string, cb: () => void) {
cb();
listener: function (eventName: 'ready') {
return {
once() {
return Promise.resolve();
},
};
},
});
} else {
@ -48,7 +54,110 @@ export default function (argv: { [arg: string]: any }): Promise<{
console.log('[ReduxDevTools] Start server...');
console.log('-'.repeat(80) + '\n');
}
resolve(new SocketCluster(options));
const httpServer = http.createServer();
const agServer = socketClusterServer.attach(httpServer, options);
const app = express();
httpServer.on('request', app);
const store = createStore(options);
app.use(routes(options, store, agServer));
agServer.setMiddleware(
agServer.MIDDLEWARE_INBOUND,
// eslint-disable-next-line @typescript-eslint/no-misused-promises
async (middlewareStream) => {
for await (const action of middlewareStream) {
if (action.type === action.TRANSMIT) {
const channel = action.receiver;
const data = action.data;
if (
channel.substr(0, 3) === 'sc-' ||
channel === 'respond' ||
channel === 'log'
) {
void agServer.exchange.transmitPublish(channel, data);
} else if (channel === 'log-noid') {
void agServer.exchange.transmitPublish('log', {
id: action.socket.id,
data: data,
});
}
}
// TODO
// } else if (action.type === action.SUBSCRIBE) {
// if (action.channel === 'report') {
// store
// .list()
// .then(function (data) {
// action.socket.emit(action.channel, {
// type: 'list',
// data: data,
// });
// })
// .catch(function (error) {
// console.error(error); // eslint-disable-line no-console
// });
// }
// }
action.allow();
}
}
);
void (async () => {
for await (const { socket } of agServer.listener('connection')) {
let channelToWatch: string, channelToEmit: string;
void (async () => {
for await (const request of socket.procedure('login')) {
const credentials = request.data;
if (credentials === 'master') {
channelToWatch = 'respond';
channelToEmit = 'log';
} else {
channelToWatch = 'log';
channelToEmit = 'respond';
}
// TODO
// agServer.exchange
// .subscribe('sc-' + socket.id)
// .watch(function (msg) {
// socket.emit(channelToWatch, msg);
// });
request.end(channelToWatch);
}
})();
void (async () => {
for await (const request of socket.procedure('getReport')) {
const id = request.data as string;
store
.get(id)
.then(function (data) {
request.end(data);
})
.catch(function (error) {
console.error(error); // eslint-disable-line no-console
});
}
})();
void (async () => {
for await (const data of socket.receiver('disconnect')) {
// TODO
// const channel = agServer.exchange.channel('sc-' + socket.id);
// channel.unsubscribe();
// channel.destroy();
// agServer.exchange.publish(channelToEmit, {
// id: socket.id,
// type: 'DISCONNECTED',
// });
}
})();
}
})();
httpServer.listen(options.port);
// TODO Fix
// @ts-expect-error Because
resolve(agServer);
}
/* eslint-enable no-console */
});

View File

@ -1,86 +0,0 @@
import SCWorker from 'socketcluster/scworker';
import express from 'express';
import routes from './routes';
import createStore from './store';
const app = express();
class Worker extends SCWorker {
run() {
const httpServer = this.httpServer;
const scServer = this.scServer;
const options = this.options;
const store = createStore(options);
httpServer.on('request', app);
app.use(routes(options, store, scServer));
scServer.addMiddleware(scServer.MIDDLEWARE_EMIT, function (req, next) {
const channel = req.event;
const data = req.data;
if (
channel.substr(0, 3) === 'sc-' ||
channel === 'respond' ||
channel === 'log'
) {
scServer.exchange.publish(channel, data);
} else if (channel === 'log-noid') {
scServer.exchange.publish('log', { id: req.socket.id, data: data });
}
next();
});
scServer.addMiddleware(scServer.MIDDLEWARE_SUBSCRIBE, function (req, next) {
next();
if (req.channel === 'report') {
store
.list()
.then(function (data) {
req.socket.emit(req.channel!, { type: 'list', data: data });
})
.catch(function (error) {
console.error(error); // eslint-disable-line no-console
});
}
});
scServer.on('connection', function (socket) {
let channelToWatch: string, channelToEmit: string;
socket.on('login', function (this: Worker, credentials, respond) {
if (credentials === 'master') {
channelToWatch = 'respond';
channelToEmit = 'log';
} else {
channelToWatch = 'log';
channelToEmit = 'respond';
}
this.exchange.subscribe('sc-' + socket.id).watch(function (msg) {
socket.emit(channelToWatch, msg);
});
respond(null, channelToWatch);
});
socket.on('getReport', function (id: string, respond) {
store
.get(id)
.then(function (data) {
respond(null, data);
})
.catch(function (error) {
console.error(error); // eslint-disable-line no-console
});
});
socket.on('disconnect', function (this: Worker) {
const channel = this.exchange.channel('sc-' + socket.id);
channel.unsubscribe();
channel.destroy();
scServer.exchange.publish(channelToEmit, {
id: socket.id,
type: 'DISCONNECTED',
});
});
});
}
}
new Worker();