From 8c2a909061f2b31a2c6341d787105cc11b3f311d Mon Sep 17 00:00:00 2001 From: Mac L Date: Thu, 25 Jun 2026 18:19:29 +0400 Subject: [PATCH] Bump `warp` and begin `axum` migration (#9001) - Bump `warp` to 0.4. This unifies `warp` and `axum` onto the same `http`, `hyper`, `h2`, `rustls`, etc versions. - Create `axum_utils` which contain common functions and types - Begins migration of all HTTP API servers from warp to axum Co-Authored-By: Mac L --- Cargo.lock | 404 +++++++------- Cargo.toml | 12 +- Makefile | 2 +- beacon_node/Cargo.toml | 1 + beacon_node/client/src/builder.rs | 4 +- .../src/test_utils/mock_builder.rs | 41 +- .../execution_layer/src/test_utils/mod.rs | 22 +- beacon_node/http_api/Cargo.toml | 3 + .../http_api/src/aggregate_attestation.rs | 7 +- .../src/beacon/execution_payload_bids.rs | 7 +- .../src/beacon/execution_payload_envelopes.rs | 13 +- beacon_node/http_api/src/beacon/states.rs | 11 +- beacon_node/http_api/src/lib.rs | 149 +++-- beacon_node/http_api/src/light_client.rs | 22 +- beacon_node/http_api/src/produce_block.rs | 47 +- beacon_node/http_api/src/test_utils.rs | 5 +- .../validator/execution_payload_envelopes.rs | 25 +- beacon_node/http_api/src/validator/mod.rs | 26 +- beacon_node/http_metrics/Cargo.toml | 5 +- beacon_node/http_metrics/src/lib.rs | 119 ++-- beacon_node/http_metrics/tests/tests.rs | 2 +- beacon_node/src/config.rs | 2 +- beacon_node/src/lib.rs | 1 + common/axum_utils/Cargo.toml | 17 + common/axum_utils/src/cors.rs | 515 ++++++++++++++++++ common/axum_utils/src/lib.rs | 6 + common/axum_utils/src/middleware.rs | 9 + common/axum_utils/src/server/builder.rs | 42 ++ common/axum_utils/src/server/error.rs | 11 + common/axum_utils/src/server/mod.rs | 116 ++++ common/axum_utils/src/tls.rs | 9 + common/task_executor/src/lib.rs | 2 +- common/warp_utils/Cargo.toml | 2 +- common/warp_utils/src/status_code.rs | 13 +- validator_client/http_api/Cargo.toml | 3 + validator_client/http_api/src/lib.rs | 68 ++- validator_client/http_api/src/test_utils.rs | 2 +- validator_client/http_api/src/tests.rs | 4 +- validator_client/http_metrics/Cargo.toml | 5 +- validator_client/http_metrics/src/lib.rs | 120 ++-- validator_client/src/lib.rs | 2 + 41 files changed, 1333 insertions(+), 543 deletions(-) create mode 100644 common/axum_utils/Cargo.toml create mode 100644 common/axum_utils/src/cors.rs create mode 100644 common/axum_utils/src/lib.rs create mode 100644 common/axum_utils/src/middleware.rs create mode 100644 common/axum_utils/src/server/builder.rs create mode 100644 common/axum_utils/src/server/error.rs create mode 100644 common/axum_utils/src/server/mod.rs create mode 100644 common/axum_utils/src/tls.rs diff --git a/Cargo.lock b/Cargo.lock index 5d200768cd..822d2eeec4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,7 +261,7 @@ checksum = "f72cf87cda808e593381fb9f005ffa4d2475552b7a6c5ac33d087bf77d82abd0" dependencies = [ "alloy-primitives", "alloy-sol-types", - "http 1.4.0", + "http", "serde", "serde_json", "thiserror 2.0.17", @@ -447,7 +447,7 @@ dependencies = [ "alloy-rlp", "alloy-serde", "alloy-sol-types", - "itertools 0.13.0", + "itertools 0.14.0", "serde", "serde_json", "serde_with", @@ -1092,7 +1092,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16e2cdb6d5ed835199484bb92bb8b3edd526effe995c61732580439c1a67e2e9" dependencies = [ "base64 0.22.1", - "http 1.4.0", + "http", "log", "url", ] @@ -1121,14 +1121,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.5", "bytes", "futures-util", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -1141,6 +1141,39 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" +dependencies = [ + "axum-core 0.5.6", + "bytes", + "form_urlencoded", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit 0.8.4", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-core" version = "0.4.5" @@ -1150,8 +1183,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", "mime", "pin-project-lite", @@ -1161,6 +1194,61 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-server" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1ab4a3ec9ea8a657c72d99a03a824af695bd0fb5ec639ccbd9cd3543b41a5f9" +dependencies = [ + "arc-swap", + "bytes", + "fs-err", + "http", + "http-body", + "hyper", + "hyper-util", + "pin-project-lite", + "rustls", + "rustls-pemfile", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", +] + +[[package]] +name = "axum_utils" +version = "0.1.0" +dependencies = [ + "axum 0.8.9", + "axum-server", + "http", + "serde", + "thiserror 2.0.17", + "tokio", + "tower 0.5.2", + "tower-http", +] + [[package]] name = "base-x" version = "0.2.11" @@ -1279,6 +1367,7 @@ name = "beacon_node" version = "8.2.0" dependencies = [ "account_utils", + "axum_utils", "beacon_chain", "bls", "clap", @@ -1292,7 +1381,7 @@ dependencies = [ "genesis", "hex", "http_api", - "hyper 1.8.1", + "hyper", "lighthouse_network", "monitoring_api", "network_utils", @@ -1969,7 +2058,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -3114,7 +3203,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3630,6 +3719,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" +[[package]] +name = "fs-err" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73fde052dbfc920003cfd2c8e2c6e6d4cc7c1091538c3a24226cec0665ab08c0" +dependencies = [ + "autocfg", + "tokio", +] + [[package]] name = "fs2" version = "0.4.3" @@ -3733,7 +3832,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f2f12607f92c69b12ed746fabf9ca4f5c482cba46679c1a75b874ed7c26adb" dependencies = [ "futures-io", - "rustls 0.23.40", + "rustls", "rustls-pki-types", ] @@ -3904,25 +4003,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "h2" -version = "0.3.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.12", - "indexmap 2.12.1", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "h2" version = "0.4.12" @@ -3934,7 +4014,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http 1.4.0", + "http", "indexmap 2.12.1", "slab", "tokio", @@ -4026,14 +4106,14 @@ dependencies = [ [[package]] name = "headers" -version = "0.3.9" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bytes", "headers-core", - "http 0.2.12", + "http", "httpdate", "mime", "sha1", @@ -4041,11 +4121,11 @@ dependencies = [ [[package]] name = "headers-core" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" dependencies = [ - "http 0.2.12", + "http", ] [[package]] @@ -4179,17 +4259,6 @@ dependencies = [ "digest 0.10.7", ] -[[package]] -name = "http" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "http" version = "1.4.0" @@ -4200,17 +4269,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http-body" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" -dependencies = [ - "bytes", - "http 0.2.12", - "pin-project-lite", -] - [[package]] name = "http-body" version = "1.0.1" @@ -4218,7 +4276,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http 1.4.0", + "http", ] [[package]] @@ -4229,8 +4287,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "pin-project-lite", ] @@ -4238,6 +4296,8 @@ dependencies = [ name = "http_api" version = "0.1.0" dependencies = [ + "axum 0.8.9", + "axum_utils", "beacon_chain", "beacon_processor", "bls", @@ -4277,6 +4337,7 @@ dependencies = [ "sysinfo", "system_health", "task_executor", + "thiserror 2.0.17", "tokio", "tokio-stream", "tracing", @@ -4290,6 +4351,8 @@ dependencies = [ name = "http_metrics" version = "0.1.0" dependencies = [ + "axum 0.8.9", + "axum_utils", "beacon_chain", "health_metrics", "lighthouse_network", @@ -4302,11 +4365,10 @@ dependencies = [ "serde", "slot_clock", "store", + "thiserror 2.0.17", "tokio", "tracing", "types", - "warp", - "warp_utils", ] [[package]] @@ -4327,30 +4389,6 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" -[[package]] -name = "hyper" -version = "0.14.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "h2 0.3.27", - "http 0.2.12", - "http-body 0.4.6", - "httparse", - "httpdate", - "itoa", - "pin-project-lite", - "socket2 0.5.10", - "tokio", - "tower-service", - "tracing", - "want", -] - [[package]] name = "hyper" version = "1.8.1" @@ -4361,9 +4399,9 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2 0.4.12", - "http 1.4.0", - "http-body 1.0.1", + "h2", + "http", + "http-body", "httparse", "httpdate", "itoa", @@ -4380,13 +4418,13 @@ version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http 1.4.0", - "hyper 1.8.1", + "http", + "hyper", "hyper-util", - "rustls 0.23.40", + "rustls", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.4", + "tokio-rustls", "tower-service", "webpki-roots", ] @@ -4397,7 +4435,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper 1.8.1", + "hyper", "hyper-util", "pin-project-lite", "tokio", @@ -4415,14 +4453,14 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http 1.4.0", - "http-body 1.0.1", - "hyper 1.8.1", + "http", + "http-body", + "hyper", "ipnet", "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.4", "tokio", "tower-service", "tracing", @@ -4619,9 +4657,9 @@ dependencies = [ "attohttpc", "bytes", "futures", - "http 1.4.0", + "http", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-util", "log", "rand 0.9.2", @@ -4639,9 +4677,9 @@ dependencies = [ "attohttpc", "bytes", "futures", - "http 1.4.0", + "http", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-util", "log", "rand 0.10.1", @@ -5335,7 +5373,7 @@ dependencies = [ "quinn-proto", "rand 0.8.5", "ring", - "rustls 0.23.40", + "rustls", "socket2 0.6.4", "thiserror 2.0.17", "tokio", @@ -5401,8 +5439,8 @@ dependencies = [ "libp2p-identity", "rcgen", "ring", - "rustls 0.23.40", - "rustls-webpki 0.103.13", + "rustls", + "rustls-webpki", "thiserror 2.0.17", "x509-parser 0.18.1", "yasna 0.6.0", @@ -5798,6 +5836,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "mdbx-sys" version = "0.11.6-4" @@ -5992,10 +6036,10 @@ dependencies = [ "bytes", "colored", "futures-core", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-util", "log", "pin-project-lite", @@ -6299,7 +6343,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -6497,7 +6541,7 @@ checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" dependencies = [ "async-trait", "bytes", - "http 1.4.0", + "http", "opentelemetry", "reqwest", ] @@ -6508,7 +6552,7 @@ version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" dependencies = [ - "http 1.4.0", + "http", "opentelemetry", "opentelemetry-http", "opentelemetry-proto", @@ -7137,7 +7181,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -7150,7 +7194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -7220,8 +7264,8 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.1.1", - "rustls 0.23.40", - "socket2 0.5.10", + "rustls", + "socket2 0.6.4", "thiserror 2.0.17", "tokio", "tracing", @@ -7241,7 +7285,7 @@ dependencies = [ "rand 0.9.2", "ring", "rustc-hash 2.1.1", - "rustls 0.23.40", + "rustls", "rustls-pki-types", "slab", "thiserror 2.0.17", @@ -7259,9 +7303,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.4", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -7550,10 +7594,10 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-rustls", "hyper-util", "js-sys", @@ -7561,14 +7605,14 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.40", + "rustls", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-rustls 0.26.4", + "tokio-rustls", "tokio-util", "tower 0.5.2", "tower-http", @@ -7806,21 +7850,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", -] - -[[package]] -name = "rustls" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" -dependencies = [ - "log", - "ring", - "rustls-pki-types", - "rustls-webpki 0.102.8", - "subtle", - "zeroize", + "windows-sys 0.61.2", ] [[package]] @@ -7833,7 +7863,7 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.13", + "rustls-webpki", "subtle", "zeroize", ] @@ -7869,17 +7899,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-webpki" -version = "0.102.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" -dependencies = [ - "ring", - "rustls-pki-types", - "untrusted", -] - [[package]] name = "rustls-webpki" version = "0.103.13" @@ -8170,6 +8189,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -8880,7 +8910,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -9120,24 +9150,13 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "tokio-rustls" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" -dependencies = [ - "rustls 0.22.4", - "rustls-pki-types", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls 0.23.40", + "rustls", "tokio", ] @@ -9206,14 +9225,14 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.7.9", "base64 0.22.1", "bytes", - "h2 0.4.12", - "http 1.4.0", - "http-body 1.0.1", + "h2", + "http", + "http-body", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-timeout", "hyper-util", "percent-encoding", @@ -9237,10 +9256,10 @@ dependencies = [ "async-trait", "base64 0.22.1", "bytes", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-timeout", "hyper-util", "percent-encoding", @@ -9248,7 +9267,7 @@ dependencies = [ "prost 0.13.5", "rustls-native-certs", "tokio", - "tokio-rustls 0.26.4", + "tokio-rustls", "tokio-stream", "tower 0.5.2", "tower-layer", @@ -9304,8 +9323,8 @@ dependencies = [ "bitflags 2.10.0", "bytes", "futures-util", - "http 1.4.0", - "http-body 1.0.1", + "http", + "http-body", "iri-string", "pin-project-lite", "tower 0.5.2", @@ -9702,7 +9721,7 @@ dependencies = [ "eth2", "fdlimit", "graffiti_file", - "hyper 1.8.1", + "hyper", "initialized_validators", "lighthouse_validator_store", "metrics", @@ -9745,6 +9764,8 @@ name = "validator_http_api" version = "0.1.0" dependencies = [ "account_utils", + "axum 0.8.9", + "axum_utils", "beacon_node_fallback", "bls", "deposit_contract", @@ -9777,6 +9798,7 @@ dependencies = [ "system_health", "task_executor", "tempfile", + "thiserror 2.0.17", "tokio", "tokio-stream", "tracing", @@ -9795,6 +9817,8 @@ dependencies = [ name = "validator_http_metrics" version = "0.1.0" dependencies = [ + "axum 0.8.9", + "axum_utils", "health_metrics", "lighthouse_validator_store", "lighthouse_version", @@ -9804,12 +9828,11 @@ dependencies = [ "parking_lot", "serde", "slot_clock", + "thiserror 2.0.17", "tracing", "types", "validator_metrics", "validator_services", - "warp", - "warp_utils", ] [[package]] @@ -9960,28 +9983,27 @@ dependencies = [ [[package]] name = "warp" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" +version = "0.4.2" +source = "git+https://github.com/macladson/warp?rev=6f5f21beab6a240e59470caaab56afd46d46b709#6f5f21beab6a240e59470caaab56afd46d46b709" dependencies = [ "bytes", - "futures-channel", "futures-util", "headers", - "http 0.2.12", - "hyper 0.14.32", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", "log", "mime", "mime_guess", "percent-encoding", "pin-project", - "rustls-pemfile", "scoped-tls", "serde", "serde_json", "serde_urlencoded", "tokio", - "tokio-rustls 0.25.0", "tokio-util", "tower-service", "tracing", @@ -10244,7 +10266,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f43786efe5..3cbe113d52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "beacon_node/timer", "boot_node", "common/account_utils", + "common/axum_utils", "common/clap_utils", "common/deposit_contract", "common/directory", @@ -106,7 +107,8 @@ alloy-signer-local = { version = "1", default-features = false } anyhow = "1" arbitrary = { version = "1", features = ["derive"] } async-channel = "1.9.0" -axum = "0.7.7" +axum = "0.8" +axum_utils = { path = "common/axum_utils" } beacon_chain = { path = "beacon_node/beacon_chain" } beacon_node = { path = "beacon_node" } beacon_node_fallback = { path = "validator_client/beacon_node_fallback" } @@ -229,9 +231,11 @@ sysinfo = "0.26" system_health = { path = "common/system_health" } task_executor = { path = "common/task_executor" } tempfile = "3" +thiserror = "2" tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal", "macros"] } tokio-stream = { version = "0.1", features = ["sync"] } tokio-util = { version = "0.7", features = ["codec", "compat", "time"] } +tower = "0.5" tracing = "0.1.40" tracing-appender = "0.2" tracing-core = "0.1" @@ -253,7 +257,7 @@ validator_metrics = { path = "validator_client/validator_metrics" } validator_services = { path = "validator_client/validator_services" } validator_store = { path = "validator_client/validator_store" } validator_test_rig = { path = "testing/validator_test_rig" } -warp = { version = "0.3.7", default-features = false, features = ["tls"] } +warp = { version = "0.4", default-features = false, features = ["server"] } warp_utils = { path = "common/warp_utils" } workspace_members = { path = "common/workspace_members" } xdelta3 = { git = "https://github.com/sigp/xdelta3-rs", rev = "fe3906605c87b6c0515bd7c8fc671f47875e3ccc" } @@ -272,6 +276,10 @@ incremental = false inherits = "release" debug = true +[patch.crates-io] +# Temporary patch until the axum migration is complete +warp = { git = "https://github.com/macladson/warp", rev = "6f5f21beab6a240e59470caaab56afd46d46b709" } + [patch."https://github.com/libp2p/rust-libp2p.git"] libp2p = { git = "https://github.com/sigp/rust-libp2p.git", rev = "3563de5ed20e509885592b391aa816992eef55d4" } libp2p-mplex = { git = "https://github.com/sigp/rust-libp2p.git", rev = "3563de5ed20e509885592b391aa816992eef55d4" } diff --git a/Makefile b/Makefile index 94ad55bf6b..3a84d0fbde 100644 --- a/Makefile +++ b/Makefile @@ -327,7 +327,7 @@ install-audit: cargo install --force cargo-audit audit-CI: - cargo audit --ignore RUSTSEC-2026-0049 --ignore RUSTSEC-2026-0098 --ignore RUSTSEC-2026-0099 --ignore RUSTSEC-2026-0104 --ignore RUSTSEC-2026-0118 --ignore RUSTSEC-2026-0119 + cargo audit # Runs cargo deny (check for banned crates, duplicate versions, and source restrictions) deny: install-deny deny-CI diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index ebefa6a451..7b17a41174 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -16,6 +16,7 @@ testing = [] [dependencies] account_utils = { workspace = true } +axum_utils = { workspace = true } beacon_chain = { workspace = true } bls = { workspace = true } clap = { workspace = true } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 1624f73e9b..003694eb3b 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -615,7 +615,7 @@ where /// If type inference errors are being raised, see the comment on the definition of `Self`. #[allow(clippy::type_complexity)] #[instrument(name = "build_client", skip_all)] - pub fn build( + pub async fn build( mut self, ) -> Result>, String> { let runtime_context = self @@ -647,6 +647,7 @@ where let exit = runtime_context.executor.exit(); let (listen_addr, server) = http_api::serve(ctx, exit) + .await .map_err(|e| format!("Unable to start HTTP API server: {:?}", e))?; let http_api_task = async move { @@ -677,6 +678,7 @@ where let exit = runtime_context.executor.exit(); let (listen_addr, server) = http_metrics::serve(ctx, exit) + .await .map_err(|e| format!("Unable to start HTTP metrics server: {:?}", e))?; runtime_context diff --git a/beacon_node/execution_layer/src/test_utils/mock_builder.rs b/beacon_node/execution_layer/src/test_utils/mock_builder.rs index d456c9adc1..ee4f29a653 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_builder.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_builder.rs @@ -38,8 +38,11 @@ use types::{ ExecutionPayloadHeaderRefMut, ExecutionRequests, ForkName, ForkVersionDecode, Hash256, SignedBlindedBeaconBlock, SignedRoot, SignedValidatorRegistrationData, Slot, Uint256, }; -use warp::reply::{self, Reply}; -use warp::{Filter, Rejection}; +use warp::{ + Filter, Rejection, + http::StatusCode, + reply::{self, Reply}, +}; pub const DEFAULT_FEE_RECIPIENT: Address = Address::repeat_byte(42); pub const DEFAULT_GAS_LIMIT: u64 = 60_000_000; @@ -1063,11 +1066,10 @@ pub fn serve( .unwrap(), ) } else { - Ok(warp::http::Response::builder() - .status(202) - .body(&[] as &'static [u8]) - .map(|res| add_consensus_version_header(res, fork_name)) - .unwrap()) + Ok(add_consensus_version_header( + StatusCode::ACCEPTED.into_response(), + fork_name, + )) } }, ); @@ -1114,11 +1116,10 @@ pub fn serve( .unwrap(), ) } else { - Ok(warp::http::Response::builder() - .status(202) - .body("".to_string()) - .map(|res| add_consensus_version_header(res, fork_name)) - .unwrap()) + Ok(add_consensus_version_header( + StatusCode::ACCEPTED.into_response(), + fork_name, + )) } }, ); @@ -1184,9 +1185,21 @@ pub fn serve( .or(warp::get().and(status).or(header)) .map(|reply| warp::reply::with_header(reply, "Server", "lighthouse-mock-builder-server")); - let (listening_socket, server) = warp::serve(routes) - .try_bind_ephemeral(SocketAddrV4::new(listen_addr, listen_port)) + // Use a `std::net::TcpListener` here which keeps the parent `serve` function from needing to be async. + // Once the mock_builder server has been migrated to Axum, we can use the tokio listener directly + // since we will require async anyway. + let std_listener = std::net::TcpListener::bind(SocketAddrV4::new(listen_addr, listen_port)) .expect("mock builder server should start"); + std_listener + .set_nonblocking(true) + .expect("mock builder server should set nonblocking"); + let listener = tokio::net::TcpListener::from_std(std_listener) + .expect("mock builder server should convert to tokio listener"); + let listening_socket = listener + .local_addr() + .expect("mock builder server should have a local address"); + + let server = warp::serve(routes).incoming(listener).run(); Ok((listening_socket, server)) } diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index 570008b62a..9317e0f365 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -495,6 +495,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: std::io::Error) -> Self { + Error::Other(e.to_string()) + } +} + #[derive(Debug)] struct MissingIdField; @@ -725,12 +731,18 @@ pub fn serve( // Add a `Server` header. .map(|reply| warp::reply::with_header(reply, "Server", "lighthouse-mock-execution-client")); - let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( - SocketAddrV4::new(config.listen_addr, config.listen_port), - async { + let std_listener = + std::net::TcpListener::bind(SocketAddrV4::new(config.listen_addr, config.listen_port))?; + std_listener.set_nonblocking(true)?; + let listener = tokio::net::TcpListener::from_std(std_listener)?; + let listening_socket = listener.local_addr()?; + + let server = warp::serve(routes) + .incoming(listener) + .graceful(async { shutdown.await; - }, - )?; + }) + .run(); info!( listen_address = listening_socket.to_string(), diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index fb01f655d9..0aae578a7b 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -6,6 +6,8 @@ edition = { workspace = true } autotests = false # using a single test binary compiles faster [dependencies] +axum = { workspace = true } +axum_utils = { workspace = true } beacon_chain = { workspace = true } beacon_processor = { workspace = true } bls = { workspace = true } @@ -44,6 +46,7 @@ store = { workspace = true } sysinfo = { workspace = true } system_health = { path = "../../common/system_health" } task_executor = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tracing = { workspace = true } diff --git a/beacon_node/http_api/src/aggregate_attestation.rs b/beacon_node/http_api/src/aggregate_attestation.rs index 183d29df22..93b61c4e39 100644 --- a/beacon_node/http_api/src/aggregate_attestation.rs +++ b/beacon_node/http_api/src/aggregate_attestation.rs @@ -6,10 +6,7 @@ use eth2::types::{self, EndpointVersion, Hash256, Slot}; use std::sync::Arc; use types::beacon_response::EmptyMetadata; use types::{CommitteeIndex, ForkVersionedResponse}; -use warp::{ - hyper::{Body, Response}, - reply::Reply, -}; +use warp::reply::{Reply, Response}; pub fn get_aggregate_attestation( slot: Slot, @@ -17,7 +14,7 @@ pub fn get_aggregate_attestation( committee_index: Option, endpoint_version: EndpointVersion, chain: Arc>, -) -> Result, warp::reject::Rejection> { +) -> Result { let fork_name = chain.spec.fork_name_at_slot::(slot); let aggregate_attestation = if fork_name.electra_enabled() { let Some(committee_index) = committee_index else { diff --git a/beacon_node/http_api/src/beacon/execution_payload_bids.rs b/beacon_node/http_api/src/beacon/execution_payload_bids.rs index 856670aa94..de469453a2 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_bids.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_bids.rs @@ -12,7 +12,10 @@ use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, warn}; use types::SignedExecutionPayloadBid; -use warp::{Filter, Rejection, Reply, hyper::Body, hyper::Response}; +use warp::{ + Filter, Rejection, + reply::{Reply, Response}, +}; // POST /eth/v1/beacon/execution_payload_bids (SSZ) pub(crate) fn post_beacon_execution_payload_bids_ssz( @@ -78,7 +81,7 @@ pub fn publish_execution_payload_bid( bid: SignedExecutionPayloadBid, chain: &Arc>, network_tx: &UnboundedSender>, -) -> Result, Rejection> { +) -> Result { let slot = bid.slot(); let builder_index = bid.message.builder_index; diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs b/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs index 456d371c71..bad05ec919 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs @@ -22,8 +22,9 @@ use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, error, info, warn}; use types::{BlockImportSource, EthSpec, SignedExecutionPayloadEnvelope}; use warp::{ - Filter, Rejection, Reply, - hyper::{Body, Response}, + Filter, Rejection, + http::response::Builder, + reply::{Reply, Response}, }; // POST beacon/execution_payload_envelopes (SSZ) @@ -93,7 +94,7 @@ pub async fn publish_execution_payload_envelope( envelope: SignedExecutionPayloadEnvelope, chain: Arc>, network_tx: &UnboundedSender>, -) -> Result, Rejection> { +) -> Result { let slot = envelope.slot(); let beacon_block_root = envelope.message.beacon_block_root; @@ -345,10 +346,10 @@ pub(crate) fn get_beacon_execution_payload_envelopes( let fork_name = chain.spec.fork_name_at_slot::(envelope.slot()); match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(envelope.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(envelope.as_ssz_bytes()) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", diff --git a/beacon_node/http_api/src/beacon/states.rs b/beacon_node/http_api/src/beacon/states.rs index 1b765aa227..0aabf0ccfe 100644 --- a/beacon_node/http_api/src/beacon/states.rs +++ b/beacon_node/http_api/src/beacon/states.rs @@ -18,10 +18,7 @@ use types::{ AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch, RelativeEpochError, }; -use warp::filters::BoxedFilter; -use warp::http::Response; -use warp::hyper::Body; -use warp::{Filter, Reply}; +use warp::{Filter, Reply, filters::BoxedFilter, http::response::Builder}; use warp_utils::query::multi_key_query; type BeaconStatesPath = BoxedFilter<( @@ -205,10 +202,10 @@ pub fn get_beacon_state_proposer_lookahead( )?; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(data.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(data.as_ssz_bytes()) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 8e31e88ff8..ffd987178a 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -51,6 +51,9 @@ use crate::utils::{AnyVersionFilter, EthV1Filter}; use crate::validator::post_validator_liveness_epoch; use crate::validator::*; use crate::version::beacon_response; +use axum::Router; +use axum_utils::server::Server; +use axum_utils::tls::TlsConfig; use beacon::states; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; use beacon_processor::BeaconProcessorSend; @@ -85,7 +88,6 @@ pub use state_id::StateId; use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; -use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use sysinfo::{System, SystemExt}; @@ -107,27 +109,14 @@ use version::{ execution_optimistic_finalized_beacon_response, inconsistent_fork_rejection, unsupported_version_rejection, }; -use warp::Reply; -use warp::hyper::Body; -use warp::sse::Event; -use warp::{Filter, Rejection, http::Response}; +use warp::{Filter, Rejection, Reply, http::response::Builder, reply::Response, sse::Event}; use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter}; const API_PREFIX: &str = "eth"; -/// A custom type which allows for both unsecured and TLS-enabled HTTP servers. -type HttpServer = (SocketAddr, Pin + Send>>); - /// Alias for readability. pub type ExecutionOptimistic = bool; -/// Configuration used when serving the HTTP server over TLS. -#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] -pub struct TlsConfig { - pub cert: PathBuf, - pub key: PathBuf, -} - /// A wrapper around all the items required to spawn the HTTP server. /// /// The server will gracefully handle the case where any fields are `None`. @@ -176,18 +165,18 @@ impl Default for Config { } } -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum Error { - Warp(warp::Error), + #[error("Builder error: {0}")] + Builder(#[from] axum_utils::server::BuilderError), + #[error("Server error: {0}")] + Server(#[from] axum_utils::server::ServerError), + #[error("Warp error: {0}")] + Warp(#[from] warp::Error), + #[error("{0}")] Other(String), } -impl From for Error { - fn from(e: warp::Error) -> Self { - Error::Warp(e) - } -} - impl From for Error { fn from(e: String) -> Self { Error::Other(e) @@ -339,10 +328,10 @@ pub fn tracing_logging() -> warp::filters::log::Log( +pub async fn serve( ctx: Arc>, shutdown: impl Future + Send + Sync + 'static, -) -> Result { +) -> Result<(SocketAddr, impl Future), Error> { let config = ctx.config.clone(); // Configure CORS. @@ -1175,10 +1164,10 @@ pub fn serve( }; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(block.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(block.as_ssz_bytes()) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -1311,10 +1300,10 @@ pub fn serve( .map_err(inconsistent_fork_rejection)?; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(block.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(block.as_ssz_bytes()) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -1367,10 +1356,10 @@ pub fn serve( .map_err(inconsistent_fork_rejection)?; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(blob_sidecar_list_filtered.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(blob_sidecar_list_filtered.as_ssz_bytes()) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -1416,10 +1405,10 @@ pub fn serve( block_id.get_blobs_by_versioned_hashes(versioned_hashes, &chain)?; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(response.data.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(response.data.as_ssz_bytes()) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -1641,10 +1630,10 @@ pub fn serve( get_next_withdrawals::(&chain, state, state_id, proposal_slot)?; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(withdrawals.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(withdrawals.as_ssz_bytes()) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -1727,10 +1716,10 @@ pub fn serve( .spec .fork_name_at_slot::(update.get_slot()); match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(update.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(update.as_ssz_bytes()) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -1775,10 +1764,10 @@ pub fn serve( .spec .fork_name_at_slot::(update.signature_slot()); match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(update.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(update.as_ssz_bytes()) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -2000,10 +1989,10 @@ pub fn serve( block_id.get_data_columns(indices, &chain)?; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(data_columns.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(data_columns.as_ssz_bytes()) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -2066,13 +2055,11 @@ pub fn serve( "HTTP state load" ); - Response::builder() + Builder::new() .status(200) - .body(response_bytes.into()) - .map(|res: Response| add_ssz_content_type_header(res)) - .map(|resp: warp::reply::Response| { - add_consensus_version_header(resp, fork_name) - }) + .body(response_bytes) + .map(add_ssz_content_type_header) + .map(|resp: Response| add_consensus_version_header(resp, fork_name)) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -3521,34 +3508,40 @@ pub fn serve( .with(cors_builder.build()) .boxed(); - let http_socket: SocketAddr = SocketAddr::new(config.listen_addr, config.listen_port); - let http_server: HttpServer = match config.tls_config { - Some(tls_config) => { - let (socket, server) = warp::serve(routes) - .tls() - .cert_path(tls_config.cert) - .key_path(tls_config.key) - .try_bind_with_graceful_shutdown(http_socket, async { - shutdown.await; - })?; + let axum_router = Router::new().fallback_service(warp::service(routes)); - info!("HTTP API is being served over TLS"); + let address = SocketAddr::new(config.listen_addr, config.listen_port); - (socket, Box::pin(server)) - } - None => { - let (socket, server) = - warp::serve(routes).try_bind_with_graceful_shutdown(http_socket, async { - shutdown.await; - })?; - (socket, Box::pin(server)) - } - }; + let mut server_builder = Server::builder(axum_router, address); + + let tls_enabled = config.tls_config.is_some(); + if let Some(tls_config) = config.tls_config { + server_builder = server_builder.with_tls(tls_config); + } + + let server = server_builder.build().await?; + + let (address, server) = server.serve_with_shutdown(shutdown).await?; + + if tls_enabled { + info!("HTTP API is being served over TLS"); + } info!( - listen_address = %http_server.0, + listen_address = %address, "HTTP API started" ); - Ok(http_server) + let server_future = async move { + match server.await { + Ok(()) => { + info!("HTTP API server stopped"); + } + Err(e) => { + tracing::error!(error = ?e, "HTTP API server error"); + } + } + }; + + Ok((address, server_future)) } diff --git a/beacon_node/http_api/src/light_client.rs b/beacon_node/http_api/src/light_client.rs index 86eef03218..336f18197e 100644 --- a/beacon_node/http_api/src/light_client.rs +++ b/beacon_node/http_api/src/light_client.rs @@ -13,8 +13,8 @@ use std::sync::Arc; use types::{EthSpec, ForkName, Hash256, LightClientBootstrap}; use warp::{ Rejection, - hyper::{Body, Response}, - reply::Reply, + http::response::Builder, + reply::{Reply, Response}, }; const MAX_REQUEST_LIGHT_CLIENT_UPDATES: u64 = 128; @@ -23,7 +23,7 @@ pub fn get_light_client_updates( chain: Arc>, query: LightClientUpdatesQuery, accept_header: Option, -) -> Result, Rejection> { +) -> Result { validate_light_client_updates_request(&chain, &query)?; let light_client_updates = chain @@ -34,17 +34,17 @@ pub fn get_light_client_updates( match accept_header { Some(api_types::Accept::Ssz) => { - let response_chunks = light_client_updates + let response_chunks: Vec = light_client_updates .into_iter() .flat_map(|update| { map_light_client_update_to_response_chunk::(&chain, update).as_ssz_bytes() }) .collect(); - Response::builder() + Builder::new() .status(200) .body(response_chunks) - .map(|res: Response>| add_ssz_content_type_header(res)) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -66,7 +66,7 @@ pub fn get_light_client_bootstrap( chain: Arc>, block_root: &Hash256, accept_header: Option, -) -> Result, Rejection> { +) -> Result { let (light_client_bootstrap, fork_name) = chain .get_light_client_bootstrap(block_root) .map_err(|err| { @@ -83,11 +83,11 @@ pub fn get_light_client_bootstrap( ))?; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(light_client_bootstrap.as_ssz_bytes().into()) - .map(|res: Response| add_consensus_version_header(res, fork_name)) - .map(|res: Response| add_ssz_content_type_header(res)) + .body(light_client_bootstrap.as_ssz_bytes()) + .map(|res| add_consensus_version_header(res, fork_name)) + .map(add_ssz_content_type_header) .map_err(|e| { warp_utils::reject::custom_server_error(format!("failed to create response: {}", e)) }), diff --git a/beacon_node/http_api/src/produce_block.rs b/beacon_node/http_api/src/produce_block.rs index ed1ecb9456..f2514e6c31 100644 --- a/beacon_node/http_api/src/produce_block.rs +++ b/beacon_node/http_api/src/produce_block.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use tracing::instrument; use types::{execution::BlockProductionVersion, *}; use warp::{ - Reply, - hyper::{Body, Response}, + http::response::Builder, + reply::{Reply, Response}, }; /// If default boost factor is provided in validator/blocks v3 request, we will skip the calculation @@ -54,7 +54,7 @@ pub async fn produce_block_v4( chain: Arc>, slot: Slot, query: api_types::ValidatorBlocksQuery, -) -> Result, warp::Rejection> { +) -> Result { let randao_reveal = query.randao_reveal.decompress().map_err(|e| { warp_utils::reject::custom_bad_request(format!( "randao reveal is not a valid BLS signature: {:?}", @@ -106,7 +106,7 @@ pub async fn produce_block_v3( chain: Arc>, slot: Slot, query: api_types::ValidatorBlocksQuery, -) -> Result, warp::Rejection> { +) -> Result { let randao_reveal = query.randao_reveal.decompress().map_err(|e| { warp_utils::reject::custom_bad_request(format!( "randao reveal is not a valid BLS signature: {:?}", @@ -146,7 +146,7 @@ pub fn build_response_v4( execution_payload_included: bool, accept_header: Option, spec: &ChainSpec, -) -> Result, warp::Rejection> { +) -> Result { let fork_name = block .to_ref() .fork_name(spec) @@ -161,11 +161,11 @@ pub fn build_response_v4( }; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(block.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) - .map(|res: Response| add_consensus_version_header(res, fork_name)) + .body(block.as_ssz_bytes()) + .map(add_ssz_content_type_header) + .map(|res| add_consensus_version_header(res, fork_name)) .map(|res| add_consensus_block_value_header(res, consensus_block_value_wei)) .map(|res| add_execution_payload_included_header(res, execution_payload_included)) .map_err(|e| -> warp::Rejection { @@ -187,7 +187,7 @@ pub fn build_response_v3( chain: Arc>, block_response: BeaconBlockResponseWrapper, accept_header: Option, -) -> Result, warp::Rejection> { +) -> Result { let fork_name = block_response .fork_name(&chain.spec) .map_err(inconsistent_fork_rejection)?; @@ -205,15 +205,13 @@ pub fn build_response_v3( let block_contents = build_block_contents::build_block_contents(fork_name, block_response)?; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(block_contents.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) - .map(|res: Response| add_consensus_version_header(res, fork_name)) + .body(block_contents.as_ssz_bytes()) + .map(add_ssz_content_type_header) + .map(|res| add_consensus_version_header(res, fork_name)) .map(|res| add_execution_payload_blinded_header(res, execution_payload_blinded)) - .map(|res: Response| { - add_execution_payload_value_header(res, execution_payload_value) - }) + .map(|res| add_execution_payload_value_header(res, execution_payload_value)) .map(|res| add_consensus_block_value_header(res, consensus_block_value)) .map_err(|e| -> warp::Rejection { warp_utils::reject::custom_server_error(format!("failed to create response: {}", e)) @@ -224,7 +222,6 @@ pub fn build_response_v3( data: block_contents, }) .into_response()) - .map(|res| res.into_response()) .map(|res| add_consensus_version_header(res, fork_name)) .map(|res| add_execution_payload_blinded_header(res, execution_payload_blinded)) .map(|res| add_execution_payload_value_header(res, execution_payload_value)) @@ -237,7 +234,7 @@ pub async fn produce_blinded_block_v2( chain: Arc>, slot: Slot, query: api_types::ValidatorBlocksQuery, -) -> Result, warp::Rejection> { +) -> Result { let randao_reveal = query.randao_reveal.decompress().map_err(|e| { warp_utils::reject::custom_bad_request(format!( "randao reveal is not a valid BLS signature: {:?}", @@ -273,7 +270,7 @@ pub async fn produce_block_v2( chain: Arc>, slot: Slot, query: api_types::ValidatorBlocksQuery, -) -> Result, warp::Rejection> { +) -> Result { let randao_reveal = query.randao_reveal.decompress().map_err(|e| { warp_utils::reject::custom_bad_request(format!( "randao reveal is not a valid BLS signature: {:?}", @@ -303,7 +300,7 @@ pub fn build_response_v2( chain: Arc>, block_response: BeaconBlockResponseWrapper, accept_header: Option, -) -> Result, warp::Rejection> { +) -> Result { let fork_name = block_response .fork_name(&chain.spec) .map_err(inconsistent_fork_rejection)?; @@ -311,11 +308,11 @@ pub fn build_response_v2( let block_contents = build_block_contents::build_block_contents(fork_name, block_response)?; match accept_header { - Some(api_types::Accept::Ssz) => Response::builder() + Some(api_types::Accept::Ssz) => Builder::new() .status(200) - .body(block_contents.as_ssz_bytes().into()) - .map(|res: Response| add_ssz_content_type_header(res)) - .map(|res: Response| add_consensus_version_header(res, fork_name)) + .body(block_contents.as_ssz_bytes()) + .map(add_ssz_content_type_header) + .map(|res| add_consensus_version_header(res, fork_name)) .map_err(|e| { warp_utils::reject::custom_server_error(format!("failed to create response: {}", e)) }), diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 9a705e4162..f525df6cde 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -298,8 +298,9 @@ pub async fn create_api_server_with_config( )), }); - let (listening_socket, server) = - crate::serve(ctx.clone(), test_runtime.task_executor.exit()).unwrap(); + let (listening_socket, server) = crate::serve(ctx.clone(), test_runtime.task_executor.exit()) + .await + .unwrap(); ApiServer { ctx, diff --git a/beacon_node/http_api/src/validator/execution_payload_envelopes.rs b/beacon_node/http_api/src/validator/execution_payload_envelopes.rs index 3a20b37c9b..e5c9cbe0db 100644 --- a/beacon_node/http_api/src/validator/execution_payload_envelopes.rs +++ b/beacon_node/http_api/src/validator/execution_payload_envelopes.rs @@ -9,8 +9,7 @@ use ssz::Encode; use std::sync::Arc; use tracing::debug; use types::Slot; -use warp::http::Response; -use warp::{Filter, Rejection}; +use warp::{Filter, Rejection, http::response::Builder, reply::Reply}; // GET validator/execution_payload_envelopes/{slot} pub fn get_validator_execution_payload_envelopes( @@ -58,11 +57,12 @@ pub fn get_validator_execution_payload_envelopes( let fork_name = chain.spec.fork_name_at_slot::(slot); match accept_header { - Some(Accept::Ssz) => Response::builder() + Some(Accept::Ssz) => Builder::new() .status(200) .header("Content-Type", "application/octet-stream") .header("Eth-Consensus-Version", fork_name.to_string()) - .body(envelope.as_ssz_bytes().into()) + .body(envelope.as_ssz_bytes()) + .map(|res| res.into_response()) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "Failed to build SSZ response: {e}" @@ -74,19 +74,16 @@ pub fn get_validator_execution_payload_envelopes( metadata: EmptyMetadata {}, data: envelope, }; - Response::builder() + Builder::new() .status(200) .header("Content-Type", "application/json") .header("Eth-Consensus-Version", fork_name.to_string()) - .body( - serde_json::to_string(&json_response) - .map_err(|e| { - warp_utils::reject::custom_server_error(format!( - "Failed to serialize response: {e}" - )) - })? - .into(), - ) + .body(serde_json::to_string(&json_response).map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "Failed to serialize response: {e}" + )) + })?) + .map(|res| res.into_response()) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "Failed to build JSON response: {e}" diff --git a/beacon_node/http_api/src/validator/mod.rs b/beacon_node/http_api/src/validator/mod.rs index 7f3d1cd721..1488f13898 100644 --- a/beacon_node/http_api/src/validator/mod.rs +++ b/beacon_node/http_api/src/validator/mod.rs @@ -33,7 +33,7 @@ use types::{ SignedContributionAndProof, SignedProposerPreferences, SignedValidatorRegistrationData, Slot, SyncContributionData, ValidatorSubscription, }; -use warp::{Filter, Rejection, Reply}; +use warp::{Filter, Rejection, Reply, http::response::Builder}; use warp_utils::reject::convert_rejection; pub mod execution_payload_envelopes; @@ -299,7 +299,6 @@ pub fn get_validator_payload_attestation_data( ) -> ResponseFilter { use eth2::beacon_response::{EmptyMetadata, ForkVersionedResponse}; use ssz::Encode; - use warp::http::Response; eth_v1 .and(warp::path("validator")) @@ -351,12 +350,12 @@ pub fn get_validator_payload_attestation_data( })?; match accept_header { - Some(Accept::Ssz) => Response::builder() + Some(Accept::Ssz) => Builder::new() .status(200) .header("Content-Type", "application/octet-stream") .header("Eth-Consensus-Version", fork_name.to_string()) - .body(payload_attestation_data.as_ssz_bytes().into()) - .map(|res: Response| res) + .body(payload_attestation_data.as_ssz_bytes()) + .map(|res| res.into_response()) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "Failed to build SSZ response: {e}" @@ -368,19 +367,16 @@ pub fn get_validator_payload_attestation_data( metadata: EmptyMetadata {}, data: payload_attestation_data, }; - Response::builder() + Builder::new() .status(200) .header("Content-Type", "application/json") .header("Eth-Consensus-Version", fork_name.to_string()) - .body( - serde_json::to_string(&json_response) - .map_err(|e| { - warp_utils::reject::custom_server_error(format!( - "Failed to serialize response: {e}" - )) - })? - .into(), - ) + .body(serde_json::to_string(&json_response).map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "Failed to serialize response: {e}" + )) + })?) + .map(|res| res.into_response()) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "Failed to build JSON response: {e}" diff --git a/beacon_node/http_metrics/Cargo.toml b/beacon_node/http_metrics/Cargo.toml index b74c04a4cb..ead93e67df 100644 --- a/beacon_node/http_metrics/Cargo.toml +++ b/beacon_node/http_metrics/Cargo.toml @@ -6,6 +6,8 @@ edition = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +axum = { workspace = true } +axum_utils = { workspace = true } beacon_chain = { workspace = true } health_metrics = { workspace = true } lighthouse_network = { workspace = true } @@ -17,9 +19,8 @@ network_utils = { workspace = true } serde = { workspace = true } slot_clock = { workspace = true } store = { workspace = true } +thiserror = { workspace = true } tracing = { workspace = true } -warp = { workspace = true } -warp_utils = { workspace = true } [dev-dependencies] logging = { workspace = true } diff --git a/beacon_node/http_metrics/src/lib.rs b/beacon_node/http_metrics/src/lib.rs index cfa55b54eb..58acf3e0f7 100644 --- a/beacon_node/http_metrics/src/lib.rs +++ b/beacon_node/http_metrics/src/lib.rs @@ -3,30 +3,36 @@ //! For other endpoints, see the `http_api` crate. mod metrics; +use axum::{ + Router, + extract::State, + http::{Method, StatusCode, header}, + response::IntoResponse, + routing::get, +}; +use axum_utils::{Server, cors::build_cors_layer, middleware::add_server_header}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::prometheus_client::registry::Registry; -use lighthouse_version::version_with_platform; use logging::crit; use serde::{Deserialize, Serialize}; use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; -use tracing::info; -use warp::{Filter, http::Response}; +use tracing::{error, info}; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum Error { - Warp(warp::Error), + #[error("Builder error: {0}")] + Builder(#[from] axum_utils::server::BuilderError), + #[error("Server error: {0}")] + Server(#[from] axum_utils::server::ServerError), + #[error("CORS error: {0}")] + Cors(#[from] axum_utils::cors::CorsError), + #[error("{0}")] Other(String), } -impl From for Error { - fn from(e: warp::Error) -> Self { - Error::Warp(e) - } -} - impl From for Error { fn from(e: String) -> Self { Error::Other(e) @@ -81,25 +87,12 @@ impl Default for Config { /// /// Returns an error if the server is unable to bind or there is another error during /// configuration. -pub fn serve( +pub async fn serve( ctx: Arc>, shutdown: impl Future + Send + Sync + 'static, ) -> Result<(SocketAddr, impl Future), Error> { let config = &ctx.config; - // Configure CORS. - let cors_builder = { - let builder = warp::cors() - .allow_method("GET") - .allow_headers(vec!["Content-Type"]); - - warp_utils::cors::set_builder_origins( - builder, - config.allow_origin.as_deref(), - (config.listen_addr, config.listen_port), - )? - }; - // Sanity check. if !config.enabled { crit!("Cannot start disabled metrics HTTP server"); @@ -108,44 +101,52 @@ pub fn serve( )); } - let inner_ctx = ctx.clone(); - let routes = warp::get() - .and(warp::path("metrics")) - .map(move || inner_ctx.clone()) - .and_then(|ctx: Arc>| async move { - Ok::<_, warp::Rejection>( - metrics::gather_prometheus_metrics(&ctx) - .map(|body| { - Response::builder() - .status(200) - .header("Content-Type", "text/plain") - .body(body) - .unwrap() - }) - .unwrap_or_else(|e| { - Response::builder() - .status(500) - .header("Content-Type", "text/plain") - .body(format!("Unable to gather metrics: {:?}", e)) - .unwrap() - }), - ) - }) - // Add a `Server` header. - .map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform())) - .with(cors_builder.build()); + let cors_layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + )? + .allow_methods([Method::GET]) + .allow_headers([header::CONTENT_TYPE]); - let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( - SocketAddr::new(config.listen_addr, config.listen_port), - async { - shutdown.await; - }, - )?; + let server_header: header::HeaderValue = lighthouse_version::version_with_platform() + .parse() + .map_err(|e| Error::Other(format!("invalid version header value: {e}")))?; + + let router = Router::new() + .route("/metrics", get(metrics_handler::)) + .with_state(ctx.clone()) + .layer(add_server_header(server_header)) + .layer(cors_layer); + + let address = SocketAddr::new(config.listen_addr, config.listen_port); + let server = Server::builder(router, address).build().await?; + + let (address, server) = server.serve_with_shutdown(shutdown).await?; info!( - listen_address = listening_socket.to_string(), + listen_address = %address, "Metrics HTTP server started" ); - Ok((listening_socket, server)) + let server_future = async move { + if let Err(e) = server.await { + error!(error = ?e, "Metrics HTTP server error"); + } + }; + + Ok((address, server_future)) +} + +async fn metrics_handler( + State(ctx): State>>, +) -> impl IntoResponse { + match metrics::gather_prometheus_metrics(&ctx) { + Ok(body) => (StatusCode::OK, [(header::CONTENT_TYPE, "text/plain")], body), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + [(header::CONTENT_TYPE, "text/plain")], + format!("Unable to gather metrics: {:?}", e), + ), + } } diff --git a/beacon_node/http_metrics/tests/tests.rs b/beacon_node/http_metrics/tests/tests.rs index 2ce21a62b3..8287ed340e 100644 --- a/beacon_node/http_metrics/tests/tests.rs +++ b/beacon_node/http_metrics/tests/tests.rs @@ -34,7 +34,7 @@ async fn returns_200_ok() { // It's not really interesting why this triggered, just that it happened. let _ = shutdown_rx.await; }; - let (listening_socket, server) = http_metrics::serve(ctx, server_shutdown).unwrap(); + let (listening_socket, server) = http_metrics::serve(ctx, server_shutdown).await.unwrap(); tokio::spawn(server); diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index d27909bddf..eea3976e0f 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1,4 +1,5 @@ use account_utils::{STDIN_INPUTS_FLAG, read_input_from_user}; +use axum_utils::tls::TlsConfig; use beacon_chain::chain_config::{ DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR, INVALID_HOLESKY_BLOCK_ROOT, }; @@ -12,7 +13,6 @@ use client::{ClientConfig, ClientGenesis}; use directory::{DEFAULT_BEACON_NODE_DIR, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR}; use environment::RuntimeContext; use execution_layer::DEFAULT_JWT_FILE; -use http_api::TlsConfig; use lighthouse_network::{Enr, Multiaddr, NetworkConfig, PeerIdSerialized}; use network_utils::listen_addr::ListenAddress; use sensitive_url::SensitiveUrl; diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 6400427f8c..3e515b4c31 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -139,6 +139,7 @@ impl ProductionBeaconNode { .notifier()? .http_metrics_config(client_config.http_metrics.clone()) .build() + .await .map(Self) } diff --git a/common/axum_utils/Cargo.toml b/common/axum_utils/Cargo.toml new file mode 100644 index 0000000000..d9fc13db75 --- /dev/null +++ b/common/axum_utils/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "axum_utils" +version = "0.1.0" +authors = ["Sigma Prime "] +edition = { workspace = true } + +[dependencies] +axum = { workspace = true } +axum-server = { version = "0.7", features = ["tls-rustls-no-provider"] } +http = "1" +serde = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tower-http = { version = "0.6", features = ["cors", "set-header"] } + +[dev-dependencies] +tower = { workspace = true } diff --git a/common/axum_utils/src/cors.rs b/common/axum_utils/src/cors.rs new file mode 100644 index 0000000000..ba748498f1 --- /dev/null +++ b/common/axum_utils/src/cors.rs @@ -0,0 +1,515 @@ +use serde::{Deserialize, Serialize}; +use std::net::IpAddr; +use std::str::FromStr; +use tower_http::cors::{AllowOrigin, CorsLayer}; + +/// Errors that can occur during CORS configuration +#[derive(Debug, thiserror::Error)] +pub enum CorsError { + #[error("Invalid CORS origin '{origin}': {reason}")] + InvalidOrigin { origin: String, reason: String }, + + #[error("CORS origins string cannot be empty")] + EmptyOriginsString, +} + +/// A validated CORS origin +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Origin { + /// Allow any origin (*). + Any, + /// A specific origin URL. + Exact(http::HeaderValue), +} + +impl FromStr for Origin { + type Err = CorsError; + + fn from_str(s: &str) -> Result { + let trimmed = s.trim(); + + if trimmed == "*" { + return Ok(Origin::Any); + } + + validate_origin(trimmed)?; + + let header_value = + http::HeaderValue::from_str(trimmed).map_err(|e| CorsError::InvalidOrigin { + origin: trimmed.to_string(), + reason: format!("invalid header value: {}", e), + })?; + + Ok(Origin::Exact(header_value)) + } +} + +/// Validate a CORS origin string +fn validate_origin(s: &str) -> Result<(), CorsError> { + let make_error = |reason: &str| CorsError::InvalidOrigin { + origin: s.to_string(), + reason: reason.to_string(), + }; + + if !s.contains("://") { + return Err(make_error("missing scheme (http:// or https://)")); + } + + let (scheme, rest) = s + .split_once("://") + .ok_or_else(|| make_error("failed to parse scheme"))?; + + if !matches!(scheme, "http" | "https") { + return Err(make_error(&format!( + "invalid scheme '{}' (only http and https are allowed)", + scheme + ))); + } + + if rest.is_empty() { + return Err(make_error("missing host")); + } + + let host = rest + .split(':') + .next() + .ok_or_else(|| make_error("failed to extract host"))?; + + if host.is_empty() { + return Err(make_error("empty host")); + } + + Ok(()) +} + +/// Configuration for CORS. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CorsConfig { + /// Comma-separated list of allowed origins, or "*" for any origin. + pub allowed_origins: String, +} + +impl CorsConfig { + /// Create a new CORS config from an origins string. + pub fn new(allowed_origins: impl Into) -> Self { + Self { + allowed_origins: allowed_origins.into(), + } + } + + /// Parse and validate the origins string, returning a list of validated origins. + pub fn parse_origins(&self) -> Result, CorsError> { + let trimmed = self.allowed_origins.trim(); + + if trimmed.is_empty() { + return Err(CorsError::EmptyOriginsString); + } + + let origins: Vec = trimmed + .split(',') + .map(|s| s.trim().parse::()) + .collect::, _>>()?; + + Ok(origins) + } + + /// Convert this config into a Tower-compatible CorsLayer. + pub fn into_layer(self) -> Result { + let origins = self.parse_origins()?; + + // If any origin is the wildcard `*`, allow all origins, even when the + // list also contains explicit origins. + if origins.iter().any(|o| matches!(o, Origin::Any)) { + return Ok(CorsLayer::new().allow_origin(tower_http::cors::Any)); + } + + let header_values: Vec = origins + .into_iter() + .filter_map(|o| match o { + Origin::Exact(hv) => Some(hv), + Origin::Any => None, + }) + .collect(); + + Ok(CorsLayer::new().allow_origin(AllowOrigin::list(header_values))) + } +} + +fn format_default_origin(ip: IpAddr, port: u16) -> String { + match ip { + IpAddr::V4(addr) => format!("http://{}:{}", addr, port), + IpAddr::V6(addr) => format!("http://[{}]:{}", addr, port), + } +} + +/// Build a CORS layer from an optional origins string and a default fallback +/// +/// This is the main function for CLI usage: +/// - If `allow_origin` is `Some`, parse it as comma-separated origins +/// - If `allow_origin` is `None`, use the default IP and port +/// +/// Callers can chain additional configuration like `.allow_methods()` and +/// `.allow_headers()` on the returned `CorsLayer`. +pub fn build_cors_layer( + allow_origin: Option<&str>, + default_ip: IpAddr, + default_port: u16, +) -> Result { + let origins = match allow_origin { + Some(s) if !s.trim().is_empty() => s.to_string(), + _ => format_default_origin(default_ip, default_port), + }; + + CorsConfig { + allowed_origins: origins, + } + .into_layer() +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{Router, routing::get}; + use http::{Request, StatusCode}; + use tower::ServiceExt; + + fn parse_origin(s: &str) -> Result<(), CorsError> { + s.parse::()?; + Ok(()) + } + + #[test] + fn valid_origins() { + parse_origin("*").unwrap(); + parse_origin("http://127.0.0.1").unwrap(); + parse_origin("http://localhost").unwrap(); + parse_origin("http://127.0.0.1:8000").unwrap(); + parse_origin("http://localhost:8000").unwrap(); + parse_origin("http://[::1]").unwrap(); + parse_origin("http://[::1]:8000").unwrap(); + } + + #[test] + fn invalid_origins() { + parse_origin(".*").unwrap_err(); + parse_origin("127.0.0.1").unwrap_err(); + parse_origin("localhost").unwrap_err(); + parse_origin("[::1]").unwrap_err(); + } + + #[test] + fn origin_variants() { + assert_eq!("*".parse::().unwrap(), Origin::Any); + + match "http://localhost:3000".parse::().unwrap() { + Origin::Exact(_) => {} + Origin::Any => panic!("Expected Exact variant, got Any"), + } + + match "https://example.com".parse::().unwrap() { + Origin::Exact(_) => {} + Origin::Any => panic!("Expected Exact variant, got Any"), + } + } + + struct HttpConfig { + allow_origin: Option, + listen_addr: IpAddr, + listen_port: u16, + } + + #[test] + fn default_config() { + let config = HttpConfig { + allow_origin: None, + listen_addr: "127.0.0.1".parse().unwrap(), + listen_port: 5052, + }; + + let layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ); + assert!(layer.is_ok()); + } + + #[test] + fn wildcard_origin() { + // lighthouse bn --http-allow-origin "*" + let config = HttpConfig { + allow_origin: Some("*".to_string()), + listen_addr: "127.0.0.1".parse().unwrap(), + listen_port: 5052, + }; + + let layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ); + assert!(layer.is_ok()); + } + + #[test] + fn single_origin() { + // lighthouse bn --http-allow-origin "http://localhost:3000" + let config = HttpConfig { + allow_origin: Some("http://localhost:3000".to_string()), + listen_addr: "127.0.0.1".parse().unwrap(), + listen_port: 5052, + }; + + let layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ); + assert!(layer.is_ok()); + } + + #[test] + fn multiple_origins() { + // lighthouse bn --http-allow-origin "http://localhost:3000,https://example.com" + let config = HttpConfig { + allow_origin: Some("http://localhost:3000,https://example.com".to_string()), + listen_addr: "127.0.0.1".parse().unwrap(), + listen_port: 5052, + }; + + let layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ); + assert!(layer.is_ok()); + } + + #[test] + fn ipv6_listen_address() { + let config = HttpConfig { + allow_origin: None, + listen_addr: "::1".parse().unwrap(), + listen_port: 5052, + }; + + let layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ); + assert!(layer.is_ok()); + } + + #[test] + fn invalid_origin_missing_scheme() { + let config = HttpConfig { + allow_origin: Some("localhost:3000".to_string()), + listen_addr: "127.0.0.1".parse().unwrap(), + listen_port: 5052, + }; + + let layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ); + assert!(layer.is_err()); + } + + #[test] + fn invalid_origin_in_list() { + let config = HttpConfig { + allow_origin: Some("http://localhost:3000,invalid,https://example.com".to_string()), + listen_addr: "127.0.0.1".parse().unwrap(), + listen_port: 5052, + }; + + let layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ); + assert!(layer.is_err()); + } + + #[tokio::test] + async fn verify_cors_layer() { + let config = HttpConfig { + allow_origin: Some("http://localhost:3000".to_string()), + listen_addr: "127.0.0.1".parse().unwrap(), + listen_port: 5052, + }; + + let cors_layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ) + .unwrap(); + + async fn handler() -> &'static str { + "test" + } + + let app = Router::new().route("/", get(handler)).layer(cors_layer); + + // Preflight request + let request = Request::builder() + .method("OPTIONS") + .uri("/") + .header("Origin", "http://localhost:3000") + .header("Access-Control-Request-Method", "GET") + .body(axum::body::Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + // Verify CORS header matches origin + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response + .headers() + .get("access-control-allow-origin") + .unwrap(), + "http://localhost:3000" + ); + } + + #[tokio::test] + async fn wildcard_overrides_exact() { + // Mix specific origin with wildcard + let config = HttpConfig { + allow_origin: Some("http://localhost:3000,*".to_string()), + listen_addr: "127.0.0.1".parse().unwrap(), + listen_port: 5052, + }; + + let cors_layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ) + .unwrap(); + + async fn handler() -> &'static str { + "test" + } + + let app = Router::new().route("/", get(handler)).layer(cors_layer); + + let request = Request::builder() + .method("OPTIONS") + .uri("/") + .header("Origin", "https://completely-different-origin.com") + .header("Access-Control-Request-Method", "GET") + .body(axum::body::Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response + .headers() + .get("access-control-allow-origin") + .unwrap(), + "*" + ); + } + + #[tokio::test] + async fn verify_allowed_methods() { + use axum::{Router, routing::get}; + use http::{Request, StatusCode}; + use tower::ServiceExt; + + let config = HttpConfig { + allow_origin: Some("http://localhost:3000".to_string()), + listen_addr: "127.0.0.1".parse().unwrap(), + listen_port: 5052, + }; + + let cors_layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ) + .unwrap() + .allow_methods([http::Method::GET, http::Method::POST]) + .allow_headers([http::header::CONTENT_TYPE]); + + async fn handler() -> &'static str { + "test" + } + + let app = Router::new().route("/", get(handler)).layer(cors_layer); + + let request = Request::builder() + .method("OPTIONS") + .uri("/") + .header("Origin", "http://localhost:3000") + .header("Access-Control-Request-Method", "GET") + .body(axum::body::Body::empty()) + .unwrap(); + + let response = app.clone().oneshot(request).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let allowed_methods = response + .headers() + .get("access-control-allow-methods") + .unwrap() + .to_str() + .unwrap(); + assert!(allowed_methods.contains("GET")); + assert!(allowed_methods.contains("POST")); + } + + #[tokio::test] + async fn verify_allowed_headers() { + let config = HttpConfig { + allow_origin: Some("http://localhost:3000".to_string()), + listen_addr: "127.0.0.1".parse().unwrap(), + listen_port: 5052, + }; + + let cors_layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + ) + .unwrap() + .allow_methods([http::Method::GET]) + .allow_headers([http::header::CONTENT_TYPE, http::header::AUTHORIZATION]); + + async fn handler() -> &'static str { + "test" + } + + let app = Router::new().route("/", get(handler)).layer(cors_layer); + + // Preflight request with Content-Type header + let request = Request::builder() + .method("OPTIONS") + .uri("/") + .header("Origin", "http://localhost:3000") + .header("Access-Control-Request-Method", "GET") + .header("Access-Control-Request-Headers", "content-type") + .body(axum::body::Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let allowed_headers = response + .headers() + .get("access-control-allow-headers") + .unwrap() + .to_str() + .unwrap() + .to_lowercase(); + assert!(allowed_headers.contains("content-type")); + assert!(allowed_headers.contains("authorization")); + } +} diff --git a/common/axum_utils/src/lib.rs b/common/axum_utils/src/lib.rs new file mode 100644 index 0000000000..dd20c612af --- /dev/null +++ b/common/axum_utils/src/lib.rs @@ -0,0 +1,6 @@ +pub mod cors; +pub mod middleware; +pub mod server; +pub mod tls; + +pub use server::Server; diff --git a/common/axum_utils/src/middleware.rs b/common/axum_utils/src/middleware.rs new file mode 100644 index 0000000000..2be9bb652b --- /dev/null +++ b/common/axum_utils/src/middleware.rs @@ -0,0 +1,9 @@ +use axum::http::header; +use tower_http::set_header::SetResponseHeaderLayer; + +/// Returns a layer that adds the "Server" header to all responses. +pub fn add_server_header( + value: header::HeaderValue, +) -> SetResponseHeaderLayer { + SetResponseHeaderLayer::overriding(header::SERVER, value) +} diff --git a/common/axum_utils/src/server/builder.rs b/common/axum_utils/src/server/builder.rs new file mode 100644 index 0000000000..5957b67378 --- /dev/null +++ b/common/axum_utils/src/server/builder.rs @@ -0,0 +1,42 @@ +use crate::{ + server::{Server, error::BuilderError}, + tls::TlsConfig, +}; +use axum::Router; +use axum_server::tls_rustls::RustlsConfig; +use std::net::SocketAddr; + +pub struct ServerBuilder { + router: Router, + address: SocketAddr, + tls_config: Option, +} + +impl ServerBuilder { + pub fn new(router: Router, address: SocketAddr) -> Self { + Self { + router, + address, + tls_config: None, + } + } + + pub fn with_tls(mut self, config: TlsConfig) -> Self { + self.tls_config = Some(config); + self + } + + pub async fn build(self) -> Result { + let rustls_config = if let Some(tls) = self.tls_config { + Some(RustlsConfig::from_pem_file(&tls.cert, &tls.key).await?) + } else { + None + }; + + Ok(Server { + router: self.router, + address: self.address, + rustls_config, + }) + } +} diff --git a/common/axum_utils/src/server/error.rs b/common/axum_utils/src/server/error.rs new file mode 100644 index 0000000000..2501e6d940 --- /dev/null +++ b/common/axum_utils/src/server/error.rs @@ -0,0 +1,11 @@ +#[derive(Debug, thiserror::Error)] +pub enum BuilderError { + #[error("TLS configuration failed: {0}")] + TlsConfigFailed(#[from] std::io::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum ServerError { + #[error("Server failed: {0}")] + ServerFailed(#[from] std::io::Error), +} diff --git a/common/axum_utils/src/server/mod.rs b/common/axum_utils/src/server/mod.rs new file mode 100644 index 0000000000..dc039f1bae --- /dev/null +++ b/common/axum_utils/src/server/mod.rs @@ -0,0 +1,116 @@ +mod builder; +mod error; + +pub use builder::ServerBuilder; +pub use error::{BuilderError, ServerError}; + +use axum::Router; +use axum_server::tls_rustls::RustlsConfig; +use std::net::SocketAddr; +use std::time::Duration; + +/// Default timeout for graceful shutdown. After this duration, the server will +/// stop waiting for existing connections and shut down immediately. +const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); + +pub struct Server { + pub(crate) router: Router, + pub(crate) address: SocketAddr, + pub(crate) rustls_config: Option, +} + +impl Server { + /// Initialize a new server builder. + pub fn builder(router: Router, address: SocketAddr) -> ServerBuilder { + ServerBuilder::new(router, address) + } + + /// Get information about the server configuration. + /// + /// Note that the address is only the configured address, not the actual address + /// the server is listening on (such as when using port 0). + pub fn info(&self) -> ServerInfo { + ServerInfo { + address: self.address, + protocol: if self.rustls_config.is_some() { + Protocol::Https + } else { + Protocol::Http + }, + } + } + + /// Serve the application until the shutdown signal is received. + /// Returns the actual address the server is listening on. + pub async fn serve_with_shutdown( + self, + shutdown_signal: F, + ) -> Result<(SocketAddr, impl Future>), ServerError> + where + F: std::future::Future + Send + 'static, + { + let tokio_listener = tokio::net::TcpListener::bind(self.address).await?; + + let actual_addr = tokio_listener.local_addr()?; + + let listener = tokio_listener.into_std()?; + + let handle = axum_server::Handle::new(); + + let server_future = async move { + // Spawn a task that triggers graceful shutdown when the signal fires. + // If the server exits before the signal, this task will linger until the + // signal resolves, which is harmless. + let shutdown_handle = tokio::spawn({ + let handle = handle.clone(); + async move { + shutdown_signal.await; + handle.graceful_shutdown(Some(DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT)); + } + }); + + let result = match self.rustls_config { + Some(config) => { + axum_server::from_tcp_rustls(listener, config) + .handle(handle) + .serve(self.router.into_make_service()) + .await + } + None => { + axum_server::from_tcp(listener) + .handle(handle) + .serve(self.router.into_make_service()) + .await + } + }; + + // Abort the shutdown listener if it's still running (server exited first). + shutdown_handle.abort(); + + result.map_err(ServerError::ServerFailed) + }; + + Ok((actual_addr, server_future)) + } +} + +#[derive(Debug, Clone, Copy)] +pub struct ServerInfo { + pub address: SocketAddr, + pub protocol: Protocol, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Protocol { + Http, + Https, +} + +impl std::fmt::Display for Protocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Protocol::Http => write!(f, "http"), + Protocol::Https => write!(f, "https"), + } + } +} diff --git a/common/axum_utils/src/tls.rs b/common/axum_utils/src/tls.rs new file mode 100644 index 0000000000..c3d4632b07 --- /dev/null +++ b/common/axum_utils/src/tls.rs @@ -0,0 +1,9 @@ +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +/// Configuration used when serving the HTTP server over TLS. +#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] +pub struct TlsConfig { + pub cert: PathBuf, + pub key: PathBuf, +} diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 07716fa2e7..1987050bf3 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -405,7 +405,7 @@ impl TaskExecutor { /// Returns a future that completes when `async-channel::Sender` is dropped or () is sent, /// which translates to the exit signal being triggered. - pub fn exit(&self) -> impl Future + 'static { + pub fn exit(&self) -> impl Future + use<> + 'static { let exit = self.exit.clone(); async move { let _ = exit.recv().await; diff --git a/common/warp_utils/Cargo.toml b/common/warp_utils/Cargo.toml index 80bc247cbf..5c040452df 100644 --- a/common/warp_utils/Cargo.toml +++ b/common/warp_utils/Cargo.toml @@ -8,7 +8,7 @@ edition = { workspace = true } [dependencies] bytes = { workspace = true } eth2 = { workspace = true } -headers = "0.3.2" +headers = "0.4" reqwest = { workspace = true } safe_arith = { workspace = true } serde = { workspace = true } diff --git a/common/warp_utils/src/status_code.rs b/common/warp_utils/src/status_code.rs index a654b6d2c5..3748ac6205 100644 --- a/common/warp_utils/src/status_code.rs +++ b/common/warp_utils/src/status_code.rs @@ -1,9 +1,10 @@ use reqwest::StatusCode; -use warp::Rejection; -/// Convert from a "new" `http::StatusCode` to a `warp` compatible one. -pub fn convert(code: StatusCode) -> Result { - code.as_u16().try_into().map_err(|e| { - crate::reject::custom_server_error(format!("bad status code {code:?} - {e:?}")) - }) +/// Convert a `reqwest::StatusCode` to a `warp::http::StatusCode`. +/// +/// In warp 0.4, both `reqwest` (0.12) and `warp` use the `http` v1 crate, +/// so `reqwest::StatusCode` and `warp::http::StatusCode` are the same type. +/// This function is retained for API compatibility but is now a no-op. +pub fn convert(code: StatusCode) -> Result { + Ok(code) } diff --git a/validator_client/http_api/Cargo.toml b/validator_client/http_api/Cargo.toml index e334ab9db0..e03ce2f9eb 100644 --- a/validator_client/http_api/Cargo.toml +++ b/validator_client/http_api/Cargo.toml @@ -13,6 +13,8 @@ testing = ["dep:deposit_contract", "dep:doppelganger_service", "dep:tempfile"] [dependencies] account_utils = { workspace = true } +axum = { workspace = true } +axum_utils = { workspace = true } beacon_node_fallback = { workspace = true } bls = { workspace = true } deposit_contract = { workspace = true, optional = true } @@ -42,6 +44,7 @@ sysinfo = { workspace = true } system_health = { workspace = true } task_executor = { workspace = true } tempfile = { workspace = true, optional = true } +thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tracing = { workspace = true } diff --git a/validator_client/http_api/src/lib.rs b/validator_client/http_api/src/lib.rs index 8e9c077e57..8543a246cb 100644 --- a/validator_client/http_api/src/lib.rs +++ b/validator_client/http_api/src/lib.rs @@ -9,23 +9,18 @@ mod keystores; mod remotekeys; mod tests; -pub use api_secret::PK_FILENAME; - -use graffiti::{delete_graffiti, get_graffiti, set_graffiti}; - -use create_signed_voluntary_exit::create_signed_voluntary_exit; -use graffiti_file::{GraffitiFile, determine_graffiti}; -use lighthouse_validator_store::LighthouseValidatorStore; -use validator_store::ValidatorStore; +pub use api_secret::{ApiSecret, PK_FILENAME}; use account_utils::{ mnemonic_from_phrase, validator_definitions::{SigningDefinition, ValidatorDefinition, Web3SignerDefinition}, }; -pub use api_secret::ApiSecret; +use axum::Router; +use axum_utils::server::Server; use beacon_node_fallback::CandidateInfo; use bls::{PublicKey, PublicKeyBytes}; use core::convert::Infallible; +use create_signed_voluntary_exit::create_signed_voluntary_exit; use create_validator::{ create_validators_mnemonic, create_validators_web3signer, get_voting_password_storage, }; @@ -37,7 +32,10 @@ use eth2::lighthouse_vc::{ UpdateCandidatesRequest, UpdateCandidatesResponse, }, }; +use graffiti::{delete_graffiti, get_graffiti, set_graffiti}; +use graffiti_file::{GraffitiFile, determine_graffiti}; use health_metrics::observe::Observe; +use lighthouse_validator_store::LighthouseValidatorStore; use lighthouse_version::version_with_platform; use logging::SSELoggingComponents; use logging::crit; @@ -54,26 +52,27 @@ use sysinfo::{System, SystemExt}; use system_health::observe_system_health_vc; use task_executor::TaskExecutor; use tokio_stream::{StreamExt, wrappers::BroadcastStream}; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use types::{ChainSpec, ConfigAndPreset, EthSpec}; use validator_dir::Builder as ValidatorDirBuilder; use validator_services::block_service::BlockService; +use validator_store::ValidatorStore; use warp::{Filter, reply::Response, sse::Event}; use warp_utils::reject::convert_rejection; use warp_utils::task::blocking_json_task; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum Error { - Warp(warp::Error), + #[error("Builder error: {0}")] + Builder(#[from] axum_utils::server::BuilderError), + #[error("Server error: {0}")] + Server(#[from] axum_utils::server::ServerError), + #[error("Warp error: {0}")] + Warp(#[from] warp::Error), + #[error("{0}")] Other(String), } -impl From for Error { - fn from(e: warp::Error) -> Self { - Error::Warp(e) - } -} - impl From for Error { fn from(e: String) -> Self { Error::Other(e) @@ -148,7 +147,7 @@ impl Default for Config { /// /// Returns an error if the server is unable to bind or there is another error during /// configuration. -pub fn serve( +pub async fn serve( ctx: Arc>, shutdown: impl Future + Send + Sync + 'static, ) -> Result<(SocketAddr, impl Future), Error> { @@ -1399,20 +1398,33 @@ pub fn serve( .recover(warp_utils::reject::handle_rejection) // Add a `Server` header. .map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform())) - .with(cors_builder.build()); + .with(cors_builder.build()) + .boxed(); - let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( - SocketAddr::new(config.listen_addr, config.listen_port), - async { - shutdown.await; - }, - )?; + let axum_router = Router::new().fallback_service(warp::service(routes)); + + let address = SocketAddr::new(config.listen_addr, config.listen_port); + + let server = Server::builder(axum_router, address).build().await?; + + let (address, server) = server.serve_with_shutdown(shutdown).await?; info!( - listen_address = listening_socket.to_string(), + listen_address = %address, ?api_token_path, "HTTP API started" ); - Ok((listening_socket, server)) + let server_future = async move { + match server.await { + Ok(()) => { + info!("HTTP API server stopped"); + } + Err(e) => { + error!(error = ?e, "HTTP API server error"); + } + } + }; + + Ok((address, server_future)) } diff --git a/validator_client/http_api/src/test_utils.rs b/validator_client/http_api/src/test_utils.rs index f83d9f4d52..3ff4aa95a7 100644 --- a/validator_client/http_api/src/test_utils.rs +++ b/validator_client/http_api/src/test_utils.rs @@ -149,7 +149,7 @@ impl ApiTester { // It's not really interesting why this triggered, just that it happened. let _ = shutdown_rx.await; }; - let (listening_socket, server) = super::serve::<_, E>(ctx, server_shutdown).unwrap(); + let (listening_socket, server) = super::serve::<_, E>(ctx, server_shutdown).await.unwrap(); tokio::spawn(server); diff --git a/validator_client/http_api/src/tests.rs b/validator_client/http_api/src/tests.rs index 5cb631983c..07de0194c4 100644 --- a/validator_client/http_api/src/tests.rs +++ b/validator_client/http_api/src/tests.rs @@ -135,7 +135,9 @@ impl ApiTester { }); let ctx = context.clone(); let (listening_socket, server) = - super::serve::<_, E>(ctx, test_runtime.task_executor.exit()).unwrap(); + super::serve::<_, E>(ctx, test_runtime.task_executor.exit()) + .await + .unwrap(); tokio::spawn(server); diff --git a/validator_client/http_metrics/Cargo.toml b/validator_client/http_metrics/Cargo.toml index 24cbff7cde..f8e2cbb4e9 100644 --- a/validator_client/http_metrics/Cargo.toml +++ b/validator_client/http_metrics/Cargo.toml @@ -5,6 +5,8 @@ edition = { workspace = true } authors = ["Sigma Prime "] [dependencies] +axum = { workspace = true } +axum_utils = { workspace = true } health_metrics = { workspace = true } lighthouse_validator_store = { workspace = true } lighthouse_version = { workspace = true } @@ -14,9 +16,8 @@ metrics = { workspace = true } parking_lot = { workspace = true } serde = { workspace = true } slot_clock = { workspace = true } +thiserror = { workspace = true } tracing = { workspace = true } types = { workspace = true } validator_metrics = { workspace = true } validator_services = { workspace = true } -warp = { workspace = true } -warp_utils = { workspace = true } diff --git a/validator_client/http_metrics/src/lib.rs b/validator_client/http_metrics/src/lib.rs index a6624b4f44..a480215868 100644 --- a/validator_client/http_metrics/src/lib.rs +++ b/validator_client/http_metrics/src/lib.rs @@ -2,8 +2,15 @@ //! //! For other endpoints, see the `http_api` crate. +use axum::{ + Router, + extract::State, + http::{Method, StatusCode, header}, + response::IntoResponse, + routing::get, +}; +use axum_utils::{Server, cors::build_cors_layer, middleware::add_server_header}; use lighthouse_validator_store::LighthouseValidatorStore; -use lighthouse_version::version_with_platform; use logging::crit; use malloc_utils::scrape_allocator_metrics; use parking_lot::RwLock; @@ -13,21 +20,20 @@ use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use tracing::info; +use tracing::{error, info}; use types::EthSpec; use validator_services::duties_service::DutiesService; -use warp::{Filter, http::Response}; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] pub enum Error { - Warp(#[allow(dead_code)] warp::Error), - Other(#[allow(dead_code)] String), -} - -impl From for Error { - fn from(e: warp::Error) -> Self { - Error::Warp(e) - } + #[error("Builder error: {0}")] + Builder(#[from] axum_utils::server::BuilderError), + #[error("Server error: {0}")] + Server(#[from] axum_utils::server::ServerError), + #[error("CORS error: {0}")] + Cors(#[from] axum_utils::cors::CorsError), + #[error("{0}")] + Other(String), } impl From for Error { @@ -90,26 +96,12 @@ impl Default for Config { /// /// Returns an error if the server is unable to bind or there is another error during /// configuration. -pub fn serve( +pub async fn serve( ctx: Arc>, shutdown: impl Future + Send + Sync + 'static, ) -> Result<(SocketAddr, impl Future), Error> { let config = &ctx.config; - // Configure CORS. - let cors_builder = { - let builder = warp::cors() - .allow_method("GET") - .allow_headers(vec!["Content-Type"]); - - warp_utils::cors::set_builder_origins( - builder, - config.allow_origin.as_deref(), - (config.listen_addr, config.listen_port), - )? - }; - - // Sanity check. if !config.enabled { crit!("Cannot start disabled metrics HTTP server"); return Err(Error::Other( @@ -117,46 +109,52 @@ pub fn serve( )); } - let inner_ctx = ctx.clone(); - let routes = warp::get() - .and(warp::path("metrics")) - .map(move || inner_ctx.clone()) - .and_then(|ctx: Arc>| async move { - Ok::<_, warp::Rejection>( - gather_prometheus_metrics(&ctx) - .map(|body| { - Response::builder() - .status(200) - .header("Content-Type", "text/plain") - .body(body) - .unwrap() - }) - .unwrap_or_else(|e| { - Response::builder() - .status(500) - .header("Content-Type", "text/plain") - .body(format!("Unable to gather metrics: {:?}", e)) - .unwrap() - }), - ) - }) - // Add a `Server` header. - .map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform())) - .with(cors_builder.build()); + let cors_layer = build_cors_layer( + config.allow_origin.as_deref(), + config.listen_addr, + config.listen_port, + )? + .allow_methods([Method::GET]) + .allow_headers([header::CONTENT_TYPE]); - let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( - SocketAddr::new(config.listen_addr, config.listen_port), - async { - shutdown.await; - }, - )?; + let server_header: header::HeaderValue = lighthouse_version::version_with_platform() + .parse() + .map_err(|e| Error::Other(format!("invalid version header value: {e}")))?; + + let router = Router::new() + .route("/metrics", get(metrics_handler::)) + .with_state(ctx.clone()) + .layer(add_server_header(server_header)) + .layer(cors_layer); + + let address = SocketAddr::new(config.listen_addr, config.listen_port); + let server = Server::builder(router, address).build().await?; + + let (address, server) = server.serve_with_shutdown(shutdown).await?; info!( - listen_address = listening_socket.to_string(), + listen_address = %address, "Metrics HTTP server started" ); - Ok((listening_socket, server)) + let server_future = async move { + if let Err(e) = server.await { + error!(error = ?e, "Metrics HTTP server error"); + } + }; + + Ok((address, server_future)) +} + +async fn metrics_handler(State(ctx): State>>) -> impl IntoResponse { + match gather_prometheus_metrics(&ctx) { + Ok(body) => (StatusCode::OK, [(header::CONTENT_TYPE, "text/plain")], body), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + [(header::CONTENT_TYPE, "text/plain")], + format!("Unable to gather metrics: {:?}", e), + ), + } } pub fn gather_prometheus_metrics( diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 9680189b1a..8884491843 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -153,6 +153,7 @@ impl ProductionValidatorClient { let exit = context.executor.exit(); let (_listen_addr, server) = validator_http_metrics::serve(ctx.clone(), exit) + .await .map_err(|e| format!("Unable to start metrics API server: {:?}", e))?; context @@ -622,6 +623,7 @@ impl ProductionValidatorClient { let exit = self.context.executor.exit(); let (listen_addr, server) = validator_http_api::serve::<_, E>(ctx, exit) + .await .map_err(|e| format!("Unable to start HTTP API server: {:?}", e))?; self.context