空夜

Java WebSocket爬虫
之前有个需求,要抓取某个网站视频的弹幕数据,坑爹的是它这个实时弹幕是基于WebSocket的。因为这个,接触到了W...
扫描右侧二维码阅读全文
17
2019/04

Java WebSocket爬虫

之前有个需求,要抓取某个网站视频的弹幕数据,坑爹的是它这个实时弹幕是基于WebSocket的。因为这个,接触到了WebSocket技术,关于WebSocket基本知识请参照博客《Java WebSocket实例》这一篇。

今天来总结一下如何利用java模拟客户端,与目标服务器建立WebSocket连接,抓取对方的数据。

需要的依赖:

<dependency>
   <groupId>org.java-websocket</groupId>
   <artifactId>Java-WebSocket</artifactId>
   <version>1.3.5</version>
</dependency>

思路:主要就是继承WebSocketClient这个类,重写其中的onOpen、onMessage、onClose方法,通过onMessage方法,将数据保存至某个数据结构中。


WebSocket功能类

import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;

/**
 * WebSocket client that forwards every received text message to a
 * {@link WebSocketExecutor} and closes the connection once that executor
 * says it should be closed.
 */
public class MyWebSocketClient extends WebSocketClient{

  /** Helper that stores the received data and decides whether the connection needs to be closed. */
  private WebSocketExecutor executor;

  /**
   * Creates a client for the given endpoint.
   *
   * @param url      WebSocket endpoint, parsed with {@link URI#URI(String)}
   * @param executor handler that receives messages and tracks the closed state
   * @throws URISyntaxException if {@code url} is not a valid URI
   */
  public MyWebSocketClient(String url,WebSocketExecutor executor) throws URISyntaxException {
    super(new URI(url));
    this.executor = executor;
  }

  /**
   * Prints a handshake banner followed by every HTTP field of the server handshake.
   *
   * @param shake the server handshake whose fields are dumped as {@code name:value}
   */
  @Override
  public void onOpen(ServerHandshake shake) {
    System.out.println("握手...");
    Iterator<String> fieldNames = shake.iterateHttpFields();
    while (fieldNames.hasNext()) {
      String fieldName = fieldNames.next();
      String fieldValue = shake.getFieldValue(fieldName);
      System.out.println(fieldName + ":" + fieldValue);
    }
  }

  /**
   * Logs the message, hands it to the executor, and closes the connection
   * when the executor reports that it should be closed.
   *
   * @param paramString the received text message
   */
  @Override
  public void onMessage(String paramString) {
    System.out.println("接收到消息:" + paramString);
    executor.doMessage(paramString);

    boolean shouldClose = executor.needClose(paramString);
    if (!shouldClose) {
      return;
    }
    // Request the close first, then record it on the executor.
    this.close();
    executor.setClose(true);
  }

  /**
   * Marks the executor as closed (unless it already is) and prints a notice.
   *
   * @param paramInt     close code (unused here)
   * @param paramString  close reason (unused here)
   * @param paramBoolean third close argument supplied by the base class (unused here)
   */
  @Override
  public void onClose(int paramInt, String paramString, boolean paramBoolean) {
    boolean alreadyMarked = executor.isClosed();
    if (!alreadyMarked) {
      executor.setClose(true);
    }
    System.out.println("关闭...");
  }

  /**
   * Prints the exception; no other recovery is attempted here.
   *
   * @param e the reported exception
   */
  @Override
  public void onError(Exception e) {
    System.out.println("异常" + e);
  }

}

自定义的WebSocket处理器

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Custom WebSocket handler used to store or process WebSocket data.
 *
 * <p>Received messages are appended to {@link #msgQueue}; once the queue holds
 * the configured number of messages, {@link #needClose(String)} reports that
 * the connection should be closed.
 *
 * <p>The closed flag is {@code volatile} so that a value written by one thread
 * is visible to another thread polling {@link #isClosed()}.
 *
 * @author zfh
 * @version 1.0
 * @since 2019/4/2 16:26
 */
public class MyWebSocketExecutor implements WebSocketExecutor{

    /** Message limit used by the no-argument constructor. */
    private static final int DEFAULT_MAX_MESSAGES = 1000;

    /** Received messages in arrival order; public because callers read it directly. */
    public Queue<String> msgQueue = new LinkedBlockingQueue<>();

    /** Number of queued messages at which {@link #needClose(String)} starts returning true. */
    private final int maxMessages;

    /** Closed flag; volatile for cross-thread visibility. */
    private volatile boolean isHasClosed = false;

    /** Creates an executor that asks for a close after 1000 queued messages. */
    public MyWebSocketExecutor() {
        this(DEFAULT_MAX_MESSAGES);
    }

    /**
     * Creates an executor with a custom message limit.
     *
     * @param maxMessages number of queued messages after which a close is requested; must be at least 1
     * @throws IllegalArgumentException if {@code maxMessages} is less than 1
     */
    public MyWebSocketExecutor(int maxMessages) {
        if (maxMessages < 1) {
            throw new IllegalArgumentException("maxMessages must be >= 1, got " + maxMessages);
        }
        this.maxMessages = maxMessages;
    }

    /**
     * Stores one received message.
     *
     * @param msg the message; {@code null} is ignored because the queue rejects null elements
     */
    @Override
    public void doMessage(String msg) {
        if (msg == null) {
            return;
        }
        msgQueue.add(msg);
    }

    /**
     * Tells whether enough messages have been collected.
     *
     * @param msg the most recent message (not inspected)
     * @return {@code true} once the queue size has reached the configured limit
     */
    @Override
    public boolean needClose(String msg) {
        return msgQueue.size() >= maxMessages;
    }

    /**
     * Records the closed state.
     *
     * @param close the new value of the closed flag
     */
    @Override
    public void setClose(boolean close) {
        isHasClosed = close;
    }

    /**
     * @return the current value of the closed flag
     */
    @Override
    public boolean isClosed() {
        return isHasClosed;
    }
}

测试

以某直播平台为例进行测试(注:该网站可能会对消息改版,那样这个测试类可能就失效啦)

运行该测试类还需要以下依赖:

<dependency>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpclient</artifactId>
   <version>4.5.2</version>
</dependency>
        
<dependency>
   <groupId>cn.hutool</groupId>
   <artifactId>hutool-all</artifactId>
   <version>4.3.2</version>
</dependency>

测试类:

package com.example.crawsocket.demo;

import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.java_websocket.WebSocket;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Yizhibo demo: collects the real-time comments (danmaku) of a running live
 * stream. The only input is the live-stream page URL.
 *
 * @author zfh
 * @version 1.0
 * @since 2019/4/8 14:12
 */
public class YizhiboView {

    /** HTTP endpoint queried for the WebSocket access parameters; the quoted scid is appended to it. */
    private static final String PARAM_URL = "http://app.yizhibo.com/com.yixia.im.accbal.api.ImAccBalService/1.0.0/getWebSocketConnAcces?_p=%7B%22_did%22%3A+%22undefined%22%2C%22_appid%22%3A+%2223%22%2C%22_pkgname%22%3A+%22h5.yizhibo.live%22%7D&scid=";

    /** WebSocket endpoint the comments are read from. */
    private static final String WEBSOCKET_URL = "wss://ws.yizhibo.com/websocket";

    /** Pause between two checks of the executor's closed flag, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 3 * 1000;

    /**
     * Fetches the parameters needed to receive comments over the WebSocket.
     *
     * <p>The result may contain {@code "token"} (the {@code imToken} of the
     * JSON response) and {@code "visitor_id"} (taken from a {@code Set-Cookie}
     * header). Entries that cannot be obtained are simply absent.
     *
     * @param scid the stream id extracted from the page URL
     * @return a mutable map with the values that could be obtained; never {@code null}
     */
    private static Map<String, String> getAccessParams(String scid) {
        Map<String, String> map = new HashMap<>();
        String url = PARAM_URL + "%22"+ scid + "%22"; // %22 is a URL-encoded double quote

        // try-with-resources releases the connection and the client even when parsing fails.
        try (CloseableHttpClient httpClient = HttpClientBuilder.create().build();
             CloseableHttpResponse response = httpClient.execute(new HttpGet(url))) {

            // Decode with UTF-8 when the response does not declare a charset.
            String res = response.getEntity() == null
                    ? null
                    : EntityUtils.toString(response.getEntity(), "UTF-8");
            System.out.println(res);
            System.out.println("----------------");

            if (res != null && res.contains("imToken")) {
                JSONObject data = new JSONObject(res).getJSONObject("data");
                // "imToken" may appear somewhere else in the body; only trust data.imToken.
                if (data != null) {
                    map.put("token", data.getStr("imToken"));
                }
            }

            Header[] headers = response.getHeaders("Set-Cookie");
            if (headers != null) {
                for (Header header : headers) {
                    HeaderElement[] elements = header.getElements();
                    if (elements == null) {
                        continue;
                    }
                    for (HeaderElement element : elements) {
                        System.out.println(element.getName() + ":" + element.getValue());
                        if (element.getName().equalsIgnoreCase("visitor_id")) {
                            map.put("visitor_id", element.getValue());
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return map;
    }

    /**
     * Builds the text frame sent right after the connection is opened.
     *
     * <p>NOTE(review): values missing from {@code accessMap} end up as the
     * literal text {@code null} in the frame, exactly as in the original
     * string concatenation - confirm whether the server tolerates that.
     *
     * @param scid      the stream id
     * @param accessMap map that is read for {@code "visitor_id"} and {@code "token"}
     * @return the request frame as a JSON string
     */
    private static String buildJoinRequest(String scid, Map<String, String> accessMap) {
        return "{\"body\":\"{\\\"did_\\\":\\\"undefined\\\",\\\"isAnchor_\\\":\\\"0\\\",\\\"memberId_\\\":\\\"" + accessMap.get("visitor_id") + "\\\",\\\"token_\\\":\\\"" + accessMap.get("token") + "\\\",\\\"scid_\\\":\\\"" + scid + "\\\"}\",\"clientVersion\":101,\"cmdId\":2,\"isZip\":0}";
    }

    /**
     * Collects the real-time comments.
     *
     * <p>Opens the WebSocket, sends the request frame and then waits until the
     * executor reports the connection as closed. If the connection cannot be
     * opened, the URL is invalid, or the waiting thread is interrupted, the
     * method returns whatever has been collected so far instead of waiting
     * forever.
     *
     * @param scid      the stream id
     * @param accessMap values returned by {@link #getAccessParams(String)}
     * @return a JSON array built from the executor's message queue
     */
    private static JSONArray getCommentList(String scid, Map<String, String> accessMap) {
        MyWebSocketExecutor executor = new MyWebSocketExecutor();
        MyWebSocketClient client = null;
        try {
            client = new MyWebSocketClient(WEBSOCKET_URL, executor);
            // connectBlocking() waits for the handshake result instead of spinning on the ready state.
            if (client.connectBlocking()) {
                System.out.println("建立websocket连接");
                client.send(buildJoinRequest(scid, accessMap));

                while (!executor.isClosed()) {
                    System.out.println("WebSocket未断开,继续接受数据中...");
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                }
            } else {
                System.out.println("建立websocket连接失败");
            }
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop collecting.
            Thread.currentThread().interrupt();
            if (client != null) {
                client.close();
            }
        }
        System.out.println("共接受到数据: " + executor.msgQueue.size() + "条");

        return new JSONArray(executor.msgQueue);
    }

    /**
     * Extracts the first capture group of the last match of {@code regex} in {@code source}.
     *
     * @param regex  a regular expression containing at least one capture group
     * @param source the text to search
     * @return the captured text, or an empty string when there is no match or an argument is {@code null}
     */
    private static String getMatcher(String regex, String source) {
        String result = "";
        if (regex == null || source == null) {
            return result;
        }
        Matcher matcher = Pattern.compile(regex).matcher(source);
        while (matcher.find()) {
            String captured = matcher.group(1);
            if (captured != null) {
                result = captured;
            }
        }
        return result;
    }


    /**
     * Runs the demo against a hard-coded live-stream URL and prints the collected comments.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String url = "https://www.yizhibo.com/l/17K5RkWx2nYxX4ii.html?p_from=Phome_HotAnchorRecommand"; // any currently running live-stream URL from the site can be used here
        // The scid is the file name of the /l/ path segment; the dot is escaped and the
        // group cannot run into the query string.
        String scid = getMatcher("/l/([^/?#]+)\\.html", url);
        if (scid.isEmpty()) {
            System.out.println("无法从url中提取scid: " + url);
            return;
        }
        Map<String, String> accessMap = getAccessParams(scid);
        JSONArray array = getCommentList(scid, accessMap);
        System.out.println(array.toString());
    }
}
Last modification:April 17th, 2019 at 07:19 pm
If you think my article is useful to you, please feel free to appreciate

2 comments

  1. 心语难诉

    抓取的接口没有时效性吗?

    1. 空夜
      @心语难诉

      这个代码获取的本身就是实时的弹幕啦,如果要保持WebSocket连接的话,估计要每隔几十秒重新send一次request。

Leave a Comment