package ai.tripl.arc.transform;

import ai.tripl.arc.api.API;
import ai.tripl.arc.api.API$FailModeTypeFailFast$;
import ai.tripl.arc.api.API$FailModeTypePermissive$;
import ai.tripl.arc.util.log.logger.Logger;
import java.io.InputStream;
import java.net.URI;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.LaxRedirectStrategy;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.NullType$;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple16;
import scala.Tuple2;
import scala.collection.BufferedIterator;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/* compiled from: HTTPTransform.scala */
/* loaded from: input_file:ai/tripl/arc/transform/HTTPTransformStage$.class */
public final class HTTPTransformStage$ implements Serializable {
    /** Singleton instance of this companion object; it is assigned by the private constructor. */
    public static HTTPTransformStage$ MODULE$;

    static {
        // The constructor stores `this` into MODULE$, so the result is deliberately not assigned here.
        new HTTPTransformStage$();
    }

    /**
     * Runs an HTTPTransform stage.
     *
     * <p>Rows of the stage's input view are grouped into batches of {@code batchSize}; the value of
     * {@code inputField} of every row in a batch is joined with {@code delimiter} (only when
     * {@code batchSize > 1}) and POSTed to the stage URI with the configured headers. The response
     * payload is split on {@code delimiter} again and each fragment is appended to its originating
     * row as a {@code body} column. In permissive fail mode a {@code response} struct
     * ({@code statusCode}, {@code reasonPhrase}, {@code contentType}, {@code responseTime}) is
     * appended as well; in fail-fast mode a status code outside {@code validStatusCodes} raises an
     * exception instead.
     *
     * <p>The resulting dataset gets the input columns' metadata re-applied, is optionally
     * repartitioned, is registered under the stage's output view and, for non-streaming datasets,
     * has its column/partition (and, when persisted, record) counts written to the stage detail map.
     *
     * @param hTTPTransformStage stage configuration (views, URI, headers, batching, fail mode, ...)
     * @param sparkSession       session used to resolve the input view
     * @param logger             not used by this method
     * @param aRCContext         supplies the {@code immutableViews} flag and the storage level
     * @return an {@code Option} wrapping the output dataset
     * @throws HTTPTransformStage$$anon$1 when the input field cannot be resolved in the input schema
     * @throws HTTPTransformStage$$anon$2 when the input field is neither string nor binary
     * @throws HTTPTransformStage$$anon$3 wrapping any exception raised while building, registering
     *                                    or measuring the output dataset
     */
    public Option<Dataset<Row>> execute(HTTPTransformStage hTTPTransformStage, SparkSession sparkSession, Logger logger, API.ARCContext aRCContext) {
        StructType outputSchema;
        Dataset dataset;
        String sb = new StringBuilder(69).append("HTTPTransform requires a field named '").append(hTTPTransformStage.inputField()).append("' of type 'string' or 'binary'.").toString();
        Dataset table = sparkSession.table(hTTPTransformStage.inputView());
        StructType schema = table.schema();
        URI uri = hTTPTransformStage.uri();
        String inputField = hTTPTransformStage.inputField();
        Map<String, String> headers = hTTPTransformStage.headers();
        int batchSize = hTTPTransformStage.batchSize();
        String delimiter = hTTPTransformStage.delimiter();
        API.FailModeType failMode = hTTPTransformStage.failMode();
        List<Object> validStatusCodes = hTTPTransformStage.validStatusCodes();

        // Only the field lookup is guarded by the "requires a field named ..." error. Guarding the
        // whole method with it would relabel every later failure (HTTP errors, view registration,
        // ...) as a missing-field problem and discard the real cause.
        int fieldIndex;
        try {
            fieldIndex = schema.fieldIndex(hTTPTransformStage.inputField());
        } catch (Exception e2) {
            throw new HTTPTransformStage$$anon$1(sb, table, hTTPTransformStage);
        }

        // The payload can only be built from string or binary input.
        DataType dataType = schema.fields()[fieldIndex].dataType();
        if (!(dataType instanceof StringType) && !(dataType instanceof BinaryType)) {
            throw new HTTPTransformStage$$anon$2(sb, hTTPTransformStage, schema, fieldIndex);
        }

        // Output schema = input fields + "body" (+ "response" struct in permissive mode).
        API.FailModeType failMode2 = hTTPTransformStage.failMode();
        if (API$FailModeTypePermissive$.MODULE$.equals(failMode2)) {
            outputSchema = StructType$.MODULE$.apply(new $colon.colon(new StructField("body", StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4()), new $colon.colon(new StructField("response", StructType$.MODULE$.apply(Nil$.MODULE$.$colon$colon(new StructField("responseTime", LongType$.MODULE$, false, StructField$.MODULE$.apply$default$4())).$colon$colon(new StructField("contentType", StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4())).$colon$colon(new StructField("reasonPhrase", StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4())).$colon$colon(new StructField("statusCode", IntegerType$.MODULE$, false, StructField$.MODULE$.apply$default$4()))), false, StructField$.MODULE$.apply$default$4()), Nil$.MODULE$)).$colon$colon$colon(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(table.schema().fields())).toList()));
        } else {
            if (!API$FailModeTypeFailFast$.MODULE$.equals(failMode2)) {
                throw new MatchError(failMode2);
            }
            outputSchema = StructType$.MODULE$.apply(new $colon.colon(new StructField("body", StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4()), Nil$.MODULE$).$colon$colon$colon(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(table.schema().fields())).toList()));
        }

        try {
            ObjectRef create = ObjectRef.create(table.mapPartitions(iterator -> {
                // One pooled client is built per partition and shared by all of its batches.
                PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
                connectionManager.setMaxTotal(50);
                CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connectionManager).setRedirectStrategy(new LaxRedirectStrategy()).build();
                String endpoint = uri.toString();
                BufferedIterator rows = iterator.buffered();

                // Peek at the first row (without consuming it) to resolve the input column's
                // position and type; an empty partition falls back to index 0 / NullType, which is
                // never used because no batch is produced.
                int inputIndex = rows.hasNext() ? ((Row) rows.head()).fieldIndex(inputField) : 0;
                DataType inputType = rows.hasNext() ? ((Row) rows.head()).schema().apply(inputIndex).dataType() : NullType$.MODULE$;

                return rows.grouped(batchSize).flatMap(batch -> {
                    HttpEntity entity;
                    HttpPost request = new HttpPost(endpoint);
                    headers.withFilter(tuple2 -> {
                        return BoxesRunTime.boxToBoolean($anonfun$execute$3(tuple2));
                    }).foreach(tuple22 -> {
                        $anonfun$execute$4(request, tuple22);
                        return BoxedUnit.UNIT;
                    });

                    // Build the request payload: the batch's values joined by the delimiter
                    // (no separator is inserted when batchSize is 1).
                    if (inputType instanceof StringType) {
                        entity = new StringEntity(((TraversableOnce) batch.map(row -> {
                            return row.getString(inputIndex);
                        }, Seq$.MODULE$.canBuildFrom())).mkString(batchSize > 1 ? delimiter : ""));
                    } else {
                        if (!(inputType instanceof BinaryType)) {
                            throw new MatchError(inputType);
                        }
                        byte[] delimiterBytes = batchSize > 1 ? delimiter.getBytes() : "".getBytes();
                        entity = new ByteArrayEntity((byte[]) ((TraversableOnce) batch.map(row2 -> {
                            return (byte[]) row2.get(inputIndex);
                        }, Seq$.MODULE$.canBuildFrom())).reduce((bArr, bArr2) -> {
                            return (byte[]) new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps((byte[]) new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps(bArr)).$plus$plus(new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps(delimiterBytes)), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Byte())))).$plus$plus(new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps(bArr2)), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Byte()));
                        }));
                    }
                    request.setEntity(entity);

                    try {
                        long startMillis = System.currentTimeMillis();
                        CloseableHttpResponse response = httpClient.execute(request);
                        long elapsedMillis = System.currentTimeMillis() - startMillis;

                        // Fail-fast mode rejects any status code outside the configured list.
                        // The comparison must be a null-safe equals against the FailFast singleton;
                        // testing `failMode == null` would skip validation for every real mode.
                        if (API$FailModeTypeFailFast$.MODULE$.equals(failMode) && !validStatusCodes.contains(BoxesRunTime.boxToInteger(response.getStatusLine().getStatusCode()))) {
                            throw new Exception(new StringBuilder(85).append("HTTPTransform expects all response StatusCode(s) in [").append(validStatusCodes.mkString(", ")).append("] but server responded with ").append(response.getStatusLine().getStatusCode()).append(" (").append(response.getStatusLine().getReasonPhrase()).append(").").toString());
                        }

                        // Read the whole payload, then split it back into one fragment per row.
                        // NOTE: String.split treats the delimiter as a regular expression.
                        InputStream content = response.getEntity().getContent();
                        String payload = Source$.MODULE$.fromInputStream(content, Codec$.MODULE$.fallbackSystemCodec()).mkString();
                        String[] bodies = batchSize > 1 ? payload.split(delimiter) : new String[]{payload};
                        response.close();
                        if (bodies.length != batch.length()) {
                            throw new Exception(new StringBuilder(114).append("HTTPTransform expects the response to contain same number of results as 'batchSize' (").append(batchSize).append(") but server responded with ").append(bodies.length).append(".").toString());
                        }

                        // Pair each input row with its response fragment by position.
                        return (GenTraversableOnce) ((TraversableLike) batch.zipWithIndex(Seq$.MODULE$.canBuildFrom())).map(tuple23 -> {
                            Row outputRow;
                            if (tuple23 == null) {
                                throw new MatchError(tuple23);
                            }
                            Row inputRow = (Row) tuple23._1();
                            int index = tuple23._2$mcI$sp();
                            if (API$FailModeTypePermissive$.MODULE$.equals(failMode)) {
                                outputRow = Row$.MODULE$.fromSeq((Seq) inputRow.toSeq().$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Object[]{bodies[index], Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(response.getStatusLine().getStatusCode()), response.getStatusLine().getReasonPhrase(), response.getEntity().getContentType().toString().replace("Content-Type: ", ""), BoxesRunTime.boxToLong(elapsedMillis)}))})), Seq$.MODULE$.canBuildFrom()));
                            } else {
                                if (!API$FailModeTypeFailFast$.MODULE$.equals(failMode)) {
                                    throw new MatchError(failMode);
                                }
                                outputRow = Row$.MODULE$.fromSeq((Seq) inputRow.toSeq().$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{bodies[index]})), Seq$.MODULE$.canBuildFrom()));
                            }
                            return outputRow;
                        }, Seq$.MODULE$.canBuildFrom());
                    } finally {
                        // Always hand the connection back to the pool, even on failure.
                        request.releaseConnection();
                    }
                });
            }, RowEncoder$.MODULE$.apply(outputSchema)));

            // Re-attach the input columns' metadata to the transformed dataset.
            new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(table.schema().fields())).foreach(structField -> {
                $anonfun$execute$9(create, structField);
                return BoxedUnit.UNIT;
            });

            // Optional repartitioning: by count, by columns, both, or not at all.
            List<String> partitionBy = hTTPTransformStage.partitionBy();
            Option<Object> numPartitions = hTTPTransformStage.numPartitions();
            if (!(numPartitions instanceof Some) && !None$.MODULE$.equals(numPartitions)) {
                throw new MatchError(numPartitions);
            }
            if (Nil$.MODULE$.equals(partitionBy)) {
                if (numPartitions instanceof Some) {
                    dataset = ((Dataset) create.elem).repartition(BoxesRunTime.unboxToInt(((Some) numPartitions).value()));
                } else {
                    dataset = (Dataset) create.elem;
                }
            } else {
                List list = (List) partitionBy.map(str -> {
                    return ((Dataset) create.elem).apply(str);
                }, List$.MODULE$.canBuildFrom());
                if (numPartitions instanceof Some) {
                    dataset = ((Dataset) create.elem).repartition(BoxesRunTime.unboxToInt(((Some) numPartitions).value()), list);
                } else {
                    dataset = ((Dataset) create.elem).repartition(list);
                }
            }
            Dataset dataset3 = dataset;

            // With immutable views an existing view of the same name must not be replaced.
            if (aRCContext.immutableViews()) {
                dataset3.createTempView(hTTPTransformStage.outputView());
            } else {
                dataset3.createOrReplaceTempView(hTTPTransformStage.outputView());
            }

            // Statistics are only recorded for non-streaming datasets.
            if (!dataset3.isStreaming()) {
                hTTPTransformStage.stageDetail().put("outputColumns", Integer.valueOf(dataset3.schema().length()));
                hTTPTransformStage.stageDetail().put("numPartitions", Integer.valueOf(dataset3.rdd().partitions().length));
                if (hTTPTransformStage.persist()) {
                    dataset3.persist(aRCContext.storageLevel());
                    hTTPTransformStage.stageDetail().put("records", Long.valueOf(dataset3.count()));
                }
            }
            return Option$.MODULE$.apply(dataset3);
        } catch (Exception e) {
            throw new HTTPTransformStage$$anon$3(e, hTTPTransformStage);
        }
    }

    /**
     * Factory for {@link HTTPTransformStage}: forwards all sixteen arguments, in the same order,
     * to the stage constructor and returns the new instance.
     */
    public HTTPTransformStage apply(HTTPTransform hTTPTransform, String str, Option<String> option, URI uri, Map<String, String> map, List<Object> list, String str2, String str3, String str4, Map<String, String> map2, boolean z, int i, String str5, Option<Object> option2, List<String> list2, API.FailModeType failModeType) {
        final HTTPTransformStage stage = new HTTPTransformStage(
            hTTPTransform,
            str,
            option,
            uri,
            map,
            list,
            str2,
            str3,
            str4,
            map2,
            z,
            i,
            str5,
            option2,
            list2,
            failModeType);
        return stage;
    }

    /**
     * Extractor counterpart of {@code apply}: returns {@code None} for a {@code null} stage,
     * otherwise {@code Some} of a 16-tuple holding the stage's fields in constructor order
     * (the boolean {@code persist} and int {@code batchSize} are boxed).
     */
    public Option<Tuple16<HTTPTransform, String, Option<String>, URI, Map<String, String>, List<Object>, String, String, String, Map<String, String>, Object, Object, String, Option<Object>, List<String>, API.FailModeType>> unapply(HTTPTransformStage hTTPTransformStage) {
        if (hTTPTransformStage == null) {
            return (Option) None$.MODULE$;
        }
        Tuple16 fields = new Tuple16(
            hTTPTransformStage.plugin(),
            hTTPTransformStage.name(),
            hTTPTransformStage.description(),
            hTTPTransformStage.uri(),
            hTTPTransformStage.headers(),
            hTTPTransformStage.validStatusCodes(),
            hTTPTransformStage.inputView(),
            hTTPTransformStage.outputView(),
            hTTPTransformStage.inputField(),
            hTTPTransformStage.params(),
            BoxesRunTime.boxToBoolean(hTTPTransformStage.persist()),
            BoxesRunTime.boxToInteger(hTTPTransformStage.batchSize()),
            hTTPTransformStage.delimiter(),
            hTTPTransformStage.numPartitions(),
            hTTPTransformStage.partitionBy(),
            hTTPTransformStage.failMode());
        return new Some(fields);
    }

    /**
     * Serialization hook: always answers with the existing singleton instead of a freshly
     * deserialized copy.
     *
     * @return the value of {@code MODULE$}
     */
    private Object readResolve() {
        return HTTPTransformStage$.MODULE$;
    }

    /**
     * Filter predicate used when iterating the header map in {@code execute}: accepts every
     * non-null entry.
     *
     * @return {@code false} only when {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$execute$3(Tuple2 tuple2) {
        if (tuple2 == null) {
            return false;
        }
        return true;
    }

    /**
     * Adds one header entry (name, value) to the given POST request.
     *
     * @throws MatchError when {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$execute$4(HttpPost httpPost, Tuple2 tuple2) {
        if (tuple2 != null) {
            final String headerName = (String) tuple2._1();
            final String headerValue = (String) tuple2._2();
            httpPost.addHeader(headerName, headerValue);
            return;
        }
        throw new MatchError(tuple2);
    }

    /**
     * Replaces the dataset held in {@code objectRef} with one where the column named after
     * {@code structField} is re-aliased to the same name carrying the field's metadata.
     */
    public static final /* synthetic */ void $anonfun$execute$9(ObjectRef objectRef, StructField structField) {
        final Dataset current = (Dataset) objectRef.elem;
        final String columnName = structField.name();
        objectRef.elem = current.withColumn(
            columnName,
            functions$.MODULE$.col(columnName).as(columnName, structField.metadata()));
    }

    /** Private singleton constructor: publishes the new instance through {@code MODULE$}. */
    private HTTPTransformStage$() {
        HTTPTransformStage$.MODULE$ = this;
    }
}
