job.php循环投送任务
// Swoole repeating timer: fires the dispatch round once every 1000 ms.
$dispatchIntervalMs = 1000;
$dispatchRound = function () {
    xunhuan();
};
swoole_timer_tick($dispatchIntervalMs, $dispatchRound);
/**
 * Runs one dispatch round.
 *
 * Reads the `source` column of every row in table `temp` (ordered by `id`)
 * and sends each value, unserialized, as a type-1 message to the System V
 * message queue with key 0x70. Progress is reported on standard output.
 *
 * Failures (queue unavailable, database unreachable, query error) are
 * reported and the round is abandoned instead of terminating the process,
 * so that the next timer tick can try again.
 *
 * NOTE(review): every round re-sends all rows still present in `temp`,
 * including ones that are already waiting in the queue; confirm that the
 * consumers tolerate duplicate messages.
 *
 * @return void
 */
function xunhuan()
{
    $mq = msg_get_queue(0x70);
    if ($mq === false) {
        echo "消息队列打开失败\r\n";
        return;
    }

    // msg_stat_queue() returns false on failure; report 0 in that case.
    $message_queue_status = msg_stat_queue($mq);
    $num = is_array($message_queue_status) ? $message_queue_status['msg_qnum'] : 0;
    echo "队列中url数量为: " . $num . "\r\n";

    $servername = "localhost";
    $username = "root";
    $password = "root";
    $dbname = "dbname";

    // Open a fresh connection for this round.
    $conn = new mysqli($servername, $username, $password, $dbname);
    if ($conn->connect_error) {
        // Do not die() here: that would stop the dispatcher for good.
        echo "连接失败: " . $conn->connect_error . "\r\n";
        return;
    }

    // A single query provides both the row count and the data to dispatch.
    $result = $conn->query("SELECT source FROM temp ORDER BY id");
    if ($result === false) {
        echo "查询失败: " . $conn->error . "\r\n";
        $conn->close();
        return;
    }

    $total = $result->num_rows;
    echo "总条数:" . $total . "\r\n";
    echo "本次抓取条数:" . $total . "\r\n";

    if ($total > 0) {
        while ($row = $result->fetch_assoc()) {
            $source = (string) $row["source"];
            if ($source === '') {
                // Nothing a worker could look up; skip it.
                continue;
            }
            // serialize = false: the payload is sent as the raw string.
            $errorCode = 0;
            if (!msg_send($mq, 1, $source, false, true, $errorCode)) {
                echo "投递失败(" . $errorCode . "): " . $source . "\r\n";
            }
        }
    } else {
        echo "待抓取图片为空\r\n";
    }

    $result->free();
    $conn->close();
}
swoole.php 执行任务
<?php
// Turn off all PHP error reporting for this script.
error_reporting(0);

// Process pool: 50 workers, IPC through the System V message queue keyed 0x70.
$pool = new Swoole\Process\Pool(50, SWOOLE_IPC_MSGQUEUE, 0x70);

// Lifecycle hooks only announce the worker on standard output.
$onWorkerStart = function ($workerPool, $workerId) {
    echo "Worker#{$workerId} is started\n";
};
$onWorkerStop = function ($workerPool, $workerId) {
    echo "Worker#{$workerId} is stopped\n";
};

// Each queue message is handed to get_job_from_mysql(), which looks the job
// up in MySQL (job data is not read from memory).
$onMessage = function ($workerPool, $message) {
    get_job_from_mysql($message);
};

$pool->on("WorkerStart", $onWorkerStart);
$pool->on("WorkerStop", $onWorkerStop);
$pool->on("Message", $onMessage);

$pool->start();
/**
 * Processes one queued job.
 *
 * Looks up every row of table `temp` whose `source` column equals $source.
 * For each row the local target path is derived from the `local` column by
 * stripping the "http://source.imgcache.top/" prefix; if that file does not
 * exist yet, the image is downloaded with getpic() (using the row's `refer`
 * value as referer) and written to the path. Afterwards the row is deleted
 * from `temp`. Progress is reported on standard output.
 *
 * Database failures are reported and the job is abandoned without
 * terminating the worker process.
 *
 * NOTE(review): the row is deleted even when the download failed or was too
 * small to be saved, exactly as before; confirm that no retry is wanted.
 *
 * @param string $source Source URL received from the message queue.
 * @return void
 */
function get_job_from_mysql($source)
{
    $servername = "localhost";
    $username = "root";
    $password = "root";
    $dbname = "dbname";

    $conn = new mysqli($servername, $username, $password, $dbname);
    if ($conn->connect_error) {
        echo "连接失败: " . $conn->connect_error . "\r\n";
        return;
    }

    // Prepared statement: $source is external data and may contain quotes.
    $select = $conn->prepare("SELECT `local`, `refer` FROM temp WHERE source = ?");
    if ($select === false) {
        echo "查询失败: " . $conn->error . "\r\n";
        $conn->close();
        return;
    }
    $select->bind_param("s", $source);
    if (!$select->execute()) {
        echo "查询失败: " . $select->error . "\r\n";
        $select->close();
        $conn->close();
        return;
    }

    // Read all rows first so the result set is fully consumed before any
    // DELETE is issued on the same connection.
    $rows = [];
    $select->bind_result($boundLocal, $boundRefer);
    while ($select->fetch()) {
        $rows[] = ["local" => $boundLocal, "refer" => $boundRefer];
    }
    $select->close();

    echo "本次URL:" . $source . "\r\n";

    if (count($rows) === 0) {
        echo "待抓取图片为空\r\n";
        $conn->close();
        return;
    }

    $delete = $conn->prepare("DELETE FROM temp WHERE `local` = ?");
    if ($delete === false) {
        echo "查询失败: " . $conn->error . "\r\n";
        $conn->close();
        return;
    }

    foreach ($rows as $row) {
        $localUrl = (string) $row["local"];
        $referer = (string) $row["refer"];

        // Relative path on disk = `local` URL without the cache host prefix.
        $path = str_replace("http://source.imgcache.top/", "", $localUrl);

        if ($path === '') {
            // No usable file name; nothing can be stored for this row.
            echo $localUrl . "路径无效\r\n";
        } else {
            // dirname() instead of stripping the file name by string
            // replacement, which also removed earlier occurrences of the
            // name and produced "" for paths without a slash.
            $dir = dirname($path);
            if ($dir !== '' && !is_dir($dir)) {
                mkdirs($dir);
            }

            if (file_exists($path)) {
                echo $localUrl . "已存在\r\n";
            } else {
                // Download the image and store it.
                $pic = getpic($source, $referer);
                // Responses of 100 bytes or less are not saved.
                if (is_string($pic) && strlen($pic) > 100) {
                    if (file_put_contents($path, $pic) !== false) {
                        echo $localUrl . "抓取成功\r\n";
                    } else {
                        echo $localUrl . "保存失败\r\n";
                    }
                }
            }
        }

        $delete->bind_param("s", $localUrl);
        if (!$delete->execute()) {
            echo "删除失败: " . $delete->error . "\r\n";
        }
    }

    $delete->close();
    $conn->close();
}
//$conn->close();
/**
 * Creates a directory together with any missing parent directories.
 *
 * @param string $dir  Directory path to create; an empty path is rejected.
 * @param int    $mode Permissions passed to mkdir() for created directories.
 * @return bool TRUE if the directory exists when the function returns,
 *              FALSE otherwise.
 */
function mkdirs($dir, $mode = 0777)
{
    $dir = (string) $dir;

    // dirname("") is "" again, so an empty path can never be resolved by
    // walking up the tree; reject it instead of recursing.
    if ($dir === '') {
        return FALSE;
    }

    if (is_dir($dir)) {
        return TRUE;
    }

    // Native recursive creation. The warning is suppressed on purpose:
    // another process may create the same directory at the same moment,
    // which is checked for right below.
    if (@mkdir($dir, $mode, true)) {
        return TRUE;
    }

    // mkdir() also fails when the directory appeared in the meantime.
    clearstatcache(true, $dir);
    return is_dir($dir);
}
/**
 * Downloads a URL with cURL and returns the response body.
 *
 * Redirects are followed, gzip content encoding is accepted, and the request
 * carries a fixed browser user agent plus the given referer. The whole
 * transfer is limited to 30 seconds, connecting to 15 seconds.
 *
 * @param string $url   Address to fetch.
 * @param string $refer Value sent as the Referer request header.
 * @return string|false Response body, or FALSE when the transfer failed or
 *                      the server answered with an HTTP status >= 400.
 */
function getpic($url, $refer)
{
    $curl = curl_init(); // start a cURL session
    if ($curl === false) {
        return false;
    }

    // CURLOPT_RETURNTRANSFER comes first: curl_setopt_array() stops at the
    // first option it cannot set, and the body must never be echoed.
    $configured = curl_setopt_array($curl, [
        CURLOPT_RETURNTRANSFER => 1,
        CURLOPT_URL => (string) $url,
        // Certificate chain verification stays disabled, as before.
        CURLOPT_SSL_VERIFYPEER => 0,
        // The value 1 is no longer accepted by PHP/libcurl, which fall back
        // to 2; request 2 explicitly.
        CURLOPT_SSL_VERIFYHOST => 2,
        CURLOPT_FOLLOWLOCATION => 1, // follow redirects
        CURLOPT_ENCODING => "gzip",
        CURLOPT_USERAGENT => 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727;)',
        CURLOPT_TIMEOUT => 30, // overall limit so a transfer cannot hang forever
        CURLOPT_CONNECTTIMEOUT => 15,
        CURLOPT_HEADER => 0, // response headers are not part of the result
        CURLOPT_REFERER => (string) $refer, // simulated referer
    ]);
    if ($configured === false) {
        curl_close($curl);
        return false;
    }

    $body = curl_exec($curl);
    $httpCode = (int) curl_getinfo($curl, CURLINFO_HTTP_CODE);
    curl_close($curl); // end the cURL session

    if ($body === false) {
        return false;
    }

    // An error page is not the requested image; report it as a failure so
    // that it is not stored on disk.
    if ($httpCode >= 400) {
        return false;
    }

    return $body;
}
?>