message 消息块,

chunli@Linux:~/ace/AceTask$ cat message_block_test.cpp #include 
int main() {    ACE_Message_Block *head = new ACE_Message_Block(8);    ACE_Message_Block *mblk = head;    for (;;) {        ssize_t nbytes = ACE::read_n(ACE_STDIN, mblk->wr_ptr(), mblk->size());        if (nbytes <= 0)            break; // Break out at EOF or error.        // Advance the write pointer to the end of the buffer.        mblk->wr_ptr(nbytes);        // Allocate message block and chain it at the end of list.        mblk->cont(new ACE_Message_Block(8));        mblk = mblk->cont();    }    // Print the contents of the list to the standard output.    int i = 1;    for (mblk = head; mblk != 0; mblk = mblk->cont()) {        ACE_DEBUG((LM_DEBUG, "\n%dth message: ", i++));        ACE::write_n(ACE_STDOUT, mblk->rd_ptr(), mblk->length());    }    head->release(); // This releases all the memory in the chain.    return 0;}chunli@Linux:~/ace/AceTask$ chunli@Linux:~/ace/AceTask$ g++ message_block_test.cpp -lACE && ./a.out 12345678qwertyuioasdfgh111111111111111111111111th message: 123456782th message: qwertyu3th message: ioasdfg4th message: h1111115th message: 111111116th message: 111111117th message: chunli@Linux:~/ace/AceTask$


chunli@Linux:~/ace/AceTask$ cat thread_manager_test1.cpp //ACE_Guard提供了自动锁定、释放机制的同步锁,对可见的scope范围内进行自动加锁,当超出scope范围是程序自动卸锁。#include 
ACE_Thread_Mutex nm, iom;int n = 0;static ACE_THR_FUNC_RETURN func(void* arg) { while (n < 30)  { { ACE_Guard
 guard(nm);//保护n n++; } { //ACE_Guard
 guard2(iom);//保护io ACE_DEBUG((LM_DEBUG, "(%t) %d\n", n)); } ACE_OS::sleep(1); } return 0;}int main() { //创建3个线程 ACE_Thread_Manager::instance()->spawn_n(5, func, NULL, THR_SCOPE_SYSTEM | THR_NEW_LWP); ACE_Thread_Manager::instance()->wait();}chunli@Linux:~/ace/AceTask$ g++ thread_manager_test1.cpp -lACE && ./a.out (139716248389376) 2(139716158551808) 3(139716239996672) 5(139716256782080) 4(139716265174784) 2(139716158551808) 6(139716256782080) 7(139716248389376) 8(139716239996672) 9(139716265174784) 10(139716256782080) 11(139716158551808) 13(139716248389376) 12(139716239996672) 14(139716265174784) 15(139716239996672) 16(139716256782080) 17(139716248389376) 18(139716158551808) 19(139716265174784) 20(139716256782080) 21(139716248389376) 22(139716239996672) 23(139716158551808) 24(139716265174784) 25(139716239996672) 26(139716256782080) 27(139716248389376) 28(139716158551808) 29(139716265174784) 30chunli@Linux:~/ace/AceTask$

ACE_Task 多线程管理版 echo_server

chunli@Linux:~/ace/AceTask$ cat echo_server_mt.cpp #include 
static ACE_THR_FUNC_RETURN func(void* arg) { ACE_SOCK_Stream* socket = static_cast
 (arg); char buf[512]; for (;;) { memset(buf, 0, sizeof(buf)); ssize_t n; if ((n = socket->recv(buf, sizeof(buf))) <= 0) { ACE_ERROR((LM_ERROR, "%p\n", "recv()")); break; } else { if (socket->send(buf, n) == -1) { ACE_ERROR((LM_ERROR, "%p\n", "send()")); break; } } } socket->close(); delete socket; return 0;}int main() { ACE_INET_Addr server_addr; ACE_SOCK_Acceptor acceptor; if (server_addr.set(8868) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "set()"), 1); if ( == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open()"), 1); for (;;) { ACE_SOCK_Stream* peer = new ACE_SOCK_Stream; if (acceptor.accept(*peer) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "accept()"), 1); peer->disable(ACE_NONBLOCK); // Ensure blocking 
s. ACE_Thread_Manager::instance()->spawn(func, peer); } return acceptor.close() == -1 ? 1 : 0;}chunli@Linux:~/ace/AceTask$ 编译运行:chunli@Linux:~/ace/AceTask$ g++ echo_server_mt.cpp -lACE && ./a.out 第一个客户端:chunli@Linux:~$ nc localhost 8868qazqazwsxwsx第二个客户端:chunli@Linux:~$ nc localhost 8868123123eqweeqwe

局域网 组播收发实验

chunli@Linux:~/ace/AceTask$ cat chat_room.cpp #include 
class ChatRoom: public ACE_Task_Base {public:    int join(const ACE_INET_Addr& group) {        _group = group;        if (sock.join(_group) == -1)            ACE_ERROR_RETURN((LM_ERROR,"%p\n", "join"), -1);        return 0;    }    virtual int svc() {        ACE_INET_Addr remoteAddr;        for (;;) {            memset(buf, 0, sizeof(buf));            if (sock.recv(buf, sizeof(buf), remoteAddr) != -1) {                ACE_DEBUG((LM_DEBUG, "recv msg from %s:%d: %s\n",                                remoteAddr.get_host_addr(),                                remoteAddr.get_port_number(), buf));            } else {                sock.leave(_group);                break;            }        }        return 0;    }    void sendMsg() {        char sendBuf[512];        for (;;) {            memset(sendBuf, 0, sizeof(sendBuf));            std::cin.getline(sendBuf, sizeof(sendBuf));            if (sock.send(sendBuf, strlen(sendBuf)) == -1) {                ACE_ERROR((LM_ERROR,"%p\n", "send"));                sock.leave(_group);                break;            }        }    }private:    char buf[512];    ACE_INET_Addr _group;    ACE_SOCK_Dgram_Mcast sock;};int main() {    ChatRoom cm;    ACE_INET_Addr group(8000, "");    if (cm.join(group) == -1)        return 1;    cm.activate();    cm.sendMsg();}一个运行:chunli@Linux:~/ace/AceTask$ g++ chat_room.cpp -lACE && ./a.out recv msg from 1234edxs中国上海recv msg from 中国上海局域网其他机子运行chunli@Linux:~/ace/AceTask$ g++ chat_room.cpp -lACE && ./a.out1234edxsrecv msg from 1234edxsrecv msg from 中国上海

ACE_Message_Queue 生产者消费者模型

chunli@Linux:~/ace/AceTask$ cat producer.cpp #include "ace/Task.h"#include "ace/Message_Block.h"const int N = 10;//The Consumer Task.class Consumer: public ACE_Task
 {public: int open(void*)  { ACE_DEBUG((LM_DEBUG, "(%t)Consumer task opened \n")); //Activate the Task activate(THR_NEW_LWP, 3);//开启3个线程 return 0; } //The Service Processing routine int svc(void)  { ACE_Message_Block* mb = 0; while(1)  { mb = 0; getq(mb);//Get message from underlying queue //ACE_Message_Block提供了两个指针函数以供程序员进行读写操作, //rd_ptr()指向可读的数据块地址, //wr_ptr()指向可写的数据块地址, //默认情况下都执行数据块的首地址 if (*mb->rd_ptr() < N)  { ACE_DEBUG((LM_DEBUG, "(%t)Got message: %d from remote task\n", *mb->rd_ptr())); } else if (*mb->rd_ptr() == N)  { ACE_DEBUG((LM_DEBUG, "(%t)Got message: %d from remote task\n", *mb->rd_ptr())); ++*mb->rd_ptr(); // *mb->rd_ptr() == N+1 ungetq(mb); break;// 供后续线程查看 } else  {  // *mb->rd_ptr() > N ungetq(mb); break;// 供后续线程查看 } } return 0; } int close(u_long)  { ACE_DEBUG((LM_DEBUG, "(%t)Consumer closes down \n")); return 0; }};class Producer: public ACE_Task_Base {public: Producer(Consumer* consumer):data_(0), consumer_(consumer)  { mb_ = new ACE_Message_Block((char*) &data_, sizeof(data_)); } int open(void*)  { ACE_DEBUG((LM_DEBUG, "(%t)Producer task opened \n")); //Activate the Task activate(THR_NEW_LWP, 1); return 0; } //The Service Processing routine int svc(void)  { while (data_ <= N)  { //Send message to consumer ACE_DEBUG((LM_DEBUG, "(%t)Sending message: %d to remote task\n", data_)); consumer_->putq(mb_); //Go to sleep for a sec. ACE_OS::sleep(1); ++data_; } return 0; } int close(u_long)  { ACE_DEBUG((LM_DEBUG, "(%t)Producer closes down \n")); return 0; }private: char data_; Consumer * consumer_; ACE_Message_Block * mb_;};int main() { Consumer* consumer = new Consumer; consumer->open(0); Producer* producer = new Producer(consumer); producer->open(0); ACE_Thread_Manager::instance()->wait(); //Wait for all the tasks to exit.}chunli@Linux:~/ace/AceTask$ g++ producer.cpp -lACE -lpthread && ./a.out (140704458946432)Consumer task opened (140704458946432)Producer task opened (140704408925952)Sending message: 0 to remote task(140704434104064)Got message: 0 from remote task(140704408925952)Sending message: 1 to remote task(140704434104064)Got message: 1 from remote task(140704408925952)Sending message: 2 to remote task(140704425711360)Got message: 2 from remote task(140704408925952)Sending message: 3 to remote task(140704417318656)Got message: 3 from remote task(140704408925952)Sending message: 4 to remote task(140704434104064)Got message: 4 from remote task(140704408925952)Sending message: 5 to remote task(140704425711360)Got message: 5 from remote task(140704408925952)Sending message: 6 to remote task(140704417318656)Got message: 6 from remote task(140704408925952)Sending message: 7 to remote task(140704434104064)Got message: 7 from remote task(140704408925952)Sending message: 8 to remote task(140704425711360)Got message: 8 from remote task(140704408925952)Sending message: 9 to remote task(140704417318656)Got message: 9 from remote task(140704408925952)Sending message: 10 to remote task(140704434104064)Got message: 10 from remote task(140704434104064)Consumer closes down (140704425711360)Consumer closes down (140704417318656)Consumer closes down (140704408925952)Producer closes down chunli@Linux:~/ace/AceTask$