Help Needed: Setting Up Socket.io for Multi-Instance Direct Messaging with Remix.run and Fly.io

Hi all,

I’m looking to add direct messaging to my application using Remix.run with a custom Express server. Currently, I have two Fly.io apps running: the application itself and a PostgreSQL database. Both apps run on multiple instances.

I’m having trouble setting up Socket.io properly across these multiple instances. I’ve read the Fly.io blog article on WebSockets ("WebSockets and Fly" on The Fly Blog) and found the GitHub repository bjarkebech/express-socketio-fly-replay ("Automatically replay Socket.io connections to the correct Fly machine using Node.js and express."), but I still can’t get it to work with my application.

Here’s my code based on the bjarkebech/express-socketio-fly-replay repository. I’m considering always connecting the socket to the same instance (machine) by setting a static machine ID (TARGET_INSTANCE, read from FLY_CHAT_MACHINE_ID).

Server:

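import { createServer } from 'node:http'
import express from 'express'
import { Server } from 'socket.io'

const app = express()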
const httpServer = createServer(app)

const FLY_INSTANCE_ID = process.env.FLY_ALLOC_ID
	? process.env.FLY_ALLOC_ID.split('-')[0]
	: null

const TARGET_INSTANCE = process.env.FLY_CHAT_MACHINE_ID

httpServer.on('upgrade', function (req, socket) {
	console.log('Fly machine ID:', FLY_INSTANCE_ID)
	console.log('Target machine ID:', TARGET_INSTANCE)

	// Opt out on localhost/non-Fly environments.
	if (!FLY_INSTANCE_ID) return

	// Opt out on other HTTP upgrades.
	if (req.headers['upgrade'] !== 'websocket') return

	if (FLY_INSTANCE_ID === TARGET_INSTANCE) return
	console.log('Mismatch detected, replaying')

	// Create a raw HTTP response with the fly-replay header.
	// HTTP code 101 must be used to make the response replay correctly.
	const headers = [
		'HTTP/1.1 101 Switching Protocols',
		`fly-replay: instance=${TARGET_INSTANCE}`,
	]

	// Send new headers and close the socket.
	socket.end(headers.concat('\r\n').join('\r\n'))
})

const io = new Server(httpServer)

io.on('connection', socket => {
	socket.on('joinRoom', roomId => {
		socket.join(roomId)
		console.log(`User joined room: ${roomId}`)
	})

	socket.on('sendMessage', ({ roomId, message }) => {
		io.to(roomId).emit('receiveMessage', message)
	})

	socket.on('disconnect', () => {
		console.log('user disconnected')
	})
})
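For reference, the raw bytes written by the upgrade handler above can be pulled into a small helper. This is a sketch: buildFlyReplayUpgradeResponse is a hypothetical name, and it reproduces the same headers.concat('\r\n').join('\r\n') expression so it is easy to see that the output ends with the blank line (CRLF CRLF) that terminates an HTTP header block.

```typescript
// Hypothetical helper: builds the raw HTTP/1.1 response the upgrade handler
// writes before closing the socket, asking Fly's proxy to replay the request.
function buildFlyReplayUpgradeResponse(targetInstance: string): string {
  const headers = [
    'HTTP/1.1 101 Switching Protocols',
    `fly-replay: instance=${targetInstance}`,
  ]
  // Appending '\r\n' as an extra element and joining with '\r\n' yields the
  // status line, the header line, and the empty line that ends the headers.
  return headers.concat('\r\n').join('\r\n')
}

const raw = buildFlyReplayUpgradeResponse('abc123')
console.log(JSON.stringify(raw))
// "HTTP/1.1 101 Switching Protocols\r\nfly-replay: instance=abc123\r\n\r\n"
```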

Component:

// ...
// all imports

export async function loader({ request, params }: LoaderFunctionArgs) {
	const userId = await requireUserId(request)

	const roomId = params.roomId
	invariantResponse(roomId, 'Not found', { status: 404 })

	const room = await getRoom({ roomId, userId })
	invariantResponse(room, 'Unauthorized', { status: 403 })

	const messages = await getMessages(roomId)
	const messagesByDay = organizeMessagesByDay(messages)

	return json({
		room,
		userId,
		messages: messagesByDay,
	})
}

export async function action({
	request,
	context,
}: ActionFunctionArgs & { context: { io: Server } }) {
	const userId = await requireUserId(request)
	const formData = await request.formData()
	
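	const body = formData.get('body')
	const roomId = formData.get('roomId')
	invariantResponse(typeof body === 'string', 'Message is required', { status: 400 })
	invariantResponse(typeof roomId === 'string', 'Room ID is required', { status: 400 })

	// getRoom is async: without await, room is a pending Promise (always truthy),
	// so the authorization check below could never fail.
	const room = await getRoom({ roomId, userId })
	invariantResponse(room, 'Unauthorized', { status: 403 })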

	const message = await createMessage({ body, userId, roomId })
	context.io.to(roomId).emit('receiveMessage', message)

	return redirect(`/messages/${roomId}`)
}

export default function MessageUserPage() {
	const { messages, room, userId } = useLoaderData<typeof loader>()
	const [liveMessages, setLiveMessages] = useState<MessagesByDay>(messages)

	useEffect(() => {
		const socket = io()

		socket.emit('joinRoom', room.id)

		socket.on('receiveMessage', message => {
			setLiveMessages(prevMessages => {
				const date = new Date(message.createdAt).toISOString().split('T')[0]
				if (date) {
					return {
						...prevMessages,
						[date]: prevMessages[date]
							? [...prevMessages[date]!, message]
							: [message],
					}
				}
				return prevMessages
			})
		})

		return () => {
			socket.disconnect()
		}
	}, [room.id])

	return (
		<>
			<Header
				left={people.map(
					person => person.profile?.displayName ?? `@${person.username}`,
				)}
			/>
			<ul className="flex-1 p-4 md:p-8">
				{Object.entries(liveMessages).map(([date, messages]) => {
					return (
						<div key={date}>
							<Badge className="mx-auto block w-fit" variant="secondary">
								{date}
							</Badge>
							<ul className="my-8">
								{messages.map(message => {
									const isEmojiOnlyMessage = isEmojiOnly(message.body)

									return (
										<li key={message.id} className="peer">
											<Message
												{...message}
												className={
													isEmojiOnlyMessage
														? 'bg-transparent p-0 text-[64px]'
														: ''
												}
											>
												{parse({ text: message.body })}
											</Message>
										</li>
									)
								})}
							</ul>
						</div>
					)
				})}
				<li ref={messageEndRef} />
			</ul>
			<Footer>
				<Form
					action="."
					method="POST"
					ref={formRef}
					preventScrollReset
					className="flex w-full items-end gap-2"
				>
					<input type="hidden" name="roomId" value={room.id} />
					<AutoResizeTextarea autoFocus />
					<Button disabled={isPending} ref={buttonRef} type="submit">
						Send
					</Button>
				</Form>
			</Footer>
		</>
	)
}

However, this setup doesn’t work when I chat from an incognito browser window where a header-modifying extension (ModHeader) sets a fly-prefer-region header.

Does anyone know the proper way to connect two users on different machines via a Socket.io connection and make it possible for them to chat with each other? Any guidance on what I might be doing wrong would be greatly appreciated.

Thanks in advance!

I use Fly-Replay with websockets in my application - the key is to do the replay before the upgrade. Do you have specific routes that are known to be websockets?

Something like the following would do the trick:

app.use((request, response, next) => {
  const url = new URL(request.protocol + '://' + request.get('host') + request.originalUrl)

  if (url.pathname === '...') {
    response.set('Fly-Replay', `instance=${TARGET_INSTANCE}`)
    response.status(307)
    response.send()
  } else {
    next()
  }
})

Please correct me if I’m wrong (very likely), but fly-replay in this scenario acts like a sticky session: the ws connection talks to a consistent host, but it doesn’t handle the multi-instance use case OP has.

E.g., if you have 2 users connecting to 2 instances: UserA connects to us-west and UserB connects to eu-east. If UserA wants to send a message, the eu-east instance has no reference to the ws connections from us-west, and therefore isn’t aware of any messages sent from us-west.
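The failure mode can be modelled without any networking. In this toy sketch (the Instance class and region names are illustrative, not Socket.io APIs), each instance keeps room membership in its own memory, as Socket.io's default in-memory adapter does, so an emit on one instance never reaches a user connected to another.

```typescript
// Toy model of one app instance: room membership lives only in this process's memory.
class Instance {
  private rooms = new Map<string, Set<string>>()
  readonly inbox = new Map<string, string[]>() // userId -> messages received

  constructor(readonly region: string) {}

  join(userId: string, roomId: string): void {
    if (!this.rooms.has(roomId)) this.rooms.set(roomId, new Set())
    this.rooms.get(roomId)!.add(userId)
    if (!this.inbox.has(userId)) this.inbox.set(userId, [])
  }

  // Like io.to(roomId).emit(...): only reaches users connected to THIS instance.
  emit(roomId: string, message: string): void {
    for (const userId of this.rooms.get(roomId) ?? []) {
      this.inbox.get(userId)!.push(message)
    }
  }
}

const usWest = new Instance('us-west')
const euEast = new Instance('eu-east')
usWest.join('userA', 'room-1')
euEast.join('userB', 'room-1')

usWest.emit('room-1', 'hi from A')
console.log(usWest.inbox.get('userA')) // [ 'hi from A' ]
console.log(euEast.inbox.get('userB')) // []
```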

Typically, you'd use a pub/sub service to handle this, but you can also do it inside Fly. Here’s how I do it (may not be correct or the best way to do things):

  1. Add a simple internal API endpoint to the HTTP server, e.g. /api/fanout.
    1b. Look up the roomId or userId from the request; if this instance holds that connection, forward the message to its ws.
  2. In the actual socket’s on('message') handler, emit or send the message if the roomId or userId exists on this instance. If not, that roomId or userId may be on another instance, so go to 2b.
    2b. Use resolve6('global.<app-name>.internal') to get the IPv6 addresses of all the instances, then call the /api/fanout endpoint on those instances, which leads to 1b.

Note: you’ll need to add some checks/balances so you don’t spam yourself. So far, it’s been working great for me w/ multi instance and region, but your mileage may vary.
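One possible shape for those checks, sketched under assumptions: every message sent between instances carries the origin instance's IP and a forwarded flag, and an instance that receives a fanout only delivers locally and never forwards again, so each message makes at most one hop. FanoutMessage and planDelivery are hypothetical names, and the fdaa:: addresses are made up.

```typescript
// Hypothetical envelope for messages passed between instances via /api/fanout.
interface FanoutMessage {
  roomId: string
  body: string
  originIp: string // private IPv6 of the instance that first received the message
  forwarded: boolean // true once it has been sent to another instance
}

interface DeliveryPlan {
  deliverLocally: boolean
  forwardTo: string[] // instance IPs to POST /api/fanout to
}

// Decide what this instance should do with a message.
function planDelivery(
  msg: FanoutMessage,
  thisIp: string,
  allIps: string[],
  localRooms: Set<string>,
): DeliveryPlan {
  const deliverLocally = localRooms.has(msg.roomId)
  // A message that was already forwarded is never forwarded again: one hop only.
  if (msg.forwarded) return { deliverLocally, forwardTo: [] }
  const forwardTo = allIps.filter(ip => ip !== thisIp && ip !== msg.originIp)
  return { deliverLocally, forwardTo }
}

const ips = ['fdaa::1', 'fdaa::2', 'fdaa::3']
const fresh: FanoutMessage = { roomId: 'room-1', body: 'hi', originIp: 'fdaa::1', forwarded: false }
// Origin instance: deliver to its own sockets and forward to fdaa::2 and fdaa::3.
console.log(planDelivery(fresh, 'fdaa::1', ips, new Set(['room-1'])))
// Receiving instance: the copy is marked forwarded, so it is not sent any further.
console.log(planDelivery({ ...fresh, forwarded: true }, 'fdaa::2', ips, new Set()))
```

Before POSTing, the sender would set forwarded: true on the copy it sends, which is what stops two instances from bouncing a message back and forth.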

@khuezy Thank you for the detailed response. I think you’re right about the limitations of using fly-replay for this scenario. However, I’m not entirely sure I understand your proposed solution correctly. Would you be able to share some of your code as an example? That would be incredibly helpful!

@rubys, thank you for your reply. Initially, my server wasn’t deploying, and I’m not sure why, but it seems to be working now. I’ve placed the ‘replay’ code above the ‘upgrade’ code, but it still isn’t functioning correctly. When I establish a connection using useEffect, it’s still connecting to the wrong instance, even though the replay has been executed. Here is my code:

Server.ts:

...

const CURRENT_INSTANCE = process.env.FLY_ALLOC_ID
	? process.env.FLY_ALLOC_ID.split('-')[0]
	: null
const TARGET_INSTANCE = process.env.TARGET_INSTANCE

app.use(function (request, response, next) {
	const { protocol, originalUrl } = request
	let url = new URL(protocol + '://' + request.get('host') + originalUrl)

	if (
		CURRENT_INSTANCE &&
		CURRENT_INSTANCE !== TARGET_INSTANCE &&
		url.pathname.startsWith('/messages')
	) {
		console.log('Mismatch detected on Messages, replaying...')
		response.set('fly-replay', `instance=${TARGET_INSTANCE}`)
		response.status(307)
		response.send()
	} else {
		next()
	}
})

httpServer.on('upgrade', (request, socket) => {
	if (!CURRENT_INSTANCE) return
	if (request.headers['upgrade'] !== 'websocket') return
	console.log(`upgrade request: ${request.url}`)
	console.log('Current machine ID:', CURRENT_INSTANCE)
	console.log('Target machine ID:', TARGET_INSTANCE)

	if (CURRENT_INSTANCE === TARGET_INSTANCE) return
	console.log('Mismatch detected on Websocket, replaying...')

	// Create a raw HTTP response with the fly-replay header.
	// HTTP code 101 must be used to make the response replay correctly.
	const headers = [
		'HTTP/1.1 101 Switching Protocols',
		`fly-replay: instance=${TARGET_INSTANCE}`,
	]

	socket.end(headers.concat('\r\n').join('\r\n'))
})

const io = new Server(httpServer)

io.on('connection', socket => {
	console.log('Connection made on:', CURRENT_INSTANCE)

	socket.on('joinRoom', roomId => {
		console.log('Room joined on:', CURRENT_INSTANCE)
		socket.join(roomId)
		console.log(`User joined room: ${roomId}`)
	})

	socket.on('sendMessage', ({ roomId, message }) => {
		console.log('Message sent on:', CURRENT_INSTANCE)
		io.to(roomId).emit('receiveMessage', message)
	})

	socket.on('disconnect', () => {
		console.log('Disconnected on:', CURRENT_INSTANCE)
		console.log('user disconnected')
	})
})

...

I think I need to know more about what you are trying to do. This gets the TARGET_INSTANCE from the server’s environment and is a const, so you are trying to route all web sockets to a single machine?

But looking at your code, I can’t see any way both can be true:

  • the replay has been executed
  • it is still connecting to the wrong instance

Another possible solution is to add a fly-force-instance-id request header on the client side.
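A sketch of that idea with the Socket.io client, under two assumptions: the loader exposes the target machine ID to the page (chatMachineId below is a hypothetical field), and the client stays on HTTP long-polling, because browsers cannot attach custom headers to a WebSocket handshake and Socket.io's extraHeaders option therefore only applies to polling requests. socketOptionsFor is a hypothetical helper.

```typescript
// Subset of the Socket.io client options used here (see the socket.io-client docs).
interface ForcedInstanceOptions {
  transports: string[]
  extraHeaders: Record<string, string>
}

// Hypothetical helper: pin every Socket.io request to one Fly Machine.
function socketOptionsFor(machineId: string): ForcedInstanceOptions {
  return {
    // Browsers can't set headers on the WebSocket handshake, so stay on polling.
    transports: ['polling'],
    // Fly's proxy sends requests carrying this header to the given Machine.
    extraHeaders: { 'fly-force-instance-id': machineId },
  }
}

// In the component's useEffect, e.g.:
// const socket = io(socketOptionsFor(chatMachineId))
```

Staying on polling costs some latency and per-request overhead compared with a WebSocket; that is the trade-off for being able to set the header.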

@rubys Thank you for your response. Yes, I am trying to route all WebSocket connections and page POST requests through a single instance. My goal is to ensure that two users who are chatting with each other are connected to the same instance. Since both users are unaware of which instance they are on, routing all POST requests and WebSocket connections through one instance seems to be the solution.

If you have a simpler way to achieve this, I would greatly appreciate your guidance. Currently, I am using Socket.io, but I am open to using alternative solutions as long as they can facilitate connecting two users within a multi-instance setup.

Imo, express and socket.io are overkill for a simple ws server. I’d just go w/ the native ws and http libraries, but that’s just like uh my opinion man.

The gist is to forward the messages to all other app instances if you don’t find the roomId or ws connections on the instance.

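import { createServer, type IncomingMessage } from 'node:http'
import { resolve6 } from 'node:dns/promises'
import { WebSocket, WebSocketServer } from 'ws'

const THIS_INSTANCE_IP = process.env.FLY_PRIVATE_IP
const APP_NAME = process.env.FLY_APP_NAME
const PORT = Number(process.env.PORT ?? 8080) // use your app's port

// eg, a map of roomIds to all connections in that room on THIS instance
const wsMap = new Map<string, Set<WebSocket>>()

// Send a payload to every open socket in a room on this instance only
function deliverLocally(roomId: string, payload: string) {
  wsMap.get(roomId)?.forEach(w => {
    if (w.readyState === WebSocket.OPEN) w.send(payload) // add error handling
  })
}

// sample code, you need better error handling
// (and auth, so only your own instances can call /api/fanout)
const server = createServer((req, res) => {
  if (req.method !== 'POST' || req.url !== '/api/fanout') {
    res.statusCode = 404
    return res.end()
  }
  // read the body from request
  let body = ''
  req.setEncoding('utf8')
  req.on('data', chunk => (body += chunk))
  req.on('end', () => {
    try {
      const { roomId } = JSON.parse(body)
      // deliver locally only; a fanout is never re-forwarded, so it can't loop
      deliverLocally(roomId, body)
      res.end()
    } catch {
      res.statusCode = 400
      res.end()
    }
  })
})

const wss = new WebSocketServer({ noServer: true })

server.on('upgrade', (request, socket, head) => {
  // handle auth here, before accepting the upgrade
  wss.handleUpgrade(request, socket, head, ws => {
    wss.emit('connection', ws, request)
  })
})

wss.on('connection', (ws: WebSocket, request: IncomingMessage) => {
  // hypothetical: the client opens the socket with ?roomId=... in the URL
  const roomId = new URL(request.url ?? '/', 'http://localhost').searchParams.get('roomId')
  if (!roomId) return ws.close()
  // update the wsMap with this new connection
  const sockets = wsMap.get(roomId) ?? new Set<WebSocket>()
  sockets.add(ws)
  wsMap.set(roomId, sockets)

  ws.on('message', data => {
    const payload = JSON.stringify({ roomId, message: data.toString() })
    // the sender's own room is always on this instance, and other members may
    // be elsewhere, so deliver locally AND propagate to the other instances
    deliverLocally(roomId, payload)
    notifyOtherInstances(payload).catch(console.error)
  })

  // handle a disconnect, ie update the wsMap
  ws.on('close', () => {
    sockets.delete(ws)
    if (sockets.size === 0 && wsMap.get(roomId) === sockets) wsMap.delete(roomId)
  })
})

// Gets the ipv6 of all the running app instances
// See: https://fly.io/docs/networking/private-networking/
async function getFlyInstances(): Promise<string[]> {
  return resolve6(`${APP_NAME}.internal`)
}

async function notifyOtherInstances(payload: string) {
  const ips = await getFlyInstances()
  const others = ips.filter(ip => ip !== THIS_INSTANCE_IP)

  // send in parallel; one unreachable instance shouldn't block the rest
  await Promise.allSettled(
    others.map(ip =>
      fetch(`http://[${ip}]:${PORT}/api/fanout`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: payload,
      }),
    ),
  )
}

server.listen(PORT)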

You’ll want to enhance this further to optimize the need to fanout to every single instance every time, eg returning the status of the fanout…
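One way to do that optimization, sketched with hypothetical names: if each /api/fanout call reports whether the receiving instance actually had the room (say, 200 when it delivered and 204 when it had no listeners), the sender can remember where a room lives and target only that instance next time, falling back to a full fanout when it has no entry.

```typescript
// Hypothetical cache of which instance (private IPv6) last reported holding a room.
class RoomLocationCache {
  private locations = new Map<string, string>()

  // Record the result of one fanout call.
  record(roomId: string, ip: string, delivered: boolean): void {
    if (delivered) this.locations.set(roomId, ip)
    else if (this.locations.get(roomId) === ip) this.locations.delete(roomId)
  }

  // Which instances to notify: the known one, or every other instance if unknown.
  targets(roomId: string, otherIps: string[]): string[] {
    const known = this.locations.get(roomId)
    return known && otherIps.includes(known) ? [known] : otherIps
  }
}

const cache = new RoomLocationCache()
const others = ['fdaa::2', 'fdaa::3']
console.log(cache.targets('room-1', others)) // both: location unknown
cache.record('room-1', 'fdaa::3', true)
console.log(cache.targets('room-1', others)) // only fdaa::3
cache.record('room-1', 'fdaa::3', false)
console.log(cache.targets('room-1', others)) // both again after a miss
```

This assumes the other participants of a room sit on a single instance, which fits a two-person DM but not a room spread across several machines.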
