插件窝 干货文章 linux socket怎么实现使用线程独立收发消息

linux socket怎么实现使用线程独立收发消息

BUFFER 线程 sockfd pthread 434    来源:    2025-04-24

Linux Socket 线程独立收发消息实现

在Linux下使用Socket实现线程独立的收发消息,可以通过多线程技术将数据接收和发送分离到不同的线程中处理。以下是几种实现方式:

基本实现方法

1. 创建两个独立线程

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define BUFFER_SIZE 1024

int sockfd;

// 接收线程函数
void *recv_thread(void *arg) {
    char buffer[BUFFER_SIZE];
    while (1) {
        memset(buffer, 0, BUFFER_SIZE);
        int n = recv(sockfd, buffer, BUFFER_SIZE, 0);
        if (n <= 0) {
            perror("recv error");
            break;
        }
        printf("Received: %s\n", buffer);
    }
    return NULL;
}

// 发送线程函数
void *send_thread(void *arg) {
    char buffer[BUFFER_SIZE];
    while (1) {
        memset(buffer, 0, BUFFER_SIZE);
        printf("Enter message to send: ");
        fgets(buffer, BUFFER_SIZE, stdin);

        // 去除换行符
        buffer[strcspn(buffer, "\n")] = 0;

        if (send(sockfd, buffer, strlen(buffer), 0) < 0) {
            perror("send error");
            break;
        }
    }
    return NULL;
}

int main() {
    struct sockaddr_in serv_addr;

    // 创建socket
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    // 设置服务器地址
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(8080);
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    // 连接服务器
    if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) {
        perror("connect failed");
        exit(EXIT_FAILURE);
    }

    // 创建线程
    pthread_t recv_tid, send_tid;
    pthread_create(&recv_tid, NULL, recv_thread, NULL);
    pthread_create(&send_tid, NULL, send_thread, NULL);

    // 等待线程结束
    pthread_join(recv_tid, NULL);
    pthread_join(send_tid, NULL);

    close(sockfd);
    return 0;
}

2. 使用互斥锁保护共享资源

如果需要共享数据,应该使用互斥锁:

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

// 在线程中访问共享资源前
pthread_mutex_lock(&lock);
// 访问共享资源
pthread_mutex_unlock(&lock);

高级实现方法

1. 使用线程池

对于高并发场景,可以使用线程池管理收发线程:

#include <pthread.h>
#include <semaphore.h>

#define THREAD_POOL_SIZE 4

typedef struct {
    int sockfd;
    pthread_t thread;
    void *(*function)(void *);
} thread_pool_t;

thread_pool_t pool[THREAD_POOL_SIZE];

void init_thread_pool() {
    for (int i = 0; i < THREAD_POOL_SIZE; i++) {
        pool[i].sockfd = -1;
        pthread_create(&pool[i].thread, NULL, pool[i].function, &pool[i]);
    }
}

2. 使用事件驱动模型

结合epoll和线程可以实现更高效的处理:

#include <sys/epoll.h>

#define MAX_EVENTS 10

void epoll_thread(int sockfd) {
    int epollfd = epoll_create1(0);
    struct epoll_event ev, events[MAX_EVENTS];

    ev.events = EPOLLIN;
    ev.data.fd = sockfd;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev);

    while (1) {
        int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd == sockfd) {
                // 处理接收数据
                char buffer[BUFFER_SIZE];
                recv(sockfd, buffer, BUFFER_SIZE, 0);
                printf("Received: %s\n", buffer);
            }
        }
    }
}

注意事项

  1. 线程安全:确保对共享资源的访问是线程安全的
  2. 错误处理:正确处理socket错误和线程错误
  3. 资源释放:在程序结束时释放所有资源
  4. 性能考虑:对于高性能应用,考虑使用非阻塞IO或IO多路复用
  5. 同步机制:根据需要使用互斥锁、条件变量等同步机制

完整示例(TCP客户端)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define BUFFER_SIZE 1024

int sockfd;
int running = 1;

void *recv_thread(void *arg) {
    char buffer[BUFFER_SIZE];
    while (running) {
        memset(buffer, 0, BUFFER_SIZE);
        int n = recv(sockfd, buffer, BUFFER_SIZE, 0);
        if (n <= 0) {
            perror("recv error");
            running = 0;
            break;
        }
        printf("Received: %s\n", buffer);
    }
    return NULL;
}

void *send_thread(void *arg) {
    char buffer[BUFFER_SIZE];
    while (running) {
        memset(buffer, 0, BUFFER_SIZE);
        printf("Enter message to send (or 'exit' to quit): ");
        fgets(buffer, BUFFER_SIZE, stdin);

        buffer[strcspn(buffer, "\n")] = 0;

        if (strcmp(buffer, "exit") == 0) {
            running = 0;
            break;
        }

        if (send(sockfd, buffer, strlen(buffer), 0) < 0) {
            perror("send error");
            running = 0;
            break;
        }
    }
    return NULL;
}

int main() {
    struct sockaddr_in serv_addr;

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(8080);
    if (inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {
        perror("invalid address");
        exit(EXIT_FAILURE);
    }

    if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) {
        perror("connect failed");
        exit(EXIT_FAILURE);
    }

    pthread_t recv_tid, send_tid;
    pthread_create(&recv_tid, NULL, recv_thread, NULL);
    pthread_create(&send_tid, NULL, send_thread, NULL);

    pthread_join(recv_tid, NULL);
    pthread_join(send_tid, NULL);

    close(sockfd);
    printf("Connection closed\n");
    return 0;
}

这个实现创建了两个线程,一个专门负责接收数据,一个专门负责发送数据,实现了收发操作的完全独立。