Fly Proxy routing most traffic to a single machine with low concurrency CPU-bound requests

Hello everyone, hope you’re all doing well.

I have a somewhat specific situation and I’d really appreciate some guidance from the community.

First, some context:

  • One of my applications doesn’t have that many end users. Most of the traffic in this particular application comes from integrations with partners who send us batch submissions.

    A “batch” is basically everything they produced during the day, which they send to us through an endpoint in our API.

  • Each of our clients sends their batches at specific times. Right now, since we don’t have many clients, it’s easy to distribute them across different time windows. However, we’re planning to grow, and this is starting to become a concern.

  • We also intentionally adopted this time-based distribution strategy because we’re using Fly.io’s managed Postgres on the shared tier, and we want to avoid overloading the database (and avoid scaling up prematurely while we still have a relatively small client base).

Now, the actual problem:

We recently onboarded a new client who sends us a batch with around 80,000 records, and depending on the case, those 80,000 records may trigger additional requests if there are specific metadata fields to send. In practice, this client ends up sending us roughly 120,000 requests.

  • However, each request they send is synchronous, meaning they wait for our API to confirm the response before sending the next one (or retrying if needed).

Because of that, the number of simultaneous “requests" or “connections" is not very high, which causes the fly-proxy to route most (or all) of the traffic to a single machine. Since these requests are CPU-bound, we end up getting throttled even when another machine is running and should be able to help handle the load.

An important detail:

  • We have a machine running Nginx as a reverse proxy, which is currently our single point of failure. We use Nginx because we need to apply rate limits on certain endpoints to protect against brute-force attacks and similar scenarios.

One thing that did help was applying throttling to this specific endpoint. By adding a rate limit in Nginx, we trigger our partner’s retry mechanism, which ends up smoothing the load on our side.
PS: Our Nginx is routing traffic via Flycast.

map $host $fqdn_xpto {
  hostnames;
  default 127.0.0.1;
  xpto.com prod-xpto.flycast;
  xpto-homolog.com homolog-xpto.flycast;
}

I’ve been considering replacing Nginx with OpenResty to implement something closer to a round-robin strategy with the fly-replay. I also tried setting the soft limit to 1, but the issue still persists.

Does anyone have suggestions on what I could do to better handle this scenario and balance the traffic between machines with existing fly.io’s tech.

I would really prefer to avoid going down the OpenResty route if possible.
The rate limiting strategy on this endpoint did help to stabilize things, but it significantly slows down the batch delivery process.

Thanks in advance!

Hi… This is perhaps the requested continuation from the previous sub-thread?

It would probably help if you could post your full fly.toml and output of fly m list, since otherwise the vast majority of forum readers are left guessing at important details…

Hm… The Fly Proxy isn’t really intended for zero-one logic. Traditionally, you would instead use a work queue for that, where the initial request from your client causes their batch to be queued, and then the workers pull from that queue whenever they find themselves idle.

Hey @mayailurus, exactly! This is a continuation of the sub-thread from a few days ago :slightly_smiling_face:

So this is not my current configuration, this was the setup I used to try to reduce throttling when I received the first batch from this specific company between March 3rd and 6th (yes, it took me 3 days to import everything).

At the time, the machine would start experiencing throttling issues because requests were not being balanced across all active instances. Because of that, I had to periodically pause processing to allow the machine to accumulate CPU credits again and get a few minutes of burst capacity.

The reason I scaled down afterward is that using a larger machine wasn’t helping, it would get throttled just like a shared-6x or shared-8x. After I implemented rate limiting via Nginx, the shared-4x became sufficient to handle the workload, but the total processing time increased significantly, no longer due to throttling, but because of the cooldown intervals introduced by the rate limiting.

app = 'xpto-app'
primary_region = 'gru'
swap_size_mb = 512

[build]
build-target = "production"

[http_service]
internal_port = 3000
force_https = false
auto_stop_machines = true
auto_start_machines = true
min_machines_running = 2
processes = ['app']

[http_service.concurrency]
type = "requests"
soft_limit = 1

[[vm]]
cpu_kind = "shared"
cpus = 8
memory = "2048mb"

Each request can take up to about 3 seconds in average to process. At the end of processing, I store a checksum (a nonce to prevent duplicates) ands persist the processing result in the database, and return the response to the partner (and yes, the client requires that response synchronously at request time :confused: )

My initial idea was to use a queue and process requests on demand, but given the requirement for an immediate response and the characteristics of this workload, I don’t believe a queue would solve the problem. Also, since the partner’s requests are already synchronous, I ended up concluding that, for my workload, I can have 1, 5, or 10 machines running, but the traffic will still end up hitting a single instance.

My expectation was that the “correct” behavior would be something like: if more than one machine is active, the fly-proxy would distribute requests in a way closer to round-robin, or at least route to the instance that is not throttling at that moment.

Do you think my only real alternative here would be moving to the performance tier?

  • I feel that having a round-robin style load balancing strategy would be more effective than relying on a single larger machine.

Here’s screenshots of my workload over the last 3 days after implementing throttling via Nginx on another machine.
The filtered traffic doesn’t even reach the application, since Nginx is holding those requests.
The machine no longer experiences throttling, but the total processing time for the batchs has increased.

PS: I can’t get the machine logs from March 3 to 6, since fly’s grafana only stores the past 15 days

Haven’t tested yet, but I asked gpt-5.4 to do this .lua script for me:

local cjson = require("cjson.safe")
local resolver = require("resty.dns.resolver")

local M = {}

local CACHE_DICT = ngx.shared.webhook_replay_cache
local DEFAULT_CACHE_TTL = 5
local DEFAULT_MAX_REPLAY_BODY_BYTES = 1024 * 1024
local CACHE_STORAGE_TTL = 60
local CACHE_LOCK_TTL = 1
local CACHE_LOCK_WAIT_SECONDS = 0.05
local CACHE_LOCK_WAIT_ATTEMPTS = 5

local function log(level, message)
  ngx.log(level, "[webhook_replay] ", message)
end

local function read_cached_state(cache_key)
  local raw_state = CACHE_DICT:get(cache_key)
  if not raw_state then
    return nil
  end

  local state = cjson.decode(raw_state)
  if type(state) ~= "table" or type(state.ids) ~= "table" then
    return nil
  end

  return state
end

local function save_cached_state(cache_key, machine_ids, cache_ttl)
  local encoded, err = cjson.encode({
    ids = machine_ids,
    expires_at = ngx.now() + cache_ttl,
  })

  if not encoded then
    return nil, err
  end

  local ok, set_err = CACHE_DICT:set(cache_key, encoded, math.max(cache_ttl * 12, CACHE_STORAGE_TTL))
  if not ok then
    return nil, set_err
  end

  return true
end

local function flatten_txt_answer(answer)
  if type(answer.txt) == "table" then
    return table.concat(answer.txt)
  end

  if type(answer.txt) == "string" then
    return answer.txt
  end

  return nil
end

local function parse_machine_ids(txt_payload)
  local tokens = {}
  for token in txt_payload:gmatch("[^,%s]+") do
    tokens[#tokens + 1] = token
  end

  local machine_ids = {}
  local seen = {}

  for index = 1, #tokens, 2 do
    local machine_id = tokens[index]
    local region = tokens[index + 1]

    if machine_id and region and not seen[machine_id] then
      seen[machine_id] = true
      machine_ids[#machine_ids + 1] = machine_id
    end
  end

  return machine_ids
end

local function fetch_machine_ids(backend_app)
  local dns, err = resolver:new({
    nameservers = { "fdaa::3" },
    retrans = 2,
    timeout = 2000,
    no_random = true,
  })

  if not dns then
    return nil, "resolver_init_failed: " .. (err or "unknown")
  end

  local answers, query_err = dns:query("vms." .. backend_app .. ".internal", {
    qtype = resolver.TYPE_TXT,
  })

  if not answers then
    return nil, "dns_query_failed: " .. (query_err or "unknown")
  end

  if answers.errcode then
    return nil, "dns_server_error: " .. (answers.errstr or answers.errcode)
  end

  local txt_payloads = {}
  for _, answer in ipairs(answers) do
    local payload = flatten_txt_answer(answer)
    if payload and payload ~= "" then
      txt_payloads[#txt_payloads + 1] = payload
    end
  end

  if #txt_payloads == 0 then
    return nil, "empty_txt_response"
  end

  local machine_ids = parse_machine_ids(table.concat(txt_payloads, ","))
  if #machine_ids == 0 then
    return nil, "empty_machine_set"
  end

  return machine_ids
end

local function wait_for_peer_refresh(cache_key)
  for _ = 1, CACHE_LOCK_WAIT_ATTEMPTS do
    ngx.sleep(CACHE_LOCK_WAIT_SECONDS)

    local state = read_cached_state(cache_key)
    if state and type(state.ids) == "table" and #state.ids > 0 then
      return state.ids, "peer-cache"
    end
  end

  return nil, "peer_refresh_timeout"
end

local function load_machine_ids(backend_app, cache_ttl)
  local cache_key = "machines:" .. backend_app
  local lock_key = "lock:" .. backend_app
  local now = ngx.now()

  local cached_state = read_cached_state(cache_key)
  if cached_state and cached_state.expires_at and cached_state.expires_at > now then
    return cached_state.ids, "cache"
  end

  local acquired_lock = CACHE_DICT:add(lock_key, true, CACHE_LOCK_TTL)
  if not acquired_lock then
    if cached_state and type(cached_state.ids) == "table" and #cached_state.ids > 0 then
      return cached_state.ids, "stale-cache"
    end

    return wait_for_peer_refresh(cache_key)
  end

  local machine_ids, err = fetch_machine_ids(backend_app)
  CACHE_DICT:delete(lock_key)

  if machine_ids and #machine_ids > 0 then
    local _, save_err = save_cached_state(cache_key, machine_ids, cache_ttl)
    if save_err then
      log(ngx.WARN, "failed to cache machine list for " .. backend_app .. ": " .. save_err)
    end

    return machine_ids, "dns"
  end

  if cached_state and type(cached_state.ids) == "table" and #cached_state.ids > 0 then
    log(ngx.WARN, "using stale machine list for " .. backend_app .. " after refresh error: " .. (err or "unknown"))
    return cached_state.ids, "stale-cache"
  end

  return nil, err
end

local function pick_machine(backend_app, machine_ids)
  if #machine_ids < 2 then
    return nil
  end

  local counter_key = "rr:" .. backend_app
  local current_index, err = CACHE_DICT:incr(counter_key, 1, 0)
  if not current_index then
    log(ngx.WARN, "round-robin counter failed for " .. backend_app .. ": " .. (err or "unknown"))
    current_index = 0
  end

  local target_index = ((current_index - 1) % #machine_ids) + 1
  return machine_ids[target_index]
end

local function body_size_allows_replay(max_replay_body_bytes)
  local content_length = tonumber(ngx.var.content_length)
  if content_length then
    return content_length <= max_replay_body_bytes, content_length
  end

  local method = ngx.req.get_method()
  if method == "GET" or method == "HEAD" or method == "OPTIONS" or method == "DELETE" then
    return true, 0
  end

  return false, nil
end

function M.run(options)
  options = options or {}

  local backend_app = options.backend_app
  if not backend_app or backend_app == "" then
    return
  end

  local max_replay_body_bytes = options.max_replay_body_bytes or DEFAULT_MAX_REPLAY_BODY_BYTES
  local cache_ttl = options.cache_ttl or DEFAULT_CACHE_TTL

  local can_replay, content_length = body_size_allows_replay(max_replay_body_bytes)
  if not can_replay then
    log(ngx.INFO, "bypassing replay for " .. backend_app .. " because request body is larger than allowed or unknown")
    return
  end

  local machine_ids, source = load_machine_ids(backend_app, cache_ttl)
  if not machine_ids or #machine_ids < 2 then
    if not machine_ids then
      log(ngx.WARN, "bypassing replay for " .. backend_app .. ": " .. (source or "machine_discovery_failed"))
    end
    return
  end

  local target_machine = pick_machine(backend_app, machine_ids)
  if not target_machine then
    return
  end

  ngx.header["Fly-Replay"] = "app=" .. backend_app .. ";prefer_instance=" .. target_machine
  log(
    ngx.INFO,
    "replaying /webhook request to " .. backend_app .. " machine " .. target_machine ..
      " using " .. (source or "unknown") .. " (content_length=" .. tostring(content_length or "n/a") .. ")"
  )

  return ngx.exit(ngx.HTTP_TEMPORARY_REDIRECT)
end

return M

I think that would be the best way, but probably not the only one. These huge batch requests sound like an awkward fit for the shared CPU class, overall, to be honest…

Since you know in advance which endpoints are CPU-intensive, you could use @lillian’s idea from someone else’s thread and create two performance-class Machines in a distinct process group. Then Nginx could unconditionally proxy the CPU-intensive requests to those heftier Machines, which would otherwise be sleeping the rest of the day.

(You would want two since one of them will disappear someday, during a hardware failure.)

The process group would need its own [[services]] section, with its own port, distinct from port 80.

Yes, as @PeterCxy said last time, that is what the Fly Proxy should be doing on its own. I do see a roughly 50/50 split in my own experiments with two Machines over Flycast, although it’s not a strict A-B-A-B-A-B alternation, :proxy_robin:.

Here’s what I used for testing:

$ fly ssh console
# curl -is -H 'flyio-debug: doit' 'http://app-name.flycast/metrics?q=[0-19]'

The significance of /metrics is that it’s harmless (on the server side) and has a small response, and the ?q= part is ignored by the server. That just causes curl to make twenty consecutive requests, via its little-known “globbing” feature. The Machine that produced each response can be seen in the sid field of the flyio-debug response header.

I’d suggest giving that a try yourself, to rule out the possibility that it’s something about your Nginx apparatus confounding things…

1 Like

I’d say in this case you should still use a job worker and poll it from your webserver, so you can respond to the current request “synchronously” and still accept more requests on the same instance. you can also scale up/down workers based on how many requests are pending, so you don’t have to keep expensive machines always running when there is no work to do.

2 Likes

Thank you for the clarifications @lillian @mayailurus

At the moment I think I will give it a try and work with 2 processes groups; the first as the end-user app itself and the second as the "worker“ for the batch with min_machines_running as 0 and auto_stop_machines/auto_start_machines as true

:red_question_mark:One more question I got is: is it possible to reach a specific process/service using the .flycast domain? for instance: prod-xpto.batch.flycast

I will try this toml file later this week:

app = 'prod-xpto'
primary_region = 'gru'
swap_size_mb = 1024

[build]
  build-target = "production"

[processes]
  app = ""
  batch = ""


[[services]]
  processes = ["app"]
  internal_port = 3000
  protocol = "tcp"

  auto_stop_machines = true
  auto_start_machines = true
  min_machines_running = 1

  [services.concurrency]
    type = "connections"
    soft_limit = 25

[[vm]]
  processes = ["app"]
  cpu_kind = "shared"
  cpus = 4
  memory = "1024mb"



[[services]]
  processes = ["batch"]
  internal_port = 3000
  protocol = "tcp"

  auto_stop_machines = true
  auto_start_machines = true
  min_machines_running = 0

  [services.concurrency]
    type = "requests"
    soft_limit = 10

[[vm]]
  processes = ["batch"]
  cpu_kind = "performance"
  cpus = 1
  memory = "2048mb"

Not that I know of, unfortunately; that’s the reason for the note about distinct ports up above.

(The .internal addresses can distinguish process groups, but they don’t provide auto-start.)

This is the right general idea, but you also need at least one [[services.ports]] stanza. (People often overlook that, since it’s separate in the docs.)

E.g., make the app group port = 80, and the batch group port = 8086. Then the latter is visible to clients as http://app-name.flycast:8086/.

1 Like

@mayailurus I did miss that detail in the documentation; thanks for pointing it out :slightly_smiling_face:

This different-ports strategy should work like a charm for my use case.

Once again, thank you very much for all the help :star_struck:

1 Like

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.