diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index 58346348f..0a184a1eb 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -115,6 +115,8 @@ void usage(int argc, const char* argv[]) { lf_print(" -a, --auth Turn on HMAC authentication options.\n"); lf_print(" -t, --tracing Turn on tracing.\n"); lf_print(" -d, --disable_dnet Turn off the use of DNET signals.\n"); + lf_print(" -sst, --sst SST config path for RTI.\n"); + lf_print(" -tls, --tls TLS certificate and private key paths.\n"); lf_print("Command given:"); for (int i = 0; i < argc; i++) { @@ -220,7 +222,7 @@ int process_args(int argc, const char* argv[]) { rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines lf_print_info("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes); } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { -#ifdef COMM_TYPE_TCP +#if defined(COMM_TYPE_TCP) || defined(COMM_TYPE_SST) || defined(COMM_TYPE_TLS) if (argc < i + 2) { lf_print_error("--port needs a short unsigned integer argument ( > 0 and < %d).", UINT16_MAX); usage(argc, argv); @@ -252,6 +254,42 @@ int process_args(int argc, const char* argv[]) { return 0; #endif rti.authentication_enabled = true; + } else if (strcmp(argv[i], "-sst") == 0 || strcmp(argv[i], "--sst") == 0) { +#ifndef COMM_TYPE_SST + lf_print_error("--sst requires the RTI to be built with the --DCOMM_TYPE=SST option."); + usage(argc, argv); + return 0; +#else + i++; + lf_set_sst_config_path(argv[i]); +#endif + } else if (strcmp(argv[i], "-tls") == 0 || strcmp(argv[i], "--tls") == 0) { +#ifndef COMM_TYPE_TLS + lf_print_error("--tls requires the RTI to be built with the -DCOMM_TYPE=TLS option."); + usage(argc, argv); + return 0; +#else + // Need two arguments: cert path and key path + if (argc < i + 3) { + lf_print_error("--tls needs two arguments: ."); + usage(argc, argv); + return 0; + } + const char* cert_path = argv[i + 1]; + const char* key_path = argv[i + 2]; + + // Optional: basic sanity check (avoid empty strings) + if (cert_path[0] == '\0' || key_path[0] == '\0') { + lf_print_error("--tls certificate_path and private_key_path must be non-empty."); + usage(argc, argv); + return 0; + } + + lf_set_tls_configuration(cert_path, key_path); + lf_print_debug("RTI: TLS cert path: %s", cert_path); + lf_print_debug("RTI: TLS key path : %s", key_path); + i += 2; +#endif } else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) { rti.base.tracing_enabled = true; } else if (strcmp(argv[i], "-d") == 0 || strcmp(argv[i], "--dnet_disabled") == 0) { diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 0f06b6c50..e4da2012c 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -636,23 +636,20 @@ void handle_address_query(uint16_t fed_id) { int32_t server_port; uint32_t* ip_address; - char* server_host_name; + uint32_t temp = 0; LF_MUTEX_LOCK(&rti_mutex); // Check if the RTI has initialized the remote federate's network abstraction. if (remote_fed->net == NULL) { // RTI has not set up the remote federate. Respond with -1 to indicate an unknown port number. server_port = -1; - uint32_t temp = 0; ip_address = &temp; - server_host_name = "localhost"; } else { // The network abstraction is initialized, but the RTI might still not know the port number. This can happen if the // RTI has not yet received a MSG_TYPE_ADDRESS_ADVERTISEMENT message from the remote federate. In such cases, the // returned port number might still be -1. - server_port = ((socket_priv_t*)remote_fed->net)->server_port; - ip_address = (uint32_t*)&((socket_priv_t*)remote_fed->net)->server_ip_addr; - server_host_name = ((socket_priv_t*)remote_fed->net)->server_hostname; + server_port = get_server_port(remote_fed->net); + ip_address = (uint32_t*)get_ip_addr(remote_fed->net); } encode_int32(server_port, (unsigned char*)&buffer[1]); @@ -669,8 +666,7 @@ void handle_address_query(uint16_t fed_id) { tracepoint_rti_to_federate(send_ADR_QR_REP, fed_id, NULL); } - LF_PRINT_DEBUG("Replied to address query from federate %d with address %s:%d.", fed_id, server_host_name, - server_port); + LF_PRINT_DEBUG("Replied to address query from federate %d", fed_id); } void handle_address_ad(uint16_t federate_id) { @@ -687,7 +683,8 @@ void handle_address_ad(uint16_t federate_id) { assert(server_port < 65536); LF_MUTEX_LOCK(&rti_mutex); - ((socket_priv_t*)fed->net)->server_port = server_port; + // (((sst_priv_t*)fed->net)->socket_priv)->server_port = server_port; + set_server_port(fed->net, server_port); LF_MUTEX_UNLOCK(&rti_mutex); LF_PRINT_LOG("Received address advertisement with port %d from federate %d.", server_port, federate_id); @@ -1057,7 +1054,6 @@ void send_reject(net_abstraction_t net_abs, unsigned char error_code) { } // Close the network abstraction without reading until EOF. shutdown_net(net_abs, false); - net_abs = NULL; LF_MUTEX_UNLOCK(&rti_mutex); } @@ -1325,7 +1321,7 @@ static int receive_udp_message_and_set_up_clock_sync(net_abstraction_t fed_net, // Initialize the UDP_addr field of the federate struct fed->UDP_addr.sin_family = AF_INET; fed->UDP_addr.sin_port = htons(federate_UDP_port_number); - fed->UDP_addr.sin_addr = ((socket_priv_t*)fed_net)->server_ip_addr; + fed->UDP_addr.sin_addr = *get_ip_addr(fed_net); } } else { // Disable clock sync after initial round. @@ -1427,7 +1423,6 @@ void lf_connect_to_federates(net_abstraction_t rti_net) { lf_print_warning("RTI failed to authenticate the incoming federate."); // Close the network abstraction without reading until EOF. shutdown_net(fed_net, false); - fed_net = NULL; // Ignore the federate that failed authentication. i--; continue; @@ -1496,7 +1491,6 @@ void* respond_to_erroneous_connections(void* nothing) { } // Close the network abstraction without reading until EOF. shutdown_net(fed_net, false); - fed_net = NULL; } return NULL; } @@ -1513,7 +1507,7 @@ int start_rti_server() { // Initialize RTI's network abstraction. rti_remote->rti_net = initialize_net(); // Set the user specified port to the network abstraction. - ((socket_priv_t*)rti_remote->rti_net)->user_specified_port = rti_remote->user_specified_port; + set_my_port(rti_remote->rti_net, rti_remote->user_specified_port); // Create the server if (create_server(rti_remote->rti_net)) { lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno)); diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index 0fcb4bfda..f6a7a9751 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -271,7 +271,7 @@ void handle_address_query(uint16_t fed_id); * byte. The RTI will keep a record of this number in the .server_port * field of the _RTI.federates[federate_id] array of structs. * - * The server_hostname and server_ip_addr fields are assigned + * The server_ip_addr field is assigned * in lf_connect_to_federates() upon accepting the socket * from the remote federate. * diff --git a/core/federated/federate.c b/core/federated/federate.c index c5a94ae4e..28ead4bd8 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -641,11 +641,16 @@ static int handle_tagged_message(net_abstraction_t net, int fed_id) { " Discarding message and closing the network connection.", env->current_tag.time - start_time, env->current_tag.microstep, intended_tag.time - start_time, intended_tag.microstep); - // Free the allocated memory before returning - _lf_done_using(message_token); - // Close network abstraction, reading any incoming data and discarding it. - shutdown_net(_fed.net_for_inbound_p2p_connections[fed_id], false); - _fed.net_for_inbound_p2p_connections[fed_id] = NULL; + // The token was freshly allocated by _lf_new_token with ref_count 0 and was never + // scheduled, so _lf_done_using would incorrectly treat it as already freed. + // Use _lf_free_token directly, which handles ref_count == 0 correctly. + _lf_free_token(message_token); +#ifdef FEDERATED_DECENTRALIZED + _lf_decrement_tag_barrier_locked(env); +#endif + // Close the connection to unblock the listener, but do not free the memory; + // lf_terminate_execution will free it after joining the listener thread. + close_net(net, false); LF_MUTEX_UNLOCK(&env->mutex); return -1; } else { @@ -723,7 +728,6 @@ static int handle_port_absent_message(net_abstraction_t net, int fed_id) { * network abstraction in _fed.net_for_inbound_p2p_connections * to -1 and returns, terminating the thread. * @param _args The remote federate ID (cast to void*). - * @param fed_id_ptr A pointer to a uint16_t containing federate ID being listened to. * This procedure frees the memory pointed to before returning. */ static void* listen_to_federates(void* _args) { @@ -1508,8 +1512,8 @@ static void* listen_to_rti_net(void* args) { // Listen for messages from the federate. while (!_lf_termination_executed) { // Check whether the RTI network abstraction is still valid. - if (_fed.net_to_RTI == NULL) { - lf_print_warning("network abstraction to the RTI unexpectedly closed."); + if (_fed.net_to_RTI == NULL || !is_net_open(_fed.net_to_RTI)) { + lf_print_warning("network connection to the RTI unexpectedly closed."); return NULL; } // Read one byte to get the message type. @@ -1517,14 +1521,12 @@ static void* listen_to_rti_net(void* args) { int read_failed = read_from_net(_fed.net_to_RTI, 1, buffer); if (read_failed < 0) { lf_print_error("Connection to the RTI was closed by the RTI with an error. Considering this a soft error."); - shutdown_net(_fed.net_to_RTI, false); - _fed.net_to_RTI = NULL; + close_net(_fed.net_to_RTI, false); return NULL; } else if (read_failed > 0) { // EOF received. lf_print_info("Connection to the RTI closed with an EOF."); - shutdown_net(_fed.net_to_RTI, false); - _fed.net_to_RTI = NULL; + close_net(_fed.net_to_RTI, false); return NULL; } switch (buffer[0]) { @@ -1647,11 +1649,10 @@ void lf_terminate_execution(environment_t* env) { } LF_PRINT_DEBUG("Closing incoming P2P network abstractions."); - // Close any incoming P2P network abstractions that are still open. + // Close connections to unblock any listener threads that are blocking on reads, + // but do NOT free the memory yet because listener threads hold local pointers. for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - shutdown_net(_fed.net_for_inbound_p2p_connections[i], false); - // Ignore errors. Mark the network abstraction closed. - _fed.net_for_inbound_p2p_connections[i] = NULL; + close_net(_fed.net_for_inbound_p2p_connections[i], false); } // Check for all outgoing physical connections in @@ -1659,10 +1660,6 @@ void lf_terminate_execution(environment_t* env) { // if the network abstraction ID is not NULL, the connection is still open. // Send an EOF by closing the network abstraction here. for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - - // Close outbound connections, in case they have not closed themselves. - // This will result in EOF being sent to the remote federate, except for - // abnormal termination, in which case it will just close the network abstraction. close_outbound_net(i); } @@ -1681,6 +1678,14 @@ void lf_terminate_execution(environment_t* env) { // Wait for the thread listening for messages from the RTI to close. lf_thread_join(_fed.RTI_net_listener, NULL); + // All listener threads have now exited. Safe to free network abstraction memory. + for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { + free_net(_fed.net_for_inbound_p2p_connections[i]); + _fed.net_for_inbound_p2p_connections[i] = NULL; + } + free_net(_fed.net_to_RTI); + _fed.net_to_RTI = NULL; + // For abnormal termination, there is no need to free memory. if (_lf_normal_termination) { LF_PRINT_DEBUG("Freeing memory occupied by the federate."); @@ -1753,13 +1758,24 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { assert(port > 0); uint16_t uport = (uint16_t)port; - char hostname[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &host_ip_addr, hostname, INET_ADDRSTRLEN); - - socket_connection_params_t params; +#ifdef COMM_TYPE_TCP + socket_connection_params_t params = {0}; params.type = TCP; params.port = uport; - params.server_hostname = hostname; + params.server_ip_addr = &host_ip_addr; +#elif defined(COMM_TYPE_SST) + sst_connection_params_t params = {0}; + params.socket_params.type = TCP; + params.socket_params.port = uport; + params.socket_params.server_ip_addr = &host_ip_addr; + params.target = 1; +#elif defined(COMM_TYPE_TLS) + tls_connection_params_t params = {0}; + params.socket_params.type = TCP; + params.socket_params.port = uport; + params.socket_params.server_ip_addr = &host_ip_addr; +#endif + net_abstraction_t net = connect_to_net((net_params_t)¶ms); if (net == NULL) { lf_print_error_and_exit("Failed to connect to federate."); @@ -1837,10 +1853,24 @@ void lf_connect_to_rti(const char* hostname, int port) { hostname = federation_metadata.rti_host ? federation_metadata.rti_host : hostname; port = federation_metadata.rti_port >= 0 ? federation_metadata.rti_port : port; - socket_connection_params_t params; +#ifdef COMM_TYPE_TCP + socket_connection_params_t params = {0}; params.type = TCP; params.port = port; params.server_hostname = hostname; +#elif defined(COMM_TYPE_SST) + sst_connection_params_t params = {0}; + params.socket_params.type = TCP; + params.socket_params.port = port; + params.socket_params.server_hostname = hostname; + params.target = 0; +#elif defined(COMM_TYPE_TLS) + tls_connection_params_t params = {0}; + params.socket_params.type = TCP; + params.socket_params.port = port; + params.socket_params.server_hostname = hostname; +#endif + net_abstraction_t net = connect_to_net((net_params_t)¶ms); if (net == NULL) { lf_print_error_and_exit("Failed to connect to RTI."); @@ -1949,14 +1979,13 @@ void lf_create_server(int specified_port) { assert(specified_port <= UINT16_MAX && specified_port >= 0); net_abstraction_t server_net = initialize_net(); - ((socket_priv_t*)server_net)->port = (uint16_t)specified_port; - + set_my_port(server_net, specified_port); if (create_server(server_net)) { lf_print_error_system_failure("Failed to create server: %s.", strerror(errno)); }; _fed.server_net = server_net; // Get the final server port to send to the RTI on an MSG_TYPE_ADDRESS_ADVERTISEMENT message. - int32_t server_port = ((socket_priv_t*)server_net)->port; + int32_t server_port = get_my_port(server_net); LF_PRINT_LOG("Server for communicating with other federates started using port %d.", server_port); @@ -2007,13 +2036,14 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { _fed.inbound_net_listeners = (lf_thread_t*)calloc(_fed.number_of_inbound_p2p_connections, sizeof(lf_thread_t)); while (received_federates < _fed.number_of_inbound_p2p_connections && !_lf_termination_executed) { if (rti_failed()) { - break; + return NULL; } // Wait for an incoming connection request. net_abstraction_t net = accept_net(_fed.server_net); if (net == NULL) { lf_print_warning("Federate failed to accept the network abstraction."); - return NULL; + lf_sleep(CONNECT_RETRY_INTERVAL); + continue; } LF_PRINT_LOG("Accepted new connection from remote federate."); @@ -2034,7 +2064,6 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { write_to_net(net, 2, response); } shutdown_net(net, false); - net = NULL; continue; } @@ -2055,7 +2084,6 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { write_to_net(net, 2, response); } shutdown_net(net, false); - net = NULL; continue; } diff --git a/core/lf_token.c b/core/lf_token.c index 41a4c8c0f..cf93dcef3 100644 --- a/core/lf_token.c +++ b/core/lf_token.c @@ -202,10 +202,10 @@ token_freed _lf_free_token(lf_token_t* token) { return result; } -lf_token_t* _lf_new_token(token_type_t* type, void* value, size_t length) { +/** Implementation of _lf_new_token that is called only within a critical section. */ +static lf_token_t* _lf_new_token_locked(token_type_t* type, void* value, size_t length) { lf_token_t* result = NULL; // Check the recycling bin. - LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT); if (_lf_token_recycling_bin != NULL) { hashset_itr_t iterator = hashset_iterator(_lf_token_recycling_bin); if (hashset_iterator_next(iterator) >= 0) { @@ -223,11 +223,10 @@ lf_token_t* _lf_new_token(token_type_t* type, void* value, size_t length) { _lf_count_token_allocations++; #endif - LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT); - if (result == NULL) { // Nothing found on the recycle bin. result = (lf_token_t*)calloc(1, sizeof(lf_token_t)); + LF_ASSERT_NON_NULL(result); LF_PRINT_DEBUG("_lf_new_token: Allocated memory for token: %p", (void*)result); } result->type = type; @@ -237,20 +236,36 @@ lf_token_t* _lf_new_token(token_type_t* type, void* value, size_t length) { return result; } +lf_token_t* _lf_new_token(token_type_t* type, void* value, size_t length) { + LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT); + lf_token_t* result = _lf_new_token_locked(type, value, length); + LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT); + return result; +} + lf_token_t* _lf_get_token(token_template_t* tmplt) { LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT); if (tmplt->token != NULL && tmplt->token->ref_count == 1) { LF_PRINT_DEBUG("_lf_get_token: Reusing template token: %p with ref_count %zu", (void*)tmplt->token, tmplt->token->ref_count); - // Free any previous value in the token. _lf_free_token_value(tmplt->token); LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT); return tmplt->token; } - LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT); - // If we get here, we need a new token. - lf_token_t* result = _lf_new_token((token_type_t*)tmplt, NULL, 0); + // The existing template token is shared (ref_count > 1) or NULL. + // Drop the template's reference to the old token and install a new one + // so that the invariant "template holds exactly one reference" is preserved. + // Without this, the old token is left with an unreleasable extra reference + // and its payload leaks on every subsequent cycle. + lf_token_t* old = tmplt->token; + + lf_token_t* result = _lf_new_token_locked((token_type_t*)tmplt, NULL, 0); result->ref_count = 1; + tmplt->token = result; + LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT); + if (old != NULL) { + _lf_done_using(old); + } return result; } diff --git a/core/reactor_common.c b/core/reactor_common.c index d1801f69c..89f66718d 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -999,6 +999,14 @@ void usage(int argc, const char* argv[]) { printf(" The address of the RTI, which can be in the form of user@host:port or ip:port.\n\n"); printf(" -l\n"); printf(" Send stdout to individual log files for each federate.\n\n"); +#ifdef COMM_TYPE_SST + printf(" -sst, --sst \n"); + printf(" Path to the SST configuration file.\n\n"); +#endif +#ifdef COMM_TYPE_TLS + printf(" -tls, --tls \n"); + printf(" Paths to the TLS certificate and private key to use.\n\n"); +#endif #endif #endif printf("Command given:\n"); @@ -1211,6 +1219,40 @@ int process_args(int argc, const char* argv[]) { return 0; } } +#endif +#ifdef COMM_TYPE_SST + else if (strcmp(arg, "-sst") == 0 || strcmp(arg, "--sst") == 0) { + if (argc < i + 1) { + lf_print_error("--sst needs a string argument."); + usage(argc, argv); + return 0; + } + const char* fid = argv[i++]; + lf_set_sst_config_path(fid); + } +#endif +#ifdef COMM_TYPE_TLS + else if (strcmp(arg, "-tls") == 0 || strcmp(arg, "--tls") == 0) { + // Need two arguments: cert path and key path + if (argc < i + 2) { + lf_print_error("--tls needs two arguments: ."); + usage(argc, argv); + return 0; + } + + const char* cert_path = argv[i++]; + const char* key_path = argv[i++]; + + if (cert_path[0] == '\0' || key_path[0] == '\0') { + lf_print_error("--tls certificate_path and private_key_path must be non-empty."); + usage(argc, argv); + return 0; + } + + lf_set_tls_configuration(cert_path, key_path); + lf_print("TLS cert path: %s", cert_path); + lf_print("TLS key path : %s", key_path); + } #endif else if (strcmp(arg, "--ros-args") == 0) { // FIXME: Ignore ROS arguments for now diff --git a/lib/schedule.c b/lib/schedule.c index 9c37213de..166dc09d4 100644 --- a/lib/schedule.c +++ b/lib/schedule.c @@ -81,16 +81,18 @@ trigger_handle_t lf_schedule_value(void* action, interval_t extra_delay, void* v */ bool lf_check_deadline(void* self, bool invoke_deadline_handler) { reaction_t* reaction = ((self_base_t*)self)->executing_reaction; - if (lf_time_physical() > (lf_time_logical(((self_base_t*)self)->environment) + reaction->deadline)) { - if (invoke_deadline_handler && reaction->deadline_violation_handler != NULL) { + if (reaction->deadline != NEVER && + lf_time_physical() > (lf_time_logical(((self_base_t*)self)->environment) + reaction->deadline)) { + if (invoke_deadline_handler) { reaction->deadline_violation_handler(self); - return true; } + return true; } return false; } void lf_update_deadline(void* self, interval_t updated_deadline) { + LF_PRINT_DEBUG("lf_update_deadline: update deadline to " PRINTF_TIME ".", updated_deadline); reaction_t* reaction = ((self_base_t*)self)->executing_reaction; if (reaction != NULL) { reaction->deadline = updated_deadline; diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index b756b406f..1f7391f92 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -networkdriver +master diff --git a/logging/api/logging.h b/logging/api/logging.h index d926f92eb..4afd501e8 100644 --- a/logging/api/logging.h +++ b/logging/api/logging.h @@ -10,6 +10,9 @@ * It provides functions for different log levels (error, warning, info, log, debug) * and allows for custom message handling through function registration. */ +#ifndef LOGGING_H +#define LOGGING_H + #include // To silence warnings about a function being a candidate for format checking @@ -198,3 +201,5 @@ typedef void(print_message_function_t)(const char*, va_list); * @param log_level The level of messages to redirect. */ void lf_register_print_function(print_message_function_t* function, int log_level); + +#endif // LOGGING_H diff --git a/logging/api/logging_macros.h b/logging/api/logging_macros.h index 2a8a04c1b..b471f3a1e 100644 --- a/logging/api/logging_macros.h +++ b/logging/api/logging_macros.h @@ -169,4 +169,4 @@ static const bool _lf_log_level_is_debug = LOG_LEVEL >= LOG_LEVEL_DEBUG; } \ } while (0) -#endif // LOGGING_MACROS_H \ No newline at end of file +#endif // LOGGING_MACROS_H diff --git a/network/api/lf_sst_support.h b/network/api/lf_sst_support.h new file mode 100644 index 000000000..3dffa3b93 --- /dev/null +++ b/network/api/lf_sst_support.h @@ -0,0 +1,25 @@ +#ifndef LF_SST_SUPPORT_H +#define LF_SST_SUPPORT_H + +#include "socket_common.h" +#include + +typedef struct sst_priv_t { + socket_priv_t* socket_priv; + SST_ctx_t* sst_ctx; + SST_session_ctx_t* session_ctx; + unsigned char buffer[MAX_SECURE_COMM_MSG_LENGTH]; + size_t buf_filled; + size_t buf_off; +} sst_priv_t; + +typedef struct sst_connection_params_t { + socket_connection_params_t socket_params; + + // 0 for RTI, 1 for federates. + int target; +} sst_connection_params_t; + +void lf_set_sst_config_path(const char* config_path); + +#endif /* LF_SST_SUPPORT_H */ diff --git a/network/api/lf_tls_support.h b/network/api/lf_tls_support.h new file mode 100644 index 000000000..c107fdedb --- /dev/null +++ b/network/api/lf_tls_support.h @@ -0,0 +1,35 @@ +#ifndef LF_TLS_SUPPORT_H +#define LF_TLS_SUPPORT_H + +#include "socket_common.h" +#include +#include + +/** + * @brief Structure holding information about TLS-based network abstraction. + * @ingroup Network + */ +typedef struct tls_priv_t { + socket_priv_t* socket_priv; + SSL_CTX* ctx; + SSL* ssl; +} tls_priv_t; + +/** + * @brief Structure for TLS connection parameters. + * @ingroup Network + */ +typedef struct tls_connection_params_t { + /** @brief Common socket parameters. */ + socket_connection_params_t socket_params; +} tls_connection_params_t; + +/** + * @brief Set the path to the certificate and private key for TLS configuration. + * + * @param cert_path Path to the certificate file. + * @param key_path Path to the private key file. + */ +void lf_set_tls_configuration(const char* cert_path, const char* key_path); + +#endif /* LF_TLS_SUPPORT_H */ diff --git a/network/api/net_abstraction.h b/network/api/net_abstraction.h index 884d6bb16..0acf78071 100644 --- a/network/api/net_abstraction.h +++ b/network/api/net_abstraction.h @@ -16,6 +16,12 @@ #define NET_ABSTRACTION_H #include "socket_common.h" +#ifdef COMM_TYPE_SST +#include "lf_sst_support.h" +#endif +#ifdef COMM_TYPE_TLS +#include "lf_tls_support.h" +#endif /** * @brief Pointer to whatever data structure is used to maintain the state of a network connection or service. @@ -204,16 +210,109 @@ void write_to_net_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, uns bool is_net_open(net_abstraction_t net_abs); /** - * @brief Gracefully shut down and close a network abstraction. + * @brief Close the underlying connection of a network abstraction without freeing its memory. * @ingroup Network * + * This function closes the connection (making it unusable for I/O) but leaves the + * net_abstraction_t memory allocated. This is needed when another thread may still hold + * a pointer to the same network abstraction; the socket is closed to unblock any pending + * reads, but the memory remains valid. Call free_net() to release the memory after ensuring + * no other thread holds a reference. This function is idempotent. + * + * @param net_abs The network abstraction whose connection should be closed, or NULL (no-op). + * @param read_before_closing If true, read until EOF before closing. + * @return int Returns 0 on success, -1 on failure. + */ +int close_net(net_abstraction_t net_abs, bool read_before_closing); + +/** + * @brief Free the memory associated with a network abstraction. + * @ingroup Network + * + * The connection should already have been closed via close_net() or shutdown_net(). + * Safe to call with NULL (no-op). + * + * @param net_abs The network abstraction to free, or NULL. + */ +void free_net(net_abstraction_t net_abs); + +/** + * @brief Gracefully shut down, close, and free a network abstraction. + * @ingroup Network + * + * Equivalent to calling close_net() followed by free_net(). Do not use this function + * if another thread may still be using the same net_abstraction_t pointer; use close_net() + * to unblock the other thread first, join it, and then call free_net(). + * * If read_before_closing is false, call shutdown() with SHUT_RDWR and then close(). If true, call shutdown() with * SHUT_WR, then read() until EOF and discard received bytes before closing. * - * @param net_abs The network abstraction to shut down and close. + * @param net_abs The network abstraction to shut down and close, or NULL (no-op). * @param read_before_closing If true, read until EOF before closing the network abstraction. * @return int Returns 0 on success, -1 on failure (errno will indicate the error). */ int shutdown_net(net_abstraction_t net_abs, bool read_before_closing); +/** + * @brief Get the server port number of this network abstraction. + * @ingroup Network + * + * Get the open port number from the network abstraction. + * This is used when the federate sends a MSG_TYPE_ADDRESS_ADVERTISEMENT to the RTI, informing its port number. The RTI + * will save this port number, and send it to the other federate in a MSG_TYPE_ADDRESS_QUERY_REPLY message. + * + * @param net_abs The network abstraction. + * @return The port number of a server network abstraction. + */ +int32_t get_my_port(net_abstraction_t net_abs); + +/** + * @brief Get the connected peer's port number. + * @ingroup Network + * + * Get the port number of the connected peer. + * This is used by the RTI, when there is a request from the federate to the RTI, for the MSG_TYPE_ADDRESS_QUERY + * message. + * + * @param net_abs The network abstraction. + * @return Port number of the connected peer. + */ +int32_t get_server_port(net_abstraction_t net_abs); + +/** + * @brief Get the connected peer's IP address. + * @ingroup Network + * + * Get the IP address of the connected peer. + * + * @param net_abs The network abstraction. + * @return Pointer to the server IP address. + */ +struct in_addr* get_ip_addr(net_abstraction_t net_abs); + +/** + * @brief Set the user-specified port for this network abstraction. + * @ingroup Network + * + * Set the user specified port to the created network abstraction. + * + * @param net_abs The network abstraction. + * @param port The user specified port. + */ +void set_my_port(net_abstraction_t net_abs, int32_t port); + +/** + * @brief Set the target server's port number for this network abstraction. + * @ingroup Network + * + * Set server port number to the target network abstraction. + * The federate and RTI receives the port number from another + * federate MSG_TYPE_ADDRESS_ADVERTISEMENT message. + * This function is used to set the network abstraction's target server port number. + * + * @param net_abs The network abstraction. + * @param port The target server's port. + */ +void set_server_port(net_abstraction_t net_abs, int32_t port); + #endif /* NET_ABSTRACTION_H */ diff --git a/network/api/socket_common.h b/network/api/socket_common.h index edab21dd8..f48d1dbf7 100644 --- a/network/api/socket_common.h +++ b/network/api/socket_common.h @@ -133,6 +133,9 @@ typedef struct socket_connection_params_t { /** @brief Hostname of the remote server. */ const char* server_hostname; + + /** @brief IP address of the remote server. If provided, bypasses DNS resolution. */ + const struct in_addr* server_ip_addr; } socket_connection_params_t; /** @@ -149,8 +152,6 @@ typedef struct socket_priv_t { uint16_t port; /** @brief The desired port specified by the user on the command line. */ uint16_t user_specified_port; - /** @brief Human-readable IP address of the federate's socket server. */ - char server_hostname[INET_ADDRSTRLEN]; /** @brief Port number of the socket server of the federate. The port number will be -1 if there is no server or if * the RTI has not been informed of the port number. */ int32_t server_port; @@ -215,10 +216,11 @@ int accept_socket(int socket); * * @param sock The socket file descriptor that has already been created (using `socket()`). * @param hostname The hostname or IP address of the server to connect to. + * @param ip_addr The IPv4 address to connect to. If non-NULL, bypasses DNS lookup of hostname. * @param port The port number to connect to. If 0 is specified, a default port range will be used. * @return 0 on success, -1 on failure, and `errno` is set to indicate the specific error. */ -int connect_to_socket(int sock, const char* hostname, int port); +int connect_to_socket(int sock, const char* hostname, const struct in_addr* ip_addr, int port); /** * @brief Read the specified number of bytes from the specified socket into the specified buffer. @@ -296,7 +298,7 @@ bool is_socket_open(int socket); * @ingroup Network * * Get the connected peer name from the connected socket. - * Set it to the server_ip_addr. Also, set server_hostname if LOG_LEVEL is higher than LOG_LEVEL_DEBUG. + * Set it to the server_ip_addr. Also, print server's hostname if LOG_LEVEL is higher than LOG_LEVEL_DEBUG. * * @param priv The socket_priv struct. * @return 0 for success, -1 for failure. diff --git a/network/impl/CMakeLists.txt b/network/impl/CMakeLists.txt index 225edf3d5..c6afcf943 100644 --- a/network/impl/CMakeLists.txt +++ b/network/impl/CMakeLists.txt @@ -10,6 +10,20 @@ target_sources(lf-network-impl PUBLIC if(COMM_TYPE MATCHES TCP) target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_socket_support.c) +elseif(COMM_TYPE MATCHES SST) + find_package(OpenSSL REQUIRED) + find_package(sst-lib REQUIRED) + target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_sst_support.c) + target_link_libraries(lf-network-impl PUBLIC sst-lib::sst-c-api) +elseif(COMM_TYPE MATCHES TLS) + find_package(OpenSSL REQUIRED) + target_sources(lf-network-impl PUBLIC + ${CMAKE_CURRENT_LIST_DIR}/src/lf_tls_support.c + ) + target_link_libraries(lf-network-impl PUBLIC + OpenSSL::SSL + OpenSSL::Crypto + ) else() message(FATAL_ERROR "Your communication type is not supported! The C target supports TCP.") endif() diff --git a/network/impl/src/lf_socket_support.c b/network/impl/src/lf_socket_support.c index 76aee2308..ebd1d98cb 100644 --- a/network/impl/src/lf_socket_support.c +++ b/network/impl/src/lf_socket_support.c @@ -30,7 +30,6 @@ net_abstraction_t initialize_net() { priv->socket_descriptor = -1; // Federate initialization - strncpy(priv->server_hostname, "localhost", INET_ADDRSTRLEN); priv->server_ip_addr.s_addr = 0; priv->server_port = -1; @@ -83,11 +82,11 @@ net_abstraction_t connect_to_net(net_params_t params) { socket_priv_t* priv = (socket_priv_t*)net; socket_connection_params_t* sock_params = (socket_connection_params_t*)params; priv->server_port = sock_params->port; - memcpy(priv->server_hostname, sock_params->server_hostname, INET_ADDRSTRLEN); // Create the client network abstraction. create_client(net); // Connect to the target server. - if (connect_to_socket(priv->socket_descriptor, priv->server_hostname, priv->server_port) != 0) { + if (connect_to_socket(priv->socket_descriptor, sock_params->server_hostname, sock_params->server_ip_addr, + priv->server_port) != 0) { lf_print_error("Failed to connect to socket."); return NULL; } @@ -176,13 +175,46 @@ bool is_net_open(net_abstraction_t net_abs) { return is_socket_open(priv->socket_descriptor); } -int shutdown_net(net_abstraction_t net_abs, bool read_before_closing) { +int close_net(net_abstraction_t net_abs, bool read_before_closing) { if (net_abs == NULL) { - LF_PRINT_LOG("Socket already closed."); return 0; } socket_priv_t* priv = (socket_priv_t*)net_abs; - int ret = shutdown_socket(&priv->socket_descriptor, read_before_closing); + return shutdown_socket(&priv->socket_descriptor, read_before_closing); +} + +int shutdown_net(net_abstraction_t net_abs, bool read_before_closing) { + int ret = close_net(net_abs, read_before_closing); free_net(net_abs); return ret; } + +int32_t get_my_port(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + socket_priv_t* priv = (socket_priv_t*)net_abs; + return priv->port; +} + +int32_t get_server_port(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + socket_priv_t* priv = (socket_priv_t*)net_abs; + return priv->server_port; +} + +struct in_addr* get_ip_addr(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + socket_priv_t* priv = (socket_priv_t*)net_abs; + return &priv->server_ip_addr; +} + +void set_my_port(net_abstraction_t net_abs, int32_t port) { + LF_ASSERT_NON_NULL(net_abs); + socket_priv_t* priv = (socket_priv_t*)net_abs; + priv->user_specified_port = port; +} + +void set_server_port(net_abstraction_t net_abs, int32_t port) { + LF_ASSERT_NON_NULL(net_abs); + socket_priv_t* priv = (socket_priv_t*)net_abs; + priv->server_port = port; +} diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c new file mode 100644 index 000000000..3d931f31a --- /dev/null +++ b/network/impl/src/lf_sst_support.c @@ -0,0 +1,309 @@ +#include // malloc() +#include // strncpy() +#include + +#include "net_abstraction.h" +#include "lf_sst_support.h" +#include "util.h" + +const char* sst_config_path; // The SST's configuration file path. + +SST_ctx_t* ctx; + +net_abstraction_t initialize_net() { + // Initialize sst_priv. + sst_priv_t* sst_priv = malloc(sizeof(sst_priv_t)); + if (sst_priv == NULL) { + lf_print_error_and_exit("Falied to malloc sst_priv_t."); + } + // Initialize socket_priv. + socket_priv_t* socket_priv = malloc(sizeof(socket_priv_t)); + if (socket_priv == NULL) { + lf_print_error_and_exit("Falied to malloc socket_priv_t."); + } + + // Server initialization. + socket_priv->port = 0; + socket_priv->user_specified_port = 0; + socket_priv->socket_descriptor = -1; + + // Federate initialization + strncpy(socket_priv->server_hostname, "localhost", INET_ADDRSTRLEN); + socket_priv->server_ip_addr.s_addr = 0; + socket_priv->server_port = -1; + + sst_priv->socket_priv = socket_priv; + sst_priv->buf_filled = 0; + sst_priv->buf_off = 0; + + // SST initialization. Only set pointers to NULL. + sst_priv->sst_ctx = NULL; + sst_priv->session_ctx = NULL; + return (net_abstraction_t)sst_priv; +} + +void free_net(net_abstraction_t net_abs) { + if (net_abs == NULL) { + LF_PRINT_LOG("Socket already closed."); + return; + } + sst_priv_t* priv = (sst_priv_t*)net_abs; + free(priv->socket_priv); + free(priv); +} + +int create_server(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + sst_priv_t* priv = (sst_priv_t*)net_abs; + if (ctx == NULL) { + ctx = init_SST(sst_config_path); + if (ctx == NULL) { + lf_print_error_and_exit("Failed to initialze SST settings."); + } + } + priv->sst_ctx = ctx; + return create_socket_server(priv->socket_priv->user_specified_port, &priv->socket_priv->socket_descriptor, + &priv->socket_priv->port, TCP); +} + +net_abstraction_t accept_net(net_abstraction_t server_chan) { + LF_ASSERT_NON_NULL(server_chan); + sst_priv_t* serv_priv = (sst_priv_t*)server_chan; + + int sock = accept_socket(serv_priv->socket_priv->socket_descriptor); + if (sock != -1) { + net_abstraction_t client_net = initialize_net(); + sst_priv_t* client_priv = (sst_priv_t*)client_net; + client_priv->socket_priv->socket_descriptor = sock; + // Get the peer address from the connected socket_id. Saving this for the address query. + if (get_peer_address(client_priv->socket_priv) != 0) { + lf_print_error("Failed to save peer address."); + } + + // TODO: Do we need to copy sst_ctx form server_chan to fed_chan? + session_key_list_t* s_key_list = init_empty_session_key_list(); + SST_session_ctx_t* session_ctx = + server_secure_comm_setup(serv_priv->sst_ctx, client_priv->socket_priv->socket_descriptor, s_key_list); + // Session key used is copied to the session_ctx. + free_session_key_list_t(s_key_list); + client_priv->session_ctx = session_ctx; + + return client_net; + } else { + return NULL; + } +} + +void create_client(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + sst_priv_t* priv = (sst_priv_t*)net_abs; + priv->socket_priv->socket_descriptor = create_real_time_tcp_socket_errexit(); + if (ctx == NULL) { + ctx = init_SST(sst_config_path); + if (ctx == NULL) { + lf_print_error_and_exit("Failed to initialze SST settings."); + } + } + priv->sst_ctx = ctx; +} + +net_abstraction_t connect_to_net(net_params_t* params) { + // Create a network abstraction. + net_abstraction_t net = initialize_net(); + sst_priv_t* priv = (sst_priv_t*)net; + sst_connection_params_t* sst_params = (sst_connection_params_t*)params; + priv->socket_priv->server_port = sst_params->socket_params.port; + memcpy(priv->socket_priv->server_hostname, sst_params->socket_params.server_hostname, INET_ADDRSTRLEN); + // Create the client network abstraction. + create_client(net); + // Connect to the target server. + if (connect_to_socket(priv->socket_priv->socket_descriptor, priv->socket_priv->server_hostname, + priv->socket_priv->server_port) != 0) { + lf_print_error("Failed to connect to socket."); + return NULL; + } + if (sst_params->target == 1) { + // Override target group to federates. + snprintf(priv->sst_ctx->config.purpose[ctx->config.purpose_index], + sizeof(ctx->config.purpose[ctx->config.purpose_index]), "{\"group\":\"Federates\"}"); + } + session_key_list_t* s_key_list = get_session_key(priv->sst_ctx, NULL); + SST_session_ctx_t* session_ctx = + secure_connect_to_server_with_socket(&s_key_list->s_key[0], priv->socket_priv->socket_descriptor); + priv->session_ctx = session_ctx; + return net; +} + +int read_from_net(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + LF_ASSERT_NON_NULL(net_abs); + sst_priv_t* priv = (sst_priv_t*)net_abs; + + if (num_bytes > MAX_SECURE_COMM_MSG_LENGTH) { + lf_print_error("Unable to handle message. Expected: %zu, Maximum: %d", num_bytes, MAX_SECURE_COMM_MSG_LENGTH); + return -1; + } + size_t copied = 0; + // 1) First use buffered data. + if (priv->buf_off < priv->buf_filled) { + size_t avail = priv->buf_filled - priv->buf_off; + size_t to_copy = (avail < num_bytes) ? avail : num_bytes; + memcpy(buffer, priv->buffer + priv->buf_off, to_copy); + priv->buf_off += to_copy; + copied += to_copy; + + // Reset buffer offset when the buffer is all used. + if (priv->buf_off == priv->buf_filled) { + priv->buf_off = priv->buf_filled = 0; + } + + // Return when the buffered data is enough. + if (copied == num_bytes) { + return 0; + } + } + + // 2) Additionally try to read more bytes. + while (copied < num_bytes) { + int ret = read_secure_message(priv->buffer, priv->session_ctx); + if (ret == 0) { + // EOF received. + return 1; + } else if (ret < 0) { + lf_print_error("read_secure_message failed: %d", ret); + return -1; + } + + // Mark the filled length and reset offset. + priv->buf_filled = (size_t)ret; + priv->buf_off = 0; + + size_t need = num_bytes - copied; + size_t to_copy = (priv->buf_filled < need) ? priv->buf_filled : need; + memcpy(buffer + copied, priv->buffer + priv->buf_off, to_copy); + priv->buf_off += to_copy; + copied += to_copy; + + // Reset buffer offset when meets the end of the filled buffer. + if (priv->buf_off == priv->buf_filled) { + priv->buf_off = priv->buf_filled = 0; + } + } + + return 0; +} + +int read_from_net_close_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + LF_ASSERT_NON_NULL(net_abs); + sst_priv_t* priv = (sst_priv_t*)net_abs; + int read_failed = read_from_net(net_abs, num_bytes, buffer); + if (read_failed) { + // Read failed. + // Socket has probably been closed from the other side. + // Shut down and close the socket from this side. + shutdown_socket(&priv->socket_priv->socket_descriptor, false); + return -1; + } + return 0; +} + +void read_from_net_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer, char* format, + ...) { + LF_ASSERT_NON_NULL(net_abs); + va_list args; + int read_failed = read_from_net_close_on_error(net_abs, num_bytes, buffer); + if (read_failed) { + // Read failed. + if (format != NULL) { + va_start(args, format); + lf_print_error_system_failure(format, args); + va_end(args); + } else { + lf_print_error_system_failure("Failed to read from socket."); + } + } +} + +int write_to_net(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + LF_ASSERT_NON_NULL(net_abs); + sst_priv_t* priv = (sst_priv_t*)net_abs; + return send_secure_message((char*)buffer, (unsigned int)num_bytes, priv->session_ctx); +} + +int write_to_net_close_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + LF_ASSERT_NON_NULL(net_abs); + sst_priv_t* priv = (sst_priv_t*)net_abs; + int result = write_to_net(net_abs, num_bytes, buffer); + if (result) { + // Write failed. + // Socket has probably been closed from the other side. + // Shut down and close the socket from this side. + shutdown_socket(&priv->socket_priv->socket_descriptor, false); + } + return result; +} + +void write_to_net_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, + char* format, ...) { + LF_ASSERT_NON_NULL(net_abs); + va_list args; + int result = write_to_net_close_on_error(net_abs, num_bytes, buffer); + if (result) { + // Write failed. + if (mutex != NULL) { + LF_MUTEX_UNLOCK(mutex); + } + if (format != NULL) { + va_start(args, format); + lf_print_error_system_failure(format, args); + va_end(args); + } else { + lf_print_error("Failed to write to socket. Closing it."); + } + } +} + +bool is_net_open(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + sst_priv_t* priv = (sst_priv_t*)net_abs; + return is_socket_open(priv->socket_priv->socket_descriptor); +} + +int shutdown_net(net_abstraction_t net_abs, bool read_before_closing) { + if (net_abs == NULL) { + LF_PRINT_LOG("Socket already closed."); + return 0; + } + sst_priv_t* priv = (sst_priv_t*)net_abs; + int ret = shutdown_socket(&priv->socket_priv->socket_descriptor, read_before_closing); + free_net(net_abs); + return ret; +} + +// Helper function. +void lf_set_sst_config_path(const char* config_path) { sst_config_path = config_path; } + +// Get/set functions. +int32_t get_my_port(net_abstraction_t net_abs) { + sst_priv_t* priv = (sst_priv_t*)net_abs; + return priv->socket_priv->port; +} + +int32_t get_server_port(net_abstraction_t net_abs) { + sst_priv_t* priv = (sst_priv_t*)net_abs; + return priv->socket_priv->server_port; +} + +struct in_addr* get_ip_addr(net_abstraction_t net_abs) { + sst_priv_t* priv = (sst_priv_t*)net_abs; + return &priv->socket_priv->server_ip_addr; +} + +void set_my_port(net_abstraction_t net_abs, int32_t port) { + sst_priv_t* priv = (sst_priv_t*)net_abs; + priv->socket_priv->user_specified_port = port; +} + +void set_server_port(net_abstraction_t net_abs, int32_t port) { + sst_priv_t* priv = (sst_priv_t*)net_abs; + priv->socket_priv->server_port = port; +} diff --git a/network/impl/src/lf_tls_support.c b/network/impl/src/lf_tls_support.c new file mode 100644 index 000000000..57901b51b --- /dev/null +++ b/network/impl/src/lf_tls_support.c @@ -0,0 +1,384 @@ +#include +#include +#include +#include +#include +#include + +#include "net_abstraction.h" +#include "lf_tls_support.h" +#include "util.h" +#include "logging.h" + +#if OPENSSL_VERSION_NUMBER < 0x30000000L +#error "OpenSSL 3.0 or higher is required." +#endif + +// Global configuration for TLS (cert and key paths) +static const char* tls_cert_path = NULL; +static const char* tls_key_path = NULL; + +static SSL_CTX* global_client_ctx = NULL; +static SSL_CTX* global_server_ctx = NULL; + +void lf_set_tls_configuration(const char* cert_path, const char* key_path) { + tls_cert_path = cert_path; + tls_key_path = key_path; +} + +// Helper to initialize OpenSSL context +static SSL_CTX* create_ssl_context(const SSL_METHOD* method) { + SSL_CTX* ctx = SSL_CTX_new(method); + if (!ctx) { + lf_print_error("Unable to create SSL context"); + ERR_print_errors_fp(stderr); + exit(EXIT_FAILURE); + } + return ctx; +} + +static void configure_ssl_context(SSL_CTX* ctx) { + if (tls_cert_path && tls_key_path) { + if (SSL_CTX_use_certificate_file(ctx, tls_cert_path, SSL_FILETYPE_PEM) <= 0) { + lf_print_error("Failed to load certificate file: %s", tls_cert_path); + ERR_print_errors_fp(stderr); + exit(EXIT_FAILURE); + } + if (SSL_CTX_use_PrivateKey_file(ctx, tls_key_path, SSL_FILETYPE_PEM) <= 0) { + lf_print_error("Failed to load private key file: %s", tls_key_path); + ERR_print_errors_fp(stderr); + exit(EXIT_FAILURE); + } + } +} + +net_abstraction_t initialize_net() { + tls_priv_t* priv = (tls_priv_t*)malloc(sizeof(tls_priv_t)); + if (priv == NULL) { + lf_print_error_and_exit("Failed to allocate memory for tls_priv_t."); + } + + // Initialize socket_priv + priv->socket_priv = (socket_priv_t*)malloc(sizeof(socket_priv_t)); + if (priv->socket_priv == NULL) { + free(priv); + lf_print_error_and_exit("Failed to allocate memory for socket_priv_t."); + } + + // Default initialization for socket_priv + priv->socket_priv->port = 0; + priv->socket_priv->user_specified_port = 0; + priv->socket_priv->socket_descriptor = -1; + strncpy(priv->socket_priv->server_hostname, "localhost", INET_ADDRSTRLEN); + priv->socket_priv->server_ip_addr.s_addr = 0; + priv->socket_priv->server_port = -1; + + priv->ctx = NULL; + priv->ssl = NULL; + + return (net_abstraction_t)priv; +} + +void free_net(net_abstraction_t net_abs) { + if (net_abs == NULL) + return; + tls_priv_t* priv = (tls_priv_t*)net_abs; + + if (priv->ssl) { + SSL_free(priv->ssl); + } + // Note: We generally don't free the global context here if it's shared, + // but if it's per-connection, we should. + // Assuming global context is managed separately or lives until exit. + + if (priv->socket_priv) { + free(priv->socket_priv); + } + free(priv); +} + +int create_server(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + tls_priv_t* priv = (tls_priv_t*)net_abs; + + // Initialize Global Context if not already done + if (global_server_ctx == NULL) { + global_server_ctx = create_ssl_context(TLS_server_method()); + configure_ssl_context(global_server_ctx); + } + priv->ctx = global_server_ctx; + + // Create the underlying socket server + return create_socket_server(priv->socket_priv->user_specified_port, &priv->socket_priv->socket_descriptor, + &priv->socket_priv->port, TCP); +} + +net_abstraction_t accept_net(net_abstraction_t server_chan) { + LF_ASSERT_NON_NULL(server_chan); + tls_priv_t* server_priv = (tls_priv_t*)server_chan; + + // Accept TCP connection + int client_sock = accept_socket(server_priv->socket_priv->socket_descriptor); + if (client_sock < 0) { + return NULL; + } + + // Initialize new network abstraction for the client + net_abstraction_t client_net = initialize_net(); + tls_priv_t* client_priv = (tls_priv_t*)client_net; + client_priv->socket_priv->socket_descriptor = client_sock; + + // Share the context (or create new if needed, but sharing is standard for server) + client_priv->ctx = server_priv->ctx; + + // Create SSL structure + client_priv->ssl = SSL_new(client_priv->ctx); + SSL_set_fd(client_priv->ssl, client_sock); + + SSL_set_accept_state(client_priv->ssl); + // Perform TLS handshake (accept) + if (SSL_accept(client_priv->ssl) <= 0) { + lf_print_error("SSL_accept failed."); + ERR_print_errors_fp(stderr); + + shutdown_net(client_net, false); + return NULL; + } + + // Create generic socket info (peer address) + if (get_peer_address(client_priv->socket_priv) != 0) { + lf_print_error("Failed to save peer address."); + } + + return client_net; +} + +void create_client(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + tls_priv_t* priv = (tls_priv_t*)net_abs; + + // Create the underlying TCP socket + priv->socket_priv->socket_descriptor = create_real_time_tcp_socket_errexit(); + + if (global_client_ctx == NULL) { + global_client_ctx = create_ssl_context(TLS_client_method()); + } + priv->ctx = global_client_ctx; +} + +net_abstraction_t connect_to_net(net_params_t* params) { + tls_connection_params_t* tls_params = (tls_connection_params_t*)params; + + net_abstraction_t net = initialize_net(); + tls_priv_t* priv = (tls_priv_t*)net; + + // Set socket params + priv->socket_priv->server_port = tls_params->socket_params.port; + memcpy(priv->socket_priv->server_hostname, tls_params->socket_params.server_hostname, INET_ADDRSTRLEN); + + create_client(net); + + // TCP Connect + if (connect_to_socket(priv->socket_priv->socket_descriptor, priv->socket_priv->server_hostname, + priv->socket_priv->server_port) != 0) { + lf_print_error("Failed to connect to socket."); + free_net(net); + return NULL; + } + + // SSL Connect + priv->ssl = SSL_new(priv->ctx); + SSL_set_fd(priv->ssl, priv->socket_priv->socket_descriptor); + + if (SSL_connect(priv->ssl) <= 0) { + lf_print_error("SSL_connect failed."); + ERR_print_errors_fp(stderr); + shutdown_net(net, false); + return NULL; + } + + return net; +} + +static int is_disconnect_syscall(int err, int ret) { + if (err != SSL_ERROR_SYSCALL) + return 0; + + if (ret == 0) { + // Often: "unexpected EOF while reading" / peer closed without close_notify + return 1; + } + if (ret == -1) { + // RST/timeout/pipe, treat as disconnect if you want EOF semantics + if (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT || errno == ENOTCONN) { + return 1; + } + } + return 0; +} + +int read_from_net(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + LF_ASSERT_NON_NULL(net_abs); + tls_priv_t* priv = (tls_priv_t*)net_abs; + + size_t bytes_read = 0; + while (bytes_read < num_bytes) { + int ret = SSL_read(priv->ssl, buffer + bytes_read, num_bytes - bytes_read); + if (ret > 0) { + bytes_read += (size_t)ret; + continue; + } + + int err = SSL_get_error(priv->ssl, ret); + + if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { + lf_sleep(DELAY_BETWEEN_SOCKET_RETRIES); + continue; + } + + if (err == SSL_ERROR_ZERO_RETURN) { + // close_notify received + return 1; // EOF + } + + if (is_disconnect_syscall(err, ret)) { + // peer disconnected without close_notify (or reset) + return 1; // treat as EOF + } + + // Real TLS/protocol error + lf_print_error("SSL_read failed (ret=%d, err=%d, errno=%d)", ret, err, errno); + ERR_print_errors_fp(stderr); + return -1; + } + return 0; +} +int read_from_net_close_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + int ret = read_from_net(net_abs, num_bytes, buffer); + if (ret < 0) { + shutdown_net(net_abs, false); + return -1; + } + return ret; // 0 for success, 1 for EOF +} + +void read_from_net_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer, char* format, + ...) { + va_list args; + int read_failed = read_from_net_close_on_error(net_abs, num_bytes, buffer); + if (read_failed) { + // Read failed. + if (format != NULL) { + va_start(args, format); + lf_print_error_system_failure(format, args); + va_end(args); + } else { + lf_print_error_system_failure("Failed to read from socket."); + } + } +} + +int write_to_net(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + LF_ASSERT_NON_NULL(net_abs); + tls_priv_t* priv = (tls_priv_t*)net_abs; + + size_t bytes_written = 0; + while (bytes_written < num_bytes) { + int ret = SSL_write(priv->ssl, buffer + bytes_written, num_bytes - bytes_written); + if (ret > 0) { + bytes_written += ret; + } else { + int err = SSL_get_error(priv->ssl, ret); + if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { + continue; + } + lf_print_error("SSL_write failed with error %d", err); + ERR_print_errors_fp(stderr); + return -1; + } + } + return 0; +} + +int write_to_net_close_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer) { + int ret = write_to_net(net_abs, num_bytes, buffer); + if (ret < 0) { + shutdown_net(net_abs, false); + return -1; + } + return 0; +} + +void write_to_net_fail_on_error(net_abstraction_t net_abs, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, + char* format, ...) { + va_list args; + int ret = write_to_net_close_on_error(net_abs, num_bytes, buffer); + if (ret < 0) { + if (mutex) + LF_MUTEX_UNLOCK(mutex); + + if (format != NULL) { + va_start(args, format); + lf_print_error_system_failure(format, args); + va_end(args); + } else { + lf_print_error_system_failure("Failed to write to TLS connection."); + } + } +} + +bool is_net_open(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + tls_priv_t* priv = (tls_priv_t*)net_abs; + return is_socket_open(priv->socket_priv->socket_descriptor); +} + +int shutdown_net(net_abstraction_t net_abs, bool read_before_closing) { + if (net_abs == NULL) + return 0; + tls_priv_t* priv = (tls_priv_t*)net_abs; + + if (priv->ssl) { + SSL_shutdown(priv->ssl); + // We might want to read pending data here if read_before_closing is true, + // but SSL_shutdown usually sends notify_close. + } + + // Shutdown underlying socket + if (priv->socket_priv) { + shutdown_socket(&priv->socket_priv->socket_descriptor, read_before_closing); + } + + free_net(net_abs); + return 0; +} + +int32_t get_my_port(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + tls_priv_t* priv = (tls_priv_t*)net_abs; + return priv->socket_priv->port; +} + +int32_t get_server_port(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + tls_priv_t* priv = (tls_priv_t*)net_abs; + return priv->socket_priv->server_port; +} + +struct in_addr* get_ip_addr(net_abstraction_t net_abs) { + LF_ASSERT_NON_NULL(net_abs); + tls_priv_t* priv = (tls_priv_t*)net_abs; + return &priv->socket_priv->server_ip_addr; +} + +void set_my_port(net_abstraction_t net_abs, int32_t port) { + LF_ASSERT_NON_NULL(net_abs); + tls_priv_t* priv = (tls_priv_t*)net_abs; + priv->socket_priv->user_specified_port = port; +} + +void set_server_port(net_abstraction_t net_abs, int32_t port) { + LF_ASSERT_NON_NULL(net_abs); + tls_priv_t* priv = (tls_priv_t*)net_abs; + priv->socket_priv->server_port = port; +} diff --git a/network/impl/src/socket_common.c b/network/impl/src/socket_common.c index 53c78c68d..b80a17d6d 100644 --- a/network/impl/src/socket_common.c +++ b/network/impl/src/socket_common.c @@ -177,14 +177,10 @@ int get_peer_address(socket_priv_t* priv) { priv->server_ip_addr = peer_addr.sin_addr; #if LOG_LEVEL >= LOG_LEVEL_DEBUG - // Create the human readable format and copy that into - // the .server_hostname field of the federate. + // Create the human readable format for logging purposes char str[INET_ADDRSTRLEN + 1]; inet_ntop(AF_INET, &priv->server_ip_addr, str, INET_ADDRSTRLEN); - strncpy(priv->server_hostname, str, INET_ADDRSTRLEN - 1); // Copy up to INET_ADDRSTRLEN - 1 characters - priv->server_hostname[INET_ADDRSTRLEN - 1] = '\0'; // Null-terminate explicitly - - LF_PRINT_DEBUG("Got address %s", priv->server_hostname); + LF_PRINT_DEBUG("Got address %s", str); #endif return 0; } @@ -218,20 +214,28 @@ int accept_socket(int socket) { return socket_id; } -int connect_to_socket(int sock, const char* hostname, int port) { +int connect_to_socket(int sock, const char* hostname, const struct in_addr* ip_addr, int port) { struct addrinfo hints; - struct addrinfo* result; + struct addrinfo* result = NULL; int ret = -1; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; /* Allow IPv4 */ - hints.ai_socktype = SOCK_STREAM; /* Stream socket */ - hints.ai_protocol = IPPROTO_TCP; /* TCP protocol */ - hints.ai_addr = NULL; - hints.ai_next = NULL; - hints.ai_flags = AI_NUMERICSERV; /* Allow only numeric port numbers */ - uint16_t used_port = (port == 0) ? DEFAULT_PORT : (uint16_t)port; + struct sockaddr_in direct_addr; + + if (ip_addr == NULL) { + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; /* Allow IPv4 */ + hints.ai_socktype = SOCK_STREAM; /* Stream socket */ + hints.ai_protocol = IPPROTO_TCP; /* TCP protocol */ + hints.ai_addr = NULL; + hints.ai_next = NULL; + hints.ai_flags = AI_NUMERICSERV; /* Allow only numeric port numbers */ + } else { + memset(&direct_addr, 0, sizeof(direct_addr)); + direct_addr.sin_family = AF_INET; + direct_addr.sin_port = htons(used_port); + direct_addr.sin_addr = *ip_addr; + } instant_t start_connect = lf_time_physical(); // while (!_lf_termination_executed) { // Not working... @@ -240,30 +244,43 @@ int connect_to_socket(int sock, const char* hostname, int port) { lf_print_error("Failed to connect with timeout: " PRINTF_TIME ". Giving up.", CONNECT_TIMEOUT); break; } - // Convert port number to string. - char str[6]; - snprintf(str, sizeof(str), "%u", used_port); - // Get address structure matching hostname and hints criteria, and - // set port to the port number provided in str. There should only - // ever be one matching address structure, and we connect to that. - if (getaddrinfo(hostname, (const char*)&str, &hints, &result)) { - lf_print_error("No host matching given hostname: %s", hostname); - break; + if (ip_addr != NULL) { + // Safe to type cast specific protocols (e.g., sockaddr_in) to the generic sockaddr. + ret = connect(sock, (struct sockaddr*)&direct_addr, sizeof(direct_addr)); + } else { + // Convert port number to string. + char str[6]; + snprintf(str, sizeof(str), "%u", used_port); + + // Get address structure matching hostname and hints criteria, and + // set port to the port number provided in str. There should only + // ever be one matching address structure, and we connect to that. + if (getaddrinfo(hostname, str, &hints, &result)) { + lf_print_error("No host matching given hostname: %s", hostname); + break; + } + ret = connect(sock, result->ai_addr, result->ai_addrlen); + freeaddrinfo(result); } - ret = connect(sock, result->ai_addr, result->ai_addrlen); + if (ret < 0) { lf_sleep(CONNECT_RETRY_INTERVAL); lf_print_warning("Could not connect. Will try again every " PRINTF_TIME " nanoseconds. Connecting to port %d.\n", CONNECT_RETRY_INTERVAL, used_port); - freeaddrinfo(result); continue; } else { break; } } - freeaddrinfo(result); - lf_print_info("Connected to %s:%d.", hostname, used_port); + + if (ip_addr != NULL) { + char host_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, ip_addr, host_str, INET_ADDRSTRLEN); + lf_print_info("Connected to %s:%d.", host_str, used_port); + } else { + lf_print_info("Connected to %s:%d.", hostname, used_port); + } return ret; } diff --git a/python/include/pythontarget.h b/python/include/pythontarget.h index cbf0057ce..d9599970f 100644 --- a/python/include/pythontarget.h +++ b/python/include/pythontarget.h @@ -102,6 +102,23 @@ PyObject* py_package_directory(PyObject* self, PyObject* args); */ PyObject* py_check_deadline(PyObject* self, PyObject* args); +/** + * @brief Update the deadline of the currently executing reaction. + * + * Updating the deadline with this function does not affect + * the deadline check that has been performed (at the beginning + * of the caller reaction or check_deadline called before). + * Therefore, you may want to invoke check_deadline after + * updating the deadline through this function to confirm + * whether the newly updated deadline has been violated. + * + * @param self The Python object of the reactor. + * @param args contains: + * - updated_deadline: The updated deadline. + * @return Py_None or NULL if an error occurs. + */ +PyObject* py_update_deadline(PyObject* self, PyObject* args); + /** * @brief Register a user trace event. Returns an opaque handle for use with * tracepoint_user_event and tracepoint_user_value. diff --git a/python/lib/pythontarget.c b/python/lib/pythontarget.c index a0dec881f..ad4a44661 100644 --- a/python/lib/pythontarget.c +++ b/python/lib/pythontarget.c @@ -53,25 +53,51 @@ PyObject* py_schedule(PyObject* self, PyObject* args) { } trigger_t* trigger = action->trigger; + environment_t* env = action->parent->environment; lf_token_t* t = NULL; - // Check to see if value exists and token is not NULL - if (value && (trigger->tmplt.token != NULL)) { - // DEBUG: adjust the element_size (might not be necessary) - trigger->tmplt.token->type->element_size = sizeof(PyObject*); + LF_CRITICAL_SECTION_ENTER(env); + + // Check to see if value exists + if (value) { + // Allocate a fresh token for this schedule call rather than routing through + // _lf_initialize_token_with_value / _lf_get_token. Those paths may reuse + // or replace trigger->tmplt.token, which races with the reaction prologue + // that reads trigger->tmplt.token->value after an event pop: + // + // 1. The scheduler pops an event, sets trigger->tmplt.token = T, and + // drops T->ref_count to 1 before releasing the environment lock. + // 2. A concurrent schedule acquires the environment lock and enters + // _lf_get_token, which sees ref_count == 1 and reuses T, freeing + // its payload and overwriting it with the new value. + // 3. The pending reaction finally runs and reads the corrupted value. + // + // Allocating a fresh token that lives only on the event queue until + // _lf_pop_events installs it into the template means schedule paths + // never write to trigger->tmplt.token, so concurrent schedulers cannot + // corrupt the payload of a token about to be consumed by a reaction. trigger->tmplt.type.element_size = sizeof(PyObject*); - t = _lf_initialize_token_with_value(&trigger->tmplt, value, 1); + t = lf_new_token((void*)&trigger->tmplt, value, 1); +#if !defined NDEBUG + // Keep the payload allocation counter balanced with the decrement that + // occurs when the token's value is eventually freed. + LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT); + extern int _lf_count_payload_allocations; + _lf_count_payload_allocations++; + LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT); +#endif // Also give the new value back to the Python action itself Py_INCREF(value); act->value = value; } - // Pass the token along - lf_schedule_token(action, offset, t); + lf_schedule_trigger(env, trigger, offset, t); + lf_notify_of_event(env); - // FIXME: handle is not passed to the Python side + LF_CRITICAL_SECTION_EXIT(env); + // FIXME: handle is not passed to the Python side Py_INCREF(Py_None); return Py_None; } @@ -242,6 +268,35 @@ PyObject* py_check_deadline(PyObject* self, PyObject* args) { return PyBool_FromLong(result); } +PyObject* py_update_deadline(PyObject* self, PyObject* args) { + PyObject* py_self; + int64_t updated_deadline = 0; // Default to 0 + double updated_deadline_in_double = + 0.0; // Deadline may be passed as a floating-point value in nanoseconds, e.g., SEC(0.5) → 0.5 * 1e9. + + if (!PyArg_ParseTuple(args, "O|d", &py_self, &updated_deadline_in_double)) { + return NULL; + } + + // Check overflow before converting a double to int64_t (interval_t). + if (updated_deadline_in_double > (double)INT64_MAX || updated_deadline_in_double < (double)INT64_MIN) { + PyErr_SetString(PyExc_OverflowError, "The updated deadline value is out of int64 range"); + return NULL; + } + + // Convert double to int64_t + updated_deadline = (int64_t)updated_deadline_in_double; + + void* self_ptr = get_lf_self_pointer(py_self); + if (self_ptr == NULL) { + return NULL; + } + lf_update_deadline(self_ptr, updated_deadline); + + Py_INCREF(Py_None); + return Py_None; +} + /** * Register a user trace event. Returns an opaque handle (as a Python int) * that must be passed to tracepoint_user_event and tracepoint_user_value. @@ -386,6 +441,8 @@ static PyMethodDef GEN_NAME(MODULE_NAME, _methods)[] = { {"package_directory", py_package_directory, METH_NOARGS, "Root package directory path"}, {"check_deadline", (PyCFunction)py_check_deadline, METH_VARARGS, "Check whether the deadline of the currently executing reaction has passed"}, + {"update_deadline", (PyCFunction)py_update_deadline, METH_VARARGS, + "Update the deadline of the currently executing reaction"}, {"register_user_trace_event", (PyCFunction)py_register_user_trace_event, METH_VARARGS, "Register a user trace event; returns a handle for use with tracepoint_user_event and tracepoint_user_value"}, {"tracepoint_user_event", (PyCFunction)py_tracepoint_user_event, METH_VARARGS, diff --git a/util/deque.c b/util/deque.c index 4dca52f20..8b6807992 100644 --- a/util/deque.c +++ b/util/deque.c @@ -11,7 +11,7 @@ * To use this, include the following in your target properties: *
  * target C {
- *     cmake-include: "/lib/c/reactor-c/util/deque.cmake"
+ *     cmake-include: "/lib/c/reactor-c/util/deque.cmake",
  *     files: ["/lib/c/reactor-c/util/deque.c", "/lib/c/reactor-c/util/deque.h"]
  * };
  * 
@@ -44,10 +44,6 @@ typedef struct deque_node_t { void* value; } deque_node_t; -/** - * Initialize the specified deque to an empty deque. - * @param d The deque. - */ void deque_initialize(deque_t* d) { if (d != NULL) { d->front = NULL; @@ -56,10 +52,6 @@ void deque_initialize(deque_t* d) { } } -/** - * Return true if the queue is empty. - * @param d The deque. - */ bool deque_is_empty(deque_t* d) { if (d != NULL) { return (d->front == NULL); @@ -67,11 +59,6 @@ bool deque_is_empty(deque_t* d) { return true; } -/** - * Return the size of the queue. - * @param d The deque. - * @return The size of the queue. - */ size_t deque_size(deque_t* d) { return d->size; } /** @@ -87,11 +74,6 @@ deque_node_t* _deque_create_node(void* value) { return new_node; } -/** - * Push a value to the front of the queue. - * @param d The queue. - * @param value The value to push. - */ void deque_push_front(deque_t* d, void* value) { deque_node_t* n = _deque_create_node(value); if (d->front == NULL) { @@ -105,11 +87,6 @@ void deque_push_front(deque_t* d, void* value) { } } -/** - * Push a value to the back of the queue. - * @param d The queue. - * @param value The value to push. - */ void deque_push_back(deque_t* d, void* value) { deque_node_t* n = _deque_create_node(value); if (d->back == NULL) { @@ -123,11 +100,6 @@ void deque_push_back(deque_t* d, void* value) { } } -/** - * Pop a value from the front of the queue, removing it from the queue. - * @param d The queue. - * @return The value on the front of the queue or NULL if the queue is empty. - */ void* deque_pop_front(deque_t* d) { if (d == NULL || d->front == NULL) { return NULL; @@ -147,11 +119,6 @@ void* deque_pop_front(deque_t* d) { return value; } -/** - * Pop a value from the back of the queue, removing it from the queue. - * @param d The queue. - * @return The value on the back of the queue or NULL if the queue is empty. - */ void* deque_pop_back(deque_t* d) { if (d == NULL || d->back == NULL) { return NULL; @@ -170,11 +137,6 @@ void* deque_pop_back(deque_t* d) { return value; } -/** - * Peek at the value on the front of the queue, leaving it on the queue. - * @param d The queue. - * @return The value on the front of the queue or NULL if the queue is empty. - */ void* deque_peek_back(deque_t* d) { if (d == NULL || d->back == NULL) { return NULL; @@ -182,11 +144,6 @@ void* deque_peek_back(deque_t* d) { return d->back->value; } -/** - * Peek at the value on the back of the queue, leaving it on the queue. - * @param d The queue. - * @return The value on the back of the queue or NULL if the queue is empty. - */ void* deque_peek_front(deque_t* d) { if (d == NULL || d->front == NULL) { return NULL; diff --git a/util/deque.h b/util/deque.h index 658ee456d..5acee93b9 100644 --- a/util/deque.h +++ b/util/deque.h @@ -13,7 +13,7 @@ * * ``` * target C { - * cmake-include: "/lib/c/reactor-c/util/deque.cmake" + * cmake-include: "/lib/c/reactor-c/util/deque.cmake", * files: ["/lib/c/reactor-c/util/deque.c", "/lib/c/reactor-c/util/deque.h"] * }; * ``` diff --git a/util/initialize_from_file.c b/util/initialize_from_file.c new file mode 100644 index 000000000..d945d8197 --- /dev/null +++ b/util/initialize_from_file.c @@ -0,0 +1,171 @@ +#include +#include +#include +#include +#include + +#include "logging/api/logging.h" +#include "reactor.h" +#include "initialize_from_file.h" + +typedef enum { LF_TYPE_DOUBLE, LF_TYPE_INT, LF_TYPE_STRING } lf_field_type; + +/** Remove leading and trailing whitespace from the string. */ +static void sc_csv_trim(char* s) { + char* p = s; + while (*p && isspace((unsigned char)*p)) { + p++; + } + if (p != s) { + memmove(s, p, strlen(p) + 1); + } + size_t n = strlen(s); + while (n > 0 && isspace((unsigned char)s[n - 1])) { + s[--n] = '\0'; + } +} + +/** Replace the specified delimiter with a null character and return the number of fields. */ +static int sc_csv_split(char* line, char delimiter, char** fields, int max_fields) { + int n = 0; + char* cur = line; + while (n < max_fields) { + fields[n++] = cur; + char* sep = strchr(cur, delimiter); + if (!sep) { + break; + } + *sep = '\0'; + cur = sep + 1; + } + return n; +} + +/** + * Shared implementation for lf_initialize_double, lf_initialize_int, and _lf_initialize_string. + * The `type` parameter selects the expected va_arg pointer type (double*, int*, or char**). + * The `allocations` parameter is used only for LF_TYPE_STRING to record allocated memory. + */ +static int lf_initialize_fields(const char* filename, char delimiter, size_t row_number, lf_field_type type, + struct allocation_record_t** allocations, va_list ap) { + size_t pointer_count = 0; + FILE* f = fopen(filename, "r"); + if (!f) { + lf_print_error("Could not open file \"%s\".", filename); + return -1; + } + + char line[SC_CSV_LINE_MAX]; + size_t row_count = 0; + while (fgets(line, sizeof(line), f)) { + // Detect truncation: fgets didn't capture a newline and we're not at EOF. + size_t len = strlen(line); + int truncated = (len > 0 && line[len - 1] != '\n' && !feof(f)); + + if (row_count == row_number) { + if (truncated) { + lf_print_error("Row %zu exceeds maximum line length of %d in file \"%s\".", row_number, SC_CSV_LINE_MAX - 1, + filename); + fclose(f); + return -1; + } + char* row = &line[0]; + if (len >= 3 && (unsigned char)row[0] == 0xEF && (unsigned char)row[1] == 0xBB && (unsigned char)row[2] == 0xBF) { + row += 3; + } + row[strcspn(row, "\r\n")] = '\0'; + + char* fields[SC_CSV_MAX_COLS]; + int field_count = sc_csv_split(row, delimiter, fields, SC_CSV_MAX_COLS); + for (size_t i = 0; i < (size_t)field_count; i++) { + sc_csv_trim(fields[i]); + if (type == LF_TYPE_DOUBLE) { + double* out = va_arg(ap, double*); + if (out == NULL) + break; + pointer_count++; + char* end = NULL; + double parsed = strtod(fields[i], &end); + if (end != fields[i] && *end == '\0') { + *out = parsed; + } else { + lf_print_error("Failed to parse numeric value \"%s\" at row %zu, column %zu in \"%s\".", fields[i], + row_number, i, filename); + fclose(f); + return -1; + } + } else if (type == LF_TYPE_INT) { + int* out = va_arg(ap, int*); + if (out == NULL) + break; + pointer_count++; + char* end = NULL; + long parsed = strtol(fields[i], &end, 10); + if (end != fields[i] && *end == '\0') { + *out = (int)parsed; + } else { + lf_print_error("Failed to parse integer value \"%s\" at row %zu, column %zu in \"%s\".", fields[i], + row_number, i, filename); + fclose(f); + return -1; + } + } else { + char** out = va_arg(ap, char**); + if (out == NULL) + break; + pointer_count++; + char* field = fields[i]; + size_t len = strlen(field); + if (len >= 2 && + ((field[0] == '"' && field[len - 1] == '"') || (field[0] == '\'' && field[len - 1] == '\''))) { + field++; + len -= 2; + } + char* str = (char*)lf_allocate(len + 1, sizeof(char), allocations); + memcpy(str, field, len); + str[len] = '\0'; + *out = str; + } + } + fclose(f); + return (int)pointer_count; + } + if (truncated) { + // Consume the rest of this over-long line so it counts as one row. + while (fgets(line, sizeof(line), f)) { + len = strlen(line); + if ((len > 0 && line[len - 1] == '\n') || feof(f)) + break; + } + } + row_count++; + } + lf_print_error("Requested row %zu not found in file \"%s\".", row_number, filename); + fclose(f); + return -1; +} + +int lf_initialize_double(const char* filename, char delimiter, size_t row_number, ...) { + va_list ap; + va_start(ap, row_number); + int result = lf_initialize_fields(filename, delimiter, row_number, LF_TYPE_DOUBLE, NULL, ap); + va_end(ap); + return result; +} + +int lf_initialize_int(const char* filename, char delimiter, size_t row_number, ...) { + va_list ap; + va_start(ap, row_number); + int result = lf_initialize_fields(filename, delimiter, row_number, LF_TYPE_INT, NULL, ap); + va_end(ap); + return result; +} + +int _lf_initialize_string(const char* filename, char delimiter, size_t row_number, + struct allocation_record_t** allocations, ...) { + va_list ap; + va_start(ap, allocations); + int result = lf_initialize_fields(filename, delimiter, row_number, LF_TYPE_STRING, allocations, ap); + va_end(ap); + return result; +} diff --git a/util/initialize_from_file.cmake b/util/initialize_from_file.cmake new file mode 100644 index 000000000..48f973069 --- /dev/null +++ b/util/initialize_from_file.cmake @@ -0,0 +1 @@ +target_sources(${LF_MAIN_TARGET} PRIVATE initialize_from_file.c) diff --git a/util/initialize_from_file.h b/util/initialize_from_file.h new file mode 100644 index 000000000..73510d571 --- /dev/null +++ b/util/initialize_from_file.h @@ -0,0 +1,308 @@ +/** + * @file + * @author Edward A. Lee + * + * @brief Utility functions for initializing parameters and state variables from a file. + * @ingroup Utilities + */ +#ifndef INITIALIZE_FROM_FILE_H +#define INITIALIZE_FROM_FILE_H + +#include // Defines size_t +#include "reactor.h" +#include "lf_types.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define SC_CSV_LINE_MAX 256 +#define SC_CSV_MAX_COLS 256 + +/** + * @brief Read one delimited row from a file and parse as doubles. + * @ingroup Utilities + * + * Given a file path (either absolute or relative to the current working directory), + * this function reads the specified row, which it assumes is a list of doubles separated by a delimiter, + * and parses the values into a list of specified variables given as a list of double* pointers + * terminated with NULL. + * Example: + * ``` + * double a, b; + * int count = lf_initialize_double("x.csv", ',', 2, &a, &b, NULL); + * ``` + * This will read the third row of the file "x.csv" (row numbers start from 0) + * and parse the values into the variables `a` and `b`. + * The file "x.csv" may look like this:: + * ```csv + * a,b + * 1.0,2.0 + * 3.0,4.0 + * ``` + * Including a header row with the names of the variables is optional (but recommended). + * + * The return value is the number of values parsed (2 in this case), or -1 if an error occurred, + * for example if the file does not exist or the row number is out of range. + * + * To use this to initialize parameters and/or state variables of a reactor, you can do the following: + * + * ```lf + * main reactor MyReactor(x: double = 0.0, row_number: int = 0) { + * state y: double = 0.0; + * reaction(startup) {= + * lf_initialize_double("params.csv", ',', self->row_number, &self->x, &self->y, NULL); + * =} + * } + * ``` + * + * If the `row_number` is a top-level parameter (of the main reactor), as it is above, then you + * can override this parameter on the command line when running the program as follows: + * + * ``` + * bin/MyReactor --row_number 1 + * ``` + * This gives an easy way to compile once and run with different parameter values. + * + * If you wish to initialize parameters or state variables of a reactor within a bank, you + * can create a CSV file with one row per bank member and use the `bank_index` parameter to + * select the row to read. For example: + * ```lf + * reactor MyReactor(bank_index: int = 0) { + * reaction(startup) {= + * lf_initialize_double("params.csv", ',', self->bank_index + 1, &self->x, &self->y, NULL); + * =} + * } + * ``` + * The `bank_index` parameter is the index within the bank, starting from 0. + * The `+ 1` specifies to skip the header row. + * This way, each bank member can have a different set of parameter values. + * + * To use this in a Lingua Franca program, you must include the following in the target declaration: + * ```lf + * target C { + * cmake-include: "/lib/c/reactor-c/util/initialize_from_file.cmake", + * files: [ + * "/lib/c/reactor-c/util/initialize_from_file.c", + * "/lib/c/reactor-c/util/initialize_from_file.h"] + * } + * ``` + * + * Then, in the reactor, you can include the header file as follows: + * ```lf + * reactor MyReactor { + * preamble {= + * #include "initialize_from_file.h" + * =} + * ... + * } + * ``` + * + * @param filename The name of the file to read. + * @param delimiter The delimiter character to use. + * @param row_number The row number in the file to read. + * @param ... The double* pointers to the variables to store the values in, terminated with NULL. + * @return The number of values parsed, or -1 if an error occurred. + */ +int lf_initialize_double(const char* filename, char delimiter, size_t row_number, ...); + +/** + * @brief Read one delimited row from a file and parse as integers. + * @ingroup Utilities + * + * Given a file path (either absolute or relative to the current working directory), + * this function reads the specified row, which it assumes is a list of integers separated by a delimiter, + * and parses the values into a list of specified variables given as a list of int* pointers + * terminated with NULL. + * Example: + * ``` + * int a, b; + * int count = lf_initialize_int("x.csv", ',', 2, &a, &b, NULL); + * ``` + * This will read the third row of the file "x.csv" (row numbers start from 0) + * and parse the values into the variables `a` and `b`. + * The file "x.csv" may look like this:: + * ```csv + * a,b + * 1,2 + * 3,4 + * ``` + * Including a header row with the names of the variables is optional (but recommended). + * + * The return value is the number of values parsed (2 in this case), or -1 if an error occurred, + * for example if the file does not exist or the row number is out of range. + * + * To use this to initialize parameters and/or state variables of a reactor, you can do the following: + * + * ```lf + * main reactor MyReactor(x: int = 0, row_number: int = 0) { + * state y: int = 0; + * reaction(startup) {= + * lf_initialize_int("params.csv", ',', self->row_number, &self->x, &self->y, NULL); + * =} + * } + * ``` + * + * If the `row_number` is a top-level parameter (of the main reactor), as it is above, then you + * can override this parameter on the command line when running the program as follows: + * + * ``` + * bin/MyReactor --row_number 1 + * ``` + * This gives an easy way to compile once and run with different parameter values. + * + * If you wish to initialize parameters or state variables of a reactor within a bank, you + * can create a CSV file with one row per bank member and use the `bank_index` parameter to + * select the row to read. For example: + * ```lf + * reactor MyReactor(bank_index: int = 0) { + * reaction(startup) {= + * lf_initialize_int("params.csv", ',', self->bank_index + 1, &self->x, &self->y, NULL); + * =} + * } + * ``` + * The `bank_index` parameter is the index within the bank, starting from 0. + * The `+ 1` specifies to skip the header row. + * This way, each bank member can have a different set of parameter values. + * + * To use this in a Lingua Franca program, you must include the following in the target declaration: + * ```lf + * target C { + * cmake-include: "/lib/c/reactor-c/util/initialize_from_file.cmake", + * files: [ + * "/lib/c/reactor-c/util/initialize_from_file.c", + * "/lib/c/reactor-c/util/initialize_from_file.h"] + * } + * ``` + * + * Then, in the reactor, you can include the header file as follows: + * ```lf + * reactor MyReactor { + * preamble {= + * #include "initialize_from_file.h" + * =} + * ... + * } + * ``` + * + * @param filename The name of the file to read. + * @param delimiter The delimiter character to use. + * @param row_number The row number in the file to read. + * @param ... The int* pointers to the variables to store the values in, terminated with NULL. + * @return The number of values parsed, or -1 if an error occurred. + */ +int lf_initialize_int(const char* filename, char delimiter, size_t row_number, ...); + +/** + * @brief Read one delimited row from a file and parse as strings. + * @ingroup Utilities + * + * Given a file path (either absolute or relative to the current working directory), + * this function reads the specified row, which it assumes is a list of strings separated by a delimiter, + * and parses the values into a list of specified variables given as a list of char** pointers + * terminated with NULL. If a field is enclosed in quotation marks (double or single), the marks + * are stripped. Memory for each string is allocated using `lf_allocate` and recorded in the + * reactor's allocation list so that it is freed automatically when the reactor is freed. + * + * This macro is meant to be called from a reaction, not directly. If you wish to call it from + * somewhere other than a reaction, you can use the following function: + * ```lf + * int _lf_initialize_string(const char* filename, char delimiter, size_t row_number, + * struct allocation_record_t** allocations, ...); + * ``` + * If you pass NULL for the allocations parameter, then you will be responsible for freeing the allocated memory. + * Example: + * ``` + * char* a; + * char* b; + * int count = _lf_initialize_string("x.csv", ',', 2, NULL, &a, &b, NULL); + * ... + * free(a); + * free(b); + * ``` + * + * This will read the third row of the file "x.csv" (row numbers start from 0) + * and parse the values into the variables `a` and `b`. + * The file "x.csv" may look like this:: + * ```csv + * a,b + * "hello","world" + * foo,bar + * ``` + * The quotation marks are optional and will be stripped if present. + * Note that putting a delimiter within the quotation marks is not supported and will give unpredictable results. + * Including a header row with the names of the variables is optional (but recommended). + * + * The return value is the number of values parsed (2 in this case), or -1 if an error occurred, + * for example if the file does not exist or the row number is out of range. + * + * To use this to initialize parameters and/or state variables of a reactor, you can do the following: + * + * ```lf + * main reactor MyReactor(row_number: int = 0) { + * state name: string = ""; + * reaction(startup) {= + * lf_initialize_string("params.csv", ',', self->row_number, &self->name, NULL); + * =} + * } + * ``` + * + * If the `row_number` is a top-level parameter (of the main reactor), as it is above, then you + * can override this parameter on the command line when running the program as follows: + * + * ``` + * bin/MyReactor --row_number 1 + * ``` + * This gives an easy way to compile once and run with different parameter values. + * + * If you wish to initialize parameters or state variables of a reactor within a bank, you + * can create a CSV file with one row per bank member and use the `bank_index` parameter to + * select the row to read. For example: + * ```lf + * reactor MyReactor(bank_index: int = 0) { + * reaction(startup) {= + * lf_initialize_string("params.csv", ',', self->bank_index + 1, &self->name, NULL); + * =} + * } + * ``` + * The `bank_index` parameter is the index within the bank, starting from 0. + * The `+ 1` specifies to skip the header row. + * This way, each bank member can have a different set of parameter values. + * + * To use this in a Lingua Franca program, you must include the following in the target declaration: + * ```lf + * target C { + * cmake-include: "/lib/c/reactor-c/util/initialize_from_file.cmake", + * files: [ + * "/lib/c/reactor-c/util/initialize_from_file.c", + * "/lib/c/reactor-c/util/initialize_from_file.h"] + * } + * ``` + * + * Then, in the reactor, you can include the header file as follows: + * ```lf + * reactor MyReactor { + * preamble {= + * #include "initialize_from_file.h" + * =} + * ... + * } + * ``` + * + * @param filename The name of the file to read. + * @param delimiter The delimiter character to use. + * @param row_number The row number in the file to read. + * @param ... The char** pointers to the variables to store the values in, terminated with NULL. + * @return The number of values parsed, or -1 if an error occurred. + */ +#define lf_initialize_string(filename, delimiter, row_number, ...) \ + _lf_initialize_string(filename, delimiter, row_number, &((self_base_t*)self)->allocations, __VA_ARGS__) + +int _lf_initialize_string(const char* filename, char delimiter, size_t row_number, + struct allocation_record_t** allocations, ...); + +#ifdef __cplusplus +} +#endif + +#endif // INITIALIZE_FROM_FILE_H \ No newline at end of file