/* * (c) 2021 Collibra Inc. This software is protected under international copyright law. You may only install and use this * software subject to the license agreement available at https://marketplace.collibra.com/binary-code-license-agreement/. * If such an agreement is not in place, you may not use the software. */ package com.collibra.marketplace.azure.purview.component; import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.*; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.http.*; import org.springframework.http.client.ClientHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.client.DefaultResponseErrorHandler; import org.springframework.web.client.ResponseErrorHandler; import org.springframework.web.client.RestTemplate; import com.collibra.marketplace.azure.purview.util.JsonUtil; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import static org.springframework.http.HttpStatus.REQUEST_TIMEOUT; import static org.springframework.http.HttpStatus.Series.CLIENT_ERROR; import static org.springframework.http.HttpStatus.Series.SERVER_ERROR; @Component public class AzurePurviewSearchComponent { private static final Logger LOGGER = LoggerFactory.getLogger(AzurePurviewSearchComponent.class); @Value("${purview.rest.api.host}") private String purviewHost; @Value("${purview.rest.api.port}") private String purviewPort; @Value("${purview.rest.api.base.path}") private String purviewBasePath; @Value("${azure.connection.timeout:30000000}") private Long connectionTimeout; @Value("${azure.read.timeout:60000000}") private Long readTimeout; @Value("${azure.api.max.retry:5}") private Integer maxRetry; @Value("${azure.api.retry.delay:1000}") private Long retryDelay; @Value("${azure.uri.limit:60}") private int uriLimit; @Value("${azure.token.refresh.seconds:900}") private long tokenRefreshSeconds; private RestTemplate purviewRestTemplate; private String postPurviewSearchUrl; private String getPurviewEntitiesUrl; private String accessToken; private Instant accessTokenCreation; private AzureAuthenticationComponent authComponent; @Autowired public AzurePurviewSearchComponent(AzureAuthenticationComponent authComponent) { this.authComponent = authComponent; } @PostConstruct public void initialise() { postPurviewSearchUrl = "https://" + purviewHost + ":" + purviewPort + purviewBasePath + "/search/advanced"; getPurviewEntitiesUrl = "https://" + purviewHost + ":" + purviewPort + purviewBasePath + "/entity/bulk?"; purviewRestTemplate = new RestTemplateBuilder() .setConnectTimeout(Duration.ofMillis(connectionTimeout)) .setReadTimeout(Duration.ofMillis(readTimeout)) .errorHandler(new RestTemplateResponseErrorHandler()) .build(); } /** * Adds the default request header for setting the content type to * application/json, and OAuth access token * * @param headers The HttpHeaders to be set. * @return The headers object with the included content type. */ private HttpHeaders addAuthenticationToHeaders(HttpHeaders headers) { if (accessToken == null || accessTokenCreation == null || Duration.between(accessTokenCreation, Instant.now()).getSeconds() > tokenRefreshSeconds) { accessToken = authComponent.getAccessToken(); accessTokenCreation = Instant.now(); } headers.remove("Authorization"); headers.set("Authorization", "Bearer " + accessToken); headers.setContentType(MediaType.APPLICATION_JSON); LOGGER.trace("Search Request Headers::\n{}", headers); return headers; } /** * Return the entities matching query parameter from the purview API. * * @param limit Number of maximum results to return in a call to the purview * API. * @return The JSONArray with the results. */ public JSONArray searchEntity(String assetType, Integer limit) { return (JSONArray) searchEntityFull(assetType, limit, true).get("entities"); } /** * Return the entities matching query parameter from the purview API. * * @param limit Number of maximum results to return in a call to the purview * API. * @return The JSONArray with the results. */ public JSONArray searchEntity(String assetType, Integer limit, Boolean minExtInfo, JSONArray additionalFilters) { return (JSONArray) searchEntityFull(assetType, limit, minExtInfo, additionalFilters).get("entities"); } /** * Return the entities matching query parameter from the purview API. * * @param limit Number of maximum results to return in a call to the purview * API. * @return The JSONArray with the results. */ public JSONObject searchEntityFull(String assetType, Integer limit, Boolean minExtInfo) { return searchEntityFull(assetType, limit, minExtInfo, null); } public JSONObject searchEntityFull(String assetType, Integer limit, Boolean minExtInfo, JSONArray additionalFilters) { return searchEntityFull(assetType, limit, minExtInfo, additionalFilters, false); } /** * Return the entities matching query parameter from the purview API. * * @param limit Number of maximum results to return in a call to the purview * API. * @return The JSONArray with the results. */ @SuppressWarnings("unchecked") public JSONObject searchEntityFull(String assetType, Integer limit, Boolean minExtInfo, JSONArray additionalFilters, Boolean includeSubTypes) { JSONArray jsonResults = new JSONArray(); JSONArray jsonRelatedResults = new JSONArray(); JSONObject jsonTotalResults = new JSONObject(); JSONObject entityFilterJSON = new JSONObject(); entityFilterJSON.put("entityType", assetType); entityFilterJSON.put("includeSubTypes", includeSubTypes); JSONArray filters = new JSONArray(); filters.add(entityFilterJSON); if (additionalFilters != null) { filters.addAll(additionalFilters); } JSONObject filterJSON = new JSONObject(); filterJSON.put("and", filters); JSONObject searchJSON = new JSONObject(); searchJSON.put("keywords", "*"); searchJSON.put("limit", limit); searchJSON.put("filter", filterJSON); Long offset = 0L; Long limitLong = new Long(limit); LOGGER.info("Initiating Search for :: {}", assetType); Set uniquePurviewGuids = new HashSet<>(); while (true) { System.gc(); int tryCount = 0; try { searchJSON.put("offset", offset); LOGGER.info( "Preparing and posting to Purview advanced search for {} starting at {} and limit to {} with:\n{}", assetType, offset, limitLong, searchJSON); HttpHeaders headers = new HttpHeaders(); headers = this.addAuthenticationToHeaders(headers); HttpEntity entity = new HttpEntity(searchJSON.toString(), headers); ResponseEntity response = purviewRestTemplate.exchange(postPurviewSearchUrl, HttpMethod.POST, entity, String.class); JSONObject jsonResponse = new JsonUtil().parseJsonStringToObject(response.getBody()); Long searchCount = (Long) jsonResponse.get("@search.count"); if (searchCount > 100000L) { LOGGER.error("There are {} entities of type {}. Due to a limitation on the Azure Purview API, not more than 100,000 entities can be retrieved. Therefore, the connector will not retrieve any of these entities.",searchCount,assetType); break; } JSONArray valueArray = (JSONArray) jsonResponse.get("value"); valueArray.removeIf(v -> !uniquePurviewGuids.add((String)((JSONObject) v).get("id"))); if (valueArray != null && valueArray.size() > 0) { JSONObject jsonEntityResults = entityResults(valueArray, "id", minExtInfo); jsonResults.addAll((JSONArray) jsonEntityResults.get("entities")); jsonRelatedResults.addAll((JSONArray) jsonEntityResults.get("referredEntities")); } LOGGER.debug("Current Search Count for :: {}, {}", assetType, searchCount); if ((offset + limitLong) >= searchCount) break; else offset = offset + limit; // End happy path logic } catch (Exception e) { LOGGER.error(e.getLocalizedMessage()); tryCount++; if (tryCount > maxRetry) { throw new RuntimeException(e); } else try { Thread.sleep(retryDelay); } catch (InterruptedException ie) { LOGGER.error(ie.getLocalizedMessage()); } } } jsonTotalResults.put("entities", jsonResults); jsonTotalResults.put("relatedEntities", jsonRelatedResults); LOGGER.info("Finished Search for :: {}\n", assetType); return jsonTotalResults; } public JSONArray entityDetails(JSONArray entitiesArray) { JSONArray entityResults = null; if (entitiesArray != null && entitiesArray.size() > 0) { JSONObject jsonResponse = entityResults(entitiesArray, "id"); entityResults = (JSONArray) jsonResponse.get("entities"); } return entityResults; } public JSONObject entityResults(JSONArray entitiesArray, String idField) { return entityResults(entitiesArray, idField, true); } @SuppressWarnings("unchecked") public JSONObject entityResults(JSONArray entitiesArray, String idField, Boolean minExtInfo) { JSONObject jsonResponse = null; JSONArray jsonEntities = new JSONArray(); JSONArray jsonReferredEntities = new JSONArray(); if (entitiesArray != null && entitiesArray.size() > 0) { jsonResponse = new JSONObject(); int pageCount = 0; List guids = new ArrayList<>(); for (int i = 0; i < entitiesArray.size(); i++) { if (pageCount == 0) { guids.clear(); } JSONObject entity = (JSONObject) entitiesArray.get(i); guids.add((String) entity.get(idField)); pageCount++; if (pageCount == uriLimit) { System.gc(); JSONObject bulkEntitiesResponse = getBulkEntities(getPurviewEntitiesUrl, guids, minExtInfo); //Commenting out DEBUG logging of entities JSON response due to memory leak //LOGGER.debug("Entity GUID Search Response at {} :: {}", i, bulkEntitiesResponse.toJSONString()); if(bulkEntitiesResponse != null) { // process jsonEntities.addAll((JSONArray) bulkEntitiesResponse.get("entities")); JSONObject referredRecords = (JSONObject) bulkEntitiesResponse.get("referredEntities"); if (referredRecords != null) { Iterator referredIds = referredRecords.keySet().iterator(); // Find all the related entities while (referredIds.hasNext()) { String referredId = referredIds.next(); jsonReferredEntities.add(referredRecords.get(referredId)); } } } // Reset pageCount = 0; } } // Catch remaining records if (pageCount > 0) { System.gc(); JSONObject bulkEntitiesResponse = getBulkEntities(getPurviewEntitiesUrl, guids, minExtInfo); //Commenting out DEBUG logging of entities JSON response due to memory leak //LOGGER.debug("Entity GUID Search Response at {} :: {}", entitiesArray.size(), bulkEntitiesResponse.toJSONString()); // process jsonEntities.addAll((JSONArray) bulkEntitiesResponse.get("entities")); JSONObject referredRecords = (JSONObject) bulkEntitiesResponse.get("referredEntities"); if (referredRecords != null) { Iterator referredIds = referredRecords.keySet().iterator(); // Find all the related entities while (referredIds.hasNext()) { String referredId = referredIds.next(); jsonReferredEntities.add(referredRecords.get(referredId)); } } } jsonResponse.put("entities", jsonEntities); jsonResponse.put("referredEntities", jsonReferredEntities); } return jsonResponse; } private JSONObject getBulkEntities(String baseUrl, List guids, Boolean minExtInfo) { int tryCount = 0; while (true) { try { HttpHeaders headers = new HttpHeaders(); headers = this.addAuthenticationToHeaders(headers); HttpEntity entity = new HttpEntity(headers); String bulkEntitiesUrl = constructBulkEntitiesUrl(baseUrl, guids, minExtInfo); LOGGER.debug("Bulk GUID search URL ::\n{}", bulkEntitiesUrl); ResponseEntity response = purviewRestTemplate.exchange(bulkEntitiesUrl, HttpMethod.GET, entity, String.class); if (response.getStatusCode() == REQUEST_TIMEOUT) { if (guids.size() == 1) { throw new RuntimeException(String.format("Request %s failed with a 408 Request Timeout exception", bulkEntitiesUrl)); } else { List singleResponses = new ArrayList<>(); guids.forEach((guid) -> singleResponses.add(getBulkEntities(baseUrl, Arrays.asList(guid), minExtInfo )) ); JSONObject responseBody = new JSONObject(); JSONArray responseBodyEntities = new JSONArray(); JSONObject responseBodyReferredEntities = new JSONObject(); singleResponses.forEach((singleResponse) -> { responseBodyEntities.addAll((JSONArray) singleResponse.get("entities")); JSONObject referredRecords = (JSONObject) singleResponse.get("referredEntities"); if (referredRecords != null) { Iterator referredIds = referredRecords.keySet().iterator(); // Find all the related entities while (referredIds.hasNext()) { String referredId = referredIds.next(); if (!responseBodyReferredEntities.containsKey(referredId)) { responseBodyReferredEntities.put(referredId,referredRecords.get(referredId)); } } } }); responseBody.put("entities", responseBodyEntities); responseBody.put("referredEntities", responseBodyReferredEntities); return responseBody; } } else { JSONObject jsonResponse = new JsonUtil().parseJsonStringToObject(response.getBody()); return jsonResponse; } } catch (Exception e) { LOGGER.error(e.getLocalizedMessage()); tryCount++; if (tryCount > maxRetry) { throw new RuntimeException(e); } else try { Thread.sleep(retryDelay); } catch (InterruptedException ie) { LOGGER.error(ie.getLocalizedMessage()); } } } } private String constructBulkEntitiesUrl(String baseUrl, List guids, Boolean minExtInfo) { StringBuilder sb = new StringBuilder(); sb.append(baseUrl); for (String guid: guids) { sb.append("guid="); sb.append(guid); sb.append("&"); } if (minExtInfo == null) minExtInfo = false; sb.append("minExtInfo="); sb.append(minExtInfo); return sb.toString(); } @SuppressWarnings("unchecked") public void filterByWorkspaces(JSONArray entitiesArray, Set workspaceNames, Set workspaceIds) { entitiesArray.removeIf(entity -> !workspaceNames.contains(getWorkspaceName((JSONObject) entity))); entitiesArray.removeIf(entity -> !workspaceIds.contains(getWorkspaceId((JSONObject) entity))); } private String getWorkspaceName(JSONObject entity) { return (String) ((JSONObject) entity.get("attributes")).get("workspaceName"); } private String getWorkspaceId(JSONObject entity) { return (String) ((JSONObject) ((JSONObject) entity.get("relationshipAttributes")).get("workspace")).get("guid"); } public class RestTemplateResponseErrorHandler implements ResponseErrorHandler { @Override public boolean hasError(ClientHttpResponse httpResponse) throws IOException { return (httpResponse.getStatusCode().series() == CLIENT_ERROR || httpResponse.getStatusCode().series() == SERVER_ERROR) && httpResponse.getStatusCode() != REQUEST_TIMEOUT; } @Override public void handleError(ClientHttpResponse response) throws IOException { DefaultResponseErrorHandler defaultErrorHandler = new DefaultResponseErrorHandler(); defaultErrorHandler.handleError(response); } } }