1. 同步Timer
2. 异步Timer
3. 回调函数的参数
4. 成员函数作为回调函数
5. 多线程回调同步
6. TCP客户端:对准时间
7. TCP同步时间服务器
1. 同步Timer
本章介绍asio如何在定时器上进行阻塞等待(blocking wait).实现,我们包含必要的头文件.
所有的asio类可以简单的通过include "asio.hpp"来调用.
#include <iostream>
#include <boost/asio.hpp>
此外,这个示例用到了timer,我们还要包含Boost.Date_Time的头文件来控制时间.
#include <boost/date_time/posix_time/posix_time.hpp>
使用asio至少需要一个boost::asio::io_service对象.该类提供了访问I/O的功能.我们首先在main函数中声明它.
int main()
{
boost::asio::io_service io;
下一步我们声明boost::asio::deadline_timer对象.这个asio的核心类提供I/O的功能(这里更确切的说是定时功能),总是把一个io_service对象作为他的第一个构造函数,而第二个构造函数的参数设定timer会在5秒后到时(expired).
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
这个简单的示例中我们演示了定时器上的一个阻塞等待.就是说,调用boost::asio::deadline_timer::wait()的在创建后5秒内(注意:不是等待开始后),timer到时之前不会返回任何值. 一个deadline_timer只有两种状态:到时,未到时.
如果boost::asio::deadline_timer::wait()在到时的timer对象上调用,会立即return.
t.wait();
最后,我们输出理所当然的"Hello, world!"来演示timer到时了.
std::cout << "Hello, world! ";
return 0;
}
完整的代码:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
int main()
{
boost::asio::io_service io;
boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
t.wait();
std::cout << "Hello, world! ";
return 0;
}
2. 异步Timer
#include <iostream>
#include <asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
asio的异步函数会在一个异步操作完成后被回调.这里我们定义了一个将被回调的函数.
void print(const asio::error& /*e*/)
{
std::cout << "Hello, world! ";
}
int main()
{
asio::io_service io;
asio::deadline_timer t(io, boost::posix_time::seconds(5));
这里我们调用asio::deadline_timer::async_wait()来异步等待\
t.async_wait(print);
最后,我们必须调用asio::io_service::run(). asio库只会调用那个正在运行的asio::io_service::run()的回调函数.
如果asio::io_service::run()不被调用,那么回调永远不会发生.
asio::io_service::run()会持续工作到点,这里就是timer到时,回调完成.
别忘了在调用 asio::io_service::run()之前设置好io_service的任务.比如,这里,如果我们忘记先调用asio::deadline_timer::async_wait()则asio::io_service::run()会在瞬间return.
io.run();
return 0;
}
完整的代码:
#include <iostream>
#include <asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
void print(const asio::error& /*e*/)
{
std::cout << "Hello, world! ";
}
int main()
{
asio::io_service io;
asio::deadline_timer t(io, boost::posix_time::seconds(5));
t.async_wait(print);
io.run();
return 0;
}
3. 回调函数的参数
这里我们将每秒回调一次,来演示如何回调函数参数的含义
#include <iostream>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
首先,调整一下timer的持续时间,开始一个异步等待.显示,回调函数需要访问timer来实现周期运行,所以我们再介绍两个新参数
- 指向timer的指针
- 一个int*来指向计数器
void print(const asio::error& /*e*/,
asio::deadline_timer* t, int* count)
{
我们打算让这个函数运行6个周期,然而你会发现这里没有显式的方法来终止io_service.不过,回顾上一节,你会发现当 asio::io_service::run()会在所有任务完成时终止.这样我们当计算器的值达到5时(0为第一次运行的值),不再开启一个新的异步等待就可以了.
if (*count < 5)
{
std::cout << *count << " ";
++(*count);
...
然后,我们推迟的timer的终止时间.通过在原先的终止时间上增加延时,我们可以确保timer不会在处理回调函数所需时间内的到期. (原文:By calculating the new expiry time relative to the old, we can ensure that the timer does not drift away from the whole-second mark due to any delays in processing the handler.)
t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
然后我们开始一个新的同步等待.如您所见,我们用把print和他的多个参数用boost::bind函数合成一个的形为void(const asio::error&)回调函数(准确的说是function object). 在这个例子中, boost::bind的asio::placeholders::error参数是为了给回调函数传入一个error对象.当进行一个异步操作,开始 boost::bind时,你需要使用它来匹配回调函数的参数表.下一节中你会学到回调函数不需要error参数时可以省略它.
t->async_wait(boost::bind(print,
asio::placeholders::error, t, count));
}
}
int main()
{
asio::io_service io;
int count = 0;
asio::deadline_timer t(io, boost::posix_time::seconds(1));
和上面一样,我们再一次使用了绑定asio::deadline_timer::async_wait()
t.async_wait(boost::bind(print,
asio::placeholders::error, &t, &count));
io.run();
在结尾,我们打印出的最后一次没有设置timer的调用的count的值
std::cout << "Final count is " << count << " ";
return 0;
}
完整的代码:
#include <iostream>
#include <asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
void print(const asio::error& /*e*/,
bsp; asio::deadline_timer* t, int* count)
{
if (*count < 5)
{
std::cout << *count << " ";
++(*count);
t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
t->async_wait(boost::bind(print,
asio::placeholders::error, t, count));
}
}
int main()
{
asio::io_service io;
int count = 0;
asio::deadline_timer t(io, boost::posix_time::seconds(1));
t.async_wait(boost::bind(print,
asio::placeholders::error, &t, &count));
io.run();
std::cout << "Final count is " << count << " ";
return 0;
}
4. 成员函数作为回调函数
本例的运行结果和上一节类似
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
我们先定义一个printer类
class printer
{
public:
//构造函数有一个io_service参数,并且在初始化timer_时用到了它.用来计数的count_这里同样作为了成员变量
printer(boost::asio::io_service& io)
: timer_(io, boost::posix_time::seconds(1)),
count_(0)
{
boost::bind 同样可以出色的工作在成员函数上.众所周知,所有的非静态成员函数都有一个隐式的this参数,我们需要把this作为参数bind到成员函数上.和上一节类似,我们再次用bind构造出void(const boost::asio::error&)形式的函数. 注意,这里没有指定boost::asio::placeholders::error占位符,因为这个print成员函数没有接受一个error对象作为参数.
timer_.async_wait(boost::bind(&printer::print, this));
在类的折构函数中我们输出最后一次回调的count的值
~printer()
{
std::cout << "Final count is " << count_ << " ";
}
print函数于上一节的十分类似,但是用成员变量取代了参数.
void print()
{
if (count_ < 5)
{
std::cout << count_ << " ";
++count_;
timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
timer_.async_wait(boost::bind(&printer::print, this));
}
}
private:
boost::asio::deadline_timer timer_;
int count_;
};
现在main函数清爽多了,在运行io_service之前只需要简单的定义一个printer对象.
int main()
{
boost::asio::io_service io;
printer p(io);
io.run();
return 0;
}
完整的代码:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
class printer
{
public:
printer(boost::asio::io_service& io)
: timer_(io, boost::posix_time::seconds(1)),
count_(0)
{
timer_.async_wait(boost::bind(&printer::print, this));
}
~printer()
{
std::cout << "Final count is " << count_ << " ";
}
void print()
{
if (count_ < 5)
{
std::cout << count_ << " ";
++count_;
timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
timer_.async_wait(boost::bind(&printer::print, this));
}
}
private:
boost::asio::deadline_timer timer_;
int count_;
};
int main()
{
boost::asio::io_service io;
printer p(io);
io.run();
return 0;
}
5. 多线程回调同步
本节演示了使用boost::asio::strand在多线程程序中进行回调同步(synchronise).先前的几节阐明了如何在单线程程序中用boost::asio::io_service::run()进行同步.如您所见,asio库确保 仅当当前线程调用boost::asio::io_service::run()时产生回调.显然,仅在一个线程中调用 boost::asio::io_service::run() 来确保回调是适用于并发编程的.
一个基于asio的程序最好是从单线程入手,但是单线程有如下的限制,这一点在服务器上尤其明显:
- 当回调耗时较长时,反应迟钝.
- 在多核的系统上无能为力
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
在上一节的基础上我们定义一个printer类,此次,它将并行运行两个timer
class printer
{
public:
除了声明了一对boost::asio::deadline_timer,构造函数也初始化了类型为boost::asio::strand的strand_成员. boost::asio::strand 可以分配的回调函数.它保证无论有多少线程调用了boost::asio::io_service::run(),下一个回调函数仅在前一个回调函数完成后开始,当然回调函数仍然可以和那些不使用boost::asio::strand分配,或是使用另一个boost::asio::strand分配的回调函数一起并发执行.
printer(boost::asio::io_service& io)
: strand_(io),
timer1_(io, boost::posix_time::seconds(1)),
timer2_(io, boost::posix_time::seconds(1)),
count_(0)
{
当一个异步操作开始时,用boost::asio::strand来 "wrapped(包装)"回调函数.boost::asio::strand::wrap()会返回一个由boost::asio::strand分配的新的handler(句柄),这样,我们可以确保它们不会同时运行.
timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
}
~printer()
{
std::cout << "Final count is " << count_ << " ";
}
多线程程序中,回调函数在访问共享资源前需要同步.这里共享资源是std::cout 和count_变量.
void print1()
{
if (count_ < 10)
{
std::cout << "Timer 1: " << count_ << " ";
++count_;
timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
}
}
void print2()
{
if (count_ < 10)
{
std::cout << "Timer 2: " << count_ << " ";
++count_;
timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
}
}
private:
boost::asio::strand strand_;
boost::asio::deadline_timer timer1_;
boost::asio::deadline_timer timer2_;
int count_;
};
main函数中boost::asio::io_service::run()在两个线程中被调用:主线程、一个boost::thread线程. 正如单线程中那样,并发的boost::asio::io_service::run()会一直运行直到完成任务.后台的线程将在所有异步线程完成后终结.
int main()
{
boost::asio::io_service io;
printer p(io);
boost::thread t(boost::bind(&boost::asio::io_service::run, &io));
io.run();
t.join();
return 0;
}
完整的代码:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
class printer
{
public:
printer(boost::asio::io_service& io)
: strand_(io),
timer1_(io, boost::posix_time::seconds(1)),
timer2_(io, boost::posix_time::seconds(1)),
count_(0)
{
timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
}
~printer()
{
std::cout << "Final count is " << count_ << " ";
}
void print1()
{
if (count_ < 10)
{
std::cout << "Timer 1: " << count_ << " ";
++count_;
timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
}
}
void print2()
{
if (count_ < 10)
{
std::cout << "Timer 2: " << count_ << " ";
++count_;
timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
}
}
private:
boost::asio::strand strand_;
boost::asio::deadline_timer timer1_;
boost::asio::deadline_timer timer2_;
int count_;
};
int main()
{
boost::asio::io_service io;
printer p(io);
boost::thread t(boost::bind(&boost::asio::io_service::run, &io));
io.run();
t.join();
return 0;
}
6. TCP客户端:对准时间
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: client <host>" << std::endl;
return 1;
}
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query(argv[1], "daytime");
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
tcp::resolver::iterator end;
tcp::socket socket(io_service);
boost::asio::error error = boost::asio::error::host_not_found;
while (error && endpoint_iterator != end)
{
socket.close();
socket.connect(*endpoint_iterator++, boost::asio::assign_error(error));
}
if (error)
throw error;
for (;;)
{
boost::array<char, 128> buf;
boost::asio::error error;
size_t len = socket.read_some(
boost::asio::buffer(buf), boost::asio::assign_error(error));
if (error == boost::asio::error::eof)
break; // Connection closed cleanly by peer.
else if (error)
throw error; // Some other error.
std::cout.write(buf.data(), len);
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
完整的代码:
#include <iostream>
#include <boost/array.hpp>
#include <asio.hpp>
using asio::ip::tcp;
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: client <host>" << std::endl;
return 1;
}
asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query(argv[1], "daytime");
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
tcp::resolver::iterator end;
tcp::socket socket(io_service);
asio::error error = asio::error::host_not_found;
while (error && endpoint_iterator != end)
{
socket.close();
socket.connect(*endpoint_iterator++, asio::assign_error(error));
}
if (error)
throw error;
for (;;)
{
boost::array<char, 128> buf;
asio::error error;
size_t len = socket.read_some(
asio::buffer(buf), asio::assign_error(error));
if (error == asio::error::eof)
break; // Connection closed cleanly by peer.
else if (error)
throw error; // Some other error.
std::cout.write(buf.data(), len);
}
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}
7. TCP同步时间服务器
#include <ctime>
#include <iostream>
#include <string>
#include <asio.hpp>
using asio::ip::tcp;
std::string make_daytime_string()
{
using namespace std; // For time_t, time and ctime;
time_t now = time(0);
return ctime(&now);
}
int main()
{
try
{
asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));
for (;;)
{
tcp::socket socket(io_service);
acceptor.accept(socket);
std::string message = make_daytime_string();
asio::write(socket, asio::buffer(message),
asio::transfer_all(), asio::ignore_error());
}
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
完整的代码:
#include <ctime>
#include <iostream>
#include <string>
#include <asio.hpp>
using asio::ip::tcp;
std::string make_daytime_string()
{
using namespace std; // For time_t, time and ctime;
time_t now = time(0);
return ctime(&now);
}
int main()
{
try
{
asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));
for (;;)
{
tcp::socket socket(io_service);
acceptor.accept(socket);
std::string message = make_daytime_string();
asio::write(socket, asio::buffer(message),
asio::transfer_all(), asio::ignore_error());
}
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}