中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

建站之星備案百度推廣售后

建站之星備案,百度推廣售后,本地門戶網(wǎng)站系統(tǒng),哪個網(wǎng)站可以做簽證DorisStreamLoadObserver 類是一個用于將數(shù)據(jù)加載到 Doris(以前稱為 Palo)數(shù)據(jù)庫中并監(jiān)視加載過程的 Java 類。該類提供了一組方法,用于構(gòu)建 HTTP 請求、處理 HTTP 響應(yīng)以及監(jiān)控數(shù)據(jù)加載的狀態(tài)。以下是每個方法的具體作用: Doris…

DorisStreamLoadObserver?類是一個用于將數(shù)據(jù)加載到 Doris(以前稱為 Palo)數(shù)據(jù)庫中并監(jiān)視加載過程的 Java 類。該類提供了一組方法,用于構(gòu)建 HTTP 請求、處理 HTTP 響應(yīng)以及監(jiān)控數(shù)據(jù)加載的狀態(tài)。以下是每個方法的具體作用:

  1. DorisStreamLoadObserver(Keys options): 這是類的構(gòu)造函數(shù),用于初始化加載數(shù)據(jù)所需的配置選項。
  2. void streamLoad(WriterTuple data) throws Exception: 該方法是數(shù)據(jù)加載的主要方法。它將給定的數(shù)據(jù)(WriterTuple?對象)加載到 Doris 數(shù)據(jù)庫中。它構(gòu)建了用于將數(shù)據(jù)發(fā)送到 Doris 的 HTTP 請求,并根據(jù)響應(yīng)狀態(tài)來確定加載是否成功。如果加載失敗,它會拋出異常。
  3. private void checkStreamLoadState(String host, String label) throws IOException: 這個方法用于檢查數(shù)據(jù)加載的狀態(tài)。它會不斷地輪詢 Doris 服務(wù)器,以獲取特定加載任務(wù)的最終狀態(tài)。根據(jù)加載狀態(tài)的不同,它可能會拋出異?;蛘咴诩虞d完成時返回。
  4. private byte[] addRows(List<byte[]> rows, int totalBytes): 此方法根據(jù)給定的數(shù)據(jù)行和總字節(jié)數(shù),構(gòu)建用于加載的字節(jié)數(shù)組。它根據(jù)配置中的數(shù)據(jù)格式(CSV 或 JSON)將數(shù)據(jù)行連接起來,并添加適當(dāng)?shù)姆指舴?/li>
  5. private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException: 該方法執(zhí)行 HTTP PUT 請求,將數(shù)據(jù)加載到 Doris 數(shù)據(jù)庫中。它構(gòu)建了包含數(shù)據(jù)的請求實體,發(fā)送到指定的加載 URL,并解析響應(yīng)以獲取加載結(jié)果。
  6. private String getBasicAuthHeader(String username, String password): 此方法用于生成基本身份驗證頭部,以便在 HTTP 請求中進(jìn)行身份驗證。
  7. private HttpEntity getHttpEntity(CloseableHttpResponse response): 這是一個實用方法,用于從 HTTP 響應(yīng)中提取實體內(nèi)容。
  8. private String getLoadHost(): 該方法從配置選項中獲取用于加載數(shù)據(jù)的主機地址列表,并嘗試連接到這些主機以檢查其可用性。它會返回第一個可用的主機地址。

DorisStreamLoadObserver?類主要用于處理數(shù)據(jù)加載任務(wù),它負(fù)責(zé)構(gòu)建適當(dāng)?shù)?HTTP 請求,將數(shù)據(jù)發(fā)送到 Doris 數(shù)據(jù)庫,并監(jiān)控加載任務(wù)的狀態(tài)。通過這些方法,可以實現(xiàn)將數(shù)據(jù)從外部系統(tǒng)加載到 Doris 數(shù)據(jù)庫中,并在加載過程中進(jìn)行必要的狀態(tài)檢查和錯誤處理。

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class DorisStreamLoadObserver {private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);private Keys options;private long pos;private static final String RESULT_FAILED = "Fail";private static final String RESULT_LABEL_EXISTED = "Label Already Exists";private static final String LAEBL_STATE_VISIBLE = "VISIBLE";private static final String LAEBL_STATE_COMMITTED = "COMMITTED";private static final String RESULT_LABEL_PREPARE = "PREPARE";private static final String RESULT_LABEL_ABORTED = "ABORTED";private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";public DorisStreamLoadObserver(Keys options) {this.options = options;}// 數(shù)據(jù)寫入 Doris 的主要方法public void streamLoad(WriterTuple data) throws Exception {String host = getLoadHost();if (host == null) {throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");}String loadUrl = new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/").append(options.getTable()).append("/_stream_load").toString();LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));final String keyStatus = "Status";if (null == loadResult || !loadResult.containsKey(keyStatus)) {throw new IOException("Unable to flush data to Doris: unknown result status.");}LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {throw new IOException(new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString());} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));checkStreamLoadState(host, data.getLabel());}}// 檢查數(shù)據(jù)加載狀態(tài)的方法private void checkStreamLoadState(String host, String label) throws IOException {int idx = 0;while (true) {try {TimeUnit.SECONDS.sleep(Math.min(++idx, 5));} catch (InterruptedException ex) {break;}try (CloseableHttpClient httpclient = HttpClients.createDefault()) {HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpGet.setHeader("Connection", "close");try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s].\n", label), null);}Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));String labelState = (String) result.get("state");if (null == labelState) {throw new IOException(String.format("Failed to flush data to Doris, Error " +"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);}LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));switch (labelState) {case LAEBL_STATE_VISIBLE:case LAEBL_STATE_COMMITTED:return;case RESULT_LABEL_PREPARE:continue;case RESULT_LABEL_ABORTED:throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null, true);case RESULT_LABEL_UNKNOWN:default:throw new IOException(String.format("Failed to flush data to Doris, Error " +"label[%s] state[%s]\n", label, labelState), null);}}}}}// 根據(jù)格式將數(shù)據(jù)行拼接成字節(jié)數(shù)組private byte[] addRows(List<byte[]> rows, int totalBytes) {if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);for (byte[] row : rows) {bos.put(row);bos.put(lineDelimiter);}return bos.array();}if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));bos.put("[".getBytes(StandardCharsets.UTF_8));byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);boolean isFirstElement = true;for (byte[] row : rows) {if (!isFirstElement) {bos.put(jsonDelimiter);}bos.put(row);isFirstElement = false;}bos.put("]".getBytes(StandardCharsets.UTF_8));return bos.array();}throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");}private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(120 * 1000).setConnectTimeout(120 * 1000).setConnectionRequestTimeout(120 * 1000).build();try (CloseableHttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).setRedirectStrategy(new DefaultRedirectStrategy()).build()) {HttpPut httpPut = new HttpPut(loadUrl);httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));httpPut.setEntity(new ByteArrayEntity(data));try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {HttpEntity respEntity = getHttpEntity(resp);if (respEntity == null) {throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");}return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));}}}// 構(gòu)造 HTTP 請求中的基本認(rèn)證頭部private String getBasicAuthHeader(String username, String password) {String credentials = username + ":" + password;byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);String base64Credentials = Base64.encodeBase64String(credentialsBytes);return "Basic " + base64Credentials;}// 從 HTTP 響應(yīng)中獲取實體內(nèi)容private HttpEntity getHttpEntity(CloseableHttpResponse response) {if (response != null) {return response.getEntity();}return null;}// 獲取用于加載數(shù)據(jù)的主機地址private String getLoadHost() {List<String> hosts = options.getDorisStreamLoadUrls();for (String host : hosts) {try {HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();connection.setRequestMethod("HEAD");int responseCode = connection.getResponseCode();if (responseCode == HttpURLConnection.HTTP_OK) {return host;}} catch (IOException e) {LOG.warn("Failed to connect to host: {}", host);}}return null;}
}
http://www.risenshineclean.com/news/66089.html

相關(guān)文章:

  • 網(wǎng)站建設(shè)服務(wù)哪家好seo免費推廣
  • 手機網(wǎng)站制作服務(wù)機構(gòu)百度法務(wù)部聯(lián)系方式
  • 石家莊網(wǎng)站建設(shè)模板服務(wù)臨沂seo建站
  • 公司名稱大全集最新三個字贛州seo唐三
  • wordpress過時了嗎手機百度seo快速排名
  • 杭州做網(wǎng)站的企業(yè)seo外鏈發(fā)布平臺有哪些
  • 網(wǎng)站建設(shè)成果成都官網(wǎng)seo服務(wù)
  • 手機網(wǎng)站制作方案微信群二維碼推廣平臺
  • 大型服裝網(wǎng)站開發(fā)代運營一般收費
  • 網(wǎng)站建設(shè)規(guī)劃表福州百度網(wǎng)站排名優(yōu)化
  • 政府網(wǎng)站建設(shè)規(guī)范產(chǎn)品營銷方案案例范文
  • 做零售出口的網(wǎng)站seo綜合查詢怎么用的
  • 怎么做全民奪寶網(wǎng)站網(wǎng)絡(luò)服務(wù)
  • 做民宿最大的網(wǎng)站百度競價推廣代運營
  • 蘭州網(wǎng)站排名優(yōu)化公司惠城網(wǎng)站設(shè)計
  • 深圳公司網(wǎng)站制作互聯(lián)網(wǎng)營銷的方式有哪些
  • 網(wǎng)頁設(shè)計網(wǎng)站免登陸企業(yè)培訓(xùn)體系
  • 一個網(wǎng)址建多個網(wǎng)站seo是什么意思?
  • 網(wǎng)站方案投放廣告
  • wordpress related posts福州整站優(yōu)化
  • 做的好的家裝網(wǎng)站網(wǎng)站頁面設(shè)計模板
  • 合肥網(wǎng)站設(shè)計建東莞百度seo推廣公司
  • 織夢教育咨詢企業(yè)網(wǎng)站模板sem什么意思
  • 代理注冊公司有什么風(fēng)險合肥seo排名優(yōu)化公司
  • 網(wǎng)站建設(shè)公司yu泰安網(wǎng)站推廣優(yōu)化
  • 網(wǎng)站主機一個g西安網(wǎng)絡(luò)推廣公司
  • 自建導(dǎo)航站wordpress百度網(wǎng)盤資源鏈接入口
  • 網(wǎng)站改版提案百度指數(shù)與百度搜索量
  • 域名備案需要哪些資料東莞網(wǎng)站建設(shè)優(yōu)化推廣
  • 微信上的網(wǎng)站怎么做百度seo搜索引擎優(yōu)化培訓(xùn)